import copy
import json
import logging
from typing import Any
from actingweb.handlers import base_handler
from ..permission_evaluator import PermissionResult, get_permission_evaluator
[docs]
def merge_dict(d1, d2):
"""Modifies d1 in-place to contain values from d2.
If any value in d1 is a dictionary (or dict-like), *and* the corresponding
value in d2 is also a dictionary, then merge them in-place.
Thanks to Edward Loper on stackoverflow.com
"""
for k, v2 in list(d2.items()):
v1 = d1.get(k) # returns None if v1 has no value for this key
if isinstance(v1, dict) and isinstance(v2, dict):
merge_dict(v1, v2)
else:
d1[k] = v2
[docs]
def delete_dict(d1, path):
"""Deletes path (an array of strings) in d1 dict.
d1 is modified to no longer contain the attr/value pair
or dict that is specified by path.
"""
if not d1:
return False
if len(path) > 1 and path[1] and len(path[1]) > 0:
return delete_dict(d1.get(path[0]), path[1:])
if len(path) == 1 and path[0] and path[0] in d1:
try:
del d1[path[0]]
return True
except KeyError:
return False
return False
logger = logging.getLogger(__name__)
[docs]
class PropertiesHandler(base_handler.BaseHandler):
def _check_property_permission(
self, actor_id: str, auth_obj, property_path: str, operation: str
) -> bool:
"""
Check property permission using the unified access control system.
This replaces the legacy auth.check_authorisation() with the new permission evaluator
that supports granular trust type-based permissions.
Args:
actor_id: The actor ID
auth_obj: Auth object from authentication
property_path: Property path (e.g., "email", "notes/work")
operation: Operation type ("read", "write", "delete")
Returns:
True if access is allowed, False otherwise
"""
# Get peer ID from auth object (if authenticated via trust relationship)
# Note: auth_obj.acl is a dict, not an object, so we use .get()
peer_id = auth_obj.acl.get("peerid", "") if hasattr(auth_obj, "acl") else ""
if not peer_id:
# No peer relationship - fall back to legacy authorization for basic/oauth auth
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
# Use permission evaluator for peer-based access
try:
evaluator = get_permission_evaluator(self.config)
result = evaluator.evaluate_property_access(
actor_id, peer_id, property_path, operation
)
if result == PermissionResult.ALLOWED:
return True
elif result == PermissionResult.DENIED:
logger.info(
f"Property access denied: {actor_id} -> {peer_id} -> {property_path} ({operation})"
)
return False
else: # NOT_FOUND
# No specific permission rule - fall back to legacy for backward compatibility
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
except Exception as e:
logger.error(
f"Error in permission evaluation for {actor_id}:{peer_id}:{property_path}: {e}"
)
# Fall back to legacy authorization on errors
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
def _create_auth_context(self, auth_obj, operation: str = "read") -> dict[str, Any]:
"""Create auth context for hook execution with peer information."""
# Note: auth_obj.acl is a dict, not an object, so we use .get()
peer_id = auth_obj.acl.get("peerid", "") if hasattr(auth_obj, "acl") else ""
return {"peer_id": peer_id, "config": self.config, "operation": operation}
[docs]
def get(self, actor_id, name):
if self.request.get("_method") == "PUT":
self.put(actor_id, name)
return
if self.request.get("_method") == "DELETE":
self.delete(actor_id, name)
return
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
if not name:
path = []
else:
path = name.split("/")
name = path[0]
# Use unified access control system for permission checking
property_path = "/".join(path) if path else ""
if not self._check_property_permission(actor_id, check, property_path, "read"):
if self.response:
self.response.set_status(403)
return
# if name is not set, this request URI was the properties root
if not name:
self.listall(myself, check)
return
# Block direct access to list: prefixed properties
# The "list:" prefix is an internal implementation detail
if name.startswith("list:"):
if self.response:
self.response.set_status(404, "Not found")
return
# Check if this is a list property first
if (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(name)
):
# This is a list property - handle format and index parameters
logger.info(f"Processing list property '{name}'")
index_param = (
self.request.get("index") or None
) # Convert empty string to None
format_param = (
self.request.get("format") or None
) # Convert empty string to None
try:
logger.info(f"Getting list property object for '{name}'")
list_prop = getattr(myself.property_lists, name)
logger.info(
f"Got list_prop: {type(list_prop).__name__}, length={len(list_prop) if list_prop else 'N/A'}"
)
logger.info(f"index_param={index_param}, format_param={format_param}")
if index_param is not None:
logger.info(f"Handling index access for index={index_param}")
# Get specific item by index
try:
index = int(index_param)
item = list_prop[index]
# Execute property hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
hook_path = [str(index)]
auth_context = self._create_auth_context(check, "read")
transformed = self.hooks.execute_property_hooks(
name,
"get",
actor_interface,
item,
hook_path,
auth_context,
)
if transformed is not None:
item = transformed
else:
if self.response:
self.response.set_status(404)
return
out = json.dumps(item)
except (IndexError, ValueError):
if self.response:
self.response.set_status(404, "List item not found")
return
else:
logger.info(
f"Handling list access (not index), format_param={format_param}"
)
# Determine response format
if format_param == "short":
logger.info("Using short format")
# Short format: return metadata only
# This matches the format used in GET /properties?metadata=true
metadata = {
"_list": True,
"count": len(list_prop),
"description": list_prop.get_description(),
"explanation": list_prop.get_explanation(),
}
out = json.dumps(metadata)
else:
# Default (no format or format=full): return all items
# This is the expected behavior for subscriptions
all_items = list_prop.to_list()
# Execute property hook if available
logger.info(
f"Checking hooks: has_hooks={self.hooks is not None}"
)
if self.hooks:
actor_interface = self._get_actor_interface(myself)
logger.info(
f"Got actor_interface: {actor_interface is not None}"
)
if actor_interface:
hook_path = []
auth_context = self._create_auth_context(check, "read")
logger.info(
f"Executing property hooks for '{name}', items count={len(all_items)}"
)
transformed = self.hooks.execute_property_hooks(
name,
"get",
actor_interface,
all_items,
hook_path,
auth_context,
)
logger.info(
f"Hook result: transformed is None? {transformed is None}"
)
if transformed is not None:
all_items = transformed
else:
logger.warning(
f"Property hook returned None for '{name}', returning 404"
)
if self.response:
self.response.set_status(404)
return
out = json.dumps(all_items)
if self.response:
self.response.set_status(200, "Ok")
self.response.headers["Content-Type"] = "application/json"
self.response.write(out)
return
except Exception as e:
logger.error(f"Error accessing list property '{name}': {e}")
if self.response:
self.response.set_status(500, "Error accessing list property")
return
# Regular property handling
lookup = myself.property[name] if myself and myself.property else None
if not lookup:
if self.response:
self.response.set_status(404, "Property not found")
return
try:
jsonblob = json.loads(lookup)
try:
out = jsonblob
if len(path) > 1:
del path[0]
for p in path:
out = out[p]
# Execute property hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
# Use the original name for the hook, not the modified path
hook_path = path[1:] if len(path) > 1 else []
auth_context = self._create_auth_context(check, "read")
transformed = self.hooks.execute_property_hooks(
name or "*",
"get",
actor_interface,
out,
hook_path,
auth_context,
)
if transformed is not None:
out = transformed
elif (
name
): # If hook returns None for specific property, it means 404
if self.response:
self.response.set_status(404)
return
out = json.dumps(out)
except (TypeError, ValueError, KeyError):
if self.response:
self.response.set_status(404)
return
# Keep as string for response.write()
except (TypeError, ValueError, KeyError):
out = lookup
if self.response:
self.response.set_status(200, "Ok")
self.response.headers["Content-Type"] = "application/json"
self.response.write(out)
[docs]
def listall(self, myself, check):
# Get actor interface for property access
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
properties = actor_interface.properties.to_dict()
# Check query parameters
include_metadata = self.request.get("metadata") == "true"
format_param = self.request.get("format") or None
# Mutual exclusion: format and metadata cannot be used together
if include_metadata and format_param:
if self.response:
self.response.set_status(
400, "Cannot use format and metadata parameters together"
)
return
pair = {}
if properties and len(properties) > 0:
for name, value in list(properties.items()):
try:
js = json.loads(value)
pair[name] = js
except ValueError:
pair[name] = value
# Filter properties based on peer permissions (bulk evaluation)
peer_id = check.acl.get("peerid", "") if hasattr(check, "acl") else ""
if peer_id and actor_interface and actor_interface.id and pair:
try:
evaluator = get_permission_evaluator(self.config)
# Use bulk evaluation to reduce logging verbosity
property_names = list(pair.keys())
results = evaluator.evaluate_bulk_property_access(
actor_interface.id, peer_id, property_names, "read"
)
# Filter based on results
filtered_pair = {}
for prop_name, prop_value in pair.items():
result = results.get(prop_name, PermissionResult.DENIED)
if result == PermissionResult.ALLOWED:
filtered_pair[prop_name] = prop_value
elif result == PermissionResult.NOT_FOUND:
# No specific rule - include for backward compatibility
filtered_pair[prop_name] = prop_value
# DENIED properties are excluded
pair = filtered_pair
except Exception as e:
logger.error(f"Error filtering properties by permission: {e}")
# On error, return empty for security (fail closed)
pair = {}
# Execute property hooks for all properties if available
if self.hooks and pair:
if actor_interface:
auth_context = self._create_auth_context(check, "read")
result = {}
for key, value in pair.items():
transformed = self.hooks.execute_property_hooks(
key, "get", actor_interface, value, [], auth_context
)
if transformed is not None:
result[key] = transformed
pair = result
# Note: Don't return early if pair is empty - we still need to add list properties below
# The final output will be handled at the end of the function
# Always discover list properties (needed for both metadata and non-metadata responses)
list_names: set[str] = set()
if (
actor_interface
and hasattr(actor_interface, "property_lists")
and actor_interface.property_lists is not None
):
all_list_names = set(actor_interface.property_lists.list_all() or [])
# Filter list properties based on peer permissions (bulk evaluation)
if peer_id and actor_interface and actor_interface.id:
try:
evaluator = get_permission_evaluator(self.config)
# Use bulk evaluation to reduce logging verbosity
results = evaluator.evaluate_bulk_property_access(
actor_interface.id, peer_id, list(all_list_names), "read"
)
for list_name, result in results.items():
if (
result == PermissionResult.ALLOWED
or result == PermissionResult.NOT_FOUND
):
list_names.add(list_name)
# DENIED list properties are excluded
except Exception as e:
logger.error(f"Error filtering list properties by permission: {e}")
# On error, exclude all list properties for security
list_names = set()
else:
# No peer - include all (owner access)
list_names = all_list_names
# Build response based on query parameters
if include_metadata:
# Metadata-only response: no property values, just structure info
simple_names = list(pair.keys())
simple_total_bytes = sum(len(json.dumps(v)) for v in pair.values())
lists_info: dict[str, Any] = {}
for list_name in list_names:
list_prop = getattr(actor_interface.property_lists, list_name)
items = list(list_prop)
total_bytes = sum(len(json.dumps(item)) for item in items)
lists_info[list_name] = {
"count": len(items),
"total_bytes": total_bytes,
"description": list_prop.get_description(),
"explanation": list_prop.get_explanation(),
}
pair = {
"simple": {
"properties": simple_names,
"total_bytes": simple_total_bytes,
},
"lists": lists_info,
}
elif format_param == "full":
# Full format: simple props as-is + list props with items, description, explanation
for list_name in list_names:
list_prop = getattr(actor_interface.property_lists, list_name)
items = list(list_prop)
# Execute property hooks on list items if available
if self.hooks and actor_interface:
auth_context = self._create_auth_context(check, "read")
transformed_items = []
for item in items:
transformed = self.hooks.execute_property_hooks(
list_name, "get", actor_interface, item, [], auth_context
)
if transformed is not None:
transformed_items.append(transformed)
else:
transformed_items.append(item)
items = transformed_items
pair[list_name] = {
"_list": True,
"count": len(items),
"description": list_prop.get_description(),
"explanation": list_prop.get_explanation(),
"items": items,
}
else:
# Default / format=short: simple props as-is + minimal list markers
for list_name in list_names:
list_prop = getattr(actor_interface.property_lists, list_name)
pair[list_name] = {
"_list": True,
"count": len(list_prop),
}
out = json.dumps(pair)
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
return
[docs]
def put(self, actor_id, name):
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
resource = None
if not name:
path = []
else:
path = name.split("/")
name = path[0]
if len(path) >= 2 and len(path[1]) > 0:
resource = path[1]
# Check if this is a list operation (indicated by index parameter)
# Note: request.get() may return None or "" when parameter is not present
index_param = self.request.get("index")
if index_param:
# This is a list item operation - handle it appropriately
if not (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(name)
):
if self.response:
self.response.set_status(404, f"List property '{name}' not found")
return
# Check write permission
property_path = "/".join(path) if path else ""
if not check or not self._check_property_permission(
actor_id, check, property_path, "write"
):
if self.response:
self.response.set_status(403)
return
# Parse the body
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = ""
try:
item_value = json.loads(body)
except (TypeError, ValueError, KeyError):
item_value = body
# Get the list property and set the item at the specified index
try:
index = int(index_param)
if index < 0:
if self.response:
self.response.set_status(
400, f"Invalid index: {index} (must be >= 0)"
)
return
list_prop = getattr(myself.property_lists, name)
# Extend list if needed
while len(list_prop) <= index:
list_prop.append(None)
# Execute property put hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
name,
"put",
actor_interface,
item_value,
[name, str(index)],
auth_context,
)
if transformed is not None:
item_value = transformed
else:
if self.response:
self.response.set_status(400, "Item rejected by hooks")
return
# Set the item at the index
list_prop[index] = item_value
# Register diff
myself.register_diffs(
target="properties",
subtarget=name,
blob=json.dumps({"index": index, "value": item_value}),
)
if self.response:
self.response.set_status(204)
return
except (ValueError, IndexError) as e:
logger.error(f"Error setting list item at index {index_param}: {e}")
if self.response:
self.response.set_status(400, f"Error setting list item: {str(e)}")
return
# Use unified access control system for permission checking
property_path = "/".join(path) if path else ""
if not check or not self._check_property_permission(
actor_id, check, property_path, "write"
):
if self.response:
self.response.set_status(403)
return
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = ""
if len(path) == 1:
old = myself.property[name] if myself and myself.property else None
try:
old = json.loads(old or "{}")
except (TypeError, ValueError, KeyError):
old = {}
try:
new_body = json.loads(body)
is_json = True
except (TypeError, ValueError, KeyError):
new_body = body
is_json = False
# Execute property put hook if available
new = new_body
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface and path:
property_name = path[0] if path else "*"
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
property_name,
"put",
actor_interface,
new_body,
path[
1:
], # Exclude property name from path (already in property_name)
auth_context,
)
if transformed is not None:
new = transformed
else:
self.response.set_status(400, "Payload is not accepted")
return
if is_json:
if myself and myself.property:
myself.property[name] = json.dumps(new)
else:
if myself and myself.property:
myself.property[name] = new
myself.register_diffs(target="properties", subtarget=name, blob=body)
self.response.set_status(204)
return
# Keep text blob for later diff registration
blob = body
# Make store var to be merged with original struct
try:
body = json.loads(body)
except (TypeError, ValueError, KeyError):
pass
store = {path[len(path) - 1]: body}
# Make store to be at same level as orig value
i = len(path) - 2
while i > 0:
c = copy.copy(store)
store = {path[i]: c}
i -= 1
orig = myself.property[name] if myself and myself.property else None
try:
orig = json.loads(orig or "{}")
merge_dict(orig, store)
res = orig
except (TypeError, ValueError, KeyError):
res = store
# Execute property put hook if available
final_res = res
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface and path:
property_name = path[0] if path else "*"
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
property_name, "put", actor_interface, res, path[1:], auth_context
)
if transformed is not None:
final_res = transformed
else:
self.response.set_status(400, "Payload is not accepted")
return
res = final_res
res = json.dumps(res)
if myself and myself.property:
myself.property[name] = res
myself.register_diffs(
target="properties", subtarget=name, resource=resource, blob=blob
)
self.response.set_status(204)
[docs]
def post(self, actor_id, name):
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
if not auth_result.authorize("POST", "properties", name):
return
if len(name) > 0:
if self.response:
self.response.set_status(400)
pair = {}
# Handle the form with property type support
if self.request.get("property"):
prop_name = self.request.get("property")
prop_type = (
self.request.get("property_type") or "simple"
) # Default to simple
# Handle list property creation
if prop_type == "list":
# Create empty list property
if myself and hasattr(myself, "property_lists"):
# Create empty list by accessing it (this initializes the ListProperty)
list_prop = getattr(myself.property_lists, prop_name)
# The ListProperty is now created with metadata, but no items
# Set description and explanation if provided
description = self.request.get("description") or ""
explanation = self.request.get("explanation") or ""
if description:
list_prop.set_description(description)
if explanation:
list_prop.set_explanation(explanation)
# Execute property post hook if available for list creation
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
prop_name,
"post",
actor_interface,
[],
[prop_name],
auth_context,
)
if transformed is None:
if self.response:
self.response.set_status(403)
return
pair[prop_name] = "[Empty list property created]"
else:
if self.response:
self.response.set_status(500, "List properties not supported")
return
# Handle simple property creation
elif prop_type == "simple" and self.request.get("value"):
# Execute property post hook if available
val = self.request.get("value")
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
prop_name,
"post",
actor_interface,
val,
[prop_name],
auth_context,
)
if transformed is not None:
val = transformed
else:
if self.response:
self.response.set_status(403)
return
pair[prop_name] = val
if myself and myself.property:
myself.property[prop_name] = val
else:
# Missing value for simple property
if self.response:
self.response.set_status(400, "Value required for simple property")
return
elif len(self.request.arguments()) > 0:
for name in self.request.arguments():
# Execute property post hook if available
val = self.request.get(name)
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
name, "post", actor_interface, val, [], auth_context
)
if transformed is not None:
val = transformed
else:
continue
pair[name] = val
if myself and myself.property:
myself.property[name] = val
else:
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
elif body is None:
body = "{}"
params = json.loads(body)
except (TypeError, ValueError, KeyError):
if self.response:
self.response.set_status(400, "Error in json body")
return
for key in params:
val = params[key]
# Handle special list property creation with metadata
if isinstance(val, dict) and val.get("_type") == "list":
# This is a list property creation with metadata
if myself and hasattr(myself, "property_lists"):
list_prop = getattr(myself.property_lists, key)
# Set description and explanation if provided, or ensure metadata is persisted
description_set = False
if "description" in val:
list_prop.set_description(val["description"])
description_set = True
if "explanation" in val:
list_prop.set_explanation(val["explanation"])
elif not description_set:
# Ensure metadata is persisted even if no description/explanation provided
list_prop.set_description("")
# Execute property post hook if available for list creation
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
key,
"post",
actor_interface,
[],
[key],
auth_context,
)
if transformed is not None:
pair[key] = "[Empty list property created]"
else:
continue
else:
pair[key] = "[Empty list property created]"
else:
# List properties not supported
continue
# Handle items array for bulk list updates
elif isinstance(val, dict) and "items" in val:
# Validate items array structure
if not isinstance(val["items"], list):
logger.error(
f"Invalid 'items' field for property '{key}': expected list, got {type(val['items']).__name__}"
)
if self.response:
self.response.set_status(
400,
f"Invalid 'items' field for property '{key}': expected list, got {type(val['items']).__name__}",
)
return
if len(val["items"]) == 0:
logger.warning(
f"Empty 'items' array for property '{key}': no updates to perform"
)
pair[key] = "[No items to update]"
continue
# This is a bulk update for a list property
if (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(key)
):
try:
list_prop = getattr(myself.property_lists, key)
items_updated = 0
items_deleted = 0
for i, item_spec in enumerate(val["items"]):
# Validate item structure
if not isinstance(item_spec, dict):
logger.error(
f"Invalid item at position {i}: must be a dictionary, got {type(item_spec).__name__}"
)
if self.response:
self.response.set_status(
400,
f"Invalid item at position {i}: must be a dictionary, got {type(item_spec).__name__}",
)
return
# Check for required "index" field
if "index" not in item_spec:
logger.error(
f"Missing 'index' field in item at position {i}: {item_spec}"
)
if self.response:
self.response.set_status(
400,
f"Missing 'index' field in item at position {i}",
)
return
index = item_spec["index"]
# Validate index type and value
if not isinstance(index, int):
logger.error(
f"Invalid index type in item at position {i}: expected integer, got {type(index).__name__}"
)
if self.response:
self.response.set_status(
400,
f"Invalid index type in item at position {i}: expected integer, got {type(index).__name__}",
)
return
if index < 0:
logger.error(
f"Invalid index value in item at position {i}: {index} (must be >= 0)"
)
if self.response:
self.response.set_status(
400,
f"Invalid index value in item at position {i}: {index} (must be >= 0)",
)
return
# Check if this is a deletion (empty item data)
if (
len(item_spec) == 1
): # Only has "index" key, means delete
try:
if index < len(list_prop):
del list_prop[index]
items_deleted += 1
else:
logger.warning(
f"Cannot delete item at index {index}: index out of range (list length: {len(list_prop)})"
)
# Don't fail the entire operation, just log warning
except IndexError as e:
logger.error(
f"Error deleting item at index {index}: {e}"
)
# Don't fail the entire operation for delete errors
else:
# Update/set item - the entire item_spec except "index" is the item data
item_data = {
k: v
for k, v in item_spec.items()
if k != "index"
}
try:
# Extend list if needed
while len(list_prop) <= index:
list_prop.append(None)
# Store the complete object
list_prop[index] = item_data
items_updated += 1
except (IndexError, ValueError) as e:
logger.error(
f"Error updating item at index {index}: {e}"
)
if self.response:
self.response.set_status(
500,
f"Error updating item at index {index}: {str(e)}",
)
return
# Execute property post hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
# Pass the entire list for hook validation
current_items = list_prop.to_list()
auth_context = self._create_auth_context(
check, "write"
)
transformed = self.hooks.execute_property_hooks(
key,
"post",
actor_interface,
current_items,
[key],
auth_context,
)
if transformed is None:
# Hook rejected the update - need to revert changes
if self.response:
self.response.set_status(
403, "Bulk update rejected by hooks"
)
return
pair[key] = (
f"[Bulk update: {items_updated} items updated, {items_deleted} items deleted]"
)
except Exception as e:
logger.error(
f"Error in bulk update for list property '{key}': {e}"
)
if self.response:
self.response.set_status(
500, f"Error in bulk update: {str(e)}"
)
return
else:
# Not a list property or doesn't exist
if self.response:
self.response.set_status(
400, f"Property '{key}' is not a list property"
)
return
else:
# Regular property handling
# Execute property post hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
auth_context = self._create_auth_context(check, "write")
transformed = self.hooks.execute_property_hooks(
key, "post", actor_interface, val, [], auth_context
)
if transformed is not None:
val = transformed
else:
continue
pair[key] = val
if isinstance(val, dict):
text = json.dumps(val)
else:
text = val
if myself and myself.property:
myself.property[key] = text
if not pair:
if self.response:
self.response.set_status(403, "No attributes accepted")
return
out = json.dumps(pair)
myself.register_diffs(target="properties", blob=out)
if self.response:
self.response.write(out)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(201, "Created")
[docs]
def delete(self, actor_id, name):
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
resource = None
if not name:
path = []
else:
path = name.split("/")
name = path[0]
if len(path) >= 2 and len(path[1]) > 0:
resource = path[1]
# Use unified access control system for permission checking
property_path = "/".join(path) if path else ""
if not self._check_property_permission(
actor_id, check, property_path, "delete"
):
self.response.set_status(403)
return
if not name:
# Get actor interface for property operations
actor_interface = self._get_actor_interface(myself)
if not actor_interface:
if self.response:
self.response.set_status(500, "Internal error")
return
# Execute property delete hook if available
if self.hooks:
result = self.hooks.execute_property_hooks(
"*",
"delete",
actor_interface,
actor_interface.properties.to_dict(),
path,
)
if result is None:
self.response.set_status(403)
return
actor_interface.properties.clear()
myself.register_diffs(target="properties", subtarget=None, blob="")
self.response.set_status(204)
return
if len(path) == 1:
# Check if this is a list property first
if (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(name)
):
# This is a list property - delete the entire list
try:
list_prop = getattr(myself.property_lists, name)
# Execute property delete hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface:
# Pass current list data for hook validation
current_items = list_prop.to_list()
auth_context = self._create_auth_context(check, "delete")
result = self.hooks.execute_property_hooks(
name,
"delete",
actor_interface,
current_items,
path,
auth_context,
)
if result is None:
self.response.set_status(403)
return
# Delete the entire list including metadata
list_prop.delete()
myself.register_diffs(target="properties", subtarget=name, blob="")
self.response.set_status(204)
return
except Exception as e:
logger.error(f"Error deleting list property '{name}': {e}")
self.response.set_status(
500, f"Error deleting list property: {str(e)}"
)
return
# Regular property handling
old_prop = myself.property[name] if myself and myself.property else None
# Execute property delete hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface and path:
property_name = path[0] if path else "*"
auth_context = self._create_auth_context(check, "delete")
result = self.hooks.execute_property_hooks(
property_name,
"delete",
actor_interface,
old_prop or {},
path,
auth_context,
)
if result is None:
self.response.set_status(403)
return
if myself and myself.property:
myself.property[name] = None
myself.register_diffs(target="properties", subtarget=name, blob="")
self.response.set_status(204)
return
orig = myself.property[name] if myself and myself.property else None
old = orig
try:
orig = json.loads(orig or "{}")
except (TypeError, ValueError, KeyError):
# Since /properties/something was handled above
# orig must be json loadable
self.response.set_status(404)
return
if not delete_dict(orig, path[1:]):
self.response.set_status(404)
return
# Execute property delete hook if available
if self.hooks:
actor_interface = self._get_actor_interface(myself)
if actor_interface and path:
property_name = path[0] if path else "*"
auth_context = self._create_auth_context(check, "delete")
result = self.hooks.execute_property_hooks(
property_name,
"delete",
actor_interface,
old or {},
path,
auth_context,
)
if result is None:
self.response.set_status(403)
return
res = json.dumps(orig)
if myself and myself.property:
myself.property[name] = res
myself.register_diffs(
target="properties", subtarget=name, resource=resource, blob=""
)
self.response.set_status(204)
[docs]
class PropertyListItemsHandler(base_handler.BaseHandler):
"""Handler for list property items operations.
Handles GET/POST /{actor_id}/properties/{name}/items
for reading all items and adding/updating/deleting items in list properties.
"""
def _check_property_permission(
self, actor_id: str, auth_obj, property_path: str, operation: str
) -> bool:
"""
Check property permission using the unified access control system.
Reuses the same permission logic as PropertiesHandler.
"""
# Get peer ID from auth object (if authenticated via trust relationship)
# Note: auth_obj.acl is a dict, not an object, so we use .get()
peer_id = auth_obj.acl.get("peerid", "") if hasattr(auth_obj, "acl") else ""
if not peer_id:
# No peer relationship - fall back to legacy authorization
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
# Use permission evaluator for peer-based access
try:
evaluator = get_permission_evaluator(self.config)
result = evaluator.evaluate_property_access(
actor_id, peer_id, property_path, operation
)
if result == PermissionResult.ALLOWED:
return True
elif result == PermissionResult.DENIED:
logger.info(
f"Property items access denied: {actor_id} -> {peer_id} -> {property_path} ({operation})"
)
return False
else: # NOT_FOUND
# Fall back to legacy for backward compatibility
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
except Exception as e:
logger.error(
f"Error in permission evaluation for items {actor_id}:{peer_id}:{property_path}: {e}"
)
# Fall back to legacy authorization on errors
legacy_subpath = property_path.split("/")[0] if property_path else ""
method_map = {"read": "GET", "write": "PUT", "delete": "DELETE"}
return auth_obj.check_authorisation(
path="properties",
subpath=legacy_subpath,
method=method_map.get(operation, "GET"),
)
[docs]
def get(self, actor_id: str, name: str):
"""Get all items from a list property."""
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
# Check read permission
if not self._check_property_permission(actor_id, check, name, "read"):
if self.response:
self.response.set_status(403)
return
# Verify this is a list property
if not (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(name)
):
if self.response:
self.response.set_status(
404, "Property not found or not a list property"
)
return
# Get all items
list_prop = getattr(myself.property_lists, name)
items = list_prop.to_list()
if self.response:
self.response.write(json.dumps(items))
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(200)
[docs]
def post(self, actor_id: str, name: str):
"""Add, update, or delete items in a list property.
Expects JSON body with:
- action: "add", "update", or "delete"
- item_value: The value to add or update to (for add/update)
- item_index: The index to update or delete (for update/delete)
"""
auth_result = self.authenticate_actor(actor_id, "properties", subpath=name)
if not auth_result.success:
return
myself = auth_result.actor
check = auth_result.auth_obj
# Check write permission
if not self._check_property_permission(actor_id, check, name, "write"):
if self.response:
self.response.set_status(403)
return
# Verify this is a list property
if not (
myself
and hasattr(myself, "property_lists")
and myself.property_lists is not None
and myself.property_lists.exists(name)
):
if self.response:
self.response.set_status(
404, "Property not found or not a list property"
)
return
# Parse request body
try:
body = self.request.body
if isinstance(body, bytes):
body = body.decode("utf-8", "ignore")
params = json.loads(body or "{}")
except (TypeError, ValueError, KeyError):
if self.response:
self.response.set_status(400, "Invalid JSON body")
return
action = params.get("action")
if not action:
if self.response:
self.response.set_status(400, "Missing 'action' parameter")
return
list_prop = getattr(myself.property_lists, name)
try:
if action == "add":
# Add new item
item_value = params.get("item_value")
if item_value is None:
if self.response:
self.response.set_status(400, "Missing 'item_value' parameter")
return
list_prop.append(item_value)
# Register diff for subscription notifications
myself.register_diffs(
target="properties",
subtarget=name,
blob=json.dumps(
{
"action": "add",
"index": len(list_prop) - 1,
"value": item_value,
}
),
)
if self.response:
self.response.write(
json.dumps({"success": True, "index": len(list_prop) - 1})
)
self.response.headers["Content-Type"] = "application/json"
self.response.set_status(201)
elif action == "update":
# Update existing item
item_index = params.get("item_index")
item_value = params.get("item_value")
if item_index is None:
if self.response:
self.response.set_status(400, "Missing 'item_index' parameter")
return
if item_value is None:
if self.response:
self.response.set_status(400, "Missing 'item_value' parameter")
return
try:
index = int(item_index)
except ValueError:
if self.response:
self.response.set_status(400, "Invalid 'item_index' value")
return
if index < 0 or index >= len(list_prop):
if self.response:
self.response.set_status(400, f"Index {index} out of range")
return
list_prop[index] = item_value
# Register diff for subscription notifications
myself.register_diffs(
target="properties",
subtarget=name,
blob=json.dumps(
{"action": "update", "index": index, "value": item_value}
),
)
if self.response:
self.response.set_status(204)
elif action == "delete":
# Delete item
item_index = params.get("item_index")
if item_index is None:
if self.response:
self.response.set_status(400, "Missing 'item_index' parameter")
return
try:
index = int(item_index)
except ValueError:
if self.response:
self.response.set_status(400, "Invalid 'item_index' value")
return
if index < 0 or index >= len(list_prop):
if self.response:
self.response.set_status(400, f"Index {index} out of range")
return
del list_prop[index]
# Register diff for subscription notifications
myself.register_diffs(
target="properties",
subtarget=name,
blob=json.dumps({"action": "delete", "index": index}),
)
if self.response:
self.response.set_status(204)
else:
if self.response:
self.response.set_status(400, f"Unknown action: {action}")
return
except Exception as e:
logger.error(f"Error in list item operation '{action}' for '{name}': {e}")
if self.response:
self.response.set_status(500, f"Error processing list item: {str(e)}")