import datetime
import logging
from typing import Any
from actingweb.db import (
get_subscription,
get_subscription_diff,
get_subscription_diff_list,
get_subscription_list,
)
logger = logging.getLogger(__name__)
[docs]
class Subscription:
"""Base class with core subscription methods (storage-related)"""
[docs]
def get(self) -> dict[str, Any]:
"""Retrieve subscription from db given pre-initialized variables"""
if not self.actor_id or not self.peerid or not self.subid:
return {}
if self.subscription and len(self.subscription) > 0:
return self.subscription
if self.handle:
self.subscription = self.handle.get(
actor_id=self.actor_id, peerid=self.peerid, subid=self.subid
)
else:
self.subscription = {}
if not self.subscription:
self.subscription = {}
return self.subscription
[docs]
def create(
self,
target: str | None = None,
subtarget: str | None = None,
resource: str | None = None,
granularity: str | None = None,
seqnr: int = 0,
) -> bool:
"""Create new subscription and push it to db"""
if self.subscription and len(self.subscription) > 0:
logger.debug(
"Attempted creation of subscription when already loaded from storage"
)
return False
if not self.actor_id or not self.peerid:
logger.debug(
"Attempted creation of subscription without actor_id or peerid set"
)
return False
if not self.subid:
now = datetime.datetime.utcnow()
if self.config:
seed = self.config.root + now.strftime("%Y%m%dT%H%M%S%f")
self.subid = self.config.new_uuid(seed)
else:
self.subid = None
if not self.handle or not self.handle.create(
actor_id=self.actor_id,
peerid=self.peerid,
subid=self.subid,
granularity=granularity,
target=target,
subtarget=subtarget,
resource=resource,
seqnr=seqnr,
callback=self.callback,
):
return False
assert self.subscription is not None # Always initialized in __init__
self.subscription["id"] = self.actor_id
self.subscription["subscriptionid"] = self.subid
self.subscription["peerid"] = self.peerid
self.subscription["target"] = target
self.subscription["subtarget"] = subtarget
self.subscription["resource"] = resource
self.subscription["granularity"] = granularity
self.subscription["sequence"] = seqnr
self.subscription["callback"] = self.callback
return True
[docs]
def delete(self):
"""Delete a subscription in storage"""
if not self.handle:
logger.debug("Attempted delete of subscription without storage handle")
return False
# Clear diffs
self.clear_diffs()
# Create minimal actor stub (used by both cleanup operations below)
# We need to avoid circular imports and full Actor initialization
class _ActorStub:
def __init__(self, actor_id, config):
self.id = actor_id
self.config = config
# Clear callback processor state if this is a callback subscription
if self.callback and self.actor_id and self.peerid and self.subid:
try:
from .callback_processor import CallbackProcessor
actor_stub = _ActorStub(self.actor_id, self.config)
processor = CallbackProcessor(actor_stub) # type: ignore[arg-type]
processor.clear_state(self.peerid, self.subid)
logger.debug(
f"Cleared callback state for subscription {self.subid} from peer {self.peerid}"
)
except ImportError:
pass # CallbackProcessor not available
except Exception as e:
logger.warning(f"Failed to clear callback state for {self.subid}: {e}")
# NOTE: RemotePeerStore cleanup is handled at a higher level in Actor.delete_subscription()
# where we have access to check for other remaining subscriptions to the same peer
# Delete subscription record
self.handle.delete()
return True
[docs]
def increase_seq(self):
if not self.handle:
logger.debug(
"Attempted increase_seq without subscription retrieved from storage"
)
return False
assert self.subscription is not None # Always initialized in __init__
self.subscription["sequence"] += 1
if not self.handle.modify(seqnr=self.subscription["sequence"]):
# Failed to update database
return False
return self.subscription["sequence"]
[docs]
def decrease_seq(self):
"""Rollback sequence number by 1 (used when diff creation fails after seq increment)"""
if not self.handle:
logger.debug(
"Attempted decrease_seq without subscription retrieved from storage"
)
return False
assert self.subscription is not None # Always initialized in __init__
if self.subscription["sequence"] <= 0:
logger.warning(
f"Attempted decrease_seq when sequence is already {self.subscription['sequence']}"
)
return False
self.subscription["sequence"] -= 1
if not self.handle.modify(seqnr=self.subscription["sequence"]):
# Failed to update database
return False
return self.subscription["sequence"]
[docs]
def add_diff(self, blob=None):
"""Add a new diff for this subscription"""
if not self.actor_id or not self.subid or not blob:
logger.debug("Attempted add_diff without actorid, subid, or blob")
return False
if not self.config:
return False
assert self.subscription is not None # Always initialized in __init__
# Increment sequence BEFORE creating diff so first diff gets sequence=1 per spec
new_sequence = self.increase_seq()
if not new_sequence:
logger.error(
f"Failed increasing sequence number for subscription {self.subid} for peer {self.peerid}"
)
return False
# Now create diff with the incremented sequence number
diff = get_subscription_diff(self.config)
success = diff.create(
actor_id=self.actor_id,
subid=self.subid,
diff=blob,
seqnr=self.subscription["sequence"],
)
# If diff creation failed, rollback the sequence increment
if not success:
logger.error(
f"Failed creating diff for subscription {self.subid}, rolling back sequence from {new_sequence}"
)
self.decrease_seq()
return False
return diff.get()
[docs]
def get_diff(self, seqnr=0):
"""Get one specific diff"""
if seqnr == 0:
return None
if not isinstance(seqnr, int):
return None
if not self.config:
return None
diff = get_subscription_diff(self.config)
return diff.get(actor_id=self.actor_id, subid=self.subid, seqnr=seqnr)
[docs]
def get_diffs(self):
"""Get all the diffs available for this subscription ordered by the timestamp, oldest first"""
if not self.config:
return []
diff_list = get_subscription_diff_list(self.config)
return diff_list.fetch(actor_id=self.actor_id, subid=self.subid)
[docs]
def clear_diff(self, seqnr):
"""Clears one specific diff"""
if not self.config:
return False
diff = get_subscription_diff(self.config)
diff.get(actor_id=self.actor_id, subid=self.subid, seqnr=seqnr)
return diff.delete()
[docs]
def clear_diffs(self, seqnr=0):
"""Clear all diffs up to and including a seqnr"""
if not self.config:
return False
diff_list = get_subscription_diff_list(self.config)
diff_list.fetch(actor_id=self.actor_id, subid=self.subid)
diff_list.delete(seqnr=seqnr)
def __init__(
self, actor_id=None, peerid=None, subid=None, callback=False, config=None
):
self.config = config
if self.config:
self.handle = get_subscription(self.config)
else:
self.handle = None
self.subscription = {}
if not actor_id:
return
self.actor_id = actor_id
self.peerid = peerid
self.subid = subid
self.callback = callback
if self.actor_id and self.peerid and self.subid:
self.get()
[docs]
class Subscriptions:
"""Handles all subscriptions of a specific actor_id
Access the indvidual subscriptions in .dbsubscriptions and the subscription data
in .subscriptions as a dictionary
"""
[docs]
def fetch(self):
if self.subscriptions is not None:
return self.subscriptions
if not self.list and self.config:
self.list = get_subscription_list(self.config)
if not self.subscriptions and self.list:
self.subscriptions = self.list.fetch(actor_id=self.actor_id)
return self.subscriptions
[docs]
def delete(self):
if not self.list:
logger.debug("Already deleted list in subscriptions")
return False
if self.subscriptions:
for sub in self.subscriptions:
if not self.config:
continue
diff_list = get_subscription_diff_list(self.config)
diff_list.fetch(actor_id=self.actor_id, subid=sub["subscriptionid"])
diff_list.delete()
self.list.delete()
self.list = None
self.subscriptions = None
return True
def __init__(self, actor_id=None, config=None):
"""Properties must always be initialised with an actor_id"""
self.config = config
if not actor_id:
self.list = None
logger.debug("No actor_id in initialisation of subscriptions")
return
if self.config:
self.list = get_subscription_list(self.config)
else:
self.list = None
self.actor_id = actor_id
self.subscriptions = None
self.fetch()