argos/argos/server/routes/api_app.py
2025-03-19 17:38:16 +01:00

220 lines
6.2 KiB
Python

"""Web interface for humans"""
from pathlib import Path
from typing import Annotated, Dict, Literal
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from sqlalchemy.orm import Session
from argos import VERSION
from argos.schemas import Config
from argos.server import queries
from argos.server.models import Result, User
from argos.server.routes.dependencies import (
create_user_token,
get_config,
get_db,
get_manager,
good_user_credentials,
)
# Router on which every endpoint of this module is registered.
route = APIRouter()
# Absolute directory that contains this file (resolved through symlinks).
current_dir = Path(__file__).resolve().parent
# Jinja2 template loader pointing at `<current_dir>/../templates`.
templates = Jinja2Templates(directory=current_dir / ".." / "templates")
# Integer rank for each severity name.
# NOTE(review): not referenced in the visible part of this module — confirm it is still needed.
SEVERITY_LEVELS = {"ok": 1, "warning": 2, "critical": 3, "unknown": 4}
class Version(BaseModel):
    """Response body of ``GET /version``.

    Holds the Argos version string and a flag telling whether the request
    was made by a logged-in user.
    """

    argos_monitoring: str
    logged_in: bool

    # One OpenAPI example per possible value of ``logged_in``.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {"argos_monitoring": VERSION, "logged_in": flag}
                for flag in (True, False)
            ]
        }
    }
@route.get("/version", response_model=Version)
async def version(
    request: Request,
):
    """Report the Argos version and whether the caller is logged in.

    The ``access-token`` cookie is read from the request. When it is missing
    or empty, the caller is reported as not logged in without consulting the
    login manager. Otherwise the token is passed to the manager stored on the
    application state, and the caller counts as logged in when a user is
    returned.
    """
    access_token = request.cookies.get("access-token")
    if not access_token:
        # No cookie, or an empty one: nothing to check.
        return Version(argos_monitoring=VERSION, logged_in=False)

    login_manager = request.app.state.manager
    current_user = await login_manager.get_current_user(access_token)
    return Version(argos_monitoring=VERSION, logged_in=current_user is not None)
class GrantedLogin(BaseModel):
    """Response body of a successful ``POST /login``.

    Two shapes are valid, as shown in the examples below: a real login, which
    carries the cookie name and token, and the "No authentication needed"
    answer, which carries neither.
    """

    access: Literal["granted"]
    msg: str
    # Both cookie fields default to None. Without a default a nullable field
    # is still *required*, so the cookie-less payload returned when no
    # authentication is needed would fail response validation.
    cookie_name: Literal["access-token"] | None = None
    cookie: str | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "access": "granted",
                    "msg": "Login successful",
                    "cookie_name": "access-token",
                    "cookie": "foobarbaz",
                },
                {"access": "granted", "msg": "No authentication needed"},
            ]
        }
    }
class DeniedLogin(BaseModel):
    """Response body of a rejected ``POST /login`` (HTTP 401)."""

    # Must be "denied": this is the value used by the examples below and by
    # the 401 payloads the login endpoint actually sends.
    access: Literal["denied"]
    msg: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"access": "denied", "msg": "Sorry, invalid username or bad password."},
                {
                    "access": "denied",
                    "msg": "Sorry, invalid username or bad password. "
                    "Or the LDAP server is unreachable (see logs to verify).",
                },
            ]
        }
    }
@route.post(
    "/login", response_model=GrantedLogin, responses={401: {"model": DeniedLogin}}
)
async def login(
    request: Request,
    db: Session = Depends(get_db),
    data: OAuth2PasswordRequestForm = Depends(),
    rememberme: Annotated[str | None, Form()] = None,
    config: Config = Depends(get_config),
):
    """Check the submitted credentials and hand back an access token.

    When the configuration sets unauthenticated access to ``"all"``, the
    request is granted immediately without looking at the credentials.
    Otherwise the username and password are verified. A failure yields a 401
    response whose message also mentions the LDAP server when LDAP is
    configured. A success yields the cookie name and a freshly created token,
    to be sent as a cookie on requests that need authentication.
    """
    if config.general.unauthenticated_access == "all":
        return {"access": "granted", "msg": "No authentication needed"}

    credentials_ok = await good_user_credentials(
        config, request, data.username, data.password
    )
    if not credentials_ok:
        # Same rejection either way; only the wording depends on LDAP.
        if config.general.ldap is not None:
            denial_msg = (
                "Sorry, invalid username or bad password. "
                "Or the LDAP server is unreachable (see logs to verify)."
            )
        else:
            denial_msg = "Sorry, invalid username or bad password."
        return JSONResponse(
            content={"access": "denied", "msg": denial_msg},
            status_code=401,
        )

    login_manager = request.app.state.manager
    user_token = await create_user_token(
        login_manager, config.general, data.username, rememberme
    )
    granted_payload = {
        "access": "granted",
        "msg": "Login successful",
        "cookie_name": login_manager.cookie_name,
        "cookie": user_token["token"],
    }
    return JSONResponse(content=granted_payload)
class SuccessLogout(BaseModel):
    """Response body of a successful ``GET /logout``."""

    logout: Literal["success"]
    msg: str
@route.get("/logout", response_model=SuccessLogout)
async def logout(
    request: Request,
    config: Config = Depends(get_config),
    user: User | None = Depends(get_manager),
    db: Session = Depends(get_db),
):
    """Discard the token used for the request, thereby logging the user out.

    When the configuration sets unauthenticated access to ``"all"`` there is
    nothing to log out from, and an ``"error"`` payload is returned instead.
    """
    if config.general.unauthenticated_access == "all":
        # Sent as an explicit JSONResponse: this payload does not match
        # SuccessLogout (whose `logout` field only accepts "success"), so
        # returning it as a plain dict would fail response validation.
        return JSONResponse(
            content={"logout": "error", "msg": "No authentication needed"}
        )

    await queries.block_token(db, request)
    return JSONResponse(content={"logout": "success", "msg": "logout successful"})
class SeverityCounts(BaseModel):
    """Response body of ``GET /``: a count per severity plus an agent count."""

    severities: Dict[Literal["ok", "warning", "critical", "unknown"], int]
    agents: int

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "severities": {
                        "ok": 10,
                        "warning": 0,
                        "critical": 2,
                        "unknown": 0,
                    },
                    "agents": 1,
                },
            ],
        },
    }
@route.get("/", response_model=SeverityCounts)
async def get_severity_counts(
    request: Request,
    user: User | None = Depends(get_manager),
    db: Session = Depends(get_db),
):
    """Return the number of results per severity and the number of known agents.

    The agent count is the number of distinct ``agent_id`` values found in
    the results table.
    """
    per_severity = await queries.get_severity_counts(db)
    distinct_agent_rows = db.query(Result.agent_id).distinct().all()
    agent_total = len(distinct_agent_rows)
    return {"severities": per_severity, "agents": agent_total}
# Single source for the confirmation text so the OpenAPI example and the
# actual response cannot drift apart.
_RESCHEDULE_ALL_MSG = "Non OK tasks rescheduled"


@route.post(
    "/reschedule/all",
    responses={
        200: {
            "content": {
                "application/json": {"example": {"msg": _RESCHEDULE_ALL_MSG}}
            }
        }
    },
)
async def reschedule_all(
    request: Request,
    user: User | None = Depends(get_manager),
    db: Session = Depends(get_db),
):
    """Reschedule the checks of all non-OK tasks as soon as possible.

    Returns a JSON object whose ``msg`` key confirms the rescheduling.
    """
    await queries.reschedule_all(db)
    return {"msg": _RESCHEDULE_ALL_MSG}