Source code for actingweb.handlers.subscription

import json
import logging

from actingweb.handlers import base_handler

logger = logging.getLogger(__name__)


[docs] class SubscriptionRootHandler(base_handler.BaseHandler): """Handles requests to /subscription"""
[docs] def get(self, actor_id): if self.request.get("_method") == "POST": self.post(actor_id) return myself = self.require_authenticated_actor(actor_id, "subscriptions", "GET") if not myself: return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return peerid = self.request.get("peerid") target = self.request.get("target") subtarget = self.request.get("subtarget") resource = self.request.get("resource") # Use SubscriptionManager to get subscriptions if peerid: # Return both directions: outbound (we subscribed to them) and inbound (they subscribed to us) outbound = actor_interface.subscriptions.get_subscriptions_to_peer(peerid) inbound = actor_interface.subscriptions.get_subscriptions_from_peer(peerid) subscription_infos = outbound + inbound elif target: subscription_infos = ( actor_interface.subscriptions.get_subscriptions_for_target( target, subtarget or "", resource or "" ) ) else: subscription_infos = actor_interface.subscriptions.all_subscriptions # Convert SubscriptionInfo objects to dicts for JSON response subscriptions = [sub.to_dict() for sub in subscription_infos] # Return empty data array with 200 OK when no subscriptions exist (SPA-friendly, spec v1.2) if not subscriptions: subscriptions = [] data = { "id": myself.id, "data": subscriptions, } out = json.dumps(data) if self.response: self.response.write(out) self.response.headers["Content-Type"] = "application/json" self.response.set_status(200, "Ok")
[docs] def post(self, actor_id): myself = self.require_authenticated_actor(actor_id, "subscriptions", "POST") if not myself: return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return try: body = self.request.body if isinstance(body, bytes): body = body.decode("utf-8", "ignore") elif body is None: body = "{}" params = json.loads(body) peerid = params.get("peerid") target = params.get("target") subtarget = params.get("subtarget", "") resource = params.get("resource", "") granularity = params.get("granularity", "none") except ValueError: peerid = self.request.get("peerid") target = self.request.get("target") subtarget = self.request.get("subtarget") or "" resource = self.request.get("resource") or "" granularity = self.request.get("granularity") or "none" if not peerid or len(peerid) == 0: if self.response: self.response.set_status(400, "Missing peer URL") return if not target or len(target) == 0: if self.response: self.response.set_status(400, "Missing target") return # Use SubscriptionManager to create remote subscription remote_loc = actor_interface.subscriptions.subscribe_to_peer( peer_id=peerid, target=target, subtarget=subtarget, resource=resource, granularity=granularity, ) if not remote_loc: if self.response: self.response.set_status( 408, "Unable to create remote subscription with peer" ) return if self.response: self.response.headers["Location"] = remote_loc self.response.set_status(204, "Created")
# Handling requests to /subscription/*, e.g. /subscription/<peerid>
[docs] class SubscriptionRelationshipHandler(base_handler.BaseHandler):
[docs] def get(self, actor_id, peerid): if self.request.get("_method") == "POST": self.post(actor_id, peerid) return auth_result = self.authenticate_actor(actor_id, "subscriptions") if not auth_result.success: return myself = auth_result.actor # Check authorization - peers can access their own subscriptions if not auth_result.auth_obj.check_authorisation( path="subscriptions", subpath="<id>", method="GET", peerid=peerid, approved=False, # Allow access even if not fully approved ): if self.response: self.response.set_status(403) return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return target = self.request.get("target") subtarget = self.request.get("subtarget") resource = self.request.get("resource") # Use SubscriptionManager to get subscriptions for peer if target: subscription_infos = ( actor_interface.subscriptions.get_subscriptions_for_target( target, subtarget or "", resource or "" ) ) # Filter to only this peer's subscriptions subscription_infos = [ sub for sub in subscription_infos if sub.peer_id == peerid ] else: # Return both directions: outbound (we subscribed to them) and inbound (they subscribed to us) outbound = actor_interface.subscriptions.get_subscriptions_to_peer(peerid) inbound = actor_interface.subscriptions.get_subscriptions_from_peer(peerid) subscription_infos = outbound + inbound # Convert SubscriptionInfo objects to dicts for JSON response subscriptions = [sub.to_dict() for sub in subscription_infos] # Return empty data array with 200 OK when no subscriptions exist (SPA-friendly, spec v1.2) if not subscriptions: subscriptions = [] data = { "id": myself.id, "peerid": peerid, "data": subscriptions, } out = json.dumps(data) if self.response: self.response.write(out) self.response.headers["Content-Type"] = "application/json" self.response.set_status(200, "Ok")
[docs] def post(self, actor_id, peerid): auth_result = self.authenticate_actor(actor_id, "subscriptions") if not auth_result.success: return myself = auth_result.actor if not auth_result.authorize("POST", "subscriptions", "<id>"): return try: body = self.request.body if isinstance(body, bytes): body = body.decode("utf-8", "ignore") elif body is None: body = "{}" params = json.loads(body) if "target" in params: target = params["target"] else: if self.response: self.response.set_status(400, "No target in request") return if "subtarget" in params: subtarget = params["subtarget"] else: subtarget = None if "resource" in params: resource = params["resource"] else: resource = None if "granularity" in params: granularity = params["granularity"] else: granularity = "none" except ValueError: if self.response: self.response.set_status(400, "No json body") return if peerid != auth_result.auth_obj.acl["peerid"]: logger.warning( "Peer " + peerid + " tried to create a subscription for peer " + auth_result.auth_obj.acl["peerid"] ) if self.response: self.response.set_status(403, "Forbidden. Wrong peer id in request") return # Subscription authorization is handled by ACL rules (e.g., ("subscriptions/<id>", "POST", "a")) # The ACL check happens in authenticate_actor/authorize above # Property-level filtering happens at callback time based on permission patterns # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return # Use SubscriptionManager to create local subscription (peer subscribes to us) new_sub = actor_interface.subscriptions.create_local_subscription( peer_id=auth_result.auth_obj.acl["peerid"], target=target, subtarget=subtarget or "", resource=resource or "", granularity=granularity, ) if not new_sub: if self.response: self.response.set_status(500, "Unable to create new subscription") return # Invalidate subscription cache so register_diffs() sees the new subscription myself.subs_list = None if self.response: self.response.headers["Location"] = str( self.config.root + (myself.id or "") + "/subscriptions/" + new_sub["peerid"] + "/" + new_sub["subscriptionid"] ) pair = { "subscriptionid": new_sub["subscriptionid"], "target": new_sub["target"], "subtarget": new_sub["subtarget"], "resource": new_sub["resource"], "granularity": new_sub["granularity"], "sequence": new_sub["sequence"], } out = json.dumps(pair) self.response.write(out) self.response.headers["Content-Type"] = "application/json" self.response.set_status(201, "Created")
[docs] class SubscriptionHandler(base_handler.BaseHandler): """Handling requests to specific subscriptions, e.g. /subscriptions/<peerid>/12f2ae53bd"""
[docs] def get(self, actor_id, peerid, subid): if self.request.get("_method") == "PUT": self.put(actor_id, peerid, subid) return if self.request.get("_method") == "DELETE": self.delete(actor_id, peerid, subid) return auth_result = self.authenticate_actor( actor_id, "subscriptions", subpath=peerid + "/" + subid ) if not auth_result.success: return myself = auth_result.actor if not auth_result.authorize("GET", "subscriptions", "<id>/<subid>"): return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return # Use SubscriptionManager to get subscription with diff operations sub_with_diffs = actor_interface.subscriptions.get_subscription_with_diffs( peer_id=peerid, subscription_id=subid ) if not sub_with_diffs: if self.response: self.response.set_status(404, "Subscription does not exist") return sub_info = sub_with_diffs.subscription_info if not sub_info: if self.response: self.response.set_status(404, "Subscription does not exist") return diffs = sub_with_diffs.get_diffs() pairs = [] for diff in diffs: try: d = json.loads(diff["diff"]) except (TypeError, ValueError, KeyError): d = diff["diff"] pairs.append( { "sequence": diff["sequence"], "timestamp": diff["timestamp"].strftime("%Y-%m-%dT%H:%M:%S.%fZ"), "data": d, } ) # Always return subscription metadata with current sequence, # even when there are no new diffs. This allows subscribers # to synchronize their sequence number with the publisher. sub_dict = sub_info.to_dict() data = { "id": myself.id, "peerid": peerid, "subscriptionid": subid, "target": sub_dict["target"], "subtarget": sub_dict["subtarget"], "resource": sub_dict["resource"], "sequence": sub_dict["sequence"], "data": pairs, } out = json.dumps(data) if self.response: self.response.write(out) self.response.headers["Content-Type"] = "application/json" self.response.set_status(200, "Ok")
[docs] def put(self, actor_id, peerid, subid): auth_result = self.authenticate_actor( actor_id, "subscriptions", subpath=peerid + "/" + subid ) if not auth_result.success: return myself = auth_result.actor if not auth_result.authorize("GET", "subscriptions", "<id>/<subid>"): return try: body = self.request.body if isinstance(body, bytes): body = body.decode("utf-8", "ignore") elif body is None: body = "{}" params = json.loads(body) if "sequence" in params: seq = params["sequence"] else: self.response.set_status( 400, "Error in json body and no GET parameters" ) return except (TypeError, ValueError, KeyError): seq = self.request.get("sequence") if len(seq) == 0: self.response.set_status( 400, "Error in json body and no GET parameters" ) return try: if not isinstance(seq, int): seqnr = int(seq) else: seqnr = seq except ValueError: self.response.set_status(400, "Sequence does not contain a number") return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return # Use SubscriptionManager to get subscription with diff operations sub_with_diffs = actor_interface.subscriptions.get_subscription_with_diffs( peer_id=peerid, subscription_id=subid ) if not sub_with_diffs: self.response.set_status(404, "Subscription does not exist") return sub_with_diffs.clear_diffs(seqnr=seqnr) if self.response: self.response.set_status(204) return
[docs] def delete(self, actor_id, peerid, subid): auth_result = self.authenticate_actor( actor_id, "subscriptions", subpath=peerid + "/" + subid ) if not auth_result.success: return myself = auth_result.actor if not auth_result.authorize("GET", "subscriptions", "<id>/<subid>"): return # Do not delete remote subscription if this is from our peer acl_peerid = auth_result.auth_obj.acl.get("peerid", "") logger.debug( f"DELETE subscription: actor_id={actor_id}, peerid={peerid}, subid={subid}, acl_peerid='{acl_peerid}'" ) if len(acl_peerid) == 0: logger.debug( f"Calling delete_remote_subscription because acl_peerid is empty (actor={actor_id}, peer={peerid}, subid={subid})" ) myself.delete_remote_subscription(peerid=peerid, subid=subid) else: logger.debug( f"NOT calling delete_remote_subscription because request is from peer '{acl_peerid}'" ) # Get subscription info before deletion for lifecycle hook # NOTE: We don't know if this is inbound or outbound yet, so we try to fetch # with callback=False first (most common case - peer deleting their subscription to us) from ..subscription import Subscription sub_obj = Subscription( actor_id=actor_id, peerid=peerid, subid=subid, callback=False, config=myself.config, ) sub_data = sub_obj.subscription if sub_obj.subscription else {} # If subscription not found with callback=False, try callback=True if not sub_data: sub_obj = Subscription( actor_id=actor_id, peerid=peerid, subid=subid, callback=True, config=myself.config, ) sub_data = sub_obj.subscription if sub_obj.subscription else {} is_callback = sub_data.get("callback", False) # Delete the subscription (cleanup happens automatically in Subscription.delete()) # IMPORTANT: Pass the actual callback value from the subscription record if not myself.delete_subscription( peerid=peerid, subid=subid, callback=is_callback ): self.response.set_status(404) return # Execute lifecycle hook after successful deletion # For inbound subscriptions (callback=False), fire the hook with appropriate initiated_by_peer flag # - initiated_by_peer=True when peer initiates deletion (acl_peerid is set) # - initiated_by_peer=False when local actor deletes via REST API (acl_peerid is empty) # For outbound subscriptions (callback=True), hook is fired at the interface level logger.debug( f"Hook check: hooks={self.hooks is not None}, acl_peerid='{acl_peerid}', " f"len(acl_peerid)={len(acl_peerid)}, is_callback={is_callback}" ) if self.hooks and not is_callback: initiated_by_peer = len(acl_peerid) > 0 logger.info( f"Executing subscription_deleted hook for {peerid}, initiated_by_peer={initiated_by_peer}" ) actor_interface = self._get_actor_interface(myself) if actor_interface: try: self.hooks.execute_lifecycle_hooks( "subscription_deleted", actor=actor_interface, peer_id=peerid, subscription_id=subid, subscription_data=sub_data, initiated_by_peer=initiated_by_peer, ) logger.info( f"Successfully executed subscription_deleted hook for {peerid}" ) except Exception as e: logger.warning( f"Error executing subscription_deleted hook: {e}", exc_info=True ) else: logger.warning("Failed to get actor_interface for hook execution") else: logger.debug( f"Skipping hook execution: hooks={self.hooks is not None}, " f"is_callback={is_callback}" ) if self.response: self.response.set_status(204) return
[docs] class SubscriptionDiffHandler(base_handler.BaseHandler): """Handling requests to specific diffs for one subscription and clears it, e.g. /subscriptions/<peerid>/<subid>/112"""
[docs] def get(self, actor_id, peerid, subid, seqnr): auth_result = self.authenticate_actor( actor_id, "subscriptions", subpath=peerid + "/" + subid + "/" + str(seqnr) ) if not auth_result.success: return myself = auth_result.actor if not auth_result.authorize("GET", "subscriptions", "<id>/<subid>"): return # Use developer API - ActorInterface with SubscriptionManager actor_interface = self._get_actor_interface(myself) if not actor_interface: if self.response: self.response.set_status(500, "Internal error") return # Use SubscriptionManager to get subscription with diff operations sub_with_diffs = actor_interface.subscriptions.get_subscription_with_diffs( peer_id=peerid, subscription_id=subid ) if not sub_with_diffs: if self.response: self.response.set_status(404, "Subscription does not exist") return sub_info = sub_with_diffs.subscription_info if not sub_info: if self.response: self.response.set_status(404, "Subscription does not exist") return if not isinstance(seqnr, int): seqnr = int(seqnr) diff = sub_with_diffs.get_diff(seqnr=seqnr) if not diff: self.response.set_status(404, "No diffs available") return try: d = json.loads(diff["data"]) except (TypeError, ValueError, KeyError): d = diff["data"] sub_dict = sub_info.to_dict() pairs = { "id": myself.id, "peerid": peerid, "subscriptionid": subid, "timestamp": diff["timestamp"].strftime("%Y-%m-%dT%H:%M:%S.%fZ"), "target": sub_dict["target"], "subtarget": sub_dict["subtarget"], "resource": sub_dict["resource"], "sequence": seqnr, "data": d, } sub_with_diffs.clear_diff(seqnr) out = json.dumps(pairs) if self.response: self.response.write(out) self.response.headers["Content-Type"] = "application/json" self.response.set_status(200, "Ok")