import logging
import os
from typing import Any
from pynamodb.attributes import UnicodeAttribute
from pynamodb.indexes import AllProjection, GlobalSecondaryIndex
from pynamodb.models import Model
"""
DbActor handles all db operations for an actor
Google datastore for google is used as a backend.
"""
logger = logging.getLogger(__name__)
[docs]
class CreatorIndex(GlobalSecondaryIndex[Any]):
"""
Secondary index on actor
"""
creator = UnicodeAttribute(hash_key=True)
[docs]
class Actor(Model):
"""
DynamoDB data model for an actor
"""
id = UnicodeAttribute(hash_key=True)
creator = UnicodeAttribute()
passphrase = UnicodeAttribute()
creator_index = CreatorIndex()
[docs]
class DbActor:
"""
DbActor does all the db operations for actor objects
"""
[docs]
def get(self, actor_id: str | None = None) -> dict[str, Any] | None:
"""Retrieves the actor from the database"""
if not actor_id:
return None
try:
self.handle = Actor.get(actor_id, consistent_read=True)
except Exception: # PynamoDB DoesNotExist exception
return None
if self.handle:
t = self.handle
return {
"id": t.id,
"creator": t.creator,
"passphrase": t.passphrase,
}
else:
return None
[docs]
def get_by_creator(
self, creator: str | None = None
) -> dict[str, Any] | list[dict[str, Any]] | None:
"""Retrieves the actor from db based on creator field
Returns None if none was found. If one is found, that one is
loaded in the object. If more, all are returned.
"""
if not creator:
return None
if "@" in creator:
creator = creator.lower()
self.handle = Actor.creator_index.query(creator)
ret = []
for c in self.handle:
logger.warning(" id (" + c.id + ")")
ret.append(self.get(actor_id=c.id))
return ret
[docs]
def modify(
self, creator: str | None = None, passphrase: bytes | None = None
) -> bool:
"""Modify an actor"""
if not self.handle:
logger.debug("Attempted modification of DbActor without db handle")
return False
if creator and len(creator) > 0:
# Email in creator needs to be lower case
if "@" in creator:
creator = creator.lower()
self.handle.creator = creator # type: ignore[attr-defined]
if passphrase and len(passphrase) > 0:
self.handle.passphrase = passphrase.decode("utf-8") # type: ignore[attr-defined]
self.handle.save() # type: ignore[attr-defined]
return True
[docs]
def create(
self,
actor_id: str | None = None,
creator: str | None = None,
passphrase: str | None = None,
) -> bool:
"""Create a new actor"""
if not actor_id:
return False
if not creator:
creator = ""
# Email in creator needs to be lower case
if "@" in creator:
creator = creator.lower()
if not passphrase:
passphrase = ""
if self.get(actor_id=actor_id):
logger.warning("Trying to create actor that exists(" + actor_id + ")")
return False
self.handle = Actor(id=actor_id, creator=creator, passphrase=passphrase)
self.handle.save()
return True
[docs]
def delete(self):
"""Deletes the actor in the database"""
if not self.handle:
logger.debug("Attempted delete of DbActor without db handle")
return False
self.handle.delete() # type: ignore[attr-defined]
self.handle = None
return True
def __init__(self):
self.handle = None
if not Actor.exists():
try:
Actor.create_table(wait=True)
except Exception as e:
# Handle race condition where another process created the table
# between our exists() check and create_table() call
if "ResourceInUseException" in str(e):
pass # Table was created by another process, continue
else:
raise
[docs]
class DbActorList:
"""
DbActorList does all the db operations for list of actor objects
"""
[docs]
def fetch(self):
"""Retrieves the actors in the database"""
self.handle = Actor.scan()
if self.handle:
ret = []
for t in self.handle:
ret.append(
{
"id": t.id,
"creator": t.creator,
}
)
return ret
else:
return False
def __init__(self):
self.handle = None