Loop the agents and enhance the selection of tasks on the server

This commit is contained in:
Alexis Métaireau 2023-10-10 19:24:50 +02:00
parent d35be89f4b
commit ca6584c803
12 changed files with 132 additions and 51 deletions

View file

@ -16,6 +16,7 @@ pyopenssl = "*"
ipdb = "*" ipdb = "*"
argos = {extras = ["dev"], file = ".", editable = true} argos = {extras = ["dev"], file = ".", editable = true}
pyyaml-include = "*" pyyaml-include = "*"
jedi = "*"
[dev-packages] [dev-packages]

3
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{ {
"_meta": { "_meta": {
"hash": { "hash": {
"sha256": "8cc9237ff86d00019539f36e3df5b20edcbbc60f52d1b0fce2b03e51c089ad39" "sha256": "080d35bdb1052afa4590e047f3f0cd1d8263b1e97d0095c379f362a81a7868ce"
}, },
"pipfile-spec": 6, "pipfile-spec": 6,
"requires": { "requires": {
@ -365,6 +365,7 @@
"sha256:cf0496f3651bc65d7174ac1b7d043eff454892c708a87d1b683e57b569927ffd", "sha256:cf0496f3651bc65d7174ac1b7d043eff454892c708a87d1b683e57b569927ffd",
"sha256:e983c654fe5c02867aef4cdfce5a2fbb4a50adc0af145f70504238f18ef5e7e0" "sha256:e983c654fe5c02867aef4cdfce5a2fbb4a50adc0af145f70504238f18ef5e7e0"
], ],
"index": "pypi",
"markers": "python_version >= '3.6'", "markers": "python_version >= '3.6'",
"version": "==0.19.1" "version": "==0.19.1"
}, },

View file

@ -16,11 +16,12 @@ Features :
- [x] Change the naming and use service/agent. - [x] Change the naming and use service/agent.
- [x] Packaging (and `argos agent` / `argos service` commands) - [x] Packaging (and `argos agent` / `argos service` commands)
- [x] Endpoints are protected by an authentication token - [x] Endpoints are protected by an authentication token
- [ ] Find a way to define when the task should be checked again (via config ? stored on the tasks themselves ?)
- [ ] Local task for database cleanup (to run periodically) - [ ] Local task for database cleanup (to run periodically)
- [ ] Handles multiple alerting backends (email, sms, gotify) ; - [ ] Handles multiple alerting backends (email, sms, gotify) ;
- [ ] Exposes a simple read-only website. - [ ] Exposes a simple read-only website.
- [ ] Add a way to specify the severity of the alerts in the config - [ ] Add a way to specify the severity of the alerts in the config
- [ ] Do not return `selected_at` and `selected_by` in the `/tasks` endpoint (?) - [ ] No need to return the expected and got values in case it worked in check-status and body-contains
Implemented checks : Implemented checks :

View file

@ -35,28 +35,39 @@ async def post_results(
logger.error(f"Successfully posted results {response.json()}") logger.error(f"Successfully posted results {response.json()}")
else: else:
logger.error(f"Failed to post results: {response.read()}") logger.error(f"Failed to post results: {response.read()}")
return response
async def run_agent(server: str, auth: str, max_tasks: int): async def get_and_complete_tasks(http_client, server, max_tasks):
tasks = [] # Fetch the list of tasks
response = await http_client.get(f"{server}/tasks")
if response.status_code == httpx.codes.OK:
# XXX Maybe we want to group the tests by URL ? (to issue one request per URL)
data = response.json()
logger.info(f"Received {len(data)} tasks from the server")
tasks = []
for task in data:
tasks.append(complete_task(http_client, task))
if tasks:
results = await asyncio.gather(*tasks)
await post_results(http_client, server, results)
return True
else:
logger.error("Got no tasks from the server.")
return False
else:
logger.error(f"Failed to fetch tasks: {response.read()}")
return False
async def run_agent(server: str, auth: str, max_tasks: int, wait_time: int):
headers = {"Authorization": f"Bearer {auth}"} headers = {"Authorization": f"Bearer {auth}"}
async with httpx.AsyncClient(headers=headers) as http_client: async with httpx.AsyncClient(headers=headers) as http_client:
# Fetch the list of tasks while True:
response = await http_client.get(f"{server}/tasks") retry_now = await get_and_complete_tasks(http_client, server, max_tasks)
if not retry_now:
if response.status_code == httpx.codes.OK: logger.error(f"Waiting {wait_time} seconds before next retry")
# XXX Maybe we want to group the tests by URL ? (to issue one request per URL) await asyncio.sleep(wait_time)
data = response.json()
logger.info(f"Received {len(data)} tasks from the server")
for task in data:
tasks.append(complete_task(http_client, task))
# Run up to max_tasks concurrent tasks
results = await asyncio.gather(*tasks)
# Post the results
await post_results(http_client, server, results)
else:
logger.error(f"Failed to fetch tasks: {response.read()}")

View file

@ -11,6 +11,13 @@ class Status:
ON_CHECK = "on-check" ON_CHECK = "on-check"
SUCCESS = "success" SUCCESS = "success"
FAILURE = "failure" FAILURE = "failure"
ERROR = "error"
class Severity:
OK = "ok"
WARNING = "warning"
CRITICAL = "critical"
# XXX We could name this Result, but is it could overlap with schemas.Result. # XXX We could name this Result, but is it could overlap with schemas.Result.
@ -89,6 +96,25 @@ class BaseCheck:
status = kwargs.pop("status") status = kwargs.pop("status")
return Response.new(status, **kwargs) return Response.new(status, **kwargs)
@classmethod
async def finalize(self, config, result, **context):
"""By default, the finalize considers that :
- All FAILUREs should be reported as CRITICAL
- All SUCCESS should be reported as OK
- All ERRORS should be reported as CRITICAL.
This behaviour can be changed in each check, by defining the `finalize` method.
XXX This can also be tweaked by the config.
"""
if result.status in (Status.SUCCESS, Status.ERROR):
return result.status, Severity.OK
elif result.status == Status.FAILURE:
return result.status, Severity.CRITICAL
elif result.status == Status.ON_CHECK:
msg = "Status is 'on-check', but the Check class didn't provide a finalize() method."
raise ValueError(msg)
def get_registered_check(name): def get_registered_check(name):
return BaseCheck.get_registered_check(name) return BaseCheck.get_registered_check(name)

View file

@ -4,8 +4,14 @@ from datetime import datetime
from OpenSSL import crypto from OpenSSL import crypto
from argos.checks.base import (BaseCheck, ExpectedIntValue, from argos.checks.base import (
ExpectedStringValue, Response, Status) BaseCheck,
ExpectedIntValue,
ExpectedStringValue,
Response,
Status,
Severity,
)
from argos.logging import logger from argos.logging import logger
@ -54,10 +60,10 @@ class SSLCertificateExpiration(BaseCheck):
return self.response(status=Status.ON_CHECK, expires_in=expires_in) return self.response(status=Status.ON_CHECK, expires_in=expires_in)
@classmethod @classmethod
async def finalize(cls, config, callback, expires_in): async def finalize(cls, config, result, expires_in):
thresholds = config.ssl.thresholds thresholds = config.ssl.thresholds
thresholds.sort(reverse=True) thresholds.sort()
for days, severity in thresholds: for days, severity in thresholds:
if expires_in > days: if expires_in < days:
callback(severity) return Status.FAILURE, severity
break return Status.SUCCESS, Severity.OK

View file

@ -17,15 +17,20 @@ def cli():
@click.option("--server", required=True, help="Server URL") @click.option("--server", required=True, help="Server URL")
@click.option("--auth", required=True, help="The authentication token") @click.option("--auth", required=True, help="The authentication token")
@click.option("--max-tasks", default=10, help="Maximum number of concurrent tasks") @click.option("--max-tasks", default=10, help="Maximum number of concurrent tasks")
@click.option(
"--wait-time",
default=10,
help="Wait time (in seconds) between two polls on the server",
)
@click.option( @click.option(
"--log-level", "--log-level",
default="INFO", default="INFO",
type=click.Choice(logging.LOG_LEVELS, case_sensitive=False), type=click.Choice(logging.LOG_LEVELS, case_sensitive=False),
) )
def agent(server, auth, max_tasks, log_level): def agent(server, auth, max_tasks, wait_time, log_level):
"""Runs an agent""" """Runs an agent"""
logging.set_log_level(log_level) logging.set_log_level(log_level)
asyncio.run(run_agent(server, auth, max_tasks)) asyncio.run(run_agent(server, auth, max_tasks, wait_time))
@cli.command() @cli.command()

6
argos/server/alerting.py Normal file
View file

@ -0,0 +1,6 @@
from argos.logging import logger
def handle_alert(config, result, task, severity):
msg = f"{result=}, {task=}, {severity=}"
logger.error(msg)

View file

@ -12,6 +12,7 @@ from argos.schemas import AgentResult, Task
from argos.schemas.config import from_yaml as get_schemas_from_yaml from argos.schemas.config import from_yaml as get_schemas_from_yaml
from argos.server import models, queries from argos.server import models, queries
from argos.server.database import SessionLocal, engine from argos.server.database import SessionLocal, engine
from argos.server.alerting import handle_alert
models.Base.metadata.create_all(bind=engine) models.Base.metadata.create_all(bind=engine)
@ -74,15 +75,20 @@ async def create_result(results: List[AgentResult], db: Session = Depends(get_db
result = await queries.create_result(db, agent_result) result = await queries.create_result(db, agent_result)
# XXX Maybe offload this to a queue. # XXX Maybe offload this to a queue.
# XXX Use a schema for the on-check value. # XXX Use a schema for the on-check value.
# XXX Get all the tasks at once, to limit the queries on the db
if result.status == "on-check": task = await queries.get_task(db, agent_result.task_id)
task = await queries.get_task(db, agent_result.task_id) if not task:
if not task: logger.error(f"Unable to find task {agent_result.task_id}")
logger.error(f"Unable to find task {agent_result.task_id}") else:
else: check = task.get_check()
check = task.get_check() status, severity = await check.finalize(
callback = logger.error app.config, result, **result.context
await check.finalize(app.config, callback=callback, **result.context) )
result.severity = severity
result.status = status
# Set the selection status to None
task.selected_by = None
handle_alert(app.config, result, task, severity)
db_results.append(result) db_results.append(result)
db.commit() db.commit()
@ -92,6 +98,7 @@ async def create_result(results: List[AgentResult], db: Session = Depends(get_db
@app.get("/stats", dependencies=[Depends(verify_token)]) @app.get("/stats", dependencies=[Depends(verify_token)])
async def get_stats(db: Session = Depends(get_db)): async def get_stats(db: Session = Depends(get_db)):
return { return {
"tasks_count": await queries.count_tasks(db), "upcoming_tasks_count": await queries.count_tasks(db, selected=False),
"results_count": await queries.count_results(db), "results_count": await queries.count_results(db),
"selected_tasks_count": await queries.count_tasks(db, selected=True),
} }

View file

@ -1,8 +1,16 @@
from datetime import datetime from datetime import datetime
from typing import List, Literal from typing import List, Literal
from sqlalchemy import (JSON, Boolean, Column, DateTime, Enum, ForeignKey, from sqlalchemy import (
Integer, String) JSON,
Boolean,
Column,
DateTime,
Enum,
ForeignKey,
Integer,
String,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy_utils import ChoiceType from sqlalchemy_utils import ChoiceType
@ -55,6 +63,9 @@ class Result(Base):
status: Mapped[Literal["success", "failure", "error", "on-check"]] = mapped_column( status: Mapped[Literal["success", "failure", "error", "on-check"]] = mapped_column(
Enum("success", "failure", "error", "on-check") Enum("success", "failure", "error", "on-check")
) )
severity: Mapped[Literal["ok", "warning", "critical"]] = mapped_column(
Enum("ok", "warning", "critical")
)
context: Mapped[dict] = mapped_column() context: Mapped[dict] = mapped_column()
def __str__(self): def __str__(self):

View file

@ -13,11 +13,11 @@ async def list_tasks(db: Session, agent_id: str, limit: int = 100):
"""List tasks and mark them as selected""" """List tasks and mark them as selected"""
tasks = db.query(Task).where(Task.selected_by == None).limit(limit).all() tasks = db.query(Task).where(Task.selected_by == None).limit(limit).all()
now = datetime.now() now = datetime.now()
# XXX: Deactivated for now, as it simplifies testing.
# for task in tasks: for task in tasks:
# task.selected_at = now task.selected_at = now
# task.selected_by = agent_id task.selected_by = agent_id
# db.commit() db.commit()
return tasks return tasks
@ -36,8 +36,14 @@ async def create_result(db: Session, agent_result: schemas.AgentResult):
return result return result
async def count_tasks(db: Session): async def count_tasks(db: Session, selected=False):
return db.query(Task).count() query = db.query(Task)
if selected:
query = query.filter(Task.selected_by is not None)
else:
query = query.filter(Task.selected_by is None)
return query.count()
async def count_results(db: Session): async def count_results(db: Session):

View file

@ -9,8 +9,8 @@ general:
- local - local
service: service:
port: 8888 port: 8888
# Can be generated using `openssl rand -base64 32`.
secrets: secrets:
# Secrets can be generated using `openssl rand -base64 32`.
- "O4kt8Max9/k0EmHaEJ0CGGYbBNFmK8kOZNIoUk3Kjwc" - "O4kt8Max9/k0EmHaEJ0CGGYbBNFmK8kOZNIoUk3Kjwc"
- "x1T1VZR51pxrv5pQUyzooMG4pMUvHNMhA5y/3cUsYVs=" - "x1T1VZR51pxrv5pQUyzooMG4pMUvHNMhA5y/3cUsYVs="