-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature: add couchbase query runner (#3658)
* feature: add couchbase query runner * fix style * fix style * fix style * fix naming due to convention * extracting protocol as parameter
- Loading branch information
1 parent
fb48bc3
commit a1e75d2
Showing
3 changed files
with
178 additions
and
0 deletions.
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import datetime | ||
import logging | ||
|
||
from dateutil.parser import parse | ||
|
||
from redash.query_runner import * | ||
from redash.utils import JSONEncoder, json_dumps, json_loads, parse_human_time | ||
import json | ||
|
||
logger = logging.getLogger(__name__) | ||
try: | ||
import requests | ||
import httplib2 | ||
except ImportError as e: | ||
logger.error('Failed to import: ' + str(e)) | ||
|
||
|
||
TYPES_MAP = { | ||
str: TYPE_STRING, | ||
unicode: TYPE_STRING, | ||
int: TYPE_INTEGER, | ||
long: TYPE_INTEGER, | ||
float: TYPE_FLOAT, | ||
bool: TYPE_BOOLEAN, | ||
datetime.datetime: TYPE_DATETIME, | ||
datetime.datetime: TYPE_STRING | ||
} | ||
|
||
|
||
def _get_column_by_name(columns, column_name): | ||
for c in columns: | ||
if "name" in c and c["name"] == column_name: | ||
return c | ||
return None | ||
|
||
|
||
def parse_results(results): | ||
rows = [] | ||
columns = [] | ||
|
||
for row in results: | ||
parsed_row = {} | ||
for key in row: | ||
if isinstance(row[key], dict): | ||
for inner_key in row[key]: | ||
column_name = u'{}.{}'.format(key, inner_key) | ||
if _get_column_by_name(columns, column_name) is None: | ||
columns.append({ | ||
"name": column_name, | ||
"friendly_name": column_name, | ||
"type": TYPES_MAP.get(type(row[key][inner_key]), TYPE_STRING) | ||
}) | ||
|
||
parsed_row[column_name] = row[key][inner_key] | ||
|
||
else: | ||
if _get_column_by_name(columns, key) is None: | ||
columns.append({ | ||
"name": key, | ||
"friendly_name": key, | ||
"type": TYPES_MAP.get(type(row[key]), TYPE_STRING) | ||
}) | ||
|
||
parsed_row[key] = row[key] | ||
|
||
rows.append(parsed_row) | ||
return rows, columns | ||
|
||
|
||
class Couchbase(BaseQueryRunner): | ||
|
||
noop_query = 'Select 1' | ||
|
||
@classmethod | ||
def configuration_schema(cls): | ||
return { | ||
'type': 'object', | ||
'properties': { | ||
'protocol': { | ||
'type': 'string', | ||
'default': 'http' | ||
}, | ||
'host': { | ||
'type': 'string', | ||
}, | ||
'port': { | ||
'type': 'string', | ||
'title': 'Port (Defaults: 8095 - Analytics, 8093 - N1QL)', | ||
'default': '8095' | ||
}, | ||
'user': { | ||
'type': 'string', | ||
}, | ||
'password': { | ||
'type': 'string', | ||
}, | ||
}, | ||
'required': ['host', 'user', 'password'], | ||
'order': ['protocol', 'host', 'port', 'user', 'password'], | ||
'secret': ['password'] | ||
} | ||
|
||
def __init__(self, configuration): | ||
super(Couchbase, self).__init__(configuration) | ||
|
||
@classmethod | ||
def enabled(cls): | ||
return True | ||
|
||
@classmethod | ||
def annotate_query(cls): | ||
return False | ||
|
||
def test_connection(self): | ||
result = self.call_service(self.noop_query, '') | ||
|
||
def get_buckets(self, query, name_param): | ||
defaultColumns = [ | ||
'meta().id' | ||
] | ||
result = self.call_service(query, "").json()['results'] | ||
schema = {} | ||
for row in result: | ||
table_name = row.get(name_param) | ||
schema[table_name] = {'name': table_name, 'columns': defaultColumns} | ||
|
||
return schema.values() | ||
|
||
def get_schema(self, get_stats=False): | ||
|
||
try: | ||
# Try fetch from Analytics | ||
return self.get_buckets( | ||
"SELECT ds.GroupName as name FROM Metadata.`Dataset` ds where ds.DataverseName <> 'Metadata'", "name") | ||
except Exception: | ||
# Try fetch from N1QL | ||
return self.get_buckets("select name from system:keyspaces", "name") | ||
|
||
def call_service(self, query, user): | ||
try: | ||
user = self.configuration.get("user") | ||
password = self.configuration.get("password") | ||
protocol = self.configuration.get("protocol", "http") | ||
host = self.configuration.get("host") | ||
port = self.configuration.get("port", 8095) | ||
params = {'statement': query} | ||
|
||
url = "%s://%s:%s/query/service" % (protocol, host, port) | ||
|
||
r = requests.post(url, params=params, auth=(user, password)) | ||
r.raise_for_status() | ||
return r | ||
except requests.exceptions.HTTPError as err: | ||
if (err.response.status_code == 401): | ||
raise Exception("Wrong username/password") | ||
raise Exception("Couchbase connection error") | ||
|
||
def run_query(self, query, user): | ||
try: | ||
result = self.call_service(query, user) | ||
|
||
rows, columns = parse_results(result.json()['results']) | ||
data = { | ||
"columns": columns, | ||
"rows": rows | ||
} | ||
|
||
return json_dumps(data), None | ||
except KeyboardInterrupt: | ||
return None, "Query cancelled by user." | ||
|
||
@classmethod | ||
def name(cls): | ||
return "Couchbase" | ||
|
||
|
||
register(Couchbase) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters