mirror of
https://framagit.org/framasoft/framaspace/argos.git
synced 2025-04-29 10:22:37 +02:00
100 lines
3 KiB
Python
100 lines
3 KiB
Python
from datetime import datetime, timedelta
|
|
from typing import List, Literal
|
|
|
|
from sqlalchemy import (
|
|
JSON,
|
|
Enum,
|
|
ForeignKey,
|
|
)
|
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
|
|
|
from argos.checks import BaseCheck, get_registered_check
|
|
from argos.schemas import WebsiteCheck
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base class shared by every ORM model of this module."""

    # Python annotation -> SQL column type used when a ``Mapped[...]``
    # attribute does not specify an explicit column type.
    type_annotation_map = {
        List[WebsiteCheck]: JSON,
        dict: JSON,
    }
|
|
|
|
|
|
class Task(Base):
    """
    There is one task per check.

    It contains all information needed to run the jobs on the workers.
    Workers will return information in the result table.
    """

    __tablename__ = "tasks"
    id: Mapped[int] = mapped_column(primary_key=True)

    # Info needed to run the task
    url: Mapped[str] = mapped_column()
    domain: Mapped[str] = mapped_column()
    # Name handed to get_registered_check() to find the check to run.
    check: Mapped[str] = mapped_column()
    expected: Mapped[str] = mapped_column()
    # Delay between two runs, in minutes (see set_times_and_deselect).
    frequency: Mapped[int] = mapped_column()

    # Orchestration-related
    selected_by: Mapped[str] = mapped_column(nullable=True)
    selected_at: Mapped[datetime] = mapped_column(nullable=True)
    completed_at: Mapped[datetime] = mapped_column(nullable=True)
    next_run: Mapped[datetime] = mapped_column(nullable=True)

    results: Mapped[List["Result"]] = relationship(back_populates="task")

    def __str__(self) -> str:
        return f"DB Task {self.url} - {self.check} - {self.expected}"

    def get_check(self) -> BaseCheck:
        """Returns the check registered under this task's ``check`` name."""
        return get_registered_check(self.check)

    def set_times_and_deselect(self) -> None:
        """Release the task and schedule its next run.

        Clears ``selected_by``, stamps ``completed_at`` with the current time
        and sets ``next_run`` to that time plus ``frequency`` minutes.
        """
        # NOTE(review): selected_at is left untouched here - confirm that
        # keeping the previous selection timestamp is intended.
        self.selected_by = None

        # datetime.now() without a tz argument returns a naive local time.
        # NOTE(review): confirm the rest of the application expects naive
        # timestamps before switching to timezone-aware values.
        now = datetime.now()
        self.completed_at = now
        self.next_run = now + timedelta(minutes=self.frequency)

    @property
    def last_result(self):
        """The result with the highest ``id``, or None if there is none."""
        if not self.results:
            return None
        return max(self.results, key=lambda r: r.id)

    @property
    def status(self):
        """Status of the last result, or None when there is no result."""
        # Fetch once: every access to last_result scans self.results again.
        latest = self.last_result
        if latest is None:
            return None
        return latest.status

    @property
    def severity(self):
        """Severity of the last result, or None when there is no result."""
        # Fetch once: every access to last_result scans self.results again.
        latest = self.last_result
        if latest is None:
            return None
        return latest.severity
|
|
|
|
|
|
class Result(Base):
    """Row of the ``results`` table, attached to a Task through ``task_id``."""

    __tablename__ = "results"

    id: Mapped[int] = mapped_column(primary_key=True)

    # Owning task: foreign key column plus the ORM-side relationship, which
    # mirrors ``Task.results``.
    task_id: Mapped[int] = mapped_column(ForeignKey("tasks.id"))
    task: Mapped["Task"] = relationship(back_populates="results")

    # Nullable column.
    agent_id: Mapped[str] = mapped_column(nullable=True)

    submitted_at: Mapped[datetime] = mapped_column()

    # XXX change "on-check" to something better.
    status: Mapped[Literal["success", "failure", "error", "on-check"]] = mapped_column(
        Enum(
            "success",
            "failure",
            "error",
            "on-check",
            name="status",
        )
    )
    severity: Mapped[Literal["ok", "warning", "critical"]] = mapped_column(
        Enum(
            "ok",
            "warning",
            "critical",
            name="severity",
        )
    )

    # Plain dict; ``Base.type_annotation_map`` maps ``dict`` to a JSON column.
    context: Mapped[dict] = mapped_column()

    def set_status(self, status, severity):
        """Store the given ``status`` and ``severity`` on this result."""
        self.severity, self.status = severity, status

    def __str__(self):
        return f"DB Result {self.id} - {self.status} - {self.context}"
|