From 211ac32028d5df5b793563249949f22aebf8b1f0 Mon Sep 17 00:00:00 2001 From: Luc Didry Date: Tue, 18 Feb 2025 17:04:26 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=E2=80=94=20Fix=20worker=20timeo?= =?UTF-8?q?ut=20for=20old=20results=20cleaning=20in=20recurring=20tasks=20?= =?UTF-8?q?(fix=20#84)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 💥 Old results are now removed by their age, not based on their number. 💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration. Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate a new example configuration file. --- CHANGELOG.md | 10 +++++++++- argos/config-example.yaml | 8 +++++--- argos/schemas/config.py | 13 +++++-------- argos/server/main.py | 12 +++++++----- argos/server/queries.py | 31 ++++++++----------------------- docs/cli.md | 2 +- tests/config.yaml | 8 +++++--- tests/test_queries.py | 16 ++++++++-------- 8 files changed, 48 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18c01a8..f3f3922 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## [Unreleased] +- 🐛 — Fix worker timeout for old results cleaning in recurring tasks (#84) + +💥 Old results are now removed by their age, not based on their number. + +💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration. +Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate +a new example configuration file. + ## 0.8.2 Date: 2025-02-18 @@ -26,7 +34,7 @@ Date: 2025-02-18 - 🐛 — Automatically reconnect to LDAP if unreachable (#81) - 🐛 — Better httpx.RequestError handling (#83) -💥 Warning: there is now new settings to add to your configuration file. +💥 Warning: there is new settings to add to your configuration file. Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate a new example configuration file. diff --git a/argos/config-example.yaml b/argos/config-example.yaml index 3b9141f..f22341f 100644 --- a/argos/config-example.yaml +++ b/argos/config-example.yaml @@ -152,9 +152,11 @@ ssl: # Argos will execute some tasks in the background for you # every 2 minutes and needs some configuration for that recurring_tasks: - # Max number of results per tasks you want to keep - # Minimum value is 1, default is 100 - max_results: 100 + # Maximum age of results + # Use m for minutes, h for hours, d for days + # w for weeks, M for months, y for years + # See https://github.com/timwedde/durations_nlp#scales-reference for details + max_results_age: "1d" # Max number of seconds a task can be locked # Minimum value is 61, default is 100 max_lock_seconds: 100 diff --git a/argos/schemas/config.py b/argos/schemas/config.py index 26d0198..44d8fb7 100644 --- a/argos/schemas/config.py +++ b/argos/schemas/config.py @@ -49,17 +49,14 @@ class SSL(BaseModel): class RecurringTasks(BaseModel): - max_results: int + max_results_age: float max_lock_seconds: int time_without_agent: int - @field_validator("max_results", mode="before") - def parse_max_results(cls, value): - """Ensure that max_results is higher than 0""" - if value >= 1: - return value - - return 100 + @field_validator("max_results_age", mode="before") + def parse_max_results_age(cls, value): + """Convert the configured maximum results age to seconds""" + return Duration(value).to_seconds() @field_validator("max_lock_seconds", mode="before") def parse_max_lock_seconds(cls, value): diff --git a/argos/server/main.py b/argos/server/main.py index 85a6a0b..830a863 100644 --- a/argos/server/main.py +++ b/argos/server/main.py @@ -133,15 +133,17 @@ async def recurring_tasks() -> None: with app.state.SessionLocal() as db: config = app.state.config.recurring_tasks - removed = await queries.remove_old_results(db, config.max_results) - logger.info("%i result(s) removed", removed) - - updated = await queries.release_old_locks(db, config.max_lock_seconds) - logger.info("%i lock(s) released", updated) agents = await queries.get_recent_agents_count(db, config.time_without_agent) if agents == 0: no_agent_alert(app.state.config) + logger.info("Agent presence checked") + + removed = await queries.remove_old_results(db, config.max_results_age) + logger.info("%i result(s) removed", removed) + + updated = await queries.release_old_locks(db, config.max_lock_seconds) + logger.info("%i lock(s) released", updated) processed_jobs = await queries.process_jobs(db) logger.info("%i job(s) processed", processed_jobs) diff --git a/argos/server/queries.py b/argos/server/queries.py index 0329eb9..6e93c02 100644 --- a/argos/server/queries.py +++ b/argos/server/queries.py @@ -4,7 +4,7 @@ from hashlib import sha256 from typing import List from urllib.parse import urljoin -from sqlalchemy import asc, desc, func, Select +from sqlalchemy import asc, func, Select from sqlalchemy.orm import Session from argos import schemas @@ -409,28 +409,13 @@ async def reschedule_all(db: Session): db.commit() -async def remove_old_results(db: Session, max_results: int): - tasks = db.query(Task).all() - deleted = 0 - for task in tasks: - # Get the id of the oldest result to keep - subquery = ( - db.query(Result.id) - .filter(Result.task_id == task.id) - .order_by(desc(Result.id)) - .limit(max_results) - .subquery() - ) - min_id = db.query(func.min(subquery.c.id)).scalar() # pylint: disable-msg=not-callable - - # Delete all the results older than min_id - if min_id: - deleted += ( - db.query(Result) - .where(Result.id < min_id, Result.task_id == task.id) - .delete() - ) - db.commit() +async def remove_old_results(db: Session, max_results_age: float): + """Remove old results, base on age""" + max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age) + deleted = ( + db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete() + ) + db.commit() return deleted diff --git a/docs/cli.md b/docs/cli.md index f04ddb4..cd5963c 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -60,7 +60,7 @@ Options: --max-tasks INTEGER Number of concurrent tasks this agent can run --wait-time INTEGER Waiting time between two polls on the server (seconds) - --log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL] + --log-level [debug|info|warning|error|critical] --user-agent TEXT A custom string to append to the User-Agent header --help Show this message and exit. diff --git a/tests/config.yaml b/tests/config.yaml index c468e20..697c188 100644 --- a/tests/config.yaml +++ b/tests/config.yaml @@ -43,9 +43,11 @@ ssl: # Argos will execute some tasks in the background for you # every 2 minutes and needs some configuration for that recurring_tasks: - # Max number of results per tasks you want to keep - # Minimum value is 1, default is 100 - max_results: 100 + # Maximum age of results + # Use m for minutes, h for hours, d for days + # w for weeks, M for months, y for years + # See https://github.com/timwedde/durations_nlp#scales-reference for details + max_results_age: "1d" # Max number of seconds a task can be locked # Minimum value is 61, default is 100 max_lock_seconds: 100 diff --git a/tests/test_queries.py b/tests/test_queries.py index 3d9e9f3..0068544 100644 --- a/tests/test_queries.py +++ b/tests/test_queries.py @@ -10,9 +10,9 @@ from argos.server.models import Result, Task, User @pytest.mark.asyncio async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefined-outer-name for _task in ten_tasks: - for _ in range(5): + for iterator in range(5): result = Result( - submitted_at=datetime.now(), + submitted_at=datetime.now() - timedelta(seconds=iterator * 2), status="success", context={"foo": "bar"}, task=_task, @@ -24,12 +24,12 @@ async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefi # So we have 5 results per tasks assert db.query(Result).count() == 50 - # Keep only 2 - deleted = await queries.remove_old_results(db, 2) - assert deleted == 30 - assert db.query(Result).count() == 20 + # Keep only those newer than 1 second ago + deleted = await queries.remove_old_results(db, 6) + assert deleted == 20 + assert db.query(Result).count() == 30 for _task in ten_tasks: - assert db.query(Result).filter(Result.task == _task).count() == 2 + assert db.query(Result).filter(Result.task == _task).count() == 3 @pytest.mark.asyncio @@ -245,7 +245,7 @@ def empty_config(): ), ssl=schemas.config.SSL(thresholds=[]), recurring_tasks=schemas.config.RecurringTasks( - max_results=100, + max_results_age="6s", max_lock_seconds=120, time_without_agent=300, ),