mirror of
https://framagit.org/framasoft/framaspace/argos.git
synced 2025-04-28 18:02:41 +02:00
Refactor argos agent to use classes
This commit is contained in:
parent
31edb95cb8
commit
cd243d92dc
2 changed files with 76 additions and 68 deletions
139
argos/agent.py
139
argos/agent.py
|
@ -1,70 +1,16 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import socket
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from tenacity import after_log, retry, wait_random
|
from tenacity import retry, wait_random
|
||||||
|
|
||||||
from argos.checks import get_registered_check
|
from argos.checks import get_registered_check
|
||||||
from argos.logging import logger
|
from argos.logging import logger
|
||||||
from argos.schemas import AgentResult, SerializableException, Task
|
from argos.schemas import AgentResult, SerializableException, Task
|
||||||
|
|
||||||
|
|
||||||
async def complete_task(http_client: httpx.AsyncClient, task: dict) -> dict:
|
|
||||||
try:
|
|
||||||
task = Task(**task)
|
|
||||||
check_class = get_registered_check(task.check)
|
|
||||||
check = check_class(http_client, task)
|
|
||||||
result = await check.run()
|
|
||||||
status = result.status
|
|
||||||
context = result.context
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
status = "error"
|
|
||||||
context = SerializableException.from_exception(e)
|
|
||||||
msg = f"An exception occured when running {task}. {e.__class__.__name__} : {e}"
|
|
||||||
logger.error(msg)
|
|
||||||
return AgentResult(task_id=task.id, status=status, context=context)
|
|
||||||
|
|
||||||
|
|
||||||
async def post_results(
|
|
||||||
http_client: httpx.AsyncClient, server: str, results: List[AgentResult]
|
|
||||||
):
|
|
||||||
data = [r.model_dump() for r in results]
|
|
||||||
response = await http_client.post(f"{server}/api/results", json=data)
|
|
||||||
|
|
||||||
if response.status_code == httpx.codes.CREATED:
|
|
||||||
logger.error(f"Successfully posted results {response.json()}")
|
|
||||||
else:
|
|
||||||
logger.error(f"Failed to post results: {response.read()}")
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
async def get_and_complete_tasks(http_client, server, max_tasks):
|
|
||||||
# Fetch the list of tasks
|
|
||||||
response = await http_client.get(f"{server}/api/tasks")
|
|
||||||
|
|
||||||
if response.status_code == httpx.codes.OK:
|
|
||||||
# XXX Maybe we want to group the tests by URL ? (to issue one request per URL)
|
|
||||||
data = response.json()
|
|
||||||
logger.info(f"Received {len(data)} tasks from the server")
|
|
||||||
|
|
||||||
tasks = []
|
|
||||||
for task in data:
|
|
||||||
tasks.append(complete_task(http_client, task))
|
|
||||||
|
|
||||||
if tasks:
|
|
||||||
results = await asyncio.gather(*tasks)
|
|
||||||
await post_results(http_client, server, results)
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
logger.error("Got no tasks from the server.")
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
logger.error(f"Failed to fetch tasks: {response.read()}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def log_failure(retry_state):
|
def log_failure(retry_state):
|
||||||
if retry_state.attempt_number < 1:
|
if retry_state.attempt_number < 1:
|
||||||
loglevel = logging.INFO
|
loglevel = logging.INFO
|
||||||
|
@ -78,13 +24,74 @@ def log_failure(retry_state):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@retry(after=log_failure, wait=wait_random(min=1, max=2))
|
class ArgosAgent:
|
||||||
async def run_agent(server: str, auth: str, max_tasks: int, wait_time: int):
|
"""The Argos agent is responsible for running the checks and reporting the results."""
|
||||||
logger.info(f"Running agent against {server}")
|
|
||||||
headers = {"Authorization": f"Bearer {auth}"}
|
def __init__(self, server: str, auth: str, max_tasks: int, wait_time: int):
|
||||||
async with httpx.AsyncClient(headers=headers) as http_client:
|
self.server = server
|
||||||
while True:
|
self.max_tasks = max_tasks
|
||||||
retry_now = await get_and_complete_tasks(http_client, server, max_tasks)
|
self.wait_time = wait_time
|
||||||
if not retry_now:
|
|
||||||
logger.error(f"Waiting {wait_time} seconds before next retry")
|
self.agent_id = socket.gethostname()
|
||||||
await asyncio.sleep(wait_time)
|
headers = {"Authorization": f"Bearer {auth}"}
|
||||||
|
self._http_client = httpx.AsyncClient(headers=headers)
|
||||||
|
|
||||||
|
@retry(after=log_failure, wait=wait_random(min=1, max=2))
|
||||||
|
async def run(self):
|
||||||
|
logger.info(f"Running agent against {self.server}")
|
||||||
|
async with self._http_client:
|
||||||
|
while True:
|
||||||
|
retry_now = await self._get_and_complete_tasks()
|
||||||
|
if not retry_now:
|
||||||
|
logger.error(f"Waiting {self.wait_time} seconds before next retry")
|
||||||
|
await asyncio.sleep(self.wait_time)
|
||||||
|
|
||||||
|
async def _complete_task(self, task: dict) -> dict:
|
||||||
|
try:
|
||||||
|
task = Task(**task)
|
||||||
|
check_class = get_registered_check(task.check)
|
||||||
|
check = check_class(self._http_client, task)
|
||||||
|
result = await check.run()
|
||||||
|
status = result.status
|
||||||
|
context = result.context
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
status = "error"
|
||||||
|
context = SerializableException.from_exception(e)
|
||||||
|
msg = f"An exception occured when running {task}. {e.__class__.__name__} : {e}"
|
||||||
|
logger.error(msg)
|
||||||
|
return AgentResult(task_id=task.id, status=status, context=context)
|
||||||
|
|
||||||
|
async def _get_and_complete_tasks(self):
|
||||||
|
# Fetch the list of tasks
|
||||||
|
response = await self._http_client.get(f"{self.server}/api/tasks")
|
||||||
|
|
||||||
|
if response.status_code == httpx.codes.OK:
|
||||||
|
# XXX Maybe we want to group the tests by URL ? (to issue one request per URL)
|
||||||
|
data = response.json()
|
||||||
|
logger.info(f"Received {len(data)} tasks from the server")
|
||||||
|
|
||||||
|
tasks = []
|
||||||
|
for task in data:
|
||||||
|
tasks.append(self._complete_task(task))
|
||||||
|
|
||||||
|
if tasks:
|
||||||
|
results = await asyncio.gather(*tasks)
|
||||||
|
await self._post_results(results)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error("Got no tasks from the server.")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to fetch tasks: {response.read()}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def _post_results(self, results: List[AgentResult]):
|
||||||
|
data = [r.model_dump() for r in results]
|
||||||
|
response = await self._http_client.post(f"{self.server}/api/results", json=data)
|
||||||
|
|
||||||
|
if response.status_code == httpx.codes.CREATED:
|
||||||
|
logger.error(f"Successfully posted results {response.json()}")
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to post results: {response.read()}")
|
||||||
|
return response
|
||||||
|
|
|
@ -4,7 +4,7 @@ import subprocess
|
||||||
import click
|
import click
|
||||||
|
|
||||||
from argos import logging
|
from argos import logging
|
||||||
from argos.agent import run_agent
|
from argos.agent import ArgosAgent
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
|
@ -39,7 +39,8 @@ def agent(server, auth, max_tasks, wait_time, log_level):
|
||||||
from argos.logging import logger
|
from argos.logging import logger
|
||||||
|
|
||||||
logger.setLevel(log_level)
|
logger.setLevel(log_level)
|
||||||
asyncio.run(run_agent(server, auth, max_tasks, wait_time))
|
agent = ArgosAgent(server, auth, max_tasks, wait_time)
|
||||||
|
asyncio.run(agent.run())
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
|
|
Loading…
Reference in a new issue