argos/argos/server/routes/views.py

376 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Web interface for humans"""
from collections import defaultdict
from datetime import timedelta
from functools import cmp_to_key
from pathlib import Path
from typing import Annotated
from urllib.parse import urlparse
from fastapi import APIRouter, Cookie, Depends, Form, Request, status
from fastapi.responses import RedirectResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from sqlalchemy import func
from sqlalchemy.orm import Session
from argos.checks.base import Status
from argos.schemas import Config
from argos.server import queries
from argos.server.exceptions import NotAuthenticatedException
from argos.server.models import Result, Task, User
from argos.server.routes.dependencies import (
create_user_token,
get_config,
get_db,
get_manager,
good_user_credentials,
)
route = APIRouter()
current_dir = Path(__file__).resolve().parent
templates = Jinja2Templates(directory=current_dir / ".." / "templates")
SEVERITY_LEVELS = {"ok": 1, "warning": 2, "critical": 3, "unknown": 4}
@route.get("/login", include_in_schema=False)
async def login_view(
request: Request,
msg: str | None = None,
config: Config = Depends(get_config),
):
if config.general.unauthenticated_access == "all":
return RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
token = request.cookies.get("access-token")
if token is not None and token != "":
manager = request.app.state.manager
user = await manager.get_current_user(token)
if user is not None:
return RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
if msg == "logout":
msg = "You have been successfully disconnected."
elif msg == "not-authenticated":
msg = "You are not authenticated or your token has expired"
else:
msg = None
return templates.TemplateResponse(
"login.html",
{
"request": request,
"msg": msg,
"remember": config.general.remember_me_duration,
},
)
@route.post("/login", include_in_schema=False)
async def post_login(
request: Request,
db: Session = Depends(get_db),
data: OAuth2PasswordRequestForm = Depends(),
rememberme: Annotated[str | None, Form()] = None,
config: Config = Depends(get_config),
):
if config.general.unauthenticated_access == "all":
return RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
good_credentials = await good_user_credentials(
config, request, data.username, data.password
)
if config.general.ldap is not None:
if not good_credentials:
return templates.TemplateResponse(
"login.html",
{
"request": request,
"msg": "Sorry, invalid username or bad password. "
"Or the LDAP server is unreachable (see logs to verify).",
},
)
elif not good_credentials:
return templates.TemplateResponse(
"login.html",
{"request": request, "msg": "Sorry, invalid username or bad password."},
)
manager = request.app.state.manager
token = await create_user_token(manager, config.general, data.username, rememberme)
response = RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
response.set_cookie(
key=manager.cookie_name,
value=token["token"],
httponly=True,
samesite="strict",
expires=int(token["delta"].total_seconds()),
)
return response
@route.get("/logout", include_in_schema=False)
async def logout_view(
request: Request,
config: Config = Depends(get_config),
user: User | None = Depends(get_manager),
db: Session = Depends(get_db),
):
if config.general.unauthenticated_access == "all":
return RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
await queries.block_token(db, request)
response = RedirectResponse(
request.url_for("login_view").include_query_params(msg="logout"),
status_code=status.HTTP_303_SEE_OTHER,
)
response.delete_cookie(key="access-token")
return response
@route.get("/", include_in_schema=False)
async def get_severity_counts_view(
request: Request,
user: User | None = Depends(get_manager),
db: Session = Depends(get_db),
auto_refresh_enabled: Annotated[bool, Cookie()] = False,
auto_refresh_seconds: Annotated[int, Cookie()] = 15,
):
"""Shows the number of results per severity"""
counts_dict = await queries.get_severity_counts(db)
agents = db.query(Result.agent_id).distinct().all()
waiting_tasks = await queries.get_waiting_tasks_count(db)
return templates.TemplateResponse(
"index.html",
{
"request": request,
"counts_dict": counts_dict,
"agents": agents,
"waiting_tasks": waiting_tasks,
"auto_refresh_enabled": auto_refresh_enabled,
"auto_refresh_seconds": auto_refresh_seconds,
"user": user,
},
)
@route.get("/domains", include_in_schema=False)
async def get_domains_view(
request: Request,
user: User | None = Depends(get_manager),
config: Config = Depends(get_config),
db: Session = Depends(get_db),
):
"""Show all tasks and their current state"""
if config.general.unauthenticated_access == "dashboard":
if user is None:
raise NotAuthenticatedException
tasks = db.query(Task).all()
domains_severities = defaultdict(list)
domains_last_checks = defaultdict(list) # type: ignore[var-annotated]
for task in tasks:
domain = urlparse(task.url).netloc
domains_severities[domain].append(task.severity)
if task.last_severity_update is not None:
domains_last_checks[domain] = task.last_severity_update
else:
domains_last_checks[domain] = "Waiting to be checked"
def _max_severity(severities):
return max(severities, key=SEVERITY_LEVELS.get)
def _cmp_domains(a, b):
if SEVERITY_LEVELS[a[1]] < SEVERITY_LEVELS[b[1]]:
return 1
if SEVERITY_LEVELS[a[1]] > SEVERITY_LEVELS[b[1]]:
return -1
if a[0] > b[0]:
return 1
if a[0] < b[0]:
return -1
return 0
domains = [(key, _max_severity(value)) for key, value in domains_severities.items()]
domains.sort(key=cmp_to_key(_cmp_domains))
agents = db.query(Result.agent_id).distinct().all()
return templates.TemplateResponse(
"domains.html",
{
"request": request,
"domains": domains,
"last_checks": domains_last_checks,
"total_task_count": len(tasks),
"agents": agents,
"user": user,
},
)
@route.get("/domain/{domain}", include_in_schema=False)
async def get_domain_tasks_view(
request: Request,
domain: str,
user: User | None = Depends(get_manager),
config: Config = Depends(get_config),
db: Session = Depends(get_db),
):
"""Show all tasks attached to a domain"""
if config.general.unauthenticated_access == "dashboard":
if user is None:
raise NotAuthenticatedException
tasks = db.query(Task).filter(Task.domain.contains(f"//{domain}")).all()
return templates.TemplateResponse(
"domain.html",
{
"request": request,
"domain": domain,
"tasks": tasks,
"user": user,
},
)
@route.get("/result/{result_id}", include_in_schema=False)
async def get_result_view(
request: Request,
result_id: int,
user: User | None = Depends(get_manager),
config: Config = Depends(get_config),
db: Session = Depends(get_db),
):
"""Show the details of a result"""
if config.general.unauthenticated_access == "dashboard":
if user is None:
raise NotAuthenticatedException
result = db.query(Result).get(result_id)
return templates.TemplateResponse(
"result.html",
{
"request": request,
"result": result,
"error": Status.ERROR,
"user": user,
},
)
@route.get("/task/{task_id}/results", include_in_schema=False)
async def get_task_results_view(
request: Request,
task_id: int,
user: User | None = Depends(get_manager),
db: Session = Depends(get_db),
config: Config = Depends(get_config),
):
"""Show history of a tasks results"""
if config.general.unauthenticated_access == "dashboard":
if user is None:
raise NotAuthenticatedException
results = (
db.query(Result)
.filter(Result.task_id == task_id)
.order_by(Result.submitted_at.desc()) # type: ignore[attr-defined]
.all()
)
task = db.query(Task).get(task_id)
description = ""
if task is not None:
description = task.get_check().get_description(config)
return templates.TemplateResponse(
"results.html",
{
"request": request,
"results": results,
"task": task,
"description": description,
"error": Status.ERROR,
"user": user,
},
)
@route.get("/agents", include_in_schema=False)
async def get_agents_view(
request: Request,
user: User | None = Depends(get_manager),
config: Config = Depends(get_config),
db: Session = Depends(get_db),
):
"""Show argos agents and the last time the server saw them"""
if config.general.unauthenticated_access == "dashboard":
if user is None:
raise NotAuthenticatedException
last_seen = (
db.query(Result.agent_id, func.max(Result.submitted_at).label("submitted_at"))
.group_by(Result.agent_id)
.all()
)
return templates.TemplateResponse(
"agents.html",
{
"request": request,
"last_seen": last_seen,
"user": user,
},
)
@route.post("/refresh", include_in_schema=False)
async def set_refresh_cookies_view(
request: Request,
user: User | None = Depends(get_manager),
auto_refresh_enabled: Annotated[bool, Form()] = False,
auto_refresh_seconds: Annotated[int, Form()] = 15,
):
response = RedirectResponse(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
# Cookies age in Chrome cant be more than 400 days
# https://developer.chrome.com/blog/cookie-max-age-expires
delta = int(timedelta(days=400).total_seconds())
response.set_cookie(
key="auto_refresh_enabled",
value=str(auto_refresh_enabled),
httponly=True,
samesite="strict",
expires=delta,
)
response.set_cookie(
key="auto_refresh_seconds",
value=str(max(5, int(auto_refresh_seconds))),
httponly=True,
samesite="strict",
expires=delta,
)
return response