Compare commits

..

11 commits

Author SHA1 Message Date
Luc Didry
211ac32028
🐛 — Fix worker timeout for old results cleaning in recurring tasks (fix #84)
💥 Old results are now removed based on their age, not their number.

💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration.
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
a new example configuration file.
2025-02-18 17:04:26 +01:00
Luc Didry
32f2518294
🏷 — Bump version (0.8.2) 2025-02-18 14:58:35 +01:00
Luc Didry
38cc06e972
🐛 — Fix recurring tasks with gunicorn 2025-02-18 14:57:49 +01:00
Luc Didry
4b78919937
🏷 — Bump version (0.8.1) 2025-02-18 14:22:41 +01:00
Luc Didry
d8f30ebccd
🐛 — Fix todo enum in jobs table 2025-02-18 14:22:12 +01:00
Luc Didry
09674f73ef
🏷 — Bump version (0.8.0) 2025-02-18 13:50:36 +01:00
Luc Didry
c63093bb2f
🔀 Merge remote-tracking branch 'origin/develop' 2025-02-18 13:48:47 +01:00
Luc Didry
657624ed35
📝 — Add enum doc for developers 2025-02-18 13:47:27 +01:00
Luc Didry
471c1eae91
📜 — Add breaking changes to CHANGELOG 2025-02-18 13:43:05 +01:00
Luc Didry
c3708af32a
🐛 — Better httpx.RequestError handling (fix #83) 2025-02-18 13:36:40 +01:00
Luc Didry
23fea9fffa
🐛 — Automatically reconnect to LDAP if unreachable (fix #81) 2025-02-18 11:28:05 +01:00
16 changed files with 132 additions and 69 deletions


@@ -2,11 +2,46 @@
 ## [Unreleased]

+- 🐛 — Fix worker timeout for old results cleaning in recurring tasks (#84)
+
+💥 Old results are now removed based on their age, not their number.
+
+💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration.
+Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
+a new example configuration file.
+
+## 0.8.2
+
+Date: 2025-02-18
+
+- 🐛 — Fix recurring tasks with gunicorn
+
+## 0.8.1
+
+Date: 2025-02-18
+
+- 🐛 — Fix todo enum in jobs table
+
+## 0.8.0
+
+Date: 2025-02-18
+
 - ✨ — Allow to customize agent User-Agent header (#78)
 - 📝 — Document how to add data to requests (#77)
 - ✨ — No need cron tasks for DB cleaning anymore (#74 and #75)
 - ✨ — No need cron tasks for agents watching (#76)
 - ✨ — Reload configuration asynchronously (#79)
+- 🐛 — Automatically reconnect to LDAP if unreachable (#81)
+- 🐛 — Better httpx.RequestError handling (#83)
+
+💥 Warning: there are new settings to add to your configuration file.
+Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
+a new example configuration file.
+
+💥 You don't need cron tasks anymore!
+Remove your old cron tasks as they will now do nothing but generate errors.
+
+NB: You may want to add `--enqueue` to the `reload-config` command in your systemd file.
+
 ## 0.7.4


@@ -1 +1 @@
-VERSION = "0.7.4"
+VERSION = "0.8.2"


@@ -84,6 +84,7 @@ class ArgosAgent:  # pylint: disable-msg=too-many-instance-attributes
         await asyncio.sleep(self.wait_time)

     async def _do_request(self, group: str, details: dict):
+        logger.debug("_do_request for group %s", group)
         headers = {}
         if details["request_data"] is not None:
             request_data = json.loads(details["request_data"])
@@ -120,6 +121,7 @@ class ArgosAgent:  # pylint: disable-msg=too-many-instance-attributes
             )
         except httpx.ReadError:
             sleep(1)
+            logger.warning("httpx.ReadError for group %s, re-emit request", group)
             if details["request_data"] is None or request_data["data"] is None:
                 response = await http_client.request(  # type: ignore[union-attr]
                     method=details["method"], url=details["url"], timeout=60
@@ -138,6 +140,9 @@ class ArgosAgent:  # pylint: disable-msg=too-many-instance-attributes
                     data=request_data["data"],
                     timeout=60,
                 )
+        except httpx.RequestError as err:
+            logger.warning("httpx.RequestError for group %s", group)
+            response = err

         self._res_cache[group] = response
@@ -147,15 +152,21 @@ class ArgosAgent:  # pylint: disable-msg=too-many-instance-attributes
             check_class = get_registered_check(task.check)
             check = check_class(task)
-            result = await check.run(self._res_cache[task.task_group])
-            status = result.status
-            context = result.context
+            response = self._res_cache[task.task_group]
+            if isinstance(response, httpx.Response):
+                result = await check.run(response)
+                status = result.status
+                context = result.context
+            else:
+                status = "failure"
+                context = SerializableException.from_exception(response)
         except Exception as err:  # pylint: disable=broad-except
             status = "error"
             context = SerializableException.from_exception(err)
             msg = f"An exception occured when running {_task}. {err.__class__.__name__} : {err}"
             logger.error(msg)

         return AgentResult(task_id=task.id, status=status, context=context)

     async def _get_and_complete_tasks(self):


@@ -152,9 +152,11 @@ ssl:
 # Argos will execute some tasks in the background for you
 # every 2 minutes and needs some configuration for that
 recurring_tasks:
-  # Max number of results per tasks you want to keep
-  # Minimum value is 1, default is 100
-  max_results: 100
+  # Maximum age of results
+  # Use m for minutes, h for hours, d for days,
+  # w for weeks, M for months, y for years
+  # See https://github.com/timwedde/durations_nlp#scales-reference for details
+  max_results_age: "1d"
   # Max number of seconds a task can be locked
   # Minimum value is 61, default is 100
   max_lock_seconds: 100


@@ -49,17 +49,14 @@ class SSL(BaseModel):
 class RecurringTasks(BaseModel):
-    max_results: int
+    max_results_age: float
     max_lock_seconds: int
     time_without_agent: int

-    @field_validator("max_results", mode="before")
-    def parse_max_results(cls, value):
-        """Ensure that max_results is higher than 0"""
-        if value >= 1:
-            return value
-        return 100
+    @field_validator("max_results_age", mode="before")
+    def parse_max_results_age(cls, value):
+        """Convert the configured maximum results age to seconds"""
+        return Duration(value).to_seconds()

     @field_validator("max_lock_seconds", mode="before")
     def parse_max_lock_seconds(cls, value):
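Note: the validator relies on the `durations_nlp` package referenced in the config comments above. A minimal sketch of what it returns (values assume the scales documented in the library's README):

```python
# Sketch: what parse_max_results_age produces for typical settings.
from durations_nlp import Duration

print(Duration("1d").to_seconds())  # 86400.0   -> one day (the example default)
print(Duration("6h").to_seconds())  # 21600.0   -> six hours
print(Duration("2w").to_seconds())  # 1209600.0 -> two weeks
```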


@@ -6,6 +6,7 @@ from fastapi import FastAPI
 from fastapi.staticfiles import StaticFiles
 from fastapi_login import LoginManager
 from fastapi_utils.tasks import repeat_every
+from psutil import Process
 from sqlalchemy import create_engine, event
 from sqlalchemy.orm import sessionmaker
@@ -39,9 +40,7 @@ def get_application() -> FastAPI:
     if config.general.ldap is not None:
         import ldap

-        l = ldap.initialize(config.general.ldap.uri)
-        l.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
-        appli.state.ldap = l
+        appli.state.ldap = ldap.initialize(config.general.ldap.uri)

     @appli.state.manager.user_loader()
     async def query_user(user: str) -> None | str | models.User:
@@ -118,25 +117,41 @@ def create_manager(cookie_secret: str) -> LoginManager:
     @repeat_every(seconds=120, logger=logger)
     async def recurring_tasks() -> None:
         """Recurring DB cleanup and watch-agents tasks"""
+        # If we are using gunicorn
+        if not hasattr(app.state, "SessionLocal"):
+            parent_process = Process(os.getppid())
+            children = parent_process.children(recursive=True)
+            # Start the task only once, not for every worker
+            if children[0].pid == os.getpid():
+                # and we need to setup database engine
+                setup_database(app)
+            else:
+                return None
+
         set_log_level("info", quiet=True)
         logger.info("Start background recurring tasks")
         with app.state.SessionLocal() as db:
             config = app.state.config.recurring_tasks
-            removed = await queries.remove_old_results(db, config.max_results)
+
+            agents = await queries.get_recent_agents_count(db, config.time_without_agent)
+            if agents == 0:
+                no_agent_alert(app.state.config)
+            logger.info("Agent presence checked")
+
+            removed = await queries.remove_old_results(db, config.max_results_age)
             logger.info("%i result(s) removed", removed)
+
             updated = await queries.release_old_locks(db, config.max_lock_seconds)
             logger.info("%i lock(s) released", updated)
-            agents = await queries.get_recent_agents_count(db, config.time_without_agent)
-            if agents == 0:
-                no_agent_alert(app.state.config)
+
             processed_jobs = await queries.process_jobs(db)
             logger.info("%i job(s) processed", processed_jobs)
         logger.info("Background recurring tasks ended")
+        return None

 @asynccontextmanager
 async def lifespan(appli: FastAPI):
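Note: every gunicorn worker imports the app and would otherwise run `recurring_tasks` concurrently. A standalone sketch of the election trick used above (process layout assumed, simplified from the diff):

```python
# Sketch: elect exactly one worker among gunicorn-style siblings.
# Assumes all workers are children of the same parent (the gunicorn master).
import os

from psutil import Process


def should_run_background_tasks() -> bool:
    parent = Process(os.getppid())
    children = parent.children(recursive=True)
    # Every worker sees the same child list; only the first-listed one wins.
    return bool(children) and children[0].pid == os.getpid()
```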


@@ -19,16 +19,10 @@ depends_on: Union[str, Sequence[str], None] = None
 def upgrade() -> None:
-    enum = sa.Enum(
-        "RELOAD_CONFIG",
-        name="todo_enum",
-        create_type=False,
-    )
-    enum.create(op.get_bind(), checkfirst=True)
     op.create_table(
         "jobs",
         sa.Column("id", sa.Integer(), nullable=False),
-        sa.Column("todo", enum, nullable=False),
+        sa.Column("todo", sa.Enum("RELOAD_CONFIG", name="todo_enum"), nullable=False),
         sa.Column("args", sa.String(), nullable=False),
         sa.Column(
             "current", sa.Boolean(), server_default=sa.sql.false(), nullable=False
@@ -40,4 +34,3 @@ def upgrade() -> None:

 def downgrade() -> None:
     op.drop_table("jobs")
-    sa.Enum(name="todo_enum").drop(op.get_bind(), checkfirst=True)


@@ -4,7 +4,7 @@ from hashlib import sha256
 from typing import List
 from urllib.parse import urljoin

-from sqlalchemy import asc, desc, func, Select
+from sqlalchemy import asc, func, Select
 from sqlalchemy.orm import Session

 from argos import schemas
@@ -409,28 +409,13 @@ async def reschedule_all(db: Session):
     db.commit()


-async def remove_old_results(db: Session, max_results: int):
-    tasks = db.query(Task).all()
-    deleted = 0
-    for task in tasks:
-        # Get the id of the oldest result to keep
-        subquery = (
-            db.query(Result.id)
-            .filter(Result.task_id == task.id)
-            .order_by(desc(Result.id))
-            .limit(max_results)
-            .subquery()
-        )
-        min_id = db.query(func.min(subquery.c.id)).scalar()  # pylint: disable-msg=not-callable
-        # Delete all the results older than min_id
-        if min_id:
-            deleted += (
-                db.query(Result)
-                .where(Result.id < min_id, Result.task_id == task.id)
-                .delete()
-            )
-    db.commit()
+async def remove_old_results(db: Session, max_results_age: float):
+    """Remove old results, based on age"""
+    max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age)
+    deleted = (
+        db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete()
+    )
+    db.commit()
     return deleted
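Note: the per-task subquery loop is replaced by a single bulk `DELETE`, which is what removes the worker-timeout risk from #84. A worked example of the cutoff (values illustrative):

```python
# Illustration of the age-based cutoff used by remove_old_results.
from datetime import datetime, timedelta

max_results_age = 86400.0  # "1d" once Duration(...).to_seconds() has run
max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age)

old_result = datetime.now() - timedelta(hours=25)  # submitted 25 h ago
new_result = datetime.now() - timedelta(hours=1)   # submitted 1 h ago
print(old_result < max_acceptable_time)  # True  -> would be deleted
print(new_result < max_acceptable_time)  # False -> kept
```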


@@ -2,6 +2,8 @@ from fastapi import Depends, HTTPException, Request
 from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
 from fastapi_login import LoginManager

+from argos.logging import logger
+
 auth_scheme = HTTPBearer()
@@ -33,12 +35,19 @@
     return token


-async def find_ldap_user(config, ldap, user: str) -> str | None:
+async def find_ldap_user(config, ldapobj, user: str) -> str | None:
     """Do a LDAP search for user and return its dn"""
+    import ldap
     import ldap.filter as ldap_filter
     from ldapurl import LDAP_SCOPE_SUBTREE

-    result = ldap.search_s(
+    try:
+        ldapobj.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
+    except ldap.LDAPError as err:  # pylint: disable-msg=no-member
+        logger.error("LDAP error: %s", err)
+        return None
+
+    result = ldapobj.search_s(
         config.general.ldap.user_tree,
         LDAP_SCOPE_SUBTREE,
         filterstr=ldap_filter.filter_format(
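Note: the connection initialized at startup is no longer bound once and reused; each lookup re-binds, and a failed bind is logged and treated as "user not found" rather than raising. Since python-ldap connects lazily, re-binding on every request is what lets the server recover once LDAP is reachable again (#81).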


@@ -90,6 +90,15 @@ async def post_login(
     from ldap import INVALID_CREDENTIALS  # pylint: disable-msg=no-name-in-module
     from argos.server.routes.dependencies import find_ldap_user

+    invalid_credentials = templates.TemplateResponse(
+        "login.html",
+        {
+            "request": request,
+            "msg": "Sorry, invalid username or bad password. "
+            "Or the LDAP server is unreachable (see logs to verify).",
+        },
+    )
+
     ldap_dn = await find_ldap_user(config, request.app.state.ldap, username)
     if ldap_dn is None:
         return invalid_credentials


@@ -60,7 +60,7 @@ Options:
   --max-tasks INTEGER             Number of concurrent tasks this agent can run
   --wait-time INTEGER             Waiting time between two polls on the server
                                   (seconds)
-  --log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL]
+  --log-level [debug|info|warning|error|critical]
   --user-agent TEXT               A custom string to append to the User-Agent
                                   header
   --help                          Show this message and exit.


@@ -15,3 +15,7 @@ venv/bin/alembic -c argos/server/migrations/alembic.ini revision \
 ```

 Edit the created file to remove comments and adapt it to make sure the migration is complete (Alembic is not powerful enough to cover all the corner cases).
+
+In case you want to add an `Enum` type and use it in an existing table, please have a look at [`argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py).
+
+If you want to add an `Enum` type in a new table, you can do as in [`argos/server/migrations/versions/7d480e6f1112_initial_migrations.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/7d480e6f1112_initial_migrations.py).
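For the new-table case, a minimal sketch mirroring the corrected `jobs` migration earlier in this changeset (same names as that migration):

```python
# Sketch: declaring the Enum inline in op.create_table lets SQLAlchemy
# create the type together with the table (and drop it with the table).
import sqlalchemy as sa
from alembic import op


def upgrade() -> None:
    op.create_table(
        "jobs",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("todo", sa.Enum("RELOAD_CONFIG", name="todo_enum"), nullable=False),
    )


def downgrade() -> None:
    # No separate enum bookkeeping needed; dropping the table is enough here.
    op.drop_table("jobs")
```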


@@ -90,7 +90,7 @@ User=argos
 WorkingDirectory=/opt/argos/
 EnvironmentFile=/etc/default/argos-server
 ExecStartPre=/opt/argos/venv/bin/argos server migrate
-ExecStartPre=/opt/argos/venv/bin/argos server reload-config
+ExecStartPre=/opt/argos/venv/bin/argos server reload-config --enqueue
 ExecStart=/opt/argos/venv/bin/gunicorn "argos.server.main:get_application()" \\
     --workers \$ARGOS_SERVER_WORKERS \\
     --worker-class uvicorn.workers.UvicornWorker \\
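Note: `--enqueue` presumably pairs with the new `jobs` table from this changeset: instead of reloading synchronously at start-up, the command records a `RELOAD_CONFIG` job that the recurring-tasks loop later handles via `process_jobs`, which plays better with several gunicorn workers sharing one database.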


@@ -33,6 +33,7 @@ dependencies = [
     "Jinja2>=3.0,<4",
     "jsonpointer>=3.0,<4",
     "passlib>=1.7.4,<2",
+    "psutil>=5.9.8,<6",
     "psycopg2-binary>=2.9,<3",
     "pydantic[email]>=2.4,<3",
     "pydantic-settings>=2.0,<3",


@@ -43,9 +43,11 @@ ssl:
 # Argos will execute some tasks in the background for you
 # every 2 minutes and needs some configuration for that
 recurring_tasks:
-  # Max number of results per tasks you want to keep
-  # Minimum value is 1, default is 100
-  max_results: 100
+  # Maximum age of results
+  # Use m for minutes, h for hours, d for days,
+  # w for weeks, M for months, y for years
+  # See https://github.com/timwedde/durations_nlp#scales-reference for details
+  max_results_age: "1d"
   # Max number of seconds a task can be locked
   # Minimum value is 61, default is 100
   max_lock_seconds: 100


@@ -10,9 +10,9 @@ from argos.server.models import Result, Task, User
 @pytest.mark.asyncio
 async def test_remove_old_results(db, ten_tasks):  # pylint: disable-msg=redefined-outer-name
     for _task in ten_tasks:
-        for _ in range(5):
+        for iterator in range(5):
             result = Result(
-                submitted_at=datetime.now(),
+                submitted_at=datetime.now() - timedelta(seconds=iterator * 2),
                 status="success",
                 context={"foo": "bar"},
                 task=_task,
@@ -24,12 +24,12 @@ async def test_remove_old_results(db, ten_tasks):  # pylint: disable-msg=redefi
     # So we have 5 results per tasks
     assert db.query(Result).count() == 50

-    # Keep only 2
-    deleted = await queries.remove_old_results(db, 2)
-    assert deleted == 30
-    assert db.query(Result).count() == 20
+    # Keep only those newer than 6 seconds ago
+    deleted = await queries.remove_old_results(db, 6)
+    assert deleted == 20
+    assert db.query(Result).count() == 30
     for _task in ten_tasks:
-        assert db.query(Result).filter(Result.task == _task).count() == 2
+        assert db.query(Result).filter(Result.task == _task).count() == 3

 @pytest.mark.asyncio
@@ -245,7 +245,7 @@ def empty_config():
         ),
         ssl=schemas.config.SSL(thresholds=[]),
         recurring_tasks=schemas.config.RecurringTasks(
-            max_results=100,
+            max_results_age="6s",
             max_lock_seconds=120,
             time_without_agent=300,
         ),