import json
import logging
from actingweb import auth
from actingweb.handlers import base_handler
from actingweb.permission_evaluator import PermissionResult, get_permission_evaluator
# Import permission system with fallback
try:
from ..trust_permissions import (
create_permission_override,
get_trust_permission_store,
)
PERMISSION_SYSTEM_AVAILABLE = True
except ImportError:
# Set fallback values for when permission system is not available
get_trust_permission_store = None
create_permission_override = None
PERMISSION_SYSTEM_AVAILABLE = False # pyright: ignore[reportConstantRedefinition]
# /trust aw_handlers
#
# GET /trust with query parameters (relationship, type, and peerid) to retrieve trust relationships (auth: only creator
# and admins allowed)
# POST /trust with json body to initiate a trust relationship between this
# actor and another (reciprocal relationship) (auth: only creator and admins allowed)
# POST /trust/{trust_type} with json body to create new trust
# relationship (see config.py for default relationship and auto-accept, no
# auth required)
# Note: {trust_type} is the permission level (friend, admin, etc.), not the mini-app type
# GET /trust/{trust_type}}/{actorid} to get details on a specific relationship (auth: creator, admin, or peer secret)
# POST /trust/{trust_type}}/{actorid} to send information to a peer about changes in the relationship
# PUT /trust/{trust_type}}/{actorid} with a json body to change details on a relationship (baseuri, secret, desc)
# (auth: creator,
# admin, or peer secret)
# DELETE /trust/{trust_type}}/{actorid} to delete a relationship (with
# ?peer=true if the delete is from the peer) (auth: creator, admin, or
# peer secret)
logger = logging.getLogger(__name__)
# Handling requests to trust/
[docs]
class TrustHandler(base_handler.BaseHandler):
[docs]
def get(self, actor_id):
if self.request.get("_method") == "POST":
# Web UI method override for creating trust
self.post(actor_id)
try:
status = getattr(self.response, "status_code", 0)
except Exception:
status = 0
if status in (200, 201, 202):
# After successful creation, return to trust overview
self.response.set_status(302, "Found")
self.response.set_redirect(f"/{actor_id}/www/trust")
return
myself = self.require_authenticated_actor(actor_id, "trust", "GET")
if not myself:
return
relationship = self.request.get("relationship")
trust_type = self.request.get("type")
peerid = self.request.get(
"peerid",
)
pairs = myself.get_trust_relationships(
relationship=relationship, peerid=peerid, trust_type=trust_type
)
# Return empty array with 200 OK when no relationships exist (SPA-friendly, spec v1.2)
if not pairs:
pairs = []
out = json.dumps(pairs)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(200, "Ok")
[docs]
def post(self, actor_id):
myself = self.require_authenticated_actor(actor_id, "trust", "POST")
if not myself:
return
desc = ""
relationship = self.config.default_relationship
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
if "url" in params:
url = params["url"]
else:
url = ""
if "relationship" in params:
relationship = params["relationship"]
if "desc" in params:
desc = params["desc"]
except ValueError:
url = self.request.get("url")
relationship = self.request.get("relationship")
if len(url) == 0:
self.response.set_status(400, "Missing peer URL")
return
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
secret = self.config.new_token()
new_trust_rel = actor_interface.trust.create_relationship(
peer_url=url,
secret=secret,
description=desc,
relationship=relationship,
)
if not new_trust_rel:
self.response.set_status(408, "Unable to create trust relationship")
return
new_trust = new_trust_rel.to_dict()
# Trigger trust_initiated lifecycle hook to notify application of outgoing trust request
if self.hooks:
try:
peerid = new_trust.get("peerid", "")
logger.debug(
f"trust_initiated hook called for {actor_id} -> {peerid}, "
f"relationship={relationship}"
)
self.hooks.execute_lifecycle_hooks(
"trust_initiated",
actor=actor_interface,
peer_id=peerid,
relationship=relationship,
trust_data=new_trust,
)
logger.info(
f"trust_initiated hook triggered for {actor_id} -> {peerid}"
)
except Exception as e:
logger.error(f"Error triggering trust_initiated hook: {e}")
self.response.headers["Location"] = str(
self.config.root
+ (myself.id or "")
+ "/trust/"
+ new_trust["relationship"]
+ "/"
+ new_trust["peerid"]
)
out = json.dumps(new_trust)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(201, "Created")
# Handling requests to /trust/*, e.g. /trust/friend
[docs]
class TrustRelationshipHandler(base_handler.BaseHandler):
[docs]
def get(self, actor_id, relationship):
if self.request.get("_method") == "POST":
# Web UI method override for creating specific relationship
self.post(actor_id, relationship)
try:
status = getattr(self.response, "status_code", 0)
except Exception:
status = 0
if status in (200, 201, 202):
self.response.set_status(302, "Found")
self.response.set_redirect(f"/{actor_id}/www/trust")
return
self.response.set_status(404, "Not found")
[docs]
def put(self, actor_id, relationship):
# Use AuthResult for granular control since we need add_response=False
auth_result = self.authenticate_actor(
actor_id, "trust", subpath=relationship, add_response=False
)
if not auth_result.success:
return
myself = auth_result.actor
if relationship != "trustee":
if self.response:
self.response.set_status(404, "Not found")
return
# Access is the same as /trust
if not auth_result.authorize("POST", "trust"):
return
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
if "trustee_root" in params:
trustee_root = params["trustee_root"]
else:
trustee_root = ""
if "creator" in params:
creator = params["creator"]
else:
creator = None
except ValueError:
if self.response:
self.response.set_status(400, "No json content")
return
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
if len(trustee_root) > 0:
actor_interface.trust.trustee_root = trustee_root
if creator:
myself.modify(creator=creator)
if self.response:
self.response.set_status(204, "No content")
[docs]
def delete(self, actor_id, relationship):
# Use AuthResult for granular control since we need add_response=False
auth_result = self.authenticate_actor(
actor_id, "trust", subpath=relationship, add_response=False
)
if not auth_result.success:
return
myself = auth_result.actor
if relationship != "trustee":
if self.response:
self.response.set_status(404, "Not found")
return
# Access is the same as /trust
if not auth_result.authorize("DELETE", "trust"):
return
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
actor_interface.trust.trustee_root = None
if self.response:
self.response.set_status(204, "No content")
[docs]
def post(self, actor_id, relationship):
# This endpoint does not require authentication - trust creation can be done by peers
# Load actor without any authentication or authorization checks
auth_result = self.authenticate_actor(
actor_id, "trust", subpath=relationship, add_response=False
)
if not auth_result.actor:
self.response.set_status(404)
logger.debug(
"Got trust creation request for unknown Actor(" + str(actor_id) + ")"
)
return
myself = auth_result.actor
# Skip authentication and authorization checks for this public endpoint
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
if "baseuri" in params:
baseuri = params["baseuri"]
else:
baseuri = ""
if "id" in params:
peerid = params["id"]
else:
peerid = ""
if "secret" in params:
secret = params["secret"]
else:
secret = ""
if "desc" in params:
desc = params["desc"]
else:
desc = ""
if "verify" in params:
verification_token = params["verify"]
else:
verification_token = None
except ValueError:
if self.response:
self.response.set_status(400, "No json content")
return
if len(baseuri) == 0 or len(peerid) == 0:
self.response.set_status(400, "Missing mandatory attributes")
return
if (
self.config.auto_accept_default_relationship
and self.config.default_relationship == relationship
):
approved = True
else:
approved = False
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
# Since we received a request for a relationship, assume that peer has approved
# Note: trust_type is the peer's mini-app type (from JSON body "type" field)
# relationship is the trust type/permission level (from URL path)
peer_app_type = params.get("type", "")
new_trust = actor_interface.trust.create_verified_trust(
baseuri=baseuri,
peer_id=peerid,
approved=approved,
secret=secret,
verification_token=verification_token,
trust_type=peer_app_type, # peer's mini-application type (e.g., "urn:actingweb:example.com:banking")
peer_approved=True,
relationship=relationship, # trust type/permission level (e.g., "friend", "admin")
description=desc,
)
if not new_trust:
self.response.set_status(403, "Forbidden")
return
# Trigger trust_request_received lifecycle hook to notify application of incoming trust request
if self.hooks:
try:
logger.debug(
f"trust_request_received hook called for {actor_id} <- {peerid}, "
f"relationship={relationship}"
)
self.hooks.execute_lifecycle_hooks(
"trust_request_received",
actor=actor_interface,
peer_id=peerid,
relationship=relationship,
trust_data=new_trust,
)
logger.info(
f"trust_request_received hook triggered for {actor_id} <- {peerid}"
)
except Exception as e:
logger.error(f"Error triggering trust_request_received hook: {e}")
self.response.headers["Location"] = str(
self.config.root
+ (myself.id or "")
+ "/trust/"
+ new_trust["relationship"]
+ "/"
+ new_trust["peerid"]
)
out = json.dumps(new_trust)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
if approved:
self.response.set_status(201, "Created")
else:
self.response.set_status(202, "Accepted")
# Handling requests to specific relationships, e.g. /trust/friend/12f2ae53bd
[docs]
class TrustPeerHandler(base_handler.BaseHandler):
[docs]
def get(self, actor_id, relationship, peerid):
if self.request.get("_method") == "PUT":
# Web UI method override for updating/approving trust
self.put(actor_id, relationship, peerid)
try:
status = getattr(self.response, "status_code", 0)
except Exception:
status = 0
if status in (200, 201, 202, 204):
self.response.set_status(302, "Found")
self.response.set_redirect(f"/{actor_id}/www/trust")
return
if self.request.get("_method") == "DELETE":
# Perform deletion, then redirect back to the web UI trust page on success
self.delete(actor_id, relationship, peerid)
try:
status = getattr(self.response, "status_code", 0)
except Exception:
status = 0
if status in (200, 201, 202, 204):
# Redirect to the trust overview so the browser view updates
self.response.set_status(302, "Found")
self.response.set_redirect(f"/{actor_id}/www/trust")
return
logger.debug("GET trust request received")
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
return
myself = auth_result.actor
# Custom authorization check for peer access - peers can read their own trust relationship
if not auth_result.auth_obj.check_authorisation(
path="trust",
subpath="<type>/<id>",
method="GET",
peerid=peerid,
approved=False, # Allow access even if not fully approved
):
if self.response:
self.response.set_status(403)
return
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
if not relationships or len(relationships) == 0:
if self.response:
self.response.set_status(404, "Not found")
return
my_trust = relationships[0]
# Check access based on authentication type (needed for verification logic)
_ = (
auth_result.auth_obj.acl.get("authenticated")
and auth_result.auth_obj.acl.get("role") == "creator"
) # pyright: ignore[reportUnusedExpression]
is_peer_token = auth_result.auth_obj.trust is not None
# The GET handler should return trust data for verification purposes but NOT modify verified status
# Verification is handled by create_verified_trust() on the receiving side, not here
# Check if permissions query is requested
include_permissions = self.request.get("permissions") == "true"
if (
include_permissions
and PERMISSION_SYSTEM_AVAILABLE
and get_trust_permission_store
):
# Add permission information to response
permission_store = get_trust_permission_store(self.config)
permissions = permission_store.get_permissions(actor_id, peerid)
if permissions:
my_trust["permissions"] = {
"properties": permissions.properties,
"methods": permissions.methods,
"actions": permissions.actions,
"tools": permissions.tools,
"resources": permissions.resources,
"prompts": permissions.prompts,
"created_by": permissions.created_by,
"updated_at": permissions.updated_at,
"notes": permissions.notes,
}
else:
my_trust["permissions"] = None
if not my_trust["approved"] and is_peer_token:
# Peer with token but unapproved trust - deny access
if self.response:
self.response.set_status(403, "Trust relationship not approved")
return
out = json.dumps(my_trust)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
if my_trust["approved"]:
self.response.set_status(200, "Ok")
else:
# Creator can see unapproved trust details with 202 status
self.response.set_status(202, "Accepted")
[docs]
def post(self, actor_id, relationship, peerid):
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
return
myself = auth_result.actor
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
peer_approved = None
if "approved" in params:
if params["approved"] and params["approved"] is True:
peer_approved = True
except ValueError:
if self.response:
self.response.set_status(400, "No json content")
return
if peer_approved:
# If this is a notification from a peer about approval, we cannot check if the relationship is approved!
# Custom authorization check for peer approval case
if not auth_result.auth_obj.check_authorisation(
path="trust",
subpath="<type>/<id>",
method="POST",
peerid=peerid,
approved=False,
):
if self.response:
self.response.set_status(403)
return
else:
if not auth_result.authorize("POST", "trust", "<type>/<id>"):
return
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
# Update the trust relationship
trust_updated = actor_interface.trust.modify_and_notify(
peer_id=peerid, relationship=relationship, peer_approved=peer_approved
)
# Trigger trust_fully_approved_remote lifecycle hook when receiving peer approval notification
# This is the critical step where Actor A (who initiated the trust) learns that
# Actor B has approved, and can now create subscriptions
logger.info(
f"POST notification received: peer_approved={peer_approved}, "
f"has_hooks={self.hooks is not None}"
)
if peer_approved is True and self.hooks:
try:
# Get the trust relationship data to check if both sides approved
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
if relationships:
trust_data = relationships[0]
# Only trigger hook if BOTH sides have approved
if trust_data.get("approved") and trust_data.get("peer_approved"):
logger.info(
f"Trust fully approved remotely via POST notification: {peerid} approved, completing relationship with {actor_id}"
)
from actingweb.interface.actor_interface import ActorInterface
actor_interface = ActorInterface(myself, self.config)
self.hooks.execute_lifecycle_hooks(
"trust_fully_approved_remote",
actor=actor_interface,
peer_id=peerid,
relationship=relationship,
trust_data=trust_data,
)
logger.info(
f"trust_fully_approved_remote hook triggered via POST for {actor_id} <-> {peerid}"
)
else:
logger.debug(
f"Trust not yet fully approved after POST: approved={trust_data.get('approved')}, "
f"peer_approved={trust_data.get('peer_approved')}"
)
except Exception as e:
logger.error(
f"Error triggering trust_fully_approved_remote hook in POST handler: {e}"
)
if trust_updated:
self.response.set_status(204, "Ok")
else:
self.response.set_status(500, "Not modified")
[docs]
def put(self, actor_id, relationship, peerid):
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
return
myself = auth_result.actor
if not auth_result.authorize("PUT", "trust", "<type>/<id>"):
return
approved = None
permission_updates = None
actor_interface = None # Initialize to satisfy type checker
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
if "baseuri" in params:
baseuri = params["baseuri"]
else:
baseuri = ""
if "desc" in params:
desc = params["desc"]
else:
desc = ""
if "approved" in params:
if params["approved"] is True or params["approved"].lower() == "true":
approved = True
# Handle permission updates
if "permissions" in params and PERMISSION_SYSTEM_AVAILABLE:
permission_updates = params["permissions"]
except ValueError:
if not self.request.get("_method") or self.request.get("_method") != "PUT":
if self.response:
self.response.set_status(400, "No json content")
return
if self.request.get("approved") and len(self.request.get("approved")) > 0:
if self.request.get("approved").lower() == "true":
approved = True
if self.request.get("baseuri") and len(self.request.get("baseuri")) > 0:
baseuri = self.request.get("baseuri")
else:
baseuri = ""
if self.request.get("desc") and len(self.request.get("desc")) > 0:
desc = self.request.get("desc")
else:
desc = ""
# Use developer API - ActorInterface with TrustManager
if actor_interface is None:
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
# Update trust relationship
trust_updated = actor_interface.trust.modify_and_notify(
peer_id=peerid,
relationship=relationship,
baseuri=baseuri,
approved=approved,
description=desc,
)
# Trigger trust_fully_approved_local lifecycle hook if this actor just approved and both are now approved
if approved is True and self.hooks:
try:
# Get the trust relationship data to check if both sides approved
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
if relationships:
trust_data = relationships[0]
# Only trigger hook if BOTH sides have approved
if trust_data.get("approved") and trust_data.get("peer_approved"):
logger.info(
f"Trust fully approved locally via PUT: {actor_id} approved, completing relationship with {peerid}"
)
self.hooks.execute_lifecycle_hooks(
"trust_fully_approved_local",
actor=actor_interface,
peer_id=peerid,
relationship=relationship,
trust_data=trust_data,
)
logger.info(
f"trust_fully_approved_local hook triggered for {actor_id} <-> {peerid}"
)
except Exception as e:
logger.error(
f"Error triggering trust_fully_approved_local hook in PUT handler: {e}"
)
# Update permissions if provided
permissions_updated = True
if (
permission_updates is not None
and PERMISSION_SYSTEM_AVAILABLE
and get_trust_permission_store
):
try:
permission_store = get_trust_permission_store(self.config)
# Check if permissions already exist
existing_permissions = permission_store.get_permissions(
actor_id, peerid
)
if existing_permissions:
# Update existing permissions
permissions_updated = permission_store.update_permissions(
actor_id, peerid, permission_updates
)
elif create_permission_override:
# Create new permission override
permissions_obj = create_permission_override(
actor_id=actor_id,
peer_id=peerid,
trust_type=relationship, # Use relationship as trust type
permission_updates=permission_updates,
)
permissions_updated = permission_store.store_permissions(
permissions_obj
)
if not permissions_updated:
logger.error(
f"Failed to update permissions for trust relationship {actor_id}:{peerid}"
)
except Exception as e:
logger.error(
f"Error updating permissions for trust relationship {actor_id}:{peerid}: {e}"
)
permissions_updated = False
if trust_updated and permissions_updated:
self.response.set_status(204, "Ok")
elif not trust_updated:
# Trust modification failed - check if it's because trust doesn't exist
# or because of a validation error
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
if not relationships:
logger.warning(
f"Trust modification failed: no trust relationship found for "
f"actor={actor_id}, peer={peerid}, relationship={relationship}"
)
self.response.set_status(404, "Trust relationship not found")
else:
logger.warning(
f"Trust modification failed for actor={actor_id}, peer={peerid}, "
f"relationship={relationship} (trust exists but modify failed)"
)
self.response.set_status(400, "Failed to modify trust relationship")
else:
# permissions_updated is False
logger.warning(
f"Permissions update failed for actor={actor_id}, peer={peerid}"
)
self.response.set_status(500, "Failed to update permissions")
[docs]
def delete(self, actor_id, relationship, peerid):
# Use AuthResult for granular control since we need add_response=False and custom logic
auth_result = self.authenticate_actor(
actor_id, "trust", subpath=relationship, add_response=False
)
# Special case: if actor doesn't exist, return 404 for delete_reciprocal_trust cleanup
# This allows the peer to clean up orphaned relationships when the remote actor no longer exists
if not auth_result.actor:
logger.info(
f"DELETE trust: actor {actor_id} not found, returning 404 for cleanup"
)
if self.response:
self.response.set_status(404, "Not found")
return
# Special handling: if authentication failed but actor was loaded,
# check if relationship exists. If not, return 404 instead of auth error.
# This allows cleanup of orphaned relationships during delete_reciprocal_trust.
if auth_result.actor and (
not auth_result.auth_obj
or (
auth_result.auth_obj.response["code"] != 200
and auth_result.auth_obj.response["code"] != 401
)
):
# Actor loaded but auth failed - check if relationship exists
relationships = auth_result.actor.get_trust_relationships(
relationship=relationship, peerid=peerid
)
logger.debug(
f"DELETE trust auth failed, checking relationship: actor={actor_id}, "
f"peerid={peerid}, found={len(relationships) if relationships else 0}"
)
if not relationships or len(relationships) == 0:
# Relationship doesn't exist - return 404 (expected by delete_reciprocal_trust)
logger.info(
f"Trust relationship not found after auth failure: actor={actor_id}, "
f"peerid={peerid}, returning 404"
)
if self.response:
self.response.set_status(404, "Not found")
return
# Relationship exists but auth failed - return auth error
auth.add_auth_response(appreq=self, auth_obj=auth_result.auth_obj)
return
# Actor loaded but auth_obj missing (should not happen, but handle gracefully)
if not auth_result.auth_obj:
logger.warning(
f"DELETE trust: actor {actor_id} loaded but auth_obj missing"
)
if self.response:
self.response.set_status(500, "Internal server error")
return
myself = auth_result.actor
# Check if relationship exists BEFORE authorization
# This ensures we return 404 (not 403) when peer tries to delete non-existent relationship
# The delete_reciprocal_trust flow expects 404 for missing relationships
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
logger.debug(
f"DELETE trust check: actor={actor_id}, peerid={peerid}, "
f"relationship={relationship}, found={len(relationships) if relationships else 0} relationships"
)
if not relationships or len(relationships) == 0:
logger.info(
f"Trust relationship not found for deletion: actor={actor_id}, peerid={peerid}, "
f"returning 404"
)
if self.response:
self.response.set_status(404, "Not found")
return
# We allow non-approved peers to delete even if we haven't approved the relationship yet
if not auth_result.auth_obj.check_authorisation(
path="trust",
subpath="<type>/<id>",
method="DELETE",
peerid=peerid,
approved=False,
):
if self.response:
self.response.set_status(403)
return
# Prevent actors from deleting trust relationships with themselves
if peerid == actor_id:
if self.response:
self.response.set_status(
400, "Cannot delete trust relationship with self"
)
return
is_peer = False
if (
auth_result.auth_obj
and auth_result.auth_obj.trust
and auth_result.auth_obj.trust["peerid"] == peerid
):
is_peer = True
else:
# Use of GET param peer=true is a way of forcing no deletion of a peer
# relationship even when requestor is not a peer (primarily for testing purposes)
peer_get = self.request.get("peer").lower()
if peer_get.lower() == "true":
is_peer = True
# Use developer API - ActorInterface with TrustManager
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
# Trigger trust_deleted lifecycle hook BEFORE deleting
if self.hooks:
try:
# Pass relationship and trust_data for consistency with trust_approved hook
trust_data = relationships[0] if relationships else {}
if not trust_data:
logger.warning(
f"trust_deleted hook called for {actor_id} <-> {peerid}, "
f"but trust relationship data not found (may have been deleted already)"
)
self.hooks.execute_lifecycle_hooks(
"trust_deleted",
actor=actor_interface,
peer_id=peerid,
relationship=relationship,
trust_data=trust_data,
initiated_by_peer=is_peer,
)
logger.debug(
f"trust_deleted hook triggered for {actor_id} <-> {peerid}, initiated_by_peer={is_peer}"
)
except Exception as e:
logger.error(f"Error triggering trust_deleted hook: {e}")
# Delete trust relationship with appropriate peer notification setting
if is_peer:
deleted = actor_interface.trust.delete_peer_trust(
peer_id=peerid, notify_peer=False
)
else:
deleted = actor_interface.trust.delete_peer_trust(
peer_id=peerid, notify_peer=True
)
if not deleted:
self.response.set_status(502, "Not able to delete relationship with peer.")
return
self.response.set_status(204, "Ok")
# Handling requests to trust permissions, e.g. /trust/friend/12f2ae53bd/permissions
[docs]
class TrustPermissionHandler(base_handler.BaseHandler):
[docs]
def get(self, actor_id: str, relationship: str, peerid: str):
"""Get effective permissions for a specific trust relationship (custom or default from trust type)."""
if not PERMISSION_SYSTEM_AVAILABLE or not get_trust_permission_store:
if self.response:
self.response.set_status(501, "Permission system not available")
return
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
logger.error(
f"TrustPermissionHandler auth failed: actor={auth_result.actor is not None}, auth_obj={auth_result.auth_obj is not None}, code={auth_result.auth_obj.response['code'] if auth_result.auth_obj else 'None'}"
)
return
# Same authorization as trust endpoint
if not auth_result.authorize("GET", "trust", "<type>/<id>"):
return
try:
permission_store = get_trust_permission_store(self.config)
custom_permissions = permission_store.get_permissions(actor_id, peerid)
if custom_permissions:
# Return custom permission overrides
permission_data = {
"actor_id": custom_permissions.actor_id,
"peer_id": custom_permissions.peer_id,
"trust_type": custom_permissions.trust_type,
"properties": custom_permissions.properties,
"methods": custom_permissions.methods,
"actions": custom_permissions.actions,
"tools": custom_permissions.tools,
"resources": custom_permissions.resources,
"prompts": custom_permissions.prompts,
"created_by": custom_permissions.created_by,
"updated_at": custom_permissions.updated_at,
"notes": custom_permissions.notes,
"is_custom": True,
"source": "custom_override",
}
else:
# No custom permissions, get defaults from trust type registry
# The 'relationship' parameter is the trust type (friend, admin, etc.)
try:
from ..trust_type_registry import get_registry
registry = get_registry(self.config)
logger.debug(f"Looking up trust type '{relationship}' in registry")
trust_type = registry.get_type(relationship)
if not trust_type:
logger.error(
f"Trust type '{relationship}' not found in registry"
)
available_types = (
[t.name for t in registry.list_types()] if registry else []
)
logger.error(f"Available trust types: {available_types}")
if self.response:
self.response.set_status(
404, f"Trust type '{relationship}' not found"
)
return
logger.debug(
f"Found trust type '{relationship}': {trust_type.display_name}"
)
# Return default permissions from trust type
permission_data = {
"actor_id": actor_id,
"peer_id": peerid,
"trust_type": relationship,
"properties": trust_type.base_permissions.get("properties", {}),
"methods": trust_type.base_permissions.get("methods", {}),
"actions": trust_type.base_permissions.get("actions", {}),
"tools": trust_type.base_permissions.get("tools", {}),
"resources": trust_type.base_permissions.get("resources", {}),
"prompts": trust_type.base_permissions.get("prompts", {}),
"created_by": trust_type.created_by,
"updated_at": None,
"notes": f"Default permissions for {trust_type.display_name}",
"is_custom": False,
"source": "trust_type_default",
}
except Exception as registry_error:
logger.error(
f"Error accessing trust type registry for {relationship}: {registry_error}"
)
if self.response:
self.response.set_status(
500,
f"Error accessing trust type defaults: {registry_error}",
)
return
out = json.dumps(permission_data)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(200, "Ok")
except Exception as e:
logger.error(f"Error retrieving permissions for {actor_id}:{peerid}: {e}")
if self.response:
self.response.set_status(500, "Internal server error")
[docs]
def put(self, actor_id: str, relationship: str, peerid: str):
"""Create or update permission overrides for a trust relationship."""
if not PERMISSION_SYSTEM_AVAILABLE or not get_trust_permission_store:
if self.response:
self.response.set_status(501, "Permission system not available")
return
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
return
myself = auth_result.actor
# Same authorization as trust endpoint
if not auth_result.authorize("PUT", "trust", "<type>/<id>"):
return
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
# Validate trust relationship exists
relationships = myself.get_trust_relationships(
relationship=relationship, peerid=peerid
)
if not relationships or len(relationships) == 0:
if self.response:
self.response.set_status(404, "Trust relationship not found")
return
permission_store = get_trust_permission_store(self.config)
existing_permissions = permission_store.get_permissions(actor_id, peerid)
if existing_permissions:
# Update existing permissions
success = permission_store.update_permissions(actor_id, peerid, params)
if success:
self.response.set_status(200, "Updated")
else:
self.response.set_status(500, "Failed to update permissions")
elif create_permission_override:
# Create new permission override
permissions_obj = create_permission_override(
actor_id=actor_id,
peer_id=peerid,
trust_type=relationship,
permission_updates=params,
)
success = permission_store.store_permissions(permissions_obj)
if success:
self.response.set_status(201, "Created")
else:
self.response.set_status(500, "Failed to create permissions")
except ValueError as e:
logger.error(f"Invalid JSON in permission request: {e}")
if self.response:
self.response.set_status(400, "Invalid JSON")
except Exception as e:
logger.error(f"Error updating permissions for {actor_id}:{peerid}: {e}")
if self.response:
self.response.set_status(500, "Internal server error")
[docs]
def delete(self, actor_id: str, relationship: str, peerid: str):
"""Delete permission overrides for a trust relationship."""
if not PERMISSION_SYSTEM_AVAILABLE or not get_trust_permission_store:
if self.response:
self.response.set_status(501, "Permission system not available")
return
auth_result = self.authenticate_actor(actor_id, "trust", subpath=relationship)
if not auth_result.success:
return
# Same authorization as trust endpoint
if not auth_result.authorize("DELETE", "trust", "<type>/<id>"):
return
try:
permission_store = get_trust_permission_store(self.config)
success = permission_store.delete_permissions(actor_id, peerid)
if success:
self.response.set_status(204, "Deleted")
else:
self.response.set_status(404, "No permissions found")
except Exception as e:
logger.error(f"Error deleting permissions for {actor_id}:{peerid}: {e}")
if self.response:
self.response.set_status(500, "Internal server error")
[docs]
class TrustSharedPropertiesHandler(base_handler.BaseHandler):
"""Handler for /{actor_id}/trust/{relationship}/{peerid}/shared_properties
Returns properties the authenticated peer is permitted to subscribe to.
Requires an active trust relationship with the requesting peer.
"""
[docs]
def get(self, actor_id: str, relationship: str, peerid: str):
"""Get properties available for subscription by this peer."""
# Authenticate the request
auth_result = self.authenticate_actor(
actor_id, "trust", subpath=f"{relationship}/{peerid}/shared_properties"
)
if not auth_result.success or not auth_result.actor:
return
myself = auth_result.actor
# Verify the requesting peer matches the authenticated peer
authenticated_peerid = (
auth_result.auth_obj.acl.get("peerid") if auth_result.auth_obj else None
)
if peerid != authenticated_peerid:
if self.response:
self.response.set_status(403, "Can only query own shared properties")
return
# Verify trust relationship exists and is active
trust_rel = myself.get_trust_relationship(peerid)
if not trust_rel:
if self.response:
self.response.set_status(404, "Trust relationship not found")
return
# Get permission evaluator
evaluator = (
get_permission_evaluator(self.config)
if PERMISSION_SYSTEM_AVAILABLE
else None
)
if not evaluator:
if self.response:
self.response.set_status(503, "Permission system not available")
return
# Get all properties to check
all_properties = myself.get_properties()
property_names = list(all_properties.keys()) if all_properties else []
# Also check property lists (distributed storage)
actor_interface = None
try:
from actingweb.interface.actor_interface import ActorInterface
actor_interface = ActorInterface(myself, self.config)
if hasattr(actor_interface, "property_lists"):
list_names = actor_interface.property_lists.list_all()
property_names.extend(list_names)
except Exception as e:
logger.debug(f"Could not get property lists: {e}")
# Remove duplicates
property_names = list(set(property_names))
# Filter by peer's permissions
shared = []
excluded = []
for prop_name in property_names:
result = evaluator.evaluate_property_access(
actor_id, peerid, prop_name, operation="subscribe"
)
if result == PermissionResult.ALLOWED:
# Get item count if it's a property list
item_count = 0
try:
if actor_interface and hasattr(actor_interface, "property_lists"):
prop_list = getattr(
actor_interface.property_lists, prop_name, None
)
if prop_list and hasattr(prop_list, "__len__"):
item_count = len(prop_list)
except Exception as e:
logger.debug(
f"Could not get item count for property list {prop_name}: {e}"
)
shared.append(
{
"name": prop_name,
"display_name": prop_name.replace("_", " ").title(),
"item_count": item_count,
"operations": ["read", "subscribe"],
}
)
else:
excluded.append(prop_name)
response_data = {
"actor_id": actor_id,
"peer_id": peerid,
"relationship": relationship,
"shared_properties": shared,
"excluded_properties": excluded,
}
if self.response:
self.response.write(json.dumps(response_data))
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(200)