Migrating to ActingWeb 3.10
Overview
ActingWeb 3.10 introduces automatic subscription processing, a major feature that reduces callback handling from 500+ lines of manual code to about 30 lines of application logic.
Key Features:
CallbackProcessor: Automatic sequencing, deduplication, and gap handling
RemotePeerStore: Standardized storage for peer data with list operations
FanOutManager: Circuit breaker-protected delivery to multiple subscribers
Peer Capabilities: Protocol v1.4 feature discovery (compression, batch subscriptions)
Auto-cleanup: Automatic peer data removal when trust relationships end
Who Needs to Migrate?
You should migrate if:
Your application manually handles subscription callback sequencing
You maintain custom code for detecting duplicate or out-of-order callbacks
You have custom storage for peer data received via subscriptions
You want to reduce boilerplate and improve reliability
You can skip migration if:
You don’t use subscriptions between actors
Your current callback handling is simple and working well
You prefer full control over callback processing
Important: Migration is optional. Existing @callback_hook("subscription") handlers continue to work unchanged.
New Components
CallbackProcessor
Handles all aspects of callback sequencing automatically:
from actingweb.callback_processor import CallbackProcessor, ProcessResult

processor = CallbackProcessor(
    actor=actor,
    gap_timeout_seconds=5.0,  # Trigger resync after 5s gap
    max_pending=100,          # Back-pressure limit
)

# Process a callback
result = processor.process_callback(
    peer_id="peer123",
    subscription_id="sub456",
    sequence=42,
    data={"status": "active"},
)

# Result indicates what happened
if result == ProcessResult.PROCESSED:
    # Normal processing - data is ready
    pass
elif result == ProcessResult.DUPLICATE:
    # Already processed this sequence
    pass
elif result == ProcessResult.PENDING:
    # Gap detected, waiting for missing callbacks
    pass
elif result == ProcessResult.RESYNC_REQUIRED:
    # Gap timeout exceeded, need full resync
    pass
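To make these outcomes concrete, the sketch below reimplements the sequencing idea in plain Python. It is not the library's code: ToySequencer, Outcome, and the in-memory buffers are hypothetical names chosen for illustration, and gap timeouts are left out for brevity (a gap simply stays pending until the missing sequence arrives or the buffer fills).

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical stand-in for the library's ProcessResult values."""
    PROCESSED = "processed"
    DUPLICATE = "duplicate"
    PENDING = "pending"
    REJECTED = "rejected"  # back-pressure: pending buffer is full


class ToySequencer:
    """Illustrative in-order delivery for one subscription (not ActingWeb code)."""

    def __init__(self, max_pending: int = 100) -> None:
        self.last_seq = 0                    # highest sequence applied so far
        self.pending: dict[int, dict] = {}   # out-of-order callbacks by sequence
        self.applied: list[dict] = []        # data handed to the app, in order
        self.max_pending = max_pending

    def process(self, sequence: int, data: dict) -> Outcome:
        if sequence <= self.last_seq or sequence in self.pending:
            return Outcome.DUPLICATE
        if sequence > self.last_seq + 1:
            # Gap: buffer the callback unless the buffer is already full.
            if len(self.pending) >= self.max_pending:
                return Outcome.REJECTED
            self.pending[sequence] = data
            return Outcome.PENDING
        # Exactly the next sequence: apply it, then drain buffered successors.
        self._apply(sequence, data)
        while self.last_seq + 1 in self.pending:
            nxt = self.last_seq + 1
            self._apply(nxt, self.pending.pop(nxt))
        return Outcome.PROCESSED

    def _apply(self, sequence: int, data: dict) -> None:
        self.applied.append(data)
        self.last_seq = sequence


seq = ToySequencer(max_pending=2)
results = [
    seq.process(1, {"n": 1}),  # next in order
    seq.process(3, {"n": 3}),  # gap: 2 is missing
    seq.process(1, {"n": 1}),  # already applied
    seq.process(2, {"n": 2}),  # fills the gap and drains 3
]
print([r.name for r in results])
print(seq.last_seq, [d["n"] for d in seq.applied])
```

The application only ever sees data in sequence order, which is the property the data hooks later in this guide rely on.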
RemotePeerStore
Stores and manages data received from peer actors:
from actingweb.remote_storage import RemotePeerStore

store = RemotePeerStore(actor, peer_id)

# Scalar values
store.set_value("profile", {"name": "Alice", "status": "active"})
profile = store.get_value("profile")

# Lists with automatic operations
store.set_list("items", [{"id": 1}, {"id": 2}])
items = store.get_list("items")

# Apply callback data (handles list operations automatically)
store.apply_callback_data("items", {
    "operation": "append",
    "items": [{"id": 3}],
})

# Cleanup
store.delete_all()
FanOutManager
Delivers callbacks to multiple subscribers with circuit breaker protection:
from actingweb.fanout import FanOutManager, FanOutResult

manager = FanOutManager(
    actor=actor,
    max_concurrent=5,      # Parallel deliveries
    default_timeout=30.0,  # Request timeout
)

async def notify_subscribers() -> None:
    # Deliver to all subscribers (deliver() is awaited, so call it from async code)
    result: FanOutResult = await manager.deliver(
        target="properties",
        data={"status": "changed"},
    )
    # Check results
    print(f"Delivered: {result.success_count}/{result.total_count}")

# Circuit breaker management
status = manager.get_circuit_breaker_status("peer123")
if status == "OPEN":
    manager.reset_circuit_breaker("peer123")
PeerCapabilities
Discover what protocol features a peer supports (protocol v1.4):
from actingweb.peer_capabilities import PeerCapabilities

caps = PeerCapabilities(actor, peer_id)

if caps.supports_resync_callbacks():
    # Peer handles type="resync" callbacks
    pass
if caps.supports_compression():
    # Peer accepts gzip-compressed payloads
    pass

# Get all capabilities
all_caps = caps.get_all_supported()
print(f"Peer supports: {all_caps}")
Migration Guide
Step 1: Enable Subscription Processing
Add .with_subscription_processing() to your application configuration:
Before (v3.9 and earlier):
from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_oauth(client_id="...", client_secret="...")
    .with_devtest(enable=False)
)
After (v3.10):
from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_oauth(client_id="...", client_secret="...")
    .with_devtest(enable=False)
    .with_subscription_processing(
        auto_sequence=True,       # Enable CallbackProcessor
        auto_storage=True,        # Enable RemotePeerStore
        auto_cleanup=True,        # Clean up on trust deletion
        gap_timeout_seconds=5.0,  # Resync trigger timeout
        max_pending=100           # Back-pressure limit
    )
)
For Lambda/Serverless environments, also add:
app = app.with_sync_callbacks() # Ensures delivery before freeze
Step 2: Replace Callback Hook with Data Hook
Replace your @callback_hook("subscription") with @subscription_data_hook:
Before (manual sequencing):
@app.callback_hook("subscription")
def handle_subscription(actor, req):
    """Manual callback handling - 100+ lines typically."""
    peer_id = req.json.get("id")
    sequence = req.json.get("sequence", 0)
    data = req.json.get("data", {})
    target = req.json.get("target", "properties")

    # Manual sequence checking
    last_seq = get_last_sequence(peer_id, target)
    if sequence <= last_seq:
        return {"status": "duplicate"}
    if sequence > last_seq + 1:
        # Gap detected - store pending
        store_pending(peer_id, sequence, data)
        if check_gap_timeout(peer_id):
            trigger_resync(peer_id, target)
        return {"status": "pending"}

    # Store the data
    save_peer_data(peer_id, target, data)
    update_sequence(peer_id, target, sequence)

    # Process any pending callbacks
    process_pending_callbacks(peer_id, target)

    # Application logic
    notify_user(f"Update from {peer_id}")
    return {"status": "ok"}
After (automatic processing):
@app.subscription_data_hook("properties")
def on_property_change(
    actor,
    peer_id: str,
    target: str,
    data: dict,
    sequence: int,
    callback_type: str,  # "diff" or "resync"
):
    """
    Called with already-sequenced, deduplicated, stored data.
    Library handled: sequencing, deduplication, storage, list operations.
    """
    # Just your application logic!
    if callback_type == "resync":
        refresh_ui(actor, peer_id)
    else:
        notify_user(f"Update from {peer_id}: {data}")
Line reduction: ~100 lines → ~10 lines
Step 3: Migrate Peer Data Storage
Replace custom peer data storage with RemotePeerStore:
Before (custom storage):
def save_peer_data(peer_id, target, data):
    """Custom storage implementation."""
    bucket = f"remote:{peer_id}"
    actor.attributes.set_bucket(bucket)
    if "items" in data:
        existing = actor.attributes.get_attribute(f"{target}:items") or []
        operation = data.get("operation", "replace")
        if operation == "append":
            existing.extend(data["items"])
        elif operation == "delete":
            # Handle delete...
            pass
        actor.attributes.set_attribute(f"{target}:items", existing)
    else:
        for key, value in data.items():
            actor.attributes.set_attribute(f"{target}:{key}", value)
After (RemotePeerStore):
from actingweb.remote_storage import RemotePeerStore

@app.subscription_data_hook("properties")
def on_property_change(actor, peer_id, target, data, sequence, callback_type):
    # Storage is automatic with auto_storage=True
    # If you need manual access:
    store = RemotePeerStore(actor, peer_id)
    items = store.get_list(f"{target}:items")
Step 4: Remove Manual Cleanup Code
Before (manual cleanup):
@app.trust_hook("delete")
def on_trust_deleted(actor, peerid, relationship, trust_data):
    """Manual cleanup of peer data."""
    # Delete all peer data
    bucket = f"remote:{peerid}"
    actor.attributes.set_bucket(bucket)
    for key in actor.attributes.get_all_keys():
        actor.attributes.delete_attribute(key)
    # Clear sequence tracking
    clear_sequence_state(peerid)
    # Clear pending callbacks
    clear_pending_callbacks(peerid)
    logger.info(f"Cleaned up data for peer {peerid}")
After (automatic cleanup):
@app.trust_hook("delete")
def on_trust_deleted(actor, peerid, relationship, trust_data):
    """Application-specific logic only - storage cleanup is automatic."""
    # Note: With auto_cleanup=True, the following happens automatically:
    # - RemotePeerStore data deleted
    # - Callback state cleared
    # - Pending callbacks discarded
    # Only app-specific actions needed:
    notify_websocket_clients(f"Peer {peerid} disconnected")
Step 5: Update List Operation Handling
If you handle list operations manually, the library now supports them automatically:
Supported Operations:
| Operation | Description |
|---|---|
| | Add items to end of list |
| | Add multiple items to end |
| | Insert at specific index |
| | Replace item at index |
| | Remove item at index |
| | Remove and return item |
| | Remove all items |
| | Remove first occurrence |
Before (manual list operations):
def apply_list_operation(peer_id, list_name, operation_data):
    op = operation_data.get("operation")
    items = operation_data.get("items", [])
    index = operation_data.get("index", 0)
    existing = get_list(peer_id, list_name)
    if op == "append":
        existing.extend(items)
    elif op == "insert":
        for i, item in enumerate(items):
            existing.insert(index + i, item)
    elif op == "delete":
        if 0 <= index < len(existing):
            del existing[index]
    # ... more operations
    save_list(peer_id, list_name, existing)
After (automatic):
# With auto_storage=True, list operations are applied automatically
# Just read the result if needed:
@app.subscription_data_hook("properties")
def on_property_change(actor, peer_id, target, data, sequence, callback_type):
    store = RemotePeerStore(actor, peer_id)
    current_items = store.get_list("items")  # Already updated
    print(f"Items now: {current_items}")
Configuration Reference
.with_subscription_processing() Parameters
| Parameter | Default | Description |
|---|---|---|
| auto_sequence | True | Enable CallbackProcessor for sequencing/deduplication |
| auto_storage | True | Enable RemotePeerStore for storing peer data |
| auto_cleanup | | Clean up peer data when trust is deleted |
| gap_timeout_seconds | 5.0 | Seconds before a sequence gap triggers resync |
| max_pending | 100 | Maximum pending callbacks before returning 429 |
Choosing Configuration Values
gap_timeout_seconds:
Low latency networks (< 100ms): 2-3 seconds
Standard networks: 5 seconds (default)
High latency/unreliable networks: 10-15 seconds
Batch processing scenarios: 30+ seconds
max_pending:
Memory-constrained environments: 50
Standard deployments: 100 (default)
High-throughput systems: 200-500
Note: Each pending callback uses ~1-5KB memory
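As a rough sizing aid, the per-callback figure above can be turned into a worst-case estimate for one full pending buffer. The helper below is a hypothetical back-of-the-envelope calculation, not part of ActingWeb; how many buffers exist at once (per subscription, per peer, or per process) is an assumption you should check against your own deployment.

```python
def pending_memory_kb(
    max_pending: int,
    kb_per_callback: tuple[float, float] = (1.0, 5.0),
) -> tuple[float, float]:
    """Return (low, high) worst-case memory in KB for one full pending buffer."""
    low, high = kb_per_callback
    return (max_pending * low, max_pending * high)


for limit in (50, 100, 500):
    low, high = pending_memory_kb(limit)
    print(f"max_pending={limit}: {low:.0f}-{high:.0f} KB per full buffer")
```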
Testing Your Migration
Unit Tests
Test your new data hooks:
import pytest
from actingweb.interface import ActorInterface
from actingweb.remote_storage import RemotePeerStore

def test_data_hook_receives_processed_data(actor_id, config, peer_id):
    """Verify hook receives properly sequenced data."""
    # actor_id, config, and peer_id are assumed to be pytest fixtures
    # provided by your own test suite.
    actor = ActorInterface.get_by_id(actor_id, config)
    store = RemotePeerStore(actor, peer_id)
    # Simulate processed callback
    store.set_value("status", {"active": True})
    # Verify storage
    status = store.get_value("status")
    assert status["active"] is True
Integration Tests
Test the full callback flow:
def test_subscription_callback_processing():
    """Test full subscription callback flow."""
    # Create subscriber with subscription processing
    subscriber_app, aw_app = create_test_app(
        enable_subscription_processing=True
    )
    # Create actors and trust
    publisher = create_actor(...)
    subscriber = create_actor(...)
    establish_trust(publisher, subscriber)
    # Create subscription
    subscriber.subscriptions.subscribe_to_peer(
        peer_id=publisher.id,
        target="properties"
    )
    # Change property on publisher
    publisher.properties["status"] = "active"
    # Verify callback was processed
    store = RemotePeerStore(subscriber, publisher.id)
    status = store.get_value("properties:status")
    assert status == "active"
Backward Compatibility
Full Compatibility Maintained
Existing callbacks continue to work: @callback_hook("subscription") handlers are unaffected
HTTP API unchanged: All REST endpoints work identically
Database compatible: No schema changes required
Opt-in feature: Must explicitly enable with .with_subscription_processing()
Coexistence
You can use both approaches during migration:
# Raw hook for specific targets
@app.callback_hook("subscription")
def legacy_handler(actor, req):
    target = req.json.get("target")
    if target == "legacy_data":
        # Handle legacy target manually
        return handle_legacy(actor, req)
    # Let other targets fall through to data hooks
    return None

# Data hook for new targets
@app.subscription_data_hook("properties")
def new_handler(actor, peer_id, target, data, sequence, callback_type):
    # Modern handling
    pass
Note: Raw @callback_hook("subscription") takes precedence if registered.
Troubleshooting
Common Migration Issues
Callbacks not reaching data hook:
Verify .with_subscription_processing() is configured
Check that auto_sequence=True (default)
Ensure no raw @callback_hook("subscription") is registered
Data not being stored:
Verify auto_storage=True (default)
Check that the callback contains a valid data field
Verify trust relationship is approved
Resyncs happening too frequently:
Increase gap_timeout_seconds (e.g., 10.0)
Check network reliability between actors
Consider using .with_sync_callbacks() if async delivery is unreliable
429 errors from subscribers:
Increase max_pending on subscriber
Slow down publishing rate
Check subscriber processing speed
Breaking Change: Bucket Naming Convention
All library-internal buckets now use a _ prefix to avoid namespace collisions with user-defined buckets.
Why This Change?
Application code can create arbitrary buckets via Attributes(actor_id=..., bucket="mydata").
Without a reserved prefix, a user’s bucket="peer_permissions" would collide with
the library’s internal bucket of the same name.
Convention:
_ prefix = library-internal, managed by ActingWeb
No prefix = application/user data
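Applications that build bucket names dynamically may want to guard against straying into the reserved namespace. The sketch below is one hypothetical way to do that; is_library_bucket, check_app_bucket, and the example bucket names are illustrative and not part of ActingWeb.

```python
LIBRARY_PREFIX = "_"  # reserved for library-internal buckets per the convention above


def is_library_bucket(bucket: str) -> bool:
    """True if the bucket name falls in the library-reserved namespace."""
    return bucket.startswith(LIBRARY_PREFIX)


def check_app_bucket(bucket: str) -> str:
    """Return the name unchanged, or raise if it is empty or reserved."""
    if not bucket:
        raise ValueError("bucket name must not be empty")
    if is_library_bucket(bucket):
        raise ValueError(f"bucket {bucket!r} uses the reserved '_' prefix")
    return bucket


print(check_app_bucket("mydata"))              # accepted: no prefix
print(is_library_bucket("_internal_example"))  # reserved: hypothetical name
```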
Renamed Buckets
| Old Name | New Name |
|---|---|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
Migration Impact
For most deployments, no action is required. The affected data is typically transient:
OAuth sessions/tokens: Recreated automatically on next login
Auth indexes: Rebuilt as tokens are issued
For deployments with existing trust relationships:
If you have existing trust types, permissions, peer profiles, or peer capabilities stored, you may need to migrate the data. The simplest approach is to re-establish trust relationships, which will repopulate the new buckets automatically.
Peer Permissions Caching
ActingWeb 3.10 adds peer permissions caching for trust relationships, following the same pattern as peer profiles and peer capabilities.
What It Does
Peer permissions caching stores the permissions that remote peers have granted to us.
This is distinct from TrustPermissions, which stores what we grant to peers.
When enabled:
Permissions are automatically fetched when trust is approved
Permissions are refreshed during sync_peer() operations
Permissions are cleaned up when trust is deleted
Permission callbacks from peers are automatically stored
Enabling Peer Permissions
Add .with_peer_permissions() to your application configuration:
from actingweb.interface import ActingWebApp

app = (
    ActingWebApp(
        aw_type="urn:actingweb:example.com:myapp",
        database="dynamodb",
        fqdn="myapp.example.com",
        proto="https://"
    )
    .with_peer_permissions(
        enable=True,                     # Enable permission caching
        auto_delete_on_revocation=True,  # Delete cached data when revoked
        notify_peer_on_change=True       # Auto-notify peers (default)
    )
    .with_peer_capabilities(enable=True)  # Also recommended
)
Configuration Options:
enable: Enable/disable peer permissions caching. Default: True when called.
auto_delete_on_revocation: Delete cached peer data when permissions are revoked. Default: False.
notify_peer_on_change: Automatically notify peers when you change their permissions by sending a callback to /callbacks/permissions/{actor_id}. Default: True.
Permission Query Endpoint (NEW in v3.10.0a5):
The GET /{actor_id}/permissions/{peer_id} endpoint is now available for
proactive permission discovery. This complements the existing permission callback
mechanism by supporting:
Initial permission baseline during trust establishment
Permission refresh without waiting for callbacks
Recovery if permission callbacks are missed
The endpoint is automatically registered when using the ActingWeb framework. No migration changes needed. The endpoint returns the full effective permissions (base trust-type defaults merged with per-trust overrides), allowing peers to discover what permissions they have been granted and accurately detect changes.
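One way to use the endpoint for change detection is to compare a freshly fetched response with the cached copy. The sketch below assumes the response is a JSON object keyed by permission category (for example "properties" with a "patterns" list, the shape used in the permission callback example in this guide). The function name and sample payloads are illustrative, not the documented schema.

```python
_MISSING = object()  # distinguishes "key absent" from "key present with None"


def changed_categories(cached: dict, fetched: dict) -> list[str]:
    """Return the sorted top-level permission categories whose values differ."""
    keys = set(cached) | set(fetched)
    return sorted(
        k for k in keys
        if cached.get(k, _MISSING) != fetched.get(k, _MISSING)
    )


# Hypothetical payloads; the real response schema may differ.
cached = {
    "properties": {"patterns": ["memory_travel"]},
    "methods": {"allowed": ["sync_data"]},
}
fetched = {
    "properties": {"patterns": ["memory_travel", "memory_food"]},
    "methods": {"allowed": ["sync_data"]},
}

print(changed_categories(cached, fetched))
```

Because the endpoint returns full effective permissions rather than a delta, a plain comparison like this is enough to notice both grants and revocations.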
Using Cached Permissions
Access cached permissions via PeerPermissionStore:
from actingweb.peer_permissions import get_peer_permission_store

# Get the permission store
store = get_peer_permission_store(actor.config)

# Get cached permissions for a specific peer
permissions = store.get_permissions(actor.id, peer_id)
if permissions:
    # Check property access
    if permissions.has_property_access("memory_travel", "read"):
        print("Peer granted us read access to memory_travel")
    # Check method access
    if permissions.has_method_access("sync_data"):
        print("Peer granted us access to sync_data method")
    # Check tool access (MCP)
    if permissions.has_tool_access("search"):
        print("Peer granted us access to search tool")
Permission Callback Handling
When peers notify us of permission changes via callbacks, the library automatically stores the updated permissions. You can register a hook for app-specific handling:
@app.callback_hook("permissions")
def on_permission_change(actor, req):
    """Handle permission update from peer."""
    granting_actor_id = req.json.get("granting_actor_id")
    data = req.json.get("data", {})
    # Permissions are already stored by the library
    # Add your app-specific logic here
    logger.info(f"Permission update from {granting_actor_id}")
    # Example: trigger sync for newly granted permissions
    if data.get("properties", {}).get("patterns"):
        patterns = data["properties"]["patterns"]
        for pattern in patterns:
            if pattern.startswith("memory_"):
                sync_memory_type(actor, granting_actor_id, pattern)
Multi-Provider OAuth Support
ActingWeb 3.10 adds support for configuring multiple OAuth providers simultaneously (e.g., Google and GitHub). Previously, only one provider could be active at a time.
This is a non-breaking, additive change. All existing single-provider configurations continue to work without modification.
What Changed
.with_oauth() now accepts an optional provider parameter
New config.oauth_providers attribute (dict-of-dicts) stores per-provider credentials
Factory login page renders buttons for all configured providers
OAuth state parameter carries the provider name for correct callback routing
/oauth/config (SPA endpoint) returns all configured providers
GitHub email verification enforces verified flag on primary emails
Backward Compatibility
All 12 public API surface areas were verified as fully backward compatible:
with_oauth() without provider: Works identically to before (single-provider config)
config.oauth: Still populated (points to first configured provider’s credentials)
config.oauth2_provider: Still populated (set to first configured provider name)
create_oauth2_authenticator(config): Default provider used when no provider specified
create_google_authenticator() / create_github_authenticator(): Unchanged signatures
OAuth callback state parsing: Absent provider field falls back to config.oauth2_provider
Template variables: oauth_providers list already existed; new providers append to it
REST API responses: /oauth/config returns same structure; additional providers are additive
SPA state format: Missing provider key falls back to default provider
MCP encrypted state: Missing provider field falls back to default
Email verification flow: No signature changes; new email_verification_required hook is additive
Factory /login endpoint: Single-provider still renders single button
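Several of the items above share one fallback rule: when the state carries no provider, the default provider is used. The sketch below illustrates that rule under the assumption that the state is a JSON object; the real state encoding (for instance the encrypted MCP state) may differ, and resolve_provider is a hypothetical helper, not a library function.

```python
import json


def resolve_provider(state: str, default_provider: str) -> str:
    """Pick the provider named in the state, or the default when it is absent."""
    try:
        payload = json.loads(state)
    except (TypeError, ValueError):
        # Opaque or legacy state that is not JSON: use the default provider.
        return default_provider
    if not isinstance(payload, dict):
        return default_provider
    return payload.get("provider") or default_provider


print(resolve_provider('{"provider": "github"}', "google"))  # explicit provider
print(resolve_provider('{"redirect": "/app"}', "google"))    # no provider key
print(resolve_provider("opaque-legacy-state", "google"))     # not JSON at all
```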
No Action Required for Existing Applications
If you are using a single OAuth provider, no changes are needed. Your existing configuration continues to work:
# This still works exactly as before
app = ActingWebApp(
    aw_type="urn:actingweb:example.com:myapp",
    database="dynamodb",
    fqdn="myapp.example.com"
).with_oauth(
    client_id="your-client-id",
    client_secret="your-client-secret"
)
Opting In to Multiple Providers
To add a second provider, call .with_oauth() again with the provider parameter:
app = ActingWebApp(
    aw_type="urn:actingweb:example.com:myapp",
    database="dynamodb",
    fqdn="myapp.example.com"
).with_oauth(
    provider="google",
    client_id="google-client-id",
    client_secret="google-client-secret",
    scope="openid email profile"
).with_oauth(
    provider="github",
    client_id="github-client-id",
    client_secret="github-client-secret",
    scope="read:user user:email"
)
When multiple providers are configured:
The factory login page (/login) shows buttons for all providers
401 redirects go to /login (not directly to a single provider’s OAuth page)
Each provider’s credentials are isolated in config.oauth_providers
GitHub Email Verification Security Fix
A security hardening was applied to GitHub email handling:
_get_github_primary_email() now requires both primary and verified flags
Previously, an unverified primary email was accepted, which could allow account-linking attacks via the GitHub /user/emails API
If no verified primary email is available, the first verified non-primary email is used
If no verified emails exist at all, the email form fallback is triggered (when require_email=True) or None is returned
This change only affects new GitHub OAuth logins. Existing actors already linked via GitHub are not affected.
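The selection order described above can be sketched as a small pure function over the list returned by GitHub's /user/emails endpoint, whose entries carry email, primary, and verified fields. This is an illustration of the rule, not the library's _get_github_primary_email() implementation; pick_verified_email and the sample addresses are hypothetical.

```python
from typing import Optional


def pick_verified_email(emails: list[dict]) -> Optional[str]:
    """Prefer a verified primary address, then any verified address, else None."""
    verified = [e for e in emails if e.get("verified") and e.get("email")]
    for entry in verified:
        if entry.get("primary"):
            return entry["email"]
    # No verified primary: fall back to the first verified address, if any.
    return verified[0]["email"] if verified else None


emails = [
    {"email": "old@example.com", "primary": True, "verified": False},
    {"email": "work@example.com", "primary": False, "verified": True},
]
print(pick_verified_email(emails))  # the unverified primary is skipped
```

A None result corresponds to the case where the email form fallback or the None return applies.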
SPA Flow: Email Collection
When GitHub returns no verified emails and require_email=True, the SPA OAuth callback
redirects back to the SPA with email_required=true&session=<id> query parameters
instead of returning a hard error. The SPA handles email collection natively via the
JSON API at POST /oauth/email.
For HTML template applications, the callback redirects to /oauth/email?session=<id>
where the server-rendered aw-oauth-email.html template collects the email.
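The two redirect shapes can be sketched with the standard library. The helper names, the SPA URL, and the session value below are hypothetical; only the query parameter names and the /oauth/email path come from the description above.

```python
from urllib.parse import urlencode, urlsplit, urlunsplit


def spa_email_redirect(spa_url: str, session_id: str) -> str:
    """Append email_required=true&session=<id>, keeping any existing query."""
    parts = urlsplit(spa_url)
    extra = urlencode({"email_required": "true", "session": session_id})
    query = f"{parts.query}&{extra}" if parts.query else extra
    return urlunsplit(
        (parts.scheme, parts.netloc, parts.path, query, parts.fragment)
    )


def html_email_redirect(session_id: str) -> str:
    """Server-rendered variant: relative redirect to the email form."""
    return "/oauth/email?" + urlencode({"session": session_id})


print(spa_email_redirect("https://spa.example.com/callback", "abc123"))
print(html_email_redirect("abc123"))
```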
In both cases, the email verification flow is the same:
Collects the user’s email address (SPA form or HTML template)
POST /oauth/email creates the actor with email_verified = "false"
Fires the email_verification_required lifecycle hook
The application’s hook handler sends a verification email
User clicks GET /oauth/email?verify=<token> to complete verification
See SPA Authentication Guide for the SPA-specific flow details.
MCP Flow: Verified Email Required
The MCP OAuth flow requires a verified email from the provider. If no verified email is
available, the flow returns an invalid_grant error with a message explaining that a
verified email is required.
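For client authors, the error can be expected to follow the usual OAuth 2.0 error shape (RFC 6749, section 5.2). The sketch below builds such a body. The 400 status code and the exact wording of error_description are assumptions rather than documented ActingWeb behavior, and the function name is hypothetical.

```python
def verified_email_required_error() -> tuple[int, dict]:
    """Build an OAuth 2.0 style error response as (status_code, JSON body)."""
    body = {
        "error": "invalid_grant",
        # Assumed wording; the library's actual message may differ.
        "error_description": (
            "A verified email address from the OAuth provider is required."
        ),
    }
    return 400, body  # 400 is the RFC 6749 default for token errors (assumed here)


status, body = verified_email_required_error()
print(status, body["error"])
```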
See Also
Subscription Manager - Full subscription processing guide
Trust Manager - Trust and subscription lifecycle
ActingWeb Authentication System - Authentication and OAuth2 guide
Troubleshooting - Troubleshooting guide
CHANGELOG - Full changelog