Source code for oauthsub.auth_service

"""
This lightweight web service performs authentication. All requests that reach
this service should be proxied through nginx.

See: https://developers.google.com/api-client-library/python/auth/web-app
"""

from __future__ import print_function
from __future__ import unicode_literals

import inspect
import os
import json
import logging
import logging.handlers
import sys

import flask
import jinja2
from requests_oauthlib import OAuth2Session

import oauthsub
from oauthsub import util

logger = logging.getLogger("oauthsub")

if sys.version_info < (3, 0, 0):
  # pylint: disable=E1101
  import urllib as parse
else:
  # pylint: disable=E1101
  from urllib import parse


[docs]def strip_settings(settings_dict): """ Return a copy of the settings dictionary including only the kwargs expected by OAuth2Session """ # NOTE(josh): args[0] is `self` if sys.version_info < (3, 5, 0): # pylint: disable=W1505 fields = inspect.getargspec(OAuth2Session.__init__).args[1:] else: sig = getattr(inspect, 'signature')(OAuth2Session.__init__) fields = [field for field, _ in list(sig.parameters.items())[1:-1]] return {k: v for k, v in settings_dict.items() if k in fields}
[docs]def login(): """ The login page. Start of the oauth dance. Construct a flow, get redirect, bounce the user. """ app = flask.current_app if app.session_get('user') is not None: return app.render_message("You are already logged in as {}", app.session_get('user')) provider = flask.request.args.get("provider") if provider is None: return flask.make_response( app.render_message("No provider!"), 403, {}) if provider not in app.app_config.client_secrets: message = "Invalid provider: {}".format(provider) html = app.render_message(message) response = flask.make_response(html, 403, {}) return response app.session_set( "original_uri", flask.request.args.get("original_uri")) logger.debug("Requesting auth from provider: %s", provider) settings = app.app_config.client_secrets[provider] kwargs = strip_settings(settings) client = OAuth2Session(**kwargs) auth_uri, csrf_token = client.authorization_url( settings["authorize_uri"], prompt="login") app.session_set("csrf_token", csrf_token) return flask.redirect(auth_uri)
[docs]def logout(): """ Delete the user's session, effectively logging them out. """ app = flask.current_app flask.session.clear() return app.render_message('Logged out')
[docs]def callback(): """ Handle oauth bounce-back. """ app = flask.current_app # If we didn't received a 'code' in the query parameters then this # definately not a redirect back from google. Assume this is a user meaning # to use the /login endpoint and punt them to the start of the dance. if 'code' not in flask.request.args: return app.login() if 'provider' not in flask.request.args: return app.login() provider = flask.request.args.get("provider") if provider not in app.app_config.client_secrets: message = "Invalid provider: {}".format(provider) html = app.render_message(message) response = flask.make_response(html, 403, {}) return response logger.debug("Fetching token from provider: %s", provider) settings = app.app_config.client_secrets[provider] kwargs = strip_settings(settings) kwargs["state"] = app.session_get("csrf_token") client = OAuth2Session(**kwargs) # Exchange the code that the provider gave us for an actual credentials # object, and store those credentials in the session for this user. kwargs = {key: settings[key] for key in ("token_uri", "client_secret")} kwargs["token_url"] = kwargs.pop("token_uri", None) kwargs["authorization_response"] = flask.request.url token = client.fetch_token(**kwargs) # Use the credentials that we have in order to get the users information # from the provider. We only need one request to get the user's email # address and name. if provider == "google": request_url = "https://www.googleapis.com/userinfo/v2/me?alt=json" elif provider == "github": request_url = "https://api.github.com/user" else: message = 'Invalid provider: {}'.format(provider) return flask.make_response(app.render_message(message), 401, {}) response = client.get(request_url) if response.status_code != 200: message = 'Failed to query {}: [{}]'.format(provider, response.status) return flask.make_response(app.render_message(message), 500, {}) # We'll store the users email, name, and 'given_name' from the provider's # reponse. This is just to help the user understand which identity # they currently have authenticated against. content_str = response.content.decode("utf-8") parsed_content = json.loads(content_str) # If the user logged in with an email domain other than <??> then we want # to warn them that they are probably not doing what they wanted to do. # TODO(josh): move into google-specific auth function if (app.app_config.allowed_domains and (parsed_content.get('hd') not in app.app_config.allowed_domains)): content = app.render_message('You did not login with the right account!') return flask.make_response(content, 401, {}) username = app.app_config.user_lookup(provider, parsed_content) if username is None: logger.warning("user lookup failed: %s", json.dumps(parsed_content, indent=2, sort_keys=True)) content = "Failed user lookup" return flask.make_response(content, 401, {}) app.session_set('user', username) app.session_set("token", json.dumps(token, indent=2, sort_keys=True)) # At this point the user is authed for key in ['email', 'name', 'given_name']: app.session_set(key, parsed_content.get(key, "unknown")) # If we are logging-in due to attempt to access an auth-requiring page, # then go to back to that page original_uri = app.session_get('original_uri', None) if original_uri is None: logger.info('Finished auth, no original_uri in request') return flask.redirect(app.app_config.rooturl) logger.debug('Finished auth, redirecting to: %s', original_uri) return flask.redirect(app.app_config.rooturl + original_uri)
[docs]def query_auth(): """ This is the main endpoint used by nginx to check authorization. If this is an nginx request the X-Original-URI will be passed as an http header. """ app = flask.current_app original_uri = flask.request.headers.get('X-Original-URI') if original_uri: logger.debug('Doing auth for original URI: %s, session %s', original_uri, flask.session.get('id', None)) # If bypass key is present and matches configured, then bypass the # auth-check and assume the user identity if app.app_config.bypass_key is not None: if ('X-OAuthSub-Bypass-Key' in flask.request.headers and 'X-OAuthSub-Bypass-User' in flask.request.headers): logger.debug("bypass headers are present") if(flask.request.headers['X-OAuthSub-Bypass-Key'] == app.app_config.bypass_key): username = flask.request.headers["X-OAuthSub-Bypass-User"] logger.debug("admin bypass, setting user to %s", username) app.session_set("user", username) else: logger.warning("admin bypass key doesn't match") # NOTE(josh): we don't do any whitelisting here, we'll let the nginx # config decide which urls to request auth for if app.session_get('user', None) is not None: response = flask.make_response("", 200, {}) if app.app_config.response_header: response.headers[app.app_config.response_header] \ = app.session_get('user') return response # NOTE(josh): since nginx will return a 401, it will not pass the # Set-Cookie header to the client. This session will not be associated # with the client unless they already have a cookie for this site. # There's not much point in dealing with the X-Original-URI here since # we can't realiably maintain any context. return flask.make_response("", 401, {}) flask.abort(401) return None
[docs]def forbidden(): """ The page served when a user isn't authorized. We'll just set the return path if it's available and then kick them through oauth2. """ app = flask.current_app original_uri = flask.request.headers.get('X-Original-URI') logger.info('Serving forbidden, session %s, original uri: %s', flask.session.get('id', None), original_uri) # NOTE(josh): it seems we can't do a redirect from the 401 page, or else it # might be on the browser side, but we get stuck at some google text saying # that the page should automatically redirect but it doesn't. Let's just # print the message and let them login. If they login it will return them # to where they wanted to go in the first place. if original_uri is not None and original_uri.endswith("favicon.ico"): return flask.make_response("", 401, {}) html = app.render_message('Permission denied. Are you logged in?', original_uri=original_uri) return flask.make_response(html)
[docs]def get_session(): """ Return the user's session as a json object. Can be used to retrieve user identity within other frontend services, or for debugging. """ app = flask.current_app session_dict = { key: app.session_get(key) for key in ('email', 'name', 'given_name', 'user') } return flask.jsonify(session_dict)
[docs]class Application(flask.Flask): """ Main application context. Exists as a class to keep things local... even though flask is all about the global state. """ def __init__(self, app_config): """ Configure jinja, beaker, etc. """ super(Application, self).__init__("oauthsub") # TODO(josh): validate config.client_secrets self.app_config = app_config # TODO(josh): move this to main() and pass in the template loader module_path = os.path.dirname(oauthsub.__file__) zipfile_path, package_path = util.get_zipfile_path(module_path) if self.app_config.custom_template: logger.info('Using FilesystemLoader for templates') template_loader = jinja2.FileSystemLoader( os.path.dirname(self.app_config.custom_template)) elif zipfile_path: logger.info('Using ZipfileLoader for templates') template_loader = util.ZipfileLoader(zipfile_path, package_path + '/templates') else: logger.info('Using PackageLoader for templates') template_loader = jinja2.PackageLoader('oauthsub', 'templates') self.jinja = jinja2.Environment(loader=template_loader) self.jinja.globals.update(url_encode=parse.quote_plus) self.secret_key = app_config.flask_privkey self.debug = app_config.flask_debug for key, value in app_config.flaskopt.items(): self.config[key] = value self.add_url_rule('{}/login'.format(app_config.route_prefix), 'login', login) self.add_url_rule('{}/logout'.format(app_config.route_prefix), 'logout', logout) self.add_url_rule('{}/callback'.format(app_config.route_prefix), 'callback', callback) self.add_url_rule('{}/get_session'.format(app_config.route_prefix), 'get_session', get_session) self.add_url_rule('{}/query_auth'.format(app_config.route_prefix), 'query_auth', query_auth) if app_config.enable_forbidden: self.add_url_rule('{}/forbidden'.format(app_config.route_prefix), 'forbidden', forbidden)
[docs] def route(self, rule, **options): return super(Application, self).route( "{}/{}".format(self.app_config.route_prefix, rule), **options)
[docs] def render_message(self, message, *args, **kwargs): # pylint: disable=no-member original_uri = kwargs.pop("original_uri", None) tplargs = { "session": flask.session, "message": message.format(*args, **kwargs), "providers": sorted(self.app_config.client_secrets.keys()), "original_uri": original_uri, "route_prefix": self.app_config.route_prefix } if self.app_config.custom_template: template = os.path.basename(self.app_config.custom_template) else: template = "message.html.tpl" return self.jinja.get_template(template).render(**tplargs)
[docs] def session_get(self, key, default=None): """ Return the value of the session variable `key`, using the prefix-qualifed name for `key` """ qualified_key = '{}{}'.format(self.app_config.session_key_prefix, key) return flask.session.get(qualified_key, default)
[docs] def session_set(self, key, value): """ Set the value of the session variable `key`, using the prefix-qualifed name for `key` """ qualified_key = '{}{}'.format(self.app_config.session_key_prefix, key) flask.session[qualified_key] = value