argos/argos/server/routes/views.py
2023-11-21 16:11:03 +01:00

140 lines
4.1 KiB
Python

from collections import defaultdict
from urllib.parse import urlparse
from fastapi import APIRouter, Depends, Request
from fastapi.templating import Jinja2Templates
from sqlalchemy import desc
from sqlalchemy.orm import Session, aliased
from argos.schemas import Config
from argos.server import queries
from argos.server.models import Result, Task
from argos.server.routes.dependencies import get_config, get_db
route = APIRouter()
templates = Jinja2Templates(directory="argos/server/templates")
@route.get("/")
async def get_severity_counts(request: Request, db: Session = Depends(get_db)):
"""Returns the number of results per severity"""
counts = await queries.get_severity_counts(db)
counts_dict = dict(counts)
for key in ("ok", "warning", "critical"):
counts_dict.setdefault(key, 0)
agents = db.query(Result.agent_id).distinct().all()
return templates.TemplateResponse(
"index.html",
{
"request": request,
"counts_dict": counts_dict,
"agents": agents,
},
)
@route.get("/details")
async def read_tasks(request: Request, db: Session = Depends(get_db)):
tasks = db.query(Task).order_by(Task.domain).all()
results = (
db.query(Task, Result)
.join(Result)
.distinct(Task.id)
.order_by(Task.id, desc(Result.submitted_at))
.all()
)
domains_severities = defaultdict(list)
domains_last_checks = defaultdict(list)
for task, result in results:
severity = result.severity or "to-process"
domain = urlparse(task.url).netloc
domains_severities[domain].append(severity)
domains_last_checks[domain].append(result.submitted_at)
def _max_severity(severities):
severity_level = {"ok": 1, "warning": 2, "critical": 3, "to-process": 4}
return max(severities, key=severity_level.get)
domains = [(key, _max_severity(value)) for key, value in domains_severities.items()]
last_checks = {key: max(value) for key, value in domains_last_checks.items()}
domains.sort(key=lambda x: x[1])
agents = db.query(Result.agent_id).distinct().all()
return templates.TemplateResponse(
"details.html",
{
"request": request,
"domains": domains,
"last_checks": last_checks,
"total_task_count": len(tasks),
"agents": agents,
},
)
@route.get("/domain/{domain}")
async def get_domain_tasks(
request: Request, domain: str, db: Session = Depends(get_db)
):
tasks = db.query(Task).filter(Task.domain.contains(domain)).all()
return templates.TemplateResponse(
"domain.html", {"request": request, "domain": domain, "tasks": tasks}
)
@route.get("/result/{result_id}")
async def get_result(request: Request, result_id: int, db: Session = Depends(get_db)):
result = db.query(Result).get(result_id)
return templates.TemplateResponse(
"result.html", {"request": request, "result": result}
)
@route.get("/task/{task_id}/results")
async def get_task_results(
request: Request,
task_id: int,
db: Session = Depends(get_db),
config: Config = Depends(get_config),
):
results = (
db.query(Result)
.filter(Result.task_id == task_id)
.order_by(Result.submitted_at.desc())
.all()
)
task = db.query(Task).get(task_id)
description = task.get_check().get_description(config)
return templates.TemplateResponse(
"results.html",
{
"request": request,
"results": results,
"task": task,
"description": description,
},
)
@route.get("/agents")
async def get_agents(request: Request, db: Session = Depends(get_db)):
t1 = aliased(Result, name="t1")
t2 = aliased(Result, name="t2")
last_seen = (
db.query(t1)
.outerjoin(
t2, (t1.agent_id == t2.agent_id) & (t1.submitted_at < t2.submitted_at)
)
.filter(t2.agent_id.is_(None))
.all()
)
return templates.TemplateResponse(
"agents.html", {"request": request, "last_seen": last_seen}
)