🔀 Merge remote-tracking branch 'origin/develop'

This commit is contained in:
Luc Didry 2025-01-14 16:39:53 +01:00
commit acd90133bd
No known key found for this signature in database
GPG key ID: EA868E12D0257E3C
22 changed files with 693 additions and 105 deletions

View file

@@ -2,6 +2,14 @@
 ## [Unreleased]
+- ✨ — IPv4/IPv6 choice for checks, and choice for a dual-stack check (#69)
+- ⚡ — Mutualize check requests (#68)
+- ✨ — Ability to delay notification after X failures (#71)
+- 🐛 — Fix bug when changing IP version not removing tasks (#72)
+- ✨ — Allow specifying form data and headers for checks (#70)
+- 🚸 — Add a long expiration date on auto-refresh cookies
+- 🗃️ — Use bigint type for results id column in PostgreSQL (#73)
 ## 0.6.1
 Date: 2024-11-28

View file

@@ -6,6 +6,7 @@ import asyncio
 import json
 import logging
 import socket
+from hashlib import md5
 from time import sleep
 from typing import List
@@ -33,7 +34,7 @@ def log_failure(retry_state):
 )
-class ArgosAgent:
+class ArgosAgent:  # pylint: disable-msg=too-many-instance-attributes
     """The Argos agent is responsible for running the checks and reporting the results."""

     def __init__(self, server: str, auth: str, max_tasks: int, wait_time: int):
@@ -41,18 +42,33 @@ class ArgosAgent:
         self.max_tasks = max_tasks
         self.wait_time = wait_time
         self.auth = auth
-        self._http_client = None
+        self._http_client: httpx.AsyncClient | None = None
+        self._http_client_v4: httpx.AsyncClient | None = None
+        self._http_client_v6: httpx.AsyncClient | None = None
+        self._res_cache: dict[str, httpx.Response] = {}

         self.agent_id = socket.gethostname()

     @retry(after=log_failure, wait=wait_random(min=1, max=2))
     async def run(self):
-        headers = {
+        auth_header = {
             "Authorization": f"Bearer {self.auth}",
+            "User-Agent": f"Argos Panoptes agent {VERSION}",
+        }
+        self._http_client = httpx.AsyncClient(headers=auth_header)
+
+        ua_header = {
             "User-Agent": f"Argos Panoptes {VERSION} "
             "(about: https://argos-monitoring.framasoft.org/)",
         }
-        self._http_client = httpx.AsyncClient(headers=headers)
+        self._http_client_v4 = httpx.AsyncClient(
+            headers=ua_header,
+            transport=httpx.AsyncHTTPTransport(local_address="0.0.0.0"),
+        )
+        self._http_client_v6 = httpx.AsyncClient(
+            headers=ua_header, transport=httpx.AsyncHTTPTransport(local_address="::")
+        )
         logger.info("Running agent against %s", self.server)
         async with self._http_client:
             while "forever":
@@ -61,27 +77,71 @@ class ArgosAgent:
                 logger.info("Waiting %i seconds before next retry", self.wait_time)
                 await asyncio.sleep(self.wait_time)

+    async def _do_request(self, group: str, details: dict):
+        headers = {}
+        if details["request_data"] is not None:
+            request_data = json.loads(details["request_data"])
+            if request_data["headers"] is not None:
+                headers = request_data["headers"]
+
+        if details["ip_version"] == "4":
+            http_client = self._http_client_v4
+        else:
+            http_client = self._http_client_v6
+        try:
+            if details["request_data"] is None or request_data["data"] is None:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"],
+                    url=details["url"],
+                    headers=headers,
+                    timeout=60,
+                )
+            elif request_data["json"]:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"],
+                    url=details["url"],
+                    headers=headers,
+                    json=request_data["data"],
+                    timeout=60,
+                )
+            else:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"],
+                    url=details["url"],
+                    headers=headers,
+                    data=request_data["data"],
+                    timeout=60,
+                )
+        except httpx.ReadError:
+            sleep(1)
+            if details["request_data"] is None or request_data["data"] is None:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"], url=details["url"], timeout=60
+                )
+            elif request_data["json"]:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"],
+                    url=details["url"],
+                    json=request_data["data"],
+                    timeout=60,
+                )
+            else:
+                response = await http_client.request(  # type: ignore[union-attr]
+                    method=details["method"],
+                    url=details["url"],
+                    data=request_data["data"],
+                    timeout=60,
+                )
+
+        self._res_cache[group] = response
+
     async def _complete_task(self, _task: dict) -> AgentResult:
         try:
             task = Task(**_task)
-            url = task.url
-            if task.check == "http-to-https":
-                url = str(httpx.URL(task.url).copy_with(scheme="http"))
-
-            try:
-                response = await self._http_client.request(  # type: ignore[attr-defined]
-                    method=task.method, url=url, timeout=60
-                )
-            except httpx.ReadError:
-                sleep(1)
-                response = await self._http_client.request(  # type: ignore[attr-defined]
-                    method=task.method, url=url, timeout=60
-                )
             check_class = get_registered_check(task.check)
             check = check_class(task)
-            result = await check.run(response)
+            result = await check.run(self._res_cache[task.task_group])
             status = result.status
             context = result.context
@@ -100,12 +160,45 @@ class ArgosAgent:
         )
         if response.status_code == httpx.codes.OK:
-            # XXX Maybe we want to group the tests by URL ? (to issue one request per URL)
             data = response.json()
             logger.info("Received %i tasks from the server", len(data))

+            req_groups = {}
+            _tasks = []
+            for _task in data:
+                task = Task(**_task)
+
+                url = task.url
+                group = task.task_group
+
+                if task.check == "http-to-https":
+                    data = task.request_data
+                    if data is None:
+                        data = ""
+                    url = str(httpx.URL(task.url).copy_with(scheme="http"))
+                    group = (
+                        f"{task.method}-{task.ip_version}-{url}-"
+                        f"{md5(data.encode()).hexdigest()}"
+                    )
+                    _task["task_group"] = group
+
+                req_groups[group] = {
+                    "url": url,
+                    "ip_version": task.ip_version,
+                    "method": task.method,
+                    "request_data": task.request_data,
+                }
+                _tasks.append(_task)
+
+            requests = []
+            for group, details in req_groups.items():
+                requests.append(self._do_request(group, details))
+            if requests:
+                await asyncio.gather(*requests)

             tasks = []
-            for task in data:
+            for task in _tasks:
                 tasks.append(self._complete_task(task))
             if tasks:
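
Editor's note: the task_group logic above is the heart of the "mutualize check requests" change (#68): tasks sharing a method, IP version, URL and request payload collapse into a single HTTP request, whose response is cached under the group key and then fed to every check of the group. A minimal standalone sketch of the idea, assuming dict-based tasks and placeholder helpers (not the agent's actual API):

import asyncio
from hashlib import md5

def group_key(task: dict) -> str:
    # Same key format as the commit: method-ip_version-url-md5(request_data)
    data = task.get("request_data") or ""
    return (
        f"{task['method']}-{task['ip_version']}-{task['url']}-"
        f"{md5(data.encode()).hexdigest()}"
    )

async def fetch_once(key: str) -> str:
    # Stand-in for the real httpx.AsyncClient.request() call
    await asyncio.sleep(0)
    return f"response for {key}"

async def mutualize(tasks: list[dict]) -> dict[str, str]:
    # One request per distinct group, however many tasks share it
    keys = list({group_key(t) for t in tasks})
    responses = await asyncio.gather(*(fetch_once(k) for k in keys))
    return dict(zip(keys, responses))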

View file

@@ -64,6 +64,23 @@ general:
   # For ex., to re-try a check one minute after a failure:
   # recheck_delay: "1m"

+  # Default setting for notification delay.
+  # Say you want to be warned right after a failure on a check: set it to 0
+  # Say you want a second failure on the check before being warned,
+  # to avoid network hiccups: set it to 1
+  # Can be superseded in domain configuration
+  # If not present, default is 0
+  # retry_before_notification: 0
+
+  # Default settings for IPv4/IPv6
+  # Can be superseded in domain configuration.
+  # By default, Argos will check both IPv4 and IPv6 addresses of a domain
+  # (i.e. by default, both `ipv4` and `ipv6` are set to true).
+  # To disable the IPv4 check of domains:
+  # ipv4: false
+  # To disable the IPv6 check of domains:
+  # ipv6: false
+
   # Which way do you want to be warned when a check goes to that severity?
   # "local" emits a message in the server log
   # You'll need to configure mail, gotify or apprise below to be able to use
@@ -134,6 +151,8 @@ ssl:
 #
 websites:
   - domain: "https://mypads.example.org"
+    # Wait for a second failure before sending notification
+    retry_before_notification: 1
     paths:
       - path: "/mypads/"
         # Specify the method of the HTTP request
@@ -171,6 +190,17 @@ websites:
             - 302
             - 307
       - path: "/admin/"
+        method: "POST"
+        # Send form data in the request
+        request_data:
+          data:
+            login: "admin"
+            password: "my-password"
+          # To send data as JSON (optional, default is false):
+          is_json: true
+          # To send additional headers
+          headers:
+            Authorization: "Bearer foo-bar-baz"
         checks:
           # Check that the return HTTP status is one of those
           # Similar to status-is, verify that you didn't mistype it!
@@ -213,6 +243,8 @@ websites:
   - domain: "https://munin.example.org"
     frequency: "20m"
     recheck_delay: "5m"
+    # Let's say it's an IPv6-only website
+    ipv4: false
     paths:
       - path: "/"
         checks:
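
Editor's note: the is_json flag above determines how the agent hands the payload to httpx: json= (JSON-serialized body) when true, data= (form-encoded body) otherwise, as _do_request does earlier in this commit. A simplified sketch of that dispatch, assuming request_data was already parsed into a dict (hypothetical helper, not the agent's exact code):

import httpx

async def send(client: httpx.AsyncClient, method: str, url: str,
               request_data: dict) -> httpx.Response:
    kwargs = {"headers": request_data.get("headers") or {}, "timeout": 60}
    if request_data.get("json"):  # is_json: true in the config
        kwargs["json"] = request_data["data"]
    elif request_data.get("data") is not None:
        kwargs["data"] = request_data["data"]  # sent as form data
    return await client.request(method=method, url=url, **kwargs)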

View file

@@ -5,7 +5,7 @@ For database models, see argos.server.models.
 import json
-from typing import Dict, List, Literal, Tuple
+from typing import Any, Dict, List, Literal, Tuple

 from durations_nlp import Duration
 from pydantic import (
@@ -18,7 +18,7 @@ from pydantic import (
     PositiveInt,
     field_validator,
 )
-from pydantic.functional_validators import BeforeValidator
+from pydantic.functional_validators import AfterValidator, BeforeValidator
 from pydantic.networks import UrlConstraints
 from pydantic_core import Url
 from typing_extensions import Annotated
@@ -104,9 +104,26 @@ def parse_checks(value):
     return (name, expected)

+def parse_request_data(value):
+    """Turn form or JSON data into JSON string"""
+    return json.dumps(
+        {"data": value.data, "json": value.is_json, "headers": value.headers}
+    )
+
+class RequestData(BaseModel):
+    data: Any = None
+    is_json: bool = False
+    headers: Dict[str, str] | None = None
+
 class WebsitePath(BaseModel):
     path: str
     method: Method = "GET"
+    request_data: Annotated[
+        RequestData, AfterValidator(parse_request_data)
+    ] | None = None
     checks: List[
         Annotated[
             Tuple[str, str],
@@ -117,8 +134,11 @@ class WebsitePath(BaseModel):
 class Website(BaseModel):
     domain: HttpUrl
+    ipv4: bool | None = None
+    ipv6: bool | None = None
     frequency: float | None = None
     recheck_delay: float | None = None
+    retry_before_notification: int | None = None
     paths: List[WebsitePath]

     @field_validator("frequency", mode="before")
@@ -204,6 +224,9 @@ class General(BaseModel):
     ldap: LdapSettings | None = None
     frequency: float
     recheck_delay: float | None = None
+    retry_before_notification: int = 0
+    ipv4: bool = True
+    ipv6: bool = True
     root_path: str = ""
     alerts: Alert
     mail: Mail | None = None
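
Editor's note: because of the AfterValidator, a parsed RequestData is immediately flattened by parse_request_data into the JSON string stored in the task's request_data column. For the "/admin/" example from the sample config, the stored value would look like this (illustrative reproduction, not taken from the code):

import json

serialized = json.dumps(
    {
        "data": {"login": "admin", "password": "my-password"},
        "json": True,
        "headers": {"Authorization": "Bearer foo-bar-baz"},
    }
)
# '{"data": {"login": "admin", "password": "my-password"}, "json": true, "headers": {"Authorization": "Bearer foo-bar-baz"}}'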

View file

@@ -8,7 +8,7 @@ from typing import Literal
 from pydantic import BaseModel, ConfigDict

-from argos.schemas.utils import Method
+from argos.schemas.utils import IPVersion, Method

 # XXX Refactor using SQLModel to avoid duplication of model data
@@ -19,9 +19,14 @@ class Task(BaseModel):
     id: int
     url: str
     domain: str
+    ip_version: IPVersion
     check: str
     method: Method
+    request_data: str | None
     expected: str
+    task_group: str
+    retry_before_notification: int
+    contiguous_failures: int
     selected_at: datetime | None
     selected_by: str | None
@@ -31,7 +36,8 @@ class Task(BaseModel):
         task_id = self.id
         url = self.url
         check = self.check
-        return f"Task ({task_id}): {url} - {check}"
+        ip_version = self.ip_version
+        return f"Task ({task_id}): {url} (IPv{ip_version}) - {check}"

 class SerializableException(BaseModel):

View file

@@ -1,6 +1,8 @@
 from typing import Literal

+IPVersion = Literal["4", "6"]
+
 Method = Literal[
     "GET", "HEAD", "POST", "OPTIONS", "CONNECT", "TRACE", "PUT", "PATCH", "DELETE"
 ]

View file

@@ -11,6 +11,55 @@ import httpx
 from argos.checks.base import Severity
 from argos.logging import logger
 from argos.schemas.config import Config, Mail, GotifyUrl
+from argos.server.models import Task
+
+def need_alert(
+    last_severity: str, last_severity_update, severity: str, status: str, task: Task
+) -> bool:
+    ## Create alert… or not!
+    send_notif = False
+    # Severity has changed, and no retry before notification
+    if last_severity != severity and task.retry_before_notification == 0:
+        send_notif = True
+    # Seems to be a first check: create a notification
+    elif last_severity != severity and last_severity_update is None:
+        send_notif = True
+        # As we created a notification, avoid resending it on a
+        # future failure
+        if status != "success":
+            task.contiguous_failures = task.retry_before_notification
+    # We need retry before notification, so the severity may not have changed
+    # since last check
+    elif task.retry_before_notification != 0:
+        # If we got a success, and we already have created a notification:
+        # create notification of success immediately
+        if (
+            status == "success"
+            and task.contiguous_failures >= task.retry_before_notification
+        ):
+            send_notif = True
+            task.contiguous_failures = 0
+        # The status is not a success
+        elif status != "success":
+            # This is a new failure
+            task.contiguous_failures += 1
+            # Severity has changed, but not to success, that's odd:
+            # create a notification
+            if (
+                last_severity not in ("ok", severity)
+                and last_severity_update is not None
+            ):
+                send_notif = True
+                # As we created a notification, avoid resending it on a
+                # future failure
+                task.contiguous_failures = task.retry_before_notification
+            # Severity has not changed, but there has been enough failures
+            # to create a notification
+            elif task.retry_before_notification == task.contiguous_failures:
+                send_notif = True
+
+    return send_notif
+
 def get_icon_from_severity(severity: str) -> str:
@@ -74,9 +123,9 @@ def notify_with_apprise(  # pylint: disable-msg=too-many-positional-arguments
         apobj.add(channel)

     icon = get_icon_from_severity(severity)
-    title = f"[Argos] {icon} {urlparse(task.url).netloc}: status {severity}"
+    title = f"[Argos] {icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
     msg = f"""\
-URL: {task.url}
+URL: {task.url} (IPv{task.ip_version})
 Check: {task.check}
 Status: {severity}
 Time: {result.submitted_at}
@@ -97,7 +146,7 @@ def notify_by_mail(  # pylint: disable-msg=too-many-positional-arguments
     icon = get_icon_from_severity(severity)

     msg = f"""\
-URL: {task.url}
+URL: {task.url} (IPv{task.ip_version})
 Check: {task.check}
 Status: {severity}
 Time: {result.submitted_at}
@@ -109,7 +158,9 @@ See results of task on {request.url_for('get_task_results_view', task_id=task.id
 """

     mail = EmailMessage()
-    mail["Subject"] = f"[Argos] {icon} {urlparse(task.url).netloc}: status {severity}"
+    mail[
+        "Subject"
+    ] = f"[Argos] {icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
     mail["From"] = config.mailfrom
     mail.set_content(msg)
@@ -152,9 +203,11 @@ def notify_with_gotify(  # pylint: disable-msg=too-many-positional-arguments
     elif severity == Severity.UNKNOWN:
         priority = 5

-    subject = f"{icon} {urlparse(task.url).netloc}: status {severity}"
+    subject = (
+        f"{icon} {urlparse(task.url).netloc} (IPv{task.ip_version}): status {severity}"
+    )
     msg = f"""\
-URL:    <{task.url}>\\
+URL:    <{task.url}> (IPv{task.ip_version})\\
 Check:  {task.check}\\
 Status: {severity}\\
 Time:   {result.submitted_at}\\
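
Editor's note: a quick way to probe the need_alert() decision table is to drive it with a stand-in object instead of the SQLAlchemy Task model, since the function only touches retry_before_notification and contiguous_failures. A hedged sketch (values are made up; the printed outcomes depend on which branches above are taken):

from types import SimpleNamespace

task = SimpleNamespace(retry_before_notification=1, contiguous_failures=0)
for last_severity in ("ok", "critical", "critical"):
    notify = need_alert(last_severity, "2025-01-14", "critical", "failure", task)
    print(notify, task.contiguous_failures)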

View file

@@ -0,0 +1,28 @@
"""Add request data to tasks
Revision ID: 31255a412d63
Revises: 80a29f64f91c
Create Date: 2024-12-09 16:40:20.926138
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "31255a412d63"
down_revision: Union[str, None] = "80a29f64f91c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.add_column(sa.Column("request_data", sa.String(), nullable=True))

def downgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.drop_column("request_data")

View file

@@ -0,0 +1,34 @@
"""Add IP version to checks
Revision ID: 64f73a79b7d8
Revises: a1e98cf72a5c
Create Date: 2024-12-02 14:12:40.558033
"""
from typing import Sequence, Union
from alembic import op
from sqlalchemy.dialects.postgresql import ENUM
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "64f73a79b7d8"
down_revision: Union[str, None] = "a1e98cf72a5c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    enum = ENUM("4", "6", name="ip_version_enum", create_type=False)
    enum.create(op.get_bind(), checkfirst=False)
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column("ip_version", enum, server_default="4", nullable=False)
        )

def downgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.drop_column("ip_version")
    ENUM(name="ip_version_enum").drop(op.get_bind(), checkfirst=False)

View file

@@ -0,0 +1,41 @@
"""Add retries before notification feature
Revision ID: 80a29f64f91c
Revises: 8b58ced14d6e
Create Date: 2024-12-04 17:03:35.104368
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "80a29f64f91c"
down_revision: Union[str, None] = "8b58ced14d6e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column(
                "retry_before_notification",
                sa.Integer(),
                server_default="0",
                nullable=False,
            )
        )
        batch_op.add_column(
            sa.Column(
                "contiguous_failures", sa.Integer(), server_default="0", nullable=False
            )
        )

def downgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.drop_column("contiguous_failures")
        batch_op.drop_column("retry_before_notification")

View file

@@ -0,0 +1,35 @@
"""Add task index
Revision ID: 8b58ced14d6e
Revises: 64f73a79b7d8
Create Date: 2024-12-03 16:41:44.842213
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "8b58ced14d6e"
down_revision: Union[str, None] = "64f73a79b7d8"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.add_column(sa.Column("task_group", sa.String(), nullable=True))

    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.execute(
            "UPDATE tasks SET task_group = method || '-' || ip_version || '-' || url"
        )
        batch_op.alter_column("task_group", nullable=False)
        batch_op.create_index("similar_tasks", ["task_group"], unique=False)

def downgrade() -> None:
    with op.batch_alter_table("tasks", schema=None) as batch_op:
        batch_op.drop_index("similar_tasks")
        batch_op.drop_column("task_group")

View file

@@ -0,0 +1,44 @@
"""Use bigint for results id field
Revision ID: bd4b4962696a
Revises: 31255a412d63
Create Date: 2025-01-06 11:44:37.552965
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "bd4b4962696a"
down_revision: Union[str, None] = "31255a412d63"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    bind = op.get_bind()
    if bind.engine.name != "sqlite":
        with op.batch_alter_table("results", schema=None) as batch_op:
            batch_op.alter_column(
                "id",
                existing_type=sa.INTEGER(),
                type_=sa.BigInteger(),
                existing_nullable=False,
                autoincrement=True,
            )

def downgrade() -> None:
    bind = op.get_bind()
    if bind.engine.name != "sqlite":
        with op.batch_alter_table("results", schema=None) as batch_op:
            batch_op.alter_column(
                "id",
                existing_type=sa.BigInteger(),
                type_=sa.INTEGER(),
                existing_nullable=False,
                autoincrement=True,
            )

View file

@@ -1,6 +1,7 @@
 """Database models"""
 from datetime import datetime, timedelta
+from hashlib import md5
 from typing import List, Literal

 from sqlalchemy import (
@@ -9,10 +10,23 @@ from sqlalchemy import (
     ForeignKey,
 )
 from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
+from sqlalchemy.schema import Index

 from argos.checks import BaseCheck, get_registered_check
 from argos.schemas import WebsiteCheck
-from argos.schemas.utils import Method
+from argos.schemas.utils import IPVersion, Method
+
+def compute_task_group(context) -> str:
+    data = context.current_parameters["request_data"]
+    if data is None:
+        data = ""
+    return (
+        f"{context.current_parameters['method']}-"
+        f"{context.current_parameters['ip_version']}-"
+        f"{context.current_parameters['url']}-"
+        f"{md5(data.encode()).hexdigest()}"
+    )

 class Base(DeclarativeBase):
@@ -33,11 +47,16 @@ class Task(Base):
     # Info needed to run the task
     url: Mapped[str] = mapped_column()
     domain: Mapped[str] = mapped_column()
+    ip_version: Mapped[IPVersion] = mapped_column(
+        Enum("4", "6", name="ip_version_enum"),
+    )
     check: Mapped[str] = mapped_column()
     expected: Mapped[str] = mapped_column()
     frequency: Mapped[float] = mapped_column()
     recheck_delay: Mapped[float] = mapped_column(nullable=True)
     already_retried: Mapped[bool] = mapped_column(insert_default=False)
+    retry_before_notification: Mapped[int] = mapped_column(insert_default=0)
+    contiguous_failures: Mapped[int] = mapped_column(insert_default=0)
     method: Mapped[Method] = mapped_column(
         Enum(
             "GET",
@@ -53,12 +72,14 @@ class Task(Base):
         ),
         insert_default="GET",
     )
+    request_data: Mapped[str] = mapped_column(nullable=True)

     # Orchestration-related
     selected_by: Mapped[str] = mapped_column(nullable=True)
     selected_at: Mapped[datetime] = mapped_column(nullable=True)
     completed_at: Mapped[datetime] = mapped_column(nullable=True)
     next_run: Mapped[datetime] = mapped_column(nullable=True)
+    task_group: Mapped[str] = mapped_column(insert_default=compute_task_group)

     severity: Mapped[Literal["ok", "warning", "critical", "unknown"]] = mapped_column(
         Enum("ok", "warning", "critical", "unknown", name="severity"),
@@ -72,8 +93,8 @@ class Task(Base):
         passive_deletes=True,
     )

-    def __str__(self):
-        return f"DB Task {self.url} - {self.check} - {self.expected}"
+    def __str__(self) -> str:
+        return f"DB Task {self.url} (IPv{self.ip_version}) - {self.check} - {self.expected}"

     def get_check(self) -> BaseCheck:
         """Returns a check instance for this specific task"""
@@ -114,6 +135,9 @@ class Task(Base):
         return self.last_result.status

+Index("similar_tasks", Task.task_group)
+
 class Result(Base):
     """There are multiple results per task.

View file

@@ -4,7 +4,7 @@ from hashlib import sha256
 from typing import List
 from urllib.parse import urljoin

-from sqlalchemy import asc, desc, func
+from sqlalchemy import asc, desc, func, Select
 from sqlalchemy.orm import Session

 from argos import schemas
@@ -14,15 +14,16 @@ from argos.server.models import Result, Task, ConfigCache, User
 async def list_tasks(db: Session, agent_id: str, limit: int = 100):
     """List tasks and mark them as selected"""
-    tasks = (
-        db.query(Task)
+    subquery = (
+        db.query(func.distinct(Task.task_group))
         .filter(
             Task.selected_by == None,  # noqa: E711
             ((Task.next_run <= datetime.now()) | (Task.next_run == None)),  # noqa: E711
         )
         .limit(limit)
-        .all()
+        .subquery()
     )
+    tasks = db.query(Task).filter(Task.task_group.in_(Select(subquery))).all()

     now = datetime.now()
     for task in tasks:
@@ -82,13 +83,22 @@ async def count_results(db: Session):
     return db.query(Result).count()

-async def has_config_changed(db: Session, config: schemas.Config) -> bool:
+async def has_config_changed(db: Session, config: schemas.Config) -> bool:  # pylint: disable-msg=too-many-statements
     """Check if websites config has changed by using a hashsum and a config cache"""
     websites_hash = sha256(str(config.websites).encode()).hexdigest()
     conf_caches = db.query(ConfigCache).all()
     same_config = True
+    keys = [
+        "websites_hash",
+        "general_frequency",
+        "general_recheck_delay",
+        "general_retry_before_notification",
+        "general_ipv4",
+        "general_ipv6",
+    ]
     if conf_caches:
         for conf in conf_caches:
+            keys.remove(conf.name)
             match conf.name:
                 case "websites_hash":
                     if conf.val != websites_hash:
@@ -105,9 +115,67 @@ async def has_config_changed(db: Session, config: schemas.Config) -> bool:
                         same_config = False
                         conf.val = str(config.general.recheck_delay)
                         conf.updated_at = datetime.now()
+                case "general_retry_before_notification":
+                    if conf.val != str(config.general.retry_before_notification):
+                        same_config = False
+                        conf.val = str(config.general.retry_before_notification)
+                        conf.updated_at = datetime.now()
+                case "general_ipv4":
+                    if conf.val != str(config.general.ipv4):
+                        same_config = False
+                        conf.val = str(config.general.ipv4)
+                        conf.updated_at = datetime.now()
+                case "general_ipv6":
+                    if conf.val != str(config.general.ipv6):
+                        same_config = False
+                        conf.val = str(config.general.ipv6)
+                        conf.updated_at = datetime.now()
+
+        for i in keys:
+            match i:
+                case "websites_hash":
+                    c = ConfigCache(
+                        name="websites_hash",
+                        val=websites_hash,
+                        updated_at=datetime.now(),
+                    )
+                case "general_frequency":
+                    c = ConfigCache(
+                        name="general_frequency",
+                        val=str(config.general.frequency),
+                        updated_at=datetime.now(),
+                    )
+                case "general_recheck_delay":
+                    c = ConfigCache(
+                        name="general_recheck_delay",
+                        val=str(config.general.recheck_delay),
+                        updated_at=datetime.now(),
+                    )
+                case "general_retry_before_notification":
+                    c = ConfigCache(
+                        name="general_retry_before_notification",
+                        val=str(config.general.retry_before_notification),
+                        updated_at=datetime.now(),
+                    )
+                case "general_ipv4":
+                    c = ConfigCache(
+                        name="general_ipv4",
+                        val=str(config.general.ipv4),
+                        updated_at=datetime.now(),
+                    )
+                case "general_ipv6":
+                    c = ConfigCache(
+                        name="general_ipv6",
+                        val=str(config.general.ipv6),
+                        updated_at=datetime.now(),
+                    )
+            db.add(c)
+
         db.commit()
+
+        if keys:
+            return True
+
         if same_config:
             return False
@@ -125,15 +193,33 @@ async def has_config_changed(db: Session, config: schemas.Config) -> bool:
             val=str(config.general.recheck_delay),
             updated_at=datetime.now(),
         )
+        gen_retry_before_notif = ConfigCache(
+            name="general_retry_before_notification",
+            val=str(config.general.retry_before_notification),
+            updated_at=datetime.now(),
+        )
+        gen_ipv4 = ConfigCache(
+            name="general_ipv4",
+            val=str(config.general.ipv4),
+            updated_at=datetime.now(),
+        )
+        gen_ipv6 = ConfigCache(
+            name="general_ipv6",
+            val=str(config.general.ipv6),
+            updated_at=datetime.now(),
+        )
         db.add(web_hash)
         db.add(gen_freq)
         db.add(gen_recheck)
+        db.add(gen_retry_before_notif)
+        db.add(gen_ipv4)
+        db.add(gen_ipv6)
         db.commit()

     return True

-async def update_from_config(db: Session, config: schemas.Config):
+async def update_from_config(db: Session, config: schemas.Config):  # pylint: disable-msg=too-many-branches
     """Update tasks from config file"""
     config_changed = await has_config_changed(db, config)
     if not config_changed:
@@ -145,11 +231,22 @@ async def update_from_config(db: Session, config: schemas.Config):
     tasks = []
     unique_properties = []
     seen_tasks: List[int] = []
-    for website in config.websites:
+    for website in config.websites:  # pylint: disable-msg=too-many-nested-blocks
         domain = str(website.domain)
         frequency = website.frequency or config.general.frequency
         recheck_delay = website.recheck_delay or config.general.recheck_delay
+        retry_before_notification = (
+            website.retry_before_notification
+            if website.retry_before_notification is not None
+            else config.general.retry_before_notification
+        )
+        ipv4 = website.ipv4 if website.ipv4 is not None else config.general.ipv4
+        ipv6 = website.ipv6 if website.ipv6 is not None else config.general.ipv6
+        if ipv4 is False and ipv6 is False:
+            logger.warning("IPv4 AND IPv6 are disabled on website %s!", domain)
+            continue
+
+        for ip_version in ["4", "6"]:
             for p in website.paths:
                 url = urljoin(domain, str(p.path))
                 for check_key, expected in p.checks:
@@ -159,43 +256,72 @@ async def update_from_config(db: Session, config: schemas.Config):
                         .filter(
                             Task.url == url,
                             Task.method == p.method,
+                            Task.request_data == p.request_data,
                             Task.check == check_key,
                             Task.expected == expected,
+                            Task.ip_version == ip_version,
                         )
                         .all()
                     )
+                    if (ip_version == "4" and ipv4 is False) or (
+                        ip_version == "6" and ipv6 is False
+                    ):
+                        continue
+
                     if existing_tasks:
                         existing_task = existing_tasks[0]
                         seen_tasks.append(existing_task.id)
                         if frequency != existing_task.frequency:
                             existing_task.frequency = frequency
                         if recheck_delay != existing_task.recheck_delay:
                             existing_task.recheck_delay = recheck_delay  # type: ignore[assignment]
+                        if (
+                            retry_before_notification
+                            != existing_task.retry_before_notification
+                        ):
+                            existing_task.retry_before_notification = (
+                                retry_before_notification
+                            )
                         logger.debug(
                             "Skipping db task creation for url=%s, "
                             "method=%s, check_key=%s, expected=%s, "
-                            "frequency=%s, recheck_delay=%s.",
+                            "frequency=%s, recheck_delay=%s, "
+                            "retry_before_notification=%s, ip_version=%s.",
                             url,
                             p.method,
                             check_key,
                             expected,
                             frequency,
                             recheck_delay,
+                            retry_before_notification,
+                            ip_version,
                         )
                     else:
-                        properties = (url, check_key, expected)
+                        properties = (
+                            url,
+                            p.method,
+                            check_key,
+                            expected,
+                            ip_version,
+                            p.request_data,
+                        )
                         if properties not in unique_properties:
                             unique_properties.append(properties)
                             task = Task(
                                 domain=domain,
                                 url=url,
+                                ip_version=ip_version,
                                 method=p.method,
+                                request_data=p.request_data,
                                 check=check_key,
                                 expected=expected,
                                 frequency=frequency,
                                 recheck_delay=recheck_delay,
+                                retry_before_notification=retry_before_notification,
                                 already_retried=False,
                             )
                             logger.debug("Adding a new task in the db: %s", task)

View file

@@ -7,7 +7,7 @@ from sqlalchemy.orm import Session
 from argos.logging import logger
 from argos.schemas import AgentResult, Config, Task
 from argos.server import queries
-from argos.server.alerting import handle_alert
+from argos.server.alerting import handle_alert, need_alert
 from argos.server.routes.dependencies import get_config, get_db, verify_token

 route = APIRouter()
@@ -58,16 +58,26 @@ async def create_results(  # pylint: disable-msg=too-many-positional-arguments
             logger.error("Unable to find task %i", agent_result.task_id)
         else:
             last_severity = task.severity
+            last_severity_update = task.last_severity_update
             result = await queries.create_result(db, agent_result, agent_id)
             check = task.get_check()
             status, severity = await check.finalize(config, result, **result.context)
             result.set_status(status, severity)
             task.set_times_severity_and_deselect(severity, result.submitted_at)

-            # Don't create an alert if the severity has not changed
-            if last_severity != severity:
+            send_notif = need_alert(
+                last_severity, last_severity_update, severity, status, task
+            )
+
+            if send_notif:
                 background_tasks.add_task(
-                    handle_alert, config, result, task, severity, last_severity, request
+                    handle_alert,
+                    config,
+                    result,
+                    task,
+                    severity,
+                    last_severity,
+                    request,
                 )

             db_results.append(result)

View file

@@ -357,8 +357,21 @@ async def set_refresh_cookies_view(
         request.url_for("get_severity_counts_view"),
         status_code=status.HTTP_303_SEE_OTHER,
     )
-    response.set_cookie(key="auto_refresh_enabled", value=str(auto_refresh_enabled))
+    # Cookies age in Chrome can't be more than 400 days
+    # https://developer.chrome.com/blog/cookie-max-age-expires
+    delta = int(timedelta(days=400).total_seconds())
     response.set_cookie(
-        key="auto_refresh_seconds", value=str(max(5, int(auto_refresh_seconds)))
+        key="auto_refresh_enabled",
+        value=str(auto_refresh_enabled),
+        httponly=True,
+        samesite="strict",
+        expires=delta,
+    )
+    response.set_cookie(
+        key="auto_refresh_seconds",
+        value=str(max(5, int(auto_refresh_seconds))),
+        httponly=True,
+        samesite="strict",
+        expires=delta,
     )
     return response
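
Editor's note: the 400-day ceiling comes from Chrome, which clamps any longer Max-Age/Expires (see the linked blog post). For reference, the value passed to expires= is simply that cap expressed in seconds:

from datetime import timedelta

delta = int(timedelta(days=400).total_seconds())
# 34560000 seconds, the longest cookie lifetime Chrome will honour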

View file

@@ -16,7 +16,7 @@
 <tbody id="domains-body">
     {% for task in tasks %}
     <tr scope="row">
-        <td>{{ task.url }}</td>
+        <td>{{ task.url }} (IPv{{ task.ip_version }})</td>
         <td>{{ task.check }}</td>
         <td class="status highlight">
             {% if task.status %}

View file

@@ -1,6 +1,8 @@
+---
 general:
   db:
-    # The database URL, as defined in SQLAlchemy docs : https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
+    # The database URL, as defined in SQLAlchemy docs:
+    # https://docs.sqlalchemy.org/en/20/core/engines.html#database-urls
     url: "sqlite:////tmp/test-argos.db"
   env: test
   cookie_secret: "foo-bar-baz"

View file

@@ -21,7 +21,7 @@ def test_tasks_retrieval_and_results(authorized_client, app):
     assert response.status_code == 200
     tasks = response.json()
-    assert len(tasks) == 2
+    assert len(tasks) == 4

     results = []
     for task in tasks:
@@ -33,7 +33,7 @@ def test_tasks_retrieval_and_results(authorized_client, app):
     response = client.post("/api/results", json=data)
     assert response.status_code == 201

-    assert app.state.db.query(models.Result).count() == 2
+    assert app.state.db.query(models.Result).count() == 4

     # The list of tasks should be empty now
     response = client.get("/api/tasks")
@@ -60,6 +60,7 @@ def ssl_task(db):
     task = models.Task(
         url="https://exemple.com/",
         domain="https://exemple.com/",
+        ip_version="6",
         method="GET",
         check="ssl-certificate-expiration",
         expected="on-check",

View file

@@ -35,8 +35,13 @@ def ssl_task(now):
         id=1,
         url="https://example.org",
         domain="https://example.org",
+        ip_version="6",
         method="GET",
+        request_data=None,
+        task_group="GET-6-https://example.org",
         check="ssl-certificate-expiration",
+        retry_before_notification=0,
+        contiguous_failures=0,
         expected="on-check",
         selected_at=now,
         selected_by="pytest",

View file

@@ -70,7 +70,7 @@ async def test_update_from_config_with_duplicate_tasks(db, empty_config):  # py
     await queries.update_from_config(db, empty_config)

     # Only one path has been saved in the database
-    assert db.query(Task).count() == 1
+    assert db.query(Task).count() == 2

     # Calling again with the same data works, and will not result in more tasks being
     # created.
@@ -87,6 +87,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
     same_task = Task(
         url=task.url,
         domain=task.domain,
+        ip_version="6",
         check=task.check,
         expected=task.expected,
         frequency=task.frequency,
@@ -108,7 +109,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
     empty_config.websites = [website]
     await queries.update_from_config(db, empty_config)

-    assert db.query(Task).count() == 2
+    assert db.query(Task).count() == 4

     website = schemas.config.Website(
         domain=task.domain,
@@ -122,7 +123,7 @@ async def test_update_from_config_db_can_remove_duplicates_and_old_tasks(
     empty_config.websites = [website]
     await queries.update_from_config(db, empty_config)

-    assert db.query(Task).count() == 1
+    assert db.query(Task).count() == 2

 @pytest.mark.asyncio
@@ -136,7 +137,7 @@ async def test_update_from_config_db_updates_existing_tasks(db, empty_config, ta
     empty_config.websites = [website]
     await queries.update_from_config(db, empty_config)

-    assert db.query(Task).count() == 1
+    assert db.query(Task).count() == 2

 @pytest.mark.asyncio
@@ -212,6 +213,7 @@ def task(db):
     _task = Task(
         url="https://www.example.com",
         domain="https://www.example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,
@@ -271,6 +273,7 @@ def ten_locked_tasks(db):
     _task = Task(
         url="https://www.example.com",
         domain="example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,
@@ -291,6 +294,7 @@ def ten_tasks(db):
     _task = Task(
         url="https://www.example.com",
         domain="example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,
@@ -311,6 +315,7 @@ def ten_warning_tasks(db):
     _task = Task(
         url="https://www.example.com",
         domain="example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,
@@ -331,6 +336,7 @@ def ten_critical_tasks(db):
     _task = Task(
         url="https://www.example.com",
         domain="example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,
@@ -351,6 +357,7 @@ def ten_ok_tasks(db):
     _task = Task(
         url="https://www.example.com",
         domain="example.com",
+        ip_version="6",
         check="body-contains",
         expected="foo",
         frequency=1,

View file

@@ -1,3 +1,4 @@
+---
 - domain: "https://mypads.framapad.org"
   paths:
     - path: "/mypads/"