Migrating to ActingWeb 3.10

Overview

ActingWeb 3.10 introduces automatic subscription processing, a major feature that reduces callback handling from 500+ lines of manual code to about 30 lines of application logic.

Key Features:

  • CallbackProcessor: Automatic sequencing, deduplication, and gap handling

  • RemotePeerStore: Standardized storage for peer data with list operations

  • FanOutManager: Circuit breaker-protected delivery to multiple subscribers

  • Peer Capabilities: Protocol v1.4 feature discovery (compression, batch subscriptions)

  • Auto-cleanup: Automatic peer data removal when trust relationships end

Who Needs to Migrate?

You should migrate if:

  • Your application manually handles subscription callback sequencing

  • You maintain custom code for detecting duplicate or out-of-order callbacks

  • You have custom storage for peer data received via subscriptions

  • You want to reduce boilerplate and improve reliability

You can skip migration if:

  • You don’t use subscriptions between actors

  • Your current callback handling is simple and working well

  • You prefer full control over callback processing

Important: Migration is optional. Existing @callback_hook("subscription") handlers continue to work unchanged.

New Components

CallbackProcessor

Handles all aspects of callback sequencing automatically:

from actingweb.callback_processor import CallbackProcessor, ProcessResult

processor = CallbackProcessor(
    actor=actor,
    gap_timeout_seconds=5.0,  # Trigger resync after 5s gap
    max_pending=100,          # Back-pressure limit
)

# Process a callback
result = processor.process_callback(
    peer_id="peer123",
    subscription_id="sub456",
    sequence=42,
    data={"status": "active"},
)

# Result indicates what happened
if result == ProcessResult.PROCESSED:
    # Normal processing - data is ready
    pass
elif result == ProcessResult.DUPLICATE:
    # Already processed this sequence
    pass
elif result == ProcessResult.PENDING:
    # Gap detected, waiting for missing callbacks
    pass
elif result == ProcessResult.RESYNC_REQUIRED:
    # Gap timeout exceeded, need full resync
    pass
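
To make the four outcomes concrete, here is a minimal, library-independent sketch of sequencing for a single subscription. It is not the CallbackProcessor implementation: the names Outcome and ToySequencer are hypothetical, the clock is passed in explicitly to keep the example deterministic, and back-pressure (max_pending) is left out.

```python
from enum import Enum


class Outcome(Enum):
    # Hypothetical stand-in for the four results described above
    PROCESSED = "processed"
    DUPLICATE = "duplicate"
    PENDING = "pending"
    RESYNC_REQUIRED = "resync_required"


class ToySequencer:
    """Illustrative only: orders callbacks for one subscription."""

    def __init__(self, gap_timeout_seconds=5.0):
        self.gap_timeout_seconds = gap_timeout_seconds
        self.last_seq = 0        # highest sequence released in order
        self.pending = {}        # sequence -> data held back behind a gap
        self.gap_started = None  # time the current gap was first seen
        self.delivered = []      # data released to the application, in order

    def process(self, sequence, data, now):
        if sequence <= self.last_seq or sequence in self.pending:
            return Outcome.DUPLICATE
        if sequence > self.last_seq + 1:
            # At least one earlier callback is missing
            if self.gap_started is None:
                self.gap_started = now
            if now - self.gap_started >= self.gap_timeout_seconds:
                return Outcome.RESYNC_REQUIRED
            self.pending[sequence] = data
            return Outcome.PENDING
        # In-order callback: release it, then drain anything it unblocks
        self._deliver(sequence, data)
        while self.last_seq + 1 in self.pending:
            nxt = self.last_seq + 1
            self._deliver(nxt, self.pending.pop(nxt))
        if not self.pending:
            self.gap_started = None
        return Outcome.PROCESSED

    def _deliver(self, sequence, data):
        self.delivered.append(data)
        self.last_seq = sequence
```

In this sketch, receiving sequences 1, 3, 2 yields PROCESSED, PENDING, PROCESSED, and the application sees the data in the order 1, 2, 3.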

RemotePeerStore

Stores and manages data received from peer actors:

from actingweb.remote_storage import RemotePeerStore

store = RemotePeerStore(actor, peer_id)

# Scalar values
store.set_value("profile", {"name": "Alice", "status": "active"})
profile = store.get_value("profile")

# Lists with automatic operations
store.set_list("items", [{"id": 1}, {"id": 2}])
items = store.get_list("items")

# Apply callback data (handles list operations automatically)
store.apply_callback_data("items", {
    "operation": "append",
    "items": [{"id": 3}]
})

# Cleanup
store.delete_all()

FanOutManager

Delivers callbacks to multiple subscribers with circuit breaker protection:

from actingweb.fanout import FanOutManager, FanOutResult

manager = FanOutManager(
    actor=actor,
    max_concurrent=5,      # Parallel deliveries
    default_timeout=30.0,  # Request timeout
)

# Deliver to all subscribers (await must run inside an async function)
result: FanOutResult = await manager.deliver(
    target="properties",
    data={"status": "changed"},
)

# Check results
print(f"Delivered: {result.success_count}/{result.total_count}")

# Circuit breaker management
status = manager.get_circuit_breaker_status("peer123")
if status == "OPEN":
    manager.reset_circuit_breaker("peer123")
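
If circuit breakers are new to you, the following sketch shows the general pattern the term refers to. It is a generic illustration, not FanOutManager's internals: the class name, the failure threshold, the cooldown, and the CLOSED and HALF_OPEN state names are assumptions; only the "OPEN" status string appears in the example above.

```python
class ToyCircuitBreaker:
    """Generic three-state breaker for one peer (illustrative values)."""

    def __init__(self, failure_threshold=3, cooldown_seconds=60.0):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at = None  # None means the breaker is closed

    def state(self, now):
        if self.opened_at is None:
            return "CLOSED"
        if now - self.opened_at >= self.cooldown_seconds:
            return "HALF_OPEN"  # cooldown elapsed: allow a trial delivery
        return "OPEN"

    def allow(self, now):
        """Deliveries are skipped only while the breaker is OPEN."""
        return self.state(now) != "OPEN"

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = now  # trip (or re-trip) the breaker

    def reset(self):
        """Manual reset, comparable in spirit to reset_circuit_breaker()."""
        self.record_success()
```

The point of the pattern is that a peer that keeps failing stops consuming delivery slots and timeouts until it has had time to recover.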

PeerCapabilities

Discover what protocol features a peer supports (protocol v1.4):

from actingweb.peer_capabilities import PeerCapabilities

caps = PeerCapabilities(actor, peer_id)

if caps.supports_resync_callbacks():
    # Peer handles type="resync" callbacks
    pass

if caps.supports_compression():
    # Peer accepts gzip-compressed payloads
    pass

# Get all capabilities
all_caps = caps.get_all_supported()
print(f"Peer supports: {all_caps}")

Migration Guide

Step 1: Enable Subscription Processing

Add .with_subscription_processing() to your application configuration:

Before (v3.9 and earlier):

from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_oauth(client_id="...", client_secret="...")
    .with_devtest(enable=False)
)

After (v3.10):

from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_oauth(client_id="...", client_secret="...")
    .with_devtest(enable=False)
    .with_subscription_processing(
        auto_sequence=True,         # Enable CallbackProcessor
        auto_storage=True,          # Enable RemotePeerStore
        auto_cleanup=True,          # Clean up on trust deletion
        gap_timeout_seconds=5.0,    # Resync trigger timeout
        max_pending=100             # Back-pressure limit
    )
)

For Lambda/Serverless environments, also add:

app = app.with_sync_callbacks()  # Ensures delivery before freeze

Step 2: Replace Callback Hook with Data Hook

Replace your @callback_hook("subscription") with @subscription_data_hook:

Before (manual sequencing):

@app.callback_hook("subscription")
def handle_subscription(actor, req):
    """Manual callback handling - 100+ lines typically."""
    peer_id = req.json.get("id")
    sequence = req.json.get("sequence", 0)
    data = req.json.get("data", {})
    target = req.json.get("target", "properties")

    # Manual sequence checking
    last_seq = get_last_sequence(peer_id, target)
    if sequence <= last_seq:
        return {"status": "duplicate"}

    if sequence > last_seq + 1:
        # Gap detected - store pending
        store_pending(peer_id, sequence, data)
        if check_gap_timeout(peer_id):
            trigger_resync(peer_id, target)
        return {"status": "pending"}

    # Store the data
    save_peer_data(peer_id, target, data)
    update_sequence(peer_id, target, sequence)

    # Process any pending callbacks
    process_pending_callbacks(peer_id, target)

    # Application logic
    notify_user(f"Update from {peer_id}")

    return {"status": "ok"}

After (automatic processing):

@app.subscription_data_hook("properties")
def on_property_change(
    actor,
    peer_id: str,
    target: str,
    data: dict,
    sequence: int,
    callback_type: str,  # "diff" or "resync"
):
    """
    Called with already-sequenced, deduplicated, stored data.
    Library handled: sequencing, deduplication, storage, list operations.
    """
    # Just your application logic!
    if callback_type == "resync":
        refresh_ui(actor, peer_id)
    else:
        notify_user(f"Update from {peer_id}: {data}")

Line reduction: ~100 lines → ~10 lines

Step 3: Migrate Peer Data Storage

Replace custom peer data storage with RemotePeerStore:

Before (custom storage):

def save_peer_data(peer_id, target, data):
    """Custom storage implementation."""
    bucket = f"remote:{peer_id}"
    actor.attributes.set_bucket(bucket)

    if "items" in data:
        existing = actor.attributes.get_attribute(f"{target}:items") or []
        operation = data.get("operation", "replace")
        if operation == "append":
            existing.extend(data["items"])
        elif operation == "delete":
            # Handle delete...
            pass
        actor.attributes.set_attribute(f"{target}:items", existing)
    else:
        for key, value in data.items():
            actor.attributes.set_attribute(f"{target}:{key}", value)

After (RemotePeerStore):

from actingweb.remote_storage import RemotePeerStore

@app.subscription_data_hook("properties")
def on_property_change(actor, peer_id, target, data, sequence, callback_type):
    # Storage is automatic with auto_storage=True
    # If you need manual access:
    store = RemotePeerStore(actor, peer_id)
    items = store.get_list(f"{target}:items")

Step 4: Remove Manual Cleanup Code

Before (manual cleanup):

@app.trust_hook("delete")
def on_trust_deleted(actor, peerid, relationship, trust_data):
    """Manual cleanup of peer data."""
    # Delete all peer data
    bucket = f"remote:{peerid}"
    actor.attributes.set_bucket(bucket)
    for key in actor.attributes.get_all_keys():
        actor.attributes.delete_attribute(key)

    # Clear sequence tracking
    clear_sequence_state(peerid)

    # Clear pending callbacks
    clear_pending_callbacks(peerid)

    logger.info(f"Cleaned up data for peer {peerid}")

After (automatic cleanup):

@app.trust_hook("delete")
def on_trust_deleted(actor, peerid, relationship, trust_data):
    """Application-specific logic only - storage cleanup is automatic."""
    # Note: With auto_cleanup=True, the following happens automatically:
    # - RemotePeerStore data deleted
    # - Callback state cleared
    # - Pending callbacks discarded

    # Only app-specific actions needed:
    notify_websocket_clients(f"Peer {peerid} disconnected")

Step 5: Update List Operation Handling

If you handle list operations manually, the library now supports them automatically:

Supported Operations:

Operation   Description
---------   -------------------------
append      Add items to end of list
extend      Add multiple items to end
insert      Insert at specific index
update      Replace item at index
delete      Remove item at index
pop         Remove and return item
clear       Remove all items
remove      Remove first occurrence

Before (manual list operations):

def apply_list_operation(peer_id, list_name, operation_data):
    op = operation_data.get("operation")
    items = operation_data.get("items", [])
    index = operation_data.get("index", 0)

    existing = get_list(peer_id, list_name)

    if op == "append":
        existing.extend(items)
    elif op == "insert":
        for i, item in enumerate(items):
            existing.insert(index + i, item)
    elif op == "delete":
        if 0 <= index < len(existing):
            del existing[index]
    # ... more operations

    save_list(peer_id, list_name, existing)

After (automatic):

# With auto_storage=True, list operations are applied automatically
# Just read the result if needed:

@app.subscription_data_hook("properties")
def on_property_change(actor, peer_id, target, data, sequence, callback_type):
    store = RemotePeerStore(actor, peer_id)
    current_items = store.get_list("items")  # Already updated
    print(f"Items now: {current_items}")
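
For reference, the eight supported operations correspond roughly to ordinary Python list behaviour. The sketch below is a plain-Python approximation and not the RemotePeerStore code: the "operation" and "items" fields come from the examples in this guide, "index" from the manual code above, and the "item" field used for update and remove is a hypothetical name chosen for illustration.

```python
def apply_list_operation(current, op):
    """Return a new list with one operation applied (field names partly assumed)."""
    result = list(current)
    name = op["operation"]
    items = op.get("items", [])
    index = op.get("index", 0)

    if name in ("append", "extend"):
        result.extend(items)            # add to the end
    elif name == "insert":
        result[index:index] = items     # insert at a specific index
    elif name == "update":
        result[index] = op["item"]      # replace the item at index
    elif name == "delete":
        del result[index]               # remove the item at index
    elif name == "pop":
        result.pop(op.get("index", -1))  # remove one item, last by default
    elif name == "clear":
        result.clear()                  # remove all items
    elif name == "remove":
        result.remove(op["item"])       # remove the first occurrence
    else:
        raise ValueError(f"Unsupported operation: {name}")
    return result


items = [{"id": 1}, {"id": 2}]
items = apply_list_operation(items, {"operation": "append", "items": [{"id": 3}]})
```

Working on a copy rather than mutating in place keeps a failed operation (for example an out-of-range index) from leaving a half-updated list behind.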

Configuration Reference

.with_subscription_processing() Parameters

Parameter            Default  Description
-------------------  -------  -----------------------------------------------------
auto_sequence        True     Enable CallbackProcessor for sequencing/deduplication
auto_storage         True     Enable RemotePeerStore for storing peer data
auto_cleanup         True     Clean up peer data when trust is deleted
gap_timeout_seconds  5.0      Seconds before a sequence gap triggers resync
max_pending          100      Maximum pending callbacks before returning 429

Choosing Configuration Values

gap_timeout_seconds:

  • Low-latency networks (< 100 ms): 2-3 seconds

  • Standard networks: 5 seconds (default)

  • High-latency or unreliable networks: 10-15 seconds

  • Batch processing scenarios: 30+ seconds

max_pending:

  • Memory-constrained environments: 50

  • Standard deployments: 100 (default)

  • High-throughput systems: 200-500

  • Note: Each pending callback uses roughly 1-5 KB of memory
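
As a quick sanity check when picking max_pending, you can turn the 1-5 KB figure into a worst-case memory range. The helper below is a back-of-the-envelope sketch; the function name and the queues parameter are hypothetical, and whether the limit applies per subscription or per actor is an assumption you should verify for your deployment.

```python
def pending_memory_range_kb(max_pending, queues=1, low_kb=1, high_kb=5):
    """Return (min_kb, max_kb) if every queue is filled to max_pending."""
    total_callbacks = max_pending * queues
    return total_callbacks * low_kb, total_callbacks * high_kb


for limit in (50, 100, 500):
    low, high = pending_memory_range_kb(limit)
    print(f"max_pending={limit}: {low}-{high} KB per full queue")
```

With the default of 100 this gives 100-500 KB per full queue, so the setting only becomes a memory concern when many queues fill up at the same time.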

Testing Your Migration

Unit Tests

Test your new data hooks:

import pytest
from actingweb.interface import ActorInterface
from actingweb.remote_storage import RemotePeerStore

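def test_data_hook_receives_processed_data(actor_id, peer_id, config):
    """Verify that data written to RemotePeerStore can be read back."""
    # actor_id, peer_id, and config are assumed to come from your own pytest fixtures
    actor = ActorInterface.get_by_id(actor_id, config)
    store = RemotePeerStore(actor, peer_id)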

    # Simulate processed callback
    store.set_value("status", {"active": True})

    # Verify storage
    status = store.get_value("status")
    assert status["active"] is True
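
Because a data hook receives plain Python arguments, you can also test your application logic without any ActingWeb objects at all. The sketch below assumes you keep that logic in an ordinary function and call it from the hook; describe_update and the messages attribute on the stand-in actor are hypothetical names used only for this example.

```python
from types import SimpleNamespace


def describe_update(peer_id, data, callback_type):
    """Pure application logic that a data hook can delegate to."""
    if callback_type == "resync":
        return f"Full refresh from {peer_id}"
    return f"Update from {peer_id}: {sorted(data)}"


def on_property_change(actor, peer_id, target, data, sequence, callback_type):
    """Same parameters as the data hook in Step 2; the decorator is omitted here."""
    actor.messages.append(describe_update(peer_id, data, callback_type))


def test_hook_logic_without_library():
    actor = SimpleNamespace(messages=[])  # stand-in object, not a real actor
    on_property_change(actor, "peer123", "properties", {"status": "active"}, 1, "diff")
    on_property_change(actor, "peer123", "properties", {}, 2, "resync")
    assert actor.messages == [
        "Update from peer123: ['status']",
        "Full refresh from peer123",
    ]
```

Tests of this kind run quickly and need no database, which leaves the integration tests below to cover sequencing and storage.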

Integration Tests

Test the full callback flow:

def test_subscription_callback_processing():
    """Test full subscription callback flow."""
    # Create subscriber with subscription processing
    subscriber_app, aw_app = create_test_app(
        enable_subscription_processing=True
    )

    # Create actors and trust
    publisher = create_actor(...)
    subscriber = create_actor(...)
    establish_trust(publisher, subscriber)

    # Create subscription
    subscriber.subscriptions.subscribe_to_peer(
        peer_id=publisher.id,
        target="properties"
    )

    # Change property on publisher
    publisher.properties["status"] = "active"

    # Verify callback was processed
    store = RemotePeerStore(subscriber, publisher.id)
    status = store.get_value("properties:status")
    assert status == "active"

Backward Compatibility

Full Compatibility Maintained

  • Existing callbacks continue to work: @callback_hook("subscription") handlers are unaffected

  • HTTP API unchanged: All REST endpoints work identically

  • Database compatible: No schema changes required

  • Opt-in feature: Must explicitly enable with .with_subscription_processing()

Coexistence

You can use both approaches during migration:

# Raw hook for specific targets
@app.callback_hook("subscription")
def legacy_handler(actor, req):
    target = req.json.get("target")
    if target == "legacy_data":
        # Handle legacy target manually
        return handle_legacy(actor, req)
    # Let other targets fall through to data hooks
    return None

# Data hook for new targets
@app.subscription_data_hook("properties")
def new_handler(actor, peer_id, target, data, sequence, callback_type):
    # Modern handling
    pass

Note: Raw @callback_hook("subscription") takes precedence if registered.
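
One way to picture the precedence rule is as a small dispatcher: the raw hook is asked first, and a None result lets the callback continue to the data hook for its target. This is a toy model of the behaviour described in this section, not the library's dispatch code; the dispatch function and its return shape are hypothetical, and the real pipeline also sequences and stores the data before a data hook runs.

```python
def dispatch(payload, raw_hook=None, data_hooks=None):
    """Toy model: raw hook first, then the data hook registered for the target."""
    data_hooks = data_hooks or {}
    if raw_hook is not None:
        result = raw_hook(payload)
        if result is not None:
            return ("raw", result)  # raw hook handled it; data hooks are skipped
    hook = data_hooks.get(payload.get("target"))
    if hook is not None:
        return ("data", hook(payload.get("data", {})))
    return ("unhandled", None)


def legacy_handler(payload):
    # Handle only the legacy target; return None for everything else
    if payload.get("target") == "legacy_data":
        return {"status": "ok"}
    return None


hooks = {"properties": lambda data: data}
handled_by_raw = dispatch({"target": "legacy_data"}, legacy_handler, hooks)
handled_by_data = dispatch({"target": "properties", "data": {"a": 1}}, legacy_handler, hooks)
```

The practical consequence is that a raw hook which returns a value for every target would keep all callbacks away from your data hooks.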

Troubleshooting

Common Migration Issues

Callbacks not reaching data hook:

  1. Verify .with_subscription_processing() is configured

  2. Check that auto_sequence=True (default)

  3. Ensure no raw @callback_hook("subscription") is registered, or that it returns None for the targets your data hooks handle (see Coexistence)

Data not being stored:

  1. Verify auto_storage=True (default)

  2. Check that the callback contains a valid data field

  3. Verify trust relationship is approved

Resyncs happening too frequently:

  1. Increase gap_timeout_seconds (e.g., 10.0)

  2. Check network reliability between actors

  3. Consider using .with_sync_callbacks() if async delivery is unreliable

429 errors from subscribers:

  1. Increase max_pending on subscriber

  2. Slow down publishing rate

  3. Check subscriber processing speed

Breaking Change: Bucket Naming Convention

All library-internal buckets now use a _ prefix to avoid namespace collisions with user-defined buckets.

Why This Change?

Application code can create arbitrary buckets via Attributes(actor_id=..., bucket="mydata"). Without a reserved prefix, a user’s bucket="peer_permissions" would collide with the library’s internal bucket of the same name.

Convention:

  • _ prefix = library-internal, managed by ActingWeb

  • No prefix = application/user data

Renamed Buckets

Old Name              New Name
-------------------   --------------------
trust_types           _trust_types
trust_permissions     _trust_permissions
peer_profiles         _peer_profiles
peer_capabilities     _peer_capabilities
auth_code_index       _auth_code_index
access_token_index    _access_token_index
refresh_token_index   _refresh_token_index
client_index          _client_index
oauth_sessions        _oauth_sessions

Migration Impact

For most deployments, no action is required. The affected data is typically transient:

  • OAuth sessions/tokens: Recreated automatically on next login

  • Auth indexes: Rebuilt as tokens are issued

For deployments with existing trust relationships:

If you have existing trust types, permissions, peer profiles, or peer capabilities stored, you may need to migrate the data. The simplest approach is to re-establish trust relationships, which will repopulate the new buckets automatically.
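
Before upgrading, it can help to audit the bucket names your own code uses. The sketch below encodes the rename table as a dictionary and flags two cases: application buckets that share a pre-3.10 internal name, and application buckets that already start with the now-reserved _ prefix. The audit_bucket_names helper is hypothetical and touches no ActingWeb API; how you collect your bucket names is up to you.

```python
# Old internal bucket name -> new name, taken from the table above
RENAMED_BUCKETS = {
    "trust_types": "_trust_types",
    "trust_permissions": "_trust_permissions",
    "peer_profiles": "_peer_profiles",
    "peer_capabilities": "_peer_capabilities",
    "auth_code_index": "_auth_code_index",
    "access_token_index": "_access_token_index",
    "refresh_token_index": "_refresh_token_index",
    "client_index": "_client_index",
    "oauth_sessions": "_oauth_sessions",
}


def audit_bucket_names(app_buckets):
    """Report application bucket names that deserve a closer look."""
    return {
        # Same name the library used before 3.10: may hold stale library data
        "legacy_internal_names": sorted(b for b in app_buckets if b in RENAMED_BUCKETS),
        # Starts with the prefix now reserved for library-internal buckets
        "uses_reserved_prefix": sorted(b for b in app_buckets if b.startswith("_")),
    }


report = audit_bucket_names(["mydata", "peer_profiles", "_cache"])
```

An empty report means your bucket names already follow the new convention.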

Peer Permissions Caching

ActingWeb 3.10 adds peer permissions caching for trust relationships, following the same pattern as peer profiles and peer capabilities.

What It Does

Peer permissions caching stores the permissions that remote peers have granted to us. This is distinct from TrustPermissions, which stores the permissions we grant to peers.

When enabled:

  • Permissions are automatically fetched when trust is approved

  • Permissions are refreshed during sync_peer() operations

  • Permissions are cleaned up when trust is deleted

  • Permission callbacks from peers are automatically stored

Enabling Peer Permissions

Add .with_peer_permissions() to your application configuration:

from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_peer_permissions(
        enable=True,                      # Enable permission caching
        auto_delete_on_revocation=True,   # Delete cached data when revoked
        notify_peer_on_change=True        # Auto-notify peers (default)
    )
    .with_peer_capabilities(enable=True)  # Also recommended
)

Configuration Options:

  • enable: Enable/disable peer permissions caching. Default: True when called.

  • auto_delete_on_revocation: Delete cached peer data when permissions are revoked. Default: False.

  • notify_peer_on_change: Automatically notify peers when you change their permissions by sending a callback to /callbacks/permissions/{actor_id}. Default: True.

Permission Query Endpoint (NEW in v3.10.0a5):

The GET /{actor_id}/permissions/{peer_id} endpoint is now available for proactive permission discovery. This complements the existing permission callback mechanism by supporting:

  • Initial permission baseline during trust establishment

  • Permission refresh without waiting for callbacks

  • Recovery if permission callbacks are missed

The endpoint is registered automatically when you use the ActingWeb framework, so no migration changes are needed. It returns the full effective permissions (base trust-type defaults merged with per-trust overrides), which lets peers discover what they have been granted and detect changes accurately.
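
If you want to query the endpoint by hand, for example while debugging, the request can be assembled with the standard library. This sketch only builds the request and does not send it; the helper name and base URL are placeholders, the path segments follow the pattern given above, and authentication between peers is deliberately left out because it depends on your trust setup.

```python
from urllib.parse import quote
from urllib.request import Request


def build_permissions_request(base_url, actor_id, peer_id):
    """Build a GET request for {base_url}/{actor_id}/permissions/{peer_id}."""
    url = "{}/{}/permissions/{}".format(
        base_url.rstrip("/"),
        quote(actor_id, safe=""),  # the actor that granted the permissions
        quote(peer_id, safe=""),   # the peer asking what it was granted
    )
    return Request(url, method="GET", headers={"Accept": "application/json"})


req = build_permissions_request("https://myapp.example.com/", "actor1", "peer2")
print(req.get_method(), req.full_url)
```

In normal operation the library performs this fetch for you when trust is approved and during sync_peer(), so a manual request is mainly useful for troubleshooting.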

Using Cached Permissions

Access cached permissions via PeerPermissionStore:

from actingweb.peer_permissions import get_peer_permission_store

# Get the permission store
store = get_peer_permission_store(actor.config)

# Get cached permissions for a specific peer
permissions = store.get_permissions(actor.id, peer_id)

if permissions:
    # Check property access
    if permissions.has_property_access("memory_travel", "read"):
        print("Peer granted us read access to memory_travel")

    # Check method access
    if permissions.has_method_access("sync_data"):
        print("Peer granted us access to sync_data method")

    # Check tool access (MCP)
    if permissions.has_tool_access("search"):
        print("Peer granted us access to search tool")

Permission Callback Handling

When peers notify us of permission changes via callbacks, the library automatically stores the updated permissions. You can register a hook for app-specific handling:

@app.callback_hook("permissions")
def on_permission_change(actor, req):
    """Handle permission update from peer."""
    granting_actor_id = req.json.get("granting_actor_id")
    data = req.json.get("data", {})

    # Permissions are already stored by the library
    # Add your app-specific logic here
    logger.info(f"Permission update from {granting_actor_id}")

    # Example: trigger sync for newly granted permissions
    if data.get("properties", {}).get("patterns"):
        patterns = data["properties"]["patterns"]
        for pattern in patterns:
            if pattern.startswith("memory_"):
                sync_memory_type(actor, granting_actor_id, pattern)

Multi-Provider OAuth Support

ActingWeb 3.10 adds support for configuring multiple OAuth providers simultaneously (e.g., Google and GitHub). Previously, only one provider could be active at a time.

This is a non-breaking, additive change. All existing single-provider configurations continue to work without modification.

What Changed

  • .with_oauth() now accepts an optional provider parameter

  • New config.oauth_providers attribute (dict-of-dicts) stores per-provider credentials

  • Factory login page renders buttons for all configured providers

  • OAuth state parameter carries the provider name for correct callback routing

  • /oauth/config (SPA endpoint) returns all configured providers

  • GitHub email verification enforces verified flag on primary emails

Backward Compatibility

All 12 public API surface areas were verified as fully backward compatible:

  • with_oauth() without provider: Works identically to before (single-provider config)

  • config.oauth: Still populated (points to first configured provider’s credentials)

  • config.oauth2_provider: Still populated (set to first configured provider name)

  • create_oauth2_authenticator(config): Default provider used when no provider specified

  • create_google_authenticator() / create_github_authenticator(): Unchanged signatures

  • OAuth callback state parsing: Absent provider field falls back to config.oauth2_provider

  • Template variables: oauth_providers list already existed; new providers append to it

  • REST API responses: /oauth/config returns same structure; additional providers are additive

  • SPA state format: Missing provider key falls back to default provider

  • MCP encrypted state: Missing provider field falls back to default

  • Email verification flow: No signature changes; new email_verification_required hook is additive

  • Factory /login endpoint: Single-provider still renders single button
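
Several of the items above share one rule: when the state carries no provider, the default provider is used. A minimal sketch of that rule follows. It assumes the state has already been decoded into a dictionary, and the resolve_provider helper and its rejection of unknown providers are illustrative choices rather than documented library behaviour.

```python
def resolve_provider(state, configured_providers, default_provider):
    """Pick the provider for a callback, falling back to the default."""
    provider = state.get("provider") or default_provider
    if provider not in configured_providers:
        # Assumption: an unknown provider name is treated as an error
        raise ValueError(f"Unknown OAuth provider: {provider}")
    return provider


configured = {"google", "github"}
legacy_state = {"redirect": "/app"}                      # pre-3.10 state, no provider key
new_state = {"redirect": "/app", "provider": "github"}   # 3.10 state

print(resolve_provider(legacy_state, configured, "google"))
print(resolve_provider(new_state, configured, "google"))
```

This fallback is what allows state values issued before an upgrade to complete their OAuth flow afterwards.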

No Action Required for Existing Applications

If you are using a single OAuth provider, no changes are needed. Your existing configuration continues to work:

# This still works exactly as before
app = ActingWebApp(
    aw_type="urn:actingweb:example.com:myapp",
    database="dynamodb",
    fqdn="myapp.example.com"
).with_oauth(
    client_id="your-client-id",
    client_secret="your-client-secret"
)

Opting In to Multiple Providers

To add a second provider, call .with_oauth() again with the provider parameter:

app = ActingWebApp(
    aw_type="urn:actingweb:example.com:myapp",
    database="dynamodb",
    fqdn="myapp.example.com"
).with_oauth(
    provider="google",
    client_id="google-client-id",
    client_secret="google-client-secret",
    scope="openid email profile"
).with_oauth(
    provider="github",
    client_id="github-client-id",
    client_secret="github-client-secret",
    scope="read:user user:email"
)

When multiple providers are configured:

  • The factory login page (/login) shows buttons for all providers

  • 401 redirects go to /login (not directly to a single provider’s OAuth page)

  • Each provider’s credentials are isolated in config.oauth_providers

GitHub Email Verification Security Fix

GitHub email handling has been hardened:

  • _get_github_primary_email() now requires both primary and verified flags

  • Previously, an unverified primary email was accepted, which could allow account-linking attacks via the GitHub /user/emails API

  • If no verified primary email is available, the first verified non-primary email is used

  • If no verified emails exist at all, the email form fallback is triggered (when require_email=True) or None is returned

This change only affects new GitHub OAuth logins. Existing actors already linked via GitHub are not affected.
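
The selection order described above can be expressed in a few lines. The sketch below is an illustration of the rule, not the body of _get_github_primary_email(); the function name is hypothetical, and the entries use the email, primary, and verified fields that GitHub's /user/emails endpoint returns.

```python
def pick_github_email(emails):
    """Return a verified primary email, else the first verified one, else None."""
    verified = [entry for entry in emails if entry.get("verified")]
    for entry in verified:
        if entry.get("primary"):
            return entry["email"]
    # No verified primary: fall back to the first verified non-primary address
    return verified[0]["email"] if verified else None


emails = [
    {"email": "primary@example.com", "primary": True, "verified": False},
    {"email": "backup@example.com", "primary": False, "verified": True},
]
print(pick_github_email(emails))
```

Here the unverified primary address is skipped and the verified backup address is chosen. A None result corresponds to the case where the email form fallback applies when require_email=True.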

SPA Flow: Email Collection

When GitHub returns no verified emails and require_email=True, the SPA OAuth callback redirects back to the SPA with email_required=true&session=<id> query parameters instead of returning a hard error. The SPA handles email collection natively via the JSON API at POST /oauth/email.

For HTML template applications, the callback redirects to /oauth/email?session=<id> where the server-rendered aw-oauth-email.html template collects the email.

In both cases, the email verification flow is the same:

  1. The SPA form or HTML template collects the user’s email address

  2. POST /oauth/email creates the actor with email_verified = "false"

  3. The library fires the email_verification_required lifecycle hook

  4. The application’s hook handler sends a verification email

  5. The user follows the GET /oauth/email?verify=<token> link to complete verification

See SPA Authentication Guide for the SPA-specific flow details.

MCP Flow: Verified Email Required

The MCP OAuth flow requires a verified email from the provider. If no verified email is available, the flow returns an invalid_grant error with a message explaining that a verified email is required.

See Also