Source code for actingweb.interface.app

"""
Main ActingWebApp class providing fluent API for application configuration.
"""

import os
from collections.abc import Callable
from typing import TYPE_CHECKING, Any

from .. import __version__
from ..config import Config
from ..subscription_config import SubscriptionProcessingConfig
from .hooks import HookMetadata, HookRegistry

if TYPE_CHECKING:
    from .integrations.fastapi_integration import FastAPIIntegration
    from .integrations.flask_integration import FlaskIntegration


[docs] class ActingWebApp: """ Main application class for ActingWeb with fluent configuration API. Example usage: .. code-block:: python app = ( ActingWebApp( aw_type="urn:actingweb:example.com:myapp", database="dynamodb", fqdn="myapp.example.com", ) .with_oauth(client_id="...", client_secret="...") .with_web_ui() .with_devtest() ) @app.lifecycle_hook("actor_created") def handle_actor_created(actor: 'ActorInterface') -> None: # Custom logic after actor creation pass """ def __init__( self, aw_type: str, database: str | None = None, fqdn: str = "", proto: str = "https://", ): self.aw_type = aw_type # Allow DATABASE_BACKEND environment variable to override default self.database = database or os.getenv("DATABASE_BACKEND", "dynamodb") self.fqdn = fqdn or os.getenv("APP_HOST_FQDN", "localhost") self.proto = proto or os.getenv("APP_HOST_PROTOCOL", "https://") # Configuration options # Multi-provider OAuth: dict of provider_name -> config dict # Empty string key "" is used for backward-compat single-provider calls self._oauth_configs: dict[str, dict[str, Any]] = {} self._actors_config: dict[str, dict[str, Any]] = {} self._enable_ui = False self._enable_devtest = False self._enable_bot = False self._bot_config: dict[str, Any] | None = None self._www_auth = "basic" self._unique_creator = False self._force_email_prop_as_creator = False self._enable_mcp = True # MCP enabled by default self._mcp_server_name = "actingweb" self._mcp_instructions: str | None = None self._sync_subscription_callbacks = False # Async by default self._thread_pool_workers = ( 10 # Default thread pool size for FastAPI integration ) # Property lookup configuration self._indexed_properties: list[str] = ["oauthId", "email", "externalUserId"] self._use_lookup_table: bool = ( False # False by default for backward compatibility ) # Peer profile caching configuration # None = disabled, list of attributes = enabled self._peer_profile_attributes: list[str] | None = None # Peer capabilities (methods/actions) caching configuration self._peer_capabilities_caching: bool = False # Peer permissions caching configuration self._peer_permissions_caching: bool = False # Hook registry self.hooks = HookRegistry() # Subscription processing configuration self._subscription_config = SubscriptionProcessingConfig() self._subscription_data_hooks: dict[str, list[Callable[..., Any]]] = {} # Service registry for third-party OAuth2 services self._service_registry: Any | None = None # Lazy initialized # Internal config object (lazy initialized) self._config: Config | None = None # Automatically initialize permission system for better performance self._initialize_permission_system() def _attach_service_registry_to_config(self) -> None: """Ensure the Config instance exposes the shared service registry.""" if self._config is None: return # Always set attribute so downstream code can rely on it existing self._config.service_registry = self._service_registry # type: ignore[attr-defined] def _warn_lambda_async_callbacks(self) -> None: """Warn if running in Lambda environment without sync callbacks enabled.""" import logging logger = logging.getLogger(__name__) # Detect Lambda environment through AWS environment variables # AWS_LAMBDA_FUNCTION_NAME is set in all Lambda functions # AWS_EXECUTION_ENV contains runtime info (e.g., "AWS_Lambda_python3.11") is_lambda = bool( os.environ.get("AWS_LAMBDA_FUNCTION_NAME") or os.environ.get("AWS_EXECUTION_ENV", "").startswith("AWS_Lambda_") ) if is_lambda and not self._sync_subscription_callbacks: logger.warning( "Running in AWS Lambda with async subscription callbacks enabled. " "Fire-and-forget callbacks may be lost when Lambda function freezes. " "Consider enabling sync callbacks with .with_sync_callbacks() " "to ensure callback delivery before function freeze. " "See: https://docs.actingweb.org/guides/lambda-deployment.html" ) def _apply_runtime_changes_to_config(self) -> None: """Propagate builder changes to an existing Config instance. This keeps configuration consistent even if get_config() was called early (e.g., during startup warmups) before builder methods like with_oauth() were invoked. """ if self._config is None: return # Core toggles self._config.ui = self._enable_ui self._config.devtest = self._enable_devtest self._config.www_auth = self._www_auth self._config.unique_creator = self._unique_creator self._config.force_email_prop_as_creator = self._force_email_prop_as_creator # OAuth configuration — multi-provider support if self._oauth_configs: # Build named providers (skip the empty-string default key) named: dict[str, dict[str, Any]] = { k: dict(v) for k, v in self._oauth_configs.items() if k } if named: self._config.oauth_providers = named # Backward compat: oauth points to the first named provider first_name = next(iter(named)) self._config.oauth = dict(named[first_name]) self._config.oauth2_provider = first_name else: # Single unnamed provider (backward compat path) default_cfg = self._oauth_configs.get("") if default_cfg is not None: self._config.oauth = dict(default_cfg) # Actor types and bot config if self._actors_config: self._config.actors = dict(self._actors_config) if self._enable_bot: self._config.bot = dict(self._bot_config or {}) # Property lookup configuration if hasattr(self, "_indexed_properties"): self._config.indexed_properties = self._indexed_properties if hasattr(self, "_use_lookup_table"): self._config.use_lookup_table = self._use_lookup_table # Subscription callback mode if hasattr(self, "_sync_subscription_callbacks"): self._config.sync_subscription_callbacks = self._sync_subscription_callbacks # Warn if running in Lambda without sync callbacks enabled self._warn_lambda_async_callbacks() # Peer profile caching configuration if hasattr(self, "_peer_profile_attributes"): self._config.peer_profile_attributes = self._peer_profile_attributes # Peer capabilities (methods/actions) caching configuration if hasattr(self, "_peer_capabilities_caching"): self._config.peer_capabilities_caching = self._peer_capabilities_caching if hasattr(self, "_peer_capabilities_max_age_seconds"): self._config.peer_capabilities_max_age_seconds = ( self._peer_capabilities_max_age_seconds ) # Peer permissions caching configuration if hasattr(self, "_peer_permissions_caching"): self._config.peer_permissions_caching = self._peer_permissions_caching # Auto-delete on revocation configuration if hasattr(self, "_auto_delete_on_revocation"): self._config.auto_delete_on_revocation = self._auto_delete_on_revocation # Notify peer on change configuration if hasattr(self, "_notify_peer_on_change"): self._config.notify_peer_on_change = self._notify_peer_on_change # Update supported options based on enabled features self._config.update_supported_options() # Keep service registry reference in sync self._attach_service_registry_to_config()
[docs] def with_oauth( self, client_id: str, client_secret: str, scope: str = "", auth_uri: str = "", token_uri: str = "", provider: str = "", **kwargs: Any, ) -> "ActingWebApp": """Configure OAuth authentication. Can be called multiple times with different ``provider`` values to configure multiple OAuth providers simultaneously. .. note:: When calling ``with_oauth()`` multiple times for different providers, **every** call must include an explicit ``provider`` name. Mixing a nameless call (legacy single-provider API) with named calls will silently drop the nameless provider from the multi-provider configuration. Args: client_id: OAuth client ID client_secret: OAuth client secret scope: OAuth scope string auth_uri: Authorization endpoint URL token_uri: Token exchange endpoint URL provider: Provider name (e.g. ``"google"``, ``"github"``). When empty, stores as the single default provider for backward compatibility. **kwargs: Additional OAuth config values """ oauth_cfg: dict[str, Any] = { "client_id": client_id, "client_secret": client_secret, "redirect_uri": f"{self.proto}{self.fqdn}/oauth", "scope": scope, "auth_uri": auth_uri or "https://api.actingweb.net/v1/authorize", "token_uri": token_uri or "https://api.actingweb.net/v1/access_token", "response_type": "code", "grant_type": "authorization_code", "refresh_type": "refresh_token", **kwargs, } # Store under the provider key (empty string for backward-compat default) self._oauth_configs[provider] = oauth_cfg self._www_auth = "oauth" # Ensure existing config (if already created) is updated self._apply_runtime_changes_to_config() return self
[docs] def with_web_ui(self, enable: bool = True) -> "ActingWebApp": """Enable or disable the web UI.""" self._enable_ui = enable self._apply_runtime_changes_to_config() return self
[docs] def with_devtest(self, enable: bool = True) -> "ActingWebApp": """Enable or disable development/testing endpoints.""" self._enable_devtest = enable self._apply_runtime_changes_to_config() return self
[docs] def with_indexed_properties( self, properties: list[str] | None = None ) -> "ActingWebApp": """Configure which properties support reverse lookups via lookup table. Properties specified here will have their values indexed in a separate lookup table, enabling reverse lookups (value -> actor_id) without the 2048-byte size limit imposed by DynamoDB Global Secondary Indexes. Args: properties: List of property names to index. Default is ["oauthId", "email", "externalUserId"]. Set to empty list [] to disable all reverse lookups. Returns: Self for method chaining Example:: app = ( ActingWebApp(...) .with_indexed_properties(["oauthId", "email", "customUserId"]) ) Note: Only properties listed here can be used with Actor.get_from_property(). Changes require application restart to take effect. Use environment variable INDEXED_PROPERTIES for runtime override. """ if properties is not None: self._indexed_properties = properties self._apply_runtime_changes_to_config() return self
[docs] def with_legacy_property_index(self, enable: bool = False) -> "ActingWebApp": """ Enable legacy GSI/index-based property reverse lookup (for migration). When False (default), uses new lookup table approach which supports property values larger than 2048 bytes. When True, uses legacy DynamoDB GSI or PostgreSQL index on value field (limited to 2048 bytes). Args: enable: True to use legacy GSI/index, False for new lookup table Returns: Self for method chaining Note: Set this to True during migration from legacy systems. Once all properties are migrated to lookup table, set back to False (default). """ self._use_lookup_table = not enable self._apply_runtime_changes_to_config() return self
[docs] def with_bot( self, token: str = "", email: str = "", secret: str = "", admin_room: str = "" ) -> "ActingWebApp": """Configure bot integration.""" self._enable_bot = True self._bot_config = { "token": token or os.getenv("APP_BOT_TOKEN", ""), "email": email or os.getenv("APP_BOT_EMAIL", ""), "secret": secret or os.getenv("APP_BOT_SECRET", ""), "admin_room": admin_room or os.getenv("APP_BOT_ADMIN_ROOM", ""), } self._apply_runtime_changes_to_config() return self
[docs] def with_unique_creator(self, enable: bool = True) -> "ActingWebApp": """Enable unique creator constraint.""" self._unique_creator = enable self._apply_runtime_changes_to_config() return self
[docs] def with_email_as_creator(self, enable: bool = True) -> "ActingWebApp": """Force email property as creator.""" self._force_email_prop_as_creator = enable self._apply_runtime_changes_to_config() return self
[docs] def with_mcp( self, enable: bool = True, server_name: str = "actingweb", instructions: str | None = None, ) -> "ActingWebApp": """Enable or disable MCP (Model Context Protocol) functionality. Args: enable: If True, enable MCP support. server_name: Name announced in the MCP initialise handshake. Some clients use this as the default tool prefix (``emm:search`` vs ``actingweb:search``). Defaults to ``"actingweb"``. The first ``with_mcp()`` call sets the process-wide singleton name; subsequent re-configuration does not rename existing per-actor servers. instructions: Optional server-level orientation string surfaced on the MCP ``InitializeResult.instructions`` field per protocol. Clients display it to the LLM on initial connection. Use it to point new LLMs at an entry-point tool (e.g. ``how_to_use()``). Like ``server_name``, the first call wins for the singleton. """ self._enable_mcp = enable self._mcp_server_name = server_name self._mcp_instructions = instructions # Note: aw_supported is computed in Config.__init__. We keep this minimal # to avoid touching unrelated features; OAuth fix does not require recompute. return self
[docs] def with_sync_callbacks(self, enable: bool = True) -> "ActingWebApp": """Enable synchronous subscription callbacks. When enabled, subscription callbacks use blocking HTTP requests instead of async fire-and-forget. This ensures callbacks complete before the request handler returns, which is important for Lambda/serverless where async tasks may be lost when the function freezes after returning a response. Args: enable: If True, use synchronous callbacks. Default is True. Returns: Self for method chaining. """ self._sync_subscription_callbacks = enable self._apply_runtime_changes_to_config() return self
[docs] def with_thread_pool_workers(self, workers: int) -> "ActingWebApp": """Configure thread pool size for FastAPI integration. The thread pool is used to execute synchronous ActingWeb handlers (database operations, HTTP requests) without blocking the async event loop. Tuning guidelines: - Default: 10 workers (suitable for most applications) - Low traffic: 5 workers (reduces memory overhead) - High traffic: 20-50 workers (handles more concurrent requests) - Lambda: 5-10 workers (limited by function concurrency) - Container: Scale based on CPU cores (e.g., 2-5 per core) Memory overhead: ~8MB per worker thread on average. Args: workers: Number of thread pool workers. Must be between 1 and 100. Returns: Self for method chaining. Raises: ValueError: If workers is outside the valid range [1, 100]. Example: >>> app = ActingWebApp(...).with_thread_pool_workers(20) """ if not 1 <= workers <= 100: raise ValueError( f"Thread pool workers must be between 1 and 100, got {workers}" ) self._thread_pool_workers = workers return self
[docs] def with_peer_profile( self, attributes: list[str] | None = None, ) -> "ActingWebApp": """Enable peer profile caching for trust relationships. When enabled, peer profile attributes are automatically fetched and cached when trust relationships are established. Profiles are refreshed during sync_peer() operations. Args: attributes: List of property names to cache from peer actors. Default: ["displayname", "email", "description"] Pass empty list to explicitly disable caching. Returns: Self for method chaining. Example:: app = ( ActingWebApp(...) .with_peer_profile(attributes=["displayname", "email", "avatar_url"]) ) # Access via TrustManager profile = actor.trust.get_peer_profile(peer_id) if profile: print(f"Connected with {profile.displayname}") """ # Set default attributes if None provided if attributes is None: self._peer_profile_attributes = ["displayname", "email", "description"] else: self._peer_profile_attributes = attributes self._apply_runtime_changes_to_config() # Profile cleanup is now handled automatically in core delete_reciprocal_trust() # No hook registration needed return self
[docs] def with_peer_capabilities( self, enable: bool = True, max_age_seconds: int = 3600, ) -> "ActingWebApp": """Enable peer capabilities (methods/actions) caching for trust relationships. When enabled, peer methods and actions are automatically fetched and cached when trust relationships are established. Capabilities are refreshed during sync_peer() operations only if the cache is stale (older than max_age_seconds). Args: enable: Whether to enable capabilities caching. Default True. max_age_seconds: Maximum age in seconds before cached capabilities are considered stale and refetched. Default 3600 (1 hour). Capabilities (methods/actions) rarely change, so 1 hour is conservative. Set to 0 to always refetch. Returns: Self for method chaining. Example:: app = ( ActingWebApp(...) .with_peer_capabilities(enable=True, max_age_seconds=7200) ) # Access via TrustManager capabilities = actor.trust.get_peer_capabilities(peer_id) if capabilities: for method in capabilities.methods: print(f"Method: {method.name} - {method.description}") """ self._peer_capabilities_caching = enable self._peer_capabilities_max_age_seconds = max_age_seconds self._apply_runtime_changes_to_config() # Capabilities cleanup is now handled automatically in core delete_reciprocal_trust() # No hook registration needed return self
[docs] def with_peer_permissions( self, enable: bool = True, auto_delete_on_revocation: bool = False, notify_peer_on_change: bool = True, ) -> "ActingWebApp": """Enable peer permissions caching for trust relationships. When enabled, peer permissions are automatically fetched and cached when trust relationships are established. Permissions are refreshed during sync_peer() operations. This caches what permissions the REMOTE peer has granted US access to. It is distinct from TrustPermissions which stores what WE grant to peers. Permission callbacks from peers are automatically processed and stored when this is enabled. Args: enable: Whether to enable permissions caching. Default True. auto_delete_on_revocation: When True, automatically delete cached peer data from RemotePeerStore when the peer revokes property access. This ensures that when a peer revokes access to certain data (e.g., memory_* properties), the locally cached copies are deleted. Default False. notify_peer_on_change: When True (default), automatically notify peers when their permissions change. This sends a callback to the peer's /callbacks/permissions/{actor_id} endpoint. The notification is fire-and-forget (failures logged but don't block the store operation). Returns: Self for method chaining. Example:: app = ( ActingWebApp(...) .with_peer_permissions( enable=True, auto_delete_on_revocation=True, # Delete cached data on revocation notify_peer_on_change=True # Auto-notify peers (default) ) ) # Access cached permissions via PeerPermissionStore from actingweb.peer_permissions import get_peer_permission_store store = get_peer_permission_store(actor.config) permissions = store.get_permissions(actor.id, peer_id) if permissions: if permissions.has_property_access("memory_travel", "read"): print("Peer granted us access to memory_travel") """ self._peer_permissions_caching = enable self._auto_delete_on_revocation = auto_delete_on_revocation self._notify_peer_on_change = notify_peer_on_change self._apply_runtime_changes_to_config() # Permissions cleanup is now handled automatically in core delete_reciprocal_trust() # No hook registration needed return self
[docs] def add_service( self, name: str, client_id: str, client_secret: str, scopes: list, auth_uri: str, token_uri: str, userinfo_uri: str = "", revocation_uri: str = "", base_api_url: str = "", **extra_params, ) -> "ActingWebApp": """Add a custom third-party OAuth2 service configuration.""" self._get_service_registry().register_service_from_dict( name, { "client_id": client_id, "client_secret": client_secret, "scopes": scopes, "auth_uri": auth_uri, "token_uri": token_uri, "userinfo_uri": userinfo_uri, "revocation_uri": revocation_uri, "base_api_url": base_api_url, "extra_params": extra_params, }, ) return self
[docs] def add_dropbox(self, client_id: str, client_secret: str) -> "ActingWebApp": """Add Dropbox service using pre-configured template.""" self._get_service_registry().register_dropbox(client_id, client_secret) return self
[docs] def add_gmail( self, client_id: str, client_secret: str, readonly: bool = True ) -> "ActingWebApp": """Add Gmail service using pre-configured template.""" self._get_service_registry().register_gmail(client_id, client_secret, readonly) return self
[docs] def add_github(self, client_id: str, client_secret: str) -> "ActingWebApp": """Add GitHub service using pre-configured template.""" self._get_service_registry().register_github(client_id, client_secret) return self
[docs] def add_box(self, client_id: str, client_secret: str) -> "ActingWebApp": """Add Box service using pre-configured template.""" self._get_service_registry().register_box(client_id, client_secret) return self
def _get_service_registry(self): """Get or create the service registry.""" if self._service_registry is None: from .services import ServiceRegistry self._service_registry = ServiceRegistry(self.get_config()) # Ensure config exposes the registry even if it existed earlier self._attach_service_registry_to_config() return self._service_registry
[docs] def get_service_registry(self): """Get the service registry for advanced configuration.""" return self._get_service_registry()
[docs] def add_actor_type( self, name: str, factory: str = "", relationship: str = "friend" ) -> "ActingWebApp": """Add an actor type configuration.""" self._actors_config[name] = { "type": self.aw_type, "factory": factory or f"{self.proto}{self.fqdn}/", "relationship": relationship, } self._apply_runtime_changes_to_config() return self
[docs] def property_hook(self, property_name: str = "*") -> Callable[..., Any]: """Decorator to register property hooks.""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: self.hooks.register_property_hook(property_name, func) return func return decorator
[docs] def callback_hook(self, callback_name: str = "*") -> Callable[..., Any]: """Decorator to register actor-level callback hooks.""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: self.hooks.register_callback_hook(callback_name, func) return func return decorator
[docs] def app_callback_hook(self, callback_name: str) -> Callable[..., Any]: """Decorator to register application-level callback hooks (no actor context).""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: self.hooks.register_app_callback_hook(callback_name, func) return func return decorator
[docs] def subscription_hook(self, func: Callable[..., Any]) -> Callable[..., Any]: """Decorator to register subscription hooks.""" self.hooks.register_subscription_hook(func) return func
[docs] def lifecycle_hook(self, event: str) -> Callable[..., Any]: """Decorator to register lifecycle hooks.""" def decorator(func: Callable[..., Any]) -> Callable[..., Any]: self.hooks.register_lifecycle_hook(event, func) return func return decorator
[docs] def method_hook( self, method_name: str = "*", description: str = "", input_schema: dict[str, Any] | None = None, output_schema: dict[str, Any] | None = None, annotations: dict[str, Any] | None = None, ) -> Callable[..., Any]: """Decorator to register method hooks with optional metadata. Args: method_name: Name of method to hook ("*" for all methods) description: Human-readable description of what the method does input_schema: JSON schema describing expected input parameters output_schema: JSON schema describing the expected return value annotations: Safety/behavior hints (e.g., readOnlyHint, idempotentHint) """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: # Store metadata on function metadata = HookMetadata( description=description, input_schema=input_schema, output_schema=output_schema, annotations=annotations, ) setattr(func, "_hook_metadata", metadata) # noqa: B010 self.hooks.register_method_hook(method_name, func) return func return decorator
[docs] def action_hook( self, action_name: str = "*", description: str = "", input_schema: dict[str, Any] | None = None, output_schema: dict[str, Any] | None = None, annotations: dict[str, Any] | None = None, ) -> Callable[..., Any]: """Decorator to register action hooks with optional metadata. Args: action_name: Name of action to hook ("*" for all actions) description: Human-readable description of what the action does input_schema: JSON schema describing expected input parameters output_schema: JSON schema describing the expected return value annotations: Safety/behavior hints (e.g., destructiveHint, readOnlyHint) """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: # Store metadata on function metadata = HookMetadata( description=description, input_schema=input_schema, output_schema=output_schema, annotations=annotations, ) setattr(func, "_hook_metadata", metadata) # noqa: B010 self.hooks.register_action_hook(action_name, func) return func return decorator
[docs] def with_subscription_processing( self, auto_sequence: bool = True, auto_storage: bool = True, auto_cleanup: bool = True, gap_timeout_seconds: float = 5.0, max_pending: int = 100, storage_prefix: str = "remote:", max_concurrent_callbacks: int = 10, max_payload_for_high_granularity: int = 65536, circuit_breaker_threshold: int = 5, circuit_breaker_cooldown: float = 60.0, ) -> "ActingWebApp": """Enable automatic subscription processing. When enabled, the library automatically handles: - Callback sequencing and deduplication - Gap detection and resync triggering - Data storage in RemotePeerStore (if auto_storage=True) - Cleanup when trust is deleted (if auto_cleanup=True) Args: auto_sequence: Enable CallbackProcessor for sequence handling auto_storage: Automatically store received data in RemotePeerStore auto_cleanup: Register hook to clean up when trust is deleted gap_timeout_seconds: Time before triggering resync on sequence gap max_pending: Maximum pending callbacks before back-pressure (429) storage_prefix: Bucket prefix for RemotePeerStore max_concurrent_callbacks: Max concurrent callback deliveries max_payload_for_high_granularity: Payload size before granularity downgrade circuit_breaker_threshold: Failures before opening circuit circuit_breaker_cooldown: Seconds before testing recovery Returns: Self for method chaining """ self._subscription_config = SubscriptionProcessingConfig( enabled=True, auto_sequence=auto_sequence, auto_storage=auto_storage, auto_cleanup=auto_cleanup, gap_timeout_seconds=gap_timeout_seconds, max_pending=max_pending, storage_prefix=storage_prefix, max_concurrent_callbacks=max_concurrent_callbacks, max_payload_for_high_granularity=max_payload_for_high_granularity, circuit_breaker_threshold=circuit_breaker_threshold, circuit_breaker_cooldown=circuit_breaker_cooldown, subscription_data_hooks=self._subscription_data_hooks, ) # Cleanup is now handled automatically in core delete_reciprocal_trust() # No hook registration needed return self
[docs] def subscription_data_hook(self, target: str = "*") -> Callable[..., Any]: """Decorator to register subscription data hooks. Use with .with_subscription_processing() for automatic handling. The handler receives already-sequenced, deduplicated data. Args: target: Target to hook (e.g., "properties", "*" for all) Example:: @app.subscription_data_hook("properties") def on_property_change( actor: ActorInterface, peer_id: str, target: str, data: dict, sequence: int, callback_type: str ) -> None: # Data is already sequenced and stored pass """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: if target not in self._subscription_data_hooks: self._subscription_data_hooks[target] = [] self._subscription_data_hooks[target].append(func) return func return decorator
[docs] def get_subscription_config(self) -> SubscriptionProcessingConfig: """Get the subscription processing configuration.""" return self._subscription_config
def _get_default_oauth_config(self) -> dict[str, Any]: """Return the OAuth config dict to pass as ``oauth=`` to Config. If named providers are configured, returns the first one. Otherwise returns the unnamed default (or empty dict). """ named = {k: v for k, v in self._oauth_configs.items() if k} if named: return dict(next(iter(named.values()))) default = self._oauth_configs.get("") return dict(default) if default else {}
[docs] def get_config(self) -> Config: """Get the underlying ActingWeb Config object.""" if self._config is None: # Add default actor type if "myself" not in self._actors_config: self.add_actor_type("myself") self._config = Config( database=self.database, fqdn=self.fqdn, proto=self.proto, aw_type=self.aw_type, desc=f"ActingWeb app: {self.aw_type}", version=__version__, devtest=self._enable_devtest, actors=self._actors_config, force_email_prop_as_creator=self._force_email_prop_as_creator, unique_creator=self._unique_creator, www_auth=self._www_auth, logLevel=os.getenv("LOG_LEVEL", "INFO"), ui=self._enable_ui, bot=self._bot_config or {}, oauth=self._get_default_oauth_config(), mcp=self._enable_mcp, mcp_server_name=self._mcp_server_name, mcp_instructions=self._mcp_instructions, indexed_properties=self._indexed_properties, sync_subscription_callbacks=self._sync_subscription_callbacks, use_lookup_table=self._use_lookup_table, peer_profile_attributes=self._peer_profile_attributes, peer_capabilities_caching=self._peer_capabilities_caching, peer_permissions_caching=self._peer_permissions_caching, ) # Populate multi-provider OAuth config on initial creation named = {k: dict(v) for k, v in self._oauth_configs.items() if k} if named: self._config.oauth_providers = named self._config.oauth2_provider = next(iter(named)) self._attach_service_registry_to_config() # Attach hooks to config so OAuth2 and other modules can access them self._config._hooks = self.hooks # Attach subscription config so handlers can access it self._config._subscription_config = self._subscription_config else: # If config already exists, keep it in sync with latest builder settings self._apply_runtime_changes_to_config() # Ensure hooks are attached even if config was created early self._config._hooks = self.hooks # Ensure subscription config is attached even if config was created early self._config._subscription_config = self._subscription_config return self._config
[docs] def is_mcp_enabled(self) -> bool: """Check if MCP functionality is enabled.""" return self._enable_mcp
[docs] def integrate_flask(self, flask_app: Any) -> "FlaskIntegration": """Integrate with Flask application.""" try: from .integrations.flask_integration import FlaskIntegration except ImportError as e: raise ImportError( "Flask integration requires Flask to be installed. " "Install with: pip install 'actingweb[flask]'" ) from e integration = FlaskIntegration(self, flask_app) integration.setup_routes() return integration
[docs] def integrate_fastapi( self, fastapi_app: Any, templates_dir: str | None = None, **options: Any ) -> "FastAPIIntegration": """ Integrate ActingWeb with FastAPI application. Args: fastapi_app: The FastAPI application instance templates_dir: Directory containing Jinja2 templates (optional) **options: Additional configuration options Returns: FastAPIIntegration instance Raises: ImportError: If FastAPI is not installed """ try: from .integrations.fastapi_integration import FastAPIIntegration except ImportError as e: raise ImportError( "FastAPI integration requires FastAPI to be installed. " "Install with: pip install 'actingweb[fastapi]'" ) from e integration = FastAPIIntegration( self, fastapi_app, templates_dir=templates_dir, thread_pool_workers=self._thread_pool_workers, ) integration.setup_routes() return integration
[docs] def run(self, host: str = "0.0.0.0", port: int = 5000, debug: bool = False) -> None: """Run as standalone application with Flask.""" try: from flask import Flask except ImportError as e: raise ImportError( "Flask is required for standalone mode. " "Install with: pip install 'actingweb[flask]'" ) from e flask_app = Flask(__name__) self.integrate_flask(flask_app) flask_app.run(host=host, port=port, debug=debug)
def _initialize_permission_system(self) -> None: """ Automatically initialize the ActingWeb permission system. This method is called automatically when integrating with web frameworks to ensure optimal performance without requiring manual initialization. """ try: from ..permission_initialization import initialize_permission_system initialize_permission_system(self.get_config()) except Exception as e: import logging logger = logging.getLogger(__name__) logger.info(f"Permission system initialization failed: {e}") logger.info( "System will fall back to basic functionality with lazy loading" )
# Graceful fallback - don't raise exceptions that would break app startup