From f41e74d40287f88505763f74f425f8de10960ebb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Thu, 5 Oct 2023 00:48:26 +0200 Subject: [PATCH] Refactor code to handle job failures on clients - Added error handler `SerializableException` in models.py which gets error details from base exception - Added an API to get the results back from the clients --- Pipfile | 3 +- Pipfile.lock | 83 +++++++++++++++++++++++++++++++++++++++- README.md | 1 + argos/checks/__init__.py | 19 +++++++-- argos/client/cli.py | 24 +++++++----- argos/schemas/models.py | 21 ++++++++++ argos/server/api.py | 34 +++++++++++----- argos/server/database.py | 1 + argos/server/queries.py | 27 +++++++++++-- 9 files changed, 184 insertions(+), 29 deletions(-) diff --git a/Pipfile b/Pipfile index 7d25139..421dfed 100644 --- a/Pipfile +++ b/Pipfile @@ -5,12 +5,13 @@ name = "pypi" [packages] fastapi = "*" -sqlalchemy = "*" sqlalchemy-utils = "*" uvicorn = "*" pyyaml = "*" httpx = "*" click = "*" +aiosqlite = "*" +sqlalchemy = {extras = ["asyncio"], version = "*"} [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index ee24ae4..09e1403 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "65abfc821a32d62f8da703a8df902b964e88c2acf91eaeb8b7cf9c2dd2e6b4dd" + "sha256": "01a59c7304004f92b993a672a37e535ee3b3816cdb77093d5431db2124afb567" }, "pipfile-spec": 6, "requires": { @@ -16,6 +16,15 @@ ] }, "default": { + "aiosqlite": { + "hashes": [ + "sha256:95ee77b91c8d2808bd08a59fbebf66270e9090c3d92ffbf260dc0db0b979577d", + "sha256:edba222e03453e094a3ce605db1b970c4b3376264e56f32e2a4959f948d66a96" + ], + "index": "pypi", + "markers": "python_version >= '3.7'", + "version": "==0.19.0" + }, "annotated-types": { "hashes": [ "sha256:47cdc3490d9ac1506ce92c7aaa76c579dc3509ff11e098fc867e5130ab7be802", @@ -58,6 +67,74 @@ "markers": "python_version >= '3.7'", "version": "==0.103.2" }, + "greenlet": { + "hashes": [ + "sha256:02a807b2a58d5cdebb07050efe3d7deaf915468d112dfcf5e426d0564aa3aa4a", + "sha256:0b72b802496cccbd9b31acea72b6f87e7771ccfd7f7927437d592e5c92ed703c", + "sha256:0d3f83ffb18dc57243e0151331e3c383b05e5b6c5029ac29f754745c800f8ed9", + "sha256:10b5582744abd9858947d163843d323d0b67be9432db50f8bf83031032bc218d", + "sha256:123910c58234a8d40eaab595bc56a5ae49bdd90122dde5bdc012c20595a94c14", + "sha256:19834e3f91f485442adc1ee440171ec5d9a4840a1f7bd5ed97833544719ce10b", + "sha256:1d363666acc21d2c204dd8705c0e0457d7b2ee7a76cb16ffc099d6799744ac99", + "sha256:211ef8d174601b80e01436f4e6905aca341b15a566f35a10dd8d1e93f5dbb3b7", + "sha256:269d06fa0f9624455ce08ae0179430eea61085e3cf6457f05982b37fd2cefe17", + "sha256:2e7dcdfad252f2ca83c685b0fa9fba00e4d8f243b73839229d56ee3d9d219314", + "sha256:334ef6ed8337bd0b58bb0ae4f7f2dcc84c9f116e474bb4ec250a8bb9bd797a66", + "sha256:343675e0da2f3c69d3fb1e894ba0a1acf58f481f3b9372ce1eb465ef93cf6fed", + "sha256:37f60b3a42d8b5499be910d1267b24355c495064f271cfe74bf28b17b099133c", + "sha256:38ad562a104cd41e9d4644f46ea37167b93190c6d5e4048fcc4b80d34ecb278f", + "sha256:3c0d36f5adc6e6100aedbc976d7428a9f7194ea79911aa4bf471f44ee13a9464", + "sha256:3fd2b18432e7298fcbec3d39e1a0aa91ae9ea1c93356ec089421fabc3651572b", + "sha256:4a1a6244ff96343e9994e37e5b4839f09a0207d35ef6134dce5c20d260d0302c", + "sha256:4cd83fb8d8e17633ad534d9ac93719ef8937568d730ef07ac3a98cb520fd93e4", + "sha256:527cd90ba3d8d7ae7dceb06fda619895768a46a1b4e423bdb24c1969823b8362", + "sha256:553d6fb2324e7f4f0899e5ad2c427a4579ed4873f42124beba763f16032959af", + "sha256:56867a3b3cf26dc8a0beecdb4459c59f4c47cdd5424618c08515f682e1d46692", + "sha256:621fcb346141ae08cb95424ebfc5b014361621b8132c48e538e34c3c93ac7365", + "sha256:63acdc34c9cde42a6534518e32ce55c30f932b473c62c235a466469a710bfbf9", + "sha256:6512592cc49b2c6d9b19fbaa0312124cd4c4c8a90d28473f86f92685cc5fef8e", + "sha256:6672fdde0fd1a60b44fb1751a7779c6db487e42b0cc65e7caa6aa686874e79fb", + "sha256:6a5b2d4cdaf1c71057ff823a19d850ed5c6c2d3686cb71f73ae4d6382aaa7a06", + "sha256:6a68d670c8f89ff65c82b936275369e532772eebc027c3be68c6b87ad05ca695", + "sha256:6bb36985f606a7c49916eff74ab99399cdfd09241c375d5a820bb855dfb4af9f", + "sha256:73b2f1922a39d5d59cc0e597987300df3396b148a9bd10b76a058a2f2772fc04", + "sha256:7709fd7bb02b31908dc8fd35bfd0a29fc24681d5cc9ac1d64ad07f8d2b7db62f", + "sha256:8060b32d8586e912a7b7dac2d15b28dbbd63a174ab32f5bc6d107a1c4143f40b", + "sha256:80dcd3c938cbcac986c5c92779db8e8ce51a89a849c135172c88ecbdc8c056b7", + "sha256:813720bd57e193391dfe26f4871186cf460848b83df7e23e6bef698a7624b4c9", + "sha256:831d6f35037cf18ca5e80a737a27d822d87cd922521d18ed3dbc8a6967be50ce", + "sha256:871b0a8835f9e9d461b7fdaa1b57e3492dd45398e87324c047469ce2fc9f516c", + "sha256:952256c2bc5b4ee8df8dfc54fc4de330970bf5d79253c863fb5e6761f00dda35", + "sha256:96d9ea57292f636ec851a9bb961a5cc0f9976900e16e5d5647f19aa36ba6366b", + "sha256:9a812224a5fb17a538207e8cf8e86f517df2080c8ee0f8c1ed2bdaccd18f38f4", + "sha256:9adbd8ecf097e34ada8efde9b6fec4dd2a903b1e98037adf72d12993a1c80b51", + "sha256:9de687479faec7db5b198cc365bc34addd256b0028956501f4d4d5e9ca2e240a", + "sha256:a048293392d4e058298710a54dfaefcefdf49d287cd33fb1f7d63d55426e4355", + "sha256:aa15a2ec737cb609ed48902b45c5e4ff6044feb5dcdfcf6fa8482379190330d7", + "sha256:abe1ef3d780de56defd0c77c5ba95e152f4e4c4e12d7e11dd8447d338b85a625", + "sha256:ad6fb737e46b8bd63156b8f59ba6cdef46fe2b7db0c5804388a2d0519b8ddb99", + "sha256:b1660a15a446206c8545edc292ab5c48b91ff732f91b3d3b30d9a915d5ec4779", + "sha256:b505fcfc26f4148551826a96f7317e02c400665fa0883fe505d4fcaab1dabfdd", + "sha256:b822fab253ac0f330ee807e7485769e3ac85d5eef827ca224feaaefa462dc0d0", + "sha256:bdd696947cd695924aecb3870660b7545a19851f93b9d327ef8236bfc49be705", + "sha256:bdfaeecf8cc705d35d8e6de324bf58427d7eafb55f67050d8f28053a3d57118c", + "sha256:be557119bf467d37a8099d91fbf11b2de5eb1fd5fc5b91598407574848dc910f", + "sha256:c3692ecf3fe754c8c0f2c95ff19626584459eab110eaab66413b1e7425cd84e9", + "sha256:c6b5ce7f40f0e2f8b88c28e6691ca6806814157ff05e794cdd161be928550f4c", + "sha256:c94e4e924d09b5a3e37b853fe5924a95eac058cb6f6fb437ebb588b7eda79870", + "sha256:cc3e2679ea13b4de79bdc44b25a0c4fcd5e94e21b8f290791744ac42d34a0353", + "sha256:d1e22c22f7826096ad503e9bb681b05b8c1f5a8138469b255eb91f26a76634f2", + "sha256:d5539f6da3418c3dc002739cb2bb8d169056aa66e0c83f6bacae0cd3ac26b423", + "sha256:d55db1db455c59b46f794346efce896e754b8942817f46a1bada2d29446e305a", + "sha256:e09dea87cc91aea5500262993cbd484b41edf8af74f976719dd83fe724644cd6", + "sha256:e52a712c38e5fb4fd68e00dc3caf00b60cb65634d50e32281a9d6431b33b4af1", + "sha256:e693e759e172fa1c2c90d35dea4acbdd1d609b6936115d3739148d5e4cd11947", + "sha256:ecf94aa539e97a8411b5ea52fc6ccd8371be9550c4041011a091eb8b3ca1d810", + "sha256:f351479a6914fd81a55c8e68963609f792d9b067fb8a60a042c585a621e0de4f", + "sha256:f47932c434a3c8d3c86d865443fadc1fbf574e9b11d6650b656e602b1797908a" + ], + "version": "==3.0.0" + }, "h11": { "hashes": [ "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d", @@ -277,6 +354,9 @@ "version": "==1.3.0" }, "sqlalchemy": { + "extras": [ + "asyncio" + ], "hashes": [ "sha256:014794b60d2021cc8ae0f91d4d0331fe92691ae5467a00841f7130fe877b678e", "sha256:0268256a34806e5d1c8f7ee93277d7ea8cc8ae391f487213139018b6805aeaf6", @@ -320,7 +400,6 @@ "sha256:fb87f763b5d04a82ae84ccff25554ffd903baafba6698e18ebaf32561f2fe4aa", "sha256:fc6b15465fabccc94bf7e38777d665b6a4f95efd1725049d6184b3a39fd54880" ], - "index": "pypi", "markers": "python_version >= '3.7'", "version": "==2.0.21" }, diff --git a/README.md b/README.md index 602ce32..034f0ef 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Features : - [x] Exposes an HTTP API that can be consumed by other systems ; - [ ] Exposes a simple read-only website. - [ ] Packaging (and argos-client / argos-server commands) +- [ ] Handle jobs failures on the clients Implemented checks : diff --git a/argos/checks/__init__.py b/argos/checks/__init__.py index 581dee2..304aaa7 100644 --- a/argos/checks/__init__.py +++ b/argos/checks/__init__.py @@ -26,6 +26,10 @@ class BaseCheck: config: str expected_cls : Type[BaseExpectedValue] = None + def response(self, passed, **kwargs): + status = "success" if passed else "failure" + return status, kwargs + def __init__(self, client: httpx.AsyncClient, task: Task): self.client = client self.task = task @@ -39,20 +43,27 @@ class HTTPStatusCheck(BaseCheck): config = "status-is" expected_cls = ExpectedIntValue - async def run(self): + async def run(self) -> dict: # XXX Get the method from the task task = self.task response = await self.client.request(method="get", url=task.url) logger.error(f"{response.status_code=}, {self.expected=}") - return response.status_code == self.expected + return self.response( + response.status_code == self.expected, + expected=self.expected, + retrieved=response.status_code + ) class HTTPBodyContains(BaseCheck): config = "body-contains" expected_cls = ExpectedStringValue - async def run(self): - return True + async def run(self) -> dict: + response = await self.client.request(method="get", url=self.task.url) + return self.response( + self.expected in response.body + ) class SSLCertificateExpiration(BaseCheck): diff --git a/argos/client/cli.py b/argos/client/cli.py index 5bb988e..951b6f1 100644 --- a/argos/client/cli.py +++ b/argos/client/cli.py @@ -7,20 +7,26 @@ from argos import logging from argos.logging import logger from argos.checks import CheckNotFound, get_check_by_name -from argos.schemas import Task +from argos.schemas import Task, ClientResult, SerializableException + async def complete_task(client: httpx.AsyncClient, task: dict) -> dict: - task = Task(**task) - check_class = get_check_by_name(task.check) - check = check_class(client, task) - result = await check.run() - logger.error(f"{result=}") - return {"id": task.id, "result": "completed"} + try: + task = Task(**task) + check_class = get_check_by_name(task.check) + check = check_class(client, task) + status, context = await check.run() + + except Exception as e: + status = "error" + context = SerializableException.from_exception(e) + return ClientResult(task=task.id, status=status, context=context) -async def post_results(client: httpx.AsyncClient, server: str, results: List[dict]): - response = await client.post(f"{server}/results", json={"results": results}) +async def post_results(client: httpx.AsyncClient, server: str, results: List[ClientResult]): + json_results = [r.model_dump_json() for r in results] + response = await client.post(f"{server}/results", json={"results": json_results}) if response.status_code == httpx.codes.OK: logger.info("Successfully posted results") diff --git a/argos/schemas/models.py b/argos/schemas/models.py index 913b9d2..5ea72b6 100644 --- a/argos/schemas/models.py +++ b/argos/schemas/models.py @@ -1,5 +1,7 @@ from pydantic import BaseModel from datetime import datetime +from typing import Literal +import traceback # XXX Refactor using SQLModel to avoid duplication of model data @@ -14,3 +16,22 @@ class Task(BaseModel): class Config: from_attributes = True + + +class SerializableException(BaseModel): + error_message: str + error_type: str + error_details: str + + @staticmethod + def from_exception(e: BaseException): + return SerializableException( + error_message=str(e), + error_type=str(type(e).__name__), + error_details=traceback.format_exc() + ) + +class ClientResult(BaseModel): + task : int + status : Literal["success", "failure", "error"] + context: dict | SerializableException \ No newline at end of file diff --git a/argos/server/api.py b/argos/server/api.py index e6110a0..2a73a9c 100644 --- a/argos/server/api.py +++ b/argos/server/api.py @@ -1,17 +1,19 @@ -from fastapi import Depends, FastAPI, HTTPException +from fastapi import Depends, FastAPI, HTTPException, Request from sqlalchemy.orm import Session +from pydantic import BaseModel from argos.server import queries, models -from argos import schemas +from argos.schemas import ClientResult, Task +from argos.schemas.config import from_yaml as get_schemas_from_yaml from argos.server.database import SessionLocal, engine from argos.logging import logger +from typing import List models.Base.metadata.create_all(bind=engine) app = FastAPI() -# Dependency def get_db(): db = SessionLocal() try: @@ -23,16 +25,30 @@ def get_db(): @app.on_event("startup") async def read_config_and_populate_db(): # XXX Get filename from environment. - config = schemas.config.from_yaml("config.yaml") + config = get_schemas_from_yaml("config.yaml") db = SessionLocal() try: - queries.update_from_config(db, config) + await queries.update_from_config(db, config) finally: db.close() -@app.get("/tasks", response_model=list[schemas.Task]) -async def read_tasks(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): - users = queries.list_tasks(db, limit) - return users +# XXX Get the default limit from the config +@app.get("/tasks", response_model=list[Task]) +async def read_tasks(request: Request, limit: int = 20, db: Session = Depends(get_db)): + tasks = await queries.list_tasks(db, client_id=request.client.host, limit=limit) + return tasks + + +class Results(BaseModel): + results: List[ClientResult] + +@app.post("/results", status_code=201) +async def create_result(results: Results, db: Session = Depends(get_db)): + result_ids = [] + for client_result in results: + result_id = await queries.create_result(db, client_result) + db.commit() + result_ids.append(result_id) + return {"result_ids": result_ids} \ No newline at end of file diff --git a/argos/server/database.py b/argos/server/database.py index db242f1..e01c07e 100644 --- a/argos/server/database.py +++ b/argos/server/database.py @@ -1,6 +1,7 @@ from sqlalchemy import create_engine from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession SQLALCHEMY_DATABASE_URL = "sqlite:////tmp/argos.db" diff --git a/argos/server/queries.py b/argos/server/queries.py index d177e9f..a09dce1 100644 --- a/argos/server/queries.py +++ b/argos/server/queries.py @@ -3,16 +3,35 @@ from sqlalchemy import exists from argos import schemas from argos.logging import logger -from argos.server.models import Task +from argos.server.models import Task, Result from urllib.parse import urljoin +from datetime import datetime -def list_tasks(db: Session, limit: int = 100): - return db.query(Task).limit(limit).all() +async def list_tasks(db: Session, client_id: str, limit: int = 100): + """List tasks and mark them as selected + """ + tasks = db.query(Task).where(Task.selected_by == None).limit(limit).all() + now = datetime.now() + # XXX: Deactivated for now, as it simplifies testing. + # for task in tasks: + # task.selected_at = now + # task.selected_by = client_id + # db.commit() + return tasks + +async def create_result(db: Session, client_result:schemas.ClientResult): + result = Result() + + result.submitted_at = datetime.now() + result.success = True if client_result.status == "success" else False + result.content = str(client_result.context) + + return db.add(result) -def update_from_config(db: Session, config: schemas.Config): +async def update_from_config(db: Session, config: schemas.Config): for website in config.websites: domain = str(website.domain) for p in website.paths: