argos/argos/server/routes/dependencies.py

58 lines
1.6 KiB
Python

from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi_login import LoginManager
auth_scheme = HTTPBearer()
def get_db(request: Request):
db = request.app.state.SessionLocal()
try:
yield db
finally:
db.close()
def get_config(request: Request):
return request.app.state.config
async def get_manager(request: Request) -> LoginManager:
if request.app.state.config.general.unauthenticated_access is not None:
return await request.app.state.manager.optional(request)
return await request.app.state.manager(request)
async def verify_token(
request: Request, token: HTTPAuthorizationCredentials = Depends(auth_scheme)
):
"""Verify agent token"""
if token.credentials not in request.app.state.config.service.secrets:
raise HTTPException(status_code=401, detail="Unauthorized")
return token
async def find_ldap_user(config, ldap, user: str) -> str | None:
"""Do a LDAP search for user and return its dn"""
import ldap.filter as ldap_filter
from ldapurl import LDAP_SCOPE_SUBTREE
result = ldap.search_s(
config.general.ldap.user_tree,
LDAP_SCOPE_SUBTREE,
filterstr=ldap_filter.filter_format(
f"(&(%s=%s){config.general.ldap.user_filter})",
[
config.general.ldap.user_attr,
user,
],
),
attrlist=[config.general.ldap.user_attr],
)
# If there is a result, there should, logically, be only one entry
if len(result) > 0:
return result[0][0]
return None