Migrating to Async/Await Hooks
Note
New in v3.9.0: ActingWeb hooks now support native async/await syntax.
This guide helps you migrate existing synchronous hooks to async/await patterns and explains when and why to use async hooks.
Why Use Async Hooks?
Async hooks let I/O-bound operations wait concurrently instead of one after another. For example, with three requests that each take about one second:
Without Async (Synchronous):
Request 1: [===HTTP===] (waits) [done]
Request 2: [===HTTP===] (waits) [done]
Request 3: [===HTTP===] (waits) [done]
Total time: ~3 seconds
With Async (Concurrent):
Request 1: [===HTTP===] [done]
Request 2: [===HTTP===] [done]
Request 3: [===HTTP===] [done]
Total time: ~1 second
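The timing difference above can be reproduced with the standard library alone. This is a minimal sketch: fake_request is a hypothetical stand-in for an HTTP call, and asyncio.sleep() simulates network latency (0.1 s here rather than 1 s).

```python
import asyncio
import time

NAMES = ("Request 1", "Request 2", "Request 3")

async def fake_request(name: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for an HTTP call; the sleep simulates latency."""
    await asyncio.sleep(delay)
    return f"{name} done"

async def run_sequential() -> float:
    """Await each request in turn, so the delays add up."""
    start = time.perf_counter()
    for name in NAMES:
        await fake_request(name)
    return time.perf_counter() - start

async def run_concurrent() -> float:
    """Start all requests at once, so the delays overlap."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(name) for name in NAMES))
    return time.perf_counter() - start

sequential_time = asyncio.run(run_sequential())
concurrent_time = asyncio.run(run_concurrent())
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

The sequential run takes roughly three times the single-request delay, while the concurrent run takes roughly one.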
Use Cases for Async Hooks
Perfect for Async:
External HTTP/API calls (aiohttp, httpx)
Database queries (asyncpg, motor)
AWS services (aioboto3, async Bedrock)
File I/O operations (through an async wrapper such as aiofiles, since asyncio itself has no async file API)
AwProxy async methods
Keep Synchronous:
Simple calculations
Data transformations
Quick dictionary lookups
CPU-intensive operations
Migration Guide
Step 1: Identify I/O-Bound Hooks
Look for hooks that:
Make HTTP requests
Query databases
Read/write files
Call external services
Example - Before (Synchronous):
import requests

@app.method_hook("fetch_weather")
def get_weather(actor, method_name, data):
    # Blocks while waiting for response
    response = requests.get(f"https://api.weather.com/v1/forecast?city={data['city']}")
    return response.json()
Step 2: Convert to Async
Replace sync libraries with async equivalents:
After (Asynchronous):
import aiohttp

@app.method_hook("fetch_weather")
async def get_weather(actor, method_name, data):
    # Non-blocking, allows other requests to process
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.weather.com/v1/forecast?city={data['city']}") as resp:
            return await resp.json()
Step 3: Update Library Dependencies
Install async versions of your libraries:
# HTTP clients
pip install aiohttp httpx
# Database
pip install asyncpg motor # PostgreSQL, MongoDB
# AWS
pip install aioboto3
Common Library Replacements
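| Sync Library | Async Alternative |
|---|---|
| requests | aiohttp, httpx |
| psycopg2 | asyncpg |
| pymongo | motor |
| boto3 | aioboto3 |
| time.sleep() | asyncio.sleep() |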
Real-World Examples
Example 1: Database Query
Before (Synchronous):
import psycopg2
import psycopg2.extras

@app.property_hook("user_profile")
def get_profile(actor, operation, value, path):
    if operation == "get":
        conn = psycopg2.connect("dbname=mydb")
        # RealDictCursor returns rows as dicts; the default cursor returns
        # tuples, which dict() cannot convert
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute("SELECT * FROM profiles WHERE actor_id = %s", (actor.id,))
        profile = cursor.fetchone()
        conn.close()
        return dict(profile) if profile else None
    return value
After (Asynchronous):
import asyncpg

@app.property_hook("user_profile")
async def get_profile(actor, operation, value, path):
    if operation == "get":
        conn = await asyncpg.connect("postgresql://localhost/mydb")
        try:
            profile = await conn.fetchrow(
                "SELECT * FROM profiles WHERE actor_id = $1",
                actor.id
            )
        finally:
            # Close the connection even if the query fails
            await conn.close()
        return dict(profile) if profile else None
    return value
Example 2: Multiple HTTP Calls
Before (Sequential, Slow):
import requests

@app.method_hook("aggregate_data")
def aggregate(actor, method_name, data):
    # Takes 3+ seconds (sequential)
    weather = requests.get("https://api.weather.com/...").json()
    news = requests.get("https://api.news.com/...").json()
    stocks = requests.get("https://api.stocks.com/...").json()
    return {
        "weather": weather,
        "news": news,
        "stocks": stocks
    }
After (Concurrent, Fast):
import aiohttp
import asyncio

@app.method_hook("aggregate_data")
async def aggregate(actor, method_name, data):
    # Takes ~1 second (concurrent)
    async with aiohttp.ClientSession() as session:
        weather_task = session.get("https://api.weather.com/...")
        news_task = session.get("https://api.news.com/...")
        stocks_task = session.get("https://api.stocks.com/...")
        weather_resp, news_resp, stocks_resp = await asyncio.gather(
            weather_task, news_task, stocks_task
        )
        # Read the response bodies while the session is still open
        return {
            "weather": await weather_resp.json(),
            "news": await news_resp.json(),
            "stocks": await stocks_resp.json()
        }
Example 3: Peer Communication with AwProxy
Before (Synchronous):
from actingweb.interface import AwProxy

@app.action_hook("notify_peers")
def notify(actor, action_name, data):
    proxy = AwProxy(config)
    for peer in actor.trust.get_peers():
        # Blocks on each request
        proxy.send_message(
            peer_url=peer.url,
            message=data["message"],
            secret=peer.secret
        )
    return {"notified": len(actor.trust.get_peers())}
After (Asynchronous):
from actingweb.interface import AwProxy
import asyncio

@app.action_hook("notify_peers")
async def notify(actor, action_name, data):
    proxy = AwProxy(config)
    peers = actor.trust.get_peers()
    # Send all messages concurrently
    tasks = [
        proxy.send_message_async(
            peer_url=peer.url,
            message=data["message"],
            secret=peer.secret
        )
        for peer in peers
    ]
    results = await asyncio.gather(*tasks)
    return {"notified": len([r for r in results if r is not None])}
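By default, asyncio.gather() re-raises the first exception from any awaited call, so one unreachable peer would make the whole hook fail. The sketch below shows one way to count successes and failures separately with return_exceptions=True. Here send_message_stub and the example URLs are hypothetical stand-ins, not part of AwProxy.

```python
import asyncio

async def send_message_stub(peer_url: str, message: str) -> dict:
    """Hypothetical stand-in for a peer call; fails for 'down' hosts on purpose."""
    await asyncio.sleep(0.01)
    if "down" in peer_url:
        raise ConnectionError(f"cannot reach {peer_url}")
    return {"peer": peer_url, "delivered": message}

async def notify_all(peer_urls: list[str], message: str) -> dict:
    # return_exceptions=True returns exceptions as results instead of raising,
    # so the remaining calls are still collected.
    results = await asyncio.gather(
        *(send_message_stub(url, message) for url in peer_urls),
        return_exceptions=True,
    )
    failed = [url for url, res in zip(peer_urls, results) if isinstance(res, BaseException)]
    return {"notified": len(peer_urls) - len(failed), "failed": failed}

summary = asyncio.run(
    notify_all(["https://a.example", "https://down.example", "https://b.example"], "hello")
)
print(summary)
```

Results come back in the same order as the inputs, which is what makes the zip() pairing safe.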
Mixed Sync and Async
You can use both sync and async hooks in the same application:
import aiohttp
import asyncpg

# Quick synchronous operation - keep sync
@app.method_hook("add_numbers")
def quick_calc(actor, method_name, data):
    return {"sum": data["x"] + data["y"]}

# Slow I/O operation - use async
@app.method_hook("fetch_data")
async def fetch(actor, method_name, data):
    async with aiohttp.ClientSession() as session:
        async with session.get(data["url"]) as resp:
            return {"data": await resp.text()}

# Database query - use async
@app.property_hook("user_settings")
async def settings(actor, operation, value, path):
    if operation == "get":
        conn = await asyncpg.connect("postgresql://...")
        user_settings = await conn.fetchval(
            "SELECT settings FROM users WHERE id = $1",
            actor.id
        )
        await conn.close()
        return user_settings
    return value
The framework automatically detects whether each hook is sync or async and executes it appropriately.
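To illustrate how such detection can work in general, here is a minimal sketch using inspect.iscoroutinefunction(). It is not ActingWeb's actual dispatch code. call_hook is a hypothetical helper, and the two hooks are plain functions without the decorators.

```python
import asyncio
import inspect

def add_numbers(actor, method_name, data):
    """Plain synchronous hook."""
    return {"sum": data["x"] + data["y"]}

async def fetch_data(actor, method_name, data):
    """Async hook; the sleep stands in for real I/O."""
    await asyncio.sleep(0.01)
    return {"data": f"fetched {data['url']}"}

async def call_hook(hook, *args):
    """Await coroutine functions; call ordinary functions directly."""
    if inspect.iscoroutinefunction(hook):
        return await hook(*args)
    return hook(*args)

async def main():
    sync_result = await call_hook(add_numbers, None, "add_numbers", {"x": 2, "y": 3})
    async_result = await call_hook(fetch_data, None, "fetch_data", {"url": "https://example.com"})
    return sync_result, async_result

sync_result, async_result = asyncio.run(main())
print(sync_result, async_result)
```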
Testing Async Hooks
Use pytest-asyncio for testing:
# conftest.py
pytest_plugins = ("pytest_asyncio",)

# test_hooks.py
import asyncio
import pytest

@pytest.mark.asyncio
async def test_async_method_hook(app, test_actor):
    """Test async method hook execution."""
    @app.method_hook("test_method")
    async def async_hook(actor, method_name, data):
        await asyncio.sleep(0.01)  # Simulate async I/O
        return {"result": "success"}

    # Test via async execution
    result = await app.hooks.execute_method_hooks_async(
        "test_method",
        test_actor,
        {}
    )
    assert result == {"result": "success"}
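If pytest-asyncio is not available, a hook's coroutine function can also be exercised directly with the standard library's unittest.IsolatedAsyncioTestCase. The sketch below assumes the hook can be called as a plain coroutine with a placeholder actor (None here). It does not go through ActingWeb's hook registry.

```python
import asyncio
import unittest

async def async_hook(actor, method_name, data):
    """Hook under test; the sleep simulates async I/O."""
    await asyncio.sleep(0.01)
    return {"result": "success"}

class AsyncHookTest(unittest.IsolatedAsyncioTestCase):
    async def test_hook_returns_success(self):
        # The test case provides its own event loop for each test method.
        result = await async_hook(None, "test_method", {})
        self.assertEqual(result, {"result": "success"})

def run_suite() -> unittest.TestResult:
    """Load and run the test case programmatically."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(AsyncHookTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)

suite_result = run_suite()
```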
Framework-Specific Notes
FastAPI
Best Performance: FastAPI automatically uses async handlers (AsyncMethodsHandler, AsyncActionsHandler) when available.
Async hooks execute natively without thread pool
True concurrent request handling
Optimal for high-throughput APIs
from fastapi import FastAPI
from actingweb.interface import ActingWebApp

app = ActingWebApp(...)
fastapi = FastAPI()

# Register async hooks
@app.method_hook("fetch_data")
async def fetch(actor, method_name, data):
    # Executes natively in FastAPI event loop
    ...

app.integrate_fastapi(fastapi)
Flask
Compatibility Mode: Flask uses asyncio.run() to execute async hooks.
Async hooks work but aren’t truly concurrent
Still allows using async libraries
Good for gradual migration
from flask import Flask
from actingweb.interface import ActingWebApp

app = ActingWebApp(...)
flask = Flask(__name__)

# Register async hooks
@app.method_hook("fetch_data")
async def fetch(actor, method_name, data):
    # Executed via asyncio.run()
    ...

app.integrate_flask(flask)
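The following sketch shows what running each hook through asyncio.run() implies. It assumes a single worker that handles requests one after another. sync_request_handler is a hypothetical stand-in for a synchronous request handler, not Flask or ActingWeb code.

```python
import asyncio
import time

async def hook(data):
    # Two simulated I/O calls that overlap within this single invocation.
    await asyncio.gather(asyncio.sleep(0.1), asyncio.sleep(0.1))
    return {"echo": data}

def sync_request_handler(data):
    """Start an event loop, run one hook to completion, then tear the loop down."""
    return asyncio.run(hook(data))

start = time.perf_counter()
responses = [sync_request_handler(i) for i in range(3)]
elapsed = time.perf_counter() - start
print(responses, f"{elapsed:.2f}s")
```

Each call takes about 0.1 s rather than 0.2 s because the two sleeps overlap inside the hook. The three requests still run back to back, so the total is about 0.3 s.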
MCP (Model Context Protocol)
New in v3.11.0: MCP endpoints now use AsyncMCPHandler for optimal async performance with FastAPI.
MCP tools (action hooks) and prompts (method hooks) execute natively in FastAPI event loop
No thread pool overhead for async MCP tools/prompts
True concurrent execution of multiple MCP requests
import aiohttp
from fastapi import FastAPI
from actingweb.interface import ActingWebApp
from actingweb.mcp import mcp_tool, mcp_prompt

app = ActingWebApp(...).with_mcp(enable=True)
fastapi = FastAPI()

# Async MCP tool - executes natively in FastAPI event loop
@app.action_hook("search_data")
@mcp_tool(description="Search external data source")
async def search_tool(actor, action_name, data):
    async with aiohttp.ClientSession() as session:
        # params= URL-encodes the query string
        async with session.get("https://api.example.com/search", params={"q": data["query"]}) as resp:
            results = await resp.json()
    return {"content": [{"type": "text", "text": str(results)}]}

# Async MCP prompt - also executes natively
@app.method_hook("summarize_notes")
@mcp_prompt(description="Generate notes summary")
async def summarize_prompt(actor, method_name, params):
    # Async database query; db_pool is assumed to be an asyncpg pool created at startup
    async with db_pool.acquire() as conn:
        notes = await conn.fetch("SELECT * FROM notes WHERE actor_id = $1", actor.id)
    return f"Found {len(notes)} notes: " + ", ".join(n["title"] for n in notes)

app.integrate_fastapi(fastapi)
Performance Impact: With AsyncMCPHandler, async MCP tools and prompts:
Execute in ~1-5ms (vs ~10-20ms with thread pool overhead)
Support thousands of concurrent requests
Enable true I/O concurrency (e.g., calling multiple external APIs in parallel)
Note: The Flask integration continues to use the sync MCPHandler. Async hooks still work via asyncio.run() but won’t benefit from true concurrency.
Performance Tips
Use Connection Pools
Don’t create new connections per request:
import asyncpg

# Bad: Creates new connection each time
@app.method_hook("query")
async def bad_query(actor, method_name, data):
    conn = await asyncpg.connect("...")  # Expensive!
    result = await conn.fetch("...")
    await conn.close()
    return result

# Good: Use connection pool
pool = None

async def init_pool():
    # Call once from async startup code; "await" is not valid at module level
    global pool
    pool = await asyncpg.create_pool("...")

@app.method_hook("query")
async def good_query(actor, method_name, data):
    async with pool.acquire() as conn:  # Reuses connections
        result = await conn.fetch("...")
    return result
Batch Operations with asyncio.gather()
Process multiple items concurrently:
import asyncio

@app.method_hook("process_batch")
async def process(actor, method_name, data):
    # process_item is your own coroutine function
    tasks = [process_item(item) for item in data["items"]]
    results = await asyncio.gather(*tasks)
    return {"results": results}
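For large batches, starting every call at once can overwhelm the remote service or exhaust a connection pool. One common way to cap concurrency is an asyncio.Semaphore, sketched below. process_item, the limit of 3, and the stats dictionary are illustrative assumptions. The stats only exist to show the cap being respected.

```python
import asyncio

async def process_item(item: int, limit: asyncio.Semaphore, stats: dict) -> int:
    """Hypothetical per-item work; the semaphore caps how many run at once."""
    async with limit:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulated I/O
        stats["active"] -= 1
        return item * 2

async def process_batch(items: list[int], max_concurrency: int = 3):
    limit = asyncio.Semaphore(max_concurrency)
    stats = {"active": 0, "peak": 0}
    # gather() still returns results in input order.
    results = await asyncio.gather(*(process_item(i, limit, stats) for i in items))
    return results, stats["peak"]

results, peak = asyncio.run(process_batch(list(range(10))))
print(results, peak)
```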
Don’t Block the Event Loop
Never use blocking calls in async hooks:
import asyncio
import time

# Bad: Blocks event loop
@app.method_hook("bad")
async def bad(actor, method_name, data):
    time.sleep(1)  # Blocks everything!
    return {}

# Good: Use async sleep
@app.method_hook("good")
async def good(actor, method_name, data):
    await asyncio.sleep(1)  # Non-blocking
    return {}
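When a blocking call has no async equivalent, one option is to move it to a worker thread with asyncio.to_thread() (Python 3.9+) so the event loop stays responsive. In this sketch, blocking_lookup is a hypothetical stand-in for a sync-only library call, and heartbeat only exists to show that other work keeps running in the meantime.

```python
import asyncio
import time

def blocking_lookup(key: str) -> str:
    """Hypothetical sync-only call; time.sleep simulates blocking I/O."""
    time.sleep(0.1)
    return key.upper()

async def heartbeat(ticks: list) -> None:
    """Other work on the event loop that should not be starved."""
    for _ in range(5):
        await asyncio.sleep(0.02)
        ticks.append(time.perf_counter())

async def main():
    ticks: list = []
    # The blocking call runs in a thread while heartbeat runs on the loop.
    value, _ = await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "abc"),
        heartbeat(ticks),
    )
    return value, len(ticks)

value, tick_count = asyncio.run(main())
print(value, tick_count)
```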
Troubleshooting
“RuntimeError: asyncio.run() cannot be called from a running event loop”
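This happens when a synchronous hook-execution method is called from code that is already running inside an event loop (for example, from another async hook or a FastAPI handler).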
Solution: Use the async execution methods:
# Wrong
result = hooks.execute_method_hooks(...) # In async context
# Right
result = await hooks.execute_method_hooks_async(...)
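The error can be reproduced with the standard library alone. This is a minimal sketch in which run_hook_sync is a hypothetical stand-in for any synchronous entry point that starts its own event loop with asyncio.run().

```python
import asyncio

async def async_hook():
    await asyncio.sleep(0)
    return {"ok": True}

def run_hook_sync():
    """Hypothetical sync entry point that starts its own event loop."""
    coro = async_hook()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise

async def handler():
    error = None
    try:
        run_hook_sync()  # wrong: an event loop is already running here
    except RuntimeError as exc:
        error = str(exc)
    result = await async_hook()  # right: await on the running loop
    return error, result

error, result = asyncio.run(handler())
print(error)
print(result)
```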
“Task was destroyed but it is pending”
Ensure all async operations complete before exiting:
@app.method_hook("cleanup")
async def cleanup(actor, method_name, data):
    tasks = [do_something_async() for _ in range(10)]
    await asyncio.gather(*tasks)  # Wait for all
    return {"done": True}
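The same warning can appear with fire-and-forget tasks created via asyncio.create_task(), because the event loop keeps only weak references to tasks. A common pattern, sketched here, is to hold a strong reference until each task finishes and to await any outstanding tasks before the loop shuts down. spawn, audit_log, and background_tasks are hypothetical names.

```python
import asyncio

background_tasks: set = set()

async def audit_log(entry: str, sink: list) -> None:
    """Hypothetical background work; the sleep simulates I/O."""
    await asyncio.sleep(0.01)
    sink.append(entry)

def spawn(coro) -> asyncio.Task:
    """Create a task and keep a strong reference until it completes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def main() -> list:
    sink: list = []
    for i in range(3):
        spawn(audit_log(f"event-{i}", sink))
    # Wait for outstanding tasks before the event loop closes.
    await asyncio.gather(*background_tasks)
    return sink

entries = asyncio.run(main())
print(entries)
```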
Performance Not Improving
Check that you’re:
Using FastAPI (not Flask) for true async
Actually making concurrent calls (use asyncio.gather())
Using async libraries (not sync libraries in async functions)
Not blocking with time.sleep() or sync database calls
Migration Checklist
Before deploying async hooks to production:
☐ Identify I/O-bound hooks worth migrating
☐ Install async library dependencies
☐ Convert hooks to async def
☐ Replace sync library calls with async equivalents
☐ Test with pytest-asyncio
☐ Verify error handling still works
☐ Check performance improvements
☐ Deploy to staging first
☐ Monitor for “event loop” errors
☐ Verify backward compatibility with existing sync hooks
Further Reading
Hooks - General hooks guide
Hooks Reference - Complete hooks reference with async examples