===========================
Migrating to ActingWeb 3.10
===========================

Overview
========

ActingWeb 3.10 introduces **automatic subscription processing**, a major feature that simplifies callback handling from ~500+ lines of manual code to ~30 lines of application logic.

**Key Features:**

- **CallbackProcessor**: Automatic sequencing, deduplication, and gap handling
- **RemotePeerStore**: Standardized storage for peer data with list operations
- **FanOutManager**: Circuit breaker-protected delivery to multiple subscribers
- **Peer Capabilities**: Protocol v1.4 feature discovery (compression, batch subscriptions)
- **Auto-cleanup**: Automatic peer data removal when trust relationships end

Who Needs to Migrate?
=====================

**You should migrate if:**

- Your application manually handles subscription callback sequencing
- You maintain custom code for detecting duplicate or out-of-order callbacks
- You have custom storage for peer data received via subscriptions
- You want to reduce boilerplate and improve reliability

**You can skip migration if:**

- You don't use subscriptions between actors
- Your current callback handling is simple and working well
- You prefer full control over callback processing

**Important:** Migration is **optional**. Existing ``@callback_hook("subscription")`` handlers continue to work unchanged.

New Components
==============

CallbackProcessor
-----------------

Handles all aspects of callback sequencing automatically:

.. code-block:: python

    from actingweb.callback_processor import CallbackProcessor, ProcessResult

    processor = CallbackProcessor(
        actor=actor,
        gap_timeout_seconds=5.0,  # Trigger resync after 5s gap
        max_pending=100,  # Back-pressure limit
    )

    # Process a callback
    result = processor.process_callback(
        peer_id="peer123",
        subscription_id="sub456",
        sequence=42,
        data={"status": "active"},
    )

    # Result indicates what happened
    if result == ProcessResult.PROCESSED:
        # Normal processing - data is ready
        pass
    elif result == ProcessResult.DUPLICATE:
        # Already processed this sequence
        pass
    elif result == ProcessResult.PENDING:
        # Gap detected, waiting for missing callbacks
        pass
    elif result == ProcessResult.RESYNC_REQUIRED:
        # Gap timeout exceeded, need full resync
        pass

RemotePeerStore
---------------

Stores and manages data received from peer actors:

.. code-block:: python

    from actingweb.remote_storage import RemotePeerStore

    store = RemotePeerStore(actor, peer_id)

    # Scalar values
    store.set_value("profile", {"name": "Alice", "status": "active"})
    profile = store.get_value("profile")

    # Lists with automatic operations
    store.set_list("items", [{"id": 1}, {"id": 2}])
    items = store.get_list("items")

    # Apply callback data (handles list operations automatically)
    store.apply_callback_data("items", {
        "operation": "append",
        "items": [{"id": 3}]
    })

    # Cleanup
    store.delete_all()

FanOutManager
-------------

Delivers callbacks to multiple subscribers with circuit breaker protection:

.. code-block:: python

    from actingweb.fanout import FanOutManager, FanOutResult

    manager = FanOutManager(
        actor=actor,
        max_concurrent=5,  # Parallel deliveries
        default_timeout=30.0,  # Request timeout
    )

    async def notify_subscribers() -> None:
        # deliver() is awaited, so it must run inside async code
        result: FanOutResult = await manager.deliver(
            target="properties",
            data={"status": "changed"},
        )

        # Check results
        print(f"Delivered: {result.success_count}/{result.total_count}")

    # Circuit breaker management
    status = manager.get_circuit_breaker_status("peer123")
    if status == "OPEN":
        manager.reset_circuit_breaker("peer123")

PeerCapabilities
----------------

Discover which protocol features a peer supports (protocol v1.4):

.. code-block:: python

    from actingweb.peer_capabilities import PeerCapabilities

    caps = PeerCapabilities(actor, peer_id)

    if caps.supports_resync_callbacks():
        # Peer handles type="resync" callbacks
        pass

    if caps.supports_compression():
        # Peer accepts gzip-compressed payloads
        pass

    # Get all capabilities
    all_caps = caps.get_all_supported()
    print(f"Peer supports: {all_caps}")

Migration Guide
===============

Step 1: Enable Subscription Processing
--------------------------------------

Add ``.with_subscription_processing()`` to your application configuration:

**Before (v3.9 and earlier):**

.. code-block:: python

    from actingweb.interface import ActingWebApp

    app = (
        ActingWebApp(
            aw_type="urn:actingweb:example.com:myapp",
            database="dynamodb",
            fqdn="myapp.example.com",
            proto="https://"
        )
        .with_oauth(client_id="...", client_secret="...")
        .with_devtest(enable=False)
    )

**After (v3.10):**

.. code-block:: python

    from actingweb.interface import ActingWebApp

    app = (
        ActingWebApp(
            aw_type="urn:actingweb:example.com:myapp",
            database="dynamodb",
            fqdn="myapp.example.com",
            proto="https://"
        )
        .with_oauth(client_id="...", client_secret="...")
        .with_devtest(enable=False)
        .with_subscription_processing(
            auto_sequence=True,  # Enable CallbackProcessor
            auto_storage=True,  # Enable RemotePeerStore
            auto_cleanup=True,  # Clean up on trust deletion
            gap_timeout_seconds=5.0,  # Resync trigger timeout
            max_pending=100  # Back-pressure limit
        )
    )

**For Lambda/Serverless environments**, also add:

.. code-block:: python

    app = app.with_sync_callbacks()  # Ensures delivery before freeze

Step 2: Replace Callback Hook with Data Hook
--------------------------------------------

Replace your ``@callback_hook("subscription")`` with ``@subscription_data_hook``:

**Before (manual sequencing):**

.. code-block:: python

    @app.callback_hook("subscription")
    def handle_subscription(actor, req):
        """Manual callback handling - 100+ lines typically."""
        peer_id = req.json.get("id")
        sequence = req.json.get("sequence", 0)
        data = req.json.get("data", {})
        target = req.json.get("target", "properties")

        # Manual sequence checking
        last_seq = get_last_sequence(peer_id, target)
        if sequence <= last_seq:
            return {"status": "duplicate"}

        if sequence > last_seq + 1:
            # Gap detected - store pending
            store_pending(peer_id, sequence, data)
            if check_gap_timeout(peer_id):
                trigger_resync(peer_id, target)
            return {"status": "pending"}

        # Store the data
        save_peer_data(peer_id, target, data)
        update_sequence(peer_id, target, sequence)

        # Process any pending callbacks
        process_pending_callbacks(peer_id, target)

        # Application logic
        notify_user(f"Update from {peer_id}")
        return {"status": "ok"}

**After (automatic processing):**

.. code-block:: python

    @app.subscription_data_hook("properties")
    def on_property_change(
        actor,
        peer_id: str,
        target: str,
        data: dict,
        sequence: int,
        callback_type: str,  # "diff" or "resync"
    ):
        """
        Called with already-sequenced, deduplicated, stored data.

        Library handled: sequencing, deduplication, storage, list operations.
        """
        # Just your application logic!
        if callback_type == "resync":
            refresh_ui(actor, peer_id)
        else:
            notify_user(f"Update from {peer_id}: {data}")

**Line reduction:** ~100 lines → ~10 lines

Step 3: Migrate Peer Data Storage
---------------------------------

Replace custom peer data storage with RemotePeerStore:

**Before (custom storage):**

.. code-block:: python

    def save_peer_data(peer_id, target, data):
        """Custom storage implementation."""
        bucket = f"remote:{peer_id}"
        actor.attributes.set_bucket(bucket)

        if "items" in data:
            existing = actor.attributes.get_attribute(f"{target}:items") or []
            operation = data.get("operation", "replace")
            if operation == "append":
                existing.extend(data["items"])
            elif operation == "delete":
                # Handle delete...
                pass
            actor.attributes.set_attribute(f"{target}:items", existing)
        else:
            for key, value in data.items():
                actor.attributes.set_attribute(f"{target}:{key}", value)

**After (RemotePeerStore):**

.. code-block:: python

    from actingweb.remote_storage import RemotePeerStore

    @app.subscription_data_hook("properties")
    def on_property_change(actor, peer_id, target, data, sequence, callback_type):
        # Storage is automatic with auto_storage=True
        # If you need manual access:
        store = RemotePeerStore(actor, peer_id)
        items = store.get_list(f"{target}:items")

Step 4: Remove Manual Cleanup Code
----------------------------------

**Before (manual cleanup):**

.. code-block:: python

    @app.trust_hook("delete")
    def on_trust_deleted(actor, peerid, relationship, trust_data):
        """Manual cleanup of peer data."""
        # Delete all peer data
        bucket = f"remote:{peerid}"
        actor.attributes.set_bucket(bucket)
        for key in actor.attributes.get_all_keys():
            actor.attributes.delete_attribute(key)

        # Clear sequence tracking
        clear_sequence_state(peerid)

        # Clear pending callbacks
        clear_pending_callbacks(peerid)

        logger.info(f"Cleaned up data for peer {peerid}")

**After (automatic cleanup):**

.. code-block:: python

    @app.trust_hook("delete")
    def on_trust_deleted(actor, peerid, relationship, trust_data):
        """Application-specific logic only - storage cleanup is automatic."""
        # Note: With auto_cleanup=True, the following happens automatically:
        # - RemotePeerStore data deleted
        # - Callback state cleared
        # - Pending callbacks discarded

        # Only app-specific actions needed:
        notify_websocket_clients(f"Peer {peerid} disconnected")

Step 5: Update List Operation Handling
--------------------------------------

If you handle list operations manually, the library now supports them automatically:

**Supported Operations:**

.. list-table::
   :header-rows: 1
   :widths: 20 80

   * - Operation
     - Description
   * - ``append``
     - Add items to end of list
   * - ``extend``
     - Add multiple items to end
   * - ``insert``
     - Insert at specific index
   * - ``update``
     - Replace item at index
   * - ``delete``
     - Remove item at index
   * - ``pop``
     - Remove and return item
   * - ``clear``
     - Remove all items
   * - ``remove``
     - Remove first occurrence

**Before (manual list operations):**

.. code-block:: python

    def apply_list_operation(peer_id, list_name, operation_data):
        op = operation_data.get("operation")
        items = operation_data.get("items", [])
        index = operation_data.get("index", 0)

        existing = get_list(peer_id, list_name)

        if op == "append":
            existing.extend(items)
        elif op == "insert":
            for i, item in enumerate(items):
                existing.insert(index + i, item)
        elif op == "delete":
            if 0 <= index < len(existing):
                del existing[index]
        # ... more operations

        save_list(peer_id, list_name, existing)

**After (automatic):**

.. code-block:: python

    from actingweb.remote_storage import RemotePeerStore

    # With auto_storage=True, list operations are applied automatically.
    # Just read the result if needed:
    @app.subscription_data_hook("properties")
    def on_property_change(actor, peer_id, target, data, sequence, callback_type):
        store = RemotePeerStore(actor, peer_id)
        current_items = store.get_list("items")  # Already updated
        print(f"Items now: {current_items}")

Configuration Reference
=======================

``.with_subscription_processing()`` Parameters
----------------------------------------------

.. list-table::
   :header-rows: 1
   :widths: 25 15 60

   * - Parameter
     - Default
     - Description
   * - ``auto_sequence``
     - ``True``
     - Enable CallbackProcessor for sequencing/deduplication
   * - ``auto_storage``
     - ``True``
     - Enable RemotePeerStore for storing peer data
   * - ``auto_cleanup``
     - ``True``
     - Clean up peer data when trust is deleted
   * - ``gap_timeout_seconds``
     - ``5.0``
     - Seconds before a sequence gap triggers resync
   * - ``max_pending``
     - ``100``
     - Maximum pending callbacks before returning HTTP 429

Choosing Configuration Values
-----------------------------

**gap_timeout_seconds:**

- **Low-latency networks (< 100 ms):** 2-3 seconds
- **Standard networks:** 5 seconds (default)
- **High-latency/unreliable networks:** 10-15 seconds
- **Batch processing scenarios:** 30+ seconds

**max_pending:**

- **Memory-constrained environments:** 50
- **Standard deployments:** 100 (default)
- **High-throughput systems:** 200-500
- **Note:** Each pending callback uses roughly 1-5 KB of memory

Testing Your Migration
======================

Unit Tests
----------

Test your new data hooks:

.. code-block:: python

    import pytest

    from actingweb.interface import ActorInterface
    from actingweb.remote_storage import RemotePeerStore


    def test_data_hook_receives_processed_data(actor_id, config, peer_id):
        """Verify hook receives properly sequenced data."""
        # actor_id, config and peer_id are assumed to be provided by
        # your own pytest fixtures (e.g. in conftest.py)
        actor = ActorInterface.get_by_id(actor_id, config)
        store = RemotePeerStore(actor, peer_id)

        # Simulate processed callback
        store.set_value("status", {"active": True})

        # Verify storage
        status = store.get_value("status")
        assert status["active"] is True

Integration Tests
-----------------

Test the full callback flow:

.. code-block:: python

    from actingweb.remote_storage import RemotePeerStore


    def test_subscription_callback_processing():
        """Test full subscription callback flow."""
        # create_test_app, create_actor and establish_trust are helpers
        # from your own test suite
        # Create subscriber with subscription processing
        subscriber_app, aw_app = create_test_app(
            enable_subscription_processing=True
        )

        # Create actors and trust
        publisher = create_actor(...)
        subscriber = create_actor(...)
        establish_trust(publisher, subscriber)

        # Create subscription
        subscriber.subscriptions.subscribe_to_peer(
            peer_id=publisher.id,
            target="properties"
        )

        # Change property on publisher
        publisher.properties["status"] = "active"

        # Verify callback was processed (assumes delivery has completed
        # before this point, e.g. with .with_sync_callbacks())
        store = RemotePeerStore(subscriber, publisher.id)
        status = store.get_value("properties:status")
        assert status == "active"

Backward Compatibility
======================

Full Compatibility Maintained
-----------------------------

- **Existing callbacks continue to work**: ``@callback_hook("subscription")`` handlers are unaffected
- **HTTP API unchanged**: All REST endpoints work identically
- **Database compatible**: No schema changes required
- **Opt-in feature**: Must be enabled explicitly with ``.with_subscription_processing()``

Coexistence
-----------

You can use both approaches during migration:

.. code-block:: python

    # Raw hook for specific targets
    @app.callback_hook("subscription")
    def legacy_handler(actor, req):
        target = req.json.get("target")
        if target == "legacy_data":
            # Handle legacy target manually
            return handle_legacy(actor, req)
        # Let other targets fall through to data hooks
        return None

    # Data hook for new targets
    @app.subscription_data_hook("properties")
    def new_handler(actor, peer_id, target, data, sequence, callback_type):
        # Modern handling
        pass

**Note:** A raw ``@callback_hook("subscription")`` handler takes precedence if registered.

Troubleshooting
===============

Common Migration Issues
-----------------------

**Callbacks not reaching data hook:**

1. Verify ``.with_subscription_processing()`` is configured
2. Check that ``auto_sequence=True`` (default)
3. Ensure no raw ``@callback_hook("subscription")`` handler is consuming the callback (it must return ``None`` for targets that should fall through to data hooks)

**Data not being stored:**

1. Verify ``auto_storage=True`` (default)
2. Check that the callback contains a valid ``data`` field
3. Verify that the trust relationship is approved

**Resyncs happening too frequently:**

1. Increase ``gap_timeout_seconds`` (e.g., 10.0)
2. Check network reliability between actors
3. Consider using ``.with_sync_callbacks()`` if asynchronous delivery is unreliable

**429 errors from subscribers:**

1. Increase ``max_pending`` on the subscriber
2. Slow down the publishing rate
3. Check subscriber processing speed

Breaking Change: Bucket Naming Convention
=========================================

**All library-internal buckets now use a leading underscore** (``_``) to avoid namespace collisions with user-defined buckets.

Why This Change?
----------------

Application code can create arbitrary buckets via ``Attributes(actor_id=..., bucket="mydata")``. Without a reserved prefix, a user's ``bucket="peer_permissions"`` would collide with the library's internal bucket of the same name.

Convention:

- ``_`` prefix = library-internal, managed by ActingWeb
- No prefix = application/user data

Renamed Buckets
---------------

.. list-table::
   :header-rows: 1
   :widths: 40 40

   * - Old Name
     - New Name
   * - ``trust_types``
     - ``_trust_types``
   * - ``trust_permissions``
     - ``_trust_permissions``
   * - ``peer_profiles``
     - ``_peer_profiles``
   * - ``peer_capabilities``
     - ``_peer_capabilities``
   * - ``auth_code_index``
     - ``_auth_code_index``
   * - ``access_token_index``
     - ``_access_token_index``
   * - ``refresh_token_index``
     - ``_refresh_token_index``
   * - ``client_index``
     - ``_client_index``
   * - ``oauth_sessions``
     - ``_oauth_sessions``

Migration Impact
----------------

**For most deployments, no action is required.** The affected data is typically transient:

- **OAuth sessions/tokens**: Recreated automatically on next login
- **Auth indexes**: Rebuilt as tokens are issued

**For deployments with existing trust relationships:**

If you have existing trust types, permissions, peer profiles, or peer capabilities stored, you may need to migrate the data. The simplest approach is to re-establish trust relationships, which repopulates the new buckets automatically.
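
If you choose to migrate stored data instead of re-establishing trust, the rename table above is the starting point. The following sketch is library-independent and rests on assumptions: ``RENAMED_BUCKETS`` simply mirrors the table, while ``is_library_bucket()``, ``check_app_bucket()`` and ``new_bucket_name()`` are hypothetical helpers, not part of the ActingWeb API. Copying the attribute data itself is left out because it depends on your storage backend.

.. code-block:: python

    # Hypothetical helpers illustrating the "_" bucket-prefix convention.
    # They are not part of ActingWeb; adapt them to your own migration tooling.

    # Old -> new library bucket names, mirroring the "Renamed Buckets" table
    RENAMED_BUCKETS = {
        "trust_types": "_trust_types",
        "trust_permissions": "_trust_permissions",
        "peer_profiles": "_peer_profiles",
        "peer_capabilities": "_peer_capabilities",
        "auth_code_index": "_auth_code_index",
        "access_token_index": "_access_token_index",
        "refresh_token_index": "_refresh_token_index",
        "client_index": "_client_index",
        "oauth_sessions": "_oauth_sessions",
    }


    def is_library_bucket(name: str) -> bool:
        """Return True if the name uses the reserved library-internal prefix."""
        return name.startswith("_")


    def check_app_bucket(name: str) -> str:
        """Reject application bucket names that use the reserved "_" prefix."""
        if is_library_bucket(name):
            raise ValueError(f"bucket {name!r} uses the reserved '_' prefix")
        return name


    def new_bucket_name(old_name: str) -> str:
        """Map a pre-3.10 library bucket name to its 3.10 name (else unchanged)."""
        return RENAMED_BUCKETS.get(old_name, old_name)

For example, ``new_bucket_name("peer_profiles")`` returns ``"_peer_profiles"``, and ``check_app_bucket("_cache")`` raises ``ValueError``, which keeps application code out of the reserved namespace.
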
Peer Permissions Caching
========================

ActingWeb 3.10 adds **peer permissions caching** for trust relationships, following the same pattern as peer profiles and peer capabilities.

What It Does
------------

Peer permissions caching stores the permissions that **remote peers have granted us**. This is distinct from ``TrustPermissions``, which stores the permissions **we grant to peers**.

When enabled:

- Permissions are automatically fetched when trust is approved
- Permissions are refreshed during ``sync_peer()`` operations
- Permissions are cleaned up when trust is deleted
- Permission callbacks from peers are automatically stored

Enabling Peer Permissions
-------------------------

Add ``.with_peer_permissions()`` to your application configuration:

.. code-block:: python

    from actingweb.interface import ActingWebApp

    app = (
        ActingWebApp(
            aw_type="urn:actingweb:example.com:myapp",
            database="dynamodb",
            fqdn="myapp.example.com",
            proto="https://"
        )
        .with_peer_permissions(
            enable=True,  # Enable permission caching
            auto_delete_on_revocation=True,  # Delete cached data when revoked
            notify_peer_on_change=True  # Auto-notify peers (default)
        )
        .with_peer_capabilities(enable=True)  # Also recommended
    )

**Configuration Options:**

- ``enable``: Enable/disable peer permissions caching. Default: ``True`` when called.
- ``auto_delete_on_revocation``: Delete cached peer data when permissions are revoked. Default: ``False``.
- ``notify_peer_on_change``: Automatically notify peers when you change their permissions by sending a callback to ``/callbacks/permissions/{actor_id}``. Default: ``True``.

**Permission Query Endpoint (NEW in v3.10.0a5):**

The ``GET /{actor_id}/permissions/{peer_id}`` endpoint is now available for proactive permission discovery.
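
A peer can call the endpoint directly whenever it wants a fresh view of what it has been granted. The sketch below uses only the Python standard library and rests on two assumptions you should verify against your deployment: the granting actor is reachable at ``<root_url>/<actor_id>``, and the endpoint accepts the trust relationship's shared secret as a Bearer token. ``permissions_url()`` and ``fetch_peer_permissions()`` are hypothetical helpers, not part of the ActingWeb API.

.. code-block:: python

    import json
    import urllib.parse
    import urllib.request


    def permissions_url(root_url: str, granting_actor_id: str, peer_id: str) -> str:
        """Build the URL for GET /{actor_id}/permissions/{peer_id} (hypothetical helper).

        granting_actor_id is the actor that granted the permissions; peer_id is
        the actor asking what it has been granted. Both are percent-encoded.
        """
        return "{}/{}/permissions/{}".format(
            root_url.rstrip("/"),
            urllib.parse.quote(granting_actor_id, safe=""),
            urllib.parse.quote(peer_id, safe=""),
        )


    def fetch_peer_permissions(url: str, trust_secret: str, timeout: float = 10.0) -> dict:
        """Fetch the permissions granted to us and return the parsed JSON.

        Assumption: the endpoint accepts the trust relationship's shared secret
        as a Bearer token. urlopen raises urllib.error.HTTPError on error statuses.
        """
        request = urllib.request.Request(
            url,
            headers={
                "Authorization": f"Bearer {trust_secret}",
                "Accept": "application/json",
            },
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))

For example, ``permissions_url("https://myapp.example.com", "abc", "xyz")`` returns ``"https://myapp.example.com/abc/permissions/xyz"``. Calling ``fetch_peer_permissions()`` on demand gives a peer a permission snapshot without waiting for the next permission callback.
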

This endpoint complements the existing permission callback mechanism by supporting:

- Initial permission baseline during trust establishment
- Permission refresh without waiting for callbacks
- Recovery if permission callbacks are missed

The endpoint is registered automatically when you use the ActingWeb framework, so no migration changes are needed. It returns the full *effective* permissions (base trust-type defaults merged with per-trust overrides), allowing peers to discover what permissions they have been granted and to detect changes accurately.

Using Cached Permissions
------------------------

Access cached permissions via ``PeerPermissionStore``:

.. code-block:: python

    from actingweb.peer_permissions import get_peer_permission_store

    # Get the permission store
    store = get_peer_permission_store(actor.config)

    # Get cached permissions for a specific peer
    permissions = store.get_permissions(actor.id, peer_id)

    if permissions:
        # Check property access
        if permissions.has_property_access("memory_travel", "read"):
            print("Peer granted us read access to memory_travel")

        # Check method access
        if permissions.has_method_access("sync_data"):
            print("Peer granted us access to sync_data method")

        # Check tool access (MCP)
        if permissions.has_tool_access("search"):
            print("Peer granted us access to search tool")

Permission Callback Handling
----------------------------

When peers notify us of permission changes via callbacks, the library automatically stores the updated permissions. You can register a hook for app-specific handling:

.. code-block:: python

    import logging

    logger = logging.getLogger(__name__)


    @app.callback_hook("permissions")
    def on_permission_change(actor, req):
        """Handle permission update from peer."""
        granting_actor_id = req.json.get("granting_actor_id")
        data = req.json.get("data", {})

        # Permissions are already stored by the library
        # Add your app-specific logic here
        logger.info(f"Permission update from {granting_actor_id}")

        # Example: trigger sync for newly granted permissions
        if data.get("properties", {}).get("patterns"):
            patterns = data["properties"]["patterns"]
            for pattern in patterns:
                if pattern.startswith("memory_"):
                    sync_memory_type(actor, granting_actor_id, pattern)

Multi-Provider OAuth Support
============================

ActingWeb 3.10 adds support for configuring **multiple OAuth providers simultaneously** (e.g., Google and GitHub). Previously, only one provider could be active at a time.

**This is a non-breaking, additive change.** All existing single-provider configurations continue to work without modification.

What Changed
------------

- ``.with_oauth()`` now accepts an optional ``provider`` parameter
- New ``config.oauth_providers`` attribute (dict-of-dicts) stores per-provider credentials
- Factory login page renders buttons for all configured providers
- OAuth state parameter carries the ``provider`` name for correct callback routing
- ``/oauth/config`` (SPA endpoint) returns all configured providers
- GitHub email verification enforces the ``verified`` flag on primary emails

Backward Compatibility
----------------------

All 12 public API surface areas were verified as fully backward compatible:

- ``with_oauth()`` **without** ``provider``: Works exactly as before (single-provider config)
- ``config.oauth``: Still populated (points to the first configured provider's credentials)
- ``config.oauth2_provider``: Still populated (set to the first configured provider name)
- ``create_oauth2_authenticator(config)``: Default provider used when no provider is specified
- ``create_google_authenticator()`` / ``create_github_authenticator()``: Unchanged signatures
- **OAuth callback state parsing**: Absent ``provider`` field falls back to ``config.oauth2_provider``
- **Template variables**: ``oauth_providers`` list already existed; new providers append to it
- **REST API responses**: ``/oauth/config`` returns the same structure; additional providers are additive
- **SPA state format**: Missing ``provider`` key falls back to the default provider
- **MCP encrypted state**: Missing ``provider`` field falls back to the default
- **Email verification flow**: No signature changes; new ``email_verification_required`` hook is additive
- Factory ``/login`` endpoint: Single-provider setups still render a single button

No Action Required for Existing Applications
---------------------------------------------

If you are using a single OAuth provider, **no changes are needed**. Your existing configuration continues to work:

.. code-block:: python

    from actingweb.interface import ActingWebApp

    # This still works exactly as before
    app = ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com"
    ).with_oauth(
        client_id="your-client-id",
        client_secret="your-client-secret"
    )

Opting In to Multiple Providers
-------------------------------

To add a second provider, call ``.with_oauth()`` again with the ``provider`` parameter:

.. code-block:: python

    from actingweb.interface import ActingWebApp

    app = ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com"
    ).with_oauth(
        provider="google",
        client_id="google-client-id",
        client_secret="google-client-secret",
        scope="openid email profile"
    ).with_oauth(
        provider="github",
        client_id="github-client-id",
        client_secret="github-client-secret",
        scope="read:user user:email"
    )

When multiple providers are configured:

- The factory login page (``/login``) shows buttons for all providers
- 401 redirects go to ``/login`` (not directly to a single provider's OAuth page)
- Each provider's credentials are isolated in ``config.oauth_providers``

GitHub Email Verification Security Fix
---------------------------------------

GitHub email handling has been hardened:

- ``_get_github_primary_email()`` now requires both the ``primary`` **and** ``verified`` flags
- Previously, an unverified primary email was accepted, which could allow account-linking attacks via the GitHub ``/user/emails`` API
- If no verified primary email is available, the first verified non-primary email is used
- If no verified emails exist at all, the email form fallback is triggered (when ``require_email=True``) or ``None`` is returned

This change only affects **new** GitHub OAuth logins. Existing actors already linked via GitHub are not affected.

SPA Flow: Email Collection
^^^^^^^^^^^^^^^^^^^^^^^^^^^

When GitHub returns no verified emails and ``require_email=True``, the SPA OAuth callback redirects **back to the SPA** with ``email_required=true&session=`` query parameters instead of returning a hard error. The SPA handles email collection natively via the JSON API at ``POST /oauth/email``.

For HTML template applications, the callback redirects to ``/oauth/email?session=``, where the server-rendered ``aw-oauth-email.html`` template collects the email.

In both cases, the email verification flow is the same:

1. The user's email address is collected (SPA form or HTML template)
2. ``POST /oauth/email`` creates the actor with ``email_verified = "false"``
3. The ``email_verification_required`` lifecycle hook fires
4. The application's hook handler sends a verification email
5. The user clicks the ``GET /oauth/email?verify=`` link to complete verification

See :doc:`../guides/spa-authentication` for the SPA-specific flow details.

MCP Flow: Verified Email Required
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The MCP OAuth flow requires a verified email from the provider. If no verified email is available, the flow returns an ``invalid_grant`` error with a message explaining that a verified email is required.

See Also
========

- :doc:`../guides/subscriptions` - Full subscription processing guide
- :doc:`../guides/trust-relationships` - Trust and subscription lifecycle
- :doc:`../guides/authentication` - Authentication and OAuth2 guide
- :doc:`../guides/troubleshooting` - Troubleshooting guide
- `CHANGELOG `_ - Full changelog