diff --git a/migrations/0003_update_data_source_config.py b/migrations/0003_update_data_source_config.py new file mode 100644 index 0000000000..7196328428 --- /dev/null +++ b/migrations/0003_update_data_source_config.py @@ -0,0 +1,63 @@ +import json + +from redash import query_runner +from redash.models import DataSource + + +def update(data_source): + print "[%s] Old options: %s" % (data_source.name, data_source.options) + + if query_runner.validate_configuration(data_source.type, data_source.options): + print "[%s] configuration already valid. skipping." % data_source.name + return + + if data_source.type == 'pg': + values = data_source.options.split(" ") + configuration = {} + for value in values: + k, v = value.split("=", 1) + configuration[k] = v + data_source.options = json.dumps(configuration) + + elif data_source.type == 'mysql': + values = data_source.options.split(";") + configuration = {} + for value in values: + k, v = value.split("=", 1) + configuration[k] = v + data_source.options = json.dumps(configuration) + + elif data_source.type == 'graphite': + old_config = json.loads(data_source.options) + + configuration = { + "url": old_config["url"] + } + + if "verify" in old_config: + configuration['verify'] = old_config['verify'] + + if "auth" in old_config: + configuration['username'], configuration['password'] = old_config["auth"] + + data_source.options = json.dumps(configuration) + + elif data_source.type == 'url': + data_source.options = json.dumps({"url": data_source.options}) + + elif data_source.type == 'script': + data_source.options = json.dumps({"path": data_source.options}) + + elif data_source.type == 'mongo': + data_source.type = 'mongodb' + + else: + print "[%s] No need to convert type of: %s" % (data_source.name, data_source.type) + + print "[%s] New options: %s" % (data_source.name, data_source.options) + data_source.save() + + +if __name__ == '__main__': + for data_source in DataSource.all(): + update(data_source) \ No newline at end of file diff --git a/redash/__init__.py b/redash/__init__.py index f766fe87d8..2286b624e7 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -4,8 +4,9 @@ from statsd import StatsClient from redash import settings +from redash.query_runner import import_query_runners -__version__ = '0.5.0' +__version__ = '0.6.0' def setup_logging(): @@ -31,4 +32,6 @@ def create_redis_connection(): setup_logging() redis_connection = create_redis_connection() -statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) \ No newline at end of file +statsd_client = StatsClient(host=settings.STATSD_HOST, port=settings.STATSD_PORT, prefix=settings.STATSD_PREFIX) + +import_query_runners(settings.QUERY_RUNNERS) diff --git a/redash/cli/data_sources.py b/redash/cli/data_sources.py index 909ec037bb..9948fb17d7 100644 --- a/redash/cli/data_sources.py +++ b/redash/cli/data_sources.py @@ -1,5 +1,8 @@ +import json +import click from flask.ext.script import Manager from redash import models +from redash.query_runner import query_runners, validate_configuration manager = Manager(help="Data sources management commands.") @@ -13,11 +16,70 @@ def list(): print "Id: {}\nName: {}\nType: {}\nOptions: {}".format(ds.id, ds.name, ds.type, ds.options) +def validate_data_source_type(type): + if type not in query_runners.keys(): + print "Error: the type \"{}\" is not supported (supported types: {}).".format(type, ", ".join(query_runners.keys())) + exit() + + +def validate_data_source_options(type, options): + if not 
validate_configuration(type, options): + print "Error: invalid configuration." + exit() + @manager.command -def new(name, type, options): +def new(name=None, type=None, options=None): """Create new data source""" - # TODO: validate it's a valid type and in the future, validate the options. + if name is None: + name = click.prompt("Name") + + if type is None: + print "Select type:" + for i, query_runner_name in enumerate(query_runners.keys()): + print "{}. {}".format(i+1, query_runner_name) + + idx = 0 + while idx < 1 or idx > len(query_runners.keys()): + idx = click.prompt("[{}-{}]".format(1, len(query_runners.keys())), type=int) + + type = query_runners.keys()[idx-1] + else: + validate_data_source_type(type) + + if options is None: + query_runner = query_runners[type] + schema = query_runner.configuration_schema() + + types = { + 'string': unicode, + 'number': int, + 'boolean': bool + } + + options_obj = {} + + for k, prop in schema['properties'].iteritems(): + required = k in schema.get('required', []) + default_value = "<>" + if required: + default_value = None + + prompt = prop.get('title', k.capitalize()) + if required: + prompt = "{} (required)".format(prompt) + else: + prompt = "{} (optional)".format(prompt) + + value = click.prompt(prompt, default=default_value, type=types[prop['type']], show_default=False) + if value != default_value: + options_obj[k] = value + + options = json.dumps(options_obj) + + validate_data_source_options(type, options) + print "Creating {} data source ({}) with options:\n{}".format(type, name, options) + data_source = models.DataSource.create(name=name, type=type, options=options) @@ -49,7 +111,14 @@ def update_attr(obj, attr, new_value): def edit(name, new_name=None, options=None, type=None): """Edit data source settings (name, options, type)""" try: + if type is not None: + validate_data_source_type(type) + data_source = models.DataSource.get(models.DataSource.name==name) + + if options is not None: + validate_data_source_options(data_source.type, options) + update_attr(data_source, "name", new_name) update_attr(data_source, "type", type) update_attr(data_source, "options", options) diff --git a/redash/controllers.py b/redash/controllers.py index 57a7ff5c1e..e5d15d4b55 100644 --- a/redash/controllers.py +++ b/redash/controllers.py @@ -23,6 +23,7 @@ from redash.tasks import QueryTask, record_event from redash.cache import headers as cache_headers from redash.permissions import require_permission +from redash.query_runner import query_runners, validate_configuration @app.route('/ping', methods=['GET']) @@ -174,11 +175,35 @@ def post(self): api.add_resource(MetricsAPI, '/api/metrics/v1/send', endpoint='metrics') +class DataSourceTypeListAPI(BaseResource): + @require_permission("admin") + def get(self): + return [q.to_dict() for q in query_runners.values()] + +api.add_resource(DataSourceTypeListAPI, '/api/data_sources/types', endpoint='data_source_types') + + class DataSourceListAPI(BaseResource): def get(self): data_sources = [ds.to_dict() for ds in models.DataSource.all()] return data_sources + @require_permission("admin") + def post(self): + req = request.get_json(True) + required_fields = ('options', 'name', 'type') + for f in required_fields: + if f not in req: + abort(400) + + if not validate_configuration(req['type'], req['options']): + abort(400) + + datasource = models.DataSource.create(name=req['name'], type=req['type'], options=req['options']) + + return datasource.to_dict() + + api.add_resource(DataSourceListAPI, '/api/data_sources', 
endpoint='data_sources') diff --git a/redash/data/__init__.py b/redash/data/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/redash/data/query_runner.py b/redash/data/query_runner.py deleted file mode 100644 index d58d0bed39..0000000000 --- a/redash/data/query_runner.py +++ /dev/null @@ -1,34 +0,0 @@ -import json - - -def get_query_runner(connection_type, connection_string): - if connection_type == 'mysql': - from redash.data import query_runner_mysql - runner = query_runner_mysql.mysql(connection_string) - elif connection_type == 'graphite': - from redash.data import query_runner_graphite - connection_params = json.loads(connection_string) - if connection_params['auth']: - connection_params['auth'] = tuple(connection_params['auth']) - else: - connection_params['auth'] = None - runner = query_runner_graphite.graphite(connection_params) - elif connection_type == 'bigquery': - from redash.data import query_runner_bigquery - connection_params = json.loads(connection_string) - runner = query_runner_bigquery.bigquery(connection_params) - elif connection_type == 'script': - from redash.data import query_runner_script - runner = query_runner_script.script(connection_string) - elif connection_type == 'url': - from redash.data import query_runner_url - runner = query_runner_url.url(connection_string) - elif connection_type == "mongo": - from redash.data import query_runner_mongodb - connection_params = json.loads(connection_string) - runner = query_runner_mongodb.mongodb(connection_params) - else: - from redash.data import query_runner_pg - runner = query_runner_pg.pg(connection_string) - - return runner diff --git a/redash/data/query_runner_bigquery.py b/redash/data/query_runner_bigquery.py deleted file mode 100644 index b894febb53..0000000000 --- a/redash/data/query_runner_bigquery.py +++ /dev/null @@ -1,138 +0,0 @@ -import datetime -import httplib2 -import json -import logging -import sys -import time - -try: - import apiclient.errors - from apiclient.discovery import build - from apiclient.errors import HttpError - from oauth2client.client import SignedJwtAssertionCredentials -except ImportError: - print "Missing dependencies. Please install google-api-python-client and oauth2client." 
- print "You can use pip: pip install google-api-python-client oauth2client" - -from redash.utils import JSONEncoder - -types_map = { - 'INTEGER': 'integer', - 'FLOAT': 'float', - 'BOOLEAN': 'boolean', - 'STRING': 'string', - 'TIMESTAMP': 'datetime', -} - -def transform_row(row, fields): - column_index = 0 - row_data = {} - - for cell in row["f"]: - field = fields[column_index] - cell_value = cell['v'] - - if cell_value is None: - pass - # Otherwise just cast the value - elif field['type'] == 'INTEGER': - cell_value = int(cell_value) - elif field['type'] == 'FLOAT': - cell_value = float(cell_value) - elif field['type'] == 'BOOLEAN': - cell_value = cell_value.lower() == "true" - elif field['type'] == 'TIMESTAMP': - cell_value = datetime.datetime.fromtimestamp(float(cell_value)) - - row_data[field["name"]] = cell_value - column_index += 1 - - return row_data - -def bigquery(connection_string): - def load_key(filename): - f = file(filename, "rb") - try: - return f.read() - finally: - f.close() - - def get_bigquery_service(): - scope = [ - "https://www.googleapis.com/auth/bigquery", - ] - - credentials = SignedJwtAssertionCredentials(connection_string["serviceAccount"], - load_key(connection_string["privateKey"]), scope=scope) - http = httplib2.Http() - http = credentials.authorize(http) - - return build("bigquery", "v2", http=http) - - def get_query_results(jobs, project_id, job_id, start_index): - query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute() - logging.debug('query_reply %s', query_reply) - if not query_reply['jobComplete']: - time.sleep(10) - return get_query_results(jobs, project_id, job_id, start_index) - - return query_reply - - def query_runner(query): - bigquery_service = get_bigquery_service() - - jobs = bigquery_service.jobs() - job_data = { - "configuration": { - "query": { - "query": query, - } - } - } - - logging.debug("bigquery got query: %s", query) - - project_id = connection_string["projectId"] - - try: - insert_response = jobs.insert(projectId=project_id, body=job_data).execute() - current_row = 0 - query_reply = get_query_results(jobs, project_id=project_id, - job_id=insert_response['jobReference']['jobId'], start_index=current_row) - - logging.debug("bigquery replied: %s", query_reply) - - rows = [] - - while ("rows" in query_reply) and current_row < query_reply['totalRows']: - for row in query_reply["rows"]: - rows.append(transform_row(row, query_reply["schema"]["fields"])) - - current_row += len(query_reply['rows']) - query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], - startIndex=current_row).execute() - - columns = [{'name': f["name"], - 'friendly_name': f["name"], - 'type': types_map.get(f['type'], "string")} for f in query_reply["schema"]["fields"]] - - data = { - "columns": columns, - "rows": rows - } - error = None - - json_data = json.dumps(data, cls=JSONEncoder) - except apiclient.errors.HttpError, e: - json_data = None - error = e.content - except KeyboardInterrupt: - error = "Query cancelled by user." - json_data = None - except Exception: - raise sys.exc_info()[1], None, sys.exc_info()[2] - - return json_data, error - - - return query_runner diff --git a/redash/data/query_runner_graphite.py b/redash/data/query_runner_graphite.py deleted file mode 100644 index dc664e0109..0000000000 --- a/redash/data/query_runner_graphite.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -QueryRunner for Graphite. 
-""" -import json -import datetime -import requests -from redash.utils import JSONEncoder - - -def graphite(connection_params): - def transform_result(response): - columns = [{'name': 'Time::x'}, {'name': 'value::y'}, {'name': 'name::series'}] - rows = [] - - for series in response.json(): - for values in series['datapoints']: - timestamp = datetime.datetime.fromtimestamp(int(values[1])) - rows.append({'Time::x': timestamp, 'name::series': series['target'], 'value::y': values[0]}) - - data = {'columns': columns, 'rows': rows} - return json.dumps(data, cls=JSONEncoder) - - def query_runner(query): - base_url = "%s/render?format=json&" % connection_params['url'] - url = "%s%s" % (base_url, "&".join(query.split("\n"))) - error = None - data = None - - try: - response = requests.get(url, auth=connection_params['auth'], - verify=connection_params['verify']) - - if response.status_code == 200: - data = transform_result(response) - else: - error = "Failed getting results (%d)" % response.status_code - - except Exception, ex: - data = None - error = ex.message - - return data, error - - query_runner.annotate_query = False - - return query_runner \ No newline at end of file diff --git a/redash/data/query_runner_mongodb.py b/redash/data/query_runner_mongodb.py deleted file mode 100644 index 235a277eea..0000000000 --- a/redash/data/query_runner_mongodb.py +++ /dev/null @@ -1,242 +0,0 @@ -import datetime -import logging -import json -import sys -import re -import time -from redash.utils import JSONEncoder - -try: - import pymongo - from bson.objectid import ObjectId - from bson.son import SON -except ImportError: - print "Missing dependencies. Please install pymongo." - print "You can use pip: pip install pymongo" - raise - -TYPES_MAP = { - ObjectId : "string", - str : "string", - unicode : "string", - int : "integer", - long : "integer", - float : "float", - bool : "boolean", - datetime.datetime: "datetime", -} - -date_regex = re.compile("ISODate\(\"(.*)\"\)", re.IGNORECASE) - -# Simple query example: -# -# { -# "collection" : "my_collection", -# "query" : { -# "date" : { -# "$gt" : "ISODate(\"2015-01-15 11:41\")", -# }, -# "type" : 1 -# }, -# "fields" : { -# "_id" : 1, -# "name" : 2 -# }, -# "sort" : [ -# { -# "name" : "date", -# "direction" : -1 -# } -# ] -# -# } -# -# -# Aggregation -# =========== -# Uses a syntax similar to the one used in PyMongo, however to support the -# correct order of sorting, it uses a regular list for the "$sort" operation -# that converts into a SON (sorted dictionary) object before execution. 
-# -# Aggregation query example: -# -# { -# "collection" : "things", -# "aggregate" : [ -# { -# "$unwind" : "$tags" -# }, -# { -# "$group" : { -# { -# "_id" : "$tags", -# "count" : { "$sum" : 1 } -# } -# } -# }, -# { -# "$sort" : [ -# { -# "name" : "count", -# "direction" : -1 -# }, -# { -# "name" : "_id", -# "direction" : -1 -# } -# ] -# } -# ] -# } -# -# -def mongodb(connection_string): - def _get_column_by_name(columns, column_name): - for c in columns: - if "name" in c and c["name"] == column_name: - return c - - return None - - def _convert_date(q, field_name): - m = date_regex.findall(q[field_name]) - if len(m) > 0: - if q[field_name].find(":") == -1: - q[field_name] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(m[0], "%Y-%m-%d"))) - else: - q[field_name] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(m[0], "%Y-%m-%d %H:%M"))) - - def query_runner(query): - if not "dbName" in connection_string or not connection_string["dbName"]: - return None, "dbName is missing from connection string JSON or is empty" - - db_name = connection_string["dbName"] - - if not "connectionString" in connection_string or not connection_string["connectionString"]: - return None, "connectionString is missing from connection string JSON or is empty" - - is_replica_set = True if "replicaSetName" in connection_string and connection_string["replicaSetName"] else False - - if is_replica_set: - if not connection_string["replicaSetName"]: - return None, "replicaSetName is set in the connection string JSON but is empty" - - db_connection = pymongo.MongoReplicaSetClient(connection_string["connectionString"], replicaSet=connection_string["replicaSetName"]) - else: - db_connection = pymongo.MongoClient(connection_string["connectionString"]) - - if db_name not in db_connection.database_names(): - return None, "Unknown database name '%s'" % db_name - - db = db_connection[db_name] - - logging.debug("mongodb connection string: %s", connection_string) - logging.debug("mongodb got query: %s", query) - - try: - query_data = json.loads(query) - except: - return None, "Invalid query format. The query is not a valid JSON." 
- - if "query" in query_data and "aggregate" in query_data: - return None, "'query' and 'aggregate' sections cannot be used at the same time" - - collection = None - if not "collection" in query_data: - return None, "'collection' must be set" - else: - collection = query_data["collection"] - - q = None - if "query" in query_data: - q = query_data["query"] - for k in q: - if q[k] and type(q[k]) in [str, unicode]: - logging.debug(q[k]) - _convert_date(q, k) - elif q[k] and type(q[k]) is dict: - for k2 in q[k]: - if type(q[k][k2]) in [str, unicode]: - _convert_date(q[k], k2) - - f = None - - aggregate = None - if "aggregate" in query_data: - aggregate = query_data["aggregate"] - for step in aggregate: - if "$sort" in step: - sort_list = [] - for sort_item in step["$sort"]: - sort_list.append((sort_item["name"], sort_item["direction"])) - - step["$sort"] = SON(sort_list) - - if aggregate: - pass - else: - s = None - if "sort" in query_data and query_data["sort"]: - s = [] - for field in query_data["sort"]: - s.append((field["name"], field["direction"])) - - if "fields" in query_data: - f = query_data["fields"] - - columns = [] - rows = [] - - error = None - json_data = None - - cursor = None - if q or (not q and not aggregate): - if s: - cursor = db[collection].find(q, f).sort(s) - else: - cursor = db[collection].find(q, f) - - if "skip" in query_data: - cursor = cursor.skip(query_data["skip"]) - - if "limit" in query_data: - cursor = cursor.limit(query_data["limit"]) - - elif aggregate: - r = db[collection].aggregate(aggregate) - cursor = r["result"] - - for r in cursor: - for k in r: - if _get_column_by_name(columns, k) is None: - columns.append({ - "name": k, - "friendly_name": k, - "type": TYPES_MAP[type(r[k])] if type(r[k]) in TYPES_MAP else None - }) - - # Convert ObjectId to string - if type(r[k]) == ObjectId: - r[k] = str(r[k]) - - rows.append(r) - - if f: - ordered_columns = [] - for k in sorted(f, key=f.get): - ordered_columns.append(_get_column_by_name(columns, k)) - - columns = ordered_columns - - data = { - "columns": columns, - "rows": rows - } - error = None - json_data = json.dumps(data, cls=JSONEncoder) - - return json_data, error - - query_runner.annotate_query = False - return query_runner diff --git a/redash/data/query_runner_mysql.py b/redash/data/query_runner_mysql.py deleted file mode 100644 index a59a023b3b..0000000000 --- a/redash/data/query_runner_mysql.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -QueryRunner is the function that the workers use, to execute queries. This is the Redshift -(PostgreSQL in fact) version, but easily we can write another to support additional databases -(MySQL and others). - -Because the worker just pass the query, this can be used with any data store that has some sort of -query language (for example: HiveQL). 
-""" -import logging -import json -import MySQLdb -import sys -from redash.utils import JSONEncoder - -def mysql(connection_string): - if connection_string.endswith(';'): - connection_string = connection_string[0:-1] - - def query_runner(query): - connections_params = [entry.split('=')[1] for entry in connection_string.split(';')] - connection = MySQLdb.connect(*connections_params, charset="utf8", use_unicode=True) - cursor = connection.cursor() - - logging.debug("mysql got query: %s", query) - - try: - cursor.execute(query) - - data = cursor.fetchall() - - cursor_desc = cursor.description - if (cursor_desc != None): - num_fields = len(cursor_desc) - column_names = [i[0] for i in cursor.description] - - rows = [dict(zip(column_names, row)) for row in data] - - columns = [{'name': col_name, - 'friendly_name': col_name, - 'type': None} for col_name in column_names] - - data = {'columns': columns, 'rows': rows} - json_data = json.dumps(data, cls=JSONEncoder) - error = None - else: - json_data = None - error = "No data was returned." - - cursor.close() - except MySQLdb.Error, e: - json_data = None - error = e.args[1] - except KeyboardInterrupt: - error = "Query cancelled by user." - json_data = None - except Exception as e: - raise sys.exc_info()[1], None, sys.exc_info()[2] - finally: - connection.close() - - return json_data, error - - - return query_runner diff --git a/redash/data/query_runner_pg.py b/redash/data/query_runner_pg.py deleted file mode 100644 index 4f6edc7737..0000000000 --- a/redash/data/query_runner_pg.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -QueryRunner is the function that the workers use, to execute queries. This is the PostgreSQL -version, but easily we can write another to support additional databases (MySQL and others). - -Because the worker just pass the query, this can be used with any data store that has some sort of -query language (for example: HiveQL). -""" -import json -import sys -import select -import logging -import psycopg2 - -from redash.utils import JSONEncoder - -types_map = { - 20: 'integer', - 21: 'integer', - 23: 'integer', - 700: 'float', - 1700: 'float', - 701: 'float', - 16: 'boolean', - 1082: 'date', - 1114: 'datetime', - 1184: 'datetime', - 1014: 'string', - 1015: 'string', - 1008: 'string', - 1009: 'string', - 2951: 'string' -} - - -def pg(connection_string): - def column_friendly_name(column_name): - return column_name - - def wait(conn): - while 1: - try: - state = conn.poll() - if state == psycopg2.extensions.POLL_OK: - break - elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], []) - elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], []) - else: - raise psycopg2.OperationalError("poll() returned %s" % state) - except select.error: - raise psycopg2.OperationalError("select.error received") - - def query_runner(query): - connection = psycopg2.connect(connection_string, async=True) - wait(connection) - - cursor = connection.cursor() - - try: - cursor.execute(query) - wait(connection) - - # While set would be more efficient here, it sorts the data which is not what we want, but due to the small - # size of the data we can assume it's ok. - column_names = [] - columns = [] - duplicates_counter = 1 - - for column in cursor.description: - # TODO: this deduplication needs to be generalized and reused in all query runners. 
- column_name = column.name - if column_name in column_names: - column_name = column_name + str(duplicates_counter) - duplicates_counter += 1 - - column_names.append(column_name) - - columns.append({ - 'name': column_name, - 'friendly_name': column_friendly_name(column_name), - 'type': types_map.get(column.type_code, None) - }) - - rows = [dict(zip(column_names, row)) for row in cursor] - - data = {'columns': columns, 'rows': rows} - json_data = json.dumps(data, cls=JSONEncoder) - error = None - cursor.close() - except (select.error, OSError) as e: - logging.exception(e) - error = "Query interrupted. Please retry." - json_data = None - except psycopg2.DatabaseError as e: - logging.exception(e) - json_data = None - error = e.message - except KeyboardInterrupt: - connection.cancel() - error = "Query cancelled by user." - json_data = None - except Exception as e: - raise sys.exc_info()[1], None, sys.exc_info()[2] - finally: - connection.close() - - return json_data, error - - return query_runner diff --git a/redash/data/query_runner_script.py b/redash/data/query_runner_script.py deleted file mode 100644 index 8959003166..0000000000 --- a/redash/data/query_runner_script.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -import logging -import sys -import os -import subprocess - -# We use subprocess.check_output because we are lazy. -# If someone will really want to run this on Python < 2.7 they can easily update the code to run -# Popen, check the retcodes and other things and read the standard output to a variable. -if not "check_output" in subprocess.__dict__: - print "ERROR: This runner uses subprocess.check_output function which exists in Python 2.7" - -def script(connection_string): - - def query_runner(query): - try: - json_data = None - error = None - - if connection_string is None: - return None, "script execution path is not set. Please reconfigure the data source" - - # Poor man's protection against running scripts from output the scripts directory - if connection_string.find("../") > -1: - return None, "Scripts can only be run from the configured scripts directory" - - query = query.strip() - - script = os.path.join(connection_string, query) - if not os.path.exists(script): - return None, "Script '%s' not found in script directory" % query - - output = subprocess.check_output(script, shell=False) - if output != None: - output = output.strip() - if output != "": - return output, None - - error = "Error reading output" - except subprocess.CalledProcessError as e: - return None, str(e) - except KeyboardInterrupt: - error = "Query cancelled by user." 
-            json_data = None
-        except Exception as e:
-            raise sys.exc_info()[1], None, sys.exc_info()[2]
-
-        return json_data, error
-
-    query_runner.annotate_query = False
-    return query_runner
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
new file mode 100644
index 0000000000..eaf5693a57
--- /dev/null
+++ b/redash/query_runner/__init__.py
@@ -0,0 +1,104 @@
+import logging
+import json
+
+import jsonschema
+from jsonschema import ValidationError
+
+logger = logging.getLogger(__name__)
+
+__all__ = [
+    'ValidationError',
+    'BaseQueryRunner',
+    'TYPE_DATETIME',
+    'TYPE_BOOLEAN',
+    'TYPE_INTEGER',
+    'TYPE_STRING',
+    'TYPE_DATE',
+    'TYPE_FLOAT',
+    'register',
+    'get_query_runner',
+    'import_query_runners'
+]
+
+# Valid types of columns returned in results:
+TYPE_INTEGER = 'integer'
+TYPE_FLOAT = 'float'
+TYPE_BOOLEAN = 'boolean'
+TYPE_STRING = 'string'
+TYPE_DATETIME = 'datetime'
+TYPE_DATE = 'date'
+
+
+class BaseQueryRunner(object):
+    def __init__(self, configuration):
+        jsonschema.validate(configuration, self.configuration_schema())
+        self.configuration = configuration
+
+    @classmethod
+    def name(cls):
+        return cls.__name__
+
+    @classmethod
+    def type(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def enabled(cls):
+        return True
+
+    @classmethod
+    def annotate_query(cls):
+        return True
+
+    @classmethod
+    def configuration_schema(cls):
+        return {}
+
+    def run_query(self, query):
+        raise NotImplementedError()
+
+    @classmethod
+    def to_dict(cls):
+        return {
+            'name': cls.name(),
+            'type': cls.type(),
+            'configuration_schema': cls.configuration_schema()
+        }
+
+
+query_runners = {}
+
+
+def register(query_runner_class):
+    global query_runners
+    if query_runner_class.enabled():
+        logger.debug("Registering %s (%s) query runner.", query_runner_class.name(), query_runner_class.type())
+        query_runners[query_runner_class.type()] = query_runner_class
+    else:
+        logger.warning("%s query runner enabled but not supported, not registering.
Either disable or install missing dependencies.", query_runner_class.name()) + + +def get_query_runner(query_runner_type, configuration_json): + query_runner_class = query_runners.get(query_runner_type, None) + if query_runner_class is None: + return None + + return query_runner_class(json.loads(configuration_json)) + + +def validate_configuration(query_runner_type, configuration_json): + query_runner_class = query_runners.get(query_runner_type, None) + if query_runner_class is None: + return False + + try: + jsonschema.validate(json.loads(configuration_json), query_runner_class.configuration_schema()) + except (ValidationError, ValueError): + return False + + return True + + +def import_query_runners(query_runner_imports): + for runner_import in query_runner_imports: + __import__(runner_import) \ No newline at end of file diff --git a/redash/query_runner/big_query.py b/redash/query_runner/big_query.py new file mode 100644 index 0000000000..2505ada953 --- /dev/null +++ b/redash/query_runner/big_query.py @@ -0,0 +1,182 @@ +import datetime +import json +import httplib2 +import logging +import sys +import time + +from redash.query_runner import * +from redash.utils import JSONEncoder + +logger = logging.getLogger(__name__) + +types_map = { + 'INTEGER': TYPE_INTEGER, + 'FLOAT': TYPE_FLOAT, + 'BOOLEAN': TYPE_BOOLEAN, + 'STRING': TYPE_STRING, + 'TIMESTAMP': TYPE_DATETIME, +} + + +def transform_row(row, fields): + column_index = 0 + row_data = {} + + for cell in row["f"]: + field = fields[column_index] + cell_value = cell['v'] + + if cell_value is None: + pass + # Otherwise just cast the value + elif field['type'] == 'INTEGER': + cell_value = int(cell_value) + elif field['type'] == 'FLOAT': + cell_value = float(cell_value) + elif field['type'] == 'BOOLEAN': + cell_value = cell_value.lower() == "true" + elif field['type'] == 'TIMESTAMP': + cell_value = datetime.datetime.fromtimestamp(float(cell_value)) + + row_data[field["name"]] = cell_value + column_index += 1 + + return row_data + + +def _import(): + try: + import apiclient.errors + from apiclient.discovery import build + from apiclient.errors import HttpError + from oauth2client.client import SignedJwtAssertionCredentials + + return True + except ImportError: + logger.warning("Missing dependencies. 
Please install google-api-python-client and oauth2client.") + logger.warning("You can use pip: pip install google-api-python-client oauth2client") + + return False + + +def _load_key(filename): + f = file(filename, "rb") + try: + return f.read() + finally: + f.close() + + +def _get_bigquery_service(service_account, private_key): + scope = [ + "https://www.googleapis.com/auth/bigquery", + ] + + credentials = SignedJwtAssertionCredentials(service_account, private_key, scope=scope) + http = httplib2.Http() + http = credentials.authorize(http) + + return build("bigquery", "v2", http=http) + + +def _get_query_results(jobs, project_id, job_id, start_index): + query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute() + logging.debug('query_reply %s', query_reply) + if not query_reply['jobComplete']: + time.sleep(10) + return _get_query_results(jobs, project_id, job_id, start_index) + + return query_reply + + +class BigQuery(BaseQueryRunner): + @classmethod + def enabled(cls): + return _import() + + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'serviceAccount': { + 'type': 'string', + 'title': 'Service Account' + }, + 'projectId': { + 'type': 'string', + 'title': 'Project ID' + }, + 'privateKey': { + 'type': 'string', + 'title': 'Private Key Path' + } + }, + 'required': ['serviceAccount', 'projectId', 'privateKey'] + } + + def __init__(self, configuration_json): + super(BigQuery, self).__init__(configuration_json) + _import() + + self.private_key = _load_key(self.configuration["privateKey"]) + + def run_query(self, query): + bigquery_service = _get_bigquery_service(self.configuration["serviceAccount"], + self.private_key) + + jobs = bigquery_service.jobs() + job_data = { + "configuration": { + "query": { + "query": query, + } + } + } + + logger.debug("BigQuery got query: %s", query) + + project_id = self.configuration["projectId"] + + try: + insert_response = jobs.insert(projectId=project_id, body=job_data).execute() + current_row = 0 + query_reply = _get_query_results(jobs, project_id=project_id, + job_id=insert_response['jobReference']['jobId'], start_index=current_row) + + logger.debug("bigquery replied: %s", query_reply) + + rows = [] + + while ("rows" in query_reply) and current_row < query_reply['totalRows']: + for row in query_reply["rows"]: + rows.append(transform_row(row, query_reply["schema"]["fields"])) + + current_row += len(query_reply['rows']) + query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], + startIndex=current_row).execute() + + columns = [{'name': f["name"], + 'friendly_name': f["name"], + 'type': types_map.get(f['type'], "string")} for f in query_reply["schema"]["fields"]] + + data = { + "columns": columns, + "rows": rows + } + error = None + + json_data = json.dumps(data, cls=JSONEncoder) + except apiclient.errors.HttpError, e: + json_data = None + error = e.content + except KeyboardInterrupt: + error = "Query cancelled by user." 
+ json_data = None + except Exception: + raise sys.exc_info()[1], None, sys.exc_info()[2] + + return json_data, error + +register(BigQuery) \ No newline at end of file diff --git a/redash/query_runner/graphite.py b/redash/query_runner/graphite.py new file mode 100644 index 0000000000..d6fc97bfb6 --- /dev/null +++ b/redash/query_runner/graphite.py @@ -0,0 +1,83 @@ +import json +import datetime +import requests +import logging +from redash.query_runner import * +from redash.utils import JSONEncoder + +logger = logging.getLogger(__name__) + + +def _transform_result(response): + columns = ({'name': 'Time::x', 'type': TYPE_DATETIME}, + {'name': 'value::y', 'type': TYPE_FLOAT}, + {'name': 'name::series', 'type': TYPE_STRING}) + + rows = [] + + for series in response.json(): + for values in series['datapoints']: + timestamp = datetime.datetime.fromtimestamp(int(values[1])) + rows.append({'Time::x': timestamp, 'name::series': series['target'], 'value::y': values[0]}) + + data = {'columns': columns, 'rows': rows} + return json.dumps(data, cls=JSONEncoder) + + +class Graphite(BaseQueryRunner): + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'url': { + 'type': 'string' + }, + 'username': { + 'type': 'string' + }, + 'password': { + 'type': 'string' + }, + 'verify': { + 'type': 'boolean', + 'title': 'Verify SSL certificate' + } + }, + 'required': ['url'] + } + + @classmethod + def annotate_query(cls): + return False + + def __init__(self, configuration_json): + super(Graphite, self).__init__(configuration_json) + + if "username" in self.configuration and self.configuration["username"]: + self.auth = (self.configuration["username"], self.configuration["password"]) + else: + self.auth = None + + self.verify = self.configuration["verify"] + self.base_url = "%s/render?format=json&" % self.configuration['url'] + + def run_query(self, query): + url = "%s%s" % (self.base_url, "&".join(query.split("\n"))) + error = None + data = None + + try: + response = requests.get(url, auth=self.auth, verify=self.verify) + + if response.status_code == 200: + data = _transform_result(response) + else: + error = "Failed getting results (%d)" % response.status_code + except Exception, ex: + data = None + error = ex.message + + return data, error + +register(Graphite) \ No newline at end of file diff --git a/redash/query_runner/mongodb.py b/redash/query_runner/mongodb.py new file mode 100644 index 0000000000..67e7d8b7e1 --- /dev/null +++ b/redash/query_runner/mongodb.py @@ -0,0 +1,182 @@ +import json +import datetime +import logging +import re +import time + +from redash.utils import JSONEncoder +from redash.query_runner import * + +logger = logging.getLogger(__name__) + + +def _import(): + try: + import pymongo + from bson.objectid import ObjectId + return True + + except ImportError: + logger.warning("Missing dependencies. 
Please install pymongo.") + logger.warning("You can use pip: pip install pymongo") + + return False + + +TYPES_MAP = { + str: TYPE_STRING, + unicode: TYPE_STRING, + int: TYPE_INTEGER, + long: TYPE_INTEGER, + float: TYPE_FLOAT, + bool: TYPE_BOOLEAN, + datetime.datetime: TYPE_DATETIME, +} + +date_regex = re.compile("ISODate\(\"(.*)\"\)", re.IGNORECASE) + + +def _get_column_by_name(columns, column_name): + for c in columns: + if "name" in c and c["name"] == column_name: + return c + + return None + + +def _convert_date(q, field_name): + m = date_regex.findall(q[field_name]) + if len(m) > 0: + if q[field_name].find(":") == -1: + q[field_name] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(m[0], "%Y-%m-%d"))) + else: + q[field_name] = datetime.datetime.fromtimestamp(time.mktime(time.strptime(m[0], "%Y-%m-%d %H:%M"))) + + +class MongoDB(BaseQueryRunner): + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'connectionString': { + 'type': 'string', + 'title': 'Connection String' + }, + 'dbName': { + 'type': 'string', + 'title': "Database Name" + }, + 'replicaSetName': { + 'type': 'string', + 'title': 'Replica Set Name' + }, + 'required': ['connectionString'] + } + } + + @classmethod + def enabled(cls): + return _import() + + @classmethod + def annotate_query(cls): + return False + + def __init__(self, configuration_json): + _import() + super(MongoDB, self).__init__(configuration_json) + + self.db_name = self.configuration["dbName"] + + self.is_replica_set = True if "replicaSetName" in self.configuration and self.configuration["replicaSetName"] else False + + def run_query(self, query): + if self.is_replica_set: + db_connection = pymongo.MongoReplicaSetClient(self.configuration["connectionString"], replicaSet=self.configuration["replicaSetName"]) + else: + db_connection = pymongo.MongoClient(self.configuration["connectionString"]) + + if self.db_name not in db_connection.database_names(): + return None, "Unknown database name '%s'" % self.db_name + + db = db_connection[self.db_name ] + + logger.debug("mongodb connection string: %s", self.configuration['connectionString']) + logger.debug("mongodb got query: %s", query) + + try: + query_data = json.loads(query) + except ValueError: + return None, "Invalid query format. The query is not a valid JSON." 
+ + if "collection" not in query_data: + return None, "'collection' must have a value to run a query" + else: + collection = query_data["collection"] + + q = None + if "query" in query_data: + q = query_data["query"] + for k in q: + if q[k] and type(q[k]) in [str, unicode]: + logging.debug(q[k]) + _convert_date(q, k) + elif q[k] and type(q[k]) is dict: + for k2 in q[k]: + if type(q[k][k2]) in [str, unicode]: + _convert_date(q[k], k2) + + f = None + if "fields" in query_data: + f = query_data["fields"] + + s = None + if "sort" in query_data and query_data["sort"]: + s = [] + for field_name in query_data["sort"]: + s.append((field_name, query_data["sort"][field_name])) + + columns = [] + rows = [] + + error = None + json_data = None + + if s: + cursor = db[collection].find(q, f).sort(s) + else: + cursor = db[collection].find(q, f) + + for r in cursor: + for k in r: + if _get_column_by_name(columns, k) is None: + columns.append({ + "name": k, + "friendly_name": k, + "type": TYPES_MAP.get(type(r[k]), TYPE_STRING) + }) + + # Convert ObjectId to string + if type(r[k]) == ObjectId: + r[k] = str(r[k]) + + rows.append(r) + + if f: + ordered_columns = [] + for k in sorted(f, key=f.get): + ordered_columns.append(_get_column_by_name(columns, k)) + + columns = ordered_columns + + data = { + "columns": columns, + "rows": rows + } + error = None + json_data = json.dumps(data, cls=JSONEncoder) + + return json_data, error + +register(MongoDB) \ No newline at end of file diff --git a/redash/query_runner/mysql.py b/redash/query_runner/mysql.py new file mode 100644 index 0000000000..023c9de9e4 --- /dev/null +++ b/redash/query_runner/mysql.py @@ -0,0 +1,98 @@ +import sys +import json +import logging + +from redash.utils import JSONEncoder +from redash.query_runner import * + +logger = logging.getLogger(__name__) + + +class Mysql(BaseQueryRunner): + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'host': { + 'type': 'string' + }, + 'user': { + 'type': 'string' + }, + 'passwd': { + 'type': 'string', + 'title': 'Password' + }, + 'db': { + 'type': 'string', + 'title': 'Database name' + } + }, + 'required': ['db'] + } + + @classmethod + def enabled(cls): + try: + import MySQLdb + except ImportError: + return False + + return True + + def __init__(self, configuration_json): + super(Mysql, self).__init__(configuration_json) + + self.configuration.update({ + "charset": "utf8", + "use_unicode": True + }) + + def run_query(self, query): + import MySQLdb + + connection = MySQLdb.connect(**self.configuration) + cursor = connection.cursor() + + logger.debug("MySQL running query: %s", query) + + try: + cursor.execute(query) + + data = cursor.fetchall() + + cursor_desc = cursor.description + if cursor_desc is not None: + num_fields = len(cursor_desc) + column_names = [i[0] for i in cursor.description] + + rows = [dict(zip(column_names, row)) for row in data] + + # TODO: add types support + columns = [{'name': col_name, + 'friendly_name': col_name, + 'type': None} for col_name in column_names] + + data = {'columns': columns, 'rows': rows} + json_data = json.dumps(data, cls=JSONEncoder) + error = None + else: + json_data = None + error = "No data was returned." + + cursor.close() + except MySQLdb.Error, e: + json_data = None + error = e.args[1] + except KeyboardInterrupt: + error = "Query cancelled by user." 
+ json_data = None + except Exception as e: + raise sys.exc_info()[1], None, sys.exc_info()[2] + finally: + connection.close() + + return json_data, error + +register(Mysql) \ No newline at end of file diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py new file mode 100644 index 0000000000..811d52a2dc --- /dev/null +++ b/redash/query_runner/pg.py @@ -0,0 +1,142 @@ +import json +import logging +import psycopg2 +import select +import sys + +from redash.query_runner import * +from redash.utils import JSONEncoder + +logger = logging.getLogger(__name__) + +types_map = { + 20: TYPE_INTEGER, + 21: TYPE_INTEGER, + 23: TYPE_INTEGER, + 700: TYPE_FLOAT, + 1700: TYPE_FLOAT, + 701: TYPE_FLOAT, + 16: TYPE_BOOLEAN, + 1082: TYPE_DATE, + 1114: TYPE_DATETIME, + 1184: TYPE_DATETIME, + 1014: TYPE_STRING, + 1015: TYPE_STRING, + 1008: TYPE_STRING, + 1009: TYPE_STRING, + 2951: TYPE_STRING +} + + +def _wait(conn): + while 1: + try: + state = conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) + except select.error: + raise psycopg2.OperationalError("select.error received") + + +class PostgreSQL(BaseQueryRunner): + @classmethod + def configuration_schema(cls): + return { + "type": "object", + "properties": { + "user": { + "type": "string" + }, + "password": { + "type": "string" + }, + "host": { + "type": "string" + }, + "port": { + "type": "number" + }, + "dbname": { + "type": "string", + "title": "Database Name" + } + }, + "required": ["dbname"] + } + + @classmethod + def type(cls): + return "pg" + + def __init__(self, configuration_json): + super(PostgreSQL, self).__init__(configuration_json) + + values = [] + for k, v in self.configuration.iteritems(): + values.append("{}={}".format(k, v)) + + self.connection_string = " ".join(values) + + def run_query(self, query): + connection = psycopg2.connect(self.connection_string, async=True) + _wait(connection) + + cursor = connection.cursor() + + try: + cursor.execute(query) + _wait(connection) + + # While set would be more efficient here, it sorts the data which is not what we want, but due to the small + # size of the data we can assume it's ok. + column_names = [] + columns = [] + duplicates_counter = 1 + + for column in cursor.description: + # TODO: this deduplication needs to be generalized and reused in all query runners. + column_name = column.name + if column_name in column_names: + column_name += str(duplicates_counter) + duplicates_counter += 1 + + column_names.append(column_name) + + columns.append({ + 'name': column_name, + 'friendly_name': column_name, + 'type': types_map.get(column.type_code, None) + }) + + rows = [dict(zip(column_names, row)) for row in cursor] + + data = {'columns': columns, 'rows': rows} + json_data = json.dumps(data, cls=JSONEncoder) + error = None + cursor.close() + except (select.error, OSError) as e: + logging.exception(e) + error = "Query interrupted. Please retry." + json_data = None + except psycopg2.DatabaseError as e: + logging.exception(e) + json_data = None + error = e.message + except KeyboardInterrupt: + connection.cancel() + error = "Query cancelled by user." 
+ json_data = None + except Exception as e: + raise sys.exc_info()[1], None, sys.exc_info()[2] + finally: + connection.close() + + return json_data, error + +register(PostgreSQL) \ No newline at end of file diff --git a/redash/query_runner/script.py b/redash/query_runner/script.py new file mode 100644 index 0000000000..e25e27fdba --- /dev/null +++ b/redash/query_runner/script.py @@ -0,0 +1,65 @@ +import os +import sys +import subprocess + +from redash.query_runner import * + + +class Script(BaseQueryRunner): + @classmethod + def enabled(cls): + return "check_output" in subprocess.__dict__ + + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'path': { + 'type': 'string', + 'title': 'Scripts path' + } + }, + 'required': ['path'] + } + + @classmethod + def annotate_query(cls): + return False + + def __init__(self, configuration_json): + super(Script, self).__init__(configuration_json) + + # Poor man's protection against running scripts from output the scripts directory + if self.configuration["path"].find("../") > -1: + raise ValidationError("Scripts can only be run from the configured scripts directory") + + def run_query(self, query): + try: + json_data = None + error = None + + query = query.strip() + + script = os.path.join(self.configuration["path"], query) + if not os.path.exists(script): + return None, "Script '%s' not found in script directory" % query + + output = subprocess.check_output(script, shell=False) + if output is not None: + output = output.strip() + if output != "": + return output, None + + error = "Error reading output" + except subprocess.CalledProcessError as e: + return None, str(e) + except KeyboardInterrupt: + error = "Query cancelled by user." + json_data = None + except Exception as e: + raise sys.exc_info()[1], None, sys.exc_info()[2] + + return json_data, error + +register(Script) \ No newline at end of file diff --git a/redash/data/query_runner_url.py b/redash/query_runner/url.py similarity index 63% rename from redash/data/query_runner_url.py rename to redash/query_runner/url.py index 64f146bd54..102df91959 100644 --- a/redash/data/query_runner_url.py +++ b/redash/query_runner/url.py @@ -1,16 +1,30 @@ -import json -import logging import sys -import os import urllib2 -def url(connection_string): +from redash.query_runner import * - def query_runner(query): - base_url = connection_string + +class Url(BaseQueryRunner): + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'url': { + 'type': 'string', + 'title': 'URL base path' + } + } + } + + @classmethod + def annotate_query(cls): + return False + + def run_query(self, query): + base_url = self.configuration["url"] try: - json_data = None error = None query = query.strip() @@ -41,5 +55,4 @@ def query_runner(query): return json_data, error - query_runner.annotate_query = False - return query_runner +register(Url) \ No newline at end of file diff --git a/redash/settings.py b/redash/settings.py index 68f328c150..42f17b0dcd 100644 --- a/redash/settings.py +++ b/redash/settings.py @@ -77,5 +77,16 @@ def parse_boolean(str): CLIENT_SIDE_METRICS = parse_boolean(os.environ.get("REDASH_CLIENT_SIDE_METRICS", "false")) ANALYTICS = os.environ.get("REDASH_ANALYTICS", "") +# Query Runners +QUERY_RUNNERS = [ + 'redash.query_runner.big_query', + 'redash.query_runner.graphite', + 'redash.query_runner.mongodb', + 'redash.query_runner.mysql', + 'redash.query_runner.pg', + 'redash.query_runner.script', + 'redash.query_runner.url', +] + # 
Features:
 FEATURE_TABLES_PERMISSIONS = parse_boolean(os.environ.get("REDASH_FEATURE_TABLES_PERMISSIONS", "false"))
diff --git a/redash/tasks.py b/redash/tasks.py
index 2a75bb1e17..f224b635a9 100644
--- a/redash/tasks.py
+++ b/redash/tasks.py
@@ -8,7 +8,7 @@ from redash import redis_connection, models, statsd_client, settings
 from redash.utils import gen_query_hash
 from redash.worker import celery
-from redash.data.query_runner import get_query_runner
+from redash.query_runner import get_query_runner
 
 logger = get_task_logger(__name__)
 
 
@@ -151,8 +151,6 @@ def refresh_queries():
         outdated_queries_count += 1
 
     statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
-    # TODO: decide if we still need this
-    # statsd_client.gauge('manager.queue_size', self.redis_connection.zcard('jobs'))
 
     logger.info("Done refreshing queries. Found %d outdated queries." % outdated_queries_count)
 
@@ -237,15 +235,15 @@ def execute_query(self, query, data_source_id):
     query_hash = gen_query_hash(query)
     query_runner = get_query_runner(data_source.type, data_source.options)
 
-    if getattr(query_runner, 'annotate_query', True):
-        # TODO: anotate with queu ename
+    if query_runner.annotate_query():
+        # TODO: annotate with queue name
         annotated_query = "/* Task Id: %s, Query hash: %s */ %s" % \
                 (self.request.id, query_hash, query)
     else:
         annotated_query = query
 
     with statsd_client.timer('query_runner.{}.{}.run_time'.format(data_source.type, data_source.name)):
-        data, error = query_runner(annotated_query)
+        data, error = query_runner.run_query(annotated_query)
 
     run_time = time.time() - start_time
     logger.info("Query finished... data length=%s, error=%s", data and len(data), error)
@@ -255,8 +253,6 @@ def execute_query(self, query, data_source_id):
     # Delete query_hash
     redis_connection.delete(QueryTask._job_lock_id(query_hash, data_source.id))
 
-    # TODO: it is possible that storing the data will fail, and we will need to retry
-    # while we already marked the job as done
     if not error:
         query_result = models.QueryResult.store_result(data_source.id, query_hash, query, data, run_time, datetime.datetime.utcnow())
     else:
diff --git a/requirements.txt b/requirements.txt
index ee94503011..46e48f4445 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,5 @@ honcho==0.5.0
 statsd==2.1.2
 gunicorn==18.0
 celery==3.1.11
+jsonschema==2.4.0
+click==3.3
diff --git a/setup/bootstrap.sh b/setup/bootstrap.sh
index 9c6c89215f..37ef41d6f8 100644
--- a/setup/bootstrap.sh
+++ b/setup/bootstrap.sh
@@ -146,7 +146,7 @@ if [ $pg_user_exists -ne 0 ]; then
     sudo -u redash psql -c "grant select on activity_log, events, queries, dashboards, widgets, visualizations, query_results to redash_reader;" redash
 
     cd /opt/redash/current
-    sudo -u redash bin/run ./manage.py ds new "re:dash metadata" "pg" "user=redash_reader password=$REDASH_READER_PASSWORD host=localhost dbname=redash"
+    sudo -u redash bin/run ./manage.py ds new -n "re:dash metadata" -t "pg" -o "{\"user\": \"redash_reader\", \"password\": \"$REDASH_READER_PASSWORD\", \"host\": \"localhost\", \"dbname\": \"redash\"}"
 fi
 
 # BigQuery dependencies:
diff --git a/tests/test_controllers.py b/tests/test_controllers.py
index 5bc49d4885..b78644b96b 100644
--- a/tests/test_controllers.py
+++ b/tests/test_controllers.py
@@ -472,4 +472,45 @@ def test_logout_when_loggedin(self):
             self.assertTrue(current_user.is_authenticated())
             rv = c.get('/logout')
             self.assertEquals(rv.status_code, 302)
-            self.assertFalse(current_user.is_authenticated())
\ No newline at end of file
+            self.assertFalse(current_user.is_authenticated())
+
+
+class DataSourceTypesTest(BaseTestCase):
+    def test_returns_data_for_admin(self):
+        admin = user_factory.create(groups=['admin', 'default'])
+        with app.test_client() as c, authenticated_user(c, user=admin):
+            rv = c.get("/api/data_sources/types")
+            self.assertEqual(rv.status_code, 200)
+
+    def test_returns_403_for_non_admin(self):
+        with app.test_client() as c, authenticated_user(c):
+            rv = c.get("/api/data_sources/types")
+            self.assertEqual(rv.status_code, 403)
+
+
+class DataSourceTest(BaseTestCase):
+    def test_returns_400_when_missing_fields(self):
+        admin = user_factory.create(groups=['admin', 'default'])
+        with app.test_client() as c, authenticated_user(c, user=admin):
+            rv = c.post("/api/data_sources")
+            self.assertEqual(rv.status_code, 400)
+
+            rv = json_request(c.post, '/api/data_sources', data={'name': 'DS 1'})
+
+            self.assertEqual(rv.status_code, 400)
+
+    def test_returns_400_when_configuration_invalid(self):
+        admin = user_factory.create(groups=['admin', 'default'])
+        with app.test_client() as c, authenticated_user(c, user=admin):
+            rv = json_request(c.post, '/api/data_sources',
+                              data={'name': 'DS 1', 'type': 'pg', 'options': '{}'})
+
+            self.assertEqual(rv.status_code, 400)
+
+    def test_creates_data_source(self):
+        admin = user_factory.create(groups=['admin', 'default'])
+        with app.test_client() as c, authenticated_user(c, user=admin):
+            rv = json_request(c.post, '/api/data_sources',
+                              data={'name': 'DS 1', 'type': 'pg', 'options': '{"dbname": "redash"}'})
+
+            self.assertEqual(rv.status_code, 200)
\ No newline at end of file
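
For reference, a minimal sketch of what a third-party query runner could look like against the BaseQueryRunner API introduced in redash/query_runner/__init__.py above. The Echo class, its "echo" type, the module path and the "prefix" option are illustrative only and are not part of this change; the contract it follows (configuration_schema() for jsonschema validation and the new /api/data_sources/types listing, run_query() returning a (json_data, error) tuple, and a module-level register() call) is taken from the diff.

# Hypothetical module, e.g. redash/query_runner/echo.py -- not part of this change.
import json

from redash.query_runner import BaseQueryRunner, TYPE_STRING, register
from redash.utils import JSONEncoder


class Echo(BaseQueryRunner):
    @classmethod
    def configuration_schema(cls):
        # Validated against the stored options by BaseQueryRunner.__init__ and
        # returned by to_dict() via the /api/data_sources/types endpoint.
        return {
            'type': 'object',
            'properties': {
                'prefix': {
                    'type': 'string',
                    'title': 'Prefix'
                }
            },
            'required': ['prefix']
        }

    @classmethod
    def annotate_query(cls):
        # Opt out of the "/* Task Id ... */" comment that execute_query prepends by default.
        return False

    def run_query(self, query):
        # run_query must return (json_data, error), where json_data is a JSON string
        # with 'columns' and 'rows' keys, same shape as the other runners produce.
        columns = [{'name': 'echo', 'friendly_name': 'echo', 'type': TYPE_STRING}]
        rows = [{'echo': self.configuration['prefix'] + query}]
        return json.dumps({'columns': columns, 'rows': rows}, cls=JSONEncoder), None


register(Echo)

Because the registry is populated by import_query_runners(settings.QUERY_RUNNERS) at startup, a module like this only becomes available after its import path is added to the QUERY_RUNNERS list in redash/settings.py.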