Source code for actingweb.handlers.oauth2_endpoints

"""
OAuth2 endpoints handler for ActingWeb (MCP OAuth2 Server Role).

IMPORTANT: ActingWeb has TWO OAuth2 roles:

1. OAuth2 SERVER (this handler) - MCP clients authenticate TO ActingWeb
   - /oauth/register - Dynamic client registration (RFC 7591) for MCP clients
   - /oauth/authorize - Authorization endpoint (MCP client → user consent → ActingWeb)
   - /oauth/token - Token endpoint (issues ActingWeb tokens to MCP clients)
   - /oauth/logout - Logout endpoint

2. OAuth2 CLIENT (see oauth2_spa.py) - Users authenticate VIA Google/GitHub
   - /oauth/spa/authorize - Initiate login with external provider
   - /oauth/spa/token - Refresh external provider tokens
   - /oauth/callback - Receive callback from Google/GitHub

This handler implements role #1: ActingWeb as OAuth2 authorization server for
MCP clients (ChatGPT, Claude, Cursor). User authentication is proxied to
Google/GitHub, but the final tokens issued are ActingWeb tokens.
"""

import json
import logging
from typing import TYPE_CHECKING, Any, Optional

# MCP protocol version constants (single source of truth)
from ..mcp.protocol import LATEST_PROTOCOL_VERSION, SUPPORTED_PROTOCOL_VERSIONS
from .base_handler import BaseHandler

if TYPE_CHECKING:
    from .. import aw_web_request
    from .. import config as config_class
    from ..interface.hooks import HookRegistry

# TrustTypeRegistry imported locally where needed

logger = logging.getLogger(__name__)


class OAuth2EndpointsHandler(BaseHandler):
    """
    Handler for OAuth2 authorization server endpoints.

    This handler implements ActingWeb as a full OAuth2 authorization server:

    1. Dynamic client registration (RFC 7591) for MCP clients
    2. OAuth2 authorization flow with Google user authentication proxy
    3. ActingWeb token issuance and management
    4. OAuth2 callback handling from Google

    All protocol logic is delegated to the OAuth2 server object obtained from
    ``get_actingweb_oauth2_server``; the methods here translate between the
    web request/response objects and that server's dict-based API.
    """

    # Cookies that are expired on every logout response.
    _LOGOUT_COOKIES = ("oauth_token", "oauth_refresh_token", "session_id")

    def __init__(
        self,
        webobj: Optional["aw_web_request.AWWebObj"] = None,
        config: Optional["config_class.Config"] = None,
        hooks: Optional["HookRegistry"] = None,
    ) -> None:
        """
        Create the handler and attach the shared OAuth2 server.

        Args:
            webobj: Request/response wrapper; a fresh ``AWWebObj`` is created
                when omitted.
            config: ActingWeb configuration (required).
            hooks: Optional hook registry passed through to ``BaseHandler``.

        Raises:
            RuntimeError: If ``config`` is None.
        """
        if config is None:
            raise RuntimeError("Config is required for OAuth2EndpointsHandler")
        if webobj is None:
            from .. import aw_web_request

            webobj = aw_web_request.AWWebObj()
        super().__init__(webobj, config, hooks)

        # Initialize OAuth2 server
        from ..oauth2_server.oauth2_server import get_actingweb_oauth2_server

        self.oauth2_server = get_actingweb_oauth2_server(config)

    # ------------------------------------------------------------------
    # HTTP method entry points
    # ------------------------------------------------------------------

    def post(self, path: str = "") -> dict[str, Any]:
        """
        Handle POST requests to OAuth2 endpoints.

        Routes:
        - /oauth/register - Dynamic client registration for MCP clients
        - /oauth/token - Token exchange (authorization_code or refresh_token)
        - /oauth/authorize - Authorization request processing (email form submission)
        - /oauth/logout - Logout

        Args:
            path: The sub-path after /oauth/

        Returns:
            Response dict; a 404 error dict for any other path.
        """
        if path == "register":
            return self._handle_client_registration()
        elif path == "token":
            return self._handle_token_request()
        elif path == "authorize":
            return self._handle_authorization_request("POST")
        elif path == "logout":
            return self._handle_logout_request("POST")
        else:
            return self.error_response(404, f"Unknown OAuth2 endpoint: {path}")

    def options(self, _path: str = "") -> dict[str, Any]:
        """
        Handle OPTIONS requests (CORS preflight).

        Args:
            _path: The sub-path after /oauth/ (unused; every path gets the
                same CORS headers)

        Returns:
            ``{"status": "ok"}`` after the CORS headers have been set
        """
        # Set CORS headers
        self.response.headers["Access-Control-Allow-Origin"] = "*"
        self.response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
        self.response.headers["Access-Control-Allow-Headers"] = (
            "Authorization, Content-Type, mcp-protocol-version"
        )
        self.response.headers["Access-Control-Max-Age"] = "86400"  # 24 hours

        return {"status": "ok"}

    def get(self, path: str = "") -> dict[str, Any]:
        """
        Handle GET requests to OAuth2 endpoints.

        Routes:
        - /oauth/authorize - Authorization endpoint (shows email form)
        - /oauth/callback - OAuth2 callback from Google (completes MCP flow)
        - /oauth/logout - Logout
        - /.well-known/oauth-authorization-server - Authorization server discovery (RFC 8414)
        - /.well-known/oauth-protected-resource - Protected resource metadata
        - /.well-known/oauth-protected-resource/mcp - MCP-specific resource metadata

        Args:
            path: The sub-path after /oauth/ (or the full well-known path)

        Returns:
            Response dict or redirect; a 404 error dict for any other path.
        """
        if path == "authorize":
            return self._handle_authorization_request("GET")
        elif path == "callback":
            return self._handle_oauth_callback()
        elif path == "logout":
            return self._handle_logout_request("GET")
        elif path == ".well-known/oauth-authorization-server":
            return self._handle_authorization_server_discovery()
        elif path == ".well-known/oauth-protected-resource":
            return self._handle_protected_resource_discovery()
        elif path == ".well-known/oauth-protected-resource/mcp":
            return self._handle_protected_resource_mcp_discovery()
        else:
            return self.error_response(404, f"Unknown OAuth2 endpoint: {path}")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _read_body_text(self, default: str = "") -> str:
        """Return the request body as text, or ``default`` when there is no body.

        Bytes are decoded as UTF-8 with undecodable bytes dropped; any other
        non-None value is passed through ``str()``.
        """
        body: str | bytes | None = self.request.body
        if body is None:
            return default
        if isinstance(body, bytes):
            return body.decode("utf-8", "ignore")
        return str(body)

    @staticmethod
    def _first_value(form: dict[str, list[str]], key: str, default: str = "") -> str:
        """Return the first value for ``key`` in a ``parse_qs`` result."""
        return form.get(key, [default])[0]

    def _set_discovery_cors_headers(self) -> None:
        """Set the CORS headers shared by all discovery (well-known) endpoints."""
        self.response.headers["Access-Control-Allow-Origin"] = "*"
        self.response.headers["Access-Control-Allow-Methods"] = "GET, OPTIONS"
        self.response.headers["Access-Control-Allow-Headers"] = (
            "Authorization, Content-Type, mcp-protocol-version"
        )

    def _logout_success(self, message: str) -> dict[str, Any]:
        """Build the internal "logout succeeded" result with the given message."""
        return {
            "action": "success",
            "message": message,
            "clear_cookies": list(self._LOGOUT_COOKIES),
            "redirect_url": f"{self.config.proto}{self.config.fqdn}/",
        }

    # ------------------------------------------------------------------
    # Client registration
    # ------------------------------------------------------------------

    def _handle_client_registration(self) -> dict[str, Any]:
        """
        Handle dynamic client registration (RFC 7591) for MCP clients.

        Request body should contain:
        - client_name: Human-readable name
        - redirect_uris: List of allowed redirect URIs

        Returns:
            Client registration response per RFC 7591 (status 201), or an
            error dict: 400 for invalid JSON or a ``ValueError`` from the
            OAuth2 server, 500 for anything else.
        """
        try:
            # A missing body is treated as an empty JSON object.
            body_str = self._read_body_text(default="{}")

            try:
                registration_data = json.loads(body_str)
            except json.JSONDecodeError:
                return self.error_response(400, "Invalid JSON in request body")

            # Register the client using OAuth2 server
            try:
                client_response = self.oauth2_server.handle_client_registration(
                    registration_data
                )
                logger.debug(f"Registered MCP client: {client_response['client_id']}")

                # Set status to 201 Created per RFC 7591
                self.response.set_status(201, "Created")
                return client_response

            except ValueError as e:
                return self.error_response(400, str(e))
            except Exception as e:
                logger.error(f"Client registration failed: {e}")
                return self.error_response(500, "Client registration failed")

        except Exception as e:
            logger.error(f"Error in client registration: {e}")
            return self.error_response(500, "Internal server error")

    # ------------------------------------------------------------------
    # Authorization endpoint
    # ------------------------------------------------------------------

    def _handle_authorization_request(self, method: str = "GET") -> dict[str, Any]:
        """
        Handle OAuth2 authorization requests.

        For GET: Show email form (same UX as GET /)
        For POST: Process email and redirect to Google OAuth2

        Expected parameters:
        - client_id: Registered client ID
        - redirect_uri: Callback URI (must match registered URI)
        - response_type: Must be "code"
        - scope: Requested scopes
        - state: CSRF protection token

        POST additionally reads ``email``, ``trust_type`` (defaults to
        ``mcp_client``) and ``provider`` from the form body.

        Args:
            method: "GET" reads query parameters; anything else parses the
                form-encoded body.

        Returns:
            Email form or redirect response
        """
        try:
            # NOTE(review): code_challenge / code_challenge_method are not
            # forwarded to the OAuth2 server here although the form template
            # exposes them - confirm whether the server needs them.
            if method == "GET":
                params = {
                    "client_id": self.request.get("client_id") or "",
                    "redirect_uri": self.request.get("redirect_uri") or "",
                    "response_type": self.request.get("response_type") or "",
                    "scope": self.request.get("scope") or "",
                    "state": self.request.get("state") or "",
                }
            else:  # POST
                from urllib.parse import parse_qs

                form_data = parse_qs(self._read_body_text())
                first = self._first_value
                params = {
                    "client_id": first(form_data, "client_id"),
                    "redirect_uri": first(form_data, "redirect_uri"),
                    "response_type": first(form_data, "response_type"),
                    "scope": first(form_data, "scope"),
                    "state": first(form_data, "state"),
                    "email": first(form_data, "email"),
                    # Default to mcp_client
                    "trust_type": first(form_data, "trust_type", "mcp_client"),
                    # OAuth provider (google/github)
                    "provider": first(form_data, "provider"),
                }

            # Debug logging for MCP OAuth2 flow
            logger.debug(
                f"OAuth2 authorization {method} request with params: {dict(params)}"
            )

            # Handle using OAuth2 server
            server_response = self.oauth2_server.handle_authorization_request(
                params, method
            )
            logger.debug(f"OAuth2 server response: {server_response}")

            action = server_response.get("action")
            if action == "show_form":
                # Show email form (preserve existing UX)
                form_response = self._render_authorization_form(server_response)
                if form_response is None:
                    # Template values were set, let framework handle rendering
                    return {}  # Return empty dict instead of None
                # Return JSON response
                return form_response

            if action == "redirect":
                # Redirect to Google OAuth2
                redirect_url = server_response.get("url")
                if not redirect_url:
                    return self.error_response(500, "Failed to create redirect URL")
                self.response.set_status(302, "Found")
                self.response.set_redirect(redirect_url)
                return {"status": "redirect", "location": redirect_url}

            # Error response
            error = server_response.get("error", "server_error")
            description = server_response.get("error_description", "Unknown error")
            return self.error_response(400, f"{error}: {description}")

        except Exception as e:
            logger.error(f"Error in authorization request: {e}")
            return self.error_response(500, "Internal server error")

    # ------------------------------------------------------------------
    # Token endpoint
    # ------------------------------------------------------------------

    def _handle_token_request(self) -> dict[str, Any]:
        """
        Handle OAuth2 token requests.

        This endpoint exchanges authorization codes for ActingWeb access tokens
        or refreshes existing tokens.

        Expected parameters:
        - grant_type: "authorization_code" or "refresh_token"
        - code: Authorization code (for authorization_code grant)
        - refresh_token: Refresh token (for refresh_token grant)
        - redirect_uri: Must match the URI used in authorization request
        - client_id: Client identifier
        - client_secret: Client secret (for confidential clients)
        - code_verifier: PKCE verifier, forwarded to the OAuth2 server

        When ``client_id`` is absent from the form, it is taken from an HTTP
        Basic ``Authorization`` header if one is present.

        Returns:
            Token response with ActingWeb access token, or an error dict
            (401 for ``invalid_client``, 400 for request/grant errors,
            500 otherwise).
        """
        try:
            # Parse form data (form-encoded for OAuth2)
            from urllib.parse import parse_qs

            form_data = parse_qs(self._read_body_text())

            # Debug: log received form data keys only (never log body content which contains secrets)
            logger.debug(f"Token request form data keys: {list(form_data.keys())}")

            # Extract parameters (parse_qs returns lists)
            first = self._first_value
            params = {
                "grant_type": first(form_data, "grant_type"),
                "code": first(form_data, "code"),
                "refresh_token": first(form_data, "refresh_token"),
                "redirect_uri": first(form_data, "redirect_uri"),
                "client_id": first(form_data, "client_id"),
                "client_secret": first(form_data, "client_secret"),
                "code_verifier": first(form_data, "code_verifier"),
            }

            # Check for client_id in Authorization header if not in form data
            if not params["client_id"]:
                if not self.request.headers:
                    return self.error_response(
                        400, "invalid_request: No Authorization headers"
                    )
                auth_header = self.request.headers.get(
                    "Authorization", ""
                ) or self.request.headers.get("authorization", "")
                if auth_header.startswith("Basic "):
                    try:
                        import base64

                        encoded_creds = auth_header[6:]  # Remove "Basic "
                        decoded_creds = base64.b64decode(encoded_creds).decode("utf-8")
                        if ":" in decoded_creds:
                            client_id, client_secret = decoded_creds.split(":", 1)
                            params["client_id"] = client_id
                            # A secret sent in the form body wins over the header.
                            if not params["client_secret"]:
                                params["client_secret"] = client_secret
                            logger.debug(
                                f"Extracted client_id from Authorization header: {client_id}"
                            )
                    except Exception as e:
                        logger.warning(f"Failed to parse Authorization header: {e}")

            # Handle using OAuth2 server
            token_response = self.oauth2_server.handle_token_request(params)

            if "error" in token_response:
                error = token_response.get("error", "server_error")
                description = token_response.get("error_description", "Unknown error")

                # Map to appropriate HTTP status codes
                if error in ["invalid_client"]:
                    status = 401
                    # RFC 6749 Section 5.2: MUST include WWW-Authenticate for 401
                    base_url = f"{self.config.proto}{self.config.fqdn}"
                    self.response.headers["WWW-Authenticate"] = (
                        f'Basic realm="ActingWeb OAuth2", '
                        f'error="invalid_client", '
                        f'error_description="Invalid client credentials", '
                        f'authorization_uri="{base_url}/oauth/authorize"'
                    )
                elif error in [
                    "invalid_request",
                    "invalid_grant",
                    "unsupported_grant_type",
                ]:
                    status = 400
                else:
                    status = 500

                return self.error_response(status, f"{error}: {description}")

            logger.debug(
                f"Token request successful for client {params.get('client_id', 'unknown')}"
            )
            return token_response

        except Exception as e:
            logger.error(f"Error in token request: {e}")
            return self.error_response(500, "Internal server error")

    # ------------------------------------------------------------------
    # Discovery and callback
    # ------------------------------------------------------------------

    def _handle_authorization_server_discovery(self) -> dict[str, Any]:
        """
        Handle OAuth2 Authorization Server Discovery (RFC 8414).

        Returns:
            ActingWeb OAuth2 authorization server metadata
        """
        # Set CORS headers for discovery endpoint
        self._set_discovery_cors_headers()

        return self.oauth2_server.handle_discovery_request()

    def _handle_oauth_callback(self) -> dict[str, Any]:
        """
        Handle OAuth2 callback from Google.

        This completes the MCP client authorization flow.

        Returns:
            Redirect response to MCP client, or an error dict when the OAuth2
            server does not produce a redirect.
        """
        try:
            # Get callback parameters
            params = {
                "code": self.request.get("code") or "",
                "state": self.request.get("state") or "",
                "error": self.request.get("error") or "",
                "error_description": self.request.get("error_description") or "",
            }

            # Handle using OAuth2 server
            callback_response = self.oauth2_server.handle_oauth_callback(params)

            if callback_response.get("action") == "redirect":
                # Redirect back to MCP client
                redirect_url = callback_response.get("url")
                if not redirect_url:
                    return self.error_response(
                        500, "Failed to create callback redirect URL"
                    )
                self.response.set_status(302, "Found")
                self.response.set_redirect(redirect_url)
                return {"status": "redirect", "location": redirect_url}

            # Error response
            error = callback_response.get("error", "server_error")
            description = callback_response.get(
                "error_description", "OAuth2 callback failed"
            )
            return self.error_response(400, f"{error}: {description}")

        except Exception as e:
            logger.error(f"Error in OAuth2 callback: {e}")
            return self.error_response(500, "Internal server error")

    # ------------------------------------------------------------------
    # Authorization form rendering
    # ------------------------------------------------------------------

    @staticmethod
    def _scopes_match(requested_scopes: set[str], trust_type_scopes: set[str]) -> bool:
        """Return True when any requested scope equals, contains or is contained
        in any of the trust type's scopes (e.g. "mcp" matches
        "actingweb.mcp_client")."""
        if trust_type_scopes & requested_scopes:
            # Direct intersection
            return True
        return any(
            req_scope in tt_scope or tt_scope in req_scope
            for req_scope in requested_scopes
            for tt_scope in trust_type_scopes
        )

    def _client_allowed_trust_types(self, client_id: str) -> list[Any] | None:
        """Return the client's ``allowed_trust_types`` list, or None when the
        client has no such (list-valued) restriction or the lookup fails."""
        try:
            client_data = self.oauth2_server.client_registry.validate_client(client_id)
        except Exception as e:
            # Best-effort: a failed lookup means "no client-specific restriction".
            logger.debug(f"Client lookup for trust type filtering failed: {e}")
            return None
        if client_data and "allowed_trust_types" in client_data:
            allowed_types = client_data["allowed_trust_types"]
            if isinstance(allowed_types, list):
                return allowed_types
        return None

    def _select_trust_types(
        self, form_data: dict[str, Any], oauth2_config: dict[str, Any] | None
    ) -> list[dict[str, Any]]:
        """Return the trust types offered on the authorization form.

        Trust types come from the trust type registry and are filtered by, in
        order: the developer-configured allow-list, the requested OAuth2
        scope, and the client's own ``allowed_trust_types`` restriction.
        """
        requested_scope = form_data.get("scope", "")
        client_id = form_data.get("client_id", "")

        # Get trust types from the registry (configured by the application)
        trust_types = []
        try:
            from actingweb.trust_type_registry import get_registry

            registry = get_registry(self.config)
            trust_types = registry.list_types()
        except RuntimeError:
            logger.warning(
                "Trust type registry not initialized - no trust types available. "
                "Application must configure trust types for MCP OAuth2."
            )
            trust_types = []
        except Exception as e:
            logger.warning(f"Error accessing trust type registry: {e}")
            trust_types = []

        # Developer-configured OAuth2 trust type restrictions
        allowed_by_developer = oauth2_config.get("allowed") if oauth2_config else None

        logger.debug(
            f"Trust type filtering: oauth2_config={oauth2_config}, allowed_by_developer={allowed_by_developer}"
        )
        logger.debug(
            f"Available trust types from registry: {[tt.name for tt in trust_types]}"
        )

        # Both of these are independent of the individual trust type, so they
        # are computed once rather than on every loop iteration.
        requested_scopes = set(requested_scope.split()) if requested_scope else set()
        client_allowed = (
            self._client_allowed_trust_types(client_id)
            if client_id and trust_types
            else None
        )

        available_trust_types: list[dict[str, Any]] = []
        for trust_type in trust_types:
            # Option 1: Filter by developer configuration (highest priority)
            should_include = (
                allowed_by_developer is None
                or trust_type.name in allowed_by_developer
            )

            # Option 2: Filter by OAuth2 scope if specified
            # Skip scope filtering if no scope is requested (show all allowed types)
            if should_include and requested_scope and trust_type.oauth_scope:
                should_include = self._scopes_match(
                    requested_scopes, set(trust_type.oauth_scope.split())
                )

            # Option 3: Check client-specific trust type restrictions
            # Developers can register clients with allowed_trust_types
            if should_include and client_allowed is not None:
                should_include = trust_type.name in client_allowed

            if should_include:
                available_trust_types.append(
                    {
                        "name": trust_type.name,
                        "display_name": trust_type.display_name,
                        "description": trust_type.description,
                        "oauth_scope": trust_type.oauth_scope or "",
                    }
                )
                logger.debug(f"Trust type {trust_type.name} included")
            else:
                logger.debug(f"Trust type {trust_type.name} excluded")

        # Log available trust types (no hardcoded fallbacks - app must configure)
        logger.debug(f"Final available_trust_types count: {len(available_trust_types)}")
        if not available_trust_types:
            logger.warning(
                "No trust types available for MCP OAuth2. "
                "Application must register trust types in the trust type registry."
            )
        return available_trust_types

    @staticmethod
    def _pick_default_trust_type(
        oauth2_config: dict[str, Any] | None,
        available_trust_types: list[dict[str, Any]],
    ) -> Any:
        """Return the configured default trust type if it is available,
        otherwise the first available one, otherwise None."""
        if oauth2_config:
            configured_default = oauth2_config.get("default")
            if configured_default and any(
                tt["name"] == configured_default for tt in available_trust_types
            ):
                return configured_default
        # If no default configured but trust types available, use first one
        if available_trust_types:
            return available_trust_types[0]["name"]
        return None

    def _build_oauth_provider_links(
        self, form_data: dict[str, Any], default_trust_type: Any
    ) -> list[dict[str, Any]]:
        """Return one ``{name, display_name, url}`` entry per enabled OAuth
        provider, each with its own encrypted state embedding the MCP context.

        Best-effort: any failure is logged as a warning and whatever entries
        were built before the failure are returned.
        """
        oauth_providers: list[dict[str, Any]] = []
        try:
            from ..oauth2 import (
                create_oauth2_authenticator,
                get_provider_display_name,
            )
            from ..oauth2_server.oauth2_server import (
                get_actingweb_oauth2_server,
            )

            providers_cfg = getattr(self.config, "oauth_providers", {})

            # Determine list of provider names to iterate
            provider_names: list[str] = []
            if providers_cfg:
                provider_names = list(providers_cfg.keys())
            elif self.config.oauth and self.config.oauth.get("client_id"):
                # Single-provider backward-compat
                provider_names = [getattr(self.config, "oauth2_provider", "google")]

            if provider_names:
                oauth2_server = get_actingweb_oauth2_server(self.config)

                for prov_name in provider_names:
                    # Each provider gets its own encrypted state with provider embedded
                    mcp_context = {
                        "client_id": form_data.get("client_id", ""),
                        "redirect_uri": form_data.get("redirect_uri", ""),
                        "original_state": form_data.get("state", ""),
                        "trust_type": default_trust_type,
                        "flow_type": "mcp_oauth2",
                        "provider": prov_name,
                    }
                    encrypted_state = oauth2_server.state_manager.create_state(
                        mcp_context
                    )

                    auth = create_oauth2_authenticator(self.config, prov_name)
                    if auth.is_enabled():
                        auth_url = auth.create_authorization_url(
                            state=encrypted_state,
                            trust_type=default_trust_type or "",
                        )
                        oauth_providers.append(
                            {
                                "name": prov_name,
                                "display_name": get_provider_display_name(prov_name),
                                "url": auth_url,
                            }
                        )
                        logger.debug(
                            f"Generated {prov_name} OAuth authorization URL for MCP client"
                        )
        except Exception as e:
            logger.warning(f"Failed to generate OAuth URLs for authorization form: {e}")
        return oauth_providers

    def _render_authorization_form(
        self, form_data: dict[str, Any]
    ) -> dict[str, Any] | None:
        """
        Render authorization form (same UX as GET /).

        When the UI or MCP is enabled and the request's ``Accept`` header does
        not mention ``application/json``, template values are set on the
        response for HTML rendering. Otherwise a JSON description of the form
        is returned.

        Args:
            form_data: Form data from OAuth2 server

        Returns:
            Form response or None if template values were set
        """
        # For OAuth2 authorization forms, always try to render HTML template if UI or MCP is enabled
        # The form is meant for human interaction, so default to HTML unless explicitly requesting JSON
        # Note: MCP OAuth2 flow needs the form even when config.ui is False (web UI disabled)
        ui_or_mcp_enabled = self.config.ui or getattr(self.config, "mcp", False)
        is_browser_request = (
            ui_or_mcp_enabled
            and self.request
            and self.request.headers
            and "application/json" not in self.request.headers.get("Accept", "")
        )

        client_name = form_data.get("client_name", "MCP Client")
        message = f"Authorize {client_name} to access your ActingWeb data"

        if not is_browser_request:
            # For API requests, return JSON structure
            return {
                "action": "show_form",
                "form_data": {
                    "client_id": form_data.get("client_id", ""),
                    "redirect_uri": form_data.get("redirect_uri", ""),
                    "state": form_data.get("state", ""),
                    "client_name": client_name,
                    "form_action": "/oauth/authorize",
                    "form_method": "POST",
                },
                "template": "oauth_authorization_form",  # Template to render
                "message": message,
            }

        # Get OAuth2 trust type configuration from app (if configured)
        # The application decides which trust types are available for MCP OAuth2
        oauth2_config = getattr(self.config, "_oauth2_trust_types", None)
        if oauth2_config is None:
            logger.debug(
                "No OAuth2 trust type configuration - will use all registry types"
            )
            # No filtering - show all trust types from registry
            oauth2_config = {"allowed": None, "default": None}

        available_trust_types = self._select_trust_types(form_data, oauth2_config)

        # Determine default trust type from app configuration (no hardcoded default)
        default_trust_type = self._pick_default_trust_type(
            oauth2_config, available_trust_types
        )

        # Generate OAuth provider URLs (like factory handler does)
        oauth_providers = self._build_oauth_provider_links(
            form_data, default_trust_type
        )

        # Set template values for HTML rendering (like factory handler does)
        self.response.template_values = {
            "client_id": form_data.get("client_id", ""),
            "redirect_uri": form_data.get("redirect_uri", ""),
            "state": form_data.get("state", ""),
            "response_type": form_data.get("response_type", "code"),
            "scope": form_data.get("scope", ""),  # OAuth2 scope
            "code_challenge": form_data.get("code_challenge", ""),  # OAuth2 PKCE
            "code_challenge_method": form_data.get(
                "code_challenge_method", ""
            ),  # OAuth2 PKCE
            "client_name": client_name,
            "form_action": "/oauth/authorize",
            "form_method": "POST",
            "message": message,
            "trust_types": available_trust_types,
            "default_trust_type": default_trust_type,
            "oauth2_trust_control_enabled": True,  # Indicate that trust type control is available
            # OAuth buttons are enabled as soon as one provider URL was built
            "oauth_enabled": bool(oauth_providers),
            "oauth_providers": oauth_providers,  # OAuth provider list
        }
        return None  # Template will be rendered by framework

    # ------------------------------------------------------------------
    # Protected resource metadata
    # ------------------------------------------------------------------

    def _protected_resource_metadata(self) -> dict[str, Any]:
        """Build the protected resource metadata shared by both discovery
        endpoints, using ``config.proto`` + ``config.fqdn`` as the base URL."""
        base_url = f"{self.config.proto}{self.config.fqdn}"
        return {
            "resource": f"{base_url}/mcp",
            "authorization_servers": [base_url],
            "scopes_supported": ["mcp"],
            "bearer_methods_supported": ["header"],
            "resource_documentation": f"{base_url}/mcp/info",
            "resource_policy_uri": f"{base_url}",
        }

    def _handle_protected_resource_discovery(self) -> dict[str, Any]:
        """
        Handle OAuth2 Protected Resource Discovery (RFC 9728).

        Returns:
            Protected resource metadata
        """
        # Set CORS headers for discovery endpoint
        self._set_discovery_cors_headers()

        return self._protected_resource_metadata()

    def _handle_protected_resource_mcp_discovery(self) -> dict[str, Any]:
        """
        Handle OAuth2 Protected Resource Discovery for MCP-specific metadata.

        Returns:
            The generic protected resource metadata extended with the MCP
            protocol versions and capability flags
        """
        # Set CORS headers for discovery endpoint
        self._set_discovery_cors_headers()

        metadata = self._protected_resource_metadata()
        metadata.update(
            {
                "mcp_version": LATEST_PROTOCOL_VERSION,
                "supported_protocol_versions": SUPPORTED_PROTOCOL_VERSIONS,
                "capabilities": {
                    "tools": True,
                    "prompts": True,
                    "resources": False,
                    "roots": False,
                },
            }
        )
        return metadata

    # ------------------------------------------------------------------
    # Logout
    # ------------------------------------------------------------------

    def _handle_logout_request(self, method: str = "GET") -> dict[str, Any]:
        """
        Handle OAuth2 logout request.

        This endpoint revokes the current access token and clears session cookies.
        Works for both GET and POST requests. The token is taken from a
        ``Bearer`` Authorization header, falling back to the ``oauth_token``
        cookie.

        Args:
            method: HTTP method (GET or POST); echoed back in the response

        Returns:
            Response dict with success and redirect information
        """
        try:
            logger.info(f"Logout request started: method={method}")

            # Extract token from Authorization header or cookie
            auth_header = None
            if self.request.headers:
                auth_header = self.request.headers.get(
                    "Authorization"
                ) or self.request.headers.get("authorization")

            token = None
            if auth_header and auth_header.startswith("Bearer "):
                token = auth_header[7:]  # Remove "Bearer " prefix
                logger.debug("Found token in Authorization header")
            else:
                # Try to get token from cookies (for web sessions)
                token = (
                    self.request.cookies.get("oauth_token")
                    if self.request.cookies
                    else None
                )
                if token:
                    logger.debug("Found token in cookies")
                else:
                    logger.debug("No token found in cookies or Authorization header")

            # Handle OAuth2 token logout (web UI authentication)
            try:
                if token:
                    response = self._handle_provider_token_logout(token)
                else:
                    # No token provided - just clear cookies
                    response = self._logout_success("Successfully logged out")

                # Clear MCP token cache if the token was cached there
                if token and response.get("action") == "success":
                    try:
                        from .mcp import MCPHandler

                        MCPHandler.clear_token_from_cache(token)
                    except Exception as cache_error:
                        # Non-critical - token will expire from cache naturally
                        logger.warning(
                            f"Failed to clear MCP token cache: {cache_error}"
                        )
            except Exception as logout_error:
                logger.exception(f"Token logout error: {logout_error}")
                # Continue with basic logout even if Google revocation fails
                response = self._logout_success("Logged out (token revocation failed)")

            if response["action"] != "success":
                logger.error(f"Logout failed with response: {response}")
                return self.error_response(500, "Logout failed")

            self.response.set_status(200)

            # Clear OAuth cookies by setting them to expire immediately
            for cookie_name in response.get("clear_cookies", []):
                try:
                    # Clear both secure and non-secure versions of cookies
                    self.response.set_cookie(
                        cookie_name, "", max_age=-1, path="/", secure=False
                    )
                    self.response.set_cookie(
                        cookie_name, "", max_age=-1, path="/", secure=True
                    )
                except Exception as cookie_error:
                    logger.warning(
                        f"Failed to clear cookie {cookie_name}: {cookie_error}"
                    )

            # Return JSON response
            return {
                "success": True,
                "message": response["message"],
                "redirect_url": response["redirect_url"],
                "method": method,
                "cleared_cookies": response.get("clear_cookies", []),
            }

        except Exception as e:
            logger.exception(f"Logout request error: {e}")
            return self.error_response(500, "Internal server error during logout")

    def _handle_provider_token_logout(self, token: str) -> dict[str, Any]:
        """
        Handle logout by invalidating the ActingWeb session and revoking
        the provider's OAuth token (for providers that support revocation).

        The token passed here is an ActingWeb-generated session token.
        We look up the actor from it, then revoke the actual provider
        token stored in actor.store with the provider's revocation endpoint.

        Args:
            token: ActingWeb session token

        Returns:
            Response dict; always reports success so that cookies are cleared
            even when revocation fails
        """
        try:
            from ..oauth_session import get_oauth2_session_manager

            session_manager = get_oauth2_session_manager(self.config)

            # Look up actor from the session token so we can revoke
            # the provider's token stored in actor.store
            try:
                token_data = session_manager.validate_access_token(token)
                if token_data:
                    actor_id = token_data.get("actor_id")
                    if actor_id:
                        self._revoke_provider_token_for_actor(actor_id)
            except Exception as lookup_error:
                logger.debug(f"Provider token lookup during logout: {lookup_error}")

            # Revoke the ActingWeb session token
            try:
                session_manager.revoke_access_token(token)
                logger.debug("Revoked ActingWeb session token")
            except Exception as revoke_error:
                # Non-critical - token will expire naturally
                logger.debug(f"Session token revocation: {revoke_error}")

            # Always return success and clear cookies
            return self._logout_success("Successfully logged out")

        except Exception as e:
            logger.exception(f"Error handling token logout: {e}")
            # Still return success to clear cookies and log user out locally
            return self._logout_success("Logged out (with errors)")

    def _revoke_provider_token_for_actor(self, actor_id: str) -> None:
        """
        Revoke the OAuth provider's token stored in actor.store.

        Looks up the actor, reads the provider token and provider name
        from the store, and sends the token to the provider's revocation
        endpoint. Only effective for providers that support revocation
        (e.g., Google). GitHub does not support token revocation.

        This is best-effort - failures are logged but do not affect logout.

        Args:
            actor_id: ID of the actor whose stored provider token is revoked
        """
        try:
            from .. import actor as actor_module
            from ..oauth2 import create_oauth2_authenticator

            actor = actor_module.Actor(actor_id=actor_id, config=self.config)
            if not actor.id or not actor.store:
                return

            provider_token = actor.store.oauth_token
            provider_name = getattr(actor.store, "oauth_provider", "") or ""

            if not provider_token:
                logger.debug(f"No provider token stored for actor {actor_id}")
                return

            authenticator = create_oauth2_authenticator(self.config, provider_name)
            if not authenticator.provider.revocation_uri:
                logger.debug(
                    f"Provider {provider_name or 'default'} does not support "
                    f"token revocation, skipping"
                )
                return

            if authenticator.revoke_token(str(provider_token)):
                logger.info(
                    f"Revoked {provider_name or 'default'} provider token "
                    f"for actor {actor_id}"
                )
                # Clear the stored provider token
                actor.store.oauth_token = None
                actor.store.oauth_token_expiry = None
            else:
                logger.debug(
                    f"Provider token revocation returned false for actor {actor_id}"
                )
        except Exception as e:
            logger.debug(f"Provider token revocation for actor {actor_id}: {e}")

    # ------------------------------------------------------------------
    # Error formatting
    # ------------------------------------------------------------------

    def error_response(self, status_code: int, message: str) -> dict[str, Any]:
        """Create OAuth2 error response.

        Sets the HTTP status on the response and returns an
        ``{"error", "error_description"}`` dict. The ``error`` code is derived
        from the status only: 400 -> ``invalid_request``, 401 ->
        ``invalid_client``, anything else -> ``server_error``.

        Args:
            status_code: HTTP status to set on the response
            message: Text placed in ``error_description``

        Returns:
            OAuth2-style error dict
        """
        self.response.set_status(status_code)

        # OAuth2 error format
        if status_code == 400:
            return {"error": "invalid_request", "error_description": message}
        elif status_code == 401:
            return {"error": "invalid_client", "error_description": message}
        else:
            return {"error": "server_error", "error_description": message}