Async Operations
Audience: SDK developers working with async frameworks or peer-to-peer communication.
ActingWeb provides async variants of methods that communicate with remote peers. This enables non-blocking peer operations in async frameworks like FastAPI.
Overview
Peer communication involves HTTP requests to other actors. In synchronous code, these block the thread. Async variants allow concurrent execution without blocking.
Async Methods
Actor Class Async Methods
The core Actor class provides these async methods:
class Actor:
# Peer information
async def get_peer_info_async(self, peer_id: str) -> Optional[Dict]
# Trust operations
async def modify_trust_and_notify_async(
self, peer_id: str, relationship: str, ...
) -> bool
async def create_reciprocal_trust_async(
self, peer_id: str, relationship: str, baseuri: str, ...
) -> Optional[Dict]
async def create_verified_trust_async(
self, peer_id: str, relationship: str, baseuri: str, verify_token: str, ...
) -> Optional[Dict]
async def delete_reciprocal_trust_async(
self, peer_id: str, ...
) -> bool
# Subscription operations
async def create_remote_subscription_async(
self, peer_id: str, baseuri: str, callback_path: str, ...
) -> Optional[Dict]
async def delete_remote_subscription_async(
self, peer_id: str, ...
) -> bool
async def callback_subscription_async(
self, peer_id: str, diff_data: Dict, ...
) -> bool
TrustManager Async Methods
The TrustManager wraps these for easier use:
class TrustManager:
async def create_reciprocal_trust_async(
self, peer_id: str, relationship: str, baseuri: str
) -> Optional[TrustRelationship]
async def create_verified_trust_async(
self, peer_id: str, relationship: str, baseuri: str, verify_token: str
) -> Optional[TrustRelationship]
async def modify_and_notify_async(
self, peer_id: str, relationship: str
) -> bool
async def delete_peer_trust_async(
self, peer_id: str
) -> bool
SubscriptionManager Async Methods
class SubscriptionManager:
async def subscribe_to_peer_async(
self,
peer_id: str,
target: str,
subtarget: str = "",
resource: str = "",
granularity: str = "high"
) -> Optional[str] # Returns subscription URL
async def unsubscribe(
self, peer_id: str, subscription_id: str
) -> bool
AwProxy Async Methods
For direct HTTP communication with peers:
class AwProxy:
async def get_resource_async(
self, path: str, params: Dict = None
) -> Optional[Dict]
async def create_resource_async(
self, path: str, data: Dict
) -> Optional[Dict]
async def change_resource_async(
self, path: str, data: Dict
) -> Optional[Dict]
async def delete_resource_async(
self, path: str
) -> bool
Usage Examples
FastAPI Route with Async Trust Creation
from fastapi import FastAPI, HTTPException
from actingweb.interface import ActorInterface
app = FastAPI()
@app.post("/{actor_id}/connect")
async def connect_to_peer(actor_id: str, peer_baseuri: str):
actor = get_actor(actor_id)
actor_interface = ActorInterface(actor)
# Non-blocking peer communication
trust = await actor_interface.trust.create_verified_trust_async(
peer_id="new_peer",
relationship="friend",
baseuri=peer_baseuri,
verify_token=generate_token()
)
if not trust:
raise HTTPException(status_code=400, detail="Failed to establish trust")
return {"trust_id": trust.peer_id, "status": "connected"}
Concurrent Peer Operations
import asyncio
async def notify_all_peers(actor: ActorInterface, message: Dict):
"""Notify all trusted peers concurrently."""
peers = actor.trust.get_all_relationships()
async def notify_peer(peer):
proxy = AwProxy(
peer_target={"id": actor.id, "peerid": peer["peerid"]},
config=actor.config
)
return await proxy.create_resource_async(
path="callbacks/notification",
data=message
)
# Execute all notifications concurrently
results = await asyncio.gather(
*[notify_peer(p) for p in peers],
return_exceptions=True
)
return results
Async Subscription with Callback
async def subscribe_to_service(actor: ActorInterface, service_uri: str):
"""Subscribe to a remote service."""
# Create trust first
trust = await actor.trust.create_reciprocal_trust_async(
peer_id="service",
relationship="service",
baseuri=service_uri
)
if not trust:
raise RuntimeError("Failed to establish trust")
# Then subscribe (includes automatic baseline sync)
subscription_url = await actor.subscriptions.subscribe_to_peer_async(
peer_id="service",
target="properties",
granularity="high"
)
return subscription_url
Implementation Details
Async via httpx
Async methods use httpx for non-blocking HTTP:
async def get_resource_async(self, path: str) -> Optional[Dict]:
import httpx
url = f"{self.peer_baseuri}/{path}"
headers = self._build_auth_headers()
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=30.0)
if response.status_code == 200:
return response.json()
return None
Async via asyncio.to_thread
Some methods wrap synchronous code with asyncio.to_thread:
async def create_verified_trust_async(self, ...):
# Runs sync method in thread pool
return await asyncio.to_thread(
self.create_verified_trust,
peer_id, relationship, baseuri, verify_token, ...
)
Timeout Handling
Async operations include configurable timeouts:
async def get_peer_info_async(self, peer_id: str, timeout: float = 30.0):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.get(url, headers=headers)
return response.json()
except httpx.TimeoutException:
logger.warning(f"Timeout getting info for peer {peer_id}")
return None
Best Practices
Use Async for External Communication
# Good - non-blocking trust = await actor.trust.create_reciprocal_trust_async(...) # Avoid in async context - blocks event loop trust = actor.trust.create_reciprocal_trust(...)
Use asyncio.gather for Concurrent Operations
# Notify multiple peers concurrently results = await asyncio.gather( notify_peer(peer1), notify_peer(peer2), notify_peer(peer3), return_exceptions=True )
Handle Timeouts Gracefully
try: result = await asyncio.wait_for( actor.trust.create_verified_trust_async(...), timeout=60.0 ) except asyncio.TimeoutError: logger.error("Trust creation timed out") return None
Don’t Mix Sync and Async Carelessly
# In async function - use async variant async def handler(): trust = await actor.trust.create_reciprocal_trust_async(...) # In sync function - use sync variant def handler(): trust = actor.trust.create_reciprocal_trust(...)
Consider Connection Pooling
# Reuse client for multiple requests async with httpx.AsyncClient() as client: result1 = await client.get(url1) result2 = await client.get(url2)
Performance Considerations
Sync vs Async Comparison
Operation |
Sync (blocking) |
Async (concurrent) |
|---|---|---|
Single peer request |
~100ms |
~100ms |
10 peer requests |
~1000ms (serial) |
~100ms (parallel) |
Mixed local + remote |
Blocks on remote |
Overlapped |
When to Use Async
FastAPI/Starlette routes
Multiple peer operations
Long-running peer communication
High-concurrency scenarios
When Sync is OK
Flask routes (WSGI is sync anyway)
Single peer operation
Background tasks with dedicated threads
Simple scripts and CLI tools
See Also
Developer API - Developer API reference
Handler Architecture - Handler internals
Custom Framework Integration - Framework integration