Skip to content

Commit

Permalink
Merge pull request #346 from erans/master
Browse files Browse the repository at this point in the history
Initial support for Mongo's aggregation framework.
  • Loading branch information
arikfr committed Jan 12, 2015
2 parents 7fc82a2 + 50bed1d commit ef868db
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 13 deletions.
2 changes: 2 additions & 0 deletions rd_ui/app/scripts/services/resources.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
} else if (_.isString(v) && v.match(/^\d{4}-\d{2}-\d{2}/)) {
row[k] = moment(v);
columnTypes[k] = 'date';
} else if (typeof(v) == 'object') {
row[k] = JSON.stringify(v);
}
}, this);
}, this);
Expand Down
112 changes: 99 additions & 13 deletions redash/data/query_runner_mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
try:
import pymongo
from bson.objectid import ObjectId
from bson.son import SON
except ImportError:
print "Missing dependencies. Please install pymongo."
print "You can use pip: pip install pymongo"
raise

TYPES_MAP = {
ObjectId : "string",
Expand All @@ -26,6 +28,68 @@

date_regex = re.compile("ISODate\(\"(.*)\"\)", re.IGNORECASE)

# Simple query example:
#
# {
# "collection" : "my_collection",
# "query" : {
# "date" : {
# "$gt" : "ISODate(\"2015-01-15 11:41\")",
# },
# "type" : 1
# },
# "fields" : {
# "_id" : 1,
# "name" : 2
# },
# "sort" : [
# {
# "name" : "date",
# "direction" : -1
# }
# ]
#
# }
#
#
# Aggregation
# ===========
# Uses a syntax similar to the one used in PyMongo, however to support the
# correct order of sorting, it uses a regular list for the "$sort" operation
# that converts into a SON (sorted dictionary) object before execution.
#
# Aggregation query example:
#
# {
# "collection" : "things",
# "aggregate" : [
# {
# "$unwind" : "$tags"
# },
# {
# "$group" : {
# {
# "_id" : "$tags",
# "count" : { "$sum" : 1 }
# }
# }
# },
# {
# "$sort" : [
# {
# "name" : "count",
# "direction" : -1
# },
# {
# "name" : "_id",
# "direction" : -1
# }
# ]
# }
# ]
# }
#
#
def mongodb(connection_string):
def _get_column_by_name(columns, column_name):
for c in columns:
Expand Down Expand Up @@ -56,7 +120,7 @@ def query_runner(query):
if is_replica_set:
if not connection_string["replicaSetName"]:
return None, "replicaSetName is set in the connection string JSON but is empty"

db_connection = pymongo.MongoReplicaSetClient(connection_string["connectionString"], replicaSet=connection_string["replicaSetName"])
else:
db_connection = pymongo.MongoClient(connection_string["connectionString"])
Expand All @@ -74,9 +138,12 @@ def query_runner(query):
except:
return None, "Invalid query format. The query is not a valid JSON."

if "query" in query_data and "aggregate" in query_data:
return None, "'query' and 'aggregate' sections cannot be used at the same time"

collection = None
if not "collection" in query_data:
return None, "'collection' must have a value to run a query"
return None, "'collection' must be set"
else:
collection = query_data["collection"]

Expand All @@ -93,26 +160,46 @@ def query_runner(query):
_convert_date(q[k], k2)

f = None

aggregate = None
if "aggregate" in query_data:
aggregate = query_data["aggregate"]
for step in aggregate:
if "$sort" in step:
sort_list = []
for sort_item in step["$sort"]:
sort_list.append((sort_item["name"], sort_item["direction"]))

step["$sort"] = SON(sort_list)

if aggregate:
pass
else:
s = None
if "sort" in query_data and query_data["sort"]:
s = []
for field in query_data["sort"]:
for k in field:
s.append((k, field[k]))

if "fields" in query_data:
f = query_data["fields"]

s = None
if "sort" in query_data and query_data["sort"]:
s = []
for field_name in query_data["sort"]:
s.append((field_name, query_data["sort"][field_name]))

columns = []
rows = []

error = None
json_data = None

cursor = None
if s:
cursor = db[collection].find(q, f).sort(s)
else:
cursor = db[collection].find(q, f)
if q:
if s:
cursor = db[collection].find(q, f).sort(s)
else:
cursor = db[collection].find(q, f)
elif aggregate:
r = db[collection].aggregate(aggregate)
cursor = r["result"]

for r in cursor:
for k in r:
Expand All @@ -127,7 +214,6 @@ def query_runner(query):
if type(r[k]) == ObjectId:
r[k] = str(r[k])


rows.append(r)

if f:
Expand Down

0 comments on commit ef868db

Please sign in to comment.