import datetime
import logging
import os
from pynamodb.attributes import NumberAttribute, UnicodeAttribute, UTCDateTimeAttribute
from pynamodb.models import Model
"""
DbSubscriptionDiff handles all db operations for a subscription diff
DbSubscriptionDiffList handles list of subscriptions diffs
Google datastore for google is used as a backend.
"""
logger = logging.getLogger(__name__)
[docs]
class SubscriptionDiff(Model):
id = UnicodeAttribute(hash_key=True)
subid_seqnr = UnicodeAttribute(range_key=True)
subid = UnicodeAttribute()
timestamp = UTCDateTimeAttribute(default=datetime.datetime.utcnow())
diff = UnicodeAttribute()
seqnr = NumberAttribute(default=1)
[docs]
class DbSubscriptionDiff:
"""
DbSubscriptionDiff does all the db operations for subscription diff objects
The actor_id must always be set.
"""
[docs]
def get(self, actor_id=None, subid=None, seqnr=None):
"""Retrieves the subscriptiondiff from the database"""
if not actor_id and not self.handle:
return None
if not subid and not self.handle:
logger.debug("Attempt to get subscriptiondiff without subid")
return None
if not self.handle:
if not seqnr:
query = SubscriptionDiff.query(
actor_id,
SubscriptionDiff.subid_seqnr.startswith(subid or ""),
consistent_read=True,
)
# Find the record with lowest seqnr
for t in query:
if not self.handle:
self.handle = t
continue
if t.seqnr < self.handle.seqnr:
self.handle = t
else:
self.handle = SubscriptionDiff.get(
actor_id, (subid or "") + ":" + str(seqnr), consistent_read=True
)
if self.handle:
t = self.handle
return {
"id": t.id,
"subscriptionid": t.subid,
"timestamp": t.timestamp,
"data": t.diff,
"sequence": t.seqnr,
}
else:
return None
[docs]
def create(self, actor_id=None, subid=None, diff="", seqnr=1):
"""Create a new subscription diff"""
if not actor_id or not subid:
logger.debug("Attempt to create subscriptiondiff without actorid or subid")
return False
self.handle = SubscriptionDiff(
id=actor_id,
subid_seqnr=subid + ":" + str(seqnr),
subid=subid,
diff=diff,
seqnr=seqnr,
)
self.handle.save()
return True
[docs]
def delete(self):
"""Deletes the subscription diff in the database"""
if not self.handle:
return False
self.handle.delete()
self.handle = None
return True
def __init__(self):
self.handle = None
if not SubscriptionDiff.exists():
try:
SubscriptionDiff.create_table(wait=True)
except Exception as e:
# Handle race condition where another process created the table
# between our exists() check and create_table() call
if "ResourceInUseException" in str(e):
pass # Table was created by another process, continue
else:
raise
[docs]
class DbSubscriptionDiffList:
"""
DbSubscriptionDiffList does all the db operations for list of diff objects
The actor_id must always be set.
"""
[docs]
def fetch(self, actor_id=None, subid=None):
"""Retrieves the subscription diffs of an actor_id from the database as an array"""
if not actor_id:
return None
self.actor_id = actor_id
self.subid = subid
self.handle = SubscriptionDiff.query(actor_id, consistent_read=True)
self.diffs = []
if self.handle:
for t in self.handle:
if subid and subid != t.subid:
continue
self.diffs.append(
{
"id": t.id,
"subscriptionid": t.subid,
"timestamp": t.timestamp,
"diff": t.diff,
"sequence": t.seqnr,
}
)
sorted(self.diffs, key=lambda diff: diff["sequence"])
return self.diffs
else:
return []
[docs]
def delete(self, seqnr=None):
"""Deletes all the fetched subscription diffs in the database
Optional seqnr deletes up to (excluding) a specific seqnr
"""
if not self.handle:
return False
if not seqnr or not isinstance(seqnr, int):
seqnr = 0
self.handle = SubscriptionDiff.query(self.actor_id, consistent_read=True)
for p in self.handle:
if self.subid and self.subid != p.subid:
continue
if seqnr == 0 or p.seqnr <= seqnr:
p.delete()
self.handle = None
return True
def __init__(self):
self.handle = None
self.diffs = []
self.actor_id = None
self.subid = None
if not SubscriptionDiff.exists():
try:
SubscriptionDiff.create_table(wait=True)
except Exception as e:
# Handle race condition where another process created the table
# between our exists() check and create_table() call
if "ResourceInUseException" in str(e):
pass # Table was created by another process, continue
else:
raise