Source code for actingweb.handlers.base_handler

from typing import TYPE_CHECKING, Any, Optional

from actingweb import aw_web_request
from actingweb import config as config_class

if TYPE_CHECKING:
    from actingweb.interface.actor_interface import ActorInterface
    from actingweb.interface.authenticated_views import AuthenticatedActorView
    from actingweb.interface.hooks import HookRegistry


[docs] class BaseHandler: def __init__( self, webobj: aw_web_request.AWWebObj = aw_web_request.AWWebObj(), config: config_class.Config = config_class.Config(), hooks: Optional["HookRegistry"] = None, ) -> None: self.request = webobj.request self.response = webobj.response self.config = config self.hooks = hooks def _get_actor_interface(self, actor) -> Optional["ActorInterface"]: """Get ActorInterface wrapper for given actor.""" if actor: from actingweb.interface.actor_interface import ActorInterface registry = getattr(self.config, "service_registry", None) return ActorInterface(actor, service_registry=registry, hooks=self.hooks) return None def _get_authenticated_view( self, actor, auth_result: "AuthResult" ) -> "AuthenticatedActorView | None": """Create an authenticated view of an actor with permission enforcement. This method wraps the actor in an AuthenticatedActorView that enforces permission checks based on the authentication context. Args: actor: The actor to wrap auth_result: The AuthResult from authentication Returns: AuthenticatedActorView with permission enforcement, or None if actor is None """ if not actor: return None from actingweb.interface.actor_interface import ActorInterface from actingweb.interface.authenticated_views import ( AuthContext, AuthenticatedActorView, ) # Get peer/client info from auth object peer_id = "" client_id = "" trust_relationship: dict[str, Any] = {} if auth_result.auth_obj and hasattr(auth_result.auth_obj, "acl"): acl = auth_result.auth_obj.acl peer_id = acl.get("peerid", "") or "" # For OAuth2 clients, use the client_id from ACL if not peer_id and acl.get("oauth2_client"): client_id = acl.get("oauth2_client", "") # Extract trust relationship info if available if peer_id: trust_relationship = { "peer_id": peer_id, "relationship": acl.get("relationship", ""), "approved": acl.get("approved", False), } # Create auth context auth_context = AuthContext( peer_id=peer_id, client_id=client_id, trust_relationship=trust_relationship, ) # Get or create ActorInterface registry = getattr(self.config, "service_registry", None) actor_interface = ActorInterface( actor, service_registry=registry, hooks=self.hooks ) return AuthenticatedActorView(actor_interface, auth_context, self.hooks) def _authenticate_dual_context( self, actor_id: str, api_path: str, web_subpath: str, name: str = "", add_response: bool = True, ) -> "AuthResult": """ Authenticate supporting both web UI (OAuth) and API (basic) access. This method detects whether a request is coming from the web UI (by checking for oauth_token cookie) and chooses the appropriate authentication path: - Web UI requests: path="www", subpath=web_subpath (OAuth authentication) - API requests: path=api_path, subpath=name (basic authentication) Args: actor_id: The actor ID api_path: Path to use for API authentication (e.g., "properties", "methods") web_subpath: Subpath to use for web UI authentication (e.g., "properties", "methods") name: Name/subpath for API authentication (optional) add_response: Whether to automatically set HTTP response Returns: AuthResult object with actor, auth status, and authorization helper methods """ # Detect web UI context by checking for OAuth cookie is_web_ui_request = False try: # Check for OAuth cookie indicating web UI context if hasattr(self.request, "cookies") and self.request.cookies: is_web_ui_request = "oauth_token" in self.request.cookies elif hasattr(self.request, "get"): # Alternative cookie access pattern for different frameworks try: oauth_cookie = self.request.get("oauth_token", cookie=True) # type: ignore is_web_ui_request = oauth_cookie is not None except TypeError: # Framework doesn't support cookie parameter is_web_ui_request = False except Exception: # Fallback to basic auth if cookie detection fails is_web_ui_request = False if is_web_ui_request: # Web UI context - use OAuth authentication return self._authenticate_internal( actor_id, "www", web_subpath, add_response ) else: # API context - use basic authentication return self._authenticate_internal(actor_id, api_path, name, add_response) def _authenticate_internal( self, actor_id: str, path: str, subpath: str = "", add_response: bool = True ) -> "AuthResult": """ Internal authentication implementation that replaces auth.init_actingweb(). Args: actor_id: The actor ID path: ActingWeb path for authentication subpath: Optional subpath add_response: Whether to automatically set HTTP response Returns: AuthResult object with actor, auth status, and authorization helper methods """ from .. import auth # Determine auth type based on path if path == "www": auth_type = self.config.www_auth if self.config else "basic" else: auth_type = "basic" # Create Auth object and perform authentication fullpath = "/" + path + "/" + subpath auth_obj = auth.Auth(actor_id, auth_type=auth_type, config=self.config) # Perform authentication - this loads the actor and checks credentials auth_obj.check_authentication(appreq=self, path=fullpath) # Check if actor was found during authentication if not hasattr(auth_obj, "actor") or not auth_obj.actor: if add_response and self.response: self.response.set_status(404, "Actor not found") return AuthResult(None, auth_obj, self.response) # Set response if needed if add_response and self.response: auth.add_auth_response(appreq=self, auth_obj=auth_obj) return AuthResult(auth_obj.actor, auth_obj, self.response)
[docs] def require_authenticated_actor( self, actor_id: str, path: str, method: str = "GET", subpath: str = "" ) -> Any | None: """ Simplified actor authentication and authorization in one call. This method combines actor loading, authentication, and authorization into a single call that handles all the common error cases and sets appropriate HTTP responses. Args: actor_id: Actor ID to authenticate path: ActingWeb path for authorization (e.g., "properties", "trust") method: HTTP method for authorization check ("GET", "POST", etc.) subpath: Optional subpath for authorization Returns: Actor object if authentication and authorization successful, None otherwise. When None is returned, the HTTP response has already been set appropriately. """ auth_result = self._authenticate_internal(actor_id, path, subpath) # Check if actor exists and authentication succeeded if not auth_result.success: return None # Check authorization if not auth_result.authorize(method.upper(), path, subpath): return None return auth_result.actor
[docs] def authenticate_actor( self, actor_id: str, path: str = "", subpath: str = "", add_response: bool = True, ) -> "AuthResult": """ Authenticate actor and return detailed auth result. This provides more control than require_authenticated_actor() by returning an AuthResult object that can be used for custom authorization logic. Args: actor_id: Actor ID to authenticate path: ActingWeb path for auth type selection subpath: Optional subpath add_response: Whether to automatically set HTTP response Returns: AuthResult object with actor, auth status, and authorization helper methods """ return self._authenticate_internal(actor_id, path, subpath, add_response)
[docs] def get_actor_allow_unauthenticated( self, actor_id: str, path: str = "", subpath: str = "" ) -> Any | None: """ Get actor allowing unauthenticated access (for public endpoints like meta). This method loads an actor but doesn't require authentication. It's used for endpoints that should be publicly accessible but still need actor data. Args: actor_id: Actor ID to load path: Path for authorization context subpath: Subpath for authorization context Returns: Actor object if it exists, None otherwise. Sets 404 if actor doesn't exist. """ auth_result = self._authenticate_internal( actor_id, path, subpath, add_response=False ) if not auth_result.actor: if self.response: self.response.set_status(404, "Actor not found") return None # For public endpoints, still check authorization but allow unauthenticated access if auth_result.auth_obj and not auth_result.auth_obj.check_authorisation( path=path, subpath=subpath, method="GET", approved=False ): if self.response: self.response.set_status(403) return None return auth_result.actor
[docs] class AuthResult: """Result of actor authentication with helper methods for authorization.""" def __init__(self, actor, auth_obj, response): self.actor = actor self.auth_obj = auth_obj self.response = response @property def success(self) -> bool: """True if actor exists and authentication succeeded.""" return ( self.actor is not None and self.auth_obj is not None and self.auth_obj.response["code"] == 200 ) @property def authenticated(self) -> bool: """True if authentication succeeded (regardless of authorization).""" return self.auth_obj is not None and self.auth_obj.acl["authenticated"]
[docs] def authorize(self, method: str, path: str = "", subpath: str = "") -> bool: """ Check authorization for the given method and path. Args: method: HTTP method ("GET", "POST", etc.) path: Path for authorization (defaults to path used in authentication) subpath: Subpath for authorization Returns: True if authorized, False otherwise. Sets 403 response if unauthorized. """ if not self.success: return False if self.auth_obj.check_authorisation( path=path, subpath=subpath, method=method.upper() ): return True else: if self.response: self.response.set_status(403) return False