feat(query optimization): optimize slow query based on $in clause
Closes #77
buchslava committed May 15, 2018
1 parent af0f6b0 commit 476c4ea
Showing 479 changed files with 10,328,175 additions and 23 deletions.
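
The commit adds a query-optimization plugin that narrows the set of files the reader loads when a DDFQL request filters entities through $in clauses inside join (optionally under $and) and the main where references a single join key. A minimal sketch of such a request, with hypothetical concept names and values (not taken from the commit), might look like this:

const queryParam = {
  select: { key: ['geo', 'time'], value: ['population_total'] }, // hypothetical concepts
  from: 'datapoints',
  where: { $and: [{ geo: '$geo' }] },            // one-key main clause, as isMatched() expects
  join: {
    $geo: {
      key: 'geo',
      where: { geo: { $in: ['world'] } }         // the $in clause the plugin optimizes around
    }
  }
};
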
3,802 changes: 3,800 additions & 2 deletions dist/vizabi-ddfcsv-reader-node.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/vizabi-ddfcsv-reader-node.js.map

Large diffs are not rendered by default.

4,041 changes: 4,033 additions & 8 deletions dist/vizabi-ddfcsv-reader.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/vizabi-ddfcsv-reader.js.map

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions package.json
@@ -4,7 +4,7 @@
"description": "Vizabi DDFcsv reader",
"main": "dist/vizabi-ddfcsv-reader-node.js",
"scripts": {
"test": "mocha --compilers ts:ts-node/register --timeout 200000 test/**/*.spec.ts",
"test": "mocha --compilers ts:ts-node/register --timeout 200000 --max-old-space-size=6144 test/**/*.spec.ts",
"test-travis": "tsc && istanbul cover mocha _mocha -- -R spec --compilers ts:ts-node/register --recursive --timeout 200000 test/**/*.spec.ts && codecov",
"changelog": "conventional-changelog -i CHANGELOG.md -s -p angular",
"github-release": "conventional-github-releaser -p angular",
@@ -20,7 +20,8 @@
"dependencies": {
"bluebird": "3.5.0",
"fetch-polyfill": "0.8.2",
"papaparse": "4.3.6"
"papaparse": "4.3.6",
"lodash": "4.17.4"
},
"devDependencies": {
"@types/bluebird": "3.5.2",
@@ -30,7 +31,6 @@
"@types/lodash": "4.14.91",
"@types/mocha": "2.2.45",
"@types/node": "8.5.2",
"@types/papaparse": "4.1.33",
"babel": "6.23.0",
"babel-core": "6.24.1",
"babel-loader": "6.4.1",
@@ -42,7 +42,6 @@
"conventional-changelog-cli": "1.3.1",
"conventional-github-releaser": "1.1.3",
"istanbul": "0.4.5",
"lodash": "4.17.4",
"mocha": "4.1.0",
"rimraf": "2.6.1",
"shelljs": "0.7.8",
38 changes: 32 additions & 6 deletions src/ddf-csv.ts
@@ -1,4 +1,7 @@
import includes = require('lodash/includes');
import isEmpty = require('lodash/isEmpty');
import { IReader } from './file-readers/reader';
import { getAppropriatePlugin } from './query-optimization-plugins';
import {
CSV_PARSING_ERROR,
DDF_ERROR,
@@ -47,6 +50,7 @@ export function ddfCsvReader(path: string, fileReader: IReader, logger?) {
const conceptsLookup = new Map<string, any>();

let datapackage;
let optimalFilesSet = [];

function getDatapackagePath(pathParam) {
if (!pathParam.endsWith('datapackage.json')) {
@@ -77,6 +81,7 @@ export function ddfCsvReader(path: string, fileReader: IReader, logger?) {

try {
datapackage = JSON.parse(data);
optimalFilesSet = [];
} catch (parseErr) {
return reject(new DdfCsvError(JSON_PARSING_ERROR, parseErr, pathParam));
}
@@ -177,7 +182,23 @@ export function ddfCsvReader(path: string, fileReader: IReader, logger?) {
if (isSchemaQuery(queryParam)) {
return datapackagePromise.then(() => querySchema(queryParam));
} else {
return conceptsPromise.then(() => queryData(queryParam));
return conceptsPromise.then(() => {
const appropriatePlugin = getAppropriatePlugin(fileReader, basePath, queryParam, datapackage);

return new Promise((resolve: Function) => {
if (!appropriatePlugin) {
return resolve();
}

appropriatePlugin.getOptimalFilesSet().then(files => {
optimalFilesSet = files;
resolve();
}).catch(() => {
optimalFilesSet = [];
resolve();
});
});
}).then(() => queryData(queryParam));
}
}

@@ -513,11 +534,16 @@ export function ddfCsvReader(path: string, fileReader: IReader, logger?) {
.reduce((resultSet, resources) => new Set([...resultSet, ...resources]), new Set());
}
// one key, one value
return new Set(
keyValueLookup
.get(createKeyString(key))
.get(value)
);
let oneKeyOneValueResourcesArray = keyValueLookup
.get(createKeyString(key))
.get(value);

if (oneKeyOneValueResourcesArray) {
oneKeyOneValueResourcesArray = oneKeyOneValueResourcesArray
.filter(v => isEmpty(optimalFilesSet) || includes(optimalFilesSet, v.path));
}

return new Set(oneKeyOneValueResourcesArray);
}

function processResourceResponse(response, select, filterFields) {
252 changes: 252 additions & 0 deletions src/query-optimization-plugins/in-clause-under-conjunction-plugin.ts
@@ -0,0 +1,252 @@
import * as path from 'path';
import head = require('lodash/head');
import values = require('lodash/values');
import keys = require('lodash/keys');
import get = require('lodash/get');
import flattenDeep = require('lodash/flattenDeep');
import isEmpty = require('lodash/isEmpty');
import startsWith = require('lodash/startsWith');
import includes = require('lodash/includes');
import compact = require('lodash/compact');
import {IQueryOptimizationPlugin} from './query-optimization-plugin';
import {IReader} from '../file-readers/reader';

const Papa = require('papaparse');

const WHERE_KEYWORD = 'where';
const JOIN_KEYWORD = 'join';
const KEY_IN = '$in';
const KEY_AND = '$and';

const getFirstConditionClause = clause => head(values(clause));
const getFirstKey = obj => head(keys(obj));
const isOneKeyBased = obj => keys(obj).length === 1;

export class InClauseUnderConjunctionPlugin implements IQueryOptimizationPlugin {
private flow: any = {};

constructor(private fileReader: IReader, private basePath: string, private query, private datapackage) {
}

isMatched(): boolean {
this.flow.joinObject = get(this.query, JOIN_KEYWORD);
const mainAndClause = get(this.query, WHERE_KEYWORD);
const isMainAndClauseCorrect = isOneKeyBased(mainAndClause);
const joinKeys = keys(this.flow.joinObject);

let areJoinKeysSameAsKeyInWhereClause = true;

for (const key of joinKeys) {
const joinPart = this.flow.joinObject[key];
const firstKey = getFirstKey(joinPart.where);

if (joinPart.key !== firstKey && firstKey !== KEY_AND) {
areJoinKeysSameAsKeyInWhereClause = false;
break;
}
}

return isMainAndClauseCorrect && !!this.flow.joinObject && areJoinKeysSameAsKeyInWhereClause;
}

getOptimalFilesSet(): Promise<string[]> {
return new Promise((resolve, reject) => {
if (this.isMatched()) {
this.fillResourceToFileHash().collectProcessableClauses().collectEntityFilesNames().collectEntities()
.then((data) => {
const result = this.fillEntityValuesHash(data).getFilesGroupsQueryClause().getOptimalFilesGroup();

resolve(result);
}).catch(err => reject(err));

} else {
reject(`Plugin "InClauseUnderConjunction" is not matched!`);
}
});
}

private fillResourceToFileHash(): InClauseUnderConjunctionPlugin {
this.flow.resourceToFile = this.datapackage.resources.reduce((hash, resource) => {
hash.set(resource.name, resource.path);

return hash;
}, new Map());

return this;
}

private collectProcessableClauses(): InClauseUnderConjunctionPlugin {
const joinKeys = keys(this.flow.joinObject);

this.flow.processableClauses = [];

for (const joinKey of joinKeys) {
const where = get(this.flow.joinObject[joinKey], WHERE_KEYWORD);

if (this.singleAndField(where)) {
this.flow.processableClauses.push(...flattenDeep(where.$and.map(el => this.getProcessableClauses(el))));
} else {
this.flow.processableClauses.push(...this.getProcessableClauses(where));
}
}

return this;
}

private collectEntityFilesNames(): InClauseUnderConjunctionPlugin {
this.flow.entityFilesNames = [];
this.flow.fileNameToPrimaryKeyHash = new Map();

for (const schemaResourceRecord of this.datapackage.ddfSchema.entities) {
for (const clause of this.flow.processableClauses) {
const primaryKey = getFirstKey(clause);

if (head(schemaResourceRecord.primaryKey) === primaryKey) {
for (const resourceName of schemaResourceRecord.resources) {
const file = this.flow.resourceToFile.get(resourceName);

this.flow.entityFilesNames.push(file);
this.flow.fileNameToPrimaryKeyHash.set(file, primaryKey);
}
}
}
}

return this;
}

private collectEntities(): Promise<any> {
const actions = this.flow.entityFilesNames.map(file => new Promise((actResolve, actReject) => {
this.fileReader.readText(path.resolve(this.basePath, file), (err, text) => {
if (err) {
return actReject(err);
}

Papa.parse(text, {
header: true,
skipEmptyLines: true,
complete: result => actResolve({file, result}),
error: error => actReject(error)
});
});
}));

return Promise.all(actions);
}

private fillEntityValuesHash(entitiesData): InClauseUnderConjunctionPlugin {
const getSubdomainsFromRecord = record => compact(keys(record)
.filter(key => startsWith(key, 'is--') && (record[key] === 'TRUE' || record[key] === 'true'))
.map(key => key.replace(/^is--/, '')));

this.flow.entityValueToFileHash = new Map();
this.flow.entityValueToDomainHash = new Map();

for (const entityFileDescriptor of entitiesData) {
for (const entityRecord of entityFileDescriptor.result.data) {
const primaryKeyForThisFile = this.flow.fileNameToPrimaryKeyHash.get(entityFileDescriptor.file);
const primaryKeyCellValue = entityRecord[primaryKeyForThisFile];
const domainsForCurrentRecord = [...getSubdomainsFromRecord(entityRecord)];

if (isEmpty(domainsForCurrentRecord)) {
domainsForCurrentRecord.push(primaryKeyForThisFile);
}

this.flow.entityValueToDomainHash.set(primaryKeyCellValue, domainsForCurrentRecord);
this.flow.entityValueToFileHash.set(primaryKeyCellValue, entityFileDescriptor.file);
}
}

return this;
}

private getFilesGroupsQueryClause(): InClauseUnderConjunctionPlugin {
const filesGroupsByClause = new Map();

for (const clause of this.flow.processableClauses) {
const filesGroupByClause = {
entities: new Set(),
datapoints: new Set(),
concepts: new Set()
};
const entityValuesFromClause = getFirstConditionClause(clause).$in;

for (const entityValueFromClause of entityValuesFromClause) {
filesGroupByClause.entities.add(this.flow.entityValueToFileHash.get(entityValueFromClause));

const entitiesByQuery = this.flow.entityValueToDomainHash.get(entityValueFromClause);

for (const entityByQuery of entitiesByQuery) {
for (const schemaResourceRecord of this.datapackage.ddfSchema.datapoints) {
for (const resourceName of schemaResourceRecord.resources) {
if (includes(schemaResourceRecord.primaryKey, entityByQuery)) {
filesGroupByClause.datapoints.add(this.flow.resourceToFile.get(resourceName));
}
}
}
}
}

for (const schemaResourceRecord of this.datapackage.ddfSchema.concepts) {
for (const resourceName of schemaResourceRecord.resources) {
filesGroupByClause.concepts.add(this.flow.resourceToFile.get(resourceName));
}
}

filesGroupsByClause.set(clause, filesGroupByClause);
}

this.flow.filesGroupsByClause = filesGroupsByClause;

return this;
}

private getOptimalFilesGroup(): string[] {
const clauseKeys = this.flow.filesGroupsByClause.keys();

let appropriateClauseKey;
let appropriateClauseSize;

for (const key of clauseKeys) {
const size = this.flow.filesGroupsByClause.get(key).datapoints.size +
this.flow.filesGroupsByClause.get(key).entities.size +
this.flow.filesGroupsByClause.get(key).concepts.size;

if (!appropriateClauseKey || size < appropriateClauseSize) {
appropriateClauseKey = key;
appropriateClauseSize = size;
}
}

return [
...Array.from(this.flow.filesGroupsByClause.get(appropriateClauseKey).concepts),
...Array.from(this.flow.filesGroupsByClause.get(appropriateClauseKey).entities),
...Array.from(this.flow.filesGroupsByClause.get(appropriateClauseKey).datapoints)
] as string[];
}

private getProcessableClauses(clause) {
const result = [];
const clauseKeys = keys(clause);

for (const key of clauseKeys) {
if (!startsWith(key, '$') && isOneKeyBased(clause[key])) {
// attention! this functionality processes only the first clause,
// for example, { geo: { '$in': ['world'] } };
// in the example { geo: { '$in': ['world'] }, foo: { '$in': ['bar', 'baz'] } }
// foo: { '$in': ['bar', 'baz'] } will NOT be processed
const conditionKey = head(keys(clause[key]));

if (conditionKey === KEY_IN) {
result.push(clause);
}
}
}

return result;
}

private singleAndField(clause): boolean {
return isOneKeyBased(clause) && !!get(clause, KEY_AND);
}
}
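
For a request like the sketch above, the plugin picks, among the processable $in clauses, the one whose covering group of concept, entity and datapoint files is smallest, and returns those file paths; ddf-csv.ts then keeps only resources whose path is in that set (an empty set means no restriction). A rough usage illustration with invented file names (real names come from the datapackage resources):

const plugin = new InClauseUnderConjunctionPlugin(fileReader, basePath, queryParam, datapackage);

if (plugin.isMatched()) {
  plugin.getOptimalFilesSet().then(files => {
    // e.g. ['ddf--concepts.csv', 'ddf--entities--geo.csv',
    //       'ddf--datapoints--population_total--by--geo--time.csv']
    // file names are invented for illustration
    console.log(files);
  });
}
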
16 changes: 16 additions & 0 deletions src/query-optimization-plugins/index.ts
@@ -0,0 +1,16 @@
import {InClauseUnderConjunctionPlugin} from './in-clause-under-conjunction-plugin';
import head = require('lodash/head');
import {IQueryOptimizationPlugin} from './query-optimization-plugin';
import {IReader} from '../file-readers/reader';

export function getAppropriatePlugin(
fileReader: IReader,
basePath: string,
queryParam,
datapackage): IQueryOptimizationPlugin {
const plugins = [
new InClauseUnderConjunctionPlugin(fileReader, basePath, queryParam, datapackage)
];

return head(plugins.filter(plugin => plugin.isMatched()));
}
5 changes: 5 additions & 0 deletions src/query-optimization-plugins/query-optimization-plugin.ts
@@ -0,0 +1,5 @@
export interface IQueryOptimizationPlugin {
isMatched(): boolean;

getOptimalFilesSet(): Promise<string[]>;
}
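
Any further optimization strategy would implement this interface and be registered in the plugins array of src/query-optimization-plugins/index.ts. A hypothetical sketch (the plugin below is invented for illustration and is not part of the commit):

import { IQueryOptimizationPlugin } from './query-optimization-plugin';

// Hypothetical example: never matches, and never narrows the file set.
export class NoopPlugin implements IQueryOptimizationPlugin {
  constructor(private query) {
  }

  isMatched(): boolean {
    return false;                       // a real plugin would inspect this.query here
  }

  getOptimalFilesSet(): Promise<string[]> {
    return Promise.resolve([]);         // an empty set means "do not restrict files"
  }
}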