argos/argos/server/queries.py
Alexis Métaireau aee1545814 Deduplicate checks from settings. Fixes #5
- Allow the database to have duplicates (previous calls were causing a server crash)
- Add tests scenarii for duplication
2023-10-25 00:00:25 +02:00

167 lines
5.1 KiB
Python

from datetime import datetime, timedelta
from urllib.parse import urljoin
from sqlalchemy import Select, desc, func
from sqlalchemy.orm import Session
from argos import schemas
from argos.logging import logger
from argos.server.models import Result, Task
async def list_tasks(db: Session, agent_id: str, limit: int = 100):
"""List tasks and mark them as selected"""
tasks = (
db.query(Task)
.filter(
Task.selected_by == None, # noqa: E711
((Task.next_run <= datetime.now()) | (Task.next_run == None)), # noqa: E711
)
.limit(limit)
.all()
)
now = datetime.now()
for task in tasks:
task.selected_at = now
task.selected_by = agent_id
db.commit()
return tasks
async def get_task(db: Session, id: int) -> Task:
return db.get(Task, id)
async def create_result(db: Session, agent_result: schemas.AgentResult, agent_id: str):
result = Result(
submitted_at=datetime.now(),
status=agent_result.status,
context=agent_result.context,
task_id=agent_result.task_id,
agent_id=agent_id,
)
db.add(result)
return result
async def count_tasks(db: Session, selected=False):
query = db.query(Task)
if selected:
query = query.filter(Task.selected_by is not None)
else:
query = query.filter(Task.selected_by is None)
return query.count()
async def count_results(db: Session):
return db.query(Result).count()
async def update_from_config(db: Session, config: schemas.Config):
tasks = []
unique_properties = []
for website in config.websites:
domain = str(website.domain)
frequency = website.frequency or config.general.frequency
for p in website.paths:
url = urljoin(domain, str(p.path))
for check_key, expected in p.checks:
# Check the db for already existing tasks.
existing_tasks = (
db.query(Task)
.filter(
Task.url == url,
Task.check == check_key,
Task.expected == expected,
)
.all()
)
if existing_tasks:
existing_task = existing_tasks[0]
if frequency != existing_task.frequency:
existing_task.frequency = frequency
msg = f"Skipping db task creation for {url=}, {check_key=}, {expected=}, {frequency=}."
logger.debug(msg)
else:
properties = (url, check_key, expected)
if properties not in unique_properties:
unique_properties.append(properties)
task = Task(
domain=domain,
url=url,
check=check_key,
expected=expected,
frequency=frequency,
)
logger.debug(f"Adding a new task in the db: {task}")
tasks.append(task)
db.add_all(tasks)
db.commit()
async def get_severity_counts(db: Session):
# Get the last result of each task
subquery = (
db.query(Result.task_id, func.max(Result.id).label("max_result_id"))
.group_by(Result.task_id)
.subquery()
)
# Join this back to get full result rows, and group by status
query = (
db.query(Result.severity, func.count(Result.id).label("count"))
.join(subquery, Result.id == subquery.columns.max_result_id)
.group_by(Result.severity)
)
# Execute the query and fetch the results
task_counts_by_severity = query.all()
return task_counts_by_severity
async def remove_old_results(db: Session, max_results: int):
tasks = db.query(Task).all()
deleted = 0
for task in tasks:
# Get the id of the oldest result to keep
subquery = (
db.query(Result.id)
.filter(Result.task_id == task.id)
.order_by(desc(Result.id))
.limit(max_results)
.subquery()
)
min_id = db.query(func.min(subquery.c.id)).scalar()
# Delete all the results older than min_id
if min_id:
deleted += (
db.query(Result)
.where(Result.id < min_id, Result.task_id == task.id)
.delete()
)
db.commit()
return deleted
async def release_old_locks(db: Session, max_lock_seconds: int):
# Get all the jobs that have been selected_at for more than max_lock_time
max_acceptable_time = datetime.now() - timedelta(seconds=max_lock_seconds)
subquery = (
db.query(Task.id).filter(Task.selected_at < max_acceptable_time).subquery()
)
# Release the locks on these jobs
updated = (
db.query(Task)
.filter(Task.id.in_(Select(subquery)))
.update({Task.selected_at: None, Task.selected_by: None})
)
db.commit()
return updated