mirror of
https://framagit.org/framasoft/framaspace/argos.git
synced 2025-05-18 19:20:36 +02:00
Compare commits
11 commits
a48c7b74e6
...
211ac32028
Author | SHA1 | Date | |
---|---|---|---|
![]() |
211ac32028 | ||
![]() |
32f2518294 | ||
![]() |
38cc06e972 | ||
![]() |
4b78919937 | ||
![]() |
d8f30ebccd | ||
![]() |
09674f73ef | ||
![]() |
c63093bb2f | ||
![]() |
657624ed35 | ||
![]() |
471c1eae91 | ||
![]() |
c3708af32a | ||
![]() |
23fea9fffa |
16 changed files with 132 additions and 69 deletions
35
CHANGELOG.md
35
CHANGELOG.md
|
@ -2,11 +2,46 @@
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
- 🐛 — Fix worker timeout for old results cleaning in recurring tasks (#84)
|
||||
|
||||
💥 Old results are now removed by their age, not based on their number.
|
||||
|
||||
💥 Warning: `max_results` setting has been replaced by `max_results_age`, which is a duration.
|
||||
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
|
||||
a new example configuration file.
|
||||
|
||||
## 0.8.2
|
||||
|
||||
Date: 2025-02-18
|
||||
|
||||
- 🐛 — Fix recurring tasks with gunicorn
|
||||
|
||||
## 0.8.1
|
||||
|
||||
Date: 2025-02-18
|
||||
|
||||
- 🐛 — Fix todo enum in jobs table
|
||||
|
||||
## 0.8.0
|
||||
|
||||
Date: 2025-02-18
|
||||
|
||||
- ✨ — Allow to customize agent User-Agent header (#78)
|
||||
- 📝 — Document how to add data to requests (#77)
|
||||
- ✨ — No need cron tasks for DB cleaning anymore (#74 and #75)
|
||||
- ✨ — No need cron tasks for agents watching (#76)
|
||||
- ✨ — Reload configuration asynchronously (#79)
|
||||
- 🐛 — Automatically reconnect to LDAP if unreachable (#81)
|
||||
- 🐛 — Better httpx.RequestError handling (#83)
|
||||
|
||||
💥 Warning: there is new settings to add to your configuration file.
|
||||
Use `argos server generate-config > /etc/argos/config.yaml-dist` to generate
|
||||
a new example configuration file.
|
||||
|
||||
💥 You don’t need cron tasks anymore!
|
||||
Remove your old cron tasks as they will now do nothing but generating errors.
|
||||
|
||||
NB: You may want to add `--enqueue` to `reload-config` command in your systemd file.
|
||||
|
||||
## 0.7.4
|
||||
|
||||
|
|
|
@ -1 +1 @@
|
|||
VERSION = "0.7.4"
|
||||
VERSION = "0.8.2"
|
||||
|
|
|
@ -84,6 +84,7 @@ class ArgosAgent: # pylint: disable-msg=too-many-instance-attributes
|
|||
await asyncio.sleep(self.wait_time)
|
||||
|
||||
async def _do_request(self, group: str, details: dict):
|
||||
logger.debug("_do_request for group %s", group)
|
||||
headers = {}
|
||||
if details["request_data"] is not None:
|
||||
request_data = json.loads(details["request_data"])
|
||||
|
@ -120,6 +121,7 @@ class ArgosAgent: # pylint: disable-msg=too-many-instance-attributes
|
|||
)
|
||||
except httpx.ReadError:
|
||||
sleep(1)
|
||||
logger.warning("httpx.ReadError for group %s, re-emit request", group)
|
||||
if details["request_data"] is None or request_data["data"] is None:
|
||||
response = await http_client.request( # type: ignore[union-attr]
|
||||
method=details["method"], url=details["url"], timeout=60
|
||||
|
@ -138,6 +140,9 @@ class ArgosAgent: # pylint: disable-msg=too-many-instance-attributes
|
|||
data=request_data["data"],
|
||||
timeout=60,
|
||||
)
|
||||
except httpx.RequestError as err:
|
||||
logger.warning("httpx.RequestError for group %s", group)
|
||||
response = err
|
||||
|
||||
self._res_cache[group] = response
|
||||
|
||||
|
@ -147,15 +152,21 @@ class ArgosAgent: # pylint: disable-msg=too-many-instance-attributes
|
|||
|
||||
check_class = get_registered_check(task.check)
|
||||
check = check_class(task)
|
||||
result = await check.run(self._res_cache[task.task_group])
|
||||
|
||||
response = self._res_cache[task.task_group]
|
||||
if isinstance(response, httpx.Response):
|
||||
result = await check.run(response)
|
||||
status = result.status
|
||||
context = result.context
|
||||
|
||||
else:
|
||||
status = "failure"
|
||||
context = SerializableException.from_exception(response)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
status = "error"
|
||||
context = SerializableException.from_exception(err)
|
||||
msg = f"An exception occured when running {_task}. {err.__class__.__name__} : {err}"
|
||||
logger.error(msg)
|
||||
|
||||
return AgentResult(task_id=task.id, status=status, context=context)
|
||||
|
||||
async def _get_and_complete_tasks(self):
|
||||
|
|
|
@ -152,9 +152,11 @@ ssl:
|
|||
# Argos will execute some tasks in the background for you
|
||||
# every 2 minutes and needs some configuration for that
|
||||
recurring_tasks:
|
||||
# Max number of results per tasks you want to keep
|
||||
# Minimum value is 1, default is 100
|
||||
max_results: 100
|
||||
# Maximum age of results
|
||||
# Use m for minutes, h for hours, d for days
|
||||
# w for weeks, M for months, y for years
|
||||
# See https://github.com/timwedde/durations_nlp#scales-reference for details
|
||||
max_results_age: "1d"
|
||||
# Max number of seconds a task can be locked
|
||||
# Minimum value is 61, default is 100
|
||||
max_lock_seconds: 100
|
||||
|
|
|
@ -49,17 +49,14 @@ class SSL(BaseModel):
|
|||
|
||||
|
||||
class RecurringTasks(BaseModel):
|
||||
max_results: int
|
||||
max_results_age: float
|
||||
max_lock_seconds: int
|
||||
time_without_agent: int
|
||||
|
||||
@field_validator("max_results", mode="before")
|
||||
def parse_max_results(cls, value):
|
||||
"""Ensure that max_results is higher than 0"""
|
||||
if value >= 1:
|
||||
return value
|
||||
|
||||
return 100
|
||||
@field_validator("max_results_age", mode="before")
|
||||
def parse_max_results_age(cls, value):
|
||||
"""Convert the configured maximum results age to seconds"""
|
||||
return Duration(value).to_seconds()
|
||||
|
||||
@field_validator("max_lock_seconds", mode="before")
|
||||
def parse_max_lock_seconds(cls, value):
|
||||
|
|
|
@ -6,6 +6,7 @@ from fastapi import FastAPI
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi_login import LoginManager
|
||||
from fastapi_utils.tasks import repeat_every
|
||||
from psutil import Process
|
||||
from sqlalchemy import create_engine, event
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
@ -39,9 +40,7 @@ def get_application() -> FastAPI:
|
|||
if config.general.ldap is not None:
|
||||
import ldap
|
||||
|
||||
l = ldap.initialize(config.general.ldap.uri)
|
||||
l.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
|
||||
appli.state.ldap = l
|
||||
appli.state.ldap = ldap.initialize(config.general.ldap.uri)
|
||||
|
||||
@appli.state.manager.user_loader()
|
||||
async def query_user(user: str) -> None | str | models.User:
|
||||
|
@ -118,25 +117,41 @@ def create_manager(cookie_secret: str) -> LoginManager:
|
|||
@repeat_every(seconds=120, logger=logger)
|
||||
async def recurring_tasks() -> None:
|
||||
"""Recurring DB cleanup and watch-agents tasks"""
|
||||
# If we are using gunicorn
|
||||
if not hasattr(app.state, "SessionLocal"):
|
||||
parent_process = Process(os.getppid())
|
||||
children = parent_process.children(recursive=True)
|
||||
# Start the task only once, not for every worker
|
||||
if children[0].pid == os.getpid():
|
||||
# and we need to setup database engine
|
||||
setup_database(app)
|
||||
else:
|
||||
return None
|
||||
|
||||
set_log_level("info", quiet=True)
|
||||
logger.info("Start background recurring tasks")
|
||||
|
||||
with app.state.SessionLocal() as db:
|
||||
config = app.state.config.recurring_tasks
|
||||
removed = await queries.remove_old_results(db, config.max_results)
|
||||
|
||||
agents = await queries.get_recent_agents_count(db, config.time_without_agent)
|
||||
if agents == 0:
|
||||
no_agent_alert(app.state.config)
|
||||
logger.info("Agent presence checked")
|
||||
|
||||
removed = await queries.remove_old_results(db, config.max_results_age)
|
||||
logger.info("%i result(s) removed", removed)
|
||||
|
||||
updated = await queries.release_old_locks(db, config.max_lock_seconds)
|
||||
logger.info("%i lock(s) released", updated)
|
||||
|
||||
agents = await queries.get_recent_agents_count(db, config.time_without_agent)
|
||||
if agents == 0:
|
||||
no_agent_alert(app.state.config)
|
||||
|
||||
processed_jobs = await queries.process_jobs(db)
|
||||
logger.info("%i job(s) processed", processed_jobs)
|
||||
|
||||
logger.info("Background recurring tasks ended")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(appli: FastAPI):
|
||||
|
|
|
@ -19,16 +19,10 @@ depends_on: Union[str, Sequence[str], None] = None
|
|||
|
||||
|
||||
def upgrade() -> None:
|
||||
enum = sa.Enum(
|
||||
"RELOAD_CONFIG",
|
||||
name="todo_enum",
|
||||
create_type=False,
|
||||
)
|
||||
enum.create(op.get_bind(), checkfirst=True)
|
||||
op.create_table(
|
||||
"jobs",
|
||||
sa.Column("id", sa.Integer(), nullable=False),
|
||||
sa.Column("todo", enum, nullable=False),
|
||||
sa.Column("todo", sa.Enum("RELOAD_CONFIG", name="todo_enum"), nullable=False),
|
||||
sa.Column("args", sa.String(), nullable=False),
|
||||
sa.Column(
|
||||
"current", sa.Boolean(), server_default=sa.sql.false(), nullable=False
|
||||
|
@ -40,4 +34,3 @@ def upgrade() -> None:
|
|||
|
||||
def downgrade() -> None:
|
||||
op.drop_table("jobs")
|
||||
sa.Enum(name="todo_enum").drop(op.get_bind(), checkfirst=True)
|
||||
|
|
|
@ -4,7 +4,7 @@ from hashlib import sha256
|
|||
from typing import List
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from sqlalchemy import asc, desc, func, Select
|
||||
from sqlalchemy import asc, func, Select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from argos import schemas
|
||||
|
@ -409,26 +409,11 @@ async def reschedule_all(db: Session):
|
|||
db.commit()
|
||||
|
||||
|
||||
async def remove_old_results(db: Session, max_results: int):
|
||||
tasks = db.query(Task).all()
|
||||
deleted = 0
|
||||
for task in tasks:
|
||||
# Get the id of the oldest result to keep
|
||||
subquery = (
|
||||
db.query(Result.id)
|
||||
.filter(Result.task_id == task.id)
|
||||
.order_by(desc(Result.id))
|
||||
.limit(max_results)
|
||||
.subquery()
|
||||
)
|
||||
min_id = db.query(func.min(subquery.c.id)).scalar() # pylint: disable-msg=not-callable
|
||||
|
||||
# Delete all the results older than min_id
|
||||
if min_id:
|
||||
deleted += (
|
||||
db.query(Result)
|
||||
.where(Result.id < min_id, Result.task_id == task.id)
|
||||
.delete()
|
||||
async def remove_old_results(db: Session, max_results_age: float):
|
||||
"""Remove old results, base on age"""
|
||||
max_acceptable_time = datetime.now() - timedelta(seconds=max_results_age)
|
||||
deleted = (
|
||||
db.query(Result).filter(Result.submitted_at < max_acceptable_time).delete()
|
||||
)
|
||||
db.commit()
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ from fastapi import Depends, HTTPException, Request
|
|||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from fastapi_login import LoginManager
|
||||
|
||||
from argos.logging import logger
|
||||
|
||||
auth_scheme = HTTPBearer()
|
||||
|
||||
|
||||
|
@ -33,12 +35,19 @@ async def verify_token(
|
|||
return token
|
||||
|
||||
|
||||
async def find_ldap_user(config, ldap, user: str) -> str | None:
|
||||
async def find_ldap_user(config, ldapobj, user: str) -> str | None:
|
||||
"""Do a LDAP search for user and return its dn"""
|
||||
import ldap
|
||||
import ldap.filter as ldap_filter
|
||||
from ldapurl import LDAP_SCOPE_SUBTREE
|
||||
|
||||
result = ldap.search_s(
|
||||
try:
|
||||
ldapobj.simple_bind_s(config.general.ldap.bind_dn, config.general.ldap.bind_pwd)
|
||||
except ldap.LDAPError as err: # pylint: disable-msg=no-member
|
||||
logger.error("LDAP error: %s", err)
|
||||
return None
|
||||
|
||||
result = ldapobj.search_s(
|
||||
config.general.ldap.user_tree,
|
||||
LDAP_SCOPE_SUBTREE,
|
||||
filterstr=ldap_filter.filter_format(
|
||||
|
|
|
@ -90,6 +90,15 @@ async def post_login(
|
|||
from ldap import INVALID_CREDENTIALS # pylint: disable-msg=no-name-in-module
|
||||
from argos.server.routes.dependencies import find_ldap_user
|
||||
|
||||
invalid_credentials = templates.TemplateResponse(
|
||||
"login.html",
|
||||
{
|
||||
"request": request,
|
||||
"msg": "Sorry, invalid username or bad password. "
|
||||
"Or the LDAP server is unreachable (see logs to verify).",
|
||||
},
|
||||
)
|
||||
|
||||
ldap_dn = await find_ldap_user(config, request.app.state.ldap, username)
|
||||
if ldap_dn is None:
|
||||
return invalid_credentials
|
||||
|
|
|
@ -60,7 +60,7 @@ Options:
|
|||
--max-tasks INTEGER Number of concurrent tasks this agent can run
|
||||
--wait-time INTEGER Waiting time between two polls on the server
|
||||
(seconds)
|
||||
--log-level [DEBUG|INFO|WARNING|ERROR|CRITICAL]
|
||||
--log-level [debug|info|warning|error|critical]
|
||||
--user-agent TEXT A custom string to append to the User-Agent
|
||||
header
|
||||
--help Show this message and exit.
|
||||
|
|
|
@ -15,3 +15,7 @@ venv/bin/alembic -c argos/server/migrations/alembic.ini revision \
|
|||
```
|
||||
|
||||
Edit the created file to remove comments and adapt it to make sure the migration is complete (Alembic is not powerful enough to cover all the corner cases).
|
||||
|
||||
In case you want to add an `Enum` type and use it in an existing table, please have a look at [`argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/dcf73fa19fce_specify_check_method.py).
|
||||
|
||||
If you want to add an `Enum` type in a new table, you can do like in [`argos/server/migrations/versions/7d480e6f1112_initial_migrations.py`](https://framagit.org/framasoft/framaspace/argos/-/blob/main/argos/server/migrations/versions/7d480e6f1112_initial_migrations.py)
|
||||
|
|
|
@ -90,7 +90,7 @@ User=argos
|
|||
WorkingDirectory=/opt/argos/
|
||||
EnvironmentFile=/etc/default/argos-server
|
||||
ExecStartPre=/opt/argos/venv/bin/argos server migrate
|
||||
ExecStartPre=/opt/argos/venv/bin/argos server reload-config
|
||||
ExecStartPre=/opt/argos/venv/bin/argos server reload-config --enqueue
|
||||
ExecStart=/opt/argos/venv/bin/gunicorn "argos.server.main:get_application()" \\
|
||||
--workers \$ARGOS_SERVER_WORKERS \\
|
||||
--worker-class uvicorn.workers.UvicornWorker \\
|
||||
|
|
|
@ -33,6 +33,7 @@ dependencies = [
|
|||
"Jinja2>=3.0,<4",
|
||||
"jsonpointer>=3.0,<4",
|
||||
"passlib>=1.7.4,<2",
|
||||
"psutil>=5.9.8,<6",
|
||||
"psycopg2-binary>=2.9,<3",
|
||||
"pydantic[email]>=2.4,<3",
|
||||
"pydantic-settings>=2.0,<3",
|
||||
|
|
|
@ -43,9 +43,11 @@ ssl:
|
|||
# Argos will execute some tasks in the background for you
|
||||
# every 2 minutes and needs some configuration for that
|
||||
recurring_tasks:
|
||||
# Max number of results per tasks you want to keep
|
||||
# Minimum value is 1, default is 100
|
||||
max_results: 100
|
||||
# Maximum age of results
|
||||
# Use m for minutes, h for hours, d for days
|
||||
# w for weeks, M for months, y for years
|
||||
# See https://github.com/timwedde/durations_nlp#scales-reference for details
|
||||
max_results_age: "1d"
|
||||
# Max number of seconds a task can be locked
|
||||
# Minimum value is 61, default is 100
|
||||
max_lock_seconds: 100
|
||||
|
|
|
@ -10,9 +10,9 @@ from argos.server.models import Result, Task, User
|
|||
@pytest.mark.asyncio
|
||||
async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefined-outer-name
|
||||
for _task in ten_tasks:
|
||||
for _ in range(5):
|
||||
for iterator in range(5):
|
||||
result = Result(
|
||||
submitted_at=datetime.now(),
|
||||
submitted_at=datetime.now() - timedelta(seconds=iterator * 2),
|
||||
status="success",
|
||||
context={"foo": "bar"},
|
||||
task=_task,
|
||||
|
@ -24,12 +24,12 @@ async def test_remove_old_results(db, ten_tasks): # pylint: disable-msg=redefi
|
|||
|
||||
# So we have 5 results per tasks
|
||||
assert db.query(Result).count() == 50
|
||||
# Keep only 2
|
||||
deleted = await queries.remove_old_results(db, 2)
|
||||
assert deleted == 30
|
||||
assert db.query(Result).count() == 20
|
||||
# Keep only those newer than 1 second ago
|
||||
deleted = await queries.remove_old_results(db, 6)
|
||||
assert deleted == 20
|
||||
assert db.query(Result).count() == 30
|
||||
for _task in ten_tasks:
|
||||
assert db.query(Result).filter(Result.task == _task).count() == 2
|
||||
assert db.query(Result).filter(Result.task == _task).count() == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
@ -245,7 +245,7 @@ def empty_config():
|
|||
),
|
||||
ssl=schemas.config.SSL(thresholds=[]),
|
||||
recurring_tasks=schemas.config.RecurringTasks(
|
||||
max_results=100,
|
||||
max_results_age="6s",
|
||||
max_lock_seconds=120,
|
||||
time_without_agent=300,
|
||||
),
|
||||
|
|
Loading…
Reference in a new issue