import base64
import json
import logging
from typing import Any
import httpx
import requests
from actingweb import request_context, trust
logger = logging.getLogger(__name__)
try:
from urllib.parse import urlencode as urllib_urlencode
except ImportError:
from urllib.parse import urlencode as urllib_urlencode
# Type alias for timeout parameter
TimeoutType = int | float | tuple[int | float, int | float] | None
[docs]
class AwProxy:
"""Proxy to other trust peers to execute RPC style calls.
Initialise with either trust_target to target a specific
existing trust or use peer_target for simplicity to use
the trust established with the peer.
Args:
trust_target: Trust object for the target peer
peer_target: Simplified peer target dict
config: Configuration object
timeout: HTTP timeout in seconds. Either a single value (used for both
connect and read timeouts) or a tuple (connect_timeout, read_timeout).
Default: (5, 20) = 5s connect, 20s read timeout.
Provides both sync methods (using ``requests``) and async methods
(using ``httpx``) for peer communication:
- Sync: ``get_resource()``, ``create_resource()``, ``change_resource()``, ``delete_resource()``
- Async: ``get_resource_async()``, ``create_resource_async()``, ``change_resource_async()``, ``delete_resource_async()``
Use async methods in FastAPI routes for non-blocking I/O.
"""
def __init__(
self,
trust_target: Any = None,
peer_target: dict[str, Any] | None = None,
config: Any = None,
timeout: TimeoutType = None,
):
self.config = config
self.last_response_code = 0
self.last_response_message = 0
self.last_location: str | None = None
self.peer_passphrase: str | None = None
# Set timeout - supports tuple (connect, read) or single value
# Default: (5, 20) = 5s connect, 20s read timeout
if timeout is None:
self.timeout: tuple[int | float, int | float] = (5, 20)
elif isinstance(timeout, tuple):
self.timeout = timeout
else:
# Single value provided, use for both connect and read
self.timeout = (timeout, timeout)
# Pre-compute httpx timeout for async methods (proper connect/read separation)
# httpx.Timeout accepts tuple format: (connect, read, write, pool)
self._httpx_timeout = httpx.Timeout(
timeout=float(self.timeout[1]), # default for unspecified
connect=float(self.timeout[0]),
read=float(self.timeout[1]),
)
if trust_target and trust_target.trust:
self.trust = trust_target
self.actorid = trust_target.id
elif peer_target and peer_target["id"]:
self.actorid = peer_target["id"]
self.trust = None
# Capture peer passphrase if available for Basic fallback (creator 'trustee')
if "passphrase" in peer_target and peer_target["passphrase"]:
self.peer_passphrase = peer_target["passphrase"]
if peer_target["peerid"]:
self.trust = trust.Trust(
actor_id=self.actorid,
peerid=peer_target["peerid"],
config=self.config,
).get()
if not self.trust or len(self.trust) == 0:
self.trust = None
def _add_correlation_headers(self, headers: dict[str, str]) -> dict[str, str]:
"""Add request correlation headers for tracing peer-to-peer requests.
Generates a new request ID for the outgoing request and includes the
current request ID as the parent for request chain tracking.
Args:
headers: Existing headers dictionary to add correlation headers to
Returns:
Updated headers dictionary with correlation headers
"""
# Generate new request ID for the outgoing peer request
new_request_id = request_context.generate_request_id()
headers["X-Request-ID"] = new_request_id
# Add parent request ID if we're in a request context
parent_request_id = request_context.get_request_id()
if parent_request_id:
headers["X-Parent-Request-ID"] = parent_request_id
# Log correlation for traceability
logger.debug(
f"Peer request correlation: new_id={new_request_id[:8]}... "
f"parent_id={parent_request_id[:8]}..."
)
else:
logger.debug(f"Peer request: new_id={new_request_id[:8]}... (no parent)")
return headers
def _bearer_headers(self):
headers = (
{"Authorization": "Bearer " + self.trust["secret"]}
if self.trust and self.trust.get("secret")
else {}
)
return self._add_correlation_headers(headers)
def _basic_headers(self):
if not self.peer_passphrase:
return self._add_correlation_headers({})
u_p = ("trustee:" + self.peer_passphrase).encode("utf-8")
headers = {"Authorization": "Basic " + base64.b64encode(u_p).decode("utf-8")}
return self._add_correlation_headers(headers)
def _maybe_retry_with_basic(self, method, url, data=None, headers=None):
# Only retry if we have a peer passphrase available
if not self.peer_passphrase:
return None
try:
bh = self._basic_headers()
# If original headers had correlation headers, preserve them in retry
if headers:
if "X-Request-ID" in headers:
bh["X-Request-ID"] = headers["X-Request-ID"]
if "X-Parent-Request-ID" in headers:
bh["X-Parent-Request-ID"] = headers["X-Parent-Request-ID"]
if data is None:
if method == "GET":
return requests.get(url=url, headers=bh, timeout=self.timeout)
if method == "DELETE":
return requests.delete(url=url, headers=bh, timeout=self.timeout)
else:
if method == "POST":
return requests.post(
url=url,
data=data,
headers={**bh, "Content-Type": "application/json"},
timeout=self.timeout,
)
if method == "PUT":
return requests.put(
url=url,
data=data,
headers={**bh, "Content-Type": "application/json"},
timeout=self.timeout,
)
except Exception:
return None
return None
[docs]
def get_resource(self, path=None, params=None):
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
if params:
url = url + "?" + urllib_urlencode(params)
headers = self._bearer_headers()
logger.debug(f"Fetching peer resource from {url}")
try:
response = requests.get(url=url, headers=headers, timeout=self.timeout)
# Retry with Basic if Bearer gets redirected/unauthorized/forbidden
if response.status_code in (302, 401, 403):
retry = self._maybe_retry_with_basic("GET", url, headers=headers)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except Exception:
logger.debug("Not able to get peer resource")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Unable to communciate with trust peer service.",
},
}
logger.debug(f"Get trust peer resource response: {response.status_code}")
if response.status_code < 200 or response.status_code > 299:
logger.info("Not able to get trust peer resource.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when getting resource at(" + url + ")"
)
# If response was an error status and JSON parsing failed, return structured error
if response.status_code < 200 or response.status_code > 299:
result = {
"error": {
"code": response.status_code,
"message": f"HTTP {response.status_code} with non-JSON response",
}
}
else:
# Success status but non-JSON response - log warning for debugging
logger.warning(
f"Peer returned HTTP {response.status_code} with non-JSON content at {url}. "
"This may indicate an unexpected content type or parsing issue."
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
def create_resource(self, path=None, params=None):
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
data = json.dumps(params)
headers = {**self._bearer_headers(), "Content-Type": "application/json"}
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.debug(
"Creating trust peer resource at (" + url + ") with data(" + str(data) + ")"
)
try:
response = requests.post(
url=url, data=data, headers=headers, timeout=self.timeout
)
if response.status_code in (302, 401, 403):
retry = self._maybe_retry_with_basic(
"POST", url, data=data, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except Exception:
logger.debug("Not able to create new peer resource")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Unable to communciate with trust peer service.",
},
}
if "Location" in response.headers:
self.last_location = response.headers["Location"]
else:
self.last_location = None
logger.debug(f"Create trust peer resource response: {response.status_code}")
if response.status_code < 200 or response.status_code > 299:
logger.warning("Not able to create new trust peer resource.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when creating resource at(" + url + ")"
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
def change_resource(self, path=None, params=None):
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
data = json.dumps(params)
# Use _bearer_headers() to include correlation headers
headers = self._bearer_headers()
headers["Content-Type"] = "application/json"
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.debug(
"Changing trust peer resource at (" + url + ") with data(" + str(data) + ")"
)
try:
response = requests.put(
url=url, data=data, headers=headers, timeout=self.timeout
)
if response.status_code in (302, 401, 403):
retry = self._maybe_retry_with_basic(
"PUT", url, data=data, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except Exception:
logger.debug("Not able to change peer resource")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Unable to communciate with trust peer service.",
},
}
logger.debug(f"Change trust peer resource response: {response.status_code}")
if response.status_code < 200 or response.status_code > 299:
logger.warning("Not able to change trust peer resource.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when changing resource at(" + url + ")"
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
def delete_resource(self, path=None):
if not path or len(path) == 0:
return None
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
# Use _bearer_headers() to include correlation headers
headers = self._bearer_headers()
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.info(f"Deleting peer resource at {url}")
try:
response = requests.delete(url=url, headers=headers, timeout=self.timeout)
if response.status_code in (302, 401, 403):
retry = self._maybe_retry_with_basic("DELETE", url, headers=headers)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except Exception:
logger.debug("Not able to delete peer resource")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Unable to communciate with trust peer service.",
},
}
# Async methods using httpx for non-blocking HTTP requests
# These are useful in async frameworks like FastAPI to avoid blocking the event loop
async def _maybe_retry_with_basic_async(
self,
method: str,
url: str,
data: str | None = None,
headers: dict[str, str] | None = None,
) -> httpx.Response | None:
"""Async retry with Basic auth if Bearer fails."""
if not self.peer_passphrase:
return None
try:
bh = self._basic_headers()
# If original headers had correlation headers, preserve them in retry
if headers:
if "X-Request-ID" in headers:
bh["X-Request-ID"] = headers["X-Request-ID"]
if "X-Parent-Request-ID" in headers:
bh["X-Parent-Request-ID"] = headers["X-Parent-Request-ID"]
async with httpx.AsyncClient(timeout=self._httpx_timeout) as client:
if data is None:
if method == "GET":
return await client.get(url, headers=bh)
if method == "DELETE":
return await client.delete(url, headers=bh)
else:
final_headers = {**bh, "Content-Type": "application/json"}
if method == "POST":
return await client.post(
url, content=data, headers=final_headers
)
if method == "PUT":
return await client.put(
url, content=data, headers=final_headers
)
except Exception:
return None
return None
[docs]
async def get_resource_async(
self, path: str | None = None, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
"""Async version of get_resource using httpx.
Use this method in async contexts (e.g., FastAPI routes) for non-blocking
HTTP calls to peer actors.
Args:
path: The resource path on the peer actor (e.g., "trust/friend/permissions")
params: Optional query parameters
Returns:
The JSON response from the peer, or None if the request failed.
"""
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
if params:
url = url + "?" + urllib_urlencode(params)
headers = self._bearer_headers()
logger.debug(f"Fetching peer resource async from {url}")
try:
async with httpx.AsyncClient(timeout=self._httpx_timeout) as client:
response = await client.get(url, headers=headers)
# Retry with Basic if Bearer gets redirected/unauthorized/forbidden
if response.status_code in (302, 401, 403):
retry = await self._maybe_retry_with_basic_async(
"GET", url, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except httpx.TimeoutException:
logger.debug("Timeout getting peer resource async")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Timeout communicating with trust peer service.",
},
}
except httpx.ConnectError as e:
logger.debug(f"Connection error getting peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Unable to connect to trust peer service.",
},
}
except httpx.NetworkError as e:
logger.debug(f"Network error getting peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Network error communicating with trust peer service.",
},
}
except Exception as e:
logger.warning(f"Unexpected error getting peer resource async: {e}")
self.last_response_code = 500
return {
"error": {
"code": 500,
"message": "Internal error communicating with trust peer service.",
},
}
logger.debug(f"Get trust peer resource async response: {response.status_code}")
if response.status_code < 200 or response.status_code > 299:
logger.info("Not able to get trust peer resource async.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when getting resource async at(" + url + ")"
)
# If response was an error status and JSON parsing failed, return structured error
if response.status_code < 200 or response.status_code > 299:
result = {
"error": {
"code": response.status_code,
"message": f"HTTP {response.status_code} with non-JSON response",
}
}
else:
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
async def create_resource_async(
self, path: str | None = None, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
"""Async version of create_resource (POST) using httpx.
Args:
path: The resource path on the peer actor
params: Data to send as JSON body
Returns:
The JSON response from the peer, or None if the request failed.
"""
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
data = json.dumps(params)
headers = {**self._bearer_headers(), "Content-Type": "application/json"}
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.debug(
"Creating trust peer resource async at ("
+ url
+ ") with data("
+ str(data)
+ ")"
)
try:
async with httpx.AsyncClient(timeout=self._httpx_timeout) as client:
response = await client.post(url, content=data, headers=headers)
if response.status_code in (302, 401, 403):
retry = await self._maybe_retry_with_basic_async(
"POST", url, data=data, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except httpx.TimeoutException:
logger.debug("Timeout creating peer resource async")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Timeout communicating with trust peer service.",
},
}
except httpx.ConnectError as e:
logger.debug(f"Connection error creating peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Unable to connect to trust peer service.",
},
}
except httpx.NetworkError as e:
logger.debug(f"Network error creating peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Network error communicating with trust peer service.",
},
}
except Exception as e:
logger.warning(f"Unexpected error creating peer resource async: {e}")
self.last_response_code = 500
return {
"error": {
"code": 500,
"message": "Internal error communicating with trust peer service.",
},
}
if "Location" in response.headers:
self.last_location = response.headers["Location"]
else:
self.last_location = None
logger.debug(
f"Create trust peer resource async response: {response.status_code}"
)
if response.status_code < 200 or response.status_code > 299:
logger.warning("Not able to create new trust peer resource async.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when creating resource async at("
+ url
+ ")"
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
async def change_resource_async(
self, path: str | None = None, params: dict[str, Any] | None = None
) -> dict[str, Any] | None:
"""Async version of change_resource (PUT) using httpx.
Args:
path: The resource path on the peer actor
params: Data to send as JSON body
Returns:
The JSON response from the peer, or None if the request failed.
"""
if not path or len(path) == 0:
return None
if not params:
params = {}
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
data = json.dumps(params)
# Use _bearer_headers() to include correlation headers
headers = self._bearer_headers()
headers["Content-Type"] = "application/json"
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.debug(
"Changing trust peer resource async at ("
+ url
+ ") with data("
+ str(data)
+ ")"
)
try:
async with httpx.AsyncClient(timeout=self._httpx_timeout) as client:
response = await client.put(url, content=data, headers=headers)
if response.status_code in (302, 401, 403):
retry = await self._maybe_retry_with_basic_async(
"PUT", url, data=data, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except httpx.TimeoutException:
logger.debug("Timeout changing peer resource async")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Timeout communicating with trust peer service.",
},
}
except httpx.ConnectError as e:
logger.debug(f"Connection error changing peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Unable to connect to trust peer service.",
},
}
except httpx.NetworkError as e:
logger.debug(f"Network error changing peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Network error communicating with trust peer service.",
},
}
except Exception as e:
logger.warning(f"Unexpected error changing peer resource async: {e}")
self.last_response_code = 500
return {
"error": {
"code": 500,
"message": "Internal error communicating with trust peer service.",
},
}
logger.debug(
f"Change trust peer resource async response: {response.status_code}"
)
if response.status_code < 200 or response.status_code > 299:
logger.warning("Not able to change trust peer resource async.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when changing resource async at("
+ url
+ ")"
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result
[docs]
async def delete_resource_async(
self, path: str | None = None
) -> dict[str, Any] | None:
"""Async version of delete_resource (DELETE) using httpx.
Args:
path: The resource path on the peer actor
Returns:
The JSON response from the peer, or None if the request failed.
"""
if not path or len(path) == 0:
return None
if not self.trust or not self.trust["baseuri"] or not self.trust["secret"]:
return None
# Use _bearer_headers() to include correlation headers
headers = self._bearer_headers()
url = self.trust["baseuri"].strip("/") + "/" + path.strip("/")
logger.info(f"Deleting peer resource async at {url}")
try:
async with httpx.AsyncClient(timeout=self._httpx_timeout) as client:
response = await client.delete(url, headers=headers)
if response.status_code in (302, 401, 403):
retry = await self._maybe_retry_with_basic_async(
"DELETE", url, headers=headers
)
if retry is not None:
response = retry
self.last_response_code = response.status_code
self.last_response_message = response.content
except httpx.TimeoutException:
logger.debug("Timeout deleting peer resource async")
self.last_response_code = 408
return {
"error": {
"code": 408,
"message": "Timeout communicating with trust peer service.",
},
}
except httpx.ConnectError as e:
logger.debug(f"Connection error deleting peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Unable to connect to trust peer service.",
},
}
except httpx.NetworkError as e:
logger.debug(f"Network error deleting peer resource async: {e}")
self.last_response_code = 502
return {
"error": {
"code": 502,
"message": "Network error communicating with trust peer service.",
},
}
except Exception as e:
logger.warning(f"Unexpected error deleting peer resource async: {e}")
self.last_response_code = 500
return {
"error": {
"code": 500,
"message": "Internal error communicating with trust peer service.",
},
}
logger.debug(
f"Delete trust peer resource async response: {response.status_code}"
)
if response.status_code < 200 or response.status_code > 299:
logger.warning("Not able to delete trust peer resource async.")
try:
result = response.json()
except (TypeError, ValueError, KeyError):
logger.debug(
"Not able to parse response when deleting resource async at("
+ url
+ ")"
)
result = {}
# Ensure error responses from peers have a structured error with the HTTP status code
if response.status_code < 200 or response.status_code > 299:
if "error" in result and not isinstance(result["error"], dict):
result["error"] = {
"code": response.status_code,
"message": str(result["error"]),
}
return result