From ca6584c80327e3feb2d057a0d34e4668be14eced Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Tue, 10 Oct 2023 19:24:50 +0200 Subject: [PATCH] Loop the agents and enhance the selection of tasks on the server --- Pipfile | 1 + Pipfile.lock | 3 ++- README.md | 3 ++- argos/agent.py | 51 ++++++++++++++++++++++++---------------- argos/checks/base.py | 26 ++++++++++++++++++++ argos/checks/checks.py | 20 ++++++++++------ argos/commands.py | 9 +++++-- argos/server/alerting.py | 6 +++++ argos/server/api.py | 27 +++++++++++++-------- argos/server/models.py | 15 ++++++++++-- argos/server/queries.py | 20 ++++++++++------ config.yaml | 2 +- 12 files changed, 132 insertions(+), 51 deletions(-) create mode 100644 argos/server/alerting.py diff --git a/Pipfile b/Pipfile index 5f37610..afa87cd 100644 --- a/Pipfile +++ b/Pipfile @@ -16,6 +16,7 @@ pyopenssl = "*" ipdb = "*" argos = {extras = ["dev"], file = ".", editable = true} pyyaml-include = "*" +jedi = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index a8c98e5..1e36b2d 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "8cc9237ff86d00019539f36e3df5b20edcbbc60f52d1b0fce2b03e51c089ad39" + "sha256": "080d35bdb1052afa4590e047f3f0cd1d8263b1e97d0095c379f362a81a7868ce" }, "pipfile-spec": 6, "requires": { @@ -365,6 +365,7 @@ "sha256:cf0496f3651bc65d7174ac1b7d043eff454892c708a87d1b683e57b569927ffd", "sha256:e983c654fe5c02867aef4cdfce5a2fbb4a50adc0af145f70504238f18ef5e7e0" ], + "index": "pypi", "markers": "python_version >= '3.6'", "version": "==0.19.1" }, diff --git a/README.md b/README.md index 67d0a52..aca7786 100644 --- a/README.md +++ b/README.md @@ -16,11 +16,12 @@ Features : - [x] Change the naming and use service/agent. - [x] Packaging (and `argos agent` / `argos service` commands) - [x] Endpoints are protected by an authentication token +- [ ] Find a way to define when the task should be checked again (via config ? stored on the tasks themselves ?) - [ ] Local task for database cleanup (to run periodically) - [ ] Handles multiple alerting backends (email, sms, gotify) ; - [ ] Exposes a simple read-only website. - [ ] Add a way to specify the severity of the alerts in the config -- [ ] Do not return `selected_at` and `selected_by` in the `/tasks` endpoint (?) +- [ ] No need to return the expected and got values in case it worked in check-status and body-contains Implemented checks : diff --git a/argos/agent.py b/argos/agent.py index fba8768..31b0765 100644 --- a/argos/agent.py +++ b/argos/agent.py @@ -35,28 +35,39 @@ async def post_results( logger.error(f"Successfully posted results {response.json()}") else: logger.error(f"Failed to post results: {response.read()}") + return response -async def run_agent(server: str, auth: str, max_tasks: int): - tasks = [] +async def get_and_complete_tasks(http_client, server, max_tasks): + # Fetch the list of tasks + response = await http_client.get(f"{server}/tasks") + if response.status_code == httpx.codes.OK: + # XXX Maybe we want to group the tests by URL ? (to issue one request per URL) + data = response.json() + logger.info(f"Received {len(data)} tasks from the server") + + tasks = [] + for task in data: + tasks.append(complete_task(http_client, task)) + + if tasks: + results = await asyncio.gather(*tasks) + await post_results(http_client, server, results) + return True + else: + logger.error("Got no tasks from the server.") + return False + else: + logger.error(f"Failed to fetch tasks: {response.read()}") + return False + + +async def run_agent(server: str, auth: str, max_tasks: int, wait_time: int): headers = {"Authorization": f"Bearer {auth}"} async with httpx.AsyncClient(headers=headers) as http_client: - # Fetch the list of tasks - response = await http_client.get(f"{server}/tasks") - - if response.status_code == httpx.codes.OK: - # XXX Maybe we want to group the tests by URL ? (to issue one request per URL) - data = response.json() - logger.info(f"Received {len(data)} tasks from the server") - - for task in data: - tasks.append(complete_task(http_client, task)) - - # Run up to max_tasks concurrent tasks - results = await asyncio.gather(*tasks) - - # Post the results - await post_results(http_client, server, results) - else: - logger.error(f"Failed to fetch tasks: {response.read()}") + while True: + retry_now = await get_and_complete_tasks(http_client, server, max_tasks) + if not retry_now: + logger.error(f"Waiting {wait_time} seconds before next retry") + await asyncio.sleep(wait_time) diff --git a/argos/checks/base.py b/argos/checks/base.py index 427d6b4..aa46d04 100644 --- a/argos/checks/base.py +++ b/argos/checks/base.py @@ -11,6 +11,13 @@ class Status: ON_CHECK = "on-check" SUCCESS = "success" FAILURE = "failure" + ERROR = "error" + + +class Severity: + OK = "ok" + WARNING = "warning" + CRITICAL = "critical" # XXX We could name this Result, but is it could overlap with schemas.Result. @@ -89,6 +96,25 @@ class BaseCheck: status = kwargs.pop("status") return Response.new(status, **kwargs) + @classmethod + async def finalize(self, config, result, **context): + """By default, the finalize considers that : + + - All FAILUREs should be reported as CRITICAL + - All SUCCESS should be reported as OK + - All ERRORS should be reported as CRITICAL. + + This behaviour can be changed in each check, by defining the `finalize` method. + XXX This can also be tweaked by the config. + """ + if result.status in (Status.SUCCESS, Status.ERROR): + return result.status, Severity.OK + elif result.status == Status.FAILURE: + return result.status, Severity.CRITICAL + elif result.status == Status.ON_CHECK: + msg = "Status is 'on-check', but the Check class didn't provide a finalize() method." + raise ValueError(msg) + def get_registered_check(name): return BaseCheck.get_registered_check(name) diff --git a/argos/checks/checks.py b/argos/checks/checks.py index d1cc6ae..e8bfa84 100644 --- a/argos/checks/checks.py +++ b/argos/checks/checks.py @@ -4,8 +4,14 @@ from datetime import datetime from OpenSSL import crypto -from argos.checks.base import (BaseCheck, ExpectedIntValue, - ExpectedStringValue, Response, Status) +from argos.checks.base import ( + BaseCheck, + ExpectedIntValue, + ExpectedStringValue, + Response, + Status, + Severity, +) from argos.logging import logger @@ -54,10 +60,10 @@ class SSLCertificateExpiration(BaseCheck): return self.response(status=Status.ON_CHECK, expires_in=expires_in) @classmethod - async def finalize(cls, config, callback, expires_in): + async def finalize(cls, config, result, expires_in): thresholds = config.ssl.thresholds - thresholds.sort(reverse=True) + thresholds.sort() for days, severity in thresholds: - if expires_in > days: - callback(severity) - break + if expires_in < days: + return Status.FAILURE, severity + return Status.SUCCESS, Severity.OK diff --git a/argos/commands.py b/argos/commands.py index f471425..b998b8b 100644 --- a/argos/commands.py +++ b/argos/commands.py @@ -17,15 +17,20 @@ def cli(): @click.option("--server", required=True, help="Server URL") @click.option("--auth", required=True, help="The authentication token") @click.option("--max-tasks", default=10, help="Maximum number of concurrent tasks") +@click.option( + "--wait-time", + default=10, + help="Wait time (in seconds) between two polls on the server", +) @click.option( "--log-level", default="INFO", type=click.Choice(logging.LOG_LEVELS, case_sensitive=False), ) -def agent(server, auth, max_tasks, log_level): +def agent(server, auth, max_tasks, wait_time, log_level): """Runs an agent""" logging.set_log_level(log_level) - asyncio.run(run_agent(server, auth, max_tasks)) + asyncio.run(run_agent(server, auth, max_tasks, wait_time)) @cli.command() diff --git a/argos/server/alerting.py b/argos/server/alerting.py new file mode 100644 index 0000000..b2775c0 --- /dev/null +++ b/argos/server/alerting.py @@ -0,0 +1,6 @@ +from argos.logging import logger + + +def handle_alert(config, result, task, severity): + msg = f"{result=}, {task=}, {severity=}" + logger.error(msg) diff --git a/argos/server/api.py b/argos/server/api.py index c3a2507..4d62bcc 100644 --- a/argos/server/api.py +++ b/argos/server/api.py @@ -12,6 +12,7 @@ from argos.schemas import AgentResult, Task from argos.schemas.config import from_yaml as get_schemas_from_yaml from argos.server import models, queries from argos.server.database import SessionLocal, engine +from argos.server.alerting import handle_alert models.Base.metadata.create_all(bind=engine) @@ -74,15 +75,20 @@ async def create_result(results: List[AgentResult], db: Session = Depends(get_db result = await queries.create_result(db, agent_result) # XXX Maybe offload this to a queue. # XXX Use a schema for the on-check value. - - if result.status == "on-check": - task = await queries.get_task(db, agent_result.task_id) - if not task: - logger.error(f"Unable to find task {agent_result.task_id}") - else: - check = task.get_check() - callback = logger.error - await check.finalize(app.config, callback=callback, **result.context) + # XXX Get all the tasks at once, to limit the queries on the db + task = await queries.get_task(db, agent_result.task_id) + if not task: + logger.error(f"Unable to find task {agent_result.task_id}") + else: + check = task.get_check() + status, severity = await check.finalize( + app.config, result, **result.context + ) + result.severity = severity + result.status = status + # Set the selection status to None + task.selected_by = None + handle_alert(app.config, result, task, severity) db_results.append(result) db.commit() @@ -92,6 +98,7 @@ async def create_result(results: List[AgentResult], db: Session = Depends(get_db @app.get("/stats", dependencies=[Depends(verify_token)]) async def get_stats(db: Session = Depends(get_db)): return { - "tasks_count": await queries.count_tasks(db), + "upcoming_tasks_count": await queries.count_tasks(db, selected=False), "results_count": await queries.count_results(db), + "selected_tasks_count": await queries.count_tasks(db, selected=True), } diff --git a/argos/server/models.py b/argos/server/models.py index 4cbfa6e..24bd7ff 100644 --- a/argos/server/models.py +++ b/argos/server/models.py @@ -1,8 +1,16 @@ from datetime import datetime from typing import List, Literal -from sqlalchemy import (JSON, Boolean, Column, DateTime, Enum, ForeignKey, - Integer, String) +from sqlalchemy import ( + JSON, + Boolean, + Column, + DateTime, + Enum, + ForeignKey, + Integer, + String, +) from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from sqlalchemy_utils import ChoiceType @@ -55,6 +63,9 @@ class Result(Base): status: Mapped[Literal["success", "failure", "error", "on-check"]] = mapped_column( Enum("success", "failure", "error", "on-check") ) + severity: Mapped[Literal["ok", "warning", "critical"]] = mapped_column( + Enum("ok", "warning", "critical") + ) context: Mapped[dict] = mapped_column() def __str__(self): diff --git a/argos/server/queries.py b/argos/server/queries.py index 3ad4a8c..0033cbf 100644 --- a/argos/server/queries.py +++ b/argos/server/queries.py @@ -13,11 +13,11 @@ async def list_tasks(db: Session, agent_id: str, limit: int = 100): """List tasks and mark them as selected""" tasks = db.query(Task).where(Task.selected_by == None).limit(limit).all() now = datetime.now() - # XXX: Deactivated for now, as it simplifies testing. - # for task in tasks: - # task.selected_at = now - # task.selected_by = agent_id - # db.commit() + + for task in tasks: + task.selected_at = now + task.selected_by = agent_id + db.commit() return tasks @@ -36,8 +36,14 @@ async def create_result(db: Session, agent_result: schemas.AgentResult): return result -async def count_tasks(db: Session): - return db.query(Task).count() +async def count_tasks(db: Session, selected=False): + query = db.query(Task) + if selected: + query = query.filter(Task.selected_by is not None) + else: + query = query.filter(Task.selected_by is None) + + return query.count() async def count_results(db: Session): diff --git a/config.yaml b/config.yaml index d39a530..c52a5a8 100644 --- a/config.yaml +++ b/config.yaml @@ -9,8 +9,8 @@ general: - local service: port: 8888 - # Can be generated using `openssl rand -base64 32`. secrets: + # Secrets can be generated using `openssl rand -base64 32`. - "O4kt8Max9/k0EmHaEJ0CGGYbBNFmK8kOZNIoUk3Kjwc" - "x1T1VZR51pxrv5pQUyzooMG4pMUvHNMhA5y/3cUsYVs="