Source code for actingweb.oauth

from builtins import str
from builtins import object
try:
    from urllib.parse import urlencode as urllib_urlencode
except ImportError:
    from urllib import urlencode as urllib_urlencode
import logging
import json


# This function code is from latest urlfetch. For some reason the
# Appengine version of urlfetch does not include links()





class OAuth(object):
    """Small OAuth2 helper that sends bearer-token HTTP requests.

    All network traffic goes through ``config.module["urlfetch"]``. When
    ``config.env == 'appengine'`` the ``fetch()`` style API is used,
    otherwise the per-verb functions (``get``, ``post``, ``put``, ``head``,
    ``delete``) are called. OAuth settings are read from the
    ``config.oauth`` dict.

    After every request ``last_response_code`` and ``last_response_message``
    hold the status code and raw body of the response, or ``0`` and
    ``'No response'`` when the request failed with a urlfetch exception.
    """

    def __init__(self, token=None, config=None):
        """Store the access token and the configuration object."""
        self.config = config
        self.token = token
        # Pagination URLs taken from the last successful get_request().
        self.first = None
        self.next = None
        self.prev = None
        # Outcome of the most recent request (0 means no response received).
        self.last_response_code = 0
        self.last_response_message = ""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _fetch_errors(self):
        """Return the urlfetch exception types treated as a failed request."""
        urlfetch = self.config.module["urlfetch"]
        return (urlfetch.UrlfetchException, urlfetch.URLError,
                urlfetch.Timeout, urlfetch.TooManyRedirects)

    def _record_failure(self):
        """Mark the last request as having produced no response."""
        self.last_response_code = 0
        self.last_response_message = 'No response'

    def _record_response(self, response):
        """Remember status code and raw body of the last response."""
        self.last_response_code = response.status_code
        self.last_response_message = response.content

    def _bearer_header(self):
        """Return the Authorization header for the current token."""
        return {'Authorization': 'Bearer ' + self.token}

    def _send_with_body(self, method, url, params, urlencode):
        """Send a POST or PUT request; shared by post_request/put_request.

        ``method`` is the upper-case verb name (``'POST'`` or ``'PUT'``). It
        selects both the ``urlfetch.<METHOD>`` constant used on App Engine
        and the lower-case ``urlfetch.<method>()`` function used elsewhere.
        """
        if params:
            if urlencode:
                data = urllib_urlencode(params)
                logging.debug('Oauth %s request with urlencoded payload: %s %s',
                              method, url, data)
            else:
                data = json.dumps(params)
                logging.debug('Oauth %s request with JSON payload: %s %s',
                              method, url, data)
        else:
            data = None
            logging.debug('Oauth %s request: %s', method, url)
        # The content type follows the urlencode flag even without a payload.
        if urlencode:
            headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        else:
            headers = {'Content-Type': 'application/json'}
        if self.token:
            headers['Authorization'] = 'Bearer ' + self.token
        try:
            urlfetch = self.config.module["urlfetch"]
            if self.config.env == 'appengine':
                urlfetch.set_default_fetch_deadline(20)
                response = urlfetch.fetch(
                    url=url,
                    payload=data,
                    method=getattr(urlfetch, method),
                    headers=headers)
            else:
                send = getattr(urlfetch, method.lower())
                response = send(url=url, data=data, headers=headers)
            self._record_response(response)
        except self._fetch_errors():
            self._record_failure()
            logging.warning("Oauth %s failed with exception", method)
            return None
        if response.status_code == 204:
            return {}
        if response.status_code not in (200, 201):
            logging.info('Error when sending %s request: %s%s',
                         method, response.status_code, response.content)
            return None
        logging.debug('Oauth %s response JSON:%s', method, response.content)
        return json.loads(response.content.decode('utf-8', 'ignore'))

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def enabled(self):
        """Return True when a non-empty OAuth client id is configured."""
        return bool(self.config.oauth['client_id'])

    def set_token(self, token):
        """Replace the access token; a falsy ``token`` is ignored."""
        if token:
            self.token = token

    def post_request(self, url, params=None, urlencode=False):
        """POST ``params`` to ``url`` and return the decoded JSON response.

        :param url: Target URL.
        :param params: Payload; sent form-urlencoded when ``urlencode`` is
            true, otherwise serialised as JSON. No body is sent when falsy.
        :param urlencode: Selects payload encoding and Content-Type.
        :return: ``{}`` on 204, the parsed JSON body on 200/201, ``None`` on
            any other status or when urlfetch raised.
        :raises ValueError: if a 200/201 body is not valid JSON.
        """
        return self._send_with_body('POST', url, params, urlencode)

    def put_request(self, url, params=None, urlencode=False):
        """PUT ``params`` to ``url`` and return the decoded JSON response.

        Parameters and return values are the same as for post_request().

        :raises ValueError: if a 200/201 body is not valid JSON.
        """
        return self._send_with_body('PUT', url, params, urlencode)

    def get_request(self, url, params=None):
        """GET ``url`` with the bearer token and return the decoded JSON.

        ``params`` are appended to ``url`` as a query string. On success the
        ``next``, ``first`` and ``prev`` attributes are reset and then filled
        from the links returned by the module-level ``pagination_links()``.

        :return: Parsed JSON body on any 2xx status; ``None`` when no token
            is set, the status is not 2xx, or urlfetch raised.
        :raises ValueError: if the 2xx body is not valid JSON.
        """
        if not self.token:
            logging.debug("No token set in get_request()")
            return None
        if params:
            url = url + '?' + urllib_urlencode(params)
        logging.debug('Oauth GET request: %s', url)
        try:
            urlfetch = self.config.module["urlfetch"]
            if self.config.env == 'appengine':
                urlfetch.set_default_fetch_deadline(60)
                response = urlfetch.fetch(
                    url=url,
                    method=urlfetch.GET,
                    headers=self._bearer_header())
            else:
                response = urlfetch.get(url, headers=self._bearer_header())
            self._record_response(response)
        except self._fetch_errors():
            self._record_failure()
            logging.warning("Oauth GET failed with exception")
            return None
        if response.status_code < 200 or response.status_code > 299:
            logging.info('Error when sending GET request to Oauth: %s%s',
                         response.status_code, response.content)
            return None
        links = pagination_links(response)
        self.next = None
        self.first = None
        self.prev = None
        for link in links:
            rel = link['rel']
            logging.debug('Links:%s:%s', rel, link['url'])
            if rel == 'next':
                self.next = link['url']
            elif rel == 'first':
                self.first = link['url']
            elif rel == 'prev':
                self.prev = link['url']
        return json.loads(response.content.decode('utf-8', 'ignore'))

    def head_request(self, url, params=None):
        """Send a HEAD request and return the response headers.

        ``params`` are appended to ``url`` as a query string. Unlike the
        other verbs, this always uses ``urlfetch.head()`` regardless of
        ``config.env``.

        :return: ``response.headers`` on any 2xx status; ``None`` when no
            token is set, the status is not 2xx, or urlfetch raised.
        """
        if not self.token:
            logging.debug("No token set in head_request()")
            return None
        if params:
            url = url + '?' + urllib_urlencode(params)
        logging.debug('Oauth HEAD request: %s', url)
        try:
            response = self.config.module["urlfetch"].head(
                url=url, headers=self._bearer_header())
            self._record_response(response)
        except self._fetch_errors():
            self._record_failure()
            logging.warning("Oauth HEAD failed with exception")
            return None
        if response.status_code < 200 or response.status_code > 299:
            logging.warning('Error when sending HEAD request to Oauth: %s%s',
                            response.status_code, response.content)
            return None
        return response.headers

    def delete_request(self, url):
        """Send a DELETE request and return the decoded JSON response.

        :return: ``{}`` on 204 or when a 2xx body is not valid JSON, the
            parsed JSON body otherwise; ``None`` when no token is set, the
            status is not 2xx, or urlfetch raised.
        """
        if not self.token:
            return None
        logging.debug('Oauth DELETE request: %s', url)
        try:
            urlfetch = self.config.module["urlfetch"]
            if self.config.env == 'appengine':
                response = urlfetch.fetch(
                    url=url,
                    method=urlfetch.DELETE,
                    headers=self._bearer_header())
            else:
                response = urlfetch.delete(
                    url=url, headers=self._bearer_header())
            self._record_response(response)
        except self._fetch_errors():
            logging.warning("Oauth DELETE failed with exception")
            self._record_failure()
            return None
        if response.status_code < 200 or response.status_code > 299:
            logging.info('Error when sending DELETE request to Oauth: %s%s',
                         response.status_code, response.content)
            return None
        if response.status_code == 204:
            return {}
        try:
            return json.loads(response.content.decode('utf-8', 'ignore'))
        except ValueError:
            # json.loads signals a malformed or empty body with ValueError;
            # a successful DELETE without a JSON body is reported as {}.
            return {}

    def oauth_redirect_uri(self, state='', creator=None):
        """Build the authorization URL the user should be redirected to.

        Entries of the optional ``config.oauth['oauth_extras']`` dict are
        added as extra query parameters. A string value containing
        ``'dynamic:'`` whose text from index 8 onwards is ``'creator'`` is
        replaced by ``creator`` when that argument is truthy.
        """
        oauth = self.config.oauth
        params = {
            'response_type': oauth['response_type'],
            'client_id': oauth['client_id'],
            'redirect_uri': oauth['redirect_uri'],
            'scope': oauth['scope'],
            'state': state,
        }
        if 'oauth_extras' in oauth:
            for key, value in oauth['oauth_extras'].items():
                if isinstance(value, str) and 'dynamic:' in value:
                    # 8 == len('dynamic:'); the remainder names the source.
                    if value[8:] == 'creator' and creator:
                        value = creator
                params[key] = value
        uri = oauth['auth_uri'] + "?" + urllib_urlencode(params)
        logging.debug('OAuth redirect with url: %s and state:%s', uri, state)
        return uri

    def oauth_request_token(self, code=None):
        """Exchange an authorization ``code`` for an access token.

        The current token is cleared before the request and only set again
        when the response contains ``access_token``.

        :return: ``None`` when ``code`` is falsy, otherwise whatever
            post_request() returned for the token endpoint.
        """
        if not code:
            return None
        oauth = self.config.oauth
        params = {
            'grant_type': oauth['grant_type'],
            'client_id': oauth['client_id'],
            'client_secret': oauth['client_secret'],
            'code': code,
            'redirect_uri': oauth['redirect_uri'],
        }
        self.token = None
        result = self.post_request(
            url=oauth['token_uri'], params=params, urlencode=True)
        if result and 'access_token' in result:
            self.token = result['access_token']
        return result

    def oauth_refresh_token(self, refresh_token):
        """Use ``refresh_token`` to obtain a new access token.

        The current token is cleared before the request and only set again
        when the response contains ``access_token``.

        :return: ``None`` when ``refresh_token`` is falsy or the token
            endpoint gave no usable response, otherwise the decoded response
            (which may lack ``access_token``, e.g. an error payload).
        """
        if not refresh_token:
            return None
        oauth = self.config.oauth
        params = {
            'grant_type': oauth['refresh_type'],
            'client_id': oauth['client_id'],
            'client_secret': oauth['client_secret'],
            'refresh_token': refresh_token,
        }
        self.token = None
        result = self.post_request(
            url=oauth['token_uri'], params=params, urlencode=True)
        if not result:
            return None
        # Same guard as oauth_request_token(): a response without a token
        # must not raise KeyError; the caller can inspect the payload.
        if 'access_token' in result:
            self.token = result['access_token']
        return result