import asyncio
import subprocess
import sys

import click

from argos import logging
from argos.agent import ArgosAgent


# Root command group: every sub-command and sub-group below hangs off it.
# (Kept docstring-free so the generated --help output is unchanged.)
@click.group()
def cli():
    pass


# "server" sub-group of the root group; holds the `start` and `cleandb`
# commands defined further down.
@cli.group()
def server():
    pass


@cli.command()
@click.argument("server")
@click.argument("auth")
@click.option(
    "--max-tasks",
    default=10,
    help="Number of concurrent tasks this agent can run",
)
@click.option(
    "--wait-time",
    default=10,
    help="Waiting time between two polls on the server (seconds)",
)
@click.option(
    "--log-level",
    default="INFO",
    type=click.Choice(logging.LOG_LEVELS, case_sensitive=False),
)
def agent(server, auth, max_tasks, wait_time, log_level):
    """Get and run tasks to the provided server. Will wait for new tasks.

    Usage: argos agent https://argos.server "auth-token-here"
    """
    # Parameters (all supplied by click):
    #   server     -- positional argument; shadows the `server` group, but only
    #                 inside this function.
    #   auth       -- positional argument forwarded to ArgosAgent.
    #   max_tasks  -- value of --max-tasks (default 10).
    #   wait_time  -- value of --wait-time (default 10).
    #   log_level  -- one of logging.LOG_LEVELS, matched case-insensitively.
    click.echo("Starting argos agent. Will retry forever.")

    # The logger is imported at call time, exactly as the original code did.
    from argos.logging import logger

    logger.setLevel(log_level)

    # Named `argos_agent` so the local does not shadow this command function.
    argos_agent = ArgosAgent(server, auth, max_tasks, wait_time)
    asyncio.run(argos_agent.run())


@server.command()
@click.option("--host", default="127.0.0.1", help="Host to bind")
@click.option("--port", default=8000, type=int, help="Port to bind")
@click.option("--reload", is_flag=True, help="Enable hot reloading")
@click.option("--log-config", help="Path to the logging configuration file")
def start(host, port, reload, log_config):
    """Starts the server."""
    # Build the uvicorn command line as an argument list (no shell involved).
    command = ["uvicorn", "argos.server:app", "--host", host, "--port", str(port)]
    if reload:
        command.append("--reload")
    if log_config:
        command.extend(["--log-config", log_config])

    completed = subprocess.run(command)

    # Propagate uvicorn's exit status: previously the result was discarded and
    # this command reported success (0) even when the server failed to start.
    sys.exit(completed.returncode)


@server.command()
@click.option("--max-results", default=100, help="Number of results per tasks to keep")
@click.option(
    "--max-lock-seconds",
    default=100,
    help="The number of seconds after which a lock is considered stale",
)
def cleandb(max_results, max_lock_seconds):
    """Clean the database (to run routinely)

    - Removes old results from the database.
    - Removes locks from tasks that have been locked for too long.
    """
    # The imports are made here otherwise the agent will need server
    # configuration files.
    from argos.server import queries
    from argos.server.main import connect_to_db, get_application, setup_database

    async def clean_old_results():
        # Build the application and its database setup, then open a handle.
        app = get_application()
        setup_database(app)
        db = await connect_to_db(app)
        # NOTE(review): the handle returned by connect_to_db is never released
        # here -- confirm whether it needs an explicit close.

        # Results are pruned first, then stale locks are released.
        removed = await queries.remove_old_results(db, max_results)
        updated = await queries.release_old_locks(db, max_lock_seconds)

        click.echo(f"{removed} results removed")
        click.echo(f"{updated} locks released")

    asyncio.run(clean_old_results())


if __name__ == "__main__":
    cli()