Source code for actingweb.auth

import base64
import hmac
import logging
import math
from datetime import datetime

from pynamodb.exceptions import DoesNotExist, PutError, UpdateError

from actingweb import actor, request_context, trust
from actingweb import config as config_class
from actingweb.constants import TRUSTEE_CREATOR

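# This is where each path and subpath in actingweb is assigned an authentication type.
# "basic" (creator username/passphrase) is the auth type handled explicitly here. Bearer-token
# auth (trust secrets, OAuth2 tokens and ActingWeb SPA tokens) is attempted automatically
# whenever an "Authorization: Bearer <token>" header is included in the http request.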

logger = logging.getLogger(__name__)


def add_auth_response(appreq=None, auth_obj=None):
    """Copy the outcome of an authentication attempt onto the HTTP response.

    Sets the status line from ``auth_obj.response``, installs the redirect
    for a 302, writes a short body for a 401, and finally copies every header
    collected in ``auth_obj.response["headers"]`` onto ``appreq.response``.

    Args:
        appreq: Request wrapper exposing a ``response`` object.
        auth_obj: Object carrying a ``response`` dict (``code``, ``text``,
            ``headers``) and, for redirects, a ``redirect`` attribute.

    Returns:
        False when either argument is missing/falsy, True otherwise.
    """
    if not (appreq and auth_obj):
        return False

    def _response_available():
        # Evaluated lazily at each use, like the inline checks it replaces.
        return hasattr(appreq, "response") and appreq.response

    logger.debug(
        "add_auth_response: "
        + str(auth_obj.response["code"])
        + ":"
        + auth_obj.response["text"]
    )
    logger.debug(
        f"add_auth_response: auth_obj.redirect = {getattr(auth_obj, 'redirect', None)}"
    )
    appreq.response.set_status(auth_obj.response["code"], auth_obj.response["text"])

    status = auth_obj.response["code"]
    if status == 302:
        logger.debug(f"add_auth_response: Setting redirect to {auth_obj.redirect}")
        appreq.response.set_redirect(url=auth_obj.redirect)
    elif status == 401 and _response_available():
        # Prefer the framework's write() when it exists; otherwise set the body.
        if hasattr(appreq.response, "write"):
            appreq.response.write("Authentication required")
        else:
            appreq.response.body = "Authentication required"

    # Iterate over a snapshot so the headers dict is not walked live.
    header_pairs = list(auth_obj.response["headers"].items())
    for header_name, header_value in header_pairs:
        if _response_available():
            appreq.response.headers[header_name] = header_value
    return True
[docs] class Auth: """The auth class handles authentication and authorisation for the various schemes supported. The check_authentication() function checks the various authentication schemes against the path and does proper authentication. There are two types supported: basic (using creator credentials) and token (received when trust is created or OAuth2). The check_authorisation() function validates the authenticated user against the config.py access list. check_token_auth() can be called from outside the class to do a simple peer/bearer token verification. The response[], acl[], and authn_done variables are useful outside Auth(). authn_done is set when authentication has been done and a final authentication status can be found in response[]. self.response = { "code": 403, # Result code (http) "text": "Forbidden", # Proposed response text "headers": [], # Headers to add to response after authentication has been done } self.acl = { "authenticated": False, # Has authentication been verified and passed? "authorised": False, # Has authorisation been done and appropriate acls set? "rights": '', # "a", "r" (approve or reject) "relationship": None, # E.g. creator, friend, admin, etc "peerid": '', # Peerid if there is a relationship "approved": False, # True if the peer is approved } """ def __init__(self, actor_id, auth_type="basic", config=None): if not config: self.config = config_class.Config() else: self.config = config self.token = None self.type = auth_type self.trust = None # Proposed response code after check_authentication() or authorise() have been called self.response = {"code": 403, "text": "Forbidden", "headers": {}} # Whether authentication is complete or not (depends on flow) self.authn_done = False # acl stores the actual verified credentials and access rights after # authentication and authorisation have been done self.acl = { "authenticated": False, # Has authentication been verified and passed? 
"authorised": False, # Has authorisation been done and appropriate acls set? "rights": "", # "a", "r" (approve or reject) "relationship": None, # E.g. creator, friend, admin, etc "peerid": "", # Peerid if there is a relationship "approved": False, # True if the peer is approved } self.actor = actor.Actor(actor_id, config=self.config) if not self.actor.id: self.actor = None return if self.type == "basic": self.realm = self.config.auth_realm def __check_basic_auth_creator(self, appreq): if self.type != "basic": logger.warning("Trying to do basic auth when auth type is not basic") self.response["code"] = 403 self.response["text"] = "Forbidden" return False if not self.actor or not self.actor.passphrase: logger.warning( "Trying to do basic auth when no passphrase value can be found." ) self.response["code"] = 403 self.response["text"] = "Forbidden" return False if "Authorization" not in appreq.request.headers: self.response["headers"]["WWW-Authenticate"] = ( 'Basic realm="' + self.realm + '"' ) self.response["code"] = 401 self.response["text"] = "Authentication required" return False authz = appreq.request.headers["Authorization"] (basic, _) = authz.split(" ") if basic.lower() != "basic": self.response["code"] = 403 self.response["text"] = "No basic auth in Authorization header" logger.debug("No basic auth in Authorization header") return False self.authn_done = True au = authz.split(" ")[1] au = au.encode("utf-8") au = base64.b64decode(au) (username, password) = au.split(b":") password = password.decode("utf-8") username = username.decode("utf-8") if not self.actor or username != self.actor.creator: self.response["code"] = 403 self.response["text"] = "Invalid username or password" logger.debug("Wrong creator username") return False if not self.actor or password != self.actor.passphrase: self.response["code"] = 403 self.response["text"] = "Invalid username or password" logger.debug( "Wrong creator passphrase(" + password + ") correct(" + (self.actor.passphrase if 
self.actor else "") + ")" ) return False self.acl["relationship"] = "creator" self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" request_context.set_peer_id("") # Creator has no peer_id return True def _record_trust_usage(self, trust_record, via_hint: str | None = None) -> None: """Persist usage metadata for bearer-token trusts.""" if not self.actor or not trust_record: return peer_id = trust_record.get("peerid") if not peer_id: return try: usage_time = datetime.utcnow().isoformat() modify_kwargs = {"last_accessed": usage_time} if not trust_record.get("created_at"): modify_kwargs["created_at"] = usage_time if via_hint: canonical_via = trust.canonical_connection_method(via_hint) if canonical_via: modify_kwargs["last_connected_via"] = canonical_via elif trust_record.get("last_connected_via"): canonical_via = trust.canonical_connection_method( trust_record.get("last_connected_via") ) if canonical_via: modify_kwargs["last_connected_via"] = canonical_via if via_hint and not trust_record.get("established_via"): modify_kwargs["established_via"] = via_hint db_trust = trust.Trust( actor_id=self.actor.id, peerid=peer_id, config=self.config ) db_trust.modify(**modify_kwargs) # type: ignore[arg-type] trust_record["last_accessed"] = usage_time trust_record["last_connected_at"] = usage_time if "last_connected_via" in modify_kwargs: trust_record["last_connected_via"] = modify_kwargs["last_connected_via"] if "created_at" in modify_kwargs: trust_record["created_at"] = usage_time if "established_via" in modify_kwargs: trust_record["established_via"] = modify_kwargs["established_via"] except (PutError, UpdateError) as e: logger.warning(f"Database error recording trust usage metadata: {e}") except DoesNotExist: logger.warning("Trust record no longer exists, skipping metadata update") except Exception as e: logger.error( f"Unexpected error recording trust usage metadata: {e}", exc_info=True )
[docs] def check_token_auth(self, appreq, via_hint: str | None = None): """Validate bearer tokens and optionally record how the connection occurred.""" if "Authorization" not in appreq.request.headers: return False auth = appreq.request.headers["Authorization"] auth_parts = auth.split(" ") if len(auth_parts) != 2 or auth_parts[0].lower() != "bearer": return False token = auth_parts[1] self.authn_done = True # First, try OAuth2 authentication if configured if self._check_oauth2_token(token): return True trustee = ( self.actor.store.trustee_root if self.actor and self.actor.store else None ) # If trustee_root is set, creator name is 'trustee' and # bit strength of passphrase is > 80, use passphrase as # token if ( trustee and self.actor and self.actor.creator and self.actor.creator.lower() == TRUSTEE_CREATOR ): if ( self.actor.passphrase and math.floor(len(self.actor.passphrase) * math.log(94, 2)) > 80 ): if token == self.actor.passphrase: self.acl["relationship"] = TRUSTEE_CREATOR self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = self.actor.passphrase if self.actor else None request_context.set_peer_id("") return True else: logger.warning( "Attempted trustee bearer token auth with <80 bit strength token." 
) tru = trust.Trust( actor_id=self.actor.id if self.actor else None, token=token, config=self.config, ) new_trust = tru.get() if new_trust: logger.debug(f"Found trust for peer: {new_trust.get('peerid', 'unknown')}") if self.actor and new_trust["peerid"] == self.actor.id: logger.error("Peer == actor!!") return False if new_trust and len(new_trust) > 0: self.acl["relationship"] = new_trust["relationship"] self.acl["peerid"] = new_trust["peerid"] self.acl["approved"] = new_trust["approved"] self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = new_trust["secret"] self.trust = new_trust self._record_trust_usage(new_trust, via_hint=via_hint or "trust") request_context.set_peer_id(new_trust["peerid"]) return True else: return False
def _check_oauth2_token(self, token): """Check if the Bearer token is a valid OAuth2 token and authenticate user.""" try: # First, check if this is an ActingWeb SPA token if self._check_spa_token(token): return True # Quick heuristic to avoid network calls for obvious trust secrets if not self._looks_like_oauth2_token(token): logger.debug("Token doesn't look like OAuth2, skipping validation") return False from .oauth2 import create_oauth2_authenticator authenticator = create_oauth2_authenticator(self.config) if not authenticator.is_enabled(): return False # Validate token and get user info user_info = authenticator.validate_token_and_get_user_info(token) if not user_info: return False # Extract email from user info email = authenticator.get_email_from_user_info(user_info) if not email: return False # For OAuth2, we authenticate users based on their email # The actor lookup is handled at the endpoint level, not here in auth # Here we just validate that the token is valid and get the email # Check if this is the correct actor for this email (when actor_id is provided in URL) if ( self.actor and self.actor.creator and self.actor.creator.lower() == email.lower() ): # This is the correct actor for this email self.acl["relationship"] = "creator" self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = token request_context.set_peer_id("") if logger.isEnabledFor(logging.DEBUG): logger.debug(f"OAuth2 authentication successful for {email}") return True else: # Email doesn't match this actor - this could be: # 1. Wrong actor for this user # 2. New user (actor creation flow handles this) # 3. 
Factory endpoint (no specific actor yet) logger.debug( f"OAuth2 email {email} doesn't match actor creator {self.actor.creator if self.actor else 'None'}" ) # For factory endpoint or when no actor is loaded, we still consider auth successful # The endpoint handler will use get_by_creator() to find/create the right actor if not self.actor: self.acl["relationship"] = "creator" self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = token request_context.set_peer_id("") logger.debug( f"OAuth2 authentication successful for {email} (no specific actor)" ) return True return False except Exception as e: logger.error(f"Error during OAuth2 token validation: {e}") return False def _check_spa_token(self, token): """Check if the Bearer token is a valid ActingWeb SPA token. SPA tokens are generated by ActingWeb's /oauth/spa/token endpoint and stored in the session manager. These are ActingWeb's own tokens, not OAuth provider tokens. """ try: from .oauth_session import get_oauth2_session_manager session_manager = get_oauth2_session_manager(self.config) token_data = session_manager.validate_access_token(token) if not token_data: return False actor_id = token_data.get("actor_id") if not actor_id: return False # Verify the token is for this actor (if we have one loaded) if self.actor and self.actor.id and self.actor.id != actor_id: logger.debug( f"SPA token actor {actor_id} doesn't match requested actor {self.actor.id}" ) return False # If no actor was loaded, try to load the one from the token if not self.actor or not self.actor.id: from . 
import actor self.actor = actor.Actor(actor_id, config=self.config) if not self.actor.id: logger.warning( f"SPA token references non-existent actor {actor_id}" ) return False # SPA token is valid and for the correct actor self.acl["relationship"] = "creator" self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = token request_context.set_peer_id("") logger.debug(f"SPA token authentication successful for actor {actor_id}") return True except Exception as e: logger.debug(f"SPA token check failed: {e}") return False def _looks_like_oauth2_token(self, token: str) -> bool: """Quick heuristic to check if token might be an OAuth2 token. This avoids making network calls to OAuth providers for tokens that are clearly trust secrets (short hex strings) or other non-OAuth tokens. OAuth2 tokens are typically: - Google: 100+ chars, JWT-like or opaque - GitHub: 40+ chars, starts with 'gho_', 'ghu_', etc. 
- SPA tokens: UUID format (already checked separately) Trust secrets are typically: - 40 char hex strings (SHA-1) - Sometimes shorter random strings Returns: True if the token might be an OAuth2 token, False if definitely not """ if not token: return False token_len = len(token) # Trust secrets are typically 40 hex chars or shorter # If it's a short hex-only string, it's likely a trust secret if token_len <= 45 and all(c in "0123456789abcdef" for c in token.lower()): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Token looks like trust secret (hex, {token_len} chars)") return False # GitHub tokens have distinctive prefixes if token.startswith(("gho_", "ghu_", "ghs_", "ghr_")): return True # Very short tokens are unlikely to be OAuth2 if token_len < 20: return False # Long tokens are likely OAuth2 if token_len > 80: return True # JWTs have 3 dot-separated parts if token.count(".") == 2: return True # Default to potentially OAuth2 for medium-length tokens return True async def _check_oauth2_token_async(self, token): """Async version: Check if the Bearer token is a valid OAuth2 token.""" try: # First, check if this is an ActingWeb SPA token (sync - no network) if self._check_spa_token(token): return True # Quick heuristic to avoid network calls for obvious trust secrets if not self._looks_like_oauth2_token(token): logger.debug("Token doesn't look like OAuth2, skipping validation") return False from .oauth2 import create_oauth2_authenticator authenticator = create_oauth2_authenticator(self.config) if not authenticator.is_enabled(): return False # Use async validation for non-blocking network request user_info = await authenticator.validate_token_and_get_user_info_async( token ) if not user_info: return False # Extract email from user info email = authenticator.get_email_from_user_info(user_info) if not email: return False # Check if this is the correct actor for this email if ( self.actor and self.actor.creator and self.actor.creator.lower() == email.lower() ): 
self.acl["relationship"] = "creator" self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = token if logger.isEnabledFor(logging.DEBUG): logger.debug(f"OAuth2 async authentication successful for {email}") return True else: logger.debug( f"OAuth2 email {email} doesn't match actor creator {self.actor.creator if self.actor else 'None'}" ) if not self.actor: self.acl["relationship"] = "creator" self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = token logger.debug( f"OAuth2 async authentication successful for {email} (no specific actor)" ) return True return False except Exception as e: logger.error(f"Error during async OAuth2 token validation: {e}") return False
[docs] async def check_token_auth_async(self, appreq, via_hint: str | None = None): """Async version: Validate bearer tokens without blocking the event loop.""" if "Authorization" not in appreq.request.headers: return False auth = appreq.request.headers["Authorization"] auth_parts = auth.split(" ") if len(auth_parts) != 2 or auth_parts[0].lower() != "bearer": return False token = auth_parts[1] self.authn_done = True # First, try OAuth2 authentication if configured (async version) if await self._check_oauth2_token_async(token): return True # Fall through to sync trust token validation (no network calls) trustee = ( self.actor.store.trustee_root if self.actor and self.actor.store else None ) if ( trustee and self.actor and self.actor.creator and self.actor.creator.lower() == TRUSTEE_CREATOR ): if ( self.actor.passphrase and math.floor(len(self.actor.passphrase) * math.log(94, 2)) > 80 ): if token == self.actor.passphrase: self.acl["relationship"] = TRUSTEE_CREATOR self.acl["peerid"] = "" self.acl["approved"] = True self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = self.actor.passphrase if self.actor else None request_context.set_peer_id("") return True else: logger.warning( "Attempted trustee bearer token auth with <80 bit strength token." 
) tru = trust.Trust( actor_id=self.actor.id if self.actor else None, token=token, config=self.config, ) new_trust = tru.get() if new_trust: logger.debug(f"Found trust for peer: {new_trust.get('peerid', 'unknown')}") if self.actor and new_trust["peerid"] == self.actor.id: logger.error("Peer == actor!!") return False if new_trust and len(new_trust) > 0: self.acl["relationship"] = new_trust["relationship"] self.acl["peerid"] = new_trust["peerid"] self.acl["approved"] = new_trust["approved"] self.acl["authenticated"] = True self.response["code"] = 200 self.response["text"] = "Ok" self.token = new_trust["secret"] self.trust = new_trust self._record_trust_usage(new_trust, via_hint=via_hint or "trust") request_context.set_peer_id(new_trust["peerid"]) return True else: return False
def _should_redirect_to_oauth2(self, appreq, path): """Check if we should redirect to OAuth2 for authentication.""" try: from .oauth2 import create_oauth2_authenticator authenticator = create_oauth2_authenticator(self.config) if not authenticator.is_enabled(): return False # Don't redirect for OAuth callback URLs to avoid infinite loops if "/oauth/callback" in path: return False # When multiple providers are configured, redirect to the factory # page (/login) so the user can choose their provider. providers_cfg = getattr(self.config, "oauth_providers", {}) if len(providers_cfg) > 1: login_url = f"{self.config.proto}{self.config.fqdn}/" self.authn_done = True self.response["code"] = 302 self.response["text"] = "Redirecting to login" self.redirect = login_url logger.debug("Redirecting to factory login (multi-provider)") return True # Single provider: redirect directly to OAuth2 original_url = self._get_original_url(appreq, path) auth_url = authenticator.create_authorization_url( redirect_after_auth=original_url ) if auth_url: self.authn_done = True self.response["code"] = 302 self.response["text"] = "Redirecting to OAuth2" self.redirect = auth_url logger.debug(f"Redirecting to OAuth2: {auth_url[:100]}...") return True except Exception as e: logger.error(f"Error creating OAuth2 redirect: {e}") return False def _get_original_url(self, appreq, path): """Get the original URL being accessed for redirect after auth.""" try: # Try to construct the original URL if hasattr(appreq, "request") and hasattr(appreq.request, "url"): return str(appreq.request.url) elif hasattr(appreq, "request") and hasattr(appreq.request, "uri"): return str(appreq.request.uri) else: # Fallback to constructing from config and path return f"{self.config.proto}{self.config.fqdn}{path}" except Exception: # Last resort fallback return f"{self.config.proto}{self.config.fqdn}{path}"
[docs] def check_authentication(self, appreq, path): """Checks authentication in appreq, redirecting back to path if oauth is done.""" logger.debug( f"Checking authentication for path: {path}, auth type: {self.type}" ) logger.debug("Checking authentication, token auth...") via_hint = self._connection_hint_from_path(path) if self.check_token_auth(appreq, via_hint=via_hint): logger.debug("Token auth succeeded") return elif self.type == "basic": logger.debug("Auth type is 'basic', checking basic authentication...") if self.__check_basic_auth_creator(appreq=appreq): logger.debug( "Basic auth succeeded, response code: %s", self.response["code"] ) return else: # Basic auth failed - mark as done and don't fall through to OAuth2 logger.debug( "Basic auth failed, response code: %s", self.response["code"] ) self.authn_done = True return # If all authentication methods fail, try OAuth2 redirect if configured logger.debug("All auth methods failed, checking OAuth2 redirect...") if self._should_redirect_to_oauth2(appreq, path): logger.debug("OAuth2 redirect triggered") return logger.debug("Authentication done, and failed") self.authn_done = True self.response["code"] = 403 self.response["text"] = "Forbidden" return
def _connection_hint_from_path(self, path: str | None) -> str | None: if not path: return None lowered = path.lower() if lowered.startswith("/mcp"): return "mcp" if lowered.startswith("/subscriptions") or lowered.startswith("/subscription"): return "subscription" if lowered.startswith("/trust") or lowered.startswith("/www/trust"): return "trust" return None
[docs] def check_authorisation( self, path="", subpath="", method="", peerid="", approved=True ): """Checks if the authenticated user has acl access rights in config.py. Takes the path, subpath, method, and peerid of the path (if auth user is different from the peer that owns the path, e.g. creator). If approved is False, then the trust relationship does not need to be approved for access""" # For DELETE operations on trust relationships, always allow deletion regardless of approval status # This ensures that broken or partially approved relationships can still be cleaned up if ( len(self.acl["peerid"]) > 0 and approved and self.acl["approved"] is False and not (path.lower() == "trust" and method.upper() == "DELETE") ): logger.debug( "Rejected authorization because trust relationship is not approved." ) return False if self.acl["relationship"]: relationship = self.acl["relationship"].lower() else: relationship = "" method = method.upper() self.acl["authorised"] = True self.acl["rights"] = "r" if len(path) == 0: return False if not subpath: subpath = "" fullpath = path.lower() + "/" + subpath.lower() # ACLs: ('role', 'path', 'METHODS', 'access') logger.debug( "Testing access for (" + relationship + " " + self.acl["peerid"] + ") on (" + fullpath + " " + peerid + ") using method " + method ) for acl in self.config.access: if acl[0] == "any" and not self.acl["authenticated"]: continue if ( len(acl[0]) > 0 and acl[0] != "any" and acl[0] != relationship and acl[0] != "owner" ): continue # no match on relationship if ( acl[0] == relationship or acl[0] == "any" or len(acl[0]) == 0 or ( acl[0] == "owner" and len(peerid) > 0 and self.acl["peerid"] == peerid ) ): if fullpath.find(acl[1]) == 0: if len(acl[2]) == 0 or acl[2].find(method) != -1: self.acl["rights"] = acl[3] logger.debug( "Granted " + acl[3] + " access with ACL:" + str(acl) ) return True return False
def check_and_verify_auth(appreq=None, actor_id=None, config=None):
    """Authenticate a request arriving on a custom (non-ActingWeb) route.

    Builds an ``Auth`` object of type "basic" for ``actor_id`` and runs
    ``check_authentication()`` with the path "/custom". The outcome is
    returned as a dict instead of being written to the response object.

    Args:
        appreq: Request object in the format used by ActingWeb handlers.
        actor_id (str | None): Actor ID to verify authentication against.
        config (Config | None): ActingWeb config object; a default
            ``Config()`` is created when falsy.

    Returns:
        dict: A dictionary with the following keys:

        - ``authenticated`` (bool): True if authentication successful.
        - ``actor`` (Actor | None): Actor object when authenticated, otherwise None.
        - ``auth`` (Auth): Auth object with authentication details.
        - ``response`` (dict): ``{"code": int, "text": str, "headers": dict}``;
          code 404 when the actor could not be loaded.
        - ``redirect`` (str | None): Redirect URL if authentication requires redirect.

    Example:
        .. code-block:: python

            auth_result = check_and_verify_auth(appreq, actor_id, config)
            if not auth_result['authenticated']:
                if auth_result['response']['code'] == 302:
                    # Redirect for OAuth
                    return redirect(auth_result['redirect'])
                # Return error response
                return error_response(
                    auth_result['response']['code'],
                    auth_result['response']['text']
                )
            # Authentication successful, use auth_result['actor']
            actor = auth_result['actor']
    """
    if not config:
        config = config_class.Config()
    # Use basic auth type for custom routes (supports both basic and Bearer token auth)
    auth_obj = Auth(actor_id, auth_type="basic", config=config)
    result = {
        "authenticated": False,
        "actor": None,
        "auth": auth_obj,
        "response": {"code": 403, "text": "Forbidden", "headers": {}},
        "redirect": None,
    }

    # Unknown actor: nothing to authenticate against.
    if not auth_obj.actor:
        result["response"] = {"code": 404, "text": "Actor not found", "headers": {}}
        return result

    # Check authentication without modifying the response object
    auth_obj.check_authentication(appreq=appreq, path="/custom")

    # Snapshot the response so callers cannot mutate the Auth object's headers.
    outcome = auth_obj.response
    result["response"] = {
        "code": outcome["code"],
        "text": outcome["text"],
        "headers": outcome["headers"].copy(),
    }

    redirect_url = getattr(auth_obj, "redirect", None)
    if redirect_url:
        result["redirect"] = redirect_url

    succeeded = auth_obj.acl["authenticated"] and auth_obj.response["code"] == 200
    if succeeded:
        result["authenticated"] = True
        result["actor"] = auth_obj.actor
    return result
async def check_and_verify_auth_async(appreq=None, actor_id=None, config=None):
    """Async version: Check and verify authentication for non-ActingWeb routes.

    Provides authentication verification for custom routes that don't go
    through the standard ActingWeb handler system. Bearer tokens are
    validated with ``Auth.check_token_auth_async()`` so OAuth2 validation does
    not block the event loop. If that fails, only the creator basic-auth
    check is run. The bearer token is NOT validated a second time through the
    synchronous path, which would issue a blocking provider call for every
    invalid or expired OAuth2 token.

    Use this in async FastAPI endpoints instead of check_and_verify_auth()
    when the endpoint might receive OAuth2 tokens that need validation
    against the provider.

    Args:
        appreq: Request object in the format used by ActingWeb handlers.
        actor_id (str | None): Actor ID to verify authentication against.
        config (Config | None): ActingWeb config object.

    Returns:
        dict: A dictionary with the following keys:

        - ``authenticated`` (bool): True if authentication successful.
        - ``actor`` (Actor | None): Actor object when authenticated, otherwise None.
        - ``auth`` (Auth): Auth object with authentication details.
        - ``response`` (dict): Response details: ``{"code": int, "text": str, "headers": dict}``.
        - ``redirect`` (str | None): Redirect URL if authentication requires redirect.

    Example:
        .. code-block:: python

            auth_result = await check_and_verify_auth_async(appreq, actor_id, config)
            if not auth_result['authenticated']:
                if auth_result['response']['code'] == 302:
                    return RedirectResponse(auth_result['redirect'])
                return JSONResponse(
                    status_code=auth_result['response']['code'],
                    content={"error": auth_result['response']['text']}
                )
            # Authentication successful, use auth_result['actor']
            actor = auth_result['actor']
    """
    if not config:
        config = config_class.Config()
    # Use basic auth type for custom routes (supports both basic and Bearer token auth)
    auth_obj = Auth(actor_id, auth_type="basic", config=config)
    result = {
        "authenticated": False,
        "actor": None,
        "auth": auth_obj,
        "response": {"code": 403, "text": "Forbidden", "headers": {}},
        "redirect": None,
    }
    if not auth_obj.actor:
        result["response"] = {"code": 404, "text": "Actor not found", "headers": {}}
        return result

    def _response_snapshot():
        # Copy so callers cannot mutate the Auth object's header dict.
        return {
            "code": auth_obj.response["code"],
            "text": auth_obj.response["text"],
            "headers": auth_obj.response["headers"].copy(),
        }

    # Use async token auth to avoid blocking during OAuth2 validation
    via_hint = auth_obj._connection_hint_from_path("/custom")
    if await auth_obj.check_token_auth_async(appreq, via_hint=via_hint):
        result["authenticated"] = True
        result["actor"] = auth_obj.actor
        result["response"] = _response_snapshot()
        return result

    # Token auth failed, try basic auth if available (sync, no network).
    # The name-mangled basic-auth method is called directly instead of
    # check_authentication(), because the latter would first re-run the
    # synchronous (blocking) bearer-token validation that already failed.
    if auth_obj.type == "basic":
        if not auth_obj._Auth__check_basic_auth_creator(appreq=appreq):
            # Same end state check_authentication() leaves after a failed
            # basic auth attempt.
            auth_obj.authn_done = True

    # Copy response details
    result["response"] = _response_snapshot()
    # Set redirect if needed
    redirect_url = getattr(auth_obj, "redirect", None)
    if redirect_url:
        result["redirect"] = redirect_url
    # Check if authentication was successful
    if auth_obj.acl["authenticated"] and auth_obj.response["code"] == 200:
        result["authenticated"] = True
        result["actor"] = auth_obj.actor
    return result