— Reload configuration asynchronously (fix #79)

This commit is contained in:
Luc Didry 2025-02-17 17:26:56 +01:00
parent 8d82f7f9d6
commit a48c7b74e6
No known key found for this signature in database
GPG key ID: EA868E12D0257E3C
11 changed files with 176 additions and 37 deletions

View file

@ -6,6 +6,7 @@
- 📝 — Document how to add data to requests (#77) - 📝 — Document how to add data to requests (#77)
- ✨ — No need cron tasks for DB cleaning anymore (#74 and #75) - ✨ — No need cron tasks for DB cleaning anymore (#74 and #75)
- ✨ — No need cron tasks for agents watching (#76) - ✨ — No need cron tasks for agents watching (#76)
- ✨ — Reload configuration asynchronously (#79)
## 0.7.4 ## 0.7.4

View file

@ -150,23 +150,40 @@ def start(host, port, config, reload):
envvar="ARGOS_YAML_FILE", envvar="ARGOS_YAML_FILE",
callback=validate_config_access, callback=validate_config_access,
) )
@click.option(
"--enqueue/--no-enqueue",
default=False,
help="Let Argos main recurring tasks handle configurations loading. "
"It may delay the application of the new configuration up to 2 minutes. "
"Default is --no-enqueue",
)
@coroutine @coroutine
async def reload_config(config): async def reload_config(config, enqueue):
"""Read tasks configuration and add/delete tasks in database if needed""" """Read tasks configuration and add/delete tasks in database if needed"""
# Its mandatory to do it before the imports # Its mandatory to do it before the imports
os.environ["ARGOS_YAML_FILE"] = config os.environ["ARGOS_YAML_FILE"] = config
# The imports are made here otherwise the agent will need server configuration files. # The imports are made here otherwise the agent will need server configuration files.
from argos.server import queries from argos.server import queries
from argos.server.main import read_config from argos.server.settings import read_config
_config = read_config(config) _config = read_config(config)
db = await get_db() db = await get_db()
changed = await queries.update_from_config(db, _config)
click.echo(f"{changed['added']} tasks added") config_changed = await queries.has_config_changed(db, _config)
click.echo(f"{changed['vanished']} tasks deleted") if not config_changed:
click.echo("Config has not change")
else:
if enqueue:
msg = await queries.update_from_config_later(db, config_file=config)
click.echo(msg)
else:
changed = await queries.update_from_config(db, _config)
click.echo(f"{changed['added']} task(s) added")
click.echo(f"{changed['vanished']} task(s) deleted")
@server.command() @server.command()
@ -480,8 +497,8 @@ async def test_mail(config, domain, severity):
from argos.logging import set_log_level from argos.logging import set_log_level
from argos.server.alerting import notify_by_mail from argos.server.alerting import notify_by_mail
from argos.server.main import read_config
from argos.server.models import Result, Task from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config) conf = read_config(config)
@ -545,8 +562,8 @@ async def test_gotify(config, domain, severity):
from argos.logging import set_log_level from argos.logging import set_log_level
from argos.server.alerting import notify_with_gotify from argos.server.alerting import notify_with_gotify
from argos.server.main import read_config
from argos.server.models import Result, Task from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config) conf = read_config(config)
@ -613,8 +630,8 @@ async def test_apprise(config, domain, severity, apprise_group):
from argos.logging import set_log_level from argos.logging import set_log_level
from argos.server.alerting import notify_with_apprise from argos.server.alerting import notify_with_apprise
from argos.server.main import read_config
from argos.server.models import Result, Task from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config) conf = read_config(config)

View file

@ -8,11 +8,25 @@ from typing import Literal
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from argos.schemas.utils import IPVersion, Method from argos.schemas.utils import IPVersion, Method, Todo
# XXX Refactor using SQLModel to avoid duplication of model data # XXX Refactor using SQLModel to avoid duplication of model data
class Job(BaseModel):
"""Tasks needing to be executed in recurring tasks processing.
Its quite like a job queue."""
id: int
todo: Todo
args: str
current: bool
added_at: datetime
def __str__(self):
return f"Job ({self.id}): {self.todo}"
class Task(BaseModel): class Task(BaseModel):
"""A task corresponds to a check to execute""" """A task corresponds to a check to execute"""

View file

@ -6,3 +6,5 @@ IPVersion = Literal["4", "6"]
Method = Literal[ Method = Literal[
"GET", "HEAD", "POST", "OPTIONS", "CONNECT", "TRACE", "PUT", "PATCH", "DELETE" "GET", "HEAD", "POST", "OPTIONS", "CONNECT", "TRACE", "PUT", "PATCH", "DELETE"
] ]
Todo = Literal["RELOAD_CONFIG"]

View file

@ -1,5 +1,4 @@
import os import os
import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path from pathlib import Path
@ -7,7 +6,6 @@ from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi_login import LoginManager from fastapi_login import LoginManager
from fastapi_utils.tasks import repeat_every from fastapi_utils.tasks import repeat_every
from pydantic import ValidationError
from sqlalchemy import create_engine, event from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
@ -15,7 +13,7 @@ from argos.logging import logger, set_log_level
from argos.server import models, routes, queries from argos.server import models, routes, queries
from argos.server.alerting import no_agent_alert from argos.server.alerting import no_agent_alert
from argos.server.exceptions import NotAuthenticatedException, auth_exception_handler from argos.server.exceptions import NotAuthenticatedException, auth_exception_handler
from argos.server.settings import read_yaml_config from argos.server.settings import read_config
def get_application() -> FastAPI: def get_application() -> FastAPI:
@ -73,17 +71,6 @@ async def connect_to_db(appli):
return appli.state.db return appli.state.db
def read_config(yaml_file):
try:
config = read_yaml_config(yaml_file)
return config
except ValidationError as err:
logger.error("Errors where found while reading configuration:")
for error in err.errors():
logger.error("%s is %s", error["loc"], error["type"])
sys.exit(1)
def setup_database(appli): def setup_database(appli):
config = appli.state.config config = appli.state.config
db_url = str(config.general.db.url) db_url = str(config.general.db.url)
@ -136,15 +123,18 @@ async def recurring_tasks() -> None:
with app.state.SessionLocal() as db: with app.state.SessionLocal() as db:
config = app.state.config.recurring_tasks config = app.state.config.recurring_tasks
removed = await queries.remove_old_results(db, config.max_results) removed = await queries.remove_old_results(db, config.max_results)
logger.info("%i results removed", removed) logger.info("%i result(s) removed", removed)
updated = await queries.release_old_locks(db, config.max_lock_seconds) updated = await queries.release_old_locks(db, config.max_lock_seconds)
logger.info("%i locks released", updated) logger.info("%i lock(s) released", updated)
agents = await queries.get_recent_agents_count(db, config.time_without_agent) agents = await queries.get_recent_agents_count(db, config.time_without_agent)
if agents == 0: if agents == 0:
no_agent_alert(app.state.config) no_agent_alert(app.state.config)
processed_jobs = await queries.process_jobs(db)
logger.info("%i job(s) processed", processed_jobs)
logger.info("Background recurring tasks ended") logger.info("Background recurring tasks ended")

View file

@ -0,0 +1,43 @@
"""Add job queue
Revision ID: 5f6cb30db996
Revises: bd4b4962696a
Create Date: 2025-02-17 16:56:36.673511
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "5f6cb30db996"
down_revision: Union[str, None] = "bd4b4962696a"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
enum = sa.Enum(
"RELOAD_CONFIG",
name="todo_enum",
create_type=False,
)
enum.create(op.get_bind(), checkfirst=True)
op.create_table(
"jobs",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("todo", enum, nullable=False),
sa.Column("args", sa.String(), nullable=False),
sa.Column(
"current", sa.Boolean(), server_default=sa.sql.false(), nullable=False
),
sa.Column("added_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
def downgrade() -> None:
op.drop_table("jobs")
sa.Enum(name="todo_enum").drop(op.get_bind(), checkfirst=True)

View file

@ -14,7 +14,7 @@ from sqlalchemy.schema import Index
from argos.checks import BaseCheck, get_registered_check from argos.checks import BaseCheck, get_registered_check
from argos.schemas import WebsiteCheck from argos.schemas import WebsiteCheck
from argos.schemas.utils import IPVersion, Method from argos.schemas.utils import IPVersion, Method, Todo
def compute_task_group(context) -> str: def compute_task_group(context) -> str:
@ -33,6 +33,19 @@ class Base(DeclarativeBase):
type_annotation_map = {List[WebsiteCheck]: JSON, dict: JSON} type_annotation_map = {List[WebsiteCheck]: JSON, dict: JSON}
class Job(Base):
"""
Job queue emulation
"""
__tablename__ = "jobs"
id: Mapped[int] = mapped_column(primary_key=True)
todo: Mapped[Todo] = mapped_column(Enum("RELOAD_CONFIG", name="todo_enum"))
args: Mapped[str] = mapped_column()
current: Mapped[bool] = mapped_column(insert_default=False)
added_at: Mapped[datetime] = mapped_column()
class Task(Base): class Task(Base):
""" """
There is one task per check. There is one task per check.

View file

@ -9,7 +9,8 @@ from sqlalchemy.orm import Session
from argos import schemas from argos import schemas
from argos.logging import logger from argos.logging import logger
from argos.server.models import Result, Task, ConfigCache, User from argos.server.models import ConfigCache, Job, Result, Task, User
from argos.server.settings import read_config
async def list_tasks(db: Session, agent_id: str, limit: int = 100): async def list_tasks(db: Session, agent_id: str, limit: int = 100):
@ -219,12 +220,50 @@ async def has_config_changed(db: Session, config: schemas.Config) -> bool: # py
return True return True
async def update_from_config_later(db: Session, config_file):
"""Ask Argos to reload configuration in a recurring task"""
jobs = (
db.query(Job)
.filter(
Job.todo == "RELOAD_CONFIG",
Job.args == config_file,
Job.current == False,
)
.all()
)
if jobs:
return "There is already a config reloading job in the job queue, for the same file"
job = Job(todo="RELOAD_CONFIG", args=config_file, added_at=datetime.now())
db.add(job)
db.commit()
return "Config reloading has been added in the job queue"
async def process_jobs(db: Session) -> int:
"""Process job queue"""
jobs = db.query(Job).filter(Job.current == False).all()
if jobs:
for job in jobs:
job.current = True
db.commit()
if job.todo == "RELOAD_CONFIG":
logger.info("Processing job %i: %s %s", job.id, job.todo, job.args)
_config = read_config(job.args)
changed = await update_from_config(db, _config)
logger.info("%i task(s) added", changed["added"])
logger.info("%i task(s) deleted", changed["vanished"])
db.delete(job)
db.commit()
return len(jobs)
return 0
async def update_from_config(db: Session, config: schemas.Config): # pylint: disable-msg=too-many-branches async def update_from_config(db: Session, config: schemas.Config): # pylint: disable-msg=too-many-branches
"""Update tasks from config file""" """Update tasks from config file"""
config_changed = await has_config_changed(db, config)
if not config_changed:
return {"added": 0, "vanished": 0}
max_task_id = ( max_task_id = (
db.query(func.max(Task.id).label("max_id")).all() # pylint: disable-msg=not-callable db.query(func.max(Task.id).label("max_id")).all() # pylint: disable-msg=not-callable
)[0].max_id )[0].max_id
@ -339,7 +378,8 @@ async def update_from_config(db: Session, config: schemas.Config): # pylint: di
) )
db.commit() db.commit()
logger.info( logger.info(
"%i tasks has been removed since not in config file anymore", vanished_tasks "%i task(s) has been removed since not in config file anymore",
vanished_tasks,
) )
return {"added": len(tasks), "vanished": vanished_tasks} return {"added": len(tasks), "vanished": vanished_tasks}

View file

@ -1,12 +1,26 @@
"""Pydantic schemas for server""" """Pydantic schemas for server"""
import sys
from pathlib import Path from pathlib import Path
import yaml import yaml
from yamlinclude import YamlIncludeConstructor from yamlinclude import YamlIncludeConstructor
from pydantic import ValidationError
from argos.logging import logger
from argos.schemas.config import Config from argos.schemas.config import Config
def read_config(yaml_file):
try:
config = read_yaml_config(yaml_file)
return config
except ValidationError as err:
logger.error("Errors where found while reading configuration:")
for error in err.errors():
logger.error("%s is %s", error["loc"], error["type"])
sys.exit(1)
def read_yaml_config(filename: str) -> Config: def read_yaml_config(filename: str) -> Config:
parsed = _load_yaml(filename) parsed = _load_yaml(filename)
return Config(**parsed) return Config(**parsed)

View file

@ -163,10 +163,15 @@ Usage: argos server reload-config [OPTIONS]
Read tasks configuration and add/delete tasks in database if needed Read tasks configuration and add/delete tasks in database if needed
Options: Options:
--config TEXT Path of the configuration file. If ARGOS_YAML_FILE environment --config TEXT Path of the configuration file. If ARGOS_YAML_FILE
variable is set, its value will be used instead. Default value: environment variable is set, its value will be used
argos-config.yaml and /etc/argos/config.yaml as fallback. instead. Default value: argos-config.yaml and
--help Show this message and exit. /etc/argos/config.yaml as fallback.
--enqueue / --no-enqueue Let Argos main recurring tasks handle
configurations loading. It may delay the
application of the new configuration up to 2
minutes. Default is --no-enqueue
--help Show this message and exit.
``` ```
<!--[[[end]]] <!--[[[end]]]

View file

@ -96,7 +96,7 @@ ExecStart=/opt/argos/venv/bin/gunicorn "argos.server.main:get_application()" \\
--worker-class uvicorn.workers.UvicornWorker \\ --worker-class uvicorn.workers.UvicornWorker \\
--bind \$ARGOS_SERVER_SOCKET \\ --bind \$ARGOS_SERVER_SOCKET \\
--forwarded-allow-ips \$ARGOS_SERVER_FORWARDED_ALLOW_IPS --forwarded-allow-ips \$ARGOS_SERVER_FORWARDED_ALLOW_IPS
ExecReload=/opt/argos/venv/bin/argos server reload-config ExecReload=/opt/argos/venv/bin/argos server reload-config --enqueue
SyslogIdentifier=argos-server SyslogIdentifier=argos-server
[Install] [Install]