Updated models.py and queries.py

- Removed the `Definition` class and reworked the `Task` class: it now contains all the information needed to run the jobs on the workers.
- Added the `Result` class. It stores the results returned by workers.
- In queries.py, updated the `update_from_config` function. Now it checks for the existence of tasks with the same URL, check, and expected result before adding new ones.
Alexis Métaireau 2023-10-02 12:59:30 +02:00
parent eea465af5c
commit 8ac5cdb529
3 changed files with 83 additions and 43 deletions
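
The deduplication described above boils down to an "insert only if missing" pattern around SQLAlchemy's `exists()`. Below is a minimal, self-contained sketch of that pattern against a throwaway table and an in-memory SQLite database; the `Probe` model and the literal values are illustrative stand-ins for the real `Task` model shown further down.

```python
# Minimal sketch of the "insert only if missing" pattern described above.
# The Probe table and the literal values are illustrative; the real code
# applies the same idea to the Task model shown in models.py below.
from sqlalchemy import create_engine, exists
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Probe(Base):
    __tablename__ = "probes"

    id: Mapped[int] = mapped_column(primary_key=True)
    url: Mapped[str] = mapped_column()
    check: Mapped[str] = mapped_column()
    expected: Mapped[str] = mapped_column()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db:
    for _ in range(2):  # the second pass must not create a duplicate
        already_there = db.query(
            exists().where(
                Probe.url == "https://example.org/",
                Probe.check == "status-is",
                Probe.expected == "200",
            )
        ).scalar()
        if not already_there:
            db.add(
                Probe(url="https://example.org/", check="status-is", expected="200")
            )
    db.commit()
    print(db.query(Probe).count())  # 1
```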

README.md

@@ -1,24 +1,49 @@
-# On service start.
+# Argos
+
+🚧 This is mainly a work in progress for now. It's not working, don't try to install it! 🚧
+
+Argos is an HTTP monitoring service. It's meant to be simple to configure and simple to use.
+
+Features:
+
+- [x] Uses `.yaml` files for configuration;
+- [x] Reads the configuration file and converts it to tasks;
+- [x] Stores tasks in a database;
+- [ ] Checks can be distributed on the network thanks to a job queue;
+- [x] Multiple paths per website can be tested;
+- [ ] Handles multiple alerting backends (email, SMS, Gotify);
+- [ ] Exposes an HTTP API that can be consumed by other systems;
+- [ ] Exposes a simple read-only website.
+
+Implemented checks:
+
+- [ ] Returned status code matches what you expect;
+- [ ] Returned body matches what you expect;
+- [ ] SSL certificate expires in more than X days.
+
+## Development notes
+
+### On service start
 
 1. Read the job definitions file and populate the database.
 2. From the job definition, create a list of tasks to execute.
 3. From time to time (?) clean the db.
 
-# On configuration changes :
+### On configuration changes
 
 - Find and tombstone the JobDefinitions that are not useful anymore.
 - Cascade delete the child tasks that are planned. Tombstone them as well.
 
-# On worker demand :
+### On worker demand
 
 - Find the tasks for which:
   - last_check is not defined
   - OR last_check + max_timedelta > datetime.now()
   - AND selected_by is not defined.
 - Mark these tasks as selected by the current worker, on the current date.
 
-# From time to time:
+### From time to time
 
 - Check for stalled tasks: (datetime.now() - selected_at) > MAX_WORKER_TIME. Remove the lock.
 
-# On the worker side
+### On the worker side
 
 Hey, I'm XX, give me some work.
 <Service answers>
 OK, this is done, here are the results for Task<id>: response.
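
The "On worker demand" and "From time to time" notes above describe a claim-by-column locking scheme that is not implemented in this commit. Here is a rough sketch of what it could look like against the `Task` model from models.py below; the function names, the `MAX_WORKER_TIME` value, and the `argos.models` import path are assumptions, and the `last_check` condition is only hinted at in a comment since that column does not exist yet.

```python
# Hypothetical sketch of the claim/release logic described in the notes above;
# nothing here exists in the codebase yet. MAX_WORKER_TIME, the function names
# and the argos.models import path are assumptions.
from datetime import datetime, timedelta

from sqlalchemy.orm import Session

from argos.models import Task  # assumed package path

MAX_WORKER_TIME = timedelta(minutes=10)  # assumed value


def claim_tasks(db: Session, worker_id: str, limit: int = 10) -> list[Task]:
    """Mark up to `limit` unclaimed tasks as selected by this worker."""
    # The notes also ask for "last_check not defined OR last_check too old";
    # the Task model has no last_check column yet, so only the lock column
    # is filtered on here.
    tasks = db.query(Task).filter(Task.selected_by.is_(None)).limit(limit).all()
    now = datetime.now()
    for task in tasks:
        task.selected_by = worker_id
        task.selected_at = now
    db.commit()
    return tasks


def release_stalled_tasks(db: Session) -> None:
    """Remove locks that have been held for longer than MAX_WORKER_TIME."""
    cutoff = datetime.now() - MAX_WORKER_TIME
    stalled = db.query(Task).filter(Task.selected_at < cutoff).all()
    for task in stalled:
        task.selected_by = None
        task.selected_at = None
    db.commit()
```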

models.py

@@ -8,32 +8,37 @@ from datetime import datetime
 from .schemas import WebsiteCheck
 
 
 class Base(DeclarativeBase):
-    type_annotation_map = {
-        List[WebsiteCheck]: JSON,
-        dict: JSON
-    }
+    type_annotation_map = {List[WebsiteCheck]: JSON, dict: JSON}
 
 
-class Definition(Base):
-    __tablename__ = "definitions"
-
-    id: Mapped[int] = mapped_column(primary_key=True)
-    domain: Mapped[str] = mapped_column()
-    path: Mapped[str] = mapped_column()
-    # checks: Mapped[List[WebsiteCheck]] = mapped_column()
-
-    tasks: Mapped[List["Task"]] = relationship(back_populates="definition")
-
-
 class Task(Base):
-    __tablename__ = "tasks"
-
-    id: Mapped[int] = mapped_column(primary_key=True)
-    # XXX enforce "ok" | "nok" | "unknown"
-    status: Mapped[str] = mapped_column(default="unknown")
-    max_delta_days: Mapped[int] = mapped_column()
-    response: Mapped[dict] = mapped_column(default={})
-    last_check: Mapped[datetime] = mapped_column(nullable=True)
+    """
+    There is one task per check.
+
+    It contains all information needed to run the jobs on the workers.
+    Workers will return information in the result table.
+    """
+
+    __tablename__ = "tasks"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+
+    # Info needed to run the task
+    url: Mapped[str] = mapped_column()
+    domain: Mapped[str] = mapped_column()
+    check: Mapped[str] = mapped_column()
+    expected: Mapped[str] = mapped_column()
+
+    # Orchestration-related
     selected_by: Mapped[str] = mapped_column(nullable=True)
     selected_at: Mapped[datetime] = mapped_column(nullable=True)
-
-    definition_id: Mapped[str] = mapped_column(ForeignKey("definitions.id"))
-    definition: Mapped["Definition"] = relationship(back_populates="tasks")
+
+
+class Result(Base):
+    __tablename__ = "results"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    submitted_at: Mapped[datetime] = mapped_column()
+    success: Mapped[bool] = mapped_column()
+    content: Mapped[str] = mapped_column()
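
As a usage note, here is how the two models above could be exercised: a task row created from configuration data, and a result row written back by a worker once the check has run. The `argos.models` import path, the in-memory SQLite engine, and the check values are assumptions made to keep the snippet runnable; note that `Result` is not linked to `Task` by a foreign key yet.

```python
# Hypothetical usage of the models above. The argos.models import path and
# the check/expected values are assumptions; only the column names come from
# the code shown in this diff.
from datetime import datetime

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from argos.models import Base, Result, Task  # assumed package path

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db:
    # One task per (url, check, expected) triple, as built by update_from_config.
    task = Task(
        domain="https://example.org",
        url="https://example.org/status",
        check="status-is",  # illustrative check name
        expected="200",
    )
    db.add(task)

    # What a worker could report once the check has run. Result has no
    # task_id column yet, so the row is not linked back to the task.
    db.add(Result(submitted_at=datetime.now(), success=True, content="200"))
    db.commit()
```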

queries.py

@@ -1,7 +1,10 @@
 from sqlalchemy.orm import Session
+from sqlalchemy import exists
 
-from . import models, schemas
+from . import schemas
+from .models import Task
 from .logging import logger
+from urllib.parse import urljoin
 
 
 def list_tasks(db: Session, limit: int = 100):
@@ -10,21 +13,28 @@ def list_tasks(db: Session, limit: int = 100):
 def update_from_config(db: Session, config: schemas.Config):
     for website in config.websites:
-        domain = website.domain
+        domain = str(website.domain)
         for p in website.paths:
-            definition = models.Definition(
-                domain=str(website.domain),
-                path=str(p.path),
-            )
-            db.add(definition)
+            url = urljoin(domain, str(p.path))
             for check in p.checks:
-                for check_key, check_value in check.items():
-                    logger.debug(f"{check_key=}, {check_value=}")
-                    task = models.Task(
-                        definition=definition,
-                        status="unknown",
-                        max_delta_days=1,  # XXX This should be defined in the config.
-                    )
-                    db.add(task)
+                for check_key, expected in check.items():
+                    # Check the db for already existing tasks.
+                    existing_task = db.query(exists().where(
+                        Task.url == url,
+                        Task.check == check_key,
+                        Task.expected == expected,
+                    )).scalar()
+                    if not existing_task:
+                        task = Task(
+                            domain=domain,
+                            url=url,
+                            check=check_key,
+                            expected=expected,
+                        )
+                        logger.debug(f"Adding a new task in the db: {task=}")
+                        db.add(task)
+                    else:
+                        logger.debug(f"Skipping db task creation for {url=}, {check_key=}, {expected=}.")
     db.commit()
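
The nested loops above imply a particular configuration shape: a list of websites, each with a domain and a list of paths, and each path carrying a list of one-entry mappings from check name to expected value. Here is a hedged illustration of that shape as plain Python data; the field names come from the attribute accesses in `update_from_config`, while the check names are invented.

```python
# Illustration of the configuration structure walked by update_from_config,
# inferred from website.domain, path.path and path.checks above. Check names
# ("status-is", "body-contains") are illustrative, not taken from the schema.
example_config = {
    "websites": [
        {
            "domain": "https://example.org",
            "paths": [
                {
                    "path": "/",
                    "checks": [
                        {"status-is": "200"},
                        {"body-contains": "welcome"},
                    ],
                },
                {
                    "path": "/admin/",
                    "checks": [{"status-is": "401"}],
                },
            ],
        },
    ],
}
```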