Refactor code to handle job failures on clients

- Added error handler `SerializableException` in models.py which gets error details from base exception
- Added an API to get the results back from the clients
This commit is contained in:
Alexis Métaireau 2023-10-05 00:48:26 +02:00
parent 0a4850c1ed
commit f41e74d402
9 changed files with 184 additions and 29 deletions

View file

@ -5,12 +5,13 @@ name = "pypi"
[packages]
fastapi = "*"
sqlalchemy = "*"
sqlalchemy-utils = "*"
uvicorn = "*"
pyyaml = "*"
httpx = "*"
click = "*"
aiosqlite = "*"
sqlalchemy = {extras = ["asyncio"], version = "*"}
[dev-packages]

83
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "65abfc821a32d62f8da703a8df902b964e88c2acf91eaeb8b7cf9c2dd2e6b4dd"
"sha256": "01a59c7304004f92b993a672a37e535ee3b3816cdb77093d5431db2124afb567"
},
"pipfile-spec": 6,
"requires": {
@ -16,6 +16,15 @@
]
},
"default": {
"aiosqlite": {
"hashes": [
"sha256:95ee77b91c8d2808bd08a59fbebf66270e9090c3d92ffbf260dc0db0b979577d",
"sha256:edba222e03453e094a3ce605db1b970c4b3376264e56f32e2a4959f948d66a96"
],
"index": "pypi",
"markers": "python_version >= '3.7'",
"version": "==0.19.0"
},
"annotated-types": {
"hashes": [
"sha256:47cdc3490d9ac1506ce92c7aaa76c579dc3509ff11e098fc867e5130ab7be802",
@ -58,6 +67,74 @@
"markers": "python_version >= '3.7'",
"version": "==0.103.2"
},
"greenlet": {
"hashes": [
"sha256:02a807b2a58d5cdebb07050efe3d7deaf915468d112dfcf5e426d0564aa3aa4a",
"sha256:0b72b802496cccbd9b31acea72b6f87e7771ccfd7f7927437d592e5c92ed703c",
"sha256:0d3f83ffb18dc57243e0151331e3c383b05e5b6c5029ac29f754745c800f8ed9",
"sha256:10b5582744abd9858947d163843d323d0b67be9432db50f8bf83031032bc218d",
"sha256:123910c58234a8d40eaab595bc56a5ae49bdd90122dde5bdc012c20595a94c14",
"sha256:19834e3f91f485442adc1ee440171ec5d9a4840a1f7bd5ed97833544719ce10b",
"sha256:1d363666acc21d2c204dd8705c0e0457d7b2ee7a76cb16ffc099d6799744ac99",
"sha256:211ef8d174601b80e01436f4e6905aca341b15a566f35a10dd8d1e93f5dbb3b7",
"sha256:269d06fa0f9624455ce08ae0179430eea61085e3cf6457f05982b37fd2cefe17",
"sha256:2e7dcdfad252f2ca83c685b0fa9fba00e4d8f243b73839229d56ee3d9d219314",
"sha256:334ef6ed8337bd0b58bb0ae4f7f2dcc84c9f116e474bb4ec250a8bb9bd797a66",
"sha256:343675e0da2f3c69d3fb1e894ba0a1acf58f481f3b9372ce1eb465ef93cf6fed",
"sha256:37f60b3a42d8b5499be910d1267b24355c495064f271cfe74bf28b17b099133c",
"sha256:38ad562a104cd41e9d4644f46ea37167b93190c6d5e4048fcc4b80d34ecb278f",
"sha256:3c0d36f5adc6e6100aedbc976d7428a9f7194ea79911aa4bf471f44ee13a9464",
"sha256:3fd2b18432e7298fcbec3d39e1a0aa91ae9ea1c93356ec089421fabc3651572b",
"sha256:4a1a6244ff96343e9994e37e5b4839f09a0207d35ef6134dce5c20d260d0302c",
"sha256:4cd83fb8d8e17633ad534d9ac93719ef8937568d730ef07ac3a98cb520fd93e4",
"sha256:527cd90ba3d8d7ae7dceb06fda619895768a46a1b4e423bdb24c1969823b8362",
"sha256:553d6fb2324e7f4f0899e5ad2c427a4579ed4873f42124beba763f16032959af",
"sha256:56867a3b3cf26dc8a0beecdb4459c59f4c47cdd5424618c08515f682e1d46692",
"sha256:621fcb346141ae08cb95424ebfc5b014361621b8132c48e538e34c3c93ac7365",
"sha256:63acdc34c9cde42a6534518e32ce55c30f932b473c62c235a466469a710bfbf9",
"sha256:6512592cc49b2c6d9b19fbaa0312124cd4c4c8a90d28473f86f92685cc5fef8e",
"sha256:6672fdde0fd1a60b44fb1751a7779c6db487e42b0cc65e7caa6aa686874e79fb",
"sha256:6a5b2d4cdaf1c71057ff823a19d850ed5c6c2d3686cb71f73ae4d6382aaa7a06",
"sha256:6a68d670c8f89ff65c82b936275369e532772eebc027c3be68c6b87ad05ca695",
"sha256:6bb36985f606a7c49916eff74ab99399cdfd09241c375d5a820bb855dfb4af9f",
"sha256:73b2f1922a39d5d59cc0e597987300df3396b148a9bd10b76a058a2f2772fc04",
"sha256:7709fd7bb02b31908dc8fd35bfd0a29fc24681d5cc9ac1d64ad07f8d2b7db62f",
"sha256:8060b32d8586e912a7b7dac2d15b28dbbd63a174ab32f5bc6d107a1c4143f40b",
"sha256:80dcd3c938cbcac986c5c92779db8e8ce51a89a849c135172c88ecbdc8c056b7",
"sha256:813720bd57e193391dfe26f4871186cf460848b83df7e23e6bef698a7624b4c9",
"sha256:831d6f35037cf18ca5e80a737a27d822d87cd922521d18ed3dbc8a6967be50ce",
"sha256:871b0a8835f9e9d461b7fdaa1b57e3492dd45398e87324c047469ce2fc9f516c",
"sha256:952256c2bc5b4ee8df8dfc54fc4de330970bf5d79253c863fb5e6761f00dda35",
"sha256:96d9ea57292f636ec851a9bb961a5cc0f9976900e16e5d5647f19aa36ba6366b",
"sha256:9a812224a5fb17a538207e8cf8e86f517df2080c8ee0f8c1ed2bdaccd18f38f4",
"sha256:9adbd8ecf097e34ada8efde9b6fec4dd2a903b1e98037adf72d12993a1c80b51",
"sha256:9de687479faec7db5b198cc365bc34addd256b0028956501f4d4d5e9ca2e240a",
"sha256:a048293392d4e058298710a54dfaefcefdf49d287cd33fb1f7d63d55426e4355",
"sha256:aa15a2ec737cb609ed48902b45c5e4ff6044feb5dcdfcf6fa8482379190330d7",
"sha256:abe1ef3d780de56defd0c77c5ba95e152f4e4c4e12d7e11dd8447d338b85a625",
"sha256:ad6fb737e46b8bd63156b8f59ba6cdef46fe2b7db0c5804388a2d0519b8ddb99",
"sha256:b1660a15a446206c8545edc292ab5c48b91ff732f91b3d3b30d9a915d5ec4779",
"sha256:b505fcfc26f4148551826a96f7317e02c400665fa0883fe505d4fcaab1dabfdd",
"sha256:b822fab253ac0f330ee807e7485769e3ac85d5eef827ca224feaaefa462dc0d0",
"sha256:bdd696947cd695924aecb3870660b7545a19851f93b9d327ef8236bfc49be705",
"sha256:bdfaeecf8cc705d35d8e6de324bf58427d7eafb55f67050d8f28053a3d57118c",
"sha256:be557119bf467d37a8099d91fbf11b2de5eb1fd5fc5b91598407574848dc910f",
"sha256:c3692ecf3fe754c8c0f2c95ff19626584459eab110eaab66413b1e7425cd84e9",
"sha256:c6b5ce7f40f0e2f8b88c28e6691ca6806814157ff05e794cdd161be928550f4c",
"sha256:c94e4e924d09b5a3e37b853fe5924a95eac058cb6f6fb437ebb588b7eda79870",
"sha256:cc3e2679ea13b4de79bdc44b25a0c4fcd5e94e21b8f290791744ac42d34a0353",
"sha256:d1e22c22f7826096ad503e9bb681b05b8c1f5a8138469b255eb91f26a76634f2",
"sha256:d5539f6da3418c3dc002739cb2bb8d169056aa66e0c83f6bacae0cd3ac26b423",
"sha256:d55db1db455c59b46f794346efce896e754b8942817f46a1bada2d29446e305a",
"sha256:e09dea87cc91aea5500262993cbd484b41edf8af74f976719dd83fe724644cd6",
"sha256:e52a712c38e5fb4fd68e00dc3caf00b60cb65634d50e32281a9d6431b33b4af1",
"sha256:e693e759e172fa1c2c90d35dea4acbdd1d609b6936115d3739148d5e4cd11947",
"sha256:ecf94aa539e97a8411b5ea52fc6ccd8371be9550c4041011a091eb8b3ca1d810",
"sha256:f351479a6914fd81a55c8e68963609f792d9b067fb8a60a042c585a621e0de4f",
"sha256:f47932c434a3c8d3c86d865443fadc1fbf574e9b11d6650b656e602b1797908a"
],
"version": "==3.0.0"
},
"h11": {
"hashes": [
"sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d",
@ -277,6 +354,9 @@
"version": "==1.3.0"
},
"sqlalchemy": {
"extras": [
"asyncio"
],
"hashes": [
"sha256:014794b60d2021cc8ae0f91d4d0331fe92691ae5467a00841f7130fe877b678e",
"sha256:0268256a34806e5d1c8f7ee93277d7ea8cc8ae391f487213139018b6805aeaf6",
@ -320,7 +400,6 @@
"sha256:fb87f763b5d04a82ae84ccff25554ffd903baafba6698e18ebaf32561f2fe4aa",
"sha256:fc6b15465fabccc94bf7e38777d665b6a4f95efd1725049d6184b3a39fd54880"
],
"index": "pypi",
"markers": "python_version >= '3.7'",
"version": "==2.0.21"
},

View file

@ -15,6 +15,7 @@ Features :
- [x] Exposes an HTTP API that can be consumed by other systems ;
- [ ] Exposes a simple read-only website.
- [ ] Packaging (and argos-client / argos-server commands)
- [ ] Handle jobs failures on the clients
Implemented checks :

View file

@ -26,6 +26,10 @@ class BaseCheck:
config: str
expected_cls : Type[BaseExpectedValue] = None
def response(self, passed, **kwargs):
status = "success" if passed else "failure"
return status, kwargs
def __init__(self, client: httpx.AsyncClient, task: Task):
self.client = client
self.task = task
@ -39,20 +43,27 @@ class HTTPStatusCheck(BaseCheck):
config = "status-is"
expected_cls = ExpectedIntValue
async def run(self):
async def run(self) -> dict:
# XXX Get the method from the task
task = self.task
response = await self.client.request(method="get", url=task.url)
logger.error(f"{response.status_code=}, {self.expected=}")
return response.status_code == self.expected
return self.response(
response.status_code == self.expected,
expected=self.expected,
retrieved=response.status_code
)
class HTTPBodyContains(BaseCheck):
config = "body-contains"
expected_cls = ExpectedStringValue
async def run(self):
return True
async def run(self) -> dict:
response = await self.client.request(method="get", url=self.task.url)
return self.response(
self.expected in response.body
)
class SSLCertificateExpiration(BaseCheck):

View file

@ -7,20 +7,26 @@ from argos import logging
from argos.logging import logger
from argos.checks import CheckNotFound, get_check_by_name
from argos.schemas import Task
from argos.schemas import Task, ClientResult, SerializableException
async def complete_task(client: httpx.AsyncClient, task: dict) -> dict:
task = Task(**task)
check_class = get_check_by_name(task.check)
check = check_class(client, task)
result = await check.run()
logger.error(f"{result=}")
return {"id": task.id, "result": "completed"}
try:
task = Task(**task)
check_class = get_check_by_name(task.check)
check = check_class(client, task)
status, context = await check.run()
except Exception as e:
status = "error"
context = SerializableException.from_exception(e)
return ClientResult(task=task.id, status=status, context=context)
async def post_results(client: httpx.AsyncClient, server: str, results: List[dict]):
response = await client.post(f"{server}/results", json={"results": results})
async def post_results(client: httpx.AsyncClient, server: str, results: List[ClientResult]):
json_results = [r.model_dump_json() for r in results]
response = await client.post(f"{server}/results", json={"results": json_results})
if response.status_code == httpx.codes.OK:
logger.info("Successfully posted results")

View file

@ -1,5 +1,7 @@
from pydantic import BaseModel
from datetime import datetime
from typing import Literal
import traceback
# XXX Refactor using SQLModel to avoid duplication of model data
@ -14,3 +16,22 @@ class Task(BaseModel):
class Config:
from_attributes = True
class SerializableException(BaseModel):
error_message: str
error_type: str
error_details: str
@staticmethod
def from_exception(e: BaseException):
return SerializableException(
error_message=str(e),
error_type=str(type(e).__name__),
error_details=traceback.format_exc()
)
class ClientResult(BaseModel):
task : int
status : Literal["success", "failure", "error"]
context: dict | SerializableException

View file

@ -1,17 +1,19 @@
from fastapi import Depends, FastAPI, HTTPException
from fastapi import Depends, FastAPI, HTTPException, Request
from sqlalchemy.orm import Session
from pydantic import BaseModel
from argos.server import queries, models
from argos import schemas
from argos.schemas import ClientResult, Task
from argos.schemas.config import from_yaml as get_schemas_from_yaml
from argos.server.database import SessionLocal, engine
from argos.logging import logger
from typing import List
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
try:
@ -23,16 +25,30 @@ def get_db():
@app.on_event("startup")
async def read_config_and_populate_db():
# XXX Get filename from environment.
config = schemas.config.from_yaml("config.yaml")
config = get_schemas_from_yaml("config.yaml")
db = SessionLocal()
try:
queries.update_from_config(db, config)
await queries.update_from_config(db, config)
finally:
db.close()
@app.get("/tasks", response_model=list[schemas.Task])
async def read_tasks(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = queries.list_tasks(db, limit)
return users
# XXX Get the default limit from the config
@app.get("/tasks", response_model=list[Task])
async def read_tasks(request: Request, limit: int = 20, db: Session = Depends(get_db)):
tasks = await queries.list_tasks(db, client_id=request.client.host, limit=limit)
return tasks
class Results(BaseModel):
results: List[ClientResult]
@app.post("/results", status_code=201)
async def create_result(results: Results, db: Session = Depends(get_db)):
result_ids = []
for client_result in results:
result_id = await queries.create_result(db, client_result)
db.commit()
result_ids.append(result_id)
return {"result_ids": result_ids}

View file

@ -1,6 +1,7 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
SQLALCHEMY_DATABASE_URL = "sqlite:////tmp/argos.db"

View file

@ -3,16 +3,35 @@ from sqlalchemy import exists
from argos import schemas
from argos.logging import logger
from argos.server.models import Task
from argos.server.models import Task, Result
from urllib.parse import urljoin
from datetime import datetime
def list_tasks(db: Session, limit: int = 100):
return db.query(Task).limit(limit).all()
async def list_tasks(db: Session, client_id: str, limit: int = 100):
"""List tasks and mark them as selected
"""
tasks = db.query(Task).where(Task.selected_by == None).limit(limit).all()
now = datetime.now()
# XXX: Deactivated for now, as it simplifies testing.
# for task in tasks:
# task.selected_at = now
# task.selected_by = client_id
# db.commit()
return tasks
async def create_result(db: Session, client_result:schemas.ClientResult):
result = Result()
result.submitted_at = datetime.now()
result.success = True if client_result.status == "success" else False
result.content = str(client_result.context)
return db.add(result)
def update_from_config(db: Session, config: schemas.Config):
async def update_from_config(db: Session, config: schemas.Config):
for website in config.websites:
domain = str(website.domain)
for p in website.paths: