From a601fccad3242c5a99264e90fae1f8b52f392ee5 Mon Sep 17 00:00:00 2001 From: Luc Didry Date: Wed, 19 Mar 2025 17:26:02 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=95=20=E2=80=94=20Fix=20order=20of=20t?= =?UTF-8?q?asks=20sent=20to=20agent?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + argos/server/queries.py | 38 +++++++++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b3087b..cedeb98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - 🔊 — Improve check agent log - 🔒️ — Logging out now invalidate tokens - 📝 — Improve OpenAPI doc +- 🤕 — Fix order of tasks sent to agent ## 0.9.0 diff --git a/argos/server/queries.py b/argos/server/queries.py index 581d5ec..faab67e 100644 --- a/argos/server/queries.py +++ b/argos/server/queries.py @@ -18,17 +18,49 @@ from argos.server.settings import read_config async def list_tasks(db: Session, agent_id: str, limit: int = 100): """List tasks and mark them as selected""" + # Process tasks which never has been processed first subquery = ( db.query(func.distinct(Task.task_group)) .filter( Task.selected_by == None, # noqa: E711 - ((Task.next_run <= datetime.now()) | (Task.next_run == None)), # noqa: E711 + Task.next_run == None, # noqa: E711 ) .limit(limit) .subquery() ) tasks = db.query(Task).filter(Task.task_group.in_(Select(subquery))).all() + if len(tasks): + now = datetime.now() + for task in tasks: + task.selected_at = now + task.selected_by = agent_id + db.commit() + return tasks + + # Now we can process tasks normally + all_task_groups = ( + db.query(Task.task_group) + .filter( + Task.selected_by == None, # noqa: E711 + Task.next_run <= datetime.now(), # noqa: E711 + ) + .order_by(asc(Task.next_run)) + .all() + ) + # We need to do distinct(Task.task_group) in Python + # since distinct(Task.task_group) is not compatible with + # an order_by(asc(Task.next_run)) + task_groups: list[str] = [] + for row in all_task_groups: + if len(task_groups) > limit: + break + task_group = row.task_group + if task_group not in task_groups: + task_groups.append(task_group) + + tasks = db.query(Task).filter(Task.task_group.in_(task_groups)).all() + now = datetime.now() for task in tasks: task.selected_at = now @@ -406,14 +438,14 @@ async def get_severity_counts(db: Session) -> dict: async def reschedule_all(db: Session): """Reschedule checks of all non OK tasks ASAP""" - db.query(Task).filter(Task.severity.in_(["warning", "critical", "unknown"])).update( + db.query(Task).filter(Task.severity != "ok").update( {Task.next_run: datetime.now() - timedelta(days=1)} ) db.commit() async def remove_old_results(db: Session, max_results_age: float): - """Remove old results, base on age""" + """Remove old results, based on age""" max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age) deleted = ( db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete()