Compare commits

...

46 commits
0.6.1 ... main

Author SHA1 Message Date
Luc Didry
9389e3a005
🏷 — Bump version (0.9.0) 2025-02-18 17:05:55 +01:00
Luc Didry
159a6e2427
🔀 Merge remote-tracking branch 'origin/develop' 2025-02-18 17:05:25 +01:00
Luc Didry
211ac32028
🐛 — Fix worker timeout for old results cleaning in recurring tasks (fix #84)
💥 Old results are now removed by their age, not based on their number.

💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration.
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
a new example configuration file.
2025-02-18 17:04:26 +01:00
Luc Didry
32f2518294
🏷 — Bump version (0.8.2) 2025-02-18 14:58:35 +01:00
Luc Didry
38cc06e972
🐛 — Fix recurring tasks with gunicorn 2025-02-18 14:57:49 +01:00
Luc Didry
4b78919937
🏷 — Bump version (0.8.1) 2025-02-18 14:22:41 +01:00
Luc Didry
d8f30ebccd
🐛 — Fix todo enum in jobs table 2025-02-18 14:22:12 +01:00
Luc Didry
09674f73ef
🏷 — Bump version (0.8.0) 2025-02-18 13:50:36 +01:00
Luc Didry
c63093bb2f
🔀 Merge remote-tracking branch 'origin/develop' 2025-02-18 13:48:47 +01:00
Luc Didry
657624ed35
📝 — Add enum doc for developers 2025-02-18 13:47:27 +01:00
Luc Didry
471c1eae91
📜 — Add breaking changes to CHANGELOG 2025-02-18 13:43:05 +01:00
Luc Didry
c3708af32a
🐛 — Better httpx.RequestError handling (fix #83) 2025-02-18 13:36:40 +01:00
Luc Didry
23fea9fffa
🐛 — Automatically reconnect to LDAP if unreachable (fix #81) 2025-02-18 11:28:05 +01:00
Luc Didry
a48c7b74e6
✨ — Reload configuration asynchronously (fix #79) 2025-02-17 17:26:56 +01:00
Luc Didry
8d82f7f9d6
✨ — No need cron tasks for agents watching (fix #76) 2025-02-17 15:35:13 +01:00
Luc Didry
fd0c68cd4c
— Add missing dependency for fastapi-utils 2025-02-17 11:03:01 +01:00
Luc Didry
c98cd9c017
✨ — No need cron tasks for DB cleaning anymore (fix #74 and #75) 2025-02-17 10:46:01 +01:00
Luc Didry
73e7a8f414
📝 — Document how to add data to requests (fix #77) 2025-02-12 16:25:10 +01:00
Luc Didry
db54dd2cdd
✨ — Allow to customize agent User-Agent header (fix #78) 2025-02-12 16:10:09 +01:00
Luc Didry
1b484da27a
🏷 — Bump version (0.7.4) 2025-02-12 15:33:36 +01:00
Luc Didry
07f87a0f7d
🔀 Merge remote-tracking branch 'origin/develop' 2025-02-12 15:33:04 +01:00
Luc Didry
60f3079140
🩹 — Add missing enum removal 2025-02-12 15:32:24 +01:00
Luc Didry
ca709dca62
🔀 Merge remote-tracking branch 'dryusdan/dev/fix-method-enum' into develop 2025-02-12 15:02:19 +01:00
Dryusdan
0f099b9df4 Fix method enum in tasks table 2025-01-29 11:37:09 +01:00
Luc Didry
5abdd8414d
🏷 — Bump version (0.7.3) 2025-01-26 07:59:10 +01:00
Luc Didry
06868cdd74
🔀 Merge remote-tracking branch 'origin/develop' 2025-01-26 07:58:34 +01:00
Luc Didry
2b82f7c8f2
🐛 — Fix bug in retry_before_notification logic when success 2025-01-26 07:54:31 +01:00
Luc Didry
797a60a85c
🏷 — Bump version (0.7.2) 2025-01-24 14:07:50 +01:00
Luc Didry
4c4d3b69b2
🔀 Merge remote-tracking branch 'origin/develop' 2025-01-24 14:07:16 +01:00
Luc Didry
c922894567
🐛 — Fix bug in retry_before_notification logic 2025-01-24 14:06:36 +01:00
Luc Didry
8652539086
🏷 — Bump version (0.7.1) 2025-01-15 16:18:16 +01:00
Luc Didry
4f3dfd994b
🔀 Merge remote-tracking branch 'origin/develop' 2025-01-15 16:16:14 +01:00
Luc Didry
28ec85fed3
📝 — Improve release documentation 2025-01-15 16:15:16 +01:00
Luc Didry
586660c02a
🩹 — Check before adding/removing ip_version_enum 2025-01-15 09:14:55 +01:00
Luc Didry
64f8241e74
🩹 — Avoid warning from MySQL only alembic instructions 2025-01-14 17:06:59 +01:00
Luc Didry
3d209fed22
🏷 — Bump version (0.7.0) 2025-01-14 16:41:09 +01:00
Luc Didry
acd90133bd
🔀 Merge remote-tracking branch 'origin/develop' 2025-01-14 16:39:53 +01:00
Luc Didry
be90aa095a
🐛 — Fix strange and buggy behavior 2025-01-14 16:38:43 +01:00
Luc Didry
06f8310505
🗃 — Use bigint type for results id column in PostgreSQL (fix #73) 2025-01-14 16:38:43 +01:00
Luc Didry
fe89d62e88
🐛🗃 — Fix enum migration on PostgreSQL 2025-01-14 16:38:43 +01:00
Luc Didry
1e7672abca
🚸 — Add a long expiration date on auto-refresh cookies 2025-01-14 16:38:43 +01:00
Luc Didry
2ef999fa63
✨ — Allow to specify form data and headers for checks (fix #70) 2025-01-14 16:38:43 +01:00
Luc Didry
9c8be94c20
🐛 — Fix bug when changing IP version not removing tasks (fix #72) 2025-01-14 16:38:38 +01:00
Luc Didry
311d86d130
✨ — Ability to delay notification after X failures (fix #71) 2024-12-09 14:08:55 +01:00
Luc Didry
e0edb50e12
⚡ — Mutualize check requests (fix #68) 2024-12-04 15:04:06 +01:00
Luc Didry
ea23ea7c1f
✨ — IPv4/IPv6 choice for checks, and choice for a dual-stack check (fix #69) 2024-12-02 15:24:54 +01:00
38 changed files with 1304 additions and 417 deletions

View file

@ -2,6 +2,90 @@
## [Unreleased]
## 0.9.0
Date: 2025-02-18
- 🐛 — Fix worker timeout for old results cleaning in recurring tasks (#84)
💥 Old results are now removed by their age, not based on their number.
💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration.
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
a new example configuration file.
## 0.8.2
Date: 2025-02-18
- 🐛 — Fix recurring tasks with gunicorn
## 0.8.1
Date: 2025-02-18
- 🐛 — Fix todo enum in jobs table
## 0.8.0
Date: 2025-02-18
- ✨ — Allow to customize agent User-Agent header (#78)
- 📝 — Document how to add data to requests (#77)
- ✨ — No need cron tasks for DB cleaning anymore (#74 and #75)
- ✨ — No need cron tasks for agents watching (#76)
- ✨ — Reload configuration asynchronously (#79)
- 🐛 — Automatically reconnect to LDAP if unreachable (#81)
- 🐛 — Better httpx.RequestError handling (#83)
💥 Warning: there are new settings to add to your configuration file.
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
a new example configuration file.
💥 You don't need cron tasks anymore!
Remove your old cron tasks, as they will now do nothing but generate errors.
NB: You may want to add `--enqueue` to the `reload-config` command in your systemd file.
## 0.7.4
Date: 2025-02-12
- 🐛 — Fix method enum in tasks table (thx to Dryusdan)
## 0.7.3
Date: 2025-01-26
- 🐛 — Fix bug in retry_before_notification logic when success
## 0.7.2
Date: 2025-01-24
- 🐛 — Fix bug in retry_before_notification logic
## 0.7.1
Date: 2025-01-15
- 🩹 — Avoid warning from MySQL only alembic instructions
- 🩹 — Check before adding/removing ip_version_enum
- 📝 — Improve release documentation
## 0.7.0
Date: 2025-01-14
- ✨ — IPv4/IPv6 choice for checks, and choice for a dual-stack check (#69)
- ⚡ — Mutualize check requests (#68)
- ✨ — Ability to delay notification after X failures (#71)
- 🐛 — Fix bug when changing IP version not removing tasks (#72)
- ✨ — Allow to specify form data and headers for checks (#70)
- 🚸 — Add a long expiration date on auto-refresh cookies
- 🗃️ — Use bigint type for results id column in PostgreSQL (#73)
## 0.6.1
Date: 2024-11-28
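Since `max_results_age` replaces `max_results`, the value is now a duration string rather than a count. A minimal sketch of how such strings resolve to seconds, assuming the durations_nlp package that the server's RecurringTasks validator uses (see the schemas diff below):

from durations_nlp import Duration

# Duration strings accepted by max_results_age, converted to seconds
for raw in ("1d", "6h", "1w"):
    print(raw, Duration(raw).to_seconds())  # 1d -> 86400.0, 6h -> 21600.0, 1w -> 604800.0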

View file

@ -1 +1 @@
VERSION = "0.6.1"
VERSION = "0.9.0"

View file

@ -6,6 +6,7 @@ import asyncio
import json
import logging
import socket
from hashlib import md5
from time import sleep
from typing import List
@ -33,26 +34,47 @@ def log_failure(retry_state):
)
class ArgosAgent:
class ArgosAgent: # pylint: disable-msg=too-many-instance-attributes
"""The Argos agent is responsible for running the checks and reporting the results."""
def __init__(self, server: str, auth: str, max_tasks: int, wait_time: int):
def __init__( # pylint: disable-msg=too-many-positional-arguments
self, server: str, auth: str, max_tasks: int, wait_time: int, user_agent: str
):
self.server = server
self.max_tasks = max_tasks
self.wait_time = wait_time
self.auth = auth
self._http_client = None
if user_agent == "":
self.ua = user_agent
else:
self.ua = f" - {user_agent}"
self._http_client: httpx.AsyncClient | None = None
self._http_client_v4: httpx.AsyncClient | None = None
self._http_client_v6: httpx.AsyncClient | None = None
self._res_cache: dict[str, httpx.Response] = {}
self.agent_id = socket.gethostname()
@retry(after=log_failure, wait=wait_random(min=1, max=2))
async def run(self):
headers = {
auth_header = {
"Authorization": f"Bearer {self.auth}",
"User-Agent": f"Argos Panoptes {VERSION} "
"(about: https://argos-monitoring.framasoft.org/)",
"User-Agent": f"Argos Panoptes agent {VERSION}{self.ua}",
}
self._http_client = httpx.AsyncClient(headers=headers)
self._http_client = httpx.AsyncClient(headers=auth_header)
ua_header = {
"User-Agent": f"Argos Panoptes {VERSION} "
f"(about: https://argos-monitoring.framasoft.org/){self.ua}",
}
self._http_client_v4 = httpx.AsyncClient(
headers=ua_header,
transport=httpx.AsyncHTTPTransport(local_address="0.0.0.0"),
)
self._http_client_v6 = httpx.AsyncClient(
headers=ua_header, transport=httpx.AsyncHTTPTransport(local_address="::")
)
logger.info("Running agent against %s", self.server)
async with self._http_client:
while "forever":
@ -61,35 +83,90 @@ class ArgosAgent:
logger.info("Waiting %i seconds before next retry", self.wait_time)
await asyncio.sleep(self.wait_time)
async def _do_request(self, group: str, details: dict):
logger.debug("_do_request for group %s", group)
headers = {}
if details["request_data"] is not None:
request_data = json.loads(details["request_data"])
if request_data["headers"] is not None:
headers = request_data["headers"]
if details["ip_version"] == "4":
http_client = self._http_client_v4
else:
http_client = self._http_client_v6
try:
if details["request_data"] is None or request_data["data"] is None:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"],
url=details["url"],
headers=headers,
timeout=60,
)
elif request_data["json"]:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"],
url=details["url"],
headers=headers,
json=request_data["data"],
timeout=60,
)
else:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"],
url=details["url"],
headers=headers,
data=request_data["data"],
timeout=60,
)
except httpx.ReadError:
sleep(1)
logger.warning("httpx.ReadError for group %s, re-emit request", group)
if details["request_data"] is None or request_data["data"] is None:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"], url=details["url"], timeout=60
)
elif request_data["json"]:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"],
url=details["url"],
json=request_data["data"],
timeout=60,
)
else:
response = await http_client.request( # type: ignore[union-attr]
method=details["method"],
url=details["url"],
data=request_data["data"],
timeout=60,
)
except httpx.RequestError as err:
logger.warning("httpx.RequestError for group %s", group)
response = err
self._res_cache[group] = response
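_do_request issues a single HTTP request per task group and caches the response (or the httpx.RequestError) under the group key; _complete_task below then serves every task of the group from that cache. A minimal sketch of the group key recipe, mirroring _get_and_complete_tasks further down and compute_task_group in the models diff (group_key is a hypothetical helper, not part of this diff):

from hashlib import md5

def group_key(method: str, ip_version: str, url: str, request_data: str | None) -> str:
    # Tasks sharing this key are answered by one cached response;
    # the md5 suffix disambiguates different request bodies
    data = request_data or ""
    return f"{method}-{ip_version}-{url}-{md5(data.encode()).hexdigest()}"

print(group_key("GET", "4", "https://example.org/", None))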
async def _complete_task(self, _task: dict) -> AgentResult:
try:
task = Task(**_task)
url = task.url
if task.check == "http-to-https":
url = str(httpx.URL(task.url).copy_with(scheme="http"))
try:
response = await self._http_client.request( # type: ignore[attr-defined]
method=task.method, url=url, timeout=60
)
except httpx.ReadError:
sleep(1)
response = await self._http_client.request( # type: ignore[attr-defined]
method=task.method, url=url, timeout=60
)
check_class = get_registered_check(task.check)
check = check_class(task)
response = self._res_cache[task.task_group]
if isinstance(response, httpx.Response):
result = await check.run(response)
status = result.status
context = result.context
else:
status = "failure"
context = SerializableException.from_exception(response)
except Exception as err: # pylint: disable=broad-except
status = "error"
context = SerializableException.from_exception(err)
msg = f"An exception occured when running {_task}. {err.__class__.__name__} : {err}"
logger.error(msg)
return AgentResult(task_id=task.id, status=status, context=context)
async def _get_and_complete_tasks(self):
@ -100,12 +177,45 @@ class ArgosAgent:
)
if response.status_code == httpx.codes.OK:
# XXX Maybe we want to group the tests by URL? (to issue one request per URL)
data = response.json()
logger.info("Received %i tasks from the server", len(data))
req_groups = {}
_tasks = []
for _task in data:
task = Task(**_task)
url = task.url
group = task.task_group
if task.check == "http-to-https":
data = task.request_data
if data is None:
data = ""
url = str(httpx.URL(task.url).copy_with(scheme="http"))
group = (
f"{task.method}-{task.ip_version}-{url}-"
f"{md5(data.encode()).hexdigest()}"
)
_task["task_group"] = group
req_groups[group] = {
"url": url,
"ip_version": task.ip_version,
"method": task.method,
"request_data": task.request_data,
}
_tasks.append(_task)
requests = []
for group, details in req_groups.items():
requests.append(self._do_request(group, details))
if requests:
await asyncio.gather(*requests)
tasks = []
for task in data:
for task in _tasks:
tasks.append(self._complete_task(task))
if tasks:

View file

@ -92,7 +92,12 @@ def version():
default="INFO",
type=click.Choice(logging.LOG_LEVELS, case_sensitive=False),
)
def agent(server_url, auth, max_tasks, wait_time, log_level):
@click.option(
"--user-agent",
default="",
help="A custom string to append to the User-Agent header",
)
def agent(server_url, auth, max_tasks, wait_time, log_level, user_agent): # pylint: disable-msg=too-many-positional-arguments
"""Get and run tasks for the provided server. Will wait for new tasks.
Usage: argos agent https://argos.example.org "auth-token-here"
@ -108,7 +113,7 @@ def agent(server_url, auth, max_tasks, wait_time, log_level):
from argos.logging import logger
logger.setLevel(log_level)
agent_ = ArgosAgent(server_url, auth, max_tasks, wait_time)
agent_ = ArgosAgent(server_url, auth, max_tasks, wait_time, user_agent)
asyncio.run(agent_.run())
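The new --user-agent option appends a custom string to the agent's User-Agent header, so the header sent to the Argos server becomes e.g. "Argos Panoptes agent 0.9.0 - my-probe-01". An illustrative invocation (server URL, token and probe name are placeholders):

argos agent https://argos.example.org "auth-token-here" --user-agent "my-probe-01"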
@ -135,101 +140,6 @@ def start(host, port, config, reload):
uvicorn.run("argos.server:app", host=host, port=port, reload=reload)
def validate_max_lock_seconds(ctx, param, value):
if value <= 60:
raise click.BadParameter("Should be strictly higher than 60")
return value
def validate_max_results(ctx, param, value):
if value <= 0:
raise click.BadParameter("Should be a positive integer")
return value
@server.command()
@click.option(
"--max-results",
default=100,
help="Number of results per task to keep",
callback=validate_max_results,
)
@click.option(
"--max-lock-seconds",
default=100,
help=(
"The number of seconds after which a lock is "
"considered stale, must be higher than 60 "
"(the checks have a timeout value of 60 seconds)"
),
callback=validate_max_lock_seconds,
)
@click.option(
"--config",
default="argos-config.yaml",
help="Path of the configuration file. "
"If ARGOS_YAML_FILE environment variable is set, its value will be used instead. "
"Default value: argos-config.yaml and /etc/argos/config.yaml as fallback.",
envvar="ARGOS_YAML_FILE",
callback=validate_config_access,
)
@coroutine
async def cleandb(max_results, max_lock_seconds, config):
"""Clean the database (to run routinely)
\b
- Removes old results from the database.
- Removes locks from tasks that have been locked for too long.
"""
# It's mandatory to do it before the imports
os.environ["ARGOS_YAML_FILE"] = config
# The imports are made here otherwise the agent will need server configuration files.
from argos.server import queries
db = await get_db()
removed = await queries.remove_old_results(db, max_results)
updated = await queries.release_old_locks(db, max_lock_seconds)
click.echo(f"{removed} results removed")
click.echo(f"{updated} locks released")
@server.command()
@click.option(
"--time-without-agent",
default=5,
help="Time without seeing an agent after which a warning will be issued, in minutes. "
"Default is 5 minutes.",
callback=validate_max_results,
)
@click.option(
"--config",
default="argos-config.yaml",
help="Path of the configuration file. "
"If ARGOS_YAML_FILE environment variable is set, its value will be used instead.",
envvar="ARGOS_YAML_FILE",
callback=validate_config_access,
)
@coroutine
async def watch_agents(time_without_agent, config):
"""Watch agents (to run routinely)
Issues a warning if no agent has been seen by the server for a given time.
"""
# It's mandatory to do it before the imports
os.environ["ARGOS_YAML_FILE"] = config
# The imports are made here otherwise the agent will need server configuration files.
from argos.server import queries
db = await get_db()
agents = await queries.get_recent_agents_count(db, time_without_agent)
if agents == 0:
click.echo(f"No agent has been seen in the last {time_without_agent} minutes.")
sysexit(1)
@server.command(short_help="Load or reload tasks configuration")
@click.option(
"--config",
@ -240,23 +150,40 @@ async def watch_agents(time_without_agent, config):
envvar="ARGOS_YAML_FILE",
callback=validate_config_access,
)
@click.option(
"--enqueue/--no-enqueue",
default=False,
help="Let Argos main recurring tasks handle configurations loading. "
"It may delay the application of the new configuration up to 2 minutes. "
"Default is --no-enqueue",
)
@coroutine
async def reload_config(config):
async def reload_config(config, enqueue):
"""Read tasks configuration and add/delete tasks in database if needed"""
# It's mandatory to do it before the imports
os.environ["ARGOS_YAML_FILE"] = config
# The imports are made here otherwise the agent will need server configuration files.
from argos.server import queries
from argos.server.main import read_config
from argos.server.settings import read_config
_config = read_config(config)
db = await get_db()
config_changed = await queries.has_config_changed(db, _config)
if not config_changed:
click.echo("Config has not change")
else:
if enqueue:
msg = await queries.update_from_config_later(db, config_file=config)
click.echo(msg)
else:
changed = await queries.update_from_config(db, _config)
click.echo(f"{changed['added']} tasks added")
click.echo(f"{changed['vanished']} tasks deleted")
click.echo(f"{changed['added']} task(s) added")
click.echo(f"{changed['vanished']} task(s) deleted")
@server.command()
@ -570,8 +497,8 @@ async def test_mail(config, domain, severity):
from argos.logging import set_log_level
from argos.server.alerting import notify_by_mail
from argos.server.main import read_config
from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config)
@ -586,6 +513,7 @@ async def test_mail(config, domain, severity):
check="body-contains",
expected="foo",
frequency=1,
ip_version=4,
selected_by="test",
selected_at=now,
)
@ -634,8 +562,8 @@ async def test_gotify(config, domain, severity):
from argos.logging import set_log_level
from argos.server.alerting import notify_with_gotify
from argos.server.main import read_config
from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config)
@ -650,6 +578,7 @@ async def test_gotify(config, domain, severity):
check="body-contains",
expected="foo",
frequency=1,
ip_version=4,
selected_by="test",
selected_at=now,
)
@ -701,8 +630,8 @@ async def test_apprise(config, domain, severity, apprise_group):
from argos.logging import set_log_level
from argos.server.alerting import notify_with_apprise
from argos.server.main import read_config
from argos.server.models import Result, Task
from argos.server.settings import read_config
conf = read_config(config)
@ -717,6 +646,7 @@ async def test_apprise(config, domain, severity, apprise_group):
check="body-contains",
expected="foo",
frequency=1,
ip_version=4,
selected_by="test",
selected_at=now,
)

View file

@ -64,6 +64,29 @@ general:
# For ex., to re-try a check one minute after a failure:
# recheck_delay: "1m"
# Default setting for notifications delay.
# Say you want to be warned right after a failure on a check: set it to 0
# Say you want a second failure on the check before being warned,
# to avoid network hiccups: set it to 1
# Can be superseded in domain configuration
# If not present, default is 0
# retry_before_notification: 0
# Defaults settings for IPv4/IPv6
# Can be superseded in domain configuration.
# By default, Argos will check both IPv4 and IPv6 addresses of a domain
# (i.e. by default, both `ipv4` and `ipv6` are set to true).
# To disable the IPv4 check of domains:
# ipv4: false
# To disable the IPv6 check of domains:
# ipv6: false
# Argos root path
# If not present, default value is ""
# Set it to /foo if you want to use argos at /foo/ instead of /
# on your web server
# root_path: "/foo"
# Which way do you want to be warned when a check goes to that severity?
# "local" emits a message in the server log
# You'll need to configure mail, gotify or apprise below to be able to use
@ -79,11 +102,10 @@ general:
- local
unknown:
- local
# Argos root path
# If not present, default value is ""
# Set it to /foo if you want to use argos at /foo/ instead of /
# on your web server
# root_path: "/foo"
# This alert is triggered when no Argos agent has been seen in a while
# See recurring_tasks.time_without_agent below
no_agent:
- local
# Mail configuration is quite straight-forward
# mail:
# mailfrom: no-reply@example.org
@ -127,6 +149,22 @@ ssl:
- "1d": critical
- "5d": warning
# Argos will execute some tasks in the background for you
# every 2 minutes and needs some configuration for that
recurring_tasks:
# Maximum age of results
# Use m for minutes, h for hours, d for days
# w for weeks, M for months, y for years
# See https://github.com/timwedde/durations_nlp#scales-reference for details
max_results_age: "1d"
# Max number of seconds a task can be locked
# Minimum value is 61, default is 100
max_lock_seconds: 100
# Max number of minutes without seeing an agent
# before sending an alert
# Minimum value is 1, default is 5
time_without_agent: 5
# It's also possible to define the checks in another file
# with the include syntax:
#
@ -134,6 +172,8 @@ ssl:
#
websites:
- domain: "https://mypads.example.org"
# Wait for a second failure before sending notification
retry_before_notification: 1
paths:
- path: "/mypads/"
# Specify the method of the HTTP request
@ -171,6 +211,17 @@ websites:
- 302
- 307
- path: "/admin/"
methode: "POST"
# Send form data in the request
request_data:
data:
login: "admin"
password: "my-password"
# To send data as JSON (optional, default is false):
is_json: true
# To send additional headers
headers:
Authorization: "Bearer foo-bar-baz"
checks:
# Check that the return HTTP status is one of those
# Similar to status-is, verify that you didn't mistype it!
@ -213,6 +264,8 @@ websites:
- domain: "https://munin.example.org"
frequency: "20m"
recheck_delay: "5m"
# Let's say it's an IPv6-only website
ipv4: false
paths:
- path: "/"
checks:

View file

@ -14,9 +14,10 @@ logger = logging.getLogger(__name__)
# XXX Does not work?
def set_log_level(log_level):
def set_log_level(log_level: str, quiet: bool = False):
level = getattr(logging, log_level.upper(), None)
if not isinstance(level, int):
raise ValueError(f"Invalid log level: {log_level}")
logger.setLevel(level=level)
if not quiet:
logger.info("Log level set to %s", log_level)

View file

@ -5,7 +5,7 @@ For database models, see argos.server.models.
import json
from typing import Dict, List, Literal, Tuple
from typing import Any, Dict, List, Literal, Tuple
from durations_nlp import Duration
from pydantic import (
@ -18,7 +18,7 @@ from pydantic import (
PositiveInt,
field_validator,
)
from pydantic.functional_validators import BeforeValidator
from pydantic.functional_validators import AfterValidator, BeforeValidator
from pydantic.networks import UrlConstraints
from pydantic_core import Url
from typing_extensions import Annotated
@ -48,6 +48,33 @@ class SSL(BaseModel):
thresholds: List[Annotated[Tuple[int, Severity], BeforeValidator(parse_threshold)]]
class RecurringTasks(BaseModel):
max_results_age: float
max_lock_seconds: int
time_without_agent: int
@field_validator("max_results_age", mode="before")
def parse_max_results_age(cls, value):
"""Convert the configured maximum results age to seconds"""
return Duration(value).to_seconds()
@field_validator("max_lock_seconds", mode="before")
def parse_max_lock_seconds(cls, value):
"""Ensure that max_lock_seconds is higher or equal to agents requests timeout (60)"""
if value > 60:
return value
return 100
@field_validator("time_without_agent", mode="before")
def parse_time_without_agent(cls, value):
"""Ensure that time_without_agent is at least one minute"""
if value >= 1:
return value
return 5
class WebsiteCheck(BaseModel):
key: str
value: str | List[str] | Dict[str, str]
@ -104,9 +131,26 @@ def parse_checks(value):
return (name, expected)
def parse_request_data(value):
"""Turn form or JSON data into JSON string"""
return json.dumps(
{"data": value.data, "json": value.is_json, "headers": value.headers}
)
class RequestData(BaseModel):
data: Any = None
is_json: bool = False
headers: Dict[str, str] | None = None
class WebsitePath(BaseModel):
path: str
method: Method = "GET"
request_data: Annotated[
RequestData, AfterValidator(parse_request_data)
] | None = None
checks: List[
Annotated[
Tuple[str, str],
@ -117,8 +161,11 @@ class WebsitePath(BaseModel):
class Website(BaseModel):
domain: HttpUrl
ipv4: bool | None = None
ipv6: bool | None = None
frequency: float | None = None
recheck_delay: float | None = None
retry_before_notification: int | None = None
paths: List[WebsitePath]
@field_validator("frequency", mode="before")
@ -170,6 +217,7 @@ class Alert(BaseModel):
warning: List[str]
critical: List[str]
unknown: List[str]
no_agent: List[str]
class GotifyUrl(BaseModel):
@ -204,6 +252,9 @@ class General(BaseModel):
ldap: LdapSettings | None = None
frequency: float
recheck_delay: float | None = None
retry_before_notification: int = 0
ipv4: bool = True
ipv6: bool = True
root_path: str = ""
alerts: Alert
mail: Mail | None = None
@ -241,4 +292,5 @@ class Config(BaseModel):
general: General
service: Service
ssl: SSL
recurring_tasks: RecurringTasks
websites: List[Website]
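A minimal sketch of how the new RecurringTasks model normalizes its inputs, assuming the module is importable as argos.schemas.config (values below the validators' minimums fall back to the defaults):

from argos.schemas.config import RecurringTasks

tasks = RecurringTasks(
    max_results_age="1d",  # parsed with durations_nlp -> 86400.0 seconds
    max_lock_seconds=30,   # not strictly above 60 -> falls back to 100
    time_without_agent=0,  # below 1 minute -> falls back to 5
)
print(tasks.max_results_age, tasks.max_lock_seconds, tasks.time_without_agent)
# 86400.0 100 5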

View file

@ -8,20 +8,39 @@ from typing import Literal
from pydantic import BaseModel, ConfigDict
from argos.schemas.utils import Method
from argos.schemas.utils import IPVersion, Method, Todo
# XXX Refactor using SQLModel to avoid duplication of model data
class Job(BaseModel):
"""Tasks needing to be executed in recurring tasks processing.
It's quite like a job queue."""
id: int
todo: Todo
args: str
current: bool
added_at: datetime
def __str__(self):
return f"Job ({self.id}): {self.todo}"
class Task(BaseModel):
"""A task corresponds to a check to execute"""
id: int
url: str
domain: str
ip_version: IPVersion
check: str
method: Method
request_data: str | None
expected: str
task_group: str
retry_before_notification: int
contiguous_failures: int
selected_at: datetime | None
selected_by: str | None
@ -31,7 +50,8 @@ class Task(BaseModel):
task_id = self.id
url = self.url
check = self.check
return f"Task ({task_id}): {url} - {check}"
ip_version = self.ip_version
return f"Task ({task_id}): {url} (IPv{ip_version}) - {check}"
class SerializableException(BaseModel):

View file

@ -1,6 +1,10 @@
from typing import Literal
IPVersion = Literal["4", "6"]
Method = Literal[
"GET", "HEAD", "POST", "OPTIONS", "CONNECT", "TRACE", "PUT", "PATCH", "DELETE"
]
Todo = Literal["RELOAD_CONFIG"]

View file

@ -11,6 +11,55 @@ import httpx
from argos.checks.base import Severity
from argos.logging import logger
from argos.schemas.config import Config, Mail, GotifyUrl
from argos.server.models import Task
def need_alert(
last_severity: str, last_severity_update, severity: str, status: str, task: Task
) -> bool:
## Create alert… or not!
send_notif = False
# Severity has changed, and no retry before notification
if last_severity != severity and task.retry_before_notification == 0:
send_notif = True
# Seems to be a first check: create a notification
elif last_severity != severity and last_severity_update is None:
send_notif = True
# As we created a notification, avoid resending it on a
# future failure
if status != "success":
task.contiguous_failures = task.retry_before_notification
# We need retry before notification, so the severity may not have changed
# since last check
elif task.retry_before_notification != 0:
# If we got a success, and we already have created a notification:
# create notification of success immediately
if (
status == "success"
and task.contiguous_failures >= task.retry_before_notification + 1
):
send_notif = True
task.contiguous_failures = 0
# The status is not a success
elif status != "success":
# This is a new failure
task.contiguous_failures += 1
# Severity has changed, but not to success, that's odd:
# create a notification
if (
last_severity not in ("ok", severity)
and last_severity_update is not None
):
send_notif = True
# As we created a notification, avoid resending it on a
# future failure
task.contiguous_failures = task.retry_before_notification
# Severity has not changed, but there has been enough failures
# to create a notification
elif task.contiguous_failures == task.retry_before_notification + 1:
send_notif = True
return send_notif
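A worked example of the retry logic with retry_before_notification=1: the first failure only increments contiguous_failures, the second reaches retry + 1 and notifies, and a later success above that threshold notifies immediately and resets the counter. A minimal sketch with a stand-in task object, since need_alert only touches the two counters (severities and timestamps are illustrative):

from datetime import datetime
from types import SimpleNamespace

from argos.server.alerting import need_alert

task = SimpleNamespace(retry_before_notification=1, contiguous_failures=0)
now = datetime.now()

# First failure: counter goes to 1, below retry + 1 -> no notification
print(need_alert("ok", now, "critical", "failure", task))        # False
# Second contiguous failure: counter reaches 2 == retry + 1 -> notify
print(need_alert("critical", now, "critical", "failure", task))  # True
# Recovery after a notification: notify immediately, reset counter
print(need_alert("critical", now, "ok", "success", task))        # True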
def get_icon_from_severity(severity: str) -> str:
@ -25,6 +74,91 @@ def get_icon_from_severity(severity: str) -> str:
return icon
def send_mail(mail: EmailMessage, config: Mail):
"""Send message by mail"""
if config.ssl:
logger.debug("Mail notification: SSL")
context = ssl.create_default_context()
smtp = smtplib.SMTP_SSL(host=config.host, port=config.port, context=context)
else:
smtp = smtplib.SMTP(
host=config.host, # type: ignore
port=config.port,
)
if config.starttls:
logger.debug("Mail notification: STARTTLS")
context = ssl.create_default_context()
smtp.starttls(context=context)
if config.auth is not None:
logger.debug("Mail notification: authentification")
smtp.login(config.auth.login, config.auth.password)
for address in config.addresses:
logger.debug("Sending mail to %s", address)
logger.debug(mail.get_body())
smtp.send_message(mail, to_addrs=address)
def send_gotify_msg(config, payload):
"""Send message with gotify"""
headers = {"accept": "application/json", "content-type": "application/json"}
for url in config:
logger.debug("Sending gotify message(s) to %s", url.url)
for token in url.tokens:
try:
res = httpx.post(
f"{url.url}message",
params={"token": token},
headers=headers,
json=payload,
)
res.raise_for_status()
except httpx.RequestError as err:
logger.error(
"An error occurred while sending a message to %s with token %s",
err.request.url,
token,
)
def no_agent_alert(config: Config):
"""Alert"""
msg = "You should check whats going on with your Argos agents."
twa = config.recurring_tasks.time_without_agent
if twa > 1:
subject = f"No agent has been seen within the last {twa} minutes"
else:
subject = "No agent has been seen within the last minute"
if "local" in config.general.alerts.no_agent:
logger.error(subject)
if config.general.mail is not None and "mail" in config.general.alerts.no_agent:
mail = EmailMessage()
mail["Subject"] = f"[Argos] {subject}"
mail["From"] = config.general.mail.mailfrom
mail.set_content(msg)
send_mail(mail, config.general.mail)
if config.general.gotify is not None and "gotify" in config.general.alerts.no_agent:
priority = 9
payload = {"title": subject, "message": msg, "priority": priority}
send_gotify_msg(config.general.gotify, payload)
if config.general.apprise is not None:
for notif_way in config.general.alerts.no_agent:
if notif_way.startswith("apprise:"):
group = notif_way[8:]
apobj = apprise.Apprise()
for channel in config.general.apprise[group]:
apobj.add(channel)
apobj.notify(title=subject, body=msg)
def handle_alert(config: Config, result, task, severity, old_severity, request): # pylint: disable-msg=too-many-positional-arguments
"""Dispatch alert through configured alert channels"""
@ -74,9 +208,9 @@ def notify_with_apprise( # pylint: disable-msg=too-many-positional-arguments
apobj.add(channel)
icon = get_icon_from_severity(severity)
title = f"[Argos] {icon} {urlparse(task.url).netloc}: status {severity}"
title = f"[Argos] {icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
msg = f"""\
URL: {task.url}
URL: {task.url} (IPv{task.ip_version})
Check: {task.check}
Status: {severity}
Time: {result.submitted_at}
@ -97,7 +231,7 @@ def notify_by_mail( # pylint: disable-msg=too-many-positional-arguments
icon = get_icon_from_severity(severity)
msg = f"""\
URL: {task.url}
URL: {task.url} (IPv{task.ip_version})
Check: {task.check}
Status: {severity}
Time: {result.submitted_at}
@ -109,39 +243,18 @@ See results of task on {request.url_for('get_task_results_view', task_id=task.id
"""
mail = EmailMessage()
mail["Subject"] = f"[Argos] {icon} {urlparse(task.url).netloc}: status {severity}"
mail[
"Subject"
] = f"[Argos] {icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
mail["From"] = config.mailfrom
mail.set_content(msg)
if config.ssl:
logger.debug("Mail notification: SSL")
context = ssl.create_default_context()
smtp = smtplib.SMTP_SSL(host=config.host, port=config.port, context=context)
else:
smtp = smtplib.SMTP(
host=config.host, # type: ignore
port=config.port,
)
if config.starttls:
logger.debug("Mail notification: STARTTLS")
context = ssl.create_default_context()
smtp.starttls(context=context)
if config.auth is not None:
logger.debug("Mail notification: authentification")
smtp.login(config.auth.login, config.auth.password)
for address in config.addresses:
logger.debug("Sending mail to %s", address)
logger.debug(msg)
smtp.send_message(mail, to_addrs=address)
send_mail(mail, config)
def notify_with_gotify( # pylint: disable-msg=too-many-positional-arguments
result, task, severity: str, old_severity: str, config: List[GotifyUrl], request
) -> None:
logger.debug("Will send gotify notification")
headers = {"accept": "application/json", "content-type": "application/json"}
icon = get_icon_from_severity(severity)
priority = 9
@ -152,9 +265,11 @@ def notify_with_gotify( # pylint: disable-msg=too-many-positional-arguments
elif severity == Severity.UNKNOWN:
priority = 5
subject = f"{icon} {urlparse(task.url).netloc}: status {severity}"
subject = (
f"{icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
)
msg = f"""\
URL:    <{task.url}>\\
URL:    <{task.url}> (IPv{task.ip_version})\\
Check:  {task.check}\\
Status: {severity}\\
Time:   {result.submitted_at}\\
@ -175,20 +290,4 @@ See results of task on <{request.url_for('get_task_results_view', task_id=task.i
payload = {"title": subject, "message": msg, "priority": priority, "extras": extras}
for url in config:
logger.debug("Sending gotify message(s) to %s", url.url)
for token in url.tokens:
try:
res = httpx.post(
f"{url.url}message",
params={"token": token},
headers=headers,
json=payload,
)
res.raise_for_status()
except httpx.RequestError as err:
logger.error(
"An error occurred while sending a message to %s with token %s",
err.request.url,
token,
)
send_gotify_msg(config, payload)

View file

@ -1,19 +1,20 @@
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi_login import LoginManager
from pydantic import ValidationError
from fastapi_utils.tasks import repeat_every
from psutil import Process
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
from argos.logging import logger
from argos.logging import logger, set_log_level
from argos.server import models, routes, queries
from argos.server.alerting import no_agent_alert
from argos.server.exceptions import NotAuthenticatedException, auth_exception_handler
from argos.server.settings import read_yaml_config
from argos.server.settings import read_config
def get_application() -> FastAPI:
@ -39,9 +40,7 @@ def get_application() -> FastAPI:
if config.general.ldap is not None:
import ldap
l = ldap.initialize(config.general.ldap.uri)
l.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
appli.state.ldap = l
appli.state.ldap = ldap.initialize(config.general.ldap.uri)
@appli.state.manager.user_loader()
async def query_user(user: str) -> None | str | models.User:
@ -71,17 +70,6 @@ async def connect_to_db(appli):
return appli.state.db
def read_config(yaml_file):
try:
config = read_yaml_config(yaml_file)
return config
except ValidationError as err:
logger.error("Errors where found while reading configuration:")
for error in err.errors():
logger.error("%s is %s", error["loc"], error["type"])
sys.exit(1)
def setup_database(appli):
config = appli.state.config
db_url = str(config.general.db.url)
@ -126,8 +114,47 @@ def create_manager(cookie_secret: str) -> LoginManager:
)
@repeat_every(seconds=120, logger=logger)
async def recurring_tasks() -> None:
"""Recurring DB cleanup and watch-agents tasks"""
# If we are using gunicorn
if not hasattr(app.state, "SessionLocal"):
parent_process = Process(os.getppid())
children = parent_process.children(recursive=True)
# Start the task only once, not for every worker
if children[0].pid == os.getpid():
# and we need to setup database engine
setup_database(app)
else:
return None
set_log_level("info", quiet=True)
logger.info("Start background recurring tasks")
with app.state.SessionLocal() as db:
config = app.state.config.recurring_tasks
agents = await queries.get_recent_agents_count(db, config.time_without_agent)
if agents == 0:
no_agent_alert(app.state.config)
logger.info("Agent presence checked")
removed = await queries.remove_old_results(db, config.max_results_age)
logger.info("%i result(s) removed", removed)
updated = await queries.release_old_locks(db, config.max_lock_seconds)
logger.info("%i lock(s) released", updated)
processed_jobs = await queries.process_jobs(db)
logger.info("%i job(s) processed", processed_jobs)
logger.info("Background recurring tasks ended")
return None
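A minimal sketch of the single-runner election used above: under gunicorn every worker imports the app, so @repeat_every would otherwise fire once per worker; comparing the current PID with the first child of the shared parent elects exactly one worker (elected_worker is a hypothetical helper mirroring the psutil logic above):

import os

from psutil import Process

def elected_worker() -> bool:
    # All workers share one parent; only its first child runs the recurring tasks
    parent = Process(os.getppid())
    children = parent.children(recursive=True)
    return children[0].pid == os.getpid()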
@asynccontextmanager
async def lifespan(appli):
async def lifespan(appli: FastAPI):
"""Server start and stop actions
Setup database connection then close it at shutdown.
@ -142,6 +169,7 @@ async def lifespan(appli):
"There is no tasks in the database. "
'Please launch the command "argos server reload-config"'
)
await recurring_tasks()
yield

View file

@ -0,0 +1,28 @@
"""Add request data to tasks
Revision ID: 31255a412d63
Revises: 80a29f64f91c
Create Date: 2024-12-09 16:40:20.926138
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "31255a412d63"
down_revision: Union[str, None] = "80a29f64f91c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(sa.Column("request_data", sa.String(), nullable=True))
def downgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.drop_column("request_data")

View file

@ -0,0 +1,36 @@
"""Add job queue
Revision ID: 5f6cb30db996
Revises: bd4b4962696a
Create Date: 2025-02-17 16:56:36.673511
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "5f6cb30db996"
down_revision: Union[str, None] = "bd4b4962696a"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"jobs",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("todo", sa.Enum("RELOAD_CONFIG", name="todo_enum"), nullable=False),
sa.Column("args", sa.String(), nullable=False),
sa.Column(
"current", sa.Boolean(), server_default=sa.sql.false(), nullable=False
),
sa.Column("added_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
def downgrade() -> None:
op.drop_table("jobs")

View file

@ -0,0 +1,34 @@
"""Add IP version to checks
Revision ID: 64f73a79b7d8
Revises: a1e98cf72a5c
Create Date: 2024-12-02 14:12:40.558033
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "64f73a79b7d8"
down_revision: Union[str, None] = "a1e98cf72a5c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
enum = ENUM("4", "6", name="ip_version_enum", create_type=False)
enum.create(op.get_bind(), checkfirst=True)
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(
sa.Column("ip_version", enum, server_default="4", nullable=False)
)
def downgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.drop_column("ip_version")
ENUM(name="ip_version_enum").drop(op.get_bind(), checkfirst=True)

View file

@ -0,0 +1,41 @@
"""Add retries before notification feature
Revision ID: 80a29f64f91c
Revises: 8b58ced14d6e
Create Date: 2024-12-04 17:03:35.104368
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "80a29f64f91c"
down_revision: Union[str, None] = "8b58ced14d6e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"retry_before_notification",
sa.Integer(),
server_default="0",
nullable=False,
)
)
batch_op.add_column(
sa.Column(
"contiguous_failures", sa.Integer(), server_default="0", nullable=False
)
)
def downgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.drop_column("contiguous_failures")
batch_op.drop_column("retry_before_notification")

View file

@ -0,0 +1,35 @@
"""Add task index
Revision ID: 8b58ced14d6e
Revises: 64f73a79b7d8
Create Date: 2024-12-03 16:41:44.842213
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "8b58ced14d6e"
down_revision: Union[str, None] = "64f73a79b7d8"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(sa.Column("task_group", sa.String(), nullable=True))
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.execute(
"UPDATE tasks SET task_group = method || '-' || ip_version || '-' || url"
)
batch_op.alter_column("task_group", nullable=False)
batch_op.create_index("similar_tasks", ["task_group"], unique=False)
def downgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.drop_index("similar_tasks")
batch_op.drop_column("task_group")

View file

@ -0,0 +1,42 @@
"""Use bigint for results id field
Revision ID: bd4b4962696a
Revises: 31255a412d63
Create Date: 2025-01-06 11:44:37.552965
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "bd4b4962696a"
down_revision: Union[str, None] = "31255a412d63"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
bind = op.get_bind()
if bind.engine.name != "sqlite":
with op.batch_alter_table("results", schema=None) as batch_op:
batch_op.alter_column(
"id",
existing_type=sa.INTEGER(),
type_=sa.BigInteger(),
existing_nullable=False,
)
def downgrade() -> None:
bind = op.get_bind()
if bind.engine.name != "sqlite":
with op.batch_alter_table("results", schema=None) as batch_op:
batch_op.alter_column(
"id",
existing_type=sa.BigInteger(),
type_=sa.INTEGER(),
existing_nullable=False,
)

View file

@ -5,6 +5,7 @@ Revises: c780864dc407
Create Date: 2024-11-26 14:40:27.510587
"""
from typing import Sequence, Union
from alembic import op
@ -19,11 +20,7 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"method",
sa.Enum(
enum = sa.Enum(
"GET",
"HEAD",
"POST",
@ -34,7 +31,14 @@ def upgrade() -> None:
"PATCH",
"DELETE",
name="method",
),
create_type=False,
)
enum.create(op.get_bind(), checkfirst=True)
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"method",
enum,
nullable=False,
server_default="GET",
)
@ -44,3 +48,4 @@ def upgrade() -> None:
def downgrade() -> None:
with op.batch_alter_table("tasks", schema=None) as batch_op:
batch_op.drop_column("method")
sa.Enum(name="method").drop(op.get_bind(), checkfirst=True)

View file

@ -1,6 +1,7 @@
"""Database models"""
from datetime import datetime, timedelta
from hashlib import md5
from typing import List, Literal
from sqlalchemy import (
@ -9,16 +10,42 @@ from sqlalchemy import (
ForeignKey,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.schema import Index
from argos.checks import BaseCheck, get_registered_check
from argos.schemas import WebsiteCheck
from argos.schemas.utils import Method
from argos.schemas.utils import IPVersion, Method, Todo
def compute_task_group(context) -> str:
data = context.current_parameters["request_data"]
if data is None:
data = ""
return (
f"{context.current_parameters['method']}-"
f"{context.current_parameters['ip_version']}-"
f"{context.current_parameters['url']}-"
f"{md5(data.encode()).hexdigest()}"
)
class Base(DeclarativeBase):
type_annotation_map = {List[WebsiteCheck]: JSON, dict: JSON}
class Job(Base):
"""
Job queue emulation
"""
__tablename__ = "jobs"
id: Mapped[int] = mapped_column(primary_key=True)
todo: Mapped[Todo] = mapped_column(Enum("RELOAD_CONFIG", name="todo_enum"))
args: Mapped[str] = mapped_column()
current: Mapped[bool] = mapped_column(insert_default=False)
added_at: Mapped[datetime] = mapped_column()
class Task(Base):
"""
There is one task per check.
@ -33,11 +60,16 @@ class Task(Base):
# Info needed to run the task
url: Mapped[str] = mapped_column()
domain: Mapped[str] = mapped_column()
ip_version: Mapped[IPVersion] = mapped_column(
Enum("4", "6", name="ip_version_enum"),
)
check: Mapped[str] = mapped_column()
expected: Mapped[str] = mapped_column()
frequency: Mapped[float] = mapped_column()
recheck_delay: Mapped[float] = mapped_column(nullable=True)
already_retried: Mapped[bool] = mapped_column(insert_default=False)
retry_before_notification: Mapped[int] = mapped_column(insert_default=0)
contiguous_failures: Mapped[int] = mapped_column(insert_default=0)
method: Mapped[Method] = mapped_column(
Enum(
"GET",
@ -53,12 +85,14 @@ class Task(Base):
),
insert_default="GET",
)
request_data: Mapped[str] = mapped_column(nullable=True)
# Orchestration-related
selected_by: Mapped[str] = mapped_column(nullable=True)
selected_at: Mapped[datetime] = mapped_column(nullable=True)
completed_at: Mapped[datetime] = mapped_column(nullable=True)
next_run: Mapped[datetime] = mapped_column(nullable=True)
task_group: Mapped[str] = mapped_column(insert_default=compute_task_group)
severity: Mapped[Literal["ok", "warning", "critical", "unknown"]] = mapped_column(
Enum("ok", "warning", "critical", "unknown", name="severity"),
@ -72,8 +106,8 @@ class Task(Base):
passive_deletes=True,
)
def __str__(self):
return f"DB Task {self.url} - {self.check} - {self.expected}"
def __str__(self) -> str:
return f"DB Task {self.url} (IPv{self.ip_version}) - {self.check} - {self.expected}"
def get_check(self) -> BaseCheck:
"""Returns a check instance for this specific task"""
@ -114,6 +148,9 @@ class Task(Base):
return self.last_result.status
Index("similar_tasks", Task.task_group)
class Result(Base):
"""There are multiple results per task.

View file

@ -4,25 +4,27 @@ from hashlib import sha256
from typing import List
from urllib.parse import urljoin
from sqlalchemy import asc, desc, func
from sqlalchemy import asc, func, Select
from sqlalchemy.orm import Session
from argos import schemas
from argos.logging import logger
from argos.server.models import Result, Task, ConfigCache, User
from argos.server.models import ConfigCache, Job, Result, Task, User
from argos.server.settings import read_config
async def list_tasks(db: Session, agent_id: str, limit: int = 100):
"""List tasks and mark them as selected"""
tasks = (
db.query(Task)
subquery = (
db.query(func.distinct(Task.task_group))
.filter(
Task.selected_by == None, # noqa: E711
((Task.next_run <= datetime.now()) | (Task.next_run == None)), # noqa: E711
)
.limit(limit)
.all()
.subquery()
)
tasks = db.query(Task).filter(Task.task_group.in_(Select(subquery))).all()
now = datetime.now()
for task in tasks:
@ -82,13 +84,22 @@ async def count_results(db: Session):
return db.query(Result).count()
async def has_config_changed(db: Session, config: schemas.Config) -> bool:
async def has_config_changed(db: Session, config: schemas.Config) -> bool: # pylint: disable-msg=too-many-statements
"""Check if websites config has changed by using a hashsum and a config cache"""
websites_hash = sha256(str(config.websites).encode()).hexdigest()
conf_caches = db.query(ConfigCache).all()
same_config = True
keys = [
"websites_hash",
"general_frequency",
"general_recheck_delay",
"general_retry_before_notification",
"general_ipv4",
"general_ipv6",
]
if conf_caches:
for conf in conf_caches:
keys.remove(conf.name)
match conf.name:
case "websites_hash":
if conf.val != websites_hash:
@ -105,9 +116,67 @@ async def has_config_changed(db: Session, config: schemas.Config) -> bool:
same_config = False
conf.val = str(config.general.recheck_delay)
conf.updated_at = datetime.now()
case "general_retry_before_notification":
if conf.val != str(config.general.retry_before_notification):
same_config = False
conf.val = str(config.general.retry_before_notification)
conf.updated_at = datetime.now()
case "general_ipv4":
if conf.val != str(config.general.ipv4):
same_config = False
conf.val = str(config.general.ipv4)
conf.updated_at = datetime.now()
case "general_ipv6":
if conf.val != str(config.general.ipv6):
same_config = False
conf.val = str(config.general.ipv6)
conf.updated_at = datetime.now()
for i in keys:
match i:
case "websites_hash":
c = ConfigCache(
name="websites_hash",
val=websites_hash,
updated_at=datetime.now(),
)
case "general_frequency":
c = ConfigCache(
name="general_frequency",
val=str(config.general.frequency),
updated_at=datetime.now(),
)
case "general_recheck_delay":
c = ConfigCache(
name="general_recheck_delay",
val=str(config.general.recheck_delay),
updated_at=datetime.now(),
)
case "general_retry_before_notification":
c = ConfigCache(
name="general_retry_before_notification",
val=str(config.general.retry_before_notification),
updated_at=datetime.now(),
)
case "general_ipv4":
c = ConfigCache(
name="general_ipv4",
val=str(config.general.ipv4),
updated_at=datetime.now(),
)
case "general_ipv6":
c = ConfigCache(
name="general_ipv6",
val=str(config.general.ipv6),
updated_at=datetime.now(),
)
db.add(c)
db.commit()
if keys:
return True
if same_config:
return False
@ -125,31 +194,98 @@ async def has_config_changed(db: Session, config: schemas.Config) -> bool:
val=str(config.general.recheck_delay),
updated_at=datetime.now(),
)
gen_retry_before_notif = ConfigCache(
name="general_retry_before_notification",
val=str(config.general.retry_before_notification),
updated_at=datetime.now(),
)
gen_ipv4 = ConfigCache(
name="general_ipv4",
val=str(config.general.ipv4),
updated_at=datetime.now(),
)
gen_ipv6 = ConfigCache(
name="general_ipv6",
val=str(config.general.ipv6),
updated_at=datetime.now(),
)
db.add(web_hash)
db.add(gen_freq)
db.add(gen_recheck)
db.add(gen_retry_before_notif)
db.add(gen_ipv4)
db.add(gen_ipv6)
db.commit()
return True
async def update_from_config(db: Session, config: schemas.Config):
"""Update tasks from config file"""
config_changed = await has_config_changed(db, config)
if not config_changed:
return {"added": 0, "vanished": 0}
async def update_from_config_later(db: Session, config_file):
"""Ask Argos to reload configuration in a recurring task"""
jobs = (
db.query(Job)
.filter(
Job.todo == "RELOAD_CONFIG",
Job.args == config_file,
Job.current == False,
)
.all()
)
if jobs:
return "There is already a config reloading job in the job queue, for the same file"
job = Job(todo="RELOAD_CONFIG", args=config_file, added_at=datetime.now())
db.add(job)
db.commit()
return "Config reloading has been added in the job queue"
async def process_jobs(db: Session) -> int:
"""Process job queue"""
jobs = db.query(Job).filter(Job.current == False).all()
if jobs:
for job in jobs:
job.current = True
db.commit()
if job.todo == "RELOAD_CONFIG":
logger.info("Processing job %i: %s %s", job.id, job.todo, job.args)
_config = read_config(job.args)
changed = await update_from_config(db, _config)
logger.info("%i task(s) added", changed["added"])
logger.info("%i task(s) deleted", changed["vanished"])
db.delete(job)
db.commit()
return len(jobs)
return 0
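A minimal sketch of the queue round-trip, to be run inside an async function with a SQLAlchemy Session db (the config path is illustrative): update_from_config_later enqueues the job, and process_jobs, called from the recurring tasks, consumes it:

msg = await update_from_config_later(db, config_file="/etc/argos/config.yaml")
print(msg)        # "Config reloading has been added in the job queue"
processed = await process_jobs(db)  # reloads the config, then deletes the job
print(processed)  # 1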
async def update_from_config(db: Session, config: schemas.Config): # pylint: disable-msg=too-many-branches
"""Update tasks from config file"""
max_task_id = (
db.query(func.max(Task.id).label("max_id")).all() # pylint: disable-msg=not-callable
)[0].max_id
tasks = []
unique_properties = []
seen_tasks: List[int] = []
for website in config.websites:
for website in config.websites: # pylint: disable-msg=too-many-nested-blocks
domain = str(website.domain)
frequency = website.frequency or config.general.frequency
recheck_delay = website.recheck_delay or config.general.recheck_delay
retry_before_notification = (
website.retry_before_notification
if website.retry_before_notification is not None
else config.general.retry_before_notification
)
ipv4 = website.ipv4 if website.ipv4 is not None else config.general.ipv4
ipv6 = website.ipv6 if website.ipv6 is not None else config.general.ipv6
if ipv4 is False and ipv6 is False:
logger.warning("IPv4 AND IPv6 are disabled on website %s!", domain)
continue
for ip_version in ["4", "6"]:
for p in website.paths:
url = urljoin(domain, str(p.path))
for check_key, expected in p.checks:
@ -159,43 +295,72 @@ async def update_from_config(db: Session, config: schemas.Config):
.filter(
Task.url == url,
Task.method == p.method,
Task.request_data == p.request_data,
Task.check == check_key,
Task.expected == expected,
Task.ip_version == ip_version,
)
.all()
)
if (ip_version == "4" and ipv4 is False) or (
ip_version == "6" and ipv6 is False
):
continue
if existing_tasks:
existing_task = existing_tasks[0]
seen_tasks.append(existing_task.id)
if frequency != existing_task.frequency:
existing_task.frequency = frequency
if recheck_delay != existing_task.recheck_delay:
existing_task.recheck_delay = recheck_delay # type: ignore[assignment]
if (
retry_before_notification
!= existing_task.retry_before_notification
):
existing_task.retry_before_notification = (
retry_before_notification
)
logger.debug(
"Skipping db task creation for url=%s, "
"method=%s, check_key=%s, expected=%s, "
"frequency=%s, recheck_delay=%s.",
"frequency=%s, recheck_delay=%s, "
"retry_before_notification=%s, ip_version=%s.",
url,
p.method,
check_key,
expected,
frequency,
recheck_delay,
retry_before_notification,
ip_version,
)
else:
properties = (url, check_key, expected)
properties = (
url,
p.method,
check_key,
expected,
ip_version,
p.request_data,
)
if properties not in unique_properties:
unique_properties.append(properties)
task = Task(
domain=domain,
url=url,
ip_version=ip_version,
method=p.method,
request_data=p.request_data,
check=check_key,
expected=expected,
frequency=frequency,
recheck_delay=recheck_delay,
retry_before_notification=retry_before_notification,
already_retried=False,
)
logger.debug("Adding a new task in the db: %s", task)
@ -213,7 +378,8 @@ async def update_from_config(db: Session, config: schemas.Config):
)
db.commit()
logger.info(
"%i tasks has been removed since not in config file anymore", vanished_tasks
"%i task(s) has been removed since not in config file anymore",
vanished_tasks,
)
return {"added": len(tasks), "vanished": vanished_tasks}
@ -243,26 +409,11 @@ async def reschedule_all(db: Session):
db.commit()
async def remove_old_results(db: Session, max_results: int):
tasks = db.query(Task).all()
deleted = 0
for task in tasks:
# Get the id of the oldest result to keep
subquery = (
db.query(Result.id)
.filter(Result.task_id == task.id)
.order_by(desc(Result.id))
.limit(max_results)
.subquery()
)
min_id = db.query(func.min(subquery.c.id)).scalar() # pylint: disable-msg=not-callable
# Delete all the results older than min_id
if min_id:
deleted += (
db.query(Result)
.where(Result.id < min_id, Result.task_id == task.id)
.delete()
async def remove_old_results(db: Session, max_results_age: float):
"""Remove old results, base on age"""
max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age)
deleted = (
db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete()
)
db.commit()
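The cleanup is now one age-based DELETE instead of a per-task window. A minimal sketch of the cutoff computation, assuming max_results_age was already converted to seconds by the RecurringTasks validator:

from datetime import datetime, timedelta

max_results_age = 86400.0  # "1d" after validation
cutoff = datetime.now() - timedelta(seconds=max_results_age)
# Equivalent SQL: DELETE FROM results WHERE submitted_at < :cutoff
print(cutoff)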

View file

@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
from argos.logging import logger
from argos.schemas import AgentResult, Config, Task
from argos.server import queries
from argos.server.alerting import handle_alert
from argos.server.alerting import handle_alert, need_alert
from argos.server.routes.dependencies import get_config, get_db, verify_token
route = APIRouter()
@ -58,16 +58,26 @@ async def create_results( # pylint: disable-msg=too-many-positional-arguments
logger.error("Unable to find task %i", agent_result.task_id)
else:
last_severity = task.severity
last_severity_update = task.last_severity_update
result = await queries.create_result(db, agent_result, agent_id)
check = task.get_check()
status, severity = await check.finalize(config, result, **result.context)
result.set_status(status, severity)
task.set_times_severity_and_deselect(severity, result.submitted_at)
# Don't create an alert if the severity has not changed
if last_severity != severity:
send_notif = need_alert(
last_severity, last_severity_update, severity, status, task
)
if send_notif:
background_tasks.add_task(
handle_alert, config, result, task, severity, last_severity, request
handle_alert,
config,
result,
task,
severity,
last_severity,
request,
)
db_results.append(result)
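The decision itself is delegated to `need_alert()` in `argos.server.alerting`, which this diff does not show. One plausible shape for such a gate, reusing the `retry_before_notification` and `contiguous_failures` fields that appear elsewhere in this changeset (illustrative only, not the project's actual implementation):

```python
def need_alert(last_severity, last_severity_update, severity, status, task) -> bool:
    # Illustrative sketch: never notify when the severity is unchanged.
    if last_severity == severity:
        return False
    # When a retry budget is configured, wait until the task has failed
    # enough consecutive times before notifying about a degradation.
    if severity != "ok" and task.retry_before_notification:
        return task.contiguous_failures >= task.retry_before_notification
    return True
```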

View file

@ -2,6 +2,8 @@ from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi_login import LoginManager
from argos.logging import logger
auth_scheme = HTTPBearer()
@ -33,12 +35,19 @@ async def verify_token(
return token
async def find_ldap_user(config, ldap, user: str) -> str | None:
async def find_ldap_user(config, ldapobj, user: str) -> str | None:
"""Do a LDAP search for user and return its dn"""
import ldap
import ldap.filter as ldap_filter
from ldapurl import LDAP_SCOPE_SUBTREE
result = ldap.search_s(
try:
ldapobj.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
except ldap.LDAPError as err: # pylint: disable-msg=no-member
logger.error("LDAP error: %s", err)
return None
result = ldapobj.search_s(
config.general.ldap.user_tree,
LDAP_SCOPE_SUBTREE,
filterstr=ldap_filter.filter_format(
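For context, `ldap.filter.filter_format` escapes the assertion values before embedding them in the filter string, which is what makes passing the raw `user` value safe. A standalone illustration (the `(uid=%s)` template is an assumption, not taken from this diff):

```python
import ldap.filter as ldap_filter

# Special filter characters in the value are escaped ('(' -> '\28',
# ')' -> '\29', '*' -> '\2a'), so a crafted username cannot change
# the structure of the LDAP filter.
print(ldap_filter.filter_format("(uid=%s)", ["someone)(uid=*"]))
# -> (uid=someone\29\28uid=\2a)
```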

View file

@ -90,6 +90,15 @@ async def post_login(
from ldap import INVALID_CREDENTIALS # pylint: disable-msg=no-name-in-module
from argos.server.routes.dependencies import find_ldap_user
invalid_credentials = templates.TemplateResponse(
"login.html",
{
"request": request,
"msg": "Sorry, invalid username or bad password. "
"Or the LDAP server is unreachable (see logs to verify).",
},
)
ldap_dn = await find_ldap_user(config, request.app.state.ldap, username)
if ldap_dn is None:
return invalid_credentials
@ -357,8 +366,21 @@ async def set_refresh_cookies_view(
request.url_for("get_severity_counts_view"),
status_code=status.HTTP_303_SEE_OTHER,
)
response.set_cookie(key="auto_refresh_enabled", value=str(auto_refresh_enabled))
# Cookie age in Chrome can't be more than 400 days
# https://developer.chrome.com/blog/cookie-max-age-expires
delta = int(timedelta(days=400).total_seconds())
response.set_cookie(
key="auto_refresh_seconds", value=str(max(5, int(auto_refresh_seconds)))
key="auto_refresh_enabled",
value=str(auto_refresh_enabled),
httponly=True,
samesite="strict",
expires=delta,
)
response.set_cookie(
key="auto_refresh_seconds",
value=str(max(5, int(auto_refresh_seconds))),
httponly=True,
samesite="strict",
expires=delta,
)
return response

View file

@ -1,12 +1,26 @@
"""Pydantic schemas for server"""
import sys
from pathlib import Path
import yaml
from yamlinclude import YamlIncludeConstructor
from pydantic import ValidationError
from argos.logging import logger
from argos.schemas.config import Config
def read_config(yaml_file):
try:
config = read_yaml_config(yaml_file)
return config
except ValidationError as err:
logger.error("Errors where found while reading configuration:")
for error in err.errors():
logger.error("%s is %s", error["loc"], error["type"])
sys.exit(1)
def read_yaml_config(filename: str) -> Config:
parsed = _load_yaml(filename)
return Config(**parsed)
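`_load_yaml` itself is outside this hunk; a plausible sketch of such a helper, assuming it only registers the `!include` constructor (imported above) on a PyYAML loader before parsing:

```python
def _load_yaml(filename: str):
    # Hypothetical sketch: resolve !include paths relative to the
    # config file's own directory, then parse as regular YAML.
    base_dir = Path(filename).resolve().parent
    YamlIncludeConstructor.add_to_loader_class(
        loader_class=yaml.FullLoader, base_dir=str(base_dir)
    )
    with open(filename, "r", encoding="utf-8") as stream:
        return yaml.load(stream, Loader=yaml.FullLoader)
```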

View file

@ -16,7 +16,7 @@
<tbody id="domains-body">
{% for task in tasks %}
<tr scope="row">
<td>{{ task.url }}</td>
<td>{{ task.url }} (IPv{{ task.ip_version }})</td>
<td>{{ task.check }}</td>
<td class="status highlight">
{% if task.status %}

View file

@ -82,6 +82,48 @@ caption: argos-config.yaml
- json-is: '{"foo": "bar", "baz": 42}'
```
## Add data to requests
If you want to specify query parameters, just put them in the path:
```{code-block} yaml
websites:
- domain: "https://contact.example.org"
paths:
- path: "/index.php?action=show_messages"
method: "GET"
```
If you want, for example, to test a form and send some data to it:
```{code-block} yaml
websites:
- domain: "https://contact.example.org"
paths:
- path: "/"
method: "POST"
request_data:
# These are the data sent to the server: title and msg
data:
title: "Hello my friend"
msg: "How are you today?"
# To send data as JSON (optional, default is false):
is_json: true
```
If you need to send some headers in the request:
```{code-block} yaml
websites:
- domain: "https://contact.example.org"
paths:
- path: "/api/mail"
method: "PUT"
request_data:
headers:
Authorization: "Bearer foo-bar-baz"
```
## SSL certificate expiration
Checks that the SSL certificate will not expire soon. You need to define the thresholds in the configuration, and set the `on-check` option to enable the check.
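A minimal sketch tying the two parts together, reusing the thresholds shown in the sample configuration (the domain is a placeholder):

```{code-block} yaml
general:
  ssl:
    thresholds:
      - "1d": critical
      - "5d": warning
websites:
  - domain: "https://example.org"
    paths:
      - path: "/"
        checks:
          - ssl-certificate-expiration: "on-check"
```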

View file

@ -60,7 +60,9 @@ Options:
--max-tasks INTEGER Number of concurrent tasks this agent can run
--wait-time INTEGER Waiting time between two polls on the server
(seconds)
--log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL]
--log-level [debug|info|warning|error|critical]
--user-agent TEXT A custom string to append to the User-Agent
header
--help Show this message and exit.
```
@ -82,7 +84,6 @@ Options:
--help Show this message and exit.
Commands:
cleandb Clean the database (to run routinely)
generate-config Output a self-documented example config file.
generate-token Generate a token for agents
migrate Run database migrations
@ -93,7 +94,6 @@ Commands:
test-gotify Send a test gotify notification
test-mail Send a test email
user User management
watch-agents Watch agents (to run routinely)
```
<!--[[[end]]]
@ -150,65 +150,6 @@ Options:
-->
### Server cleandb
<!--
.. [[[cog
help(["server", "cleandb", "--help"])
.. ]]] -->
```man
Usage: argos server cleandb [OPTIONS]
Clean the database (to run routinely)
- Removes old results from the database.
- Removes locks from tasks that have been locked for too long.
Options:
--max-results INTEGER Number of results per task to keep
--max-lock-seconds INTEGER The number of seconds after which a lock is
considered stale, must be higher than 60 (the
checks have a timeout value of 60 seconds)
--config TEXT Path of the configuration file. If ARGOS_YAML_FILE
environment variable is set, its value will be
used instead. Default value: argos-config.yaml and
/etc/argos/config.yaml as fallback.
--help Show this message and exit.
```
<!--[[[end]]]
-->
### Server watch-agents
<!--
.. [[[cog
help(["server", "cleandb", "--help"])
.. ]]] -->
```man
Usage: argos server cleandb [OPTIONS]
Clean the database (to run routinely)
- Removes old results from the database.
- Removes locks from tasks that have been locked for too long.
Options:
--max-results INTEGER Number of results per task to keep
--max-lock-seconds INTEGER The number of seconds after which a lock is
considered stale, must be higher than 60 (the
checks have a timeout value of 60 seconds)
--config TEXT Path of the configuration file. If ARGOS_YAML_FILE
environment variable is set, its value will be
used instead. Default value: argos-config.yaml and
/etc/argos/config.yaml as fallback.
--help Show this message and exit.
```
<!--[[[end]]]
-->
### Server reload-config
<!--
@ -222,9 +163,14 @@ Usage: argos server reload-config [OPTIONS]
Read tasks configuration and add/delete tasks in database if needed
Options:
--config TEXT Path of the configuration file. If ARGOS_YAML_FILE environment
variable is set, its value will be used instead. Default value:
argos-config.yaml and /etc/argos/config.yaml as fallback.
--config TEXT Path of the configuration file. If ARGOS_YAML_FILE
environment variable is set, its value will be used
instead. Default value: argos-config.yaml and
/etc/argos/config.yaml as fallback.
--enqueue / --no-enqueue Let Argos main recurring tasks handle
configurations loading. It may delay the
application of the new configuration up to 2
minutes. Default is --no-enqueue
--help Show this message and exit.
```

View file

@ -14,7 +14,9 @@ description: Many thanks to their developers!
- [Alembic](https://alembic.sqlalchemy.org) is used for DB migrations;
- [Tenacity](https://github.com/jd/tenacity), a small utility to retry a function in case an error occurred;
- [Uvicorn](https://www.uvicorn.org/) is the tool used to run our server;
- [Gunicorn](https://gunicorn.org/) is the recommended WSGI HTTP server for production.
- [Gunicorn](https://gunicorn.org/) is the recommended WSGI HTTP server for production;
- [Apprise](https://github.com/caronc/apprise/wiki) allows Argos to send notifications through a lot of channels;
- [FastAPI Utilities](https://fastapiutils.github.io/fastapi-utils/) is in charge of recurring tasks.
## CSS framework

View file

@ -15,3 +15,7 @@ venv/bin/alembic -c argos/server/migrations/alembic.ini revision \
```
Edit the created file to remove comments and adapt it to make sure the migration is complete (Alembic is not powerful enough to cover all the corner cases).
In case you want to add an `Enum` type and use it in an existing table, please have a look at [`argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py).
If you want to add an `Enum` type in a new table, you can do it as in [`argos/server/migrations/versions/7d480e6f1112_initial_migrations.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/7d480e6f1112_initial_migrations.py).
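For reference, a hedged sketch of the pattern those migrations follow (table, column, and type names are illustrative, not copied from the linked files):

```python
import sqlalchemy as sa
from alembic import op


def upgrade():
    # Create the Enum type explicitly so the migration also works on
    # PostgreSQL; checkfirst avoids failing if the type already exists.
    ip_version = sa.Enum("4", "6", name="ip_version_enum")
    ip_version.create(op.get_bind(), checkfirst=True)
    op.add_column("tasks", sa.Column("ip_version", ip_version, nullable=False))


def downgrade():
    op.drop_column("tasks", "ip_version")
    sa.Enum(name="ip_version_enum").drop(op.get_bind(), checkfirst=True)
```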

View file

@ -41,7 +41,8 @@ git add argos/__init__.py CHANGELOG.md
git commit -m "🏷 — Bump version ($(hatch version))"
# Create a tag on the git repository and push it
git tag "$(hatch version)" && git push
git tag "$(hatch version)" -m "$(hatch version)" &&
git push --follow-tags
# Build the project
hatch build --clean

View file

@ -191,18 +191,6 @@ The only requirement is that the agent can reach the server through HTTP or HTTP
argos agent http://localhost:8000 "auth-token"
```
## Cleaning the database
You have to run the cleaning task periodically. `argos server cleandb --help` will give you more information on how to do that.
Here is a crontab example, which will clean the db each hour:
```bash
# Run the cleaning tasks every hour (at minute 7)
# Keeps 10 results per task, and remove tasks locks older than 1 hour
7 * * * * argos server cleandb --max-results 10 --max-lock-seconds 3600
```
## Watch the agents
In order to be sure that agents are up and communicate with the server, you can periodically run the `argos server watch-agents` command.

View file

@ -90,13 +90,13 @@ User=argos
WorkingDirectory=/opt/argos/
EnvironmentFile=/etc/default/argos-server
ExecStartPre=/opt/argos/venv/bin/argos server migrate
ExecStartPre=/opt/argos/venv/bin/argos server reload-config
ExecStartPre=/opt/argos/venv/bin/argos server reload-config --enqueue
ExecStart=/opt/argos/venv/bin/gunicorn "argos.server.main:get_application()" \\
--workers \$ARGOS_SERVER_WORKERS \\
--worker-class uvicorn.workers.UvicornWorker \\
--bind \$ARGOS_SERVER_SOCKET \\
--forwarded-allow-ips \$ARGOS_SERVER_FORWARDED_ALLOW_IPS
ExecReload=/opt/argos/venv/bin/argos server reload-config
ExecReload=/opt/argos/venv/bin/argos server reload-config --enqueue
SyslogIdentifier=argos-server
[Install]
@ -153,8 +153,7 @@ If all works well, you have to put some cron tasks in `argos` crontab:
```bash
cat <<EOF | crontab -u argos -
*/10 * * * * /opt/argos/venv/bin/argos server cleandb --max-lock-seconds 120 --max-results 1200
*/10 * * * * /opt/argos/venv/bin/argos server watch-agents --time-without-agent 10
EOF
```

View file

@ -28,10 +28,12 @@ dependencies = [
"durations-nlp>=1.0.1,<2",
"fastapi>=0.103,<0.104",
"fastapi-login>=1.10.0,<2",
"fastapi-utils>=0.8.0,<0.9",
"httpx>=0.27.2,<0.28.0",
"Jinja2>=3.0,<4",
"jsonpointer>=3.0,<4",
"passlib>=1.7.4,<2",
"psutil>=5.9.8,<6",
"psycopg2-binary>=2.9,<3",
"pydantic[email]>=2.4,<3",
"pydantic-settings>=2.0,<3",
@ -41,6 +43,7 @@ dependencies = [
"sqlalchemy[asyncio]>=2.0,<3",
"sqlalchemy-utils>=0.41,<1",
"tenacity>=8.2,<9",
"typing_inspect>=0.9.0,<1",
"uvicorn>=0.23,<1",
]

View file

@ -1,9 +1,21 @@
---
general:
# Except for the frequency and recheck_delay settings, changes in the general
# section of the configuration need a restart of the argos server.
db:
# The database URL, as defined in SQLAlchemy docs : https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
# The database URL, as defined in SQLAlchemy docs:
# https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
url: "sqlite:////tmp/test-argos.db"
# Can be "production", "dev", "test".
# If not present, default value is "production"
env: test
# To get a good string for cookie_secret, run:
# openssl rand -hex 32
cookie_secret: "foo-bar-baz"
# Default delay for checks.
# Can be superseded in domain configuration.
# For ex., to run checks every minute:
frequency: "1m"
alerts:
ok:
@ -14,12 +26,37 @@ general:
- local
unknown:
- local
no_agent:
- local
service:
secrets:
# Secrets can be generated using `argos server generate-token`.
# You need at least one. Write them as a list, like:
# - secret_token
- "O4kt8Max9/k0EmHaEJ0CGGYbBNFmK8kOZNIoUk3Kjwc"
- "x1T1VZR51pxrv5pQUyzooMG4pMUvHNMhA5y/3cUsYVs="
ssl:
thresholds:
- "1d": critical
"5d": warning
- "5d": warning
# Argos will execute some tasks in the background for you
# every 2 minutes and needs some configuration for that
recurring_tasks:
# Maximum age of results
# Use m for minutes, h for hours, d for days
# w for weeks, M for months, y for years
# See https://github.com/timwedde/durations_nlp#scales-reference for details
max_results_age: "1d"
# Max number of seconds a task can be locked
# Minimum value is 61, default is 100
max_lock_seconds: 100
# Max number of seconds without seeing an agent
# before sending an alert
# Minimum value is 61, default is 300
time_without_agent: 300
# It's also possible to define the checks in another file
# with the include syntax:
#
websites: !include websites.yaml

View file

@ -21,7 +21,7 @@ def test_tasks_retrieval_and_results(authorized_client, app):
assert response.status_code == 200
tasks = response.json()
assert len(tasks) == 2
assert len(tasks) == 4
results = []
for task in tasks:
@ -33,7 +33,7 @@ def test_tasks_retrieval_and_results(authorized_client, app):
response = client.post("/api/results", json=data)
assert response.status_code == 201
assert app.state.db.query(models.Result).count() == 2
assert app.state.db.query(models.Result).count() == 4
# The list of tasks should be empty now
response = client.get("/api/tasks")
@ -60,6 +60,7 @@ def ssl_task(db):
task = models.Task(
url="https://exemple.com/",
domain="https://exemple.com/",
ip_version="6",
method="GET",
check="ssl-certificate-expiration",
expected="on-check",

View file

@ -35,8 +35,13 @@ def ssl_task(now):
id=1,
url="https://example.org",
domain="https://example.org",
ip_version="6",
method="GET",
request_data=None,
task_group="GET-6-https://example.org",
check="ssl-certificate-expiration",
retry_before_notification=0,
contiguous_failures=0,
expected="on-check",
selected_at=now,
selected_by="pytest",

View file

@ -10,9 +10,9 @@ from argos.server.models import Result, Task, User
@pytest.mark.asyncio
async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefined-outer-name
for _task in ten_tasks:
for _ in range(5):
for iterator in range(5):
result = Result(
submitted_at=datetime.now(),
submitted_at=datetime.now() - timedelta(seconds=iterator * 2),
status="success",
context={"foo": "bar"},
task=_task,
@ -24,12 +24,12 @@ async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefi
# So we have 5 results per task
assert db.query(Result).count() == 50
# Keep only 2
deleted = await queries.remove_old_results(db, 2)
assert deleted == 30
assert db.query(Result).count() == 20
# Keep only results newer than 6 seconds
deleted = await queries.remove_old_results(db, 6)
assert deleted == 20
assert db.query(Result).count() == 30
for _task in ten_tasks:
assert db.query(Result).filter(Result.task == _task).count() == 2
assert db.query(Result).filter(Result.task == _task).count() == 3
@pytest.mark.asyncio
@ -70,7 +70,7 @@ async def test_update_from_config_with_duplicate_tasks(db, empty_config): # py
await queries.update_from_config(db, empty_config)
# Only one path has been saved in the database
assert db.query(Task).count() == 1
assert db.query(Task).count() == 2
# Calling again with the same data works, and will not result in more tasks being
# created.
@ -87,6 +87,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
same_task = Task(
url=task.url,
domain=task.domain,
ip_version="6",
check=task.check,
expected=task.expected,
frequency=task.frequency,
@ -108,7 +109,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
empty_config.websites = [website]
await queries.update_from_config(db, empty_config)
assert db.query(Task).count() == 2
assert db.query(Task).count() == 4
website = schemas.config.Website(
domain=task.domain,
@ -122,7 +123,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
empty_config.websites = [website]
await queries.update_from_config(db, empty_config)
assert db.query(Task).count() == 1
assert db.query(Task).count() == 2
@pytest.mark.asyncio
@ -136,7 +137,7 @@ async def test_update_from_config_db_updates_existing_tasks(db, empty_config, ta
empty_config.websites = [website]
await queries.update_from_config(db, empty_config)
assert db.query(Task).count() == 1
assert db.query(Task).count() == 2
@pytest.mark.asyncio
@ -212,6 +213,7 @@ def task(db):
_task = Task(
url="https://www.example.com",
domain="https://www.example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,
@ -233,6 +235,7 @@ def empty_config():
warning=["", ""],
critical=["", ""],
unknown=["", ""],
no_agent=["", ""],
),
),
service=schemas.config.Service(
@ -241,6 +244,11 @@ def empty_config():
]
),
ssl=schemas.config.SSL(thresholds=[]),
recurring_tasks=schemas.config.RecurringTasks(
max_results_age="6s",
max_lock_seconds=120,
time_without_agent=300,
),
websites=[],
)
@ -271,6 +279,7 @@ def ten_locked_tasks(db):
_task = Task(
url="https://www.example.com",
domain="example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,
@ -291,6 +300,7 @@ def ten_tasks(db):
_task = Task(
url="https://www.example.com",
domain="example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,
@ -311,6 +321,7 @@ def ten_warning_tasks(db):
_task = Task(
url="https://www.example.com",
domain="example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,
@ -331,6 +342,7 @@ def ten_critical_tasks(db):
_task = Task(
url="https://www.example.com",
domain="example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,
@ -351,6 +363,7 @@ def ten_ok_tasks(db):
_task = Task(
url="https://www.example.com",
domain="example.com",
ip_version="6",
check="body-contains",
expected="foo",
frequency=1,

View file

@ -1,3 +1,4 @@
---
- domain: "https://mypads.framapad.org"
paths:
- path: "/mypads/"