import json
import logging
from actingweb import actor
from actingweb.db import get_actor
from actingweb.handlers import base_handler
logger = logging.getLogger(__name__)
[docs]
class RootFactoryHandler(base_handler.BaseHandler):
def _wants_json(self) -> bool:
"""Check if client prefers JSON response."""
# Check Accept header
if self.request.headers:
accept = self.request.headers.get("Accept", "")
if "application/json" in accept:
return True
# Check format query parameter
if self.request.get("format") == "json":
return True
return False
def _get_json_config(self):
"""
Return JSON configuration for SPAs doing user login.
GET /?format=json or with Accept: application/json
Returns JSON with:
- oauth_enabled: Whether OAuth is configured
- oauth_providers: List of available OAuth providers with URLs
- spa_mode_supported: Always true
- endpoints: OAuth endpoint URLs
Note: Trust types are NOT included here because this endpoint is for
user login/actor creation, not for MCP client authorization.
For MCP authorization trust types, use /oauth/authorize which handles
MCP client authorization flows.
"""
base_url = f"{self.config.proto}{self.config.fqdn}"
# Build OAuth provider list
oauth_urls = {}
oauth_providers = []
oauth_enabled = False
try:
from actingweb.oauth2 import (
create_oauth2_authenticator,
get_provider_display_name,
)
providers_cfg = getattr(self.config, "oauth_providers", {})
provider_names: list[str] = []
if providers_cfg:
provider_names = list(providers_cfg.keys())
elif self.config.oauth and self.config.oauth.get("client_id"):
provider_names = [getattr(self.config, "oauth2_provider", "google")]
for prov_name in provider_names:
auth = create_oauth2_authenticator(self.config, prov_name)
if auth.is_enabled():
auth_url = auth.create_authorization_url()
oauth_urls[prov_name] = auth_url
oauth_providers.append(
{
"name": prov_name,
"display_name": get_provider_display_name(prov_name),
"authorization_url": auth_url,
"authorization_endpoint": auth.provider.auth_uri,
}
)
oauth_enabled = True
except Exception as e:
logger.warning(f"Failed to generate OAuth URLs for JSON config: {e}")
# Build response
response_data = {
"oauth_enabled": oauth_enabled,
"oauth_providers": oauth_providers,
"spa_mode_supported": True,
"pkce_supported": True,
"token_delivery_modes": ["json", "cookie", "hybrid"],
"refresh_token_rotation": True,
"endpoints": {
# Unified OAuth endpoints (JSON API)
"config": f"{base_url}/oauth/config",
"callback": f"{base_url}/oauth/callback",
"revoke": f"{base_url}/oauth/revoke",
"session": f"{base_url}/oauth/session",
"logout": f"{base_url}/oauth/logout",
# SPA-specific (different purpose than MCP OAuth2)
"spa_authorize": f"{base_url}/oauth/spa/authorize",
"spa_token": f"{base_url}/oauth/spa/token",
# MCP OAuth2 server endpoints
"mcp_authorize": f"{base_url}/oauth/authorize",
"mcp_token": f"{base_url}/oauth/token",
# Email form endpoint
"email_form": f"{base_url}/oauth/email",
},
"discovery": {
"authorization_server": f"{base_url}/.well-known/oauth-authorization-server",
"protected_resource": f"{base_url}/.well-known/oauth-protected-resource",
},
"web_ui_enabled": bool(self.config.ui),
}
# Write JSON response
self.response.write(json.dumps(response_data))
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(200)
# Set CORS headers for SPA access
if self.request.headers:
origin = self.request.headers.get("Origin", "*")
self.response.headers["Access-Control-Allow-Origin"] = origin
self.response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
self.response.headers["Access-Control-Allow-Headers"] = (
"Authorization, Content-Type, Accept"
)
self.response.headers["Access-Control-Allow-Credentials"] = "true"
return response_data
[docs]
def get(self):
if self.request.get("_method") == "POST":
self.post()
return
# Check if JSON response is requested (for SPAs)
if self._wants_json():
return self._get_json_config()
if self.config.ui:
# Provide OAuth login URLs for 3rd party apps to render "Login with Google/GitHub" buttons
oauth_urls = {}
oauth_providers = []
try:
from actingweb.oauth2 import (
create_oauth2_authenticator,
get_provider_display_name,
)
providers_cfg = getattr(self.config, "oauth_providers", {})
provider_names: list[str] = []
if providers_cfg:
provider_names = list(providers_cfg.keys())
elif self.config.oauth and self.config.oauth.get("client_id"):
provider_names = [getattr(self.config, "oauth2_provider", "google")]
for prov_name in provider_names:
auth = create_oauth2_authenticator(self.config, prov_name)
if auth.is_enabled():
auth_url = auth.create_authorization_url()
oauth_urls[prov_name] = auth_url
oauth_providers.append(
{
"name": prov_name,
"display_name": get_provider_display_name(prov_name),
"url": auth_url,
}
)
logger.debug(
f"{prov_name} OAuth URL generated: {auth_url[:100]}..."
)
except Exception as e:
logger.warning(f"Failed to generate OAuth URLs: {e}")
self.response.template_values = {
"oauth_urls": oauth_urls, # Dict: {'google': url, 'github': url}
"oauth_providers": oauth_providers, # List of dicts with name, display_name, url
"oauth_enabled": bool(oauth_urls),
}
else:
self.response.set_status(404)
[docs]
def post(self):
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
is_json = True
if "creator" in params:
creator = params["creator"]
else:
creator = ""
if "trustee_root" in params:
trustee_root = params["trustee_root"]
else:
trustee_root = ""
if "passphrase" in params:
passphrase = params["passphrase"]
else:
passphrase = ""
except ValueError:
is_json = False
creator = self.request.get("creator")
trustee_root = self.request.get("trustee_root")
passphrase = self.request.get("passphrase")
# Normalise creator when using email login flow
if isinstance(creator, str):
creator = creator.strip()
if "@" in creator:
creator = creator.lower()
if not is_json and creator:
existing_actor = actor.Actor(config=self.config)
if existing_actor.get_from_creator(creator):
actor_id = existing_actor.id or ""
redirect_target = f"/{actor_id}/www"
if self.response:
self.response.set_redirect(redirect_target)
self.response.headers["Location"] = (
f"{self.config.root}{actor_id}/www"
)
self.response.set_status(302, "Found")
return
# Create actor using enhanced method with hooks and trustee_root
myself = actor.Actor(config=self.config)
if not myself.create(
url=self.request.url or "",
creator=creator,
passphrase=passphrase,
trustee_root=trustee_root,
hooks=self.hooks,
):
# Check if this is a unique creator constraint violation
if self.config and self.config.unique_creator and creator:
# Check if creator already exists
in_db = get_actor(self.config)
exists = in_db.get_by_creator(creator=creator)
if exists:
self.response.set_status(403, "Creator already exists")
logger.warning(
"Creator already exists, cannot create new Actor("
+ str(self.request.url)
+ " "
+ str(creator)
+ ")"
)
return
# Generic creation failure
self.response.set_status(400, "Not created")
logger.warning(
"Was not able to create new Actor("
+ str(self.request.url)
+ " "
+ str(creator)
+ ")"
)
return
self.response.headers["Location"] = str(self.config.root + (myself.id or ""))
if self.config.www_auth == "oauth" and not is_json:
self.response.set_redirect(self.config.root + (myself.id or "") + "/www")
return
pair = {
"id": myself.id,
"creator": myself.creator,
"passphrase": str(myself.passphrase),
}
if trustee_root and isinstance(trustee_root, str) and len(trustee_root) > 0:
pair["trustee_root"] = trustee_root
if self.config.ui and not is_json:
self.response.template_values = pair
return
out = json.dumps(pair)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(201, "Created")