import os
import time
from collections.abc import Sequence
from pynamodb.attributes import (
JSONAttribute,
NumberAttribute,
UnicodeAttribute,
UTCDateTimeAttribute,
)
from pynamodb.models import Model
"""
DbAttribute handles all db operations for an attribute (internal)
AWS DynamoDB is used as a backend.
"""
[docs]
class Attribute(Model):
"""
DynamoDB data model for a property
"""
id = UnicodeAttribute(hash_key=True)
bucket_name = UnicodeAttribute(range_key=True)
bucket = UnicodeAttribute()
name = UnicodeAttribute()
data = JSONAttribute(null=True)
timestamp = UTCDateTimeAttribute(null=True)
# TTL timestamp for automatic DynamoDB expiration (Unix epoch timestamp)
# Enable DynamoDB TTL on this field for automatic cleanup
ttl_timestamp = NumberAttribute(null=True)
[docs]
class DbAttribute:
"""
DbProperty does all the db operations for property objects
The actor_id must always be set. get(), set() will set a new internal handle
that will be reused by set() (overwrite attribute) and
delete().
"""
[docs]
@staticmethod
def get_bucket(actor_id=None, bucket=None):
"""Returns a dict of attributes from a bucket, each with data and timestamp"""
if not actor_id or not bucket:
return None
try:
query = Attribute.query(
actor_id, Attribute.bucket_name.startswith(bucket), consistent_read=True
)
except Exception: # PynamoDB DoesNotExist exception
return None
ret = {}
for t in query:
ret[t.name] = {
"data": t.data,
"timestamp": t.timestamp,
}
return ret
[docs]
@staticmethod
def get_attr(actor_id=None, bucket=None, name=None):
"""Returns a dict of attributes from a bucket, each with data and timestamp"""
if not actor_id or not bucket or not name:
return None
try:
r = Attribute.get(actor_id, bucket + ":" + name, consistent_read=True)
except Exception: # PynamoDB DoesNotExist exception
return None
return {
"data": r.data,
"timestamp": r.timestamp,
}
[docs]
@staticmethod
def set_attr(
actor_id=None,
bucket=None,
name=None,
data=None,
timestamp=None,
ttl_seconds=None,
):
"""Sets a data value for a given attribute in a bucket.
Args:
actor_id: The actor ID
bucket: The bucket name
name: The attribute name
data: The data to store (JSON-serializable)
timestamp: Optional timestamp
ttl_seconds: Optional TTL in seconds from now. If provided,
DynamoDB will automatically delete this item after expiry.
Note: A 1-hour buffer is added for clock skew safety.
"""
if not actor_id or not name or not bucket:
return False
if not data:
try:
item = Attribute.get(
actor_id, bucket + ":" + name, consistent_read=True
)
item.delete()
except Exception: # PynamoDB DoesNotExist exception
pass
return True
# Calculate TTL timestamp if provided
ttl_timestamp = None
if ttl_seconds is not None:
from ...constants import TTL_CLOCK_SKEW_BUFFER
# Add buffer for clock skew safety
ttl_timestamp = int(time.time()) + ttl_seconds + TTL_CLOCK_SKEW_BUFFER
# Defensive sanitization of data before storing
from actingweb.db.utils import sanitize_json_data
data = sanitize_json_data(data, log_source="attribute")
new = Attribute(
id=actor_id,
bucket_name=bucket + ":" + name,
bucket=bucket,
name=name,
data=data,
timestamp=timestamp,
ttl_timestamp=ttl_timestamp,
)
new.save()
return True
[docs]
def delete_attr(self, actor_id=None, bucket=None, name=None):
"""Deletes an attribute in a bucket"""
return self.set_attr(actor_id=actor_id, bucket=bucket, name=name, data=None)
[docs]
@staticmethod
def conditional_update_attr(
actor_id=None,
bucket=None,
name=None,
old_data=None,
new_data=None,
timestamp=None,
): # type: ignore[misc]
"""Conditionally update an attribute only if current data matches old_data.
This provides atomic compare-and-swap functionality for race-free updates.
JSON comparison is order-independent - dict key ordering does not affect equality.
If the caller's old_data has different key ordering than stored data, we normalize
both sides for comparison and use the stored ordering for the atomic update.
Args:
actor_id: The actor ID
bucket: The bucket name
name: The attribute name
old_data: Expected current data value (for comparison)
new_data: New data to set if current matches old_data
timestamp: Optional timestamp
Returns:
True if update succeeded (current matched old_data), False otherwise
"""
import json
if not actor_id or not bucket or not name:
return False
bucket_name = bucket + ":" + name
# Defensive sanitization of data before storing
from actingweb.db.utils import sanitize_json_data
old_data = sanitize_json_data(old_data, log_source="attribute")
new_data = sanitize_json_data(new_data, log_source="attribute")
try:
# Get current item with consistent read
item = Attribute.get(actor_id, bucket_name, consistent_read=True)
# Normalize JSON for order-independent comparison
# This ensures {"a": 1, "b": 2} == {"b": 2, "a": 1}
def normalize_json(data):
"""Normalize JSON data by serializing with sorted keys."""
if data is None:
return None
return json.loads(json.dumps(data, sort_keys=True))
old_data_normalized = normalize_json(old_data)
current_data_normalized = normalize_json(item.data)
# Check if current data matches old_data (order-independent)
if old_data_normalized != current_data_normalized:
return False
# Data matches semantically - perform atomic update
# Use the ACTUAL stored data for DynamoDB's condition to ensure atomicity
# This handles the case where old_data has different key ordering
actions: Sequence[object] = [Attribute.data.set(new_data)]
if timestamp:
actions = list(actions) + [Attribute.timestamp.set(timestamp)]
item.update(
actions=actions, # type: ignore[arg-type]
condition=(
Attribute.data == item.data
), # Atomic check against current value
)
return True
except Exception:
# Item doesn't exist or condition check failed (race condition)
return False
[docs]
@staticmethod
def delete_bucket(actor_id=None, bucket=None):
"""Deletes an entire bucket"""
if not actor_id or not bucket:
return False
try:
query = Attribute.query(
actor_id, Attribute.bucket_name.startswith(bucket), consistent_read=True
)
except Exception: # PynamoDB DoesNotExist exception
return True
for t in query:
t.delete()
return True
def __init__(self):
if not Attribute.exists():
try:
Attribute.create_table(wait=True)
except Exception as e:
# Handle race condition where another process created the table
# between our exists() check and create_table() call
if "ResourceInUseException" in str(e):
pass # Table was created by another process, continue
else:
raise
[docs]
class DbAttributeBucketList:
"""
DbAttributeBucketList handles multiple buckets
The actor_id must always be set.
"""
[docs]
@staticmethod
def fetch(actor_id=None):
"""Retrieves all the attributes of an actor_id from the database"""
if not actor_id:
return None
try:
query = Attribute.query(actor_id)
except Exception: # PynamoDB DoesNotExist exception
return None
ret = {}
for t in query:
if t.bucket not in ret:
ret[t.bucket] = {}
ret[t.bucket][t.name] = {
"data": t.data,
"timestamp": t.timestamp,
}
return ret
[docs]
@staticmethod
def fetch_timestamps(actor_id=None):
"""Retrieves timestamps for all buckets of an actor_id"""
if not actor_id:
return None
try:
query = Attribute.query(actor_id)
except Exception: # PynamoDB DoesNotExist exception
return None
ret = {}
for t in query:
if t.bucket not in ret:
ret[t.bucket] = t.timestamp
return ret
[docs]
@staticmethod
def delete(actor_id=None):
"""Deletes all the attributes in the database"""
if not actor_id:
return False
try:
query = Attribute.query(actor_id)
except Exception: # PynamoDB DoesNotExist exception
return False
for t in query:
t.delete()
return True
def __init__(self):
if not Attribute.exists():
try:
Attribute.create_table(wait=True)
except Exception as e:
# Handle race condition where another process created the table
# between our exists() check and create_table() call
if "ResourceInUseException" in str(e):
pass # Table was created by another process, continue
else:
raise