Source code for actingweb.actor

from __future__ import absolute_import
from builtins import str
from builtins import object
import datetime
import base64
import logging
import json
from actingweb import (property, trust, subscription, peertrustee, attribute)


[docs]class DummyPropertyClass: """ Only used to deprecate get_property() in 2.4.4 """ def __init__(self, v=None): self.value = v
[docs]class Actor(object): ################### # Basic operations ################### def __init__(self, actor_id=None, config=None): self.config = config self.property_list = None self.subs_list = None self.actor = None self.passphrase = None self.creator = None self.last_response_code = 0 self.last_response_message = '' self.id = actor_id self.handle = self.config.DbActor.DbActor() if actor_id and config: self.store = attribute.InternalStore(actor_id=actor_id, config=config) self.property = property.PropertyStore(actor_id=actor_id, config=config) else: self.store = None self.property = None self.get(actor_id=actor_id)
[docs] def get_peer_info(self, url: str) -> dict: """ Contacts an another actor over http/s to retrieve meta information :param url: Root URI of a remote actor :rtype: dict :return: The json response from the /meta path in the data element and last_response_code/last_response_message set to the results of the https request :Example: >>>{ >>> "last_response_code": 200, >>> "last_response_message": "OK", >>> "data":{} >>>} """ try: logging.debug('Getting peer info at url(' + url + ')') if self.config.env == "appengine": self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=url + '/meta') res = { "last_response_code": response.status_code, "last_response_message": response.content, "data": json.loads(response.content.decode('utf-8', 'ignore')), } logging.debug('Got peer info from url(' + url + ') with body(' + str(response.content) + ')') except (TypeError, ValueError, KeyError): res = { "last_response_code": 500, } return res
[docs] def get(self, actor_id: str = None) -> dict or None: """Retrieves an actor from storage or initialises if it does not exist""" if not actor_id and not self.id: return None elif not actor_id: actor_id = self.id if self.handle and self.actor and len(self.actor) > 0: return self.actor self.actor = self.handle.get(actor_id=actor_id) if self.actor and len(self.actor) > 0: self.id = self.actor["id"] self.creator = self.actor["creator"] self.passphrase = self.actor["passphrase"] self.store = attribute.InternalStore(actor_id=self.id, config=self.config) self.property = property.PropertyStore(actor_id=self.id, config=self.config) if self.config.force_email_prop_as_creator: em = self.store.email if self.config.migrate_2_5_0 and not em: em = self.property.email if em: self.store.email = em.lower() self.property.email = None if em and em.lower() != self.creator: self.modify(creator=em.lower()) else: self.id = None self.creator = None self.passphrase = None return self.actor
[docs] def get_from_property(self, name='oauthId', value=None): """ Initialise an actor by matching on a stored property. Use with caution as the property's value de-facto becomes a security token. If multiple properties are found with the same value, no actor will be initialised. Also note that this is a costly operation as all properties of this type will be retrieved and proceessed. """ actor_id = property.Property(name=name, value=value, config=self.config).get_actor_id() if not actor_id: self.id = None self.creator = None self.passphrase = None return self.get(actor_id=actor_id)
[docs] def get_from_creator(self, creator=None): """ Initialise an actor by matching on creator. If unique_creator config is False, then no actor will be initialised. Likewise, if multiple properties are found with the same value (due to earlier uniqueness off). """ self.id = None self.creator = None self.passphrase = None if not self.config.unique_creator: return False exists = self.config.DbActor.DbActor().get_by_creator(creator=creator) if len(exists) != 1: return False self.get(actor_id=exists[0]["id"]) return True
[docs] def create(self, url, creator, passphrase, actor_id=None, delete=False): """"Creates a new actor and persists it. If delete is True, any existing actors with same creator value will be deleted. If it is False, the one with the correct passphrase will be chosen (if any) """ seed = url now = datetime.datetime.utcnow() seed += now.strftime("%Y%m%dT%H%M%S%f") if len(creator) > 0: self.creator = creator else: self.creator = "creator" if self.config.unique_creator: in_db = self.config.DbActor.DbActor() exists = in_db.get_by_creator(creator=self.creator) if exists: # If uniqueness is turned on at a later point, we may have multiple accounts # with creator as "creator". Check if we have an internal value "email" and then # set creator to the email address. if delete: for c in exists: anactor = Actor(actor_id=c["id"], config=self.config) anactor.delete() else: if self.config.force_email_prop_as_creator and self.creator == "creator": for c in exists: anactor = Actor(actor_id=c["id"]) em = anactor.store.email if self.config.migrate_2_5_0 and not em: em = anactor.property.email if em: anactor.store.email = em.lower() anactor.property.email = None if em: anactor.modify(creator=em.lower()) for c in exists: if c['passphrase'] == passphrase: self.handle = in_db self.id = c['id'] self.passphrase = c['passphrase'] self.creator = c['creator'] return True return False if passphrase and len(passphrase) > 0: self.passphrase = passphrase else: self.passphrase = self.config.new_token() if actor_id: self.id = actor_id else: self.id = self.config.new_uuid(seed) if not self.handle: self.handle = self.config.DbActor.DbActor() self.handle.create(creator=self.creator, passphrase=self.passphrase, actor_id=self.id) self.store = attribute.InternalStore(actor_id=self.id, config=self.config) self.property = property.PropertyStore(actor_id=self.id, config=self.config) return True
[docs] def modify(self, creator=None): if not self.handle or not creator: logging.debug("Attempted modify of actor with no handle or no param changed") return False if '@' in creator: creator = creator.lower() self.creator = creator if self.actor: self.actor["creator"] = creator self.handle.modify(creator=creator) return True
[docs] def delete(self): """Deletes an actor and cleans up all relevant stored data""" if not self.handle: logging.debug("Attempted delete of actor with no handle") return False self.delete_peer_trustee(shorttype='*') if not self.property_list: self.property_list = property.Properties(actor_id=self.id, config=self.config) self.property_list.delete() subs = subscription.Subscriptions(actor_id=self.id, config=self.config) subs.fetch() subs.delete() trusts = trust.Trusts(actor_id=self.id, config=self.config) relationships = trusts.fetch() for rel in relationships: self.delete_reciprocal_trust(peerid=rel["peerid"], delete_peer=True) trusts.delete() buckets = attribute.Buckets(actor_id=self.id, config=self.config) buckets.delete() self.handle.delete()
###################### # Advanced operations ######################
[docs] def set_property(self, name, value): """Sets an actor's property name to value. (DEPRECATED, use actor's property store!) """ self.property[name] = value
[docs] def get_property(self, name): """Retrieves a property object named name. (DEPRECATED, use actor's property store!) """ return DummyPropertyClass(self.property[name])
[docs] def delete_property(self, name): """Deletes a property name. (DEPRECATED, use actor's property store!)""" self.property[name] = None
[docs] def delete_properties(self): """Deletes all properties.""" if not self.property_list: self.property_list = property.Properties(actor_id=self.id, config=self.config) return self.property_list.delete()
[docs] def get_properties(self): """Retrieves properties from db and returns a dict.""" self.property_list = property.Properties(actor_id=self.id, config=self.config) return self.property_list.fetch()
[docs] def delete_peer_trustee(self, shorttype=None, peerid=None): if not peerid and not shorttype: return False if shorttype == '*': for t in self.config.actors: self.delete_peer_trustee(shorttype=t) return True if shorttype and not self.config.actors[shorttype]: logging.error('Got a request to delete an unknown actor type(' + shorttype + ')') return False peer_data = None new_peer = None if peerid: new_peer = peertrustee.PeerTrustee(actor_id=self.id, peerid=peerid, config=self.config) peer_data = new_peer.get() if not peer_data or len(peer_data) == 0: return False elif shorttype: new_peer = peertrustee.PeerTrustee(actor_id=self.id, short_type=shorttype, config=self.config) peer_data = new_peer.get() if not peer_data or len(peer_data) == 0: return False logging.debug( 'Deleting peer actor at baseuri(' + peer_data["baseuri"] + ')') u_p = b'trustee:' + peer_data["passphrase"].encode('utf-8') headers = {'Authorization': 'Basic ' + base64.b64encode(u_p).decode('utf-8'), } try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=peer_data["baseuri"], method=self.config.module["urlfetch"].DELETE, headers=headers ) else: response = self.config.module["urlfetch"].delete(url=peer_data["baseuri"], headers=headers) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug('Not able to delete peer actor remotely due to network issues') self.last_response_code = 408 return False if response.status_code < 200 or response.status_code > 299: logging.debug('Not able to delete peer actor remotely, peer is unwilling') return False # Delete trust, peer is already deleted remotely if not self.delete_reciprocal_trust(peerid=peer_data["peerid"], delete_peer=False): logging.debug('Not able to delete peer actor trust in db') if not new_peer.delete(): logging.debug('Not able to delete peer actor in db') return False return True
[docs] def get_peer_trustee(self, shorttype=None, peerid=None): """ Get a peer, either existing or create it as trustee Will retrieve an existing peer or create a new and establish trust. If no trust exists, a new trust will be established. Use either peerid to target a specific known peer, or shorttype to allow creation of a new peer if none exists """ if not peerid and not shorttype: return None if shorttype and not self.config.actors[shorttype]: logging.error('Got a request to create an unknown actor type(' + shorttype + ')') return None if peerid: new_peer = peertrustee.PeerTrustee(actor_id=self.id, peerid=peerid, config=self.config) else: new_peer = peertrustee.PeerTrustee(actor_id=self.id, short_type=shorttype, config=self.config) peer_data = new_peer.get() if peer_data and len(peer_data) > 0: logging.debug('Found peer in getPeer, now checking existing trust...') dbtrust = trust.Trust(actor_id=self.id, peerid=peer_data["peerid"], config=self.config) new_trust = dbtrust.get() if new_trust and len(new_trust) > 0: return peer_data logging.debug('Did not find existing trust, will create a new one') factory = self.config.actors[shorttype]['factory'] # If peer did not exist, create it as trustee if not peer_data or len(peer_data) == 0: if len(factory) == 0: logging.error('Peer actor of shorttype(' + shorttype + ') does not have factory set.') params = { 'creator': 'trustee', 'trustee_root': self.config.root + self.id } data = json.dumps(params) logging.debug( 'Creating peer actor at factory(' + factory + ') with data(' + str(data) + ')') response = None try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=factory, method=self.config.module["urlfetch"].POST, payload=data ) else: response = self.config.module["urlfetch"].post( url=factory, data=data, headers={ 'Content-Type': 'application/json' } ) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug('Not able to create new peer actor') self.last_response_code = 408 logging.debug('Create peer actor POST response:' + str(self.last_response_code)) if self.last_response_code < 200 or self.last_response_code > 299: return None try: data = json.loads(response.content.decode('utf-8', 'ignore')) except (TypeError, ValueError, KeyError): logging.warning("Not able to parse response when creating peer at factory(" + factory + ")") return None if 'Location' in response.headers: baseuri = response.headers['Location'] elif 'location' in response.headers: baseuri = response.headers['location'] else: logging.warning("No location uri found in response when creating a peer as trustee") baseuri = "" res = self.get_peer_info(baseuri) if not res or res["last_response_code"] < 200 or res["last_response_code"] >= 300: return None info_peer = res['data'] if not info_peer or ('id' in info_peer and not info_peer["id"])\ or ('type' in info_peer and not info_peer["type"]): logging.info( 'Received invalid peer info when trying to create peer actor at: ' + str(factory)) return None new_peer = peertrustee.PeerTrustee(actor_id=self.id, peerid=info_peer["id"], peer_type=info_peer["type"], config=self.config) if not new_peer.create(baseuri=baseuri, passphrase=data["passphrase"]): logging.error('Failed to create in db new peer Actor(' + self.id + ') at ' + baseuri) return None # Now peer exists, create trust new_peer_data = new_peer.get() new_trust = self.create_reciprocal_trust( url=new_peer_data["baseuri"], secret=self.config.new_token(), desc='Trust from trustee to ' + shorttype, relationship=self.config.actors[shorttype]['relationship'] ) if not new_trust or len(new_trust) == 0: logging.warning("Not able to establish trust relationship with peer at factory(" + factory + ")") else: # Approve the relationship params = { 'approved': True, } u_p = b'trustee:' + new_peer_data["passphrase"].encode('utf-8') headers = {'Authorization': 'Basic ' + base64.b64encode(u_p).decode('utf-8'), 'Content-Type': 'application/json', } data = json.dumps(params) try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=new_peer_data["baseuri"] + '/trust/' + self.config.actors[shorttype]['relationship'] + '/' + self.id, method=self.config.module["urlfetch"].PUT, payload=data, headers=headers ) else: response = self.config.module["urlfetch"].put( url=new_peer_data["baseuri"] + '/trust/' + self.config.actors[shorttype]['relationship'] + '/' + self.id, data=data, headers=headers) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): self.last_response_code = 408 self.last_response_message = 'Not able to approve peer actor trust remotely' if self.last_response_code < 200 or self.last_response_code > 299: logging.debug('Not able to delete peer actor remotely') return new_peer_data
[docs] def get_trust_relationship(self, peerid=None): if not peerid: return None return trust.Trust(actor_id=self.id, peerid=peerid, config=self.config).get()
[docs] def get_trust_relationships(self, relationship='', peerid='', trust_type=''): """Retrieves all trust relationships or filtered.""" trust_list = trust.Trusts(actor_id=self.id, config=self.config) relationships = trust_list.fetch() rels = [] for rel in relationships: if len(relationship) > 0 and relationship != rel["relationship"]: continue if len(peerid) > 0 and peerid != rel["peerid"]: continue if len(trust_type) > 0 and trust_type != rel["type"]: continue rels.append(rel) return rels
[docs] def modify_trust_and_notify(self, relationship=None, peerid=None, baseuri='', secret='', desc='', approved=None, verified=None, verification_token=None, peer_approved=None): """Changes a trust relationship and noties the peer if approval is changed.""" if not relationship or not peerid: return False relationships = self.get_trust_relationships( relationship=relationship, peerid=peerid) if not relationships: return False this_trust = relationships[0] headers = dict() # If we change approval status, send the changed status to our peer if approved is True and this_trust["approved"] is False: params = { 'approved': True, } requrl = this_trust["baseuri"] + '/trust/' + relationship + '/' + self.id if this_trust["secret"]: headers = {'Authorization': 'Bearer ' + this_trust["secret"], 'Content-Type': 'application/json', } data = json.dumps(params) # Note the POST here instead of PUT. POST is used to used to notify about # state change in the relationship (i.e. not change the object as PUT # would do) logging.debug( 'Trust relationship has been approved, notifying peer at url(' + requrl + ')') try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=requrl, method=self.config.module["urlfetch"].POST, payload=data, headers=headers ) else: response = self.config.module["urlfetch"].post( url=requrl, data=data, headers=headers ) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug('Not able to notify peer at url(' + requrl + ')') self.last_response_code = 500 dbtrust = trust.Trust(actor_id=self.id, peerid=peerid, config=self.config) return dbtrust.modify(baseuri=baseuri, secret=secret, desc=desc, approved=approved, verified=verified, verification_token=verification_token, peer_approved=peer_approved )
[docs] def create_reciprocal_trust(self, url, secret=None, desc='', relationship='', trust_type=''): """Creates a new reciprocal trust relationship locally and by requesting a relationship from a peer actor.""" if len(url) == 0: return False if not secret or len(secret) == 0: return False res = self.get_peer_info(url) if not res or res["last_response_code"] < 200 or res["last_response_code"] >= 300: return False peer = res["data"] if not peer["id"] or not peer["type"] or len(peer["type"]) == 0: logging.info( "Received invalid peer info when trying to establish trust: " + url) return False if len(trust_type) > 0: if trust_type.lower() != peer["type"].lower(): logging.info( "Peer is of the wrong actingweb type: " + peer["type"]) return False if not relationship or len(relationship) == 0: relationship = self.config.default_relationship # Create trust, so that peer can do a verify on the relationship (using # verification_token) when we request the relationship dbtrust = trust.Trust(actor_id=self.id, peerid=peer["id"], config=self.config) if not dbtrust.create(baseuri=url, secret=secret, peer_type=peer["type"], relationship=relationship, approved=True, verified=False, desc=desc): logging.warning("Trying to establish a new Reciprocal trust when peer relationship already exists (" + peer["id"] + ")") return False # Since we are initiating the relationship, we implicitly approve it # It is not verified until the peer has verified us new_trust = dbtrust.get() params = { 'baseuri': self.config.root + self.id, 'id': self.id, 'type': self.config.aw_type, 'secret': secret, 'desc': desc, 'verify': new_trust["verification_token"], } requrl = url + '/trust/' + relationship data = json.dumps(params) logging.debug('Creating reciprocal trust at url(' + requrl + ') and body (' + str(data) + ')') try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=requrl, method=self.config.module["urlfetch"].POST, payload=data, headers={ 'Content-Type': 'application/json', } ) else: response = self.config.module["urlfetch"].post( url=requrl, data=data, headers={ 'Content-Type': 'application/json', } ) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug( "Not able to create trust with peer, deleting my trust.") dbtrust.delete() return False if self.last_response_code == 201 or self.last_response_code == 202: # Reload the trust to check if approval was done mod_trust = trust.Trust(actor_id=self.id, peerid=peer["id"], config=self.config) mod_trust_data = mod_trust.get() if not mod_trust_data or len(mod_trust_data) == 0: logging.error( "Couldn't find trust relationship after peer POST and verification") return False if self.last_response_code == 201: # Already approved by peer (probably auto-approved) # Do it direct on the trust (and not self.modifyTrustAndNotify) to avoid a callback # to the peer mod_trust.modify(peer_approved=True) return mod_trust.get() else: logging.debug( "Not able to create trust with peer, deleting my trust.") dbtrust.delete() return False
[docs] def create_verified_trust(self, baseuri='', peerid=None, approved=False, secret=None, verification_token=None, trust_type=None, peer_approved=None, relationship=None, desc=''): """Creates a new trust when requested and call backs to initiating actor to verify relationship.""" if not peerid or len(baseuri) == 0 or not relationship: return False requrl = baseuri + '/trust/' + relationship + '/' + self.id if not secret or len(secret) == 0: logging.debug('No secret received from requesting peer(' + peerid + ') at url (' + requrl + '). Verification is not possible.') verified = False else: headers = {'Authorization': 'Bearer ' + secret, } logging.debug('Verifying trust at requesting peer(' + peerid + ') at url (' + requrl + ') and secret(' + secret + ')') try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=requrl, method=self.config.module["urlfetch"].GET, headers=headers) else: response = self.config.module["urlfetch"].get(url=requrl, headers=headers) self.last_response_code = response.status_code self.last_response_message = response.content try: logging.debug( 'Verifying trust response JSON:' + str(response.content)) data = json.loads(response.content.decode('utf-8', 'ignore')) if data["verification_token"] == verification_token: verified = True else: verified = False except ValueError: logging.debug( 'No json body in response when verifying trust at url(' + requrl + ')') verified = False except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug( 'No response when verifying trust at url' + requrl + ')') verified = False new_trust = trust.Trust(actor_id=self.id, peerid=peerid, config=self.config) if not new_trust.create(baseuri=baseuri, secret=secret, peer_type=trust_type, approved=approved, peer_approved=peer_approved, relationship=relationship, verified=verified, desc=desc): return False else: return new_trust.get()
[docs] def delete_reciprocal_trust(self, peerid=None, delete_peer=False): """Deletes a trust relationship and requests deletion of peer's relationship as well.""" failed_once = False # For multiple relationships, this will be True if at least one deletion at peer failed success_once = False # True if at least one relationship was deleted at peer if not peerid: rels = self.get_trust_relationships() else: rels = self.get_trust_relationships(peerid=peerid) for rel in rels: if delete_peer: url = rel["baseuri"] + '/trust/' + rel["relationship"] + '/' + self.id headers = {} if rel["secret"]: headers = {'Authorization': 'Bearer ' + rel["secret"], } logging.debug( 'Deleting reciprocal relationship at url(' + url + ')') try: if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=url, method=self.config.module["urlfetch"].DELETE, headers=headers) else: response = self.config.module["urlfetch"].delete(url=url, headers=headers) except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug( 'Failed to delete reciprocal relationship at url(' + url + ')') failed_once = True continue if (response.status_code < 200 or response.status_code > 299) and response.status_code != 404: logging.debug( 'Failed to delete reciprocal relationship at url(' + url + ')') failed_once = True continue else: success_once = True if not self.subs_list: self.subs_list = subscription.Subscriptions(actor_id=self.id, config=self.config).fetch() # Delete this peer's subscriptions for sub in self.subs_list: if sub["peerid"] == rel["peerid"]: logging.debug("Deleting subscription(" + sub["subscriptionid"] + ") as part of trust deletion.") sub_obj = self.get_subscription_obj(peerid=sub["peerid"], subid=sub["subscriptionid"], callback=sub["callback"]) sub_obj.delete() dbtrust = trust.Trust(actor_id=self.id, peerid=rel["peerid"], config=self.config) dbtrust.delete() if delete_peer and (not success_once or failed_once): return False return True
[docs] def create_subscription(self, peerid=None, target=None, subtarget=None, resource=None, granularity=None, subid=None, callback=False): new_sub = subscription.Subscription( actor_id=self.id, peerid=peerid, subid=subid, callback=callback, config=self.config) new_sub.create(target=target, subtarget=subtarget, resource=resource, granularity=granularity) return new_sub.get()
[docs] def create_remote_subscription(self, peerid=None, target=None, subtarget=None, resource=None, granularity=None): """Creates a new subscription at peerid.""" if not peerid or not target: return False relationships = self.get_trust_relationships(peerid=peerid) if not relationships: return False peer = relationships[0] params = { 'id': self.id, 'target': target, } if subtarget: params['subtarget'] = subtarget if resource: params['resource'] = resource if granularity and len(granularity) > 0: params['granularity'] = granularity requrl = peer["baseuri"] + '/subscriptions/' + self.id data = json.dumps(params) headers = {'Authorization': 'Bearer ' + peer["secret"], 'Content-Type': 'application/json', } try: logging.debug('Creating remote subscription at url(' + requrl + ') with body (' + str(data) + ')') if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=requrl, method=self.config.module["urlfetch"].POST, payload=data, headers=headers ) else: response = self.config.module["urlfetch"].post(url=requrl, data=data, headers=headers) self.last_response_code = response.status_code self.last_response_message = response.content except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): return None try: logging.debug('Created remote subscription at url(' + requrl + ') and got JSON response (' + str(response.content) + ')') data = json.loads(response.content.decode('utf-8', 'ignore')) except ValueError: return None if 'subscriptionid' in data: subid = data["subscriptionid"] else: return None if self.last_response_code == 201: self.create_subscription(peerid=peerid, target=target, subtarget=subtarget, resource=resource, granularity=granularity, subid=subid, callback=True) if 'Location' in response.headers: return response.headers['Location'] elif 'location' in response.headers: return response.headers['location'] else: return None
[docs] def get_subscriptions(self, peerid=None, target=None, subtarget=None, resource=None, callback=False): """Retrieves subscriptions from db.""" if not self.id: return None if not self.subs_list: self.subs_list = subscription.Subscriptions(actor_id=self.id, config=self.config).fetch() ret = [] for sub in self.subs_list: if not peerid or (peerid and sub["peerid"] == peerid): if not target or (target and sub["target"] == target): if not subtarget or (subtarget and sub["subtarget"] == subtarget): if not resource or (resource and sub["resource"] == resource): if not callback or (callback and sub["callback"] == callback): ret.append(sub) return ret
[docs] def get_subscription(self, peerid=None, subid=None, callback=False): """Retrieves a single subscription identified by peerid and subid.""" if not subid: return False return subscription.Subscription( actor_id=self.id, peerid=peerid, subid=subid, callback=callback, config=self.config).get()
[docs] def get_subscription_obj(self, peerid=None, subid=None, callback=False): """Retrieves a single subscription identified by peerid and subid.""" if not subid: return False return subscription.Subscription( actor_id=self.id, peerid=peerid, subid=subid, callback=callback, config=self.config)
[docs] def delete_remote_subscription(self, peerid=None, subid=None): if not subid or not peerid: return False trust_rel = self.get_trust_relationship(peerid=peerid) if not trust_rel: return False sub = self.get_subscription(peerid=peerid, subid=subid) if not sub: sub = self.get_subscription(peerid=peerid, subid=subid, callback=True) if 'callback' not in sub or not sub["callback"]: url = trust_rel["baseuri"] + '/subscriptions/' + self.id + '/' + subid else: url = trust_rel["baseuri"] + '/callbacks/subscriptions/' + self.id + '/' + subid headers = {'Authorization': 'Bearer ' + trust_rel["secret"], } try: logging.debug('Deleting remote subscription at url(' + url + ')') if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=url, method=self.config.module["urlfetch"].DELETE, headers=headers) else: response = self.config.module["urlfetch"].delete(url=url, headers=headers) self.last_response_code = response.status_code self.last_response_message = response.content if response.status_code == 204: return True else: logging.debug( 'Failed to delete remote subscription at url(' + url + ')') return False except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): return False
[docs] def delete_subscription(self, peerid=None, subid=None, callback=False): """Deletes a specified subscription""" if not subid: return False sub = subscription.Subscription( actor_id=self.id, peerid=peerid, subid=subid, callback=callback, config=self.config) return sub.delete()
[docs] def callback_subscription(self, peerid=None, sub_obj=None, sub=None, diff=None, blob=None): if not peerid or not diff or not sub or not blob: logging.warning("Missing parameters in callbackSubscription") return if "granularity" in sub and sub["granularity"] == "none": return trust_rel = self.get_trust_relationship(peerid) if not trust_rel: return params = { 'id': self.id, 'subscriptionid': sub["subscriptionid"], 'target': sub["target"], 'sequence': diff["sequence"], 'timestamp': str(diff["timestamp"]), 'granularity': sub["granularity"], } if sub["subtarget"]: params['subtarget'] = sub["subtarget"] if sub["resource"]: params['resource'] = sub["resource"] if sub["granularity"] == "high": try: params['data'] = json.loads(blob) except (TypeError, ValueError, KeyError): params['data'] = blob if sub["granularity"] == "low": params['url'] = self.config.root + self.id + '/subscriptions/' + \ trust_rel["peerid"] + '/' + sub["subscriptionid"] + '/' + str(diff["sequence"]) requrl = trust_rel["baseuri"] + '/callbacks/subscriptions/' + self.id + '/' + sub["subscriptionid"] data = json.dumps(params) headers = {'Authorization': 'Bearer ' + trust_rel["secret"], 'Content-Type': 'application/json', } try: logging.debug('Doing a callback on subscription at url(' + requrl + ') with body(' + str(data) + ')') if self.config.env == 'appengine': self.config.module["urlfetch"].set_default_fetch_deadline(20) response = self.config.module["urlfetch"].fetch(url=requrl, method=self.config.module["urlfetch"].POST, payload=data.encode('utf-8'), headers=headers ) else: response = self.config.module["urlfetch"].post( url=requrl, data=data.encode('utf-8'), headers=headers ) except (self.config.module["urlfetch"].UrlfetchException, self.config.module["urlfetch"].URLError, self.config.module["urlfetch"].Timeout, self.config.module["urlfetch"].TooManyRedirects): logging.debug( 'Peer did not respond to callback on url(' + requrl + ')') self.last_response_code = 0 self.last_response_message = 'No response from peer for subscription callback' return self.last_response_code = response.status_code self.last_response_message = response.content if response.status_code == 204 and sub["granularity"] == "high": if not sub_obj: logging.warning("About to clear diff without having subobj set") else: sub_obj.clear_diff(diff["sequence"])
[docs] def register_diffs(self, target=None, subtarget=None, resource=None, blob=None): """Registers a blob diff against all subscriptions with the correct target, subtarget, and resource. If resource is set, the blob is expected to be the FULL resource object, not a diff. """ if blob is None or not target: return # Get all subscriptions, both with the specific subtarget/resource and those # without subs = self.get_subscriptions( target=target, subtarget=None, resource=None, callback=False) if not subs: subs = [] if subtarget and resource: logging.debug("register_diffs() - blob(" + blob + "), target(" + target + "), subtarget(" + subtarget + "), resource(" + resource + "), # of subs(" + str(len(subs)) + ")") elif subtarget: logging.debug("register_diffs() - blob(" + blob + "), target(" + target + "), subtarget(" + subtarget + "), # of subs(" + str(len(subs)) + ")") else: logging.debug("register_diffs() - blob(" + blob + "), target(" + target + "), # of subs(" + str(len(subs)) + ")") for sub in subs: # Skip the ones without correct subtarget if subtarget and sub["subtarget"] and sub["subtarget"] != subtarget: logging.debug(" - no match on subtarget, skipping...") continue # Skip the ones without correct resource if resource and sub["resource"] and sub["resource"] != resource: logging.debug(" - no match on resource, skipping...") continue sub_obj = self.get_subscription_obj(peerid=sub["peerid"], subid=sub["subscriptionid"]) sub_obj_data = sub_obj.get() logging.debug(" - processing subscription(" + sub["subscriptionid"] + ") for peer(" + sub["peerid"] + ") with target(" + sub_obj_data["target"] + ") subtarget(" + str(sub_obj_data["subtarget"] or '') + ") and resource(" + str(sub_obj_data["resource"] or '') + ")") # Subscription with a resource, but this diff is on a higher level if (not resource or not subtarget) and sub_obj_data["subtarget"] and sub_obj_data["resource"]: # Create a json diff on the subpart that this subscription # covers try: jsonblob = json.loads(blob) if not subtarget: subblob = json.dumps(jsonblob[sub_obj_data["subtarget"]][sub_obj_data["resource"]]) else: subblob = json.dumps(jsonblob[sub_obj_data["resource"]]) except (TypeError, ValueError, KeyError): # The diff does not contain the resource logging.debug(" - subscription has resource(" + sub_obj_data["resource"] + "), no matching blob found in diff") continue logging.debug(" - subscription has resource(" + sub_obj_data["resource"] + "), adding diff(" + subblob + ")") finblob = subblob # The diff is on the resource, but the subscription is on a # higher level elif resource and not sub_obj_data["resource"]: # Since we have a resource, we know the blob is the entire resource, not a diff # If the subscription is for a sub-target, send [resource] = blob # If the subscription is for a target, send [subtarget][resource] = blob upblob = {} try: jsonblob = json.loads(blob) if not sub_obj_data["subtarget"]: upblob[subtarget] = {} upblob[subtarget][resource] = jsonblob else: upblob[resource] = jsonblob except (TypeError, ValueError, KeyError): if not sub_obj_data["subtarget"]: upblob[subtarget] = {} upblob[subtarget][resource] = blob else: upblob[resource] = blob finblob = json.dumps(upblob) logging.debug(" - diff has resource(" + resource + "), subscription has not, adding diff(" + finblob + ")") # Subscriptions with subtarget, but this diff is on a higher level elif not subtarget and sub_obj_data["subtarget"]: # Create a json diff on the subpart that this subscription # covers subblob = None try: jsonblob = json.loads(blob) subblob = json.dumps(jsonblob[sub_obj_data["subtarget"]]) except (TypeError, ValueError, KeyError): # The diff blob does not contain the subtarget pass logging.debug(" - subscription has subtarget(" + sub_obj_data["subtarget"] + "), adding diff(" + subblob + ")") finblob = subblob # The diff is on the subtarget, but the subscription is on the # higher level elif subtarget and not sub_obj_data["subtarget"]: # Create a data["subtarget"] = blob diff to give correct level # of diff to subscriber upblob = {} try: jsonblob = json.loads(blob) upblob[subtarget] = jsonblob except (TypeError, ValueError, KeyError): upblob[subtarget] = blob finblob = json.dumps(upblob) logging.debug(" - diff has subtarget(" + subtarget + "), subscription has not, adding diff(" + finblob + ")") else: # The diff is correct for the subscription logging.debug( " - exact target/subtarget match, adding diff(" + blob + ")") finblob = blob diff = sub_obj.add_diff(blob=finblob) if not diff: logging.warning("Failed when registering a diff to subscription (" + sub["subscriptionid"] + "). Will not send callback.") else: if self.config.module["deferred"]: self.config.module["deferred"].defer(self.callback_subscription, peerid=sub["peerid"], sub_obj=sub_obj, sub=sub_obj_data, diff=diff, blob=finblob) else: self.callback_subscription(peerid=sub["peerid"], sub_obj=sub_obj, sub=sub_obj_data, diff=diff, blob=finblob)
[docs]class Actors(object): """ Handles all actors """
[docs] def fetch(self): if not self.list: return False if self.actors is not None: return self.actors self.actors = self.list.fetch() return self.actors
def __init__(self, config=None): self.config = config self.list = self.config.DbActor.DbActorList() self.actors = None self.fetch()