mirror of
https://framagit.org/framasoft/framaspace/argos.git
synced 2025-04-28 18:02:41 +02:00
- Start implementing some tests using pytest - Packaged using pyproject.toml - Implemented SSL checks using httpx - Checks can now run partially on the server, to access the configuration and determine the severity of the error if any - Used black to format all the files - Added a utility to convert strings like "3d" and "3w" to days - The internal representation of SSL thresholds is now a list of tuples - Models were lacking some relationships between Tasks and Results
73 lines
2.3 KiB
Python
73 lines
2.3 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import exists
|
|
|
|
from argos import schemas
|
|
from argos.logging import logger
|
|
from argos.server.models import Task, Result
|
|
|
|
from urllib.parse import urljoin
|
|
from datetime import datetime
|
|
|
|
|
|
async def list_tasks(db: Session, agent_id: str, limit: int = 100):
    """Return up to ``limit`` tasks that no agent has selected yet.

    A task counts as unselected when its ``selected_by`` column is NULL.

    Args:
        db: Session used to run the query.
        agent_id: Identifier of the requesting agent. Currently unused,
            because marking tasks as selected is deactivated (see below).
        limit: Maximum number of tasks to return.

    Returns:
        A list of ``Task`` rows (possibly empty).
    """
    # ``.is_(None)`` renders the same ``IS NULL`` as ``== None`` but makes the
    # intent explicit; a plain Python ``is None`` would not build SQL at all.
    tasks = db.query(Task).where(Task.selected_by.is_(None)).limit(limit).all()

    # XXX: Deactivated for now, as it simplifies testing.
    # now = datetime.now()
    # for task in tasks:
    #     task.selected_at = now
    #     task.selected_by = agent_id
    # db.commit()
    return tasks
|
|
|
|
|
|
async def get_task(db: Session, id):
    """Return the ``Task`` whose primary key is ``id``.

    Args:
        db: Session used for the lookup.
        id: Primary-key value of the task to fetch.

    Returns:
        The matching ``Task``, or ``None`` when no such row exists.
    """
    # Session.get() replaces the legacy Query.get(), which is deprecated
    # since SQLAlchemy 1.4; both return None for a missing primary key.
    return db.get(Task, id)
|
|
|
|
|
|
async def create_result(db: Session, agent_result: schemas.AgentResult):
    """Build a ``Result`` from an agent's report and add it to the session.

    The result is stamped with the current local time as ``submitted_at``.
    This function only calls ``db.add``; it does not commit.

    Args:
        db: Session the new result is added to.
        agent_result: Report providing ``status``, ``context`` and ``task_id``.

    Returns:
        The newly created ``Result`` instance.
    """
    fields = {
        "submitted_at": datetime.now(),
        "status": agent_result.status,
        "context": agent_result.context,
        "task_id": agent_result.task_id,
    }
    record = Result(**fields)
    db.add(record)
    return record
|
|
|
|
|
|
async def count_tasks(db: Session):
    """Return the number of ``Task`` rows visible through ``db``."""
    task_query = db.query(Task)
    return task_query.count()
|
|
|
|
|
|
async def count_results(db: Session):
    """Return the number of ``Result`` rows visible through ``db``."""
    result_query = db.query(Result)
    return result_query.count()
|
|
|
|
|
|
async def update_from_config(db: Session, config: schemas.Config):
    """Create the tasks described by ``config`` that are not in the db yet.

    For every website, path and check entry of the configuration, a ``Task``
    is added unless one with the same ``url``, ``check`` and ``expected``
    values already exists. All additions are committed at the end.

    Args:
        db: Session used to look up existing tasks and add new ones.
        config: Configuration whose ``websites`` are walked; each website
            provides a ``domain`` and ``paths``, each path a ``path`` and a
            list of ``checks`` mappings (check name -> expected value).
    """
    for website in config.websites:
        domain = str(website.domain)
        for p in website.paths:
            url = urljoin(domain, str(p.path))
            for check in p.checks:
                for check_key, expected in check.items():
                    # Check the db for already existing tasks.
                    # The criteria are chained with .where() so that they are
                    # ANDed in SQL. Python's ``and`` cannot combine SQL
                    # expressions: it short-circuits on their truthiness and
                    # would keep only the ``Task.url == url`` condition.
                    already_there = (
                        exists()
                        .where(Task.url == url)
                        .where(Task.check == check_key)
                        .where(Task.expected == expected)
                    )
                    existing_task = db.query(already_there).scalar()

                    if not existing_task:
                        task = Task(
                            domain=domain, url=url, check=check_key, expected=expected
                        )
                        logger.debug(f"Adding a new task in the db: {task}")
                        db.add(task)
                    else:
                        logger.debug(
                            f"Skipping db task creation for {url=}, {check_key=}, {expected=}."
                        )
    db.commit()
|