Source code for actingweb.handlers.oauth2_callback

"""OAuth2 callback handler for ActingWeb.

This handler processes OAuth2 callbacks from various providers after user authentication,
exchanges the authorization code for an access token, and sets up the user session.
Uses the consolidated oauth2 module for provider-agnostic OAuth2 handling.

Supports SPA (Single Page Application) mode when spa_mode=true is included in the
OAuth state parameter. In SPA mode, returns JSON with tokens instead of redirecting.
"""

import json
import logging
import time
from typing import TYPE_CHECKING, Any, Optional
from urllib.parse import urlparse

from .base_handler import BaseHandler

if TYPE_CHECKING:
    from .. import aw_web_request
    from ..interface.hooks import HookRegistry
from .. import config as config_class
from ..oauth2 import create_oauth2_authenticator, create_oauth2_trust_relationship
from ..oauth_state import decode_state, validate_expected_email


def _decode_state_with_extras(state: str) -> dict[str, Any]:
    """Decode state JSON and return full dict including extra fields like spa_mode."""
    if not state or not state.strip().startswith("{"):
        return {}
    try:
        result = json.loads(state)
        if isinstance(result, dict):
            return result
        return {}
    except (json.JSONDecodeError, TypeError):
        return {}


logger = logging.getLogger(__name__)


[docs] class OAuth2CallbackHandler(BaseHandler): """Handles OAuth2 callbacks at /oauth/callback for Google/GitHub OAuth flows. This handler processes TWO types of OAuth2 flows: 1. Web UI Login (no trust_type in state): - User clicks "Login with Google/GitHub" on factory page - After OAuth, creates/looks up actor and redirects to UI page (``/www`` if config.ui is enabled, ``/app`` for SPAs when config.ui is disabled) - If email is missing, redirects to /oauth/email for manual input 2. MCP Authorization (trust_type in state, e.g., 'mcp_client'): - OAuth flow initiated with trust_type parameter - After OAuth, creates/looks up actor AND trust relationship - If email is missing, returns error (MCP clients can't use web forms) Note: MCP OAuth2 flow where ActingWeb is the auth server uses encrypted state and is routed to OAuth2EndpointsHandler, not this handler. Expected query parameters: - code: Authorization code to exchange for access token - state: CSRF protection and optional redirect URL, actor_id, trust_type - error: Error code if authentication failed """ def __init__( self, webobj: Optional["aw_web_request.AWWebObj"] = None, config: config_class.Config | None = None, hooks: Optional["HookRegistry"] = None, ) -> None: if config is None: raise RuntimeError("Config is required for OAuth2CallbackHandler") if webobj is None: from .. import aw_web_request webobj = aw_web_request.AWWebObj() super().__init__(webobj, config, hooks) # Create a default authenticator; may be replaced once state is parsed # and the actual provider is known. self.authenticator = create_oauth2_authenticator(config) if config else None
[docs] def get(self) -> dict[str, Any]: """ Handle GET request to /oauth/callback from OAuth2 provider. Expected parameters: - code: Authorization code from OAuth2 provider - state: State parameter for CSRF protection - error: Error code if authentication failed Returns: Response dict with success/error status """ if not self.authenticator or not self.authenticator.is_enabled(): logger.error("OAuth2 not configured") return self.error_response(500, "OAuth2 not configured") # Check for error parameter error = self.request.get("error") if error: error_description = self.request.get("error_description") if not error_description: error_description = "" logger.warning(f"OAuth2 error: {error} - {error_description}") return self.error_response(400, f"Authentication failed: {error}") # Get authorization code code = self.request.get("code") if not code: logger.error("No authorization code in OAuth2 callback") return self.error_response(400, "Missing authorization code") # Get and parse state parameter state = self.request.get("state") if not state: state = "" _, redirect_url, actor_id, trust_type, _expected_email, user_agent = ( decode_state(state) ) # Extract extra fields including spa_mode state_extras = _decode_state_with_extras(state) spa_mode = state_extras.get("spa_mode", False) # Re-create authenticator with the correct provider from state (if present) state_provider = state_extras.get("provider", "") if state_provider and self.config: self.authenticator = create_oauth2_authenticator( self.config, state_provider ) # For SPA mode, use redirect_url from state_extras (JSON format) # The legacy decode_state() doesn't parse JSON state properly spa_redirect_url = state_extras.get("redirect_url", "") if spa_mode else "" logger.debug( f"Parsed state - redirect_url: '{redirect_url}', spa_redirect_url: '{spa_redirect_url}', actor_id: '{actor_id}', trust_type: '{trust_type}', spa_mode: {spa_mode}" ) # For SPA mode, check if this is a browser navigation vs fetch request # Browser navigations need to redirect to SPA callback with code/state # Fetch requests (Accept: application/json) get JSON response with tokens if spa_mode and spa_redirect_url: accept_header = "" if hasattr(self.request, "headers") and self.request.headers: accept_header = self.request.headers.get("Accept", "") if not accept_header: accept_header = self.request.headers.get("accept", "") # If not a JSON fetch request, this is the browser redirect from OAuth provider # We need to process the OAuth flow NOW (code is single-use) and pass # a session token to the SPA instead of the raw code if "application/json" not in accept_header: # Process the OAuth flow and create a pending session result = self._process_spa_oauth_and_create_session( code, state, state_extras, spa_redirect_url ) return result # Critical debug: Check if trust_type was parsed correctly if trust_type: logger.debug( f"Trust type '{trust_type}' found in state - will create trust relationship" ) else: logger.warning( "No trust_type found in parsed state - trust relationship will NOT be created" ) # Exchange code for access token token_data = self.authenticator.exchange_code_for_token(code, state) if not token_data or "access_token" not in token_data: logger.error("Failed to exchange authorization code for access token") return self.error_response(502, "Token exchange failed") access_token = token_data["access_token"] refresh_token = token_data.get("refresh_token") expires_in = token_data.get("expires_in", 3600) # Validate token and get user info user_info = self.authenticator.validate_token_and_get_user_info(access_token) if not user_info: logger.error("Failed to validate token or extract user info") return self.error_response(502, "Token validation failed") # Determine if email is required based on config require_email = bool( self.config and getattr(self.config, "force_email_prop_as_creator", False) ) logger.debug(f"OAuth identifier extraction mode: require_email={require_email}") # Extract identifier (email or provider ID) based on config identifier = self.authenticator.get_email_from_user_info( user_info, access_token, require_email=require_email ) if not identifier: logger.warning("Failed to extract identifier from user info") # If in provider ID mode (require_email=False), this is a critical error if not require_email: logger.error( "Provider ID mode enabled but no identifier available from OAuth provider" ) return self.error_response( 502, "OAuth provider did not return user identifier. Please contact support.", ) # Email required mode - try to get verified emails for dropdown verified_emails: list[str] | None = None if self.authenticator.provider.name == "github" and access_token: verified_emails = self.authenticator.get_github_verified_emails( access_token ) if verified_emails: logger.info( f"Found {len(verified_emails)} verified emails from GitHub" ) # Check if this is an MCP authorization flow if trust_type: logger.error("Cannot complete MCP authorization without identifier") return self.error_response( 502, f"Email required but not provided by OAuth provider. " f"Configure your {self.authenticator.provider.name} account to make email public.", ) # Web UI flow - redirect to email input form logger.info("Web UI login flow - redirecting to email input form") try: from ..oauth_session import get_oauth2_session_manager session_manager = get_oauth2_session_manager(self.config) provider_name = getattr(self.config, "oauth2_provider", "google") session_id = session_manager.store_session( token_data=token_data, user_info=user_info, state=state, provider=provider_name, verified_emails=verified_emails, # NEW: Pass verified emails ) # Redirect to email input form (app will provide template) email_form_url = f"/oauth/email?session={session_id}" self.response.set_status(302, "Found") self.response.set_redirect(email_form_url) return { "status": "email_required", "message": "Email could not be extracted from OAuth provider", "session_id": session_id, "redirect_url": email_form_url, "redirect_performed": True, } except Exception as session_error: logger.error(f"Failed to create OAuth session: {session_error}") # Fall back to error response if session storage fails return self.error_response( 502, "Email extraction failed and could not store session" ) # Validate identifier format based on mode if require_email: # Must be a valid email if "@" not in identifier: logger.error( f"force_email_prop_as_creator enabled but got non-email: {identifier}" ) return self.error_response( 502, "Configuration requires email but OAuth provider returned non-email identifier", ) # Validate against expected email from form (if provided) if not validate_expected_email(state, identifier): logger.error(f"Email validation failed - authenticated as {identifier}") return self.error_response( 403, "Authentication email does not match the email provided in the form", ) else: # Provider ID mode - identifier can be anything logger.debug(f"Using provider identifier: {identifier}") # Use existing actor from state if provided, otherwise lookup/create by identifier actor_instance = None if actor_id: # Try to use the existing actor from the state parameter from .. import actor as actor_module try: actor_instance = actor_module.Actor(config=self.config) if not actor_instance.get(actor_id): logger.warning( f"Actor {actor_id} from state not found, will lookup/create by identifier" ) actor_instance = None else: logger.debug( f"Using existing actor {actor_id} from state parameter" ) # SECURITY: Validate that OAuth identifier matches actor creator # This prevents attackers from: # 1. MCP flow: Authorizing access to someone else's actor # 2. Web flow: Session fixation or account takeover attacks if actor_instance.creator != identifier: logger.error( f"Security violation: OAuth identifier '{identifier}' does not match " f"actor creator '{actor_instance.creator}'. " f"Flow type: {'MCP authorization' if trust_type else 'Web login'}" ) if trust_type: # MCP authorization - clear error message return self.error_response( 403, f"You cannot authorize MCP access to an actor that doesn't belong to you. " f"You authenticated as '{identifier}' but this actor belongs to '{actor_instance.creator}'.", ) else: # Web login - potential session fixation attack return self.error_response( 403, f"Authentication failed: You authenticated as '{identifier}' but attempted to " f"access an actor belonging to '{actor_instance.creator}'. Please log in with the correct account.", ) except Exception as e: logger.warning( f"Failed to load actor {actor_id} from state: {e}, will lookup/create by identifier" ) actor_instance = None # If no actor from state or loading failed, lookup/create by identifier is_new_actor = False if not actor_instance: # Check if actor exists before attempting creation from actingweb.actor import Actor as CoreActor existing_check_actor = CoreActor(config=self.config) actor_exists = existing_check_actor.get_from_creator(identifier) is_new_actor = not actor_exists actor_instance = self.authenticator.lookup_or_create_actor_by_identifier( identifier, user_info=user_info, # Pass user_info for additional metadata ) if not actor_instance: logger.error( f"Failed to lookup or create actor for identifier {identifier}" ) return self.error_response(502, "Actor creation failed") # Store OAuth tokens in actor properties # The auth system expects oauth_token (not oauth_access_token) if actor_instance.store: actor_instance.store.oauth_token = ( access_token # This is what auth.py looks for ) actor_instance.store.oauth_token_expiry = ( str(int(time.time()) + expires_in) if expires_in else None ) if refresh_token: actor_instance.store.oauth_refresh_token = refresh_token actor_instance.store.oauth_token_timestamp = str(int(time.time())) # Extract client metadata for trust relationship storage client_name = None client_version = None client_platform = user_agent # Use User-Agent as platform info if user_agent: try: # Generate session key using same logic as MCP handler client_ip = getattr(self.request, "remote_addr", "unknown") session_key = f"{client_ip}:{hash(user_agent)}" # Import here to avoid circular dependencies from .mcp import MCPHandler stored_client_info = MCPHandler.get_stored_client_info(session_key) if stored_client_info and stored_client_info.get("client_info"): mcp_client_info = stored_client_info["client_info"] client_name = mcp_client_info.get("name", "MCP Client") client_version = mcp_client_info.get("version") # Use implementation info for better platform detection if "implementation" in mcp_client_info: impl = mcp_client_info["implementation"] if isinstance(impl, dict): impl_name = impl.get("name", "Unknown") impl_version = impl.get("version", "") client_platform = f"{impl_name} {impl_version}".strip() logger.debug( f"Extracted MCP client metadata: {client_name} v{client_version} on {client_platform}" ) except Exception as e: logger.debug( f"Could not retrieve MCP client info during OAuth callback: {e}" ) # Continue with User-Agent as platform info # Non-critical, don't fail the OAuth flow # Create trust relationship if trust_type was specified in state logger.debug( f"About to check trust_type for relationship creation: trust_type='{trust_type}'" ) if trust_type: logger.info( f"Creating trust relationship for trust_type='{trust_type}' and identifier='{identifier}'" ) try: from actingweb.interface.actor_interface import ActorInterface registry = getattr(self.config, "service_registry", None) actor_interface = ActorInterface( core_actor=actor_instance, service_registry=registry ) # Prepare OAuth tokens for secure storage oauth_tokens = { "access_token": access_token, "refresh_token": refresh_token, "expires_at": int(time.time()) + expires_in if expires_in else 0, "token_type": token_data.get("token_type", "Bearer"), } # Create trust relationship with automatic approval and client metadata trust_created = create_oauth2_trust_relationship( actor_interface, identifier, trust_type, oauth_tokens, client_name=client_name, client_version=client_version, client_platform=client_platform, ) if trust_created: logger.info( f"Successfully created trust relationship: {identifier} -> {trust_type}" ) else: logger.warning( f"Failed to create trust relationship for {identifier} with type {trust_type}" ) except Exception as e: logger.error(f"Error creating OAuth2 trust relationship: {e}") # Don't fail the OAuth flow - just log the error # Execute actor_created lifecycle hook for new actors if is_new_actor and self.hooks: try: # Convert core Actor to ActorInterface for hook consistency from actingweb.interface.actor_interface import ActorInterface registry = getattr(self.config, "service_registry", None) actor_interface = ActorInterface( core_actor=actor_instance, service_registry=registry ) self.hooks.execute_lifecycle_hooks("actor_created", actor_interface) except Exception as e: logger.error(f"Error in lifecycle hook for actor_created: {e}") # Execute OAuth success lifecycle hook oauth_valid = True if self.hooks: try: # Convert core Actor to ActorInterface for hook consistency from actingweb.interface.actor_interface import ActorInterface registry = getattr(self.config, "service_registry", None) actor_interface = ActorInterface( core_actor=actor_instance, service_registry=registry ) result = self.hooks.execute_lifecycle_hooks( "oauth_success", actor_interface, email=identifier, # Pass identifier (may be email or provider ID) access_token=access_token, token_data=token_data, user_info=user_info, # Pass full user info for displayname etc. ) oauth_valid = bool(result) if result is not None else True except Exception as e: logger.error(f"Error in lifecycle hook for oauth_success: {e}") oauth_valid = False if not oauth_valid: logger.warning( f"OAuth success hook rejected authentication for {identifier}" ) return self.error_response(403, "Authentication rejected") # Set up successful response # Use return_path from state for SPA mode (defaults to /app), /www or /app for traditional mode if spa_mode: return_path = state_extras.get("return_path", "/app") # Support {actor_id} placeholder in return_path if "{actor_id}" in return_path: final_redirect = return_path.replace("{actor_id}", actor_instance.id) else: final_redirect = f"/{actor_instance.id}{return_path}" else: # Traditional (non-SPA) mode: redirect based on config.ui setting if self.config.ui: final_redirect = f"/{actor_instance.id}/www" else: final_redirect = f"/{actor_instance.id}/app" response_data = { "status": "success", "message": "Authentication successful", "actor_id": actor_instance.id, "email": identifier, # identifier (may be email or provider ID) "access_token": access_token, "expires_in": expires_in, "redirect_url": final_redirect, } # SPA mode: Return JSON with tokens instead of redirecting if spa_mode: logger.debug(f"SPA mode enabled - returning JSON response for {identifier}") # Generate ActingWeb SPA tokens instead of returning OAuth provider tokens # This allows the session manager to validate these tokens later from ..oauth_session import get_oauth2_session_manager session_manager = get_oauth2_session_manager(self.config) # Generate ActingWeb access token and store it spa_access_token = self.config.new_token() actor_id_str = actor_instance.id or "" session_manager.store_access_token( spa_access_token, actor_id_str, identifier ) # Generate refresh token for SPA spa_refresh_token = session_manager.create_refresh_token( actor_id_str, identifier ) # Update response with SPA tokens (not OAuth provider tokens) response_data["access_token"] = spa_access_token response_data["success"] = True response_data["token_type"] = "Bearer" response_data["refresh_token"] = spa_refresh_token response_data["expires_at"] = int(time.time()) + 3600 # 1 hour # Set HttpOnly cookie for refresh token (hybrid mode) token_delivery = state_extras.get("token_delivery", "json") if token_delivery == "hybrid" and self.response: self.response.set_cookie( "refresh_token", spa_refresh_token, max_age=86400 * 14, # 2 weeks path="/oauth/spa/token", secure=True, httponly=True, samesite="Strict", ) # Don't include refresh token in JSON for hybrid mode del response_data["refresh_token"] if self.response: self.response.write(json.dumps(response_data)) self.response.headers["Content-Type"] = "application/json" self.response.set_status(200) # Execute OAuth completed lifecycle hook if self.hooks: try: self.hooks.execute_lifecycle_hooks( "oauth_completed", actor_instance, email=identifier, access_token=access_token, redirect_url=response_data["redirect_url"], ) except Exception as e: logger.error(f"Error executing oauth_completed hook: {e}") logger.debug( f"OAuth2 SPA authentication completed successfully for {identifier} -> {actor_instance.id}" ) return response_data # For interactive web authentication, redirect to the actor's UI page # (/www if config.ui is enabled, /app for SPAs) # For API clients, they would use the Bearer token directly logger.debug(f"Redirecting to actor page: {final_redirect}") # Log the original URL for reference but don't use it if redirect_url: logger.debug( f"Original URL was: {redirect_url} (redirecting to UI page instead)" ) # Generate ActingWeb session token for /www mode (same approach as SPA) # This avoids validating Google tokens on every request from ..oauth_session import get_oauth2_session_manager session_manager = get_oauth2_session_manager(self.config) # Generate ActingWeb access token and store it www_access_token = self.config.new_token() actor_id_str = actor_instance.id or "" session_manager.store_access_token(www_access_token, actor_id_str, identifier) # Set session cookie with ActingWeb token (not Google token) cookie_max_age = 3600 # 1 hour - matches token TTL in session manager self.response.set_cookie( "oauth_token", www_access_token, max_age=cookie_max_age, path="/", secure=True, httponly=True, # Protect from XSS samesite="Lax", ) logger.debug( f"Set oauth_token cookie with ActingWeb token for actor {actor_id_str}" ) # Perform the redirect for interactive authentication self.response.set_status(302, "Found") self.response.set_redirect(final_redirect) # Also include the information in the response data for completeness response_data["redirect_performed"] = True # Execute OAuth completed lifecycle hook if self.hooks: try: self.hooks.execute_lifecycle_hooks( "oauth_completed", actor_instance, email=identifier, access_token=access_token, redirect_url=response_data["redirect_url"], ) except Exception as e: logger.error(f"Error executing oauth_completed hook: {e}") logger.debug( f"OAuth2 authentication completed successfully for {identifier} -> {actor_instance.id}" ) return response_data
def _is_safe_redirect(self, url: str) -> bool: """ Check if redirect URL is safe (same domain). Args: url: URL to validate Returns: True if URL is safe to redirect to """ if not url: return False try: # Parse the URL parsed = urlparse(url) # Allow relative URLs (no scheme/netloc) if not parsed.scheme and not parsed.netloc: return True # Allow same domain redirects if parsed.netloc == self.config.fqdn: return True # Reject external redirects return False except Exception: return False
[docs] def error_response(self, status_code: int, message: str) -> dict[str, Any]: """Create error response with template rendering for user-friendly errors.""" self.response.set_status(status_code) # For user-facing errors, try to render template if status_code in [403, 400] and hasattr(self.response, "template_values"): self.response.template_values = { "error": message, "status_code": status_code, } return {"error": True, "status_code": status_code, "message": message}
def _process_spa_oauth_and_create_session( self, code: str, state: str, state_extras: dict[str, Any], spa_redirect_url: str, ) -> dict[str, Any]: """ Process OAuth flow for SPA mode browser navigation. Since OAuth authorization codes are single-use, we must exchange the code immediately when the browser redirects from the OAuth provider. We then store the result in a pending session and redirect to the SPA with a session token instead of the raw code. Args: code: OAuth authorization code state: Original state parameter state_extras: Parsed state extras (spa_mode, redirect_url, etc.) spa_redirect_url: SPA callback URL to redirect to Returns: Response dict with redirect info """ from urllib.parse import urlencode, urlparse, urlunparse from ..oauth_session import get_oauth2_session_manager # Ensure authenticator is available if not self.authenticator: logger.error("SPA OAuth: Authenticator not configured") parsed = urlparse(spa_redirect_url) params = { "error": "server_error", "error_description": "OAuth not configured", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} session_manager = get_oauth2_session_manager(self.config) # Retrieve PKCE code verifier if server-managed PKCE was used code_verifier = None pkce_session_id = state_extras.get("pkce_session_id") if pkce_session_id: pkce_session = session_manager.get_session(pkce_session_id) if pkce_session: code_verifier = pkce_session.get("pkce_verifier") logger.debug( f"Retrieved PKCE code verifier from session {pkce_session_id[:8]}..." ) # PKCE session will expire naturally (short TTL) else: logger.warning( f"PKCE session {pkce_session_id[:8]}... not found or expired" ) # Exchange code for tokens NOW (single-use code) token_data = self.authenticator.exchange_code_for_token( code, state, code_verifier=code_verifier ) if not token_data or "access_token" not in token_data: logger.error("SPA OAuth: Failed to exchange authorization code") # Redirect to SPA with error parsed = urlparse(spa_redirect_url) params = { "error": "token_exchange_failed", "error_description": "Failed to exchange authorization code", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} access_token = token_data["access_token"] refresh_token = token_data.get("refresh_token") expires_in = token_data.get("expires_in", 3600) # Validate token and get user info user_info = self.authenticator.validate_token_and_get_user_info(access_token) if not user_info: logger.error("SPA OAuth: Failed to validate token") parsed = urlparse(spa_redirect_url) params = { "error": "validation_failed", "error_description": "Token validation failed", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} # Determine if email is required based on config require_email = bool( self.config and getattr(self.config, "force_email_prop_as_creator", False) ) # Extract identifier identifier = self.authenticator.get_email_from_user_info( user_info, access_token, require_email=require_email ) if not identifier: if require_email: # Email required but not available — redirect back to SPA logger.info( "SPA OAuth: No verified email from provider, " "redirecting to SPA with email_required" ) # Try to get verified emails for dropdown verified_emails: list[str] | None = None if self.authenticator.provider.name == "github" and access_token: verified_emails = self.authenticator.get_github_verified_emails( access_token ) try: provider_name = state_extras.get( "provider", getattr(self.config, "oauth2_provider", "github"), ) session_id = session_manager.store_session( token_data=token_data, user_info=user_info, state=state, provider=provider_name, verified_emails=verified_emails, ) # Redirect back to SPA with email_required flag, # matching the existing SPA redirect patterns # (success: ?session=..., error: ?error=...) parsed = urlparse(spa_redirect_url) email_params = { "email_required": "true", "session": session_id, } spa_email_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(email_params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_email_url) return { "status": "email_required", "session_id": session_id, "redirect_url": spa_email_url, "redirect_performed": True, } except Exception as session_error: logger.error( f"SPA OAuth: Failed to create email session: {session_error}" ) # Provider ID mode or session creation failed — hard error logger.error("SPA OAuth: Failed to extract identifier") parsed = urlparse(spa_redirect_url) params = { "error": "identifier_failed", "error_description": "Could not extract user identifier", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} # Lookup or create actor actor_instance = self.authenticator.lookup_or_create_actor_by_identifier( identifier, user_info=user_info ) if not actor_instance: logger.error("SPA OAuth: Failed to create actor") parsed = urlparse(spa_redirect_url) params = { "error": "actor_failed", "error_description": "Failed to create user account", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} # Store OAuth tokens in actor properties if actor_instance.store: actor_instance.store.oauth_token = access_token actor_instance.store.oauth_token_expiry = ( str(int(time.time()) + expires_in) if expires_in else None ) if refresh_token: actor_instance.store.oauth_refresh_token = refresh_token actor_instance.store.oauth_token_timestamp = str(int(time.time())) # Execute oauth_success lifecycle hooks (same as non-SPA flow) if self.hooks: try: from actingweb.interface.actor_interface import ActorInterface registry = getattr(self.config, "service_registry", None) actor_interface = ActorInterface( core_actor=actor_instance, service_registry=registry ) result = self.hooks.execute_lifecycle_hooks( "oauth_success", actor_interface, email=identifier, access_token=access_token, token_data=token_data, user_info=user_info, ) oauth_valid = bool(result) if result is not None else True except Exception as e: logger.error(f"Error in SPA lifecycle hook for oauth_success: {e}") oauth_valid = False if not oauth_valid: logger.warning( f"SPA OAuth success hook rejected authentication for {identifier}" ) parsed = urlparse(spa_redirect_url) params = { "error": "auth_rejected", "error_description": "Authentication rejected by application", } spa_error_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) self.response.set_status(302, "Found") self.response.set_redirect(spa_error_url) return {"redirect_required": True, "redirect_url": spa_error_url} # Generate SPA session tokens (session_manager already initialized at start of method) spa_access_token = self.config.new_token() actor_id_str = actor_instance.id or "" session_manager.store_access_token(spa_access_token, actor_id_str, identifier) spa_refresh_token = session_manager.create_refresh_token( actor_id_str, identifier ) # Build return path return_path = state_extras.get("return_path", "/app") if "{actor_id}" in return_path: final_redirect = return_path.replace("{actor_id}", actor_id_str) else: final_redirect = f"/{actor_id_str}{return_path}" # Store pending session data for SPA to retrieve # The SPA will call back to get this data using the session token pending_session_id = session_manager.store_session( token_data={ "access_token": spa_access_token, "refresh_token": spa_refresh_token, "actor_id": actor_id_str, "email": identifier, "expires_at": int(time.time()) + 3600, "redirect_url": final_redirect, }, user_info=user_info, state=state, provider=state_extras.get( "provider", getattr(self.config, "oauth2_provider", "google") ), ) # Set HttpOnly cookie for refresh token (hybrid mode) token_delivery = state_extras.get("token_delivery", "json") if token_delivery == "hybrid" and self.response: self.response.set_cookie( "refresh_token", spa_refresh_token, max_age=86400 * 14, # 2 weeks path="/oauth/spa/token", secure=True, httponly=True, samesite="Strict", ) # Redirect to SPA callback with session token parsed = urlparse(spa_redirect_url) params = {"session": pending_session_id} spa_callback_url = urlunparse( ( parsed.scheme or "", parsed.netloc or "", parsed.path, parsed.params, urlencode(params), parsed.fragment, ) ) logger.debug(f"SPA OAuth completed, redirecting to: {spa_callback_url}") self.response.set_status(302, "Found") self.response.set_redirect(spa_callback_url) return {"redirect_required": True, "redirect_url": spa_callback_url}