diff --git a/redash/authentication.py b/redash/authentication.py
index 8f330aef3d..d42aab83c0 100644
--- a/redash/authentication.py
+++ b/redash/authentication.py
@@ -5,7 +5,7 @@
import logging
from flask import request, make_response, redirect, url_for
-from flask.ext.login import LoginManager, login_user, current_user
+from flask.ext.login import LoginManager, login_user, current_user, logout_user
from redash import models, settings, google_oauth
@@ -23,9 +23,38 @@ def sign(key, path, expires):
return h.hexdigest()
-class HMACAuthentication(object):
- @staticmethod
- def api_key_authentication():
+class Authentication(object):
+ def verify_authentication(self):
+ return False
+
+ def required(self, fn):
+ @functools.wraps(fn)
+ def decorated(*args, **kwargs):
+ if current_user.is_authenticated() or self.verify_authentication():
+ return fn(*args, **kwargs)
+
+ return make_response(redirect(url_for("login", next=request.url)))
+
+ return decorated
+
+
+class ApiKeyAuthentication(Authentication):
+ def verify_authentication(self):
+ api_key = request.args.get('api_key')
+ query_id = request.view_args.get('query_id', None)
+
+ if query_id and api_key:
+ query = models.Query.get(models.Query.id == query_id)
+
+ if query.api_key and api_key == query.api_key:
+ login_user(models.ApiUser(query.api_key), remember=False)
+ return True
+
+ return False
+
+
+class HMACAuthentication(Authentication):
+ def verify_authentication(self):
signature = request.args.get('signature')
expires = float(request.args.get('expires') or 0)
query_id = request.view_args.get('query_id', None)
@@ -41,22 +70,14 @@ def api_key_authentication():
return False
- def required(self, fn):
- @functools.wraps(fn)
- def decorated(*args, **kwargs):
- if current_user.is_authenticated():
- return fn(*args, **kwargs)
-
- if self.api_key_authentication():
- return fn(*args, **kwargs)
-
- return make_response(redirect(url_for("login", next=request.url)))
-
- return decorated
-
@login_manager.user_loader
def load_user(user_id):
+ # If the user was previously logged in as api user, the user_id will be the api key and will raise an exception as
+ # it can't be casted to int.
+ if isinstance(user_id, basestring) and not user_id.isdigit():
+ return None
+
return models.User.select().where(models.User.id == user_id).first()
@@ -66,4 +87,13 @@ def setup_authentication(app):
app.secret_key = settings.COOKIE_SECRET
app.register_blueprint(google_oauth.blueprint)
- return HMACAuthentication()
+ if settings.AUTH_TYPE == 'hmac':
+ auth = HMACAuthentication()
+ elif settings.AUTH_TYPE == 'api_key':
+ auth = ApiKeyAuthentication()
+ else:
+ logger.warning("Unknown authentication type ({}). Using default (HMAC).".format(settings.AUTH_TYPE))
+ auth = HMACAuthentication()
+
+ return auth
+
diff --git a/redash/models.py b/redash/models.py
index e3085d2d07..d864cd2614 100644
--- a/redash/models.py
+++ b/redash/models.py
@@ -84,6 +84,9 @@ class ApiUser(UserMixin, PermissionsCheckMixin):
def __init__(self, api_key):
self.id = api_key
+ def __repr__(self):
+ return u"
".format(self.id)
+
@property
def permissions(self):
return ['view_query']
diff --git a/redash/settings.py b/redash/settings.py
index 42f17b0dcd..035315b932 100644
--- a/redash/settings.py
+++ b/redash/settings.py
@@ -60,6 +60,8 @@ def parse_boolean(str):
# proved to be "safe".
QUERY_RESULTS_CLEANUP_ENABLED = parse_boolean(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_ENABLED", "false"))
+AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "hmac")
+
# Google Apps domain to allow access from; any user with email in this Google Apps will be allowed
# access
GOOGLE_APPS_DOMAIN = os.environ.get("REDASH_GOOGLE_APPS_DOMAIN", "")
diff --git a/tests/test_authentication.py b/tests/test_authentication.py
index 4e5ca33bd9..8d2e7a947e 100644
--- a/tests/test_authentication.py
+++ b/tests/test_authentication.py
@@ -1,8 +1,45 @@
+from flask.ext.login import current_user
from mock import patch
from tests import BaseTestCase
from redash import models
from redash.google_oauth import create_and_login_user
-from tests.factories import user_factory
+from redash.authentication import ApiKeyAuthentication
+from tests.factories import user_factory, query_factory
+from redash.wsgi import app
+
+
+class TestApiKeyAuthentication(BaseTestCase):
+ #
+ # This is a bad way to write these tests, but the way Flask works doesn't make it easy to write them properly...
+ #
+ def setUp(self):
+ super(TestApiKeyAuthentication, self).setUp()
+ self.api_key = 10
+ self.query = query_factory.create(api_key=self.api_key)
+
+ def test_no_api_key(self):
+ auth = ApiKeyAuthentication()
+ with app.test_client() as c:
+ rv = c.get('/api/queries/{0}'.format(self.query.id))
+ self.assertFalse(auth.verify_authentication())
+
+ def test_wrong_api_key(self):
+ auth = ApiKeyAuthentication()
+ with app.test_client() as c:
+ rv = c.get('/api/queries/{0}'.format(self.query.id), query_string={'api_key': 'whatever'})
+ self.assertFalse(auth.verify_authentication())
+
+ def test_correct_api_key(self):
+ auth = ApiKeyAuthentication()
+ with app.test_client() as c:
+ rv = c.get('/api/queries/{0}'.format(self.query.id), query_string={'api_key': self.api_key})
+ self.assertTrue(auth.verify_authentication())
+
+ def test_no_query_id(self):
+ auth = ApiKeyAuthentication()
+ with app.test_client() as c:
+ rv = c.get('/api/queries', query_string={'api_key': self.api_key})
+ self.assertFalse(auth.verify_authentication())
class TestCreateAndLoginUser(BaseTestCase):