From cd243d92dc035c3e551ba1a791797106a9b2e938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexis=20M=C3=A9taireau?= Date: Wed, 18 Oct 2023 17:48:42 +0200 Subject: [PATCH] Refactor argos agent to use classes --- argos/agent.py | 139 ++++++++++++++++++++++++---------------------- argos/commands.py | 5 +- 2 files changed, 76 insertions(+), 68 deletions(-) diff --git a/argos/agent.py b/argos/agent.py index 4f04c27..4b6f935 100644 --- a/argos/agent.py +++ b/argos/agent.py @@ -1,70 +1,16 @@ import asyncio import logging +import socket from typing import List import httpx -from tenacity import after_log, retry, wait_random +from tenacity import retry, wait_random from argos.checks import get_registered_check from argos.logging import logger from argos.schemas import AgentResult, SerializableException, Task -async def complete_task(http_client: httpx.AsyncClient, task: dict) -> dict: - try: - task = Task(**task) - check_class = get_registered_check(task.check) - check = check_class(http_client, task) - result = await check.run() - status = result.status - context = result.context - - except Exception as e: - status = "error" - context = SerializableException.from_exception(e) - msg = f"An exception occured when running {task}. {e.__class__.__name__} : {e}" - logger.error(msg) - return AgentResult(task_id=task.id, status=status, context=context) - - -async def post_results( - http_client: httpx.AsyncClient, server: str, results: List[AgentResult] -): - data = [r.model_dump() for r in results] - response = await http_client.post(f"{server}/api/results", json=data) - - if response.status_code == httpx.codes.CREATED: - logger.error(f"Successfully posted results {response.json()}") - else: - logger.error(f"Failed to post results: {response.read()}") - return response - - -async def get_and_complete_tasks(http_client, server, max_tasks): - # Fetch the list of tasks - response = await http_client.get(f"{server}/api/tasks") - - if response.status_code == httpx.codes.OK: - # XXX Maybe we want to group the tests by URL ? (to issue one request per URL) - data = response.json() - logger.info(f"Received {len(data)} tasks from the server") - - tasks = [] - for task in data: - tasks.append(complete_task(http_client, task)) - - if tasks: - results = await asyncio.gather(*tasks) - await post_results(http_client, server, results) - return True - else: - logger.error("Got no tasks from the server.") - return False - else: - logger.error(f"Failed to fetch tasks: {response.read()}") - return False - - def log_failure(retry_state): if retry_state.attempt_number < 1: loglevel = logging.INFO @@ -78,13 +24,74 @@ def log_failure(retry_state): ) -@retry(after=log_failure, wait=wait_random(min=1, max=2)) -async def run_agent(server: str, auth: str, max_tasks: int, wait_time: int): - logger.info(f"Running agent against {server}") - headers = {"Authorization": f"Bearer {auth}"} - async with httpx.AsyncClient(headers=headers) as http_client: - while True: - retry_now = await get_and_complete_tasks(http_client, server, max_tasks) - if not retry_now: - logger.error(f"Waiting {wait_time} seconds before next retry") - await asyncio.sleep(wait_time) +class ArgosAgent: + """The Argos agent is responsible for running the checks and reporting the results.""" + + def __init__(self, server: str, auth: str, max_tasks: int, wait_time: int): + self.server = server + self.max_tasks = max_tasks + self.wait_time = wait_time + + self.agent_id = socket.gethostname() + headers = {"Authorization": f"Bearer {auth}"} + self._http_client = httpx.AsyncClient(headers=headers) + + @retry(after=log_failure, wait=wait_random(min=1, max=2)) + async def run(self): + logger.info(f"Running agent against {self.server}") + async with self._http_client: + while True: + retry_now = await self._get_and_complete_tasks() + if not retry_now: + logger.error(f"Waiting {self.wait_time} seconds before next retry") + await asyncio.sleep(self.wait_time) + + async def _complete_task(self, task: dict) -> dict: + try: + task = Task(**task) + check_class = get_registered_check(task.check) + check = check_class(self._http_client, task) + result = await check.run() + status = result.status + context = result.context + + except Exception as e: + status = "error" + context = SerializableException.from_exception(e) + msg = f"An exception occured when running {task}. {e.__class__.__name__} : {e}" + logger.error(msg) + return AgentResult(task_id=task.id, status=status, context=context) + + async def _get_and_complete_tasks(self): + # Fetch the list of tasks + response = await self._http_client.get(f"{self.server}/api/tasks") + + if response.status_code == httpx.codes.OK: + # XXX Maybe we want to group the tests by URL ? (to issue one request per URL) + data = response.json() + logger.info(f"Received {len(data)} tasks from the server") + + tasks = [] + for task in data: + tasks.append(self._complete_task(task)) + + if tasks: + results = await asyncio.gather(*tasks) + await self._post_results(results) + return True + else: + logger.error("Got no tasks from the server.") + return False + else: + logger.error(f"Failed to fetch tasks: {response.read()}") + return False + + async def _post_results(self, results: List[AgentResult]): + data = [r.model_dump() for r in results] + response = await self._http_client.post(f"{self.server}/api/results", json=data) + + if response.status_code == httpx.codes.CREATED: + logger.error(f"Successfully posted results {response.json()}") + else: + logger.error(f"Failed to post results: {response.read()}") + return response diff --git a/argos/commands.py b/argos/commands.py index d6dc90a..c1ae8e9 100644 --- a/argos/commands.py +++ b/argos/commands.py @@ -4,7 +4,7 @@ import subprocess import click from argos import logging -from argos.agent import run_agent +from argos.agent import ArgosAgent @click.group() @@ -39,7 +39,8 @@ def agent(server, auth, max_tasks, wait_time, log_level): from argos.logging import logger logger.setLevel(log_level) - asyncio.run(run_agent(server, auth, max_tasks, wait_time)) + agent = ArgosAgent(server, auth, max_tasks, wait_time) + asyncio.run(agent.run()) @cli.command()