Source code for actingweb.actor

import base64
import datetime
import json
import logging
from typing import Any

import requests

from actingweb import attribute, peertrustee, property, subscription, trust
from actingweb.constants import (
    DEFAULT_CREATOR,
)
from actingweb.db import get_actor, get_actor_list, get_subscription_suspension
from actingweb.permission_evaluator import PermissionResult, get_permission_evaluator

logger = logging.getLogger(__name__)


class ActorError(Exception):
    """Root of the exception hierarchy for Actor-related failures."""
class ActorNotFoundError(ActorError):
    """Signals that a requested actor could not be located."""
class InvalidActorDataError(ActorError):
    """Signals that actor data is invalid or corrupted."""
class PeerCommunicationError(ActorError):
    """Signals that communicating with a peer actor failed."""
class TrustRelationshipError(ActorError):
    """Signals that a trust relationship operation failed."""
class DummyPropertyClass:
    """Minimal value holder kept only so the deprecated get_property() (2.4.4)
    can still hand back an object exposing a ``value`` attribute.
    """

    def __init__(self, v: Any = None) -> None:
        # The wrapped value is stored untouched; None when nothing was given.
        self.value = v
class Actor:
    """An ActingWeb actor together with the operations on its stored data,
    its peers and its trust relationships.
    """

    ###################
    # Basic operations
    ###################

    def __init__(self, actor_id: str | None = None, config: Any | None = None) -> None:
        """Set up the in-memory state and try to load the actor from storage.

        :param actor_id: Id of the actor to load; may be None for an empty shell
        :param config: Configuration object handed to the db and store helpers
        """
        self.config = config
        self.id: str | None = actor_id

        # Populated later by get()/create() and the property helpers.
        self.actor: dict[str, Any] | None = None
        self.creator: str | None = None
        self.passphrase: str | None = None
        self.property_list: Any | None = None
        self.subs_list: list[dict[str, Any]] | None = None

        # Outcome of the most recent HTTP call made towards a peer.
        self.last_response_code: int = 0
        self.last_response_message: str = ""

        # A db handle can only be obtained when a config is present.
        self.handle = get_actor(self.config) if self.config else None

        # The stores need both an id and a config to be usable.
        if actor_id and config:
            self.store = attribute.InternalStore(actor_id=actor_id, config=config)
            self.property = property.PropertyStore(actor_id=actor_id, config=config)
            self.property_lists = property.PropertyListStore(
                actor_id=actor_id, config=config
            )
        else:
            self.store = None
            self.property = None
            self.property_lists = None

        self.get(actor_id=actor_id)
def get_peer_info(
    self, url: str, max_retries: int = 3, retry_delay: float = 0.5
) -> dict[str, Any]:
    """Contacts another actor over http/s to retrieve meta information.

    Includes retry logic for transient network failures with exponential backoff.

    Note: This sync method blocks the event loop. In FastAPI/uvicorn contexts,
    use AsyncTrustHandler which calls create_reciprocal_trust_async() instead.

    :param url: Root URI of a remote actor
    :param max_retries: Maximum number of retry attempts (default: 3)
    :param retry_delay: Initial delay between retries in seconds (default: 0.5)
    :rtype: dict
    :return: The json response from the /meta path in the data element and
        last_response_code/last_response_message set to the results of the
        https request. On any failure the code is 500, the message describes
        the error and ``data`` is an empty dict, so the three keys are
        always present.

    Example::

        {
            "last_response_code": 200,
            "last_response_message": "OK",
            "data": {}
        }
    """
    import time

    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            if attempt > 0:
                logger.info(
                    f"Retry attempt {attempt + 1}/{max_retries} for peer info from {url}"
                )
            else:
                logger.debug(f"Fetching peer info from {url}")
            response = requests.get(url=url + "/meta", timeout=(5, 10))
            res = {
                "last_response_code": response.status_code,
                "last_response_message": response.content,
                "data": json.loads(response.content.decode("utf-8", "ignore")),
            }
            logger.debug(
                f"Got peer info from url({url}) with body({response.content})"
            )
            return res
        except (TypeError, ValueError, KeyError) as e:
            # JSON parsing errors - don't retry. This clause is listed first,
            # so request errors that also derive from ValueError (such as a
            # malformed URL) end up here and are not retried either.
            logger.warning(f"Invalid response from peer {url}: {e}")
            return {
                "last_response_code": 500,
                "last_response_message": str(e),
                "data": {},
            }
        except requests.exceptions.RequestException as e:
            # Network errors - retry with exponential backoff
            last_error = e
            if attempt < max_retries - 1:
                delay = retry_delay * (2**attempt)  # Exponential backoff
                logger.warning(
                    f"Network error fetching peer info from {url}: {e}. "
                    f"Retrying in {delay:.1f}s..."
                )
                time.sleep(delay)
            else:
                logger.warning(
                    f"Network error fetching peer info from {url} after "
                    f"{max_retries} attempts: {e}"
                )
    # All retries exhausted (or max_retries was 0 and nothing was attempted)
    return {
        "last_response_code": 500,
        "last_response_message": str(last_error) if last_error else "Unknown error",
        "data": {},
    }
async def get_peer_info_async(self, url: str) -> dict[str, Any]:
    """Fetch a peer's /meta document with httpx without blocking.

    Non-blocking counterpart of get_peer_info (no retries are attempted here).

    :param url: Root URI of a remote actor
    :return: Dict with last_response_code, last_response_message, and data;
        on failure data is an empty dict and the code is 500 (408 on timeout)
    """
    import httpx

    def _failure(code: int, message: str) -> dict[str, Any]:
        # Every error path shares this shape, with an empty data element.
        return {
            "last_response_code": code,
            "last_response_message": message,
            "data": {},
        }

    try:
        logger.debug(f"Fetching peer info async from {url}")
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url + "/meta")
            peer_info = {
                "last_response_code": response.status_code,
                "last_response_message": response.content,
                "data": response.json(),
            }
            logger.debug(
                f"Got peer info async from url({url}) with body({response.content})"
            )
            return peer_info
    except (TypeError, ValueError, KeyError) as parse_error:
        # JSON parsing errors
        logger.warning(f"Invalid response from peer {url}: {parse_error}")
        return _failure(500, str(parse_error))
    except httpx.TimeoutException as timeout_error:
        logger.warning(f"Timeout fetching peer info async from {url}: {timeout_error}")
        return _failure(408, "Timeout")
    except httpx.RequestError as request_error:
        logger.warning(
            f"Network error fetching peer info async from {url}: {request_error}"
        )
        return _failure(500, str(request_error))
[docs] def get(self, actor_id: str | None = None) -> dict[str, Any] | None: """Retrieves an actor from storage or initialises if it does not exist""" if not actor_id and not self.id: return None elif not actor_id: actor_id = self.id if self.handle and self.actor and len(self.actor) > 0: return self.actor if self.handle: self.actor = self.handle.get(actor_id=actor_id) else: self.actor = None if self.actor and len(self.actor) > 0: self.id = self.actor["id"] self.creator = self.actor["creator"] self.passphrase = self.actor["passphrase"] self.store = attribute.InternalStore(actor_id=self.id, config=self.config) self.property = property.PropertyStore(actor_id=self.id, config=self.config) self.property_lists = property.PropertyListStore( actor_id=self.id, config=self.config ) if self.config and self.config.force_email_prop_as_creator: em = self.store.email if em and em.lower() != self.creator: self.modify(creator=em.lower()) else: self.id = None self.creator = None self.passphrase = None return self.actor
def get_from_property(
    self, name: str = "oauthId", value: str | None = None
) -> None:
    """Initialise an actor by matching on a stored property.

    Use with caution as the property's value de-facto becomes a security token.
    If multiple properties are found with the same value, no actor will be
    initialised. Also note that this is a costly operation as all properties
    of this type will be retrieved and processed.

    :param name: Name of the property to match on
    :param value: Value the property must have
    """
    lookup = property.Property(name=name, value=value, config=self.config)
    actor_id = lookup.get_actor_id()
    if actor_id:
        self.get(actor_id=actor_id)
        return
    # No match: leave the instance without an identity.
    self.id = None
    self.creator = None
    self.passphrase = None
[docs] def get_from_creator(self, creator: str | None = None) -> bool: """Initialise an actor by matching on creator/email. Returns True if an actor could be loaded, otherwise False. When multiple actors share the same creator (possible when unique_creator is disabled), the first deterministic match will be selected in order to provide stable behaviour for login flows that do not specify an explicit actor ID. """ self.id = None self.creator = None self.passphrase = None if not self.config or not creator: return False lookup_creator = creator.lower() if "@" in creator else creator exists = get_actor(self.config).get_by_creator(creator=lookup_creator) if not exists: return False # Normalise return to a list of candidate records candidates: list[dict[str, Any]] if isinstance(exists, list): candidates = [c for c in exists if c] else: candidates = [exists] if not candidates: return False # Ensure deterministic selection order even when DynamoDB returns arbitrary order candidates.sort(key=lambda item: item.get("id", "")) for candidate in candidates: actor_id = candidate.get("id") if not actor_id: continue self.get(actor_id=actor_id) if self.id: return True return False
def create(
    self,
    url: str,
    creator: str,
    passphrase: str,
    actor_id: str | None = None,
    delete: bool = False,
    trustee_root: str | None = None,
    hooks: Any = None,
) -> bool:
    """Creates a new actor and persists it.

    If delete is True, any existing actors with same creator value will be deleted. If it is False,
    the one with the correct passphrase will be chosen (if any)

    :param url: Used, together with the current UTC timestamp, as seed for a generated actor id
    :param creator: Creator value; DEFAULT_CREATOR is used when empty
    :param passphrase: Passphrase to store; a new token is generated when empty
    :param actor_id: Explicit id to use instead of a generated one
    :param delete: Delete existing actors with the same creator (only consulted
        when config.unique_creator is set)
    :param trustee_root: Stored as trustee_root in the internal store when non-empty
    :param hooks: Optional object whose execute_lifecycle_hooks() is invoked
        with "actor_created" after the actor has been stored
    :return: True when an actor was created or an existing one matched the
        passphrase; False when the creator already exists and no passphrase matched
    """
    seed = url
    now = datetime.datetime.now(datetime.UTC)
    seed += now.strftime("%Y%m%dT%H%M%S%f")
    if len(creator) > 0:
        self.creator = creator
    else:
        self.creator = DEFAULT_CREATOR
    if self.config and self.config.unique_creator:
        in_db = get_actor(self.config)
        exists = in_db.get_by_creator(creator=self.creator)
        if exists:
            # If uniqueness is turned on at a later point, we may have multiple accounts
            # with creator as "creator". Check if we have an internal value "email" and then
            # set creator to the email address.
            if delete:
                # Remove every existing actor, then fall through and create a new one.
                for c in exists:
                    anactor = Actor(actor_id=c["id"], config=self.config)
                    anactor.delete()
            else:
                if (
                    self.config
                    and self.config.force_email_prop_as_creator
                    and self.creator == DEFAULT_CREATOR
                ):
                    for c in exists:
                        anactor = Actor(actor_id=c["id"], config=self.config)
                        em = anactor.store.email if anactor.store else None
                        if em:
                            anactor.modify(creator=em.lower())
                # Adopt the first existing actor whose passphrase matches; nothing new is stored.
                for c in exists:
                    if c["passphrase"] == passphrase:
                        self.handle = in_db
                        self.id = c["id"]
                        self.passphrase = c["passphrase"]
                        self.creator = c["creator"]
                        return True
                return False
    if passphrase and len(passphrase) > 0:
        self.passphrase = passphrase
    else:
        self.passphrase = self.config.new_token() if self.config else ""
    if actor_id:
        self.id = actor_id
    else:
        self.id = self.config.new_uuid(seed) if self.config else ""
    if not self.handle and self.config:
        self.handle = get_actor(self.config)
    if self.handle:
        self.handle.create(
            creator=self.creator, passphrase=self.passphrase, actor_id=self.id
        )
    # Stores are (re)bound to the id that was just assigned.
    self.store = attribute.InternalStore(actor_id=self.id, config=self.config)
    self.property = property.PropertyStore(actor_id=self.id, config=self.config)
    self.property_lists = property.PropertyListStore(
        actor_id=self.id, config=self.config
    )
    # Set trustee_root if provided
    if (
        trustee_root
        and isinstance(trustee_root, str)
        and len(trustee_root) > 0
        and self.store
    ):
        self.store.trustee_root = trustee_root
    # Execute actor_created lifecycle hook if hooks are provided
    if hooks:
        try:
            # Imported here rather than at module top.
            # NOTE(review): presumably to avoid an import cycle - confirm.
            from actingweb.interface.actor_interface import ActorInterface

            registry = getattr(self.config, "service_registry", None)
            actor_interface = ActorInterface(self, service_registry=registry)
            hooks.execute_lifecycle_hooks("actor_created", actor_interface)
        except Exception as e:
            # Log hook execution error but don't fail actor creation
            logger.warning(
                f"Actor created successfully but lifecycle hook failed: {e}"
            )
    return True
[docs] def modify(self, creator: str | None = None) -> bool: if not self.handle or not creator: logger.debug("Attempted modify of actor with no handle or no param changed") return False if "@" in creator: creator = creator.lower() self.creator = creator if self.actor: self.actor["creator"] = creator self.handle.modify(creator=creator) return True
def delete(self) -> None:
    """Remove the actor and everything stored for it.

    Order of clean-up: peer trustees, properties, subscriptions, trust
    relationships (peers are asked to delete their side too), attribute
    buckets and finally the actor record itself.
    """
    if not self.handle:
        logger.debug("Attempted delete of actor with no handle")
        return

    owner = {"actor_id": self.id, "config": self.config}

    self.delete_peer_trustee(shorttype="*")

    if not self.property_list:
        self.property_list = property.Properties(**owner)
    self.property_list.delete()

    subscriptions = subscription.Subscriptions(**owner)
    subscriptions.fetch()
    subscriptions.delete()

    trust_store = trust.Trusts(**owner)
    for entry in trust_store.fetch() or ():
        if isinstance(entry, dict) and "peerid" in entry:
            self.delete_reciprocal_trust(peerid=entry["peerid"], delete_peer=True)
    trust_store.delete()

    attribute.Buckets(**owner).delete()
    self.handle.delete()
###################### # Advanced operations ######################
def set_property(self, name, value):
    """Store value under name in the actor's property store.

    DEPRECATED, use actor's property store! Does nothing when the property
    store is unset or evaluates as false.
    """
    if not self.property:
        return
    self.property[name] = value
def get_property(self, name):
    """Return an object whose ``value`` attribute holds property name.

    DEPRECATED, use actor's property store! The value is None when the
    property store is unset or evaluates as false.
    """
    stored = self.property[name] if self.property else None
    return DummyPropertyClass(stored)
def delete_property(self, name):
    """Clear property name by assigning None to it in the property store.

    DEPRECATED, use actor's property store! Does nothing when the property
    store is unset or evaluates as false.
    """
    if not self.property:
        return
    self.property[name] = None
def delete_properties(self):
    """Delete all properties and return the result of the delete call."""
    props = self.property_list
    if not props:
        # No property collection yet on this instance: create and remember one.
        props = property.Properties(actor_id=self.id, config=self.config)
        self.property_list = props
    return props.delete()
def get_properties(self):
    """Fetch the actor's properties from the db and return what fetch() yields.

    A new property collection is created on every call and kept on the
    instance as property_list.
    """
    fresh = property.Properties(actor_id=self.id, config=self.config)
    self.property_list = fresh
    return fresh.fetch()
def delete_peer_trustee(self, shorttype=None, peerid=None):
    """Delete a peer actor this actor is trustee for, remotely and locally.

    :param shorttype: Actor type key from config.actors, or "*" to process
        every configured type
    :param peerid: Id of a specific peer; takes precedence over shorttype
    :return: For "*" always True. Otherwise True when the peer was deleted
        remotely and its local peer record removed; False on any failure
    """
    if not (peerid or shorttype):
        return False

    if shorttype == "*":
        # Fan out over every configured type; individual results are ignored.
        if self.config and self.config.actors:
            for actor_type in self.config.actors:
                self.delete_peer_trustee(shorttype=actor_type)
        return True

    if (
        shorttype
        and self.config
        and self.config.actors
        and shorttype not in self.config.actors
    ):
        logger.error(f"Got a request to delete an unknown actor type({shorttype})")
        return False

    # A specific peer id wins over the type-based lookup.
    lookup = {"peerid": peerid} if peerid else {"short_type": shorttype}
    new_peer = peertrustee.PeerTrustee(
        actor_id=self.id, config=self.config, **lookup
    )
    peer_data = new_peer.get()
    if not peer_data or isinstance(peer_data, bool):
        return False

    logger.info(f"Deleting peer actor at {peer_data['baseuri']}")
    credentials = b"trustee:" + peer_data["passphrase"].encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("utf-8"),
    }
    try:
        response = requests.delete(
            url=peer_data["baseuri"], headers=headers, timeout=(5, 10)
        )
        self.last_response_code = response.status_code
        body = response.content
        if isinstance(body, bytes):
            self.last_response_message = body.decode("utf-8", "ignore")
        else:
            self.last_response_message = str(body)
    except Exception:
        logger.debug("Not able to delete peer actor remotely due to network issues")
        self.last_response_code = 408
        return False

    if not 200 <= response.status_code <= 299:
        logger.debug("Not able to delete peer actor remotely, peer is unwilling")
        return False

    # Delete trust, peer is already deleted remotely
    if not self.delete_reciprocal_trust(
        peerid=peer_data["peerid"], delete_peer=False
    ):
        logger.debug("Not able to delete peer actor trust in db")
    if new_peer and not new_peer.delete():
        logger.debug("Not able to delete peer actor in db")
        return False
    return True
def get_peer_trustee(self, shorttype=None, peerid=None):
    """Get a peer, either existing or create it as trustee

    Will retrieve an existing peer or create a new and establish trust.
    If no trust exists, a new trust will be established.
    Use either peerid to target a specific known peer, or shorttype to
    allow creation of a new peer if none exists

    :param shorttype: Actor type key from config.actors; needed to create a peer
    :param peerid: Id of a specific, already known peer
    :return: The stored peer data, or None when the peer could not be found,
        created or registered locally
    """
    if not peerid and not shorttype:
        return None
    if (
        shorttype
        and self.config
        and self.config.actors
        and shorttype not in self.config.actors
    ):
        logger.error(f"Got a request to create an unknown actor type({shorttype})")
        return None
    if peerid:
        new_peer = peertrustee.PeerTrustee(
            actor_id=self.id, peerid=peerid, config=self.config
        )
    else:
        new_peer = peertrustee.PeerTrustee(
            actor_id=self.id, short_type=shorttype, config=self.config
        )
    peer_data = new_peer.get()
    peer_exists = (
        bool(peer_data) and not isinstance(peer_data, bool) and len(peer_data) > 0
    )
    if peer_exists:
        logger.debug("Found peer in getPeer, now checking existing trust...")
        dbtrust = trust.Trust(
            actor_id=self.id, peerid=peer_data["peerid"], config=self.config
        )
        new_trust = dbtrust.get()
        if new_trust and len(new_trust) > 0:
            return peer_data
        logger.debug("Did not find existing trust, will create a new one")
    factory = ""
    if (
        self.config
        and self.config.actors
        and shorttype
        and shorttype in self.config.actors
    ):
        factory = self.config.actors[shorttype]["factory"]
    # If peer did not exist, create it as trustee
    if not peer_exists:
        if len(factory) == 0:
            logger.error(
                f"Peer actor of shorttype({shorttype}) does not have factory set."
            )
            # A POST to an empty URL can never succeed. Report the same
            # outcome as an unreachable factory without making the request.
            self.last_response_code = 408
            return None
        params = {
            "creator": "trustee",
            "trustee_root": (self.config.root + self.id) if self.config else "",
        }
        data = json.dumps(params)
        logger.debug(f"Creating peer actor at factory({factory})")
        response = None
        try:
            response = requests.post(
                url=factory,
                data=data,
                timeout=(5, 10),
                headers={"Content-Type": "application/json"},
            )
            # Compare with None: a requests Response is falsy for 4xx/5xx
            # replies, which would leave a stale code from an earlier call.
            if response is not None:
                self.last_response_code = response.status_code
                self.last_response_message = (
                    response.content.decode("utf-8", "ignore")
                    if isinstance(response.content, bytes)
                    else str(response.content)
                )
        except Exception:
            logger.debug("Not able to create new peer actor")
            self.last_response_code = 408
        logger.info(f"Created peer actor, response code: {self.last_response_code}")
        if (
            response is None
            or self.last_response_code < 200
            or self.last_response_code > 299
        ):
            return None
        try:
            if response.content:
                content_str = (
                    response.content.decode("utf-8", "ignore")
                    if isinstance(response.content, bytes)
                    else str(response.content)
                )
                data = json.loads(content_str)
            else:
                data = {}
        except (TypeError, ValueError, KeyError):
            logger.warning(
                f"Not able to parse response when creating peer at factory({factory})"
            )
            return None
        if not isinstance(data, dict) or "passphrase" not in data:
            # The passphrase is required to register the peer locally.
            logger.warning(
                f"No passphrase in response when creating peer at factory({factory})"
            )
            return None
        if "Location" in response.headers:
            baseuri = response.headers["Location"]
        elif "location" in response.headers:
            baseuri = response.headers["location"]
        else:
            logger.warning(
                "No location uri found in response when creating a peer as trustee"
            )
            baseuri = ""
        res = self.get_peer_info(baseuri)
        if (
            not res
            or res["last_response_code"] < 200
            or res["last_response_code"] >= 300
        ):
            return None
        info_peer = res.get("data")
        if (
            not isinstance(info_peer, dict)
            or not info_peer.get("id")
            or not info_peer.get("type")
        ):
            # Missing keys are rejected here as well, not only empty values.
            logger.info(
                f"Received invalid peer info when trying to create peer actor at: {factory}"
            )
            return None
        new_peer = peertrustee.PeerTrustee(
            actor_id=self.id,
            peerid=info_peer["id"],
            peer_type=info_peer["type"],
            config=self.config,
        )
        if not new_peer.create(baseuri=baseuri, passphrase=data["passphrase"]):
            logger.error(
                f"Failed to create in db new peer Actor({self.id}) at {baseuri}"
            )
            return None
    # Now peer exists, create trust
    new_peer_data = new_peer.get()
    if not new_peer_data or isinstance(new_peer_data, bool):
        return None
    secret = self.config.new_token() if self.config else ""
    relationship = ""
    if (
        self.config
        and self.config.actors
        and shorttype
        and shorttype in self.config.actors
    ):
        relationship = self.config.actors[shorttype]["relationship"]
    new_trust = self.create_reciprocal_trust(
        url=new_peer_data["baseuri"],
        secret=secret,
        desc="Trust from trustee to " + (shorttype or ""),
        relationship=relationship,
    )
    if not new_trust or len(new_trust) == 0:
        logger.warning(
            f"Not able to establish trust relationship with peer at factory({factory})"
        )
    else:
        # Approve the relationship
        params = {
            "approved": True,
        }
        u_p = b"trustee:" + new_peer_data["passphrase"].encode("utf-8")
        headers = {
            "Authorization": "Basic " + base64.b64encode(u_p).decode("utf-8"),
            "Content-Type": "application/json",
        }
        data = json.dumps(params)
        try:
            approval = requests.put(
                url=new_peer_data["baseuri"]
                + "/trust/"
                + relationship
                + "/"
                + (self.id or ""),
                data=data,
                headers=headers,
                timeout=(5, 10),
            )
            # Same truthiness pitfall as above: record error replies too.
            if approval is not None:
                self.last_response_code = approval.status_code
                self.last_response_message = (
                    approval.content.decode("utf-8", "ignore")
                    if isinstance(approval.content, bytes)
                    else str(approval.content)
                )
        except Exception:
            self.last_response_code = 408
            self.last_response_message = (
                "Not able to approve peer actor trust remotely"
            )
        if self.last_response_code < 200 or self.last_response_code > 299:
            logger.debug("Not able to approve peer actor trust remotely")
    return new_peer_data
def get_trust_relationship(self, peerid=None):
    """Return what the trust store holds for peerid, or None without a peerid."""
    if peerid:
        return trust.Trust(
            actor_id=self.id, peerid=peerid, config=self.config
        ).get()
    return None
def get_trust_relationships(self, relationship="", peerid="", trust_type=""):
    """Return the actor's trust relationships, optionally filtered.

    :param relationship: Keep only trusts with this relationship (empty = any)
    :param peerid: Keep only trusts with this peer id (empty = any)
    :param trust_type: Keep only trusts whose "type" equals this (empty = any)
    :return: List of matching trust dicts; empty when nothing matches
    """
    fetched = trust.Trusts(actor_id=self.id, config=self.config).fetch()
    # Each pair is (wanted value, key it is compared against in the record).
    criteria = (
        (relationship, "relationship"),
        (peerid, "peerid"),
        (trust_type, "type"),
    )
    matches = []
    for entry in fetched or []:
        if not isinstance(entry, dict):
            continue
        rejected = any(
            len(wanted) > 0 and wanted != entry.get(key, "")
            for wanted, key in criteria
        )
        if not rejected:
            matches.append(entry)
    return matches
def modify_trust_and_notify(
    self,
    relationship=None,
    peerid=None,
    baseuri="",
    secret="",
    desc="",
    approved=None,
    verified=None,
    verification_token=None,
    peer_approved=None,
    # Client metadata for OAuth2 clients
    client_name=None,
    client_version=None,
    client_platform=None,
    oauth_client_id=None,
    # Connection tracking
    last_accessed=None,
    last_connected_via=None,
):
    """Changes a trust relationship and notifies the peer if approval is changed.

    :param relationship: Relationship name of the trust to change (required)
    :param peerid: Id of the peer the trust is with (required)
    :return: False when relationship/peerid is missing, the trust is not found
        or the db update raises; otherwise the result of the db update. A
        failed peer notification does not change the return value, it is
        only reflected in last_response_code (500).

    All other parameters are passed unchanged to the trust's modify().
    """
    if not relationship or not peerid:
        return False
    relationships = self.get_trust_relationships(
        relationship=relationship, peerid=peerid
    )
    if not relationships:
        return False
    this_trust = relationships[0]

    # IMPORTANT: Save approval to database BEFORE notifying peer
    # This prevents race condition where peer tries to subscribe back
    # before our approval is saved
    dbtrust = trust.Trust(actor_id=self.id, peerid=peerid, config=self.config)
    try:
        result = dbtrust.modify(
            baseuri=baseuri,
            secret=secret,
            desc=desc,
            approved=approved,
            verified=verified,
            verification_token=verification_token,
            peer_approved=peer_approved,
            client_name=client_name,
            client_version=client_version,
            client_platform=client_platform,
            oauth_client_id=oauth_client_id,
            last_accessed=last_accessed,
            last_connected_via=last_connected_via,
        )
    except Exception as e:
        logger.error(f"Exception in dbtrust.modify: {e}", exc_info=True)
        return False

    # Now that approval is saved, notify peer so their auto-subscribe will succeed
    if approved is True and this_trust["approved"] is False:
        params = {
            "approved": True,
        }
        requrl = (
            this_trust["baseuri"] + "/trust/" + relationship + "/" + (self.id or "")
        )
        # The body is JSON whether or not a secret exists, so always say so.
        headers = {"Content-Type": "application/json"}
        peer_secret = this_trust.get("secret")
        if peer_secret:
            headers["Authorization"] = "Bearer " + peer_secret
        data = json.dumps(params)
        # Note the POST here instead of PUT. POST is used to notify about
        # state change in the relationship (i.e. not change the object as PUT
        # would do)
        logger.debug(
            "Trust relationship has been approved, notifying peer at url("
            + requrl
            + ")"
        )
        try:
            response = requests.post(
                url=requrl, data=data, headers=headers, timeout=(5, 10)
            )
            self.last_response_code = response.status_code
            self.last_response_message = (
                response.content.decode("utf-8", "ignore")
                if isinstance(response.content, bytes)
                else str(response.content)
            )
        except Exception:
            # Best effort: the local change is already saved.
            logger.debug("Not able to notify peer at url(" + requrl + ")")
            self.last_response_code = 500
    return result
async def modify_trust_and_notify_async(
    self,
    relationship=None,
    peerid=None,
    baseuri="",
    secret="",
    desc="",
    approved=None,
    verified=None,
    verification_token=None,
    peer_approved=None,
    # Client metadata for OAuth2 clients
    client_name=None,
    client_version=None,
    client_platform=None,
    oauth_client_id=None,
    # Connection tracking
    last_accessed=None,
    last_connected_via=None,
):
    """Async version of modify_trust_and_notify - prevents blocking on peer notification.

    Changes a trust relationship and notifies the peer if approval is changed.
    Database operations remain synchronous, but peer HTTP notification is async.

    :param relationship: Relationship name of the trust to change (required)
    :param peerid: Id of the peer the trust is with (required)
    :return: False when relationship/peerid is missing, the trust is not found
        or the db update raises; otherwise the result of the db update. A
        failed peer notification only sets last_response_code to 500.

    All other parameters are passed unchanged to the trust's modify().
    """
    if not relationship or not peerid:
        return False
    relationships = self.get_trust_relationships(
        relationship=relationship, peerid=peerid
    )
    if not relationships:
        return False
    this_trust = relationships[0]

    # IMPORTANT: Save approval to database BEFORE notifying peer
    # This prevents race condition where peer tries to subscribe back
    # before our approval is saved
    dbtrust = trust.Trust(actor_id=self.id, peerid=peerid, config=self.config)
    try:
        result = dbtrust.modify(
            baseuri=baseuri,
            secret=secret,
            desc=desc,
            approved=approved,
            verified=verified,
            verification_token=verification_token,
            peer_approved=peer_approved,
            client_name=client_name,
            client_version=client_version,
            client_platform=client_platform,
            oauth_client_id=oauth_client_id,
            last_accessed=last_accessed,
            last_connected_via=last_connected_via,
        )
    except Exception as e:
        # Same contract as the sync variant: a failing db update is
        # reported as False instead of propagating to the caller.
        logger.error(f"Exception in dbtrust.modify: {e}", exc_info=True)
        return False

    # Now that approval is saved, notify peer async so their auto-subscribe will succeed
    if approved is True and this_trust["approved"] is False:
        from .aw_proxy import AwProxy

        logger.debug(
            f"Trust relationship approved, notifying peer async at {this_trust['baseuri']}"
        )
        try:
            proxy = AwProxy(
                peer_target={
                    "baseuri": this_trust["baseuri"],
                    "secret": this_trust.get("secret", ""),
                },
                config=self.config,
            )
            await proxy.change_resource_async(
                path=f"trust/{relationship}/{self.id}",
                params={"approved": True},
            )
            self.last_response_code = proxy.last_response_code
            self.last_response_message = (
                proxy.last_response_message.decode("utf-8", "ignore")
                if isinstance(proxy.last_response_message, bytes)
                else str(proxy.last_response_message)
            )
        except Exception as e:
            # Best effort: the local change is already saved.
            logger.debug(f"Not able to notify peer async: {e}")
            self.last_response_code = 500
    return result
def create_reciprocal_trust(
    self,
    url,
    secret=None,
    desc="",
    relationship="",  # trust type/permission level (e.g., "friend", "admin") - goes in URL
    trust_type="",  # peer's expected ActingWeb mini-app type for validation (optional)
):
    """Creates a new reciprocal trust relationship locally and by requesting a relationship from a peer actor.

    Args:
        url: Root URI of the peer actor
        secret: Shared secret for the relationship (required)
        desc: Free-text description stored with the trust
        relationship: The trust type/permission level to request (friend, admin, etc.);
            config.default_relationship is used when empty
        trust_type: Expected peer mini-app type for validation (optional)

    Returns:
        The stored trust data when the peer answers 201 or 202, otherwise False.
    """
    if len(url) == 0:
        return False
    if not secret or len(secret) == 0:
        return False
    res = self.get_peer_info(url)
    if (
        not res
        or res["last_response_code"] < 200
        or res["last_response_code"] >= 300
    ):
        return False
    peer = res.get("data")
    # Use .get() so incomplete peer metadata is rejected, not a KeyError.
    if not isinstance(peer, dict) or not peer.get("id") or not peer.get("type"):
        logger.info(
            "Received invalid peer info when trying to establish trust: " + url
        )
        return False
    if len(trust_type) > 0:
        if trust_type.lower() != peer["type"].lower():
            logger.info("Peer is of the wrong actingweb type: " + peer["type"])
            return False
    if not relationship or len(relationship) == 0:
        relationship = self.config.default_relationship if self.config else ""
    # Create trust, so that peer can do a verify on the relationship (using
    # verification_token) when we request the relationship
    dbtrust = trust.Trust(actor_id=self.id, peerid=peer["id"], config=self.config)
    if not dbtrust.create(
        baseuri=url,
        secret=secret,
        peer_type=peer["type"],
        relationship=relationship,
        approved=True,  # we initiate the relationship, so we implicitly approve it
        verified=True,  # Requesting actor has verified=True by default per ActingWeb spec
        desc=desc,
        established_via="trust",
    ):
        logger.warning(
            "Trying to establish a new Reciprocal trust when peer relationship already exists ("
            + peer["id"]
            + ")"
        )
        return False
    new_trust = dbtrust.get()
    params = {
        "baseuri": (self.config.root if self.config else "") + (self.id or ""),
        "id": self.id,
        "type": self.config.aw_type if self.config else "",
        "secret": secret,
        "desc": desc,
        "verify": new_trust["verification_token"] if new_trust else "",
    }
    requrl = url + "/trust/" + relationship
    data = json.dumps(params)
    logger.debug(
        f"Creating reciprocal trust at url({requrl}) for peer {params.get('id', 'unknown')}"
    )
    try:
        response = requests.post(
            url=requrl,
            data=data,
            timeout=(5, 10),
            headers={
                "Content-Type": "application/json",
            },
        )
        self.last_response_code = response.status_code
        self.last_response_message = (
            response.content.decode("utf-8", "ignore")
            if isinstance(response.content, bytes)
            else str(response.content)
        )
    except Exception:
        # Roll back the local record so no half-established trust is left.
        logger.debug("Not able to create trust with peer, deleting my trust.")
        dbtrust.delete()
        return False
    if self.last_response_code == 201 or self.last_response_code == 202:
        # Reload the trust to check if approval was done
        mod_trust = trust.Trust(
            actor_id=self.id, peerid=peer["id"], config=self.config
        )
        mod_trust_data = mod_trust.get()
        if not mod_trust_data or len(mod_trust_data) == 0:
            logger.error(
                "Couldn't find trust relationship after peer POST and verification"
            )
            return False
        if self.last_response_code == 201:
            # Already approved by peer (probably auto-approved)
            # Do it direct on the trust (and not self.modifyTrustAndNotify) to avoid a callback
            # to the peer
            mod_trust.modify(peer_approved=True)
        return mod_trust.get()
    else:
        logger.debug("Not able to create trust with peer, deleting my trust.")
        dbtrust.delete()
        return False
async def create_reciprocal_trust_async(
    self,
    url,
    secret=None,
    desc="",
    relationship="",
    trust_type="",
):
    """Async version of create_reciprocal_trust - prevents blocking on peer HTTP calls.

    Creates a new reciprocal trust relationship locally and by requesting a
    relationship from a peer actor.

    Args:
        url: Root URI of the peer actor
        secret: Shared secret for the relationship (required)
        desc: Free-text description stored with the trust
        relationship: The trust type/permission level to request;
            config.default_relationship is used when empty
        trust_type: Expected peer mini-app type for validation (optional)

    Returns:
        The stored trust data when the peer answers 201 or 202, otherwise False.
    """
    if len(url) == 0:
        return False
    if not secret or len(secret) == 0:
        return False

    # Get peer info async
    res = await self.get_peer_info_async(url)
    if (
        not res
        or res["last_response_code"] < 200
        or res["last_response_code"] >= 300
    ):
        return False

    peer = res.get("data")
    if not isinstance(peer, dict) or not peer.get("id") or not peer.get("type"):
        logger.info(
            "Received invalid peer info when trying to establish trust: " + url
        )
        return False

    if len(trust_type) > 0:
        if trust_type.lower() != peer["type"].lower():
            logger.info("Peer is of the wrong actingweb type: " + peer["type"])
            return False

    if not relationship or len(relationship) == 0:
        relationship = self.config.default_relationship if self.config else ""

    # Create trust locally (synchronous DB operation)
    dbtrust = trust.Trust(actor_id=self.id, peerid=peer["id"], config=self.config)
    if not dbtrust.create(
        baseuri=url,
        secret=secret,
        peer_type=peer["type"],
        relationship=relationship,
        approved=True,
        verified=True,
        desc=desc,
        established_via="trust",
    ):
        logger.warning(
            f"Trying to establish a new Reciprocal trust when peer relationship already exists ({peer['id']})"
        )
        return False

    # Request relationship from peer async
    new_trust = dbtrust.get()
    params = {
        "baseuri": (self.config.root if self.config else "") + (self.id or ""),
        "id": self.id,
        "type": self.config.aw_type if self.config else "",
        "secret": secret,
        "desc": desc,
        "verify": new_trust["verification_token"] if new_trust else "",
    }

    import httpx

    requrl = url + "/trust/" + relationship
    data = json.dumps(params)
    # The payload holds the shared secret and verification token, so only
    # the target URL and our own id are logged.
    logger.info(
        f"Requesting trust relationship async from peer at ({requrl}) for actor {self.id}"
    )

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                requrl,
                content=data,
                headers={"Content-Type": "application/json"},
            )
        self.last_response_code = response.status_code
        self.last_response_message = (
            response.content.decode("utf-8", "ignore")
            if isinstance(response.content, bytes)
            else str(response.content)
        )
    except httpx.TimeoutException:
        logger.debug("Timeout creating trust with peer async, deleting my trust.")
        dbtrust.delete()
        return False
    except httpx.RequestError as e:
        logger.debug(
            f"Not able to create trust with peer async: {e}, deleting my trust."
        )
        dbtrust.delete()
        return False
    except Exception as e:
        # Anything else (e.g. an invalid URL) must also roll back the local
        # record, as the sync variant does, so no orphaned trust remains.
        logger.debug(
            f"Not able to create trust with peer async: {e}, deleting my trust."
        )
        dbtrust.delete()
        return False

    if self.last_response_code == 201 or self.last_response_code == 202:
        # Reload trust to check if approval was done
        mod_trust = trust.Trust(
            actor_id=self.id, peerid=peer["id"], config=self.config
        )
        mod_trust_data = mod_trust.get()
        if not mod_trust_data or len(mod_trust_data) == 0:
            logger.error(
                "Couldn't find trust relationship after peer POST and verification"
            )
            return False
        if self.last_response_code == 201:
            # Already approved by peer (probably auto-approved)
            mod_trust.modify(peer_approved=True)
        return mod_trust.get()
    else:
        logger.debug("Not able to create trust with peer async, deleting my trust.")
        dbtrust.delete()
        return False
def create_verified_trust(
    self,
    baseuri="",
    peerid=None,
    approved=False,
    secret=None,
    verification_token=None,
    trust_type=None,  # peer's ActingWeb mini-app type (e.g., "urn:actingweb:example.com:banking")
    peer_approved=None,
    relationship=None,  # trust type/permission level (e.g., "friend", "admin", "partner")
    desc="",
):
    """Creates a new trust when requested and call backs to initiating actor to verify relationship.

    The requesting peer is contacted at
    ``<baseuri>/trust/<relationship>/<own id>`` using ``secret`` as bearer
    token. The trust is marked verified only when the JSON object returned
    contains a ``verification_token`` equal to ``verification_token``. The
    trust record is stored whether or not verification succeeded.

    Args:
        baseuri: Base URI of the requesting peer.
        peerid: ID of the requesting peer.
        approved: Stored as the local approval flag.
        secret: Shared secret; without it verification is skipped.
        verification_token: Token the peer is expected to echo back.
        trust_type: The peer's ActingWeb mini-application type URI
        peer_approved: Stored as the peer approval flag (None becomes False).
        relationship: The trust type/permission level (friend, admin, etc.)
        desc: Description stored with the trust.

    Returns:
        The stored trust data (result of ``Trust.get()``), or False when a
        required argument is missing or the trust could not be created.
    """
    # ``not baseuri`` also covers None, which len() would reject with TypeError.
    if not peerid or not baseuri or not relationship:
        return False
    requrl = baseuri + "/trust/" + relationship + "/" + self.id

    verified = False
    if not secret:
        logger.debug(
            "No secret received from requesting peer("
            + peerid
            + ") at url ("
            + requrl
            + "). Verification is not possible."
        )
    else:
        headers = {
            "Authorization": "Bearer " + secret,
        }
        logger.debug(
            f"Verifying trust at requesting peer({peerid}) at url ({requrl})"
        )
        try:
            response = requests.get(url=requrl, headers=headers, timeout=(5, 10))
            self.last_response_code = response.status_code
            content_str = (
                response.content.decode("utf-8", "ignore")
                if isinstance(response.content, bytes)
                else str(response.content)
            )
            self.last_response_message = content_str
            try:
                data = json.loads(content_str)
            except ValueError:
                logger.debug(
                    "No json body in response when verifying trust at url("
                    + requrl
                    + ")"
                )
            else:
                if isinstance(data, dict):
                    logger.debug(
                        f"Verifying trust response: verified={data.get('verified', False)}, "
                        f"approved={data.get('approved', False)}, peer_approved={data.get('peer_approved', False)}"
                    )
                if isinstance(data, dict) and "verification_token" in data:
                    # NOTE(review): a None token on both sides compares equal,
                    # as it always did; confirm callers always pass a token.
                    verified = data["verification_token"] == verification_token
                else:
                    # Previously this surfaced as a misleading "No response".
                    logger.debug(
                        "No verification token in response when verifying trust at url("
                        + requrl
                        + ")"
                    )
        except Exception:
            # Best effort: any transport failure leaves the trust unverified.
            logger.debug("No response when verifying trust at url(" + requrl + ")")
            verified = False

    new_trust = trust.Trust(actor_id=self.id, peerid=peerid, config=self.config)
    if not new_trust.create(
        baseuri=baseuri,
        secret=secret or "",
        peer_type=trust_type or "",
        approved=approved,
        peer_approved=peer_approved if peer_approved is not None else False,
        relationship=relationship,
        verified=verified,
        desc=desc,
        established_via="trust",
    ):
        return False
    return new_trust.get()
def delete_reciprocal_trust(self, peerid=None, delete_peer=False):
    """Deletes a trust relationship and requests deletion of peer's relationship as well.

    For each matching relationship the peer is optionally asked to delete its
    side, then the peer's subscriptions, trust permissions, remote peer store,
    callback processor state, cached profile/capabilities/permissions and
    finally the trust record itself are removed locally. When the remote
    delete fails, the local data for that relationship is left untouched.

    Args:
        peerid: Restrict deletion to this peer. When falsy, every
            relationship returned by ``get_trust_relationships()`` is used.
        delete_peer: When True, also request deletion at the peer. This is
            skipped for OAuth2-established trusts and for relationships whose
            base URI points back at this actor.

    Returns:
        True on success. With ``delete_peer=True`` the result is False when
        no remote deletion succeeded or at least one of them failed.
    """
    failed_once = False  # True if at least one deletion at peer failed
    success_once = False  # True if at least one relationship was deleted at peer
    subs_deleted = False  # True once any subscription row has been removed
    if not peerid:
        rels = self.get_trust_relationships()
    else:
        rels = self.get_trust_relationships(peerid=peerid)

    oauth2_kinds = ("oauth2", "oauth2_client")
    for rel in rels or []:
        # For OAuth2-established trusts, there is no remote actor endpoint to call.
        # Skip remote deletion and delete locally only.
        is_oauth2_trust = (
            rel.get("established_via") in oauth2_kinds
            or rel.get("type") in oauth2_kinds
            or str(rel.get("peerid", "")).startswith(("oauth2:", "oauth2_client:"))
        )
        # Additional safety check: prevent self-deletion if baseuri points to
        # this actor. A missing/None baseuri is treated as "".
        baseuri = rel.get("baseuri") or ""
        is_self_deletion = (
            (
                baseuri.endswith(f"/{self.id}")
                or baseuri == f"{self.config.root}{self.id}"
            )
            if self.config
            else False
        )

        if delete_peer and not is_oauth2_trust and not is_self_deletion:
            url = rel["baseuri"] + "/trust/" + rel["relationship"] + "/" + self.id
            headers = {}
            if rel["secret"]:
                headers = {
                    "Authorization": "Bearer " + rel["secret"],
                }
            logger.info(f"Deleting reciprocal relationship at {url}")
            try:
                response = requests.delete(url=url, headers=headers, timeout=(5, 10))
            except Exception:
                logger.debug(
                    "Failed to delete reciprocal relationship at url(" + url + ")"
                )
                failed_once = True
                continue
            # A 404 means the peer no longer has the relationship: count as success.
            if (
                response.status_code < 200 or response.status_code > 299
            ) and response.status_code != 404:
                logger.debug(
                    "Failed to delete reciprocal relationship at url(" + url + ")"
                )
                failed_once = True
                continue
            success_once = True
        elif delete_peer and (is_oauth2_trust or is_self_deletion):
            # Treat as successful remote delete for OAuth2 trusts and self-deletions
            reason = (
                "OAuth2-established trust"
                if is_oauth2_trust
                else "self-deletion detected"
            )
            logger.debug(f"Skipping remote delete for {reason}; deleting locally only")
            success_once = True

        if not self.subs_list:
            self.subs_list = subscription.Subscriptions(
                actor_id=self.id, config=self.config
            ).fetch()
        # Delete this peer's subscriptions
        for sub in self.subs_list or []:
            if sub["peerid"] != rel["peerid"]:
                continue
            logger.debug(
                "Deleting subscription("
                + sub["subscriptionid"]
                + ") as part of trust deletion."
            )
            sub_obj = self.get_subscription_obj(
                peerid=sub["peerid"],
                subid=sub["subscriptionid"],
                callback=sub["callback"],
            )
            if sub_obj:
                sub_obj.delete()
                subs_deleted = True

        # Delete associated trust permissions before deleting the trust
        try:
            from .trust_permissions import TrustPermissionStore

            if self.config is not None and self.id is not None:
                permission_store = TrustPermissionStore(self.config)
                permission_store.delete_permissions(self.id, rel["peerid"])
        except Exception as e:
            logger.warning(
                f"Failed to delete trust permissions for {rel['peerid']}: {e}"
            )

        # Clean up remote peer data (RemotePeerStore); runs without a config check.
        try:
            from .interface.actor_interface import ActorInterface
            from .remote_storage import RemotePeerStore

            actor_interface = ActorInterface(self)
            store = RemotePeerStore(
                actor_interface,
                rel["peerid"],
                validate_peer_id=False,
            )
            store.delete_all()
            logger.info(f"Cleaned up RemotePeerStore for peer {rel['peerid']}")
        except ImportError:
            pass  # RemotePeerStore not available
        except Exception as e:
            logger.warning(
                f"Failed to cleanup RemotePeerStore for {rel['peerid']}: {e}"
            )

        # Clean up callback processor state; runs without a config check.
        try:
            from .callback_processor import CallbackProcessor

            processor = CallbackProcessor(self)  # type: ignore[arg-type]
            processor.clear_all_state_for_peer(rel["peerid"])
            logger.info(
                f"Cleaned up CallbackProcessor state for peer {rel['peerid']}"
            )
        except ImportError:
            pass  # CallbackProcessor not available
        except Exception as e:
            logger.warning(
                f"Failed to cleanup callback state for {rel['peerid']}: {e}"
            )

        # The remaining caches are keyed by (actor id, peer id) and need both
        # a config and an actor id to be reachable.
        if self.config is not None and self.id is not None:
            # Clean up cached peer profile
            try:
                from .peer_profile import get_peer_profile_store

                profile_store = get_peer_profile_store(self.config)
                profile_store.delete_profile(self.id, rel["peerid"])
                logger.info(f"Cleaned up peer profile for peer {rel['peerid']}")
            except ImportError:
                pass  # Peer profile system not available
            except Exception as e:
                logger.warning(
                    f"Failed to cleanup peer profile for {rel['peerid']}: {e}"
                )

            # Clean up cached peer capabilities (methods/actions)
            try:
                from .peer_capabilities import get_cached_capabilities_store

                capabilities_store = get_cached_capabilities_store(self.config)
                capabilities_store.delete_capabilities(self.id, rel["peerid"])
                logger.info(f"Cleaned up peer capabilities for peer {rel['peerid']}")
            except ImportError:
                pass  # Peer capabilities system not available
            except Exception as e:
                logger.warning(
                    f"Failed to cleanup peer capabilities for {rel['peerid']}: {e}"
                )

            # Clean up cached peer permissions
            try:
                from .peer_permissions import get_peer_permission_store

                peer_permissions_store = get_peer_permission_store(self.config)
                peer_permissions_store.delete_permissions(self.id, rel["peerid"])
                logger.info(
                    f"Cleaned up peer permissions cache for peer {rel['peerid']}"
                )
            except ImportError:
                pass  # Peer permissions caching not available
            except Exception as e:
                logger.warning(
                    f"Failed to cleanup peer permissions cache for {rel['peerid']}: {e}"
                )

        # Finally, delete the trust record itself
        dbtrust = trust.Trust(
            actor_id=self.id, peerid=rel["peerid"], config=self.config
        )
        dbtrust.delete()

    if subs_deleted:
        # The cached list still holds the rows just deleted; force a re-fetch.
        self.subs_list = None

    if delete_peer and (not success_once or failed_once):
        return False
    return True
def create_subscription(
    self,
    peerid=None,
    target=None,
    subtarget=None,
    resource=None,
    granularity=None,
    subid=None,
    callback=False,
):
    """Create a local subscription record and return its stored data.

    Args:
        peerid: Peer the subscription belongs to.
        target: Subscribed target.
        subtarget: Optional subtarget.
        resource: Optional resource.
        granularity: Callback granularity.
        subid: Subscription ID, passed on to the record.
        callback: Callback flag, passed on to the record.

    Returns:
        Whatever ``Subscription.get()`` returns after ``create()`` was called;
        the result of ``create()`` itself is not inspected.
    """
    record = subscription.Subscription(
        actor_id=self.id,
        peerid=peerid,
        subid=subid,
        callback=callback,
        config=self.config,
    )
    record.create(
        target=target,
        subtarget=subtarget,
        resource=resource,
        granularity=granularity,
    )
    return record.get()
def create_remote_subscription(
    self, peerid=None, target=None, subtarget=None, resource=None, granularity=None
):
    """Creates a new subscription at peerid.

    POSTs the request to ``<peer baseuri>/subscriptions/<own id>`` using the
    first trust relationship found for the peer. When the peer answers 201, a
    matching local record is created with ``callback=True``.

    Args:
        peerid: Peer to subscribe at (required).
        target: Target to subscribe to (required).
        subtarget: Optional subtarget.
        resource: Optional resource.
        granularity: Optional granularity.

    Returns:
        The ``Location`` header of the peer's 201 response; False when
        ``peerid``/``target`` is missing or no trust relationship exists;
        None on any transport, parsing or protocol failure.
    """
    if not peerid or not target:
        return False
    relationships = self.get_trust_relationships(peerid=peerid)
    if not relationships:
        return False
    peer = relationships[0]

    params = {
        "id": self.id,
        "target": target,
    }
    if subtarget:
        params["subtarget"] = subtarget
    if resource:
        params["resource"] = resource
    if granularity and len(granularity) > 0:
        params["granularity"] = granularity

    requrl = peer["baseuri"] + "/subscriptions/" + self.id
    data = json.dumps(params)
    headers = {
        "Authorization": "Bearer " + peer["secret"],
        "Content-Type": "application/json",
    }
    try:
        logger.debug(
            "Creating remote subscription at url("
            + requrl
            + ") with body ("
            + str(data)
            + ")"
        )
        response = requests.post(
            url=requrl, data=data, headers=headers, timeout=(5, 10)
        )
        self.last_response_code = response.status_code
        content_str = (
            response.content.decode("utf-8", "ignore")
            if isinstance(response.content, bytes)
            else str(response.content)
        )
        self.last_response_message = content_str
    except Exception:
        return None

    logger.debug(
        "Created remote subscription at url("
        + requrl
        + ") and got JSON response ("
        + str(response.content)
        + ")"
    )
    try:
        reply = json.loads(content_str)
    except ValueError:
        return None
    # Valid JSON need not be an object (e.g. ``null`` or a number); a
    # membership test on such a value would raise TypeError.
    if not isinstance(reply, dict) or "subscriptionid" not in reply:
        return None
    subid = reply["subscriptionid"]

    if self.last_response_code != 201:
        return None
    self.create_subscription(
        peerid=peerid,
        target=target,
        subtarget=subtarget,
        resource=resource,
        granularity=granularity,
        subid=subid,
        callback=True,
    )
    if "Location" in response.headers:
        return response.headers["Location"]
    if "location" in response.headers:
        return response.headers["location"]
    return None
[docs] def get_subscriptions( self, peerid: str | None = None, target: str | None = None, subtarget: str | None = None, resource: str | None = None, callback: bool | None = None, ) -> list[dict[str, Any]] | None: """Retrieves subscriptions from db. Args: peerid: Filter by peer ID (None = all peers) target: Filter by target (None = all targets) subtarget: Filter by subtarget (None = all subtargets) resource: Filter by resource (None = all resources) callback: Filter by callback flag (None = all, False = inbound, True = outbound) Returns: List of subscription dictionaries, or None if actor has no ID """ if not self.id: return None if not self.subs_list: self.subs_list = subscription.Subscriptions( actor_id=self.id, config=self.config ).fetch() ret = [] if self.subs_list: for sub in self.subs_list: if not peerid or (peerid and sub["peerid"] == peerid): if not target or (target and sub["target"] == target): if not subtarget or ( subtarget and sub["subtarget"] == subtarget ): if not resource or ( resource and sub["resource"] == resource ): if callback is None or sub["callback"] == callback: ret.append(sub) return ret
def get_subscription(self, peerid=None, subid=None, callback=False):
    """Return the stored data of one subscription, looked up by peerid and subid.

    Returns:
        The result of ``Subscription.get()``, or False when ``subid`` is
        missing.
    """
    if subid:
        handle = subscription.Subscription(
            actor_id=self.id,
            peerid=peerid,
            subid=subid,
            callback=callback,
            config=self.config,
        )
        return handle.get()
    return False
def get_subscription_obj(self, peerid=None, subid=None, callback=False):
    """Return the ``Subscription`` object for peerid and subid (not its data).

    Returns:
        A ``subscription.Subscription`` instance bound to this actor, or
        False when ``subid`` is missing.
    """
    if subid:
        return subscription.Subscription(
            actor_id=self.id,
            peerid=peerid,
            subid=subid,
            callback=callback,
            config=self.config,
        )
    return False
def delete_remote_subscription(self, peerid=None, subid=None):
    """Ask a peer to delete its side of a subscription.

    The local record is looked up first without and then with the callback
    flag. A record whose ``callback`` value is truthy is deleted through the
    peer's ``/callbacks/subscriptions/`` path, anything else through
    ``/subscriptions/``.

    Returns:
        True when the peer answers 204; False for missing arguments, a
        missing trust relationship, any other status code or any error
        raised while sending the request.
    """
    if not (subid and peerid):
        return False
    trust_rel = self.get_trust_relationship(peerid=peerid)
    if not trust_rel:
        return False

    sub = self.get_subscription(peerid=peerid, subid=subid) or self.get_subscription(
        peerid=peerid, subid=subid, callback=True
    )
    uses_callback_path = bool(sub) and "callback" in sub and bool(sub["callback"])
    if uses_callback_path:
        url = (
            trust_rel["baseuri"]
            + "/callbacks/subscriptions/"
            + self.id
            + "/"
            + subid
        )
    else:
        url = trust_rel["baseuri"] + "/subscriptions/" + self.id + "/" + subid

    headers = {
        "Authorization": "Bearer " + trust_rel["secret"],
    }
    try:
        logger.info(f"Deleting remote subscription at {url}")
        response = requests.delete(url=url, headers=headers, timeout=(5, 10))
        self.last_response_code = response.status_code
        raw = response.content
        self.last_response_message = (
            raw.decode("utf-8", "ignore") if isinstance(raw, bytes) else str(raw)
        )
        if response.status_code != 204:
            logger.debug("Failed to delete remote subscription at url(" + url + ")")
            return False
        return True
    except Exception:
        return False
def delete_subscription(self, peerid=None, subid=None, callback=False):
    """Delete one subscription record and clear the cached subscription list.

    For outbound subscriptions (``callback=True``) the RemotePeerStore data
    for the peer is also removed, but only when the deleted subscription was
    the last outbound one to that peer and the deletion itself succeeded.

    Returns:
        The result of ``Subscription.delete()``, or False when ``subid`` is
        missing.
    """
    if not subid:
        return False

    # Decide BEFORE deleting whether the peer store will become orphaned.
    cleanup_peer_store = False
    if callback and peerid:
        try:
            # Reset the cache so the count below reflects the database, not a
            # stale list.
            self.subs_list = None
            outbound = self.get_subscriptions(peerid=peerid, callback=True) or []
            # The stored key is "subscriptionid", not "subid".
            still_active = [
                entry for entry in outbound if entry.get("subscriptionid") != subid
            ]
            if still_active:
                logger.debug(
                    f"Not cleaning up RemotePeerStore for {peerid} - "
                    f"{len(still_active)} other outbound subscription(s) still active"
                )
            else:
                cleanup_peer_store = True
        except Exception as e:
            logger.warning(
                f"Failed to check RemotePeerStore cleanup for {peerid}: {e}"
            )

    result = subscription.Subscription(
        actor_id=self.id,
        peerid=peerid,
        subid=subid,
        callback=callback,
        config=self.config,
    ).delete()

    # The cached list now contains a deleted row; force a re-fetch.
    self.subs_list = None

    if not (result and cleanup_peer_store and peerid):
        return result

    # Only reached after a successful delete, so no live data is lost.
    try:
        from .interface.actor_interface import ActorInterface
        from .remote_storage import RemotePeerStore

        actor_interface = ActorInterface(self)  # type: ignore[arg-type]
        peer_store = RemotePeerStore(
            actor_interface,
            peerid,
            validate_peer_id=False,
        )
        peer_store.delete_all()
        logger.info(f"Cleaned up RemotePeerStore for peer {peerid}")
    except ImportError:
        pass  # RemotePeerStore not available
    except Exception as e:
        logger.warning(f"Failed to clean up RemotePeerStore for {peerid}: {e}")

    return result
def callback_subscription(
    self, peerid=None, sub_obj=None, sub=None, diff=None, blob=None
):
    """Send a subscription callback for one diff to the subscribing peer.

    Nothing is sent when a required argument is missing, the subscription's
    granularity is "none", no trust relationship exists, or permission
    filtering of a "properties" subscription leaves no data.

    Delivery is synchronous when ``config.sync_subscription_callbacks`` is
    truthy or when no event loop is running; otherwise the POST is scheduled
    as a task on the running loop and this method returns immediately. The
    outcome is recorded in ``last_response_code``/``last_response_message``.

    Args:
        peerid: Peer to notify.
        sub_obj: Not used by this method.
        sub: Subscription dictionary.
        diff: Diff dictionary providing ``sequence`` and ``timestamp``.
        blob: Diff payload; JSON-decoded for "high" granularity if possible.

    Returns:
        None.
    """
    if not (peerid and diff and sub and blob):
        logger.warning("Missing parameters in callbackSubscription")
        return
    if sub.get("granularity") == "none":
        return
    peer_trust = self.get_trust_relationship(peerid)
    if not peer_trust:
        return

    # Filter blob based on peer permissions for property subscriptions
    if sub.get("target") == "properties":
        permitted = self._filter_subscription_data_by_permissions(
            peerid=peerid,
            blob=blob,
            subtarget=sub.get("subtarget"),
        )
        if permitted is None:
            return  # Nothing to send after filtering
        blob = permitted

    params = {
        "id": self.id,
        "subscriptionid": sub["subscriptionid"],
        "target": sub["target"],
        "sequence": diff["sequence"],
        "timestamp": str(diff["timestamp"]),
        "granularity": sub["granularity"],
    }
    if sub["subtarget"]:
        params["subtarget"] = sub["subtarget"]
    if sub["resource"]:
        params["resource"] = sub["resource"]
    if sub["granularity"] == "high":
        # Embed the data itself; fall back to the raw blob if it is not JSON.
        try:
            params["data"] = json.loads(blob)
        except (TypeError, ValueError, KeyError):
            params["data"] = blob
    elif sub["granularity"] == "low":
        # Only send a URL where the peer can fetch this diff.
        params["url"] = (
            (self.config.root if self.config else "")
            + (self.id or "")
            + "/subscriptions/"
            + peer_trust["peerid"]
            + "/"
            + sub["subscriptionid"]
            + "/"
            + str(diff["sequence"])
        )

    requrl = (
        peer_trust["baseuri"]
        + "/callbacks/subscriptions/"
        + self.id
        + "/"
        + sub["subscriptionid"]
    )
    data = json.dumps(params)
    headers = {
        "Authorization": "Bearer " + peer_trust["secret"],
        "Content-Type": "application/json",
    }

    def _record_no_response():
        """Store the 'peer unreachable' outcome on the actor."""
        self.last_response_code = 0
        self.last_response_message = (
            "No response from peer for subscription callback"
        )

    def _send_callback_sync():
        """Send subscription callback using requests (blocking)."""
        try:
            logger.debug(
                "Doing sync callback on subscription at url("
                + requrl
                + ") with body("
                + str(data)
                + ")"
            )
            response = requests.post(
                url=requrl,
                data=data.encode("utf-8"),
                headers=headers,
                timeout=(5, 10),
            )
            self.last_response_code = response.status_code
            self.last_response_message = (
                response.content.decode("utf-8", "ignore")
                if isinstance(response.content, bytes)
                else str(response.content)
            )
            # Log the response for debugging callback delivery issues
            if response.status_code == 204:
                logger.info(
                    f"Callback seq={diff.get('sequence')} delivered successfully (204)"
                )
            else:
                logger.warning(
                    f"Callback seq={diff.get('sequence')} returned {response.status_code}: "
                    f"{self.last_response_message[:200] if self.last_response_message else 'no message'}"
                )
            # NOTE: Diffs are deliberately not cleared after a 204. The
            # subscriber may have queued the callback (sequence gap), so diffs
            # are cleared only when it confirms processing via
            # PUT /subscriptions/{id} with sequence.
        except (requests.RequestException, requests.Timeout, ConnectionError) as e:
            logger.warning(
                f"Callback seq={diff.get('sequence')} failed - peer did not respond: {e}"
            )
            _record_no_response()

    # Check if sync callbacks are forced (recommended for Lambda/serverless)
    if getattr(self.config, "sync_subscription_callbacks", False):
        logger.info(
            f"Sync callback seq={diff.get('sequence')} to {sub.get('peerid', 'unknown')}"
        )
        _send_callback_sync()
        return

    async def _send_callback_async():
        """Send subscription callback using httpx (non-blocking)."""
        import httpx

        try:
            logger.debug(
                "Doing async callback on subscription at url("
                + requrl
                + ") with body("
                + str(data)
                + ")"
            )
            async with httpx.AsyncClient(
                timeout=httpx.Timeout(10.0, connect=5.0)
            ) as client:
                response = await client.post(
                    requrl, content=data.encode("utf-8"), headers=headers
                )
            self.last_response_code = response.status_code
            self.last_response_message = (
                response.content.decode("utf-8", "ignore")
                if isinstance(response.content, bytes)
                else str(response.content)
            )
            # NOTE: As in the sync path, diffs are not cleared here; the
            # subscriber confirms via PUT /subscriptions/{id} with sequence.
        except (httpx.HTTPError, httpx.TimeoutException) as e:
            logger.debug(f"Peer did not respond to callback on url({requrl}): {e}")
            _record_no_response()

    # Schedule the async callback without blocking (may be lost on Lambda freeze!)
    try:
        import asyncio

        loop = asyncio.get_running_loop()
        # We're in an async context - create a background task.
        # NOTE(review): the task object is not retained anywhere.
        loop.create_task(_send_callback_async())
        logger.debug(
            f"Async callback seq={diff.get('sequence')} to {sub.get('peerid', 'unknown')} (fire-and-forget)"
        )
    except RuntimeError:
        # No running event loop - fall back to sync request
        logger.debug("No async loop, falling back to sync callback")
        _send_callback_sync()
async def create_verified_trust_async(
    self,
    baseuri="",
    peerid=None,
    approved=False,
    secret=None,
    verification_token=None,
    trust_type=None,
    peer_approved=None,
    relationship=None,
    desc="",
):
    """Run ``create_verified_trust`` in a worker thread via ``asyncio.to_thread``.

    All arguments are forwarded unchanged as keyword arguments and the sync
    method's return value is returned.
    """
    import asyncio

    forwarded = {
        "baseuri": baseuri,
        "peerid": peerid,
        "approved": approved,
        "secret": secret,
        "verification_token": verification_token,
        "trust_type": trust_type,
        "peer_approved": peer_approved,
        "relationship": relationship,
        "desc": desc,
    }
    return await asyncio.to_thread(self.create_verified_trust, **forwarded)
async def delete_reciprocal_trust_async(self, peerid=None, delete_peer=False):
    """Run ``delete_reciprocal_trust`` in a worker thread via ``asyncio.to_thread``.

    Both arguments are forwarded unchanged as keyword arguments and the sync
    method's return value is returned.
    """
    import asyncio

    forwarded = {"peerid": peerid, "delete_peer": delete_peer}
    return await asyncio.to_thread(self.delete_reciprocal_trust, **forwarded)
async def create_remote_subscription_async(
    self,
    peerid=None,
    target=None,
    subtarget=None,
    resource=None,
    granularity=None,
):
    """Run ``create_remote_subscription`` in a worker thread via ``asyncio.to_thread``.

    All arguments are forwarded unchanged as keyword arguments and the sync
    method's return value is returned.
    """
    import asyncio

    forwarded = {
        "peerid": peerid,
        "target": target,
        "subtarget": subtarget,
        "resource": resource,
        "granularity": granularity,
    }
    return await asyncio.to_thread(self.create_remote_subscription, **forwarded)
async def delete_remote_subscription_async(self, peerid=None, subid=None):
    """Run ``delete_remote_subscription`` in a worker thread via ``asyncio.to_thread``.

    Both arguments are forwarded unchanged as keyword arguments and the sync
    method's return value is returned.
    """
    import asyncio

    forwarded = {"peerid": peerid, "subid": subid}
    return await asyncio.to_thread(self.delete_remote_subscription, **forwarded)
async def callback_subscription_async(
    self, peerid=None, sub_obj=None, sub=None, diff=None, blob=None
):
    """Run ``callback_subscription`` in a worker thread via ``asyncio.to_thread``.

    All arguments are forwarded unchanged as keyword arguments and the sync
    method's return value is returned.
    """
    import asyncio

    forwarded = {
        "peerid": peerid,
        "sub_obj": sub_obj,
        "sub": sub,
        "diff": diff,
        "blob": blob,
    }
    return await asyncio.to_thread(self.callback_subscription, **forwarded)
def _filter_subscription_data_by_permissions(
    self, peerid: str, blob: str | bytes, subtarget: str | None = None
) -> str | None:
    """Filter subscription data based on peer's property permissions.

    Each top-level key of the JSON object in ``blob`` is checked for "read"
    access by the peer; keys that are not allowed are dropped. Data that is
    not a JSON object cannot be filtered per property and is passed through
    as a JSON string.

    Implements fail-closed: errors result in no data sent.

    Args:
        peerid: Peer that will receive the callback.
        blob: Callback payload as JSON text (str or UTF-8 bytes).
        subtarget: When set, prefixed to each property name
            (``subtarget/name``) for the permission check.

    Returns:
        Filtered blob as JSON string, or None if nothing passes the filter
        or any error occurs.
    """
    try:
        if not self.config or not self.id:
            logger.warning("Missing config or actor ID for subscription filtering")
            return None  # Fail-closed

        evaluator = get_permission_evaluator(self.config)
        if not evaluator:
            logger.warning(
                f"Permission evaluator not available for subscription filtering to {peerid}"
            )
            return None  # Fail-closed

        # Parse blob with explicit encoding handling for bytes
        if isinstance(blob, bytes):
            data = json.loads(blob.decode("utf-8"))
        elif isinstance(blob, str):
            data = json.loads(blob)
        else:
            data = blob

        if not isinstance(data, dict):
            logger.debug(f"Cannot filter non-dict subscription data: {type(data)}")
            # Serialize the parsed data, not the raw blob: json.dumps() cannot
            # encode bytes, which used to turn this pass-through into a
            # silent drop.
            return blob if isinstance(blob, str) else json.dumps(data)

        filtered_data = {}
        for property_name, value in data.items():
            # Property lists use 'list:name' internally but permissions use
            # 'name', so strip the prefix for the check only.
            normalized_name = (
                property_name[5:]
                if property_name.startswith("list:")
                else property_name
            )

            # Build full property path for permission check
            property_path = (
                f"{subtarget}/{normalized_name}" if subtarget else normalized_name
            )

            result = evaluator.evaluate_property_access(
                self.id, peerid, property_path, operation="read"
            )

            if result == PermissionResult.ALLOWED:
                # Keep the original (unnormalized) key in the outgoing data.
                filtered_data[property_name] = value
            else:
                logger.debug(
                    f"Filtered property {property_path} from subscription callback to {peerid}"
                )

        if not filtered_data:
            logger.debug(f"No permitted properties in callback to {peerid}, skipping")
            return None

        return json.dumps(filtered_data)

    except Exception as e:
        logger.error(f"Permission filtering failed for subscription callback: {e}")
        return None  # Fail-closed: don't send data on error


# =========================================================================
# Subscription Suspension Management
# =========================================================================
[docs] def is_subscription_suspended( self, target: str, subtarget: str | None = None ) -> bool: """Check if diff registration is suspended for a target/subtarget. Args: target: Target resource (e.g., "properties") subtarget: Optional subtarget (e.g., property name) Returns: True if suspended, False otherwise """ if not self.config or not self.id: return False try: db = get_subscription_suspension(self.config, self.id) return db.is_suspended(target, subtarget) except Exception as e: logger.debug(f"Error checking suspension: {e}") return False
[docs] def suspend_subscriptions(self, target: str, subtarget: str | None = None) -> bool: """Suspend diff registration for a target/subtarget. While suspended, property changes will NOT register diffs or trigger callbacks. Call resume_subscriptions() to lift suspension and send resync callbacks. Args: target: Target resource (e.g., "properties") subtarget: Optional subtarget (e.g., property name) Returns: True if newly suspended, False if already suspended """ if not self.config or not self.id: return False try: db = get_subscription_suspension(self.config, self.id) return db.suspend(target, subtarget) except Exception as e: logger.error(f"Error suspending subscriptions: {e}") return False
[docs] def resume_subscriptions(self, target: str, subtarget: str | None = None) -> int: """Resume diff registration and send resync callbacks. Sends a resync callback to ALL subscriptions on this target/subtarget, telling them to do a full GET to re-sync their state. Args: target: Target resource (e.g., "properties") subtarget: Optional subtarget (e.g., property name) Returns: The number of resync callbacks sent """ if not self.config or not self.id: return 0 try: db = get_subscription_suspension(self.config, self.id) if not db.resume(target, subtarget): return 0 # Wasn't suspended # Find all affected subscriptions and send resync callbacks return self._send_resync_callbacks(target, subtarget) except Exception as e: logger.error(f"Error resuming subscriptions: {e}") return 0
def _send_resync_callbacks(self, target: str, subtarget: str | None) -> int: """Send resync callbacks to all subscriptions on target/subtarget. Args: target: Target resource subtarget: Optional subtarget Returns: Number of callbacks sent successfully """ subs = self.get_subscriptions(target=target, subtarget=None, callback=False) if not subs: return 0 count = 0 for sub in subs: sub_target = sub.get("target", "") sub_subtarget = sub.get("subtarget") # Match target if sub_target != target: continue # Match subtarget if specified # Empty subtarget in subscription means "all subtargets", so it matches any filter if subtarget is not None and sub_subtarget and sub_subtarget != subtarget: continue # Send resync callback # If we resumed a specific subtarget, use that for the resync # (even if the subscription itself has no subtarget or a different one) resync_subtarget = subtarget if subtarget else sub_subtarget if self._callback_subscription_resync(sub, resync_subtarget): count += 1 logger.info(f"Sent {count} resync callbacks for {target}/{subtarget}") return count def _send_resync_callback_sync( self, callback_url: str, payload: dict, secret: str, peer_id: str ) -> bool: """Send resync callback synchronously (blocking). 
Args: callback_url: URL to send the callback to payload: Callback payload secret: Trust secret for authentication peer_id: Peer ID (for logging) Returns: True if callback was sent successfully """ import httpx try: with httpx.Client(timeout=30.0) as client: response = client.post( callback_url, json=payload, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {secret}", }, ) if response.status_code in (200, 204): logger.info( f"Sent resync callback to {peer_id} for subscription " f"{payload.get('subscriptionid')}" ) return True else: logger.warning( f"Resync callback to {peer_id} failed: {response.status_code}" ) return False except Exception as e: logger.error(f"Error sending resync callback to {peer_id}: {e}") return False def _send_resync_callback_async( self, callback_url: str, payload: dict, secret: str, peer_id: str ) -> None: """Send resync callback asynchronously (fire-and-forget). Args: callback_url: URL to send the callback to payload: Callback payload secret: Trust secret for authentication peer_id: Peer ID (for logging) """ async def _send(): import httpx try: async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post( callback_url, json=payload, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {secret}", }, ) if response.status_code in (200, 204): logger.info( f"Sent resync callback to {peer_id} for subscription " f"{payload.get('subscriptionid')}" ) else: logger.warning( f"Resync callback to {peer_id} failed: {response.status_code}" ) except Exception as e: logger.error(f"Error sending resync callback to {peer_id}: {e}") try: import asyncio loop = asyncio.get_running_loop() loop.create_task(_send()) logger.debug( f"Async resync callback to {peer_id} for subscription " f"{payload.get('subscriptionid')} (fire-and-forget)" ) except RuntimeError: # No event loop - fallback to sync logger.debug("No async loop, falling back to sync resync callback") 
self._send_resync_callback_sync(callback_url, payload, secret, peer_id) def _callback_subscription_resync( self, subscription: dict, override_subtarget: str | None = None ) -> bool: """Send a resync callback to a single subscription. Checks peer capability before sending resync. If peer doesn't support the subscriptionresync option, falls back to low-granularity callback. Respects the sync_subscription_callbacks configuration to determine whether to send callbacks synchronously (blocking) or asynchronously (fire-and-forget). Args: subscription: Subscription dict with peerid, subscriptionid, callback, etc. override_subtarget: Optional subtarget to use instead of subscription's subtarget (used when resuming a specific subtarget on a broader subscription) Returns: True if callback was sent/scheduled successfully """ from datetime import UTC, datetime peer_id = subscription.get("peerid", "") sub_id = subscription.get("subscriptionid", "") target = subscription.get("target", "") # Use override_subtarget if provided, otherwise use subscription's subtarget sub_subtarget = ( override_subtarget if override_subtarget is not None else subscription.get("subtarget") ) # Get trust relationship to construct callback URL from .trust import Trust trust = Trust(actor_id=self.id, peerid=peer_id, config=self.config) trust_data = trust.get() if not trust_data: logger.warning(f"No trust found for peer {peer_id}") return False # Construct callback URL from trust relationship (same as callback_subscription) callback_url = ( trust_data.get("baseuri", "") + "/callbacks/subscriptions/" + (self.id or "") + "/" + sub_id ) if not callback_url or not trust_data.get("baseuri"): logger.warning( f"No callback URL for subscription {sub_id} - missing baseuri in trust" ) return False # Check if peer supports resync callbacks (use cached data only) from .interface.actor_interface import ActorInterface from .peer_capabilities import PeerCapabilities actor_interface = ActorInterface(self) caps = 
PeerCapabilities(actor_interface, peer_id) # Use cached capabilities without blocking on network fetch # If cache is expired or not available, assume support (optimistic) supports_resync_cached = caps.supports_resync_callbacks_cached() if supports_resync_cached is None: # Cache expired or not available - assume support to avoid blocking # This is optimistic but safe: if peer doesn't support resync, # the receiver will process it as a regular low-granularity callback supports_resync = True logger.debug( f"Capabilities not cached for peer {peer_id}, " f"assuming resync support (optimistic)" ) # Schedule background refresh to update cache for next time # This doesn't block the current operation try: import asyncio async def _refresh_capabilities(): try: await caps.refresh_async() logger.debug( f"Background refresh of capabilities for {peer_id}" ) except Exception as e: logger.debug( f"Background capability refresh failed for {peer_id}: {e}" ) loop = asyncio.get_running_loop() loop.create_task(_refresh_capabilities()) except RuntimeError: # No event loop - skip background refresh # Next operation will still use optimistic approach pass else: supports_resync = supports_resync_cached logger.debug( f"Using cached capability for peer {peer_id}: " f"resync_supported={supports_resync}" ) # Increment sequence number new_seq = self._increment_subscription_sequence(peer_id, sub_id) # Build resource URL for resync if not self.config: logger.warning("No config available for building resource URL") return False resource_url = f"{self.config.proto}{self.config.fqdn}/{self.id}/{target}" if sub_subtarget: resource_url += f"/{sub_subtarget}" # Build callback payload - use resync type only if peer supports it if supports_resync: # Resync callback per protocol spec v1.4 payload = { "id": self.id, "subscriptionid": sub_id, "target": target, "subtarget": sub_subtarget, "sequence": new_seq, "timestamp": datetime.now(UTC).isoformat(), "granularity": subscription.get("granularity", "high"), 
"type": "resync", "url": resource_url, } logger.debug(f"Sending resync callback to peer {peer_id} (supports resync)") else: # Fallback: create a full-state diff and send low-granularity callback # Get the full current state full_state = self._get_full_state_for_subscription(target, sub_subtarget) # Store as a subscription diff diff_url = self._store_subscription_diff( peer_id, sub_id, new_seq, full_state ) # Send low-granularity callback with URL to subscription diff payload = { "id": self.id, "subscriptionid": sub_id, "target": target, "subtarget": sub_subtarget, "sequence": new_seq, "timestamp": datetime.now(UTC).isoformat(), "granularity": "low", "url": diff_url, } logger.info( f"Peer {peer_id} does not support resync callbacks, " f"creating full-state diff for low-granularity callback" ) # Get trust secret for authentication (already fetched above) secret = trust_data.get("secret", "") # Check sync configuration (like diff callbacks do) use_sync = getattr(self.config, "sync_subscription_callbacks", False) if use_sync: # Lambda mode: blocking call logger.info(f"Sync resync callback to {peer_id}") return self._send_resync_callback_sync( callback_url, payload, secret, peer_id ) else: # Local mode: async fire-and-forget self._send_resync_callback_async(callback_url, payload, secret, peer_id) return True # Scheduled (not confirmed) def _get_full_state_for_subscription( self, target: str, subtarget: str | None ) -> dict[str, Any]: """Get the full current state for a subscription target. 
Args: target: Subscription target (e.g., "properties") subtarget: Optional subtarget (e.g., list name for property lists) Returns: Dict containing the full state """ if target == "properties": if subtarget: # Specific property list or property if hasattr(self, "property_lists") and self.property_lists: if self.property_lists.exists(subtarget): # It's a list - return all items list_attr = getattr(self.property_lists, subtarget) items = list(list_attr) logger.debug( f"Getting full state for list '{subtarget}': {len(items)} items" ) # Return as list operation format for diff return { subtarget: { "list": subtarget, "operation": "extend", "items": items, } } # Try as scalar property prop_data = self.get_property(subtarget) if prop_data is not None: logger.debug(f"Getting full state for property '{subtarget}'") return {subtarget: prop_data} logger.warning(f"Subtarget '{subtarget}' not found as list or property") return {} else: # All properties - get both scalars and lists result = {} # Get scalar properties all_props = self.get_properties() if all_props: result.update(all_props) # Get property lists if hasattr(self, "property_lists") and self.property_lists: for list_name in self.property_lists.list_all(): list_attr = getattr(self.property_lists, list_name) items = list(list_attr) result[list_name] = { "list": list_name, "operation": "extend", "items": items, } logger.debug( f"Getting full state for all properties: {len(result)} keys" ) return result return {} def _store_subscription_diff( self, peer_id: str, subscription_id: str, sequence: int, data: dict[str, Any] ) -> str: """Store a subscription diff and return its URL. 
Args: peer_id: Peer actor ID subscription_id: Subscription ID sequence: Sequence number (already incremented) data: Diff data to store Returns: URL to fetch this diff """ import json from .db import get_subscription_diff if not self.config: logger.error("No config available for storing subscription diff") return "" # Store diff using low-level diff protocol (sequence already incremented) logger.debug( f"Storing subscription diff for {subscription_id} seq={sequence}, " f"data keys: {list(data.keys())}" ) diff_handle = get_subscription_diff(self.config) blob = json.dumps(data) logger.debug(f"Diff blob size: {len(blob)} chars") success = diff_handle.create( actor_id=self.id, subid=subscription_id, diff=blob, seqnr=sequence, ) if success: logger.info( f"Successfully stored subscription diff for {subscription_id} seq={sequence}" ) else: logger.error( f"Failed to store subscription diff for {subscription_id} seq={sequence}, " f"actor_id={self.id}" ) # Build URL to this diff # Format: /{actor_id}/subscriptions/{peer_id}/{subscription_id}/{sequence} diff_url = ( f"{self.config.proto}{self.config.fqdn}/{self.id}" f"/subscriptions/{peer_id}/{subscription_id}/{sequence}" ) logger.debug(f"Diff URL: {diff_url}") return diff_url def _increment_subscription_sequence( self, peer_id: str, subscription_id: str ) -> int: """Increment and return the new sequence number for a subscription. Args: peer_id: Peer actor ID subscription_id: Subscription ID Returns: New sequence number """ sub_obj = self.get_subscription_obj(peerid=peer_id, subid=subscription_id) if not sub_obj: return 1 # Use the increase_seq() method which increments and returns new sequence new_seq = sub_obj.increase_seq() return new_seq if new_seq else 1 # ========================================================================= # Diff Registration # =========================================================================
def register_diffs(self, target=None, subtarget=None, resource=None, blob=None) -> None:
    """Register a diff blob on every subscription that covers it.

    The blob is reshaped per subscription so each subscriber receives the
    diff at the level it subscribed to (target, subtarget or resource). For
    every diff successfully added, callback_subscription() is invoked.

    If resource is set, the blob is expected to be the FULL resource object,
    not a diff.

    Note: Skips registration if the target/subtarget is currently suspended.
    Use suspend_subscriptions() and resume_subscriptions() to manage
    suspension.

    Args:
        target: Target the change belongs to; nothing happens when falsy
        subtarget: Optional subtarget the blob is scoped to
        resource: Optional resource (below subtarget) the blob is scoped to
        blob: JSON string, or raw value, describing the change; nothing
            happens when None
    """
    if blob is None or not target:
        return

    # Check suspension BEFORE registering diffs
    if self.is_subscription_suspended(target, subtarget):
        logger.debug(f"Skipping diff registration for {target}/{subtarget}: suspended")
        return

    # Fetch every subscription on the target; subtarget/resource matching is
    # done below because broader subscriptions must receive the diff too.
    subs = (
        self.get_subscriptions(
            target=target, subtarget=None, resource=None, callback=False
        )
        or []
    )

    if subtarget and resource:
        logger.debug(
            "register_diffs() - blob(%s chars), target(%s), subtarget(%s), resource(%s), # of subs(%s)",
            len(blob),
            target,
            subtarget,
            resource,
            len(subs),
        )
    elif subtarget:
        logger.debug(
            "register_diffs() - blob(%s chars), target(%s), subtarget(%s), # of subs(%s)",
            len(blob),
            target,
            subtarget,
            len(subs),
        )
    else:
        logger.debug(
            "register_diffs() - blob(%s chars), target(%s), # of subs(%s)",
            len(blob),
            target,
            len(subs),
        )

    for sub in subs:
        # Skip subscriptions scoped to a different subtarget or resource.
        if subtarget and sub["subtarget"] and sub["subtarget"] != subtarget:
            logger.debug("     - no match on subtarget, skipping...")
            continue
        if resource and sub["resource"] and sub["resource"] != resource:
            logger.debug("     - no match on resource, skipping...")
            continue

        sub_obj = self.get_subscription_obj(
            peerid=sub["peerid"], subid=sub["subscriptionid"]
        )
        if not sub_obj:
            continue
        sub_obj_data = sub_obj.get()
        sub_subtarget = sub_obj_data["subtarget"]
        sub_resource = sub_obj_data["resource"]
        logger.debug(
            "     - processing subscription(%s) for peer(%s) with target(%s) subtarget(%s) and resource(%s)",
            sub["subscriptionid"],
            sub["peerid"],
            sub_obj_data["target"],
            sub_subtarget or "",
            sub_resource or "",
        )

        if (not resource or not subtarget) and sub_subtarget and sub_resource:
            # Subscription is on a resource, but this diff is on a higher
            # level: extract the part the subscription covers.
            try:
                jsonblob = json.loads(blob)
                if not subtarget:
                    finblob = json.dumps(jsonblob[sub_subtarget][sub_resource])
                else:
                    finblob = json.dumps(jsonblob[sub_resource])
            except (TypeError, ValueError, KeyError):
                # The diff does not contain the resource
                logger.debug(
                    "         - subscription has resource(%s), no matching blob found in diff",
                    sub_resource,
                )
                continue
            logger.debug(
                "         - subscription has resource(%s), adding diff(%s chars)",
                sub_resource,
                len(finblob),
            )
        elif resource and not sub_resource:
            # The diff is on a resource, the subscription on a higher level.
            # The blob is the entire resource, so wrap it:
            #   subscription on subtarget -> {resource: blob}
            #   subscription on target    -> {subtarget: {resource: blob}}
            try:
                content = json.loads(blob)
            except (TypeError, ValueError):
                # Not JSON - pass the raw blob through
                content = blob
            if not sub_subtarget:
                upblob = {subtarget: {resource: content}}
            else:
                upblob = {resource: content}
            finblob = json.dumps(upblob)
            logger.debug(
                "         - diff has resource(%s), subscription has not, adding diff(%s bytes)",
                resource,
                len(finblob),
            )
        elif not subtarget and sub_subtarget:
            # Subscription is on a subtarget, but this diff is on the target
            # level: extract the subtarget part.
            try:
                finblob = json.dumps(json.loads(blob)[sub_subtarget])
            except (TypeError, ValueError, KeyError):
                # The diff does not touch this subtarget, so there is
                # nothing to register. Skip instead of passing None on.
                logger.debug(
                    "         - subscription has subtarget(%s), no matching blob found in diff",
                    sub_subtarget,
                )
                continue
            logger.debug(
                "         - subscription has subtarget(%s), adding diff(%s bytes)",
                sub_subtarget,
                len(finblob),
            )
        elif subtarget and not sub_subtarget:
            # The diff is on a subtarget, the subscription on the target
            # level: wrap as {subtarget: blob}.
            try:
                content = json.loads(blob)
            except (TypeError, ValueError):
                content = blob
            finblob = json.dumps({subtarget: content})
            logger.debug(
                "         - diff has subtarget(%s), subscription has not, adding diff(%s chars)",
                subtarget,
                len(finblob),
            )
        else:
            # The diff is correct for the subscription
            logger.debug(
                "         - exact target/subtarget match, adding diff(%s chars)",
                len(blob),
            )
            finblob = blob

        diff = sub_obj.add_diff(blob=finblob)
        if not diff:
            logger.warning(
                "Failed when registering a diff to subscription (%s). Will not send callback.",
                sub["subscriptionid"],
            )
            continue

        # Direct call - callback_subscription handles sync/async internally
        self.callback_subscription(
            peerid=sub["peerid"],
            sub_obj=sub_obj,
            sub=sub_obj_data,
            diff=diff,
            blob=finblob,
        )
class Actors:
    """Handles all actors"""

    def __init__(self, config=None):
        """Bind to the actor list backend and load the actors once.

        Args:
            config: Configuration object handed to get_actor_list(); when
                falsy no backend is attached and fetch() returns False.
        """
        self.config = config
        self.list = get_actor_list(config) if config else None
        self.actors = None
        self.fetch()

    def fetch(self):
        """Return the actors, loading them from the backend on first use.

        Returns:
            False when no list backend is attached; otherwise the cached
            result of the backend's fetch().
        """
        if not self.list:
            return False
        if self.actors is None:
            # First call (or nothing cached yet): ask the backend.
            self.actors = self.list.fetch()
        return self.actors