Mirror of https://framagit.org/framasoft/framaspace/argos.git
Synced 2025-04-28 18:02:41 +02:00

Compare commits (5 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 9389e3a005 | |
| | 159a6e2427 | |
| | 211ac32028 | |
| | 32f2518294 | |
| | 38cc06e972 | |
11 changed files with 78 additions and 54 deletions
CHANGELOG.md (22 changed lines)

```diff
@@ -2,6 +2,24 @@

 ## [Unreleased]

+## 0.9.0
+
+Date: 2025-02-18
+
+- 🐛 — Fix worker timeout for old results cleaning in recurring tasks (#84)
+
+💥 Old results are now removed by their age, not by their number.
+
+💥 Warning: the `max_results` setting has been replaced by `max_results_age`, which is a duration.
+Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
+a new example configuration file.
+
+## 0.8.2
+
+Date: 2025-02-18
+
+- 🐛 — Fix recurring tasks with gunicorn
+
 ## 0.8.1

 Date: 2025-02-18
@@ -20,13 +38,15 @@ Date: 2025-02-18
 - 🐛 — Automatically reconnect to LDAP if unreachable (#81)
 - 🐛 — Better httpx.RequestError handling (#83)

-💥 Warning: there is now new settings to add to your configuration file.
+💥 Warning: there are new settings to add to your configuration file.
 Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
 a new example configuration file.

 💥 You don’t need cron tasks anymore!
 Remove your old cron tasks, as they will now do nothing but generate errors.

+NB: You may want to add `--enqueue` to the `reload-config` command in your systemd file.
+
 ## 0.7.4

 Date: 2025-02-12
```
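For operators upgrading to 0.9.0, the migration amounts to replacing one key in the `recurring_tasks` section of the configuration file; a sketch with illustrative values:

```yaml
recurring_tasks:
  # before 0.9.0: keep at most this many results per task
  # max_results: 100
  # from 0.9.0 on: keep only results younger than this duration
  max_results_age: "1d"
```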
Version string:

```diff
@@ -1 +1 @@
-VERSION = "0.8.1"
+VERSION = "0.9.0"
```
Example configuration:

```diff
@@ -152,9 +152,11 @@ ssl:
 # Argos will execute some tasks in the background for you
 # every 2 minutes and needs some configuration for that
 recurring_tasks:
-  # Max number of results per tasks you want to keep
-  # Minimum value is 1, default is 100
-  max_results: 100
+  # Maximum age of results
+  # Use m for minutes, h for hours, d for days
+  # w for weeks, M for months, y for years
+  # See https://github.com/timwedde/durations_nlp#scales-reference for details
+  max_results_age: "1d"
   # Max number of seconds a task can be locked
   # Minimum value is 61, default is 100
   max_lock_seconds: 100
```
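A quick sketch of how such duration strings resolve to seconds, assuming the durations_nlp package linked in the comment above exposes `Duration` as its README describes:

```python
from durations_nlp import Duration

# Duration strings use the scales listed above; to_seconds() returns a float.
print(Duration("1d").to_seconds())  # 86400.0 (one day)
print(Duration("1w").to_seconds())  # 604800.0 (one week)
```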
Configuration schema:

```diff
@@ -49,17 +49,14 @@ class SSL(BaseModel):


 class RecurringTasks(BaseModel):
-    max_results: int
+    max_results_age: float
     max_lock_seconds: int
     time_without_agent: int

-    @field_validator("max_results", mode="before")
-    def parse_max_results(cls, value):
-        """Ensure that max_results is higher than 0"""
-        if value >= 1:
-            return value
-
-        return 100
+    @field_validator("max_results_age", mode="before")
+    def parse_max_results_age(cls, value):
+        """Convert the configured maximum results age to seconds"""
+        return Duration(value).to_seconds()

     @field_validator("max_lock_seconds", mode="before")
     def parse_max_lock_seconds(cls, value):
```
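A hypothetical usage sketch of the model above, with field values borrowed from the test fixture further down; the assertion shows the validator's effect:

```python
# Illustrative only: assumes RecurringTasks is imported from the schema module.
rt = RecurringTasks(
    max_results_age="6s",    # the "before" validator runs Duration("6s").to_seconds()
    max_lock_seconds=120,
    time_without_agent=300,
)
assert rt.max_results_age == 6.0  # stored as seconds, as a float
```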
Server application:

```diff
@@ -6,6 +6,7 @@ from fastapi import FastAPI
 from fastapi.staticfiles import StaticFiles
 from fastapi_login import LoginManager
 from fastapi_utils.tasks import repeat_every
+from psutil import Process
 from sqlalchemy import create_engine, event
 from sqlalchemy.orm import sessionmaker

@@ -116,25 +117,41 @@ def create_manager(cookie_secret: str) -> LoginManager:
 @repeat_every(seconds=120, logger=logger)
 async def recurring_tasks() -> None:
     """Recurring DB cleanup and watch-agents tasks"""
+    # If we are using gunicorn
+    if not hasattr(app.state, "SessionLocal"):
+        parent_process = Process(os.getppid())
+        children = parent_process.children(recursive=True)
+        # Start the task only once, not for every worker
+        if children[0].pid == os.getpid():
+            # and we need to set up the database engine
+            setup_database(app)
+        else:
+            return None
+
     set_log_level("info", quiet=True)
     logger.info("Start background recurring tasks")

     with app.state.SessionLocal() as db:
         config = app.state.config.recurring_tasks
-        removed = await queries.remove_old_results(db, config.max_results)
+
+        agents = await queries.get_recent_agents_count(db, config.time_without_agent)
+        if agents == 0:
+            no_agent_alert(app.state.config)
+        logger.info("Agent presence checked")
+
+        removed = await queries.remove_old_results(db, config.max_results_age)
         logger.info("%i result(s) removed", removed)

         updated = await queries.release_old_locks(db, config.max_lock_seconds)
         logger.info("%i lock(s) released", updated)

-        agents = await queries.get_recent_agents_count(db, config.time_without_agent)
-        if agents == 0:
-            no_agent_alert(app.state.config)
-
         processed_jobs = await queries.process_jobs(db)
         logger.info("%i job(s) processed", processed_jobs)

         logger.info("Background recurring tasks ended")

+    return None
+

 @asynccontextmanager
 async def lifespan(appli: FastAPI):
```
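The guard above relies on every gunicorn worker sharing the same master process; a distilled, hypothetical version of the check (names are mine, not the project's):

```python
import os

from psutil import Process


def is_first_worker() -> bool:
    """Return True only in the first worker spawned by our parent process.

    Under gunicorn, os.getppid() is the master process; its first child is
    one specific worker, so exactly one worker passes this test and runs
    the recurring tasks.
    """
    parent = Process(os.getppid())            # the gunicorn master
    children = parent.children(recursive=True)
    return children[0].pid == os.getpid()
```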
Database queries:

```diff
@@ -4,7 +4,7 @@ from hashlib import sha256
 from typing import List
 from urllib.parse import urljoin

-from sqlalchemy import asc, desc, func, Select
+from sqlalchemy import asc, func, Select
 from sqlalchemy.orm import Session

 from argos import schemas
@@ -409,28 +409,13 @@ async def reschedule_all(db: Session):
     db.commit()


-async def remove_old_results(db: Session, max_results: int):
-    tasks = db.query(Task).all()
-    deleted = 0
-    for task in tasks:
-        # Get the id of the oldest result to keep
-        subquery = (
-            db.query(Result.id)
-            .filter(Result.task_id == task.id)
-            .order_by(desc(Result.id))
-            .limit(max_results)
-            .subquery()
-        )
-        min_id = db.query(func.min(subquery.c.id)).scalar()  # pylint: disable-msg=not-callable
-
-        # Delete all the results older than min_id
-        if min_id:
-            deleted += (
-                db.query(Result)
-                .where(Result.id < min_id, Result.task_id == task.id)
-                .delete()
-            )
-            db.commit()
+async def remove_old_results(db: Session, max_results_age: float):
+    """Remove old results, based on their age"""
+    max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age)
+    deleted = (
+        db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete()
+    )
+    db.commit()

     return deleted
```
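This rewrite is what resolves the worker timeout from #84: instead of one subquery plus one DELETE per task, the cleanup now issues a single bulk DELETE over all results older than the cutoff, so its runtime no longer grows with the number of tasks. A minimal, self-contained sketch of the cutoff arithmetic, assuming a parsed duration of one day:

```python
from datetime import datetime, timedelta

# Illustrative: 86400.0 seconds is what the config validator yields for "1d".
max_results_age = 86400.0
cutoff = datetime.now() - timedelta(seconds=max_results_age)
print(f"results submitted before {cutoff} would be deleted in one statement")
```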
Agent CLI documentation:

```diff
@@ -60,7 +60,7 @@ Options:
   --max-tasks INTEGER   Number of concurrent tasks this agent can run
   --wait-time INTEGER   Waiting time between two polls on the server
                         (seconds)
-  --log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL]
+  --log-level [debug|info|warning|error|critical]
   --user-agent TEXT     A custom string to append to the User-Agent
                         header
   --help                Show this message and exit.
```
Systemd deployment documentation:

```diff
@@ -90,7 +90,7 @@ User=argos
 WorkingDirectory=/opt/argos/
 EnvironmentFile=/etc/default/argos-server
 ExecStartPre=/opt/argos/venv/bin/argos server migrate
-ExecStartPre=/opt/argos/venv/bin/argos server reload-config
+ExecStartPre=/opt/argos/venv/bin/argos server reload-config --enqueue
 ExecStart=/opt/argos/venv/bin/gunicorn "argos.server.main:get_application()" \\
     --workers \$ARGOS_SERVER_WORKERS \\
     --worker-class uvicorn.workers.UvicornWorker \\
```
Project dependencies:

```diff
@@ -33,6 +33,7 @@ dependencies = [
    "Jinja2>=3.0,<4",
    "jsonpointer>=3.0,<4",
    "passlib>=1.7.4,<2",
+   "psutil>=5.9.8,<6",
    "psycopg2-binary>=2.9,<3",
    "pydantic[email]>=2.4,<3",
    "pydantic-settings>=2.0,<3",
```
Test configuration:

```diff
@@ -43,9 +43,11 @@ ssl:
 # Argos will execute some tasks in the background for you
 # every 2 minutes and needs some configuration for that
 recurring_tasks:
-  # Max number of results per tasks you want to keep
-  # Minimum value is 1, default is 100
-  max_results: 100
+  # Maximum age of results
+  # Use m for minutes, h for hours, d for days
+  # w for weeks, M for months, y for years
+  # See https://github.com/timwedde/durations_nlp#scales-reference for details
+  max_results_age: "1d"
   # Max number of seconds a task can be locked
   # Minimum value is 61, default is 100
   max_lock_seconds: 100
```
Tests:

```diff
@@ -10,9 +10,9 @@ from argos.server.models import Result, Task, User
 @pytest.mark.asyncio
 async def test_remove_old_results(db, ten_tasks):  # pylint: disable-msg=redefined-outer-name
     for _task in ten_tasks:
-        for _ in range(5):
+        for iterator in range(5):
             result = Result(
-                submitted_at=datetime.now(),
+                submitted_at=datetime.now() - timedelta(seconds=iterator * 2),
                 status="success",
                 context={"foo": "bar"},
                 task=_task,
@@ -24,12 +24,12 @@ async def test_remove_old_results(db, ten_tasks):  # pylint: disable-msg=redefi

     # So we have 5 results per task
     assert db.query(Result).count() == 50
-    # Keep only 2
-    deleted = await queries.remove_old_results(db, 2)
-    assert deleted == 30
-    assert db.query(Result).count() == 20
+    # Keep only the results newer than 6 seconds
+    deleted = await queries.remove_old_results(db, 6)
+    assert deleted == 20
+    assert db.query(Result).count() == 30
     for _task in ten_tasks:
-        assert db.query(Result).filter(Result.task == _task).count() == 2
+        assert db.query(Result).filter(Result.task == _task).count() == 3


 @pytest.mark.asyncio
```
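The asserted numbers follow from the seeded ages; a quick self-contained check of the arithmetic:

```python
# Each of the 10 tasks gets 5 results aged 0, 2, 4, 6 and 8 seconds.
ages = [i * 2 for i in range(5)]                     # [0, 2, 4, 6, 8]
# With a max age of 6 seconds, the 6s and 8s results are past the cutoff
# by the time the delete runs (a little wall-clock time has elapsed).
deleted_per_task = len([age for age in ages if age >= 6])  # 2
print(deleted_per_task * 10)         # 20 deleted in total
print(50 - deleted_per_task * 10)    # 30 remain, i.e. 3 per task
```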
Test fixtures:

```diff
@@ -245,7 +245,7 @@ def empty_config():
         ),
         ssl=schemas.config.SSL(thresholds=[]),
         recurring_tasks=schemas.config.RecurringTasks(
-            max_results=100,
+            max_results_age="6s",
             max_lock_seconds=120,
             time_without_agent=300,
         ),
```