diff --git a/client/app/assets/less/ant.less b/client/app/assets/less/ant.less
index d1509a48b5..6238066681 100644
--- a/client/app/assets/less/ant.less
+++ b/client/app/assets/less/ant.less
@@ -14,6 +14,7 @@
@import '~antd/lib/radio/style/index';
@import '~antd/lib/time-picker/style/index';
@import '~antd/lib/pagination/style/index';
+@import '~antd/lib/drawer/style/index';
@import '~antd/lib/table/style/index';
@import '~antd/lib/popover/style/index';
@import '~antd/lib/icon/style/index';
diff --git a/client/app/assets/less/inc/base.less b/client/app/assets/less/inc/base.less
index d5f05424ef..55faf52d6e 100755
--- a/client/app/assets/less/inc/base.less
+++ b/client/app/assets/less/inc/base.less
@@ -124,6 +124,10 @@ strong {
transition: height 0s, width 0s !important;
}
+.admin-schema-editor {
+ padding: 50px 0;
+}
+
// Ace Editor
.ace_editor {
border: 1px solid fade(@redash-gray, 15%) !important;
diff --git a/client/app/assets/less/inc/schema-browser.less b/client/app/assets/less/inc/schema-browser.less
index 0034391086..d547a78790 100644
--- a/client/app/assets/less/inc/schema-browser.less
+++ b/client/app/assets/less/inc/schema-browser.less
@@ -7,14 +7,14 @@ div.table-name {
border-radius: @redash-radius;
position: relative;
- .copy-to-editor {
+ .copy-to-editor, .info {
display: none;
}
&:hover {
background: fade(@redash-gray, 10%);
- .copy-to-editor {
+ .copy-to-editor, .info {
display: flex;
}
}
@@ -36,7 +36,7 @@ div.table-name {
background: transparent;
}
- .copy-to-editor {
+ .copy-to-editor, .info {
color: fade(@redash-gray, 90%);
cursor: pointer;
position: absolute;
@@ -49,6 +49,10 @@ div.table-name {
justify-content: center;
}
+ .info {
+ right: 20px
+ }
+
.table-open {
padding: 0 22px 0 26px;
overflow: hidden;
@@ -56,14 +60,14 @@ div.table-name {
white-space: nowrap;
position: relative;
- .copy-to-editor {
+ .copy-to-editor, .info {
display: none;
}
&:hover {
background: fade(@redash-gray, 10%);
- .copy-to-editor {
+ .copy-to-editor, .info {
display: flex;
}
}
diff --git a/client/app/components/proptypes.js b/client/app/components/proptypes.js
index a1240cf029..d45896f106 100644
--- a/client/app/components/proptypes.js
+++ b/client/app/components/proptypes.js
@@ -11,8 +11,16 @@ export const DataSource = PropTypes.shape({
type_name: PropTypes.string,
});
+export const DataSourceMetadata = PropTypes.shape({
+ key: PropTypes.number,
+ name: PropTypes.string,
+ type: PropTypes.string,
+ example: PropTypes.string,
+ description: PropTypes.string,
+});
+
export const Table = PropTypes.shape({
- columns: PropTypes.arrayOf(PropTypes.string).isRequired,
+ columns: PropTypes.arrayOf(PropTypes.object).isRequired,
});
export const Schema = PropTypes.arrayOf(Table);
@@ -31,6 +39,13 @@ export const RefreshScheduleDefault = {
until: null,
};
+export const TableMetadata = PropTypes.shape({
+ key: PropTypes.number.isRequired,
+ name: PropTypes.string.isRequired,
+ description: PropTypes.string,
+ visible: PropTypes.bool.isRequired,
+});
+
export const Field = PropTypes.shape({
name: PropTypes.string.isRequired,
title: PropTypes.string,
diff --git a/client/app/components/queries/SchemaData.jsx b/client/app/components/queries/SchemaData.jsx
new file mode 100644
index 0000000000..e1ad6bfab3
--- /dev/null
+++ b/client/app/components/queries/SchemaData.jsx
@@ -0,0 +1,101 @@
+import React from 'react';
+import PropTypes from 'prop-types';
+import { react2angular } from 'react2angular';
+import Drawer from 'antd/lib/drawer';
+import Table from 'antd/lib/table';
+
+import { DataSourceMetadata } from '@/components/proptypes';
+
+function textWrapRenderer(text) {
+ return (
+
+ {text}
+
+ );
+}
+
+class SchemaData extends React.PureComponent {
+ static propTypes = {
+ show: PropTypes.bool.isRequired,
+ onClose: PropTypes.func.isRequired,
+ tableName: PropTypes.string,
+ tableDescription: PropTypes.string,
+ tableMetadata: PropTypes.arrayOf(DataSourceMetadata),
+ };
+
+ static defaultProps = {
+ tableName: '',
+ tableDescription: '',
+ tableMetadata: [],
+ };
+
+ render() {
+ const columns = [{
+ title: 'Column Name',
+ dataIndex: 'name',
+ width: 400,
+ key: 'name',
+ render: textWrapRenderer,
+ }, {
+ title: 'Column Type',
+ dataIndex: 'type',
+ width: 400,
+ key: 'type',
+ render: textWrapRenderer,
+ }];
+
+ const hasDescription =
+ this.props.tableMetadata.some(columnMetadata => columnMetadata.description);
+
+ const hasExample =
+ this.props.tableMetadata.some(columnMetadata => columnMetadata.example);
+
+ if (hasDescription) {
+ columns.push({
+ title: 'Description',
+ dataIndex: 'description',
+ width: 400,
+ key: 'description',
+ render: textWrapRenderer,
+ });
+ }
+
+ if (hasExample) {
+ columns.push({
+ title: 'Example',
+ dataIndex: 'example',
+ width: 400,
+ key: 'example',
+ render: textWrapRenderer,
+ });
+ }
+
+ return (
+
+
+ {this.props.tableDescription}
+
+
+
+ );
+ }
+}
+
+export default function init(ngModule) {
+ ngModule.component('schemaData', react2angular(SchemaData, null, []));
+}
+
+init.init = true;
diff --git a/client/app/components/queries/schema-browser.html b/client/app/components/queries/schema-browser.html
index fe7e26669e..5f452acf74 100644
--- a/client/app/components/queries/schema-browser.html
+++ b/client/app/components/queries/schema-browser.html
@@ -19,22 +19,32 @@
-
+
{{table.name}}
({{table.size}})
+
-
{{column}}
+
+ {{column.name}}
+ ng-click="$ctrl.itemSelected($event, [column.name])">
+
diff --git a/client/app/components/queries/schema-browser.js b/client/app/components/queries/schema-browser.js
index ded89d09bc..c5ab533f8e 100644
--- a/client/app/components/queries/schema-browser.js
+++ b/client/app/components/queries/schema-browser.js
@@ -11,6 +11,18 @@ function SchemaBrowserCtrl($rootScope, $scope) {
$scope.$broadcast('vsRepeatTrigger');
};
+ $scope.showSchemaInfo = false;
+ $scope.openSchemaInfo = ($event, table) => {
+ $scope.tableName = table.name;
+ $scope.tableDescription = table.description;
+ $scope.tableMetadata = table.columns;
+ $scope.showSchemaInfo = true;
+ $event.stopPropagation();
+ };
+ $scope.closeSchemaInfo = () => {
+ $scope.$apply(() => { $scope.showSchemaInfo = false; });
+ };
+
this.getSize = (table) => {
let size = 22;
@@ -34,6 +46,13 @@ function SchemaBrowserCtrl($rootScope, $scope) {
}
};
+ this.itemExists = (item) => {
+ if ('visible' in item) {
+ return item.visible;
+ }
+ return false;
+ };
+
this.itemSelected = ($event, hierarchy) => {
$rootScope.$broadcast('query-editor.command', 'paste', hierarchy.join('.'));
$event.preventDefault();
diff --git a/client/app/pages/data-sources/EditDataSource.jsx b/client/app/pages/data-sources/EditDataSource.jsx
index 4db6045387..4ea39e51e3 100644
--- a/client/app/pages/data-sources/EditDataSource.jsx
+++ b/client/app/pages/data-sources/EditDataSource.jsx
@@ -10,6 +10,7 @@ import notification from '@/services/notification';
import PromiseRejectionError from '@/lib/promise-rejection-error';
import LoadingState from '@/components/items-list/components/LoadingState';
import DynamicForm from '@/components/dynamic-form/DynamicForm';
+import SchemaTable from '@/pages/data-sources/schema-table-components/SchemaTable';
import helper from '@/components/dynamic-form/dynamicFormHelper';
import { HelpTrigger, TYPES as HELP_TRIGGER_TYPES } from '@/components/HelpTrigger';
@@ -26,13 +27,23 @@ class EditDataSource extends React.Component {
dataSource: null,
type: null,
loading: true,
+ schema: null,
};
componentDidMount() {
DataSource.get({ id: $route.current.params.dataSourceId }).$promise.then((dataSource) => {
const { type } = dataSource;
this.setState({ dataSource });
- DataSource.types(types => this.setState({ type: find(types, { type }), loading: false }));
+
+ const typesPromise = DataSource.types().$promise;
+ const schemaPromise = DataSource.schema({ id: $route.current.params.dataSourceId }).$promise;
+
+ typesPromise.then(types => this.setState({ type: find(types, { type }) }));
+ schemaPromise.then(data => this.setState({ schema: data.schema }));
+
+ Promise.all([typesPromise, schemaPromise]).then(() => {
+ this.setState({ loading: false });
+ });
}).catch((error) => {
// ANGULAR_REMOVE_ME This code is related to Angular's HTTP services
if (error.status && error.data) {
@@ -78,6 +89,12 @@ class EditDataSource extends React.Component {
});
};
+ updateSchema = (schema, tableId, columnId) => {
+ const { dataSource } = this.state;
+ const data = { tableId, columnId, schema };
+ DataSource.updateSchema({ id: dataSource.id }, data);
+ };
+
testConnection = (callback) => {
const { dataSource } = this.state;
DataSource.test({ id: dataSource.id }, (httpResponse) => {
@@ -124,6 +141,12 @@ class EditDataSource extends React.Component {
+
+
+
);
}
diff --git a/client/app/pages/data-sources/schema-table-components/EditableTable.jsx b/client/app/pages/data-sources/schema-table-components/EditableTable.jsx
new file mode 100644
index 0000000000..6154541e6f
--- /dev/null
+++ b/client/app/pages/data-sources/schema-table-components/EditableTable.jsx
@@ -0,0 +1,87 @@
+import React from 'react';
+import Form from 'antd/lib/form';
+import Input from 'antd/lib/input';
+import PropTypes from 'prop-types';
+import { TableMetadata } from '@/components/proptypes';
+import TableVisibilityCheckbox from './TableVisibilityCheckbox';
+import './schema-table.css';
+
+const FormItem = Form.Item;
+const { TextArea } = Input;
+export const EditableContext = React.createContext();
+
+// eslint-disable-next-line react/prop-types
+const EditableRow = ({ form, index, ...props }) => (
+
+
+
+);
+
+export const EditableFormRow = Form.create()(EditableRow);
+
+export class EditableCell extends React.Component {
+ static propTypes = {
+ dataIndex: PropTypes.string,
+ input_type: PropTypes.string,
+ editing: PropTypes.bool,
+ record: TableMetadata,
+ };
+
+ static defaultProps = {
+ dataIndex: undefined,
+ input_type: undefined,
+ editing: false,
+ record: {},
+ };
+
+ constructor(props) {
+ super(props);
+ this.state = {
+ visible: this.props.record ? this.props.record.visible : false,
+ };
+ }
+
+ onChange = () => {
+ this.setState(prevState => ({ visible: !prevState.visible }));
+ }
+
+ getInput = () => {
+ if (this.props.input_type === 'visible') {
+ return (
+
+ );
+ }
+ return ;
+ };
+
+ render() {
+ const {
+ editing,
+ dataIndex,
+ record,
+ ...restProps
+ } = this.props;
+
+ return (
+
+ {(form) => {
+ const { getFieldDecorator } = form;
+ return (
+
+ {editing ? (
+
+ {getFieldDecorator(dataIndex, {
+ initialValue: record[dataIndex],
+ })(this.getInput()) }
+
+ ) : restProps.children}
+ |
+ );
+ }}
+
+ );
+ }
+}
diff --git a/client/app/pages/data-sources/schema-table-components/SchemaTable.jsx b/client/app/pages/data-sources/schema-table-components/SchemaTable.jsx
new file mode 100644
index 0000000000..b5f5ecc2c0
--- /dev/null
+++ b/client/app/pages/data-sources/schema-table-components/SchemaTable.jsx
@@ -0,0 +1,258 @@
+import React from 'react';
+import PropTypes from 'prop-types';
+import Table from 'antd/lib/table';
+import Popconfirm from 'antd/lib/popconfirm';
+import { Schema } from '@/components/proptypes';
+import { EditableCell, EditableFormRow, EditableContext } from './EditableTable';
+import TableVisibilityCheckbox from './TableVisibilityCheckbox';
+
+import './schema-table.css';
+
+function fetchTableData(schema) {
+ return schema.map(tableData => ({
+ key: tableData.id,
+ name: tableData.name,
+ description: tableData.description || '',
+ visible: tableData.visible,
+ columns: tableData.columns,
+ }));
+}
+
+const components = {
+ body: {
+ row: EditableFormRow,
+ cell: EditableCell,
+ },
+};
+
+export default class SchemaTable extends React.Component {
+ static propTypes = {
+ schema: Schema, // eslint-disable-line react/no-unused-prop-types
+ updateSchema: PropTypes.func.isRequired,
+ };
+
+ static defaultProps = {
+ schema: null,
+ };
+
+ constructor(props) {
+ super(props);
+ this.state = { data: [], editingKey: '' };
+ this.columns = [{
+ title: 'Table Name',
+ dataIndex: 'name',
+ width: '20%',
+ key: 'name',
+ }, {
+ title: 'Table Description',
+ dataIndex: 'description',
+ width: '52%',
+ key: 'description',
+ editable: true,
+ render: this.truncateDescriptionText,
+ }, {
+ title: 'Visibility',
+ dataIndex: 'visible',
+ width: '13%',
+ key: 'visible',
+ editable: true,
+ render: (text, record) => (
+
+ ),
+ }, {
+ title: '',
+ width: '15%',
+ dataIndex: 'edit',
+ key: 'edit',
+ // Purposely calling fieldEditor() instead of setting render() to it
+ // because render() will pass a different third argument than what
+ // fieldEditory() takes
+ render: (text, record) => this.fieldEditor(text, record),
+ }];
+ }
+
+ static getDerivedStateFromProps(nextProps, prevState) {
+ if (nextProps.schema && prevState.data.length === 0) {
+ return {
+ data: fetchTableData(nextProps.schema),
+ editingKey: prevState.editingKey,
+ };
+ }
+ return prevState;
+ }
+
+ expandedRowRender = (tableData) => {
+ const columns = [
+ {
+ title: 'Column Name',
+ dataIndex: 'name',
+ key: 'name',
+ width: '15%',
+ }, {
+ title: 'Column Type',
+ dataIndex: 'type',
+ key: 'type',
+ width: '15%',
+ }, {
+ title: 'Column Example',
+ dataIndex: 'example',
+ key: 'example',
+ width: '20%',
+ }, {
+ title: 'Column Description',
+ dataIndex: 'description',
+ key: 'description',
+ width: '35%',
+ editable: true,
+ render: this.truncateDescriptionText,
+ onCell: record => ({
+ record,
+ input_type: 'text',
+ dataIndex: 'description',
+ title: 'Column Description',
+ editing: this.isEditing(record),
+ }),
+ },
+ {
+ title: '',
+ width: '15%',
+ dataIndex: 'edit',
+ key: 'edit',
+ render: (text, record) => this.fieldEditor(text, record, tableData),
+ },
+ ];
+
+ return (
+
+ );
+ }
+
+ truncateDescriptionText = (text) => {
+ if (!text) {
+ return;
+ }
+ const MAX_CHARACTER_COUNT = 305;
+ const addEllipses = text.length > MAX_CHARACTER_COUNT;
+ return (
+
+ {`${text.replace(/\n/g, ' ').substring(0, MAX_CHARACTER_COUNT)}${addEllipses ? '...' : ''}`}
+
+ );
+ }
+
+ fieldEditor(text, record, tableData) {
+ const editable = this.isEditing(record);
+ const tableKey = tableData ? tableData.key : record.key;
+ const columnKey = tableData ? record.key : undefined;
+ return (
+
+ );
+ }
+
+ cancel() {
+ this.setState({ editingKey: '' });
+ }
+
+ edit(key) {
+ this.setState({ editingKey: key });
+ }
+
+ isEditing(record) {
+ return record.key === this.state.editingKey;
+ }
+
+ save(form, tableKey, columnKey) {
+ form.validateFields((error, editedFields) => {
+ if (error) {
+ return;
+ }
+ this.setState((prevState) => {
+ const newData = [...prevState.data];
+ let spliceIndex = newData.findIndex(item => tableKey === item.key);
+
+ if (spliceIndex < 0) {
+ return;
+ }
+
+ const tableRow = newData[spliceIndex];
+ let dataToUpdate = newData;
+ let rowToUpdate = tableRow;
+
+ const columnIndex = tableRow.columns.findIndex(item => columnKey === item.key);
+ const columnRow = tableRow.columns[columnIndex];
+ if (columnKey) {
+ dataToUpdate = tableRow.columns;
+ spliceIndex = columnIndex;
+ rowToUpdate = columnRow;
+ }
+
+ dataToUpdate.splice(spliceIndex, 1, {
+ ...rowToUpdate,
+ ...editedFields,
+ });
+ this.props.updateSchema(editedFields, tableRow.key, columnRow ? columnRow.key : undefined);
+ return { data: newData, editingKey: '' };
+ });
+ });
+ }
+
+ render() {
+ const columns = this.columns.map(col => ({
+ ...col,
+ onCell: record => ({
+ record,
+ input_type: col.dataIndex,
+ dataIndex: col.dataIndex,
+ title: col.title,
+ editing: col.editable ? this.isEditing(record) : false,
+ }),
+ }));
+
+ return (
+
+ );
+ }
+}
diff --git a/client/app/pages/data-sources/schema-table-components/TableVisibilityCheckbox.jsx b/client/app/pages/data-sources/schema-table-components/TableVisibilityCheckbox.jsx
new file mode 100644
index 0000000000..17a7ac7f6b
--- /dev/null
+++ b/client/app/pages/data-sources/schema-table-components/TableVisibilityCheckbox.jsx
@@ -0,0 +1,29 @@
+import React from 'react';
+import PropTypes from 'prop-types';
+import Checkbox from 'antd/lib/checkbox';
+
+
+export default class TableVisibilityCheckbox extends React.PureComponent {
+ static propTypes = {
+ visible: PropTypes.bool.isRequired,
+ onChange: PropTypes.func,
+ disabled: PropTypes.bool,
+ };
+
+ static defaultProps = {
+ disabled: false,
+ onChange: undefined,
+ };
+
+ render() {
+ return (
+
+ {this.props.visible ? 'Visible' : 'Hidden'}
+
+ );
+ }
+}
diff --git a/client/app/pages/data-sources/schema-table-components/schema-table.css b/client/app/pages/data-sources/schema-table-components/schema-table.css
new file mode 100644
index 0000000000..790177326b
--- /dev/null
+++ b/client/app/pages/data-sources/schema-table-components/schema-table.css
@@ -0,0 +1,4 @@
+.editable-row, .table-description {
+ word-break: break-all;
+ white-space: pre-line;
+}
diff --git a/client/app/services/data-source.js b/client/app/services/data-source.js
index efd4422a0d..3db1162cb8 100644
--- a/client/app/services/data-source.js
+++ b/client/app/services/data-source.js
@@ -18,6 +18,7 @@ function DataSourceService($q, $resource, $http) {
const actions = {
get: { method: 'GET', cache: false, isArray: false },
+ post: { method: 'POST', cache: false, isArray: false },
query: { method: 'GET', cache: false, isArray: true },
save: { method: 'POST' },
types: {
@@ -32,6 +33,18 @@ function DataSourceService($q, $resource, $http) {
isArray: false,
url: 'api/data_sources/:id/test',
},
+ schema: {
+ method: 'GET',
+ cache: false,
+ isArray: false,
+ url: 'api/data_sources/:id/schema',
+ },
+ updateSchema: {
+ method: 'POST',
+ cache: false,
+ isArray: false,
+ url: 'api/data_sources/:id/schema',
+ },
};
const DataSourceResource = $resource('api/data_sources/:id', { id: '@id' }, actions);
diff --git a/migrations/versions/280daa582976_.py b/migrations/versions/280daa582976_.py
new file mode 100644
index 0000000000..578ba29af5
--- /dev/null
+++ b/migrations/versions/280daa582976_.py
@@ -0,0 +1,55 @@
+"""Add column metadata and table metadata
+
+Revision ID: 280daa582976
+Revises: 151a4c333e96
+Create Date: 2019-01-24 18:23:53.040608
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '280daa582976'
+down_revision = '151a4c333e96'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.create_table(
+ 'table_metadata',
+ sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
+ sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('org_id', sa.Integer(), nullable=False),
+ sa.Column('data_source_id', sa.Integer(), nullable=False),
+ sa.Column('exists', sa.Boolean(), nullable=False),
+ sa.Column('name', sa.String(length=255), nullable=False),
+ sa.Column('description', sa.String(length=4096), nullable=True),
+ sa.Column('column_metadata', sa.Boolean(), nullable=False),
+ sa.Column('sample_query', sa.Text(), nullable=True),
+ sa.ForeignKeyConstraint(['data_source_id'], ['data_sources.id'], ondelete="CASCADE"),
+ sa.ForeignKeyConstraint(['org_id'], ['organizations.id']),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.create_table(
+ 'column_metadata',
+ sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
+ sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('org_id', sa.Integer(), nullable=False),
+ sa.Column('table_id', sa.Integer(), nullable=False),
+ sa.Column('name', sa.String(length=255), nullable=False),
+ sa.Column('type', sa.String(length=255), nullable=True),
+ sa.Column('example', sa.String(length=4096), nullable=True),
+ sa.Column('exists', sa.Boolean(), nullable=False),
+ sa.ForeignKeyConstraint(['table_id'], ['table_metadata.id'], ondelete="CASCADE"),
+ sa.ForeignKeyConstraint(['org_id'], ['organizations.id']),
+ sa.PrimaryKeyConstraint('id')
+ )
+
+
+def downgrade():
+ op.drop_table('column_metadata')
+ op.drop_table('table_metadata')
diff --git a/migrations/versions/6adb92e75691_.py b/migrations/versions/6adb92e75691_.py
new file mode 100644
index 0000000000..c3997d0c9b
--- /dev/null
+++ b/migrations/versions/6adb92e75691_.py
@@ -0,0 +1,25 @@
+"""Add sample_updated_at column to table_metadata
+
+Revision ID: 6adb92e75691
+Revises: 280daa582976
+Create Date: 2019-04-10 20:13:13.714589
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '6adb92e75691'
+down_revision = '280daa582976'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.add_column('table_metadata', sa.Column(
+ 'sample_updated_at', sa.DateTime(timezone=True), nullable=True))
+
+
+def downgrade():
+ op.drop_column('table_metadata', 'sample_updated_at')
diff --git a/migrations/versions/cf135a57332e_.py b/migrations/versions/cf135a57332e_.py
new file mode 100644
index 0000000000..3f58c97b2f
--- /dev/null
+++ b/migrations/versions/cf135a57332e_.py
@@ -0,0 +1,26 @@
+"""Add column description and table visibility fields
+
+Revision ID: cf135a57332e
+Revises: 6adb92e75691
+Create Date: 2019-02-05 19:52:48.233070
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'cf135a57332e'
+down_revision = '6adb92e75691'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ op.add_column('column_metadata', sa.Column('description', sa.String(length=4096), nullable=True))
+ op.add_column('table_metadata', sa.Column('visible', sa.Boolean(), nullable=False, server_default='True'))
+
+
+def downgrade():
+ op.drop_column('table_metadata', 'visible')
+ op.drop_column('column_metadata', 'description')
diff --git a/redash/cli/data_sources.py b/redash/cli/data_sources.py
index fa7c7a8032..b758221f9c 100644
--- a/redash/cli/data_sources.py
+++ b/redash/cli/data_sources.py
@@ -9,6 +9,7 @@
from redash import models
from redash.query_runner import (get_configuration_schema_for_query_runner_type,
query_runners)
+from redash.tasks import refresh_samples
from redash.utils import json_loads
from redash.utils.configuration import ConfigurationContainer
@@ -110,7 +111,7 @@ def new(name=None, type=None, options=None, organization='default'):
options_obj = {}
- for k, prop in schema['properties'].iteritems():
+ for k, prop in sorted(schema['properties'].iteritems()):
required = k in schema.get('required', [])
default_value = "<>"
if required:
@@ -172,6 +173,27 @@ def update_attr(obj, attr, new_value):
setattr(obj, attr, new_value)
+@manager.command()
+@click.argument('name')
+@click.option('--org', 'organization', default='default',
+ help="The organization the user belongs to (leave blank for "
+ "'default').")
+@click.option('--count', 'num_tables', default=50,
+ help="number of tables to process data samples for")
+def refresh_data_samples(name, num_tables=50, organization='default'):
+ """Refresh table samples by data source name."""
+ try:
+ org = models.Organization.get_by_slug(organization)
+ data_source = models.DataSource.query.filter(
+ models.DataSource.name == name,
+ models.DataSource.org == org).one()
+ print("Refreshing samples for data source: {} (id={})".format(name, data_source.id))
+ refresh_samples(data_source.id, num_tables)
+ except NoResultFound:
+ print("Couldn't find data source named: {}".format(name))
+ exit(1)
+
+
@manager.command()
@click.argument('name')
@click.option('--name', 'new_name', default=None,
diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py
index 54e29c5e4f..4b92ea1014 100644
--- a/redash/handlers/data_sources.py
+++ b/redash/handlers/data_sources.py
@@ -6,10 +6,11 @@
from six import text_type
from sqlalchemy.exc import IntegrityError
-from redash import models
+from redash import models, settings
from redash.handlers.base import BaseResource, get_object_or_404, require_fields
from redash.permissions import (require_access, require_admin,
require_permission, view_only)
+from redash.tasks.queries import refresh_schema
from redash.query_runner import (get_configuration_schema_for_query_runner_type,
query_runners, NotSupported)
from redash.utils import filter_none
@@ -53,6 +54,9 @@ def post(self, data_source_id):
data_source.name = req['name']
models.db.session.add(data_source)
+ # Refresh the stored schemas when a data source is updated
+ refresh_schema.apply_async(args=(data_source.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
+
try:
models.db.session.commit()
except IntegrityError as e:
@@ -98,7 +102,7 @@ def get(self):
continue
try:
- d = ds.to_dict()
+ d = ds.to_dict(all=True)
d['view_only'] = all(project(ds.groups, self.current_user.group_ids).values())
response[ds.id] = d
except AttributeError:
@@ -134,6 +138,9 @@ def post(self):
options=config)
models.db.session.commit()
+
+ # Refresh the stored schemas when a new data source is added to the list
+ refresh_schema.apply_async(args=(datasource.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
except IntegrityError as e:
models.db.session.rollback()
if req['name'] in e.message:
@@ -151,15 +158,22 @@ def post(self):
class DataSourceSchemaResource(BaseResource):
+ @require_admin
+ def post(self, data_source_id):
+ data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
+ new_schema_data = request.get_json(force=True)
+ models.DataSource.save_schema(new_schema_data)
+
def get(self, data_source_id):
data_source = get_object_or_404(models.DataSource.get_by_id_and_org, data_source_id, self.current_org)
require_access(data_source, self.current_user, view_only)
refresh = request.args.get('refresh') is not None
response = {}
-
try:
- response['schema'] = data_source.get_schema(refresh)
+ if refresh:
+ refresh_schema.apply_async(args=(data_source.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
+ response['schema'] = data_source.get_schema()
except NotSupported:
response['error'] = {
'code': 1,
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
index 41f67337d3..212e3d3512 100644
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ -4,6 +4,8 @@
import time
import pytz
+import xlsxwriter
+from operator import itemgetter
from six import python_2_unicode_compatible, text_type
from sqlalchemy import distinct, or_, and_, UniqueConstraint
from sqlalchemy.dialects import postgresql
@@ -62,6 +64,70 @@ def get(self, query_id):
scheduled_queries_executions = ScheduledQueriesExecutions()
+@python_2_unicode_compatible
+@generic_repr('id', 'name', 'data_source_id', 'org_id', 'exists', 'column_metadata')
+class TableMetadata(TimestampMixin, db.Model):
+ id = Column(db.Integer, primary_key=True)
+ org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
+ data_source_id = Column(db.Integer, db.ForeignKey("data_sources.id", ondelete="CASCADE"))
+ exists = Column(db.Boolean, default=True)
+ visible = Column(db.Boolean, default=True)
+ name = Column(db.String(255))
+ description = Column(db.String(4096), nullable=True)
+ column_metadata = Column(db.Boolean, default=False)
+ sample_query = Column("sample_query", db.Text, nullable=True)
+ sample_updated_at = Column(db.DateTime(True), nullable=True)
+
+ __tablename__ = 'table_metadata'
+
+ def __str__(self):
+ return text_type(self.name)
+
+ def to_dict(self):
+ return {
+ 'id': self.id,
+ 'org_id': self.org_id,
+ 'data_source_id': self.data_source_id,
+ 'exists': self.exists,
+ 'visible': self.visible,
+ 'name': self.name,
+ 'description': self.description,
+ 'column_metadata': self.column_metadata,
+ 'sample_query': self.sample_query,
+ 'sample_updated_at': self.sample_updated_at,
+ }
+
+
+@python_2_unicode_compatible
+@generic_repr('id', 'name', 'type', 'table_id', 'org_id', 'exists')
+class ColumnMetadata(TimestampMixin, db.Model):
+ id = Column(db.Integer, primary_key=True)
+ org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
+ table_id = Column(db.Integer, db.ForeignKey("table_metadata.id", ondelete="CASCADE"))
+ name = Column(db.String(255))
+ type = Column(db.String(255), nullable=True)
+ example = Column(db.String(4096), nullable=True)
+ exists = Column(db.Boolean, default=True)
+ description = Column(db.String(4096), nullable=True)
+
+ __tablename__ = 'column_metadata'
+
+ def __str__(self):
+ return text_type(self.name)
+
+ def to_dict(self):
+ return {
+ 'id': self.id,
+ 'org_id': self.org_id,
+ 'table_id': self.table_id,
+ 'name': self.name,
+ 'type': self.type,
+ 'example': self.example,
+ 'exists': self.exists,
+ 'description': self.description,
+ }
+
+
@python_2_unicode_compatible
@generic_repr('id', 'name', 'type', 'org_id', 'created_at')
class DataSource(BelongsToOrgMixin, db.Model):
@@ -135,30 +201,67 @@ def all(cls, org, group_ids=None):
def get_by_id(cls, _id):
return cls.query.filter(cls.id == _id).one()
+ @classmethod
+ def save_schema(self, schema_info):
+ if 'columnId' in schema_info:
+ ColumnMetadata.query.filter(
+ ColumnMetadata.id == schema_info['columnId'],
+ ColumnMetadata.table_id == schema_info['tableId'],
+ ).update(schema_info['schema'])
+ else:
+ TableMetadata.query.filter(
+ TableMetadata.id == schema_info['tableId']
+ ).update(schema_info['schema'])
+
+ db.session.commit()
+
def delete(self):
Query.query.filter(Query.data_source == self).update(dict(data_source_id=None, latest_query_data_id=None))
QueryResult.query.filter(QueryResult.data_source == self).delete()
res = db.session.delete(self)
db.session.commit()
-
- redis_connection.delete(self._schema_key)
-
return res
- def get_schema(self, refresh=False):
- cache = None
- if not refresh:
- cache = redis_connection.get(self._schema_key)
-
- if cache is None:
- query_runner = self.query_runner
- schema = sorted(query_runner.get_schema(get_stats=refresh), key=lambda t: t['name'])
-
- redis_connection.set(self._schema_key, json_dumps(schema))
- else:
- schema = json_loads(cache)
-
- return schema
+ def get_schema(self):
+ schema = []
+ columns_by_table_id = {}
+
+ tables = TableMetadata.query.filter(
+ TableMetadata.data_source_id == self.id,
+ TableMetadata.exists.is_(True),
+ ).all()
+ table_ids = [table.id for table in tables]
+
+ columns = ColumnMetadata.query.filter(
+ ColumnMetadata.exists.is_(True),
+ ColumnMetadata.table_id.in_(table_ids),
+ ).all()
+
+ for column in columns:
+ columns_by_table_id.setdefault(column.table_id, []).append({
+ 'key': column.id,
+ 'name': column.name,
+ 'type': column.type,
+ 'exists': column.exists,
+ 'example': column.example,
+ 'description': column.description,
+ })
+
+ for table in tables:
+ table_info = {
+ 'id': table.id,
+ 'name': table.name,
+ 'exists': table.exists,
+ 'visible': table.visible,
+ 'hasColumnMetadata': table.column_metadata,
+ 'description': table.description,
+ 'columns': []}
+
+ table_info['columns'] = sorted(
+ columns_by_table_id.get(table.id, []), key=itemgetter('name'))
+ schema.append(table_info)
+
+ return sorted(schema, key=itemgetter('name'))
@property
def _schema_key(self):
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
index 95b70ad0df..29695cb74b 100644
--- a/redash/query_runner/__init__.py
+++ b/redash/query_runner/__init__.py
@@ -58,6 +58,7 @@ class BaseQueryRunner(object):
deprecated = False
should_annotate_query = True
noop_query = None
+ sample_query = None
def __init__(self, configuration):
self.syntax = 'sql'
@@ -126,6 +127,25 @@ def _run_query_internal(self, query):
raise Exception("Failed running query [%s]." % query)
return json_loads(results)['rows']
+ def get_table_sample(self, table_name):
+ if self.sample_query is None:
+ raise NotImplementedError()
+
+ query = self.sample_query.format(table=table_name)
+
+ results, error = self.run_query(query, None)
+ if error is not None:
+ logger.exception(error)
+ raise NotSupported()
+
+ rows = json_loads(results).get('rows', [])
+ if len(rows) > 0:
+ sample = rows[0]
+ else:
+ sample = {}
+
+ return sample
+
@classmethod
def to_dict(cls):
return {
diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py
index c21ac822fd..b75e38ea12 100644
--- a/redash/query_runner/athena.py
+++ b/redash/query_runner/athena.py
@@ -45,6 +45,10 @@ def format(self, operation, parameters=None):
class Athena(BaseQueryRunner):
noop_query = 'SELECT 1'
+ # This takes a 1% random sample from {table}, reducing
+ # the runtime and data scanned for the query
+ sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"
+
@classmethod
def name(cls):
return "Amazon Athena"
@@ -90,6 +94,10 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
},
+ 'samples': {
+ 'type': 'boolean',
+ 'title': 'Show Data Samples'
+ },
},
'required': ['region', 's3_staging_dir'],
'order': ['region', 's3_staging_dir', 'schema', 'work_group'],
@@ -183,9 +191,18 @@ def __get_schema_from_glue(self):
table_name = '%s.%s' % (database['Name'], table['Name'])
if table_name not in schema:
column = [columns['Name'] for columns in table['StorageDescriptor']['Columns']]
- schema[table_name] = {'name': table_name, 'columns': column}
+ metadata = [{
+ "name": column_data['Name'],
+ "type": column_data['Type']
+ } for column_data in table['StorageDescriptor']['Columns']]
+ schema[table_name] = {'name': table_name, 'columns': column, 'metadata': metadata}
for partition in table.get('PartitionKeys', []):
schema[table_name]['columns'].append(partition['Name'])
+ schema[table_name]['metadata'].append({
+ "name": partition['Name'],
+ "type": partition['Type']
+ })
+
return schema.values()
def get_schema(self, get_stats=False):
@@ -194,7 +211,7 @@ def get_schema(self, get_stats=False):
schema = {}
query = """
- SELECT table_schema, table_name, column_name
+ SELECT table_schema, table_name, column_name, data_type AS column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('information_schema')
"""
@@ -204,11 +221,17 @@ def get_schema(self, get_stats=False):
raise Exception("Failed getting schema.")
results = json_loads(results)
- for row in results['rows']:
+
+ for i, row in enumerate(results['rows']):
table_name = '{0}.{1}'.format(row['table_schema'], row['table_name'])
if table_name not in schema:
- schema[table_name] = {'name': table_name, 'columns': []}
+ schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
+
schema[table_name]['columns'].append(row['column_name'])
+ schema[table_name]['metadata'].append({
+ "name": row['column_name'],
+ "type": row['column_type'],
+ })
return schema.values()
diff --git a/redash/query_runner/big_query.py b/redash/query_runner/big_query.py
index 51aff7e665..9578dac50f 100644
--- a/redash/query_runner/big_query.py
+++ b/redash/query_runner/big_query.py
@@ -2,6 +2,7 @@
import logging
import sys
import time
+import operator
from base64 import b64decode
import httplib2
@@ -133,7 +134,11 @@ def configuration_schema(cls):
"title": "Toggle Table String",
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
- }
+ },
+ 'samples': {
+ 'type': 'boolean',
+ 'title': 'Show Data Samples'
+ },
},
'required': ['jsonKeyFile', 'projectId'],
"order": ['projectId', 'jsonKeyFile', 'loadSchema', 'useStandardSql', 'location', 'totalMBytesProcessedLimit', 'maximumBillingTier', 'userDefinedFunctionResourceUri'],
@@ -248,22 +253,118 @@ def _get_query_result(self, jobs, query):
def _get_columns_schema(self, table_data):
columns = []
+ metadata = []
for column in table_data.get('schema', {}).get('fields', []):
- columns.extend(self._get_columns_schema_column(column))
+ metadatum = self._get_column_metadata(column)
+ metadata.extend(metadatum)
+ columns.extend(map(operator.itemgetter('name'), metadatum))
project_id = self._get_project_id()
table_name = table_data['id'].replace("%s:" % project_id, "")
- return {'name': table_name, 'columns': columns}
+ return {'name': table_name, 'columns': columns, 'metadata': metadata}
+
+ def _get_column_metadata(self, column):
+ metadata = []
- def _get_columns_schema_column(self, column):
- columns = []
if column['type'] == 'RECORD':
for field in column['fields']:
- columns.append(u"{}.{}".format(column['name'], field['name']))
+ field_name = u"{}.{}".format(column['name'], field['name'])
+ metadata.append({'name': field_name, 'type': field['type']})
else:
- columns.append(column['name'])
+ metadata.append({'name': column['name'], 'type': column['type']})
+
+ return metadata
+
+ def _columns_and_samples_to_dict(self, schema, samples):
+ samples_dict = {}
+ if not samples:
+ return samples_dict
+
+ # If a sample exists, its shape/length should be analogous to
+ # the schema provided (i.e their lengths should match up)
+ for i, column in enumerate(schema):
+ if column['type'] == 'RECORD':
+ if column.get('mode', None) == 'REPEATED':
+ # Repeated fields have multiple samples of the same format.
+ # We only need to show the first one as an example.
+ associated_sample = [] if len(samples[i]) == 0 else samples[i][0]
+ else:
+ associated_sample = samples[i] or []
+
+ for j, field in enumerate(column['fields']):
+ field_name = u"{}.{}".format(column['name'], field['name'])
+ samples_dict[field_name] = None
+ if len(associated_sample) > 0:
+ samples_dict[field_name] = associated_sample[j]
+ else:
+ samples_dict[column['name']] = samples[i]
+
+ return samples_dict
+
+
+ def _flatten_samples(self, samples):
+ samples_list = []
+ for field in samples:
+ value = field['v']
+ if isinstance(value, dict):
+ samples_list.append(
+ self._flatten_samples(value.get('f', []))
+ )
+ elif isinstance(value, list):
+ samples_list.append(
+ self._flatten_samples(value)
+ )
+ else:
+ samples_list.append(value)
+
+ return samples_list
- return columns
+ def get_table_sample(self, table_name):
+ if not self.configuration.get('loadSchema', False):
+ return {}
+
+ service = self._get_bigquery_service()
+ project_id = self._get_project_id()
+
+ dataset_id, table_id = table_name.split('.', 1)
+
+ try:
+ # NOTE: the `sample_response` is limited by `maxResults` here.
+ # Without this limit, the response would be very large and require
+ # pagination using `nextPageToken`.
+ sample_response = service.tabledata().list(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ fields="rows",
+ maxResults=1
+ ).execute()
+ schema_response = service.tables().get(
+ projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ fields="schema,id",
+ ).execute()
+ table_rows = sample_response.get('rows', [])
+
+ if len(table_rows) == 0:
+ samples = []
+ else:
+ samples = table_rows[0].get('f', [])
+
+ schema = schema_response.get('schema', {}).get('fields', [])
+ columns = self._get_columns_schema(schema_response).get('columns', [])
+
+ flattened_samples = self._flatten_samples(samples)
+ samples_dict = self._columns_and_samples_to_dict(schema, flattened_samples)
+ return samples_dict
+ except HttpError as http_error:
+ logger.exception(
+ "Error communicating with server for sample for table %s: %s",
+ table_name,
+ http_error
+ )
+ return {}
def get_schema(self, get_stats=False):
if not self.configuration.get('loadSchema', False):
diff --git a/redash/query_runner/mysql.py b/redash/query_runner/mysql.py
index 9028a88755..ab1b038a5f 100644
--- a/redash/query_runner/mysql.py
+++ b/redash/query_runner/mysql.py
@@ -40,6 +40,7 @@ def __init__(self):
class Mysql(BaseSQLQueryRunner):
noop_query = "SELECT 1"
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -73,7 +74,11 @@ def configuration_schema(cls):
"title": "Toggle Table String",
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
- }
+ },
+ 'samples': {
+ 'type': 'boolean',
+ 'title': 'Show Data Samples'
+ },
},
"order": ['host', 'port', 'user', 'passwd', 'db'],
'required': ['db'],
@@ -135,7 +140,8 @@ def _get_tables(self, schema):
query = """
SELECT col.table_schema as table_schema,
col.table_name as table_name,
- col.column_name as column_name
+ col.column_name as column_name,
+ col.column_type as column_type
FROM `information_schema`.`columns` col
WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"""
@@ -147,7 +153,7 @@ def _get_tables(self, schema):
results = json_loads(results)
- for row in results['rows']:
+ for i, row in enumerate(results['rows']):
if row['table_schema'] != self.configuration['db']:
table_name = u'{}.{}'.format(row['table_schema'],
row['table_name'])
@@ -155,9 +161,13 @@ def _get_tables(self, schema):
table_name = row['table_name']
if table_name not in schema:
- schema[table_name] = {'name': table_name, 'columns': []}
+ schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
schema[table_name]['columns'].append(row['column_name'])
+ schema[table_name]['metadata'].append({
+ "name": row['column_name'],
+ "type": row['column_type'],
+ })
return schema.values()
diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py
index 21a9620725..3e7d875cab 100644
--- a/redash/query_runner/pg.py
+++ b/redash/query_runner/pg.py
@@ -65,6 +65,7 @@ def _wait(conn, timeout=None):
class PostgreSQL(BaseSQLQueryRunner):
noop_query = "SELECT 1"
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -99,7 +100,11 @@ def configuration_schema(cls):
"title": "Toggle Table String",
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
- }
+ },
+ "samples": {
+ "type": "boolean",
+ "title": "Show Data Samples"
+ },
},
"order": ['host', 'port', 'user', 'password'],
"required": ["dbname"],
@@ -126,9 +131,13 @@ def _get_definitions(self, schema, query):
table_name = row['table_name']
if table_name not in schema:
- schema[table_name] = {'name': table_name, 'columns': []}
+ schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
schema[table_name]['columns'].append(row['column_name'])
+ schema[table_name]['metadata'].append({
+ "name": row['column_name'],
+ "type": row['column_type'],
+ })
def _get_tables(self, schema):
'''
@@ -148,7 +157,8 @@ def _get_tables(self, schema):
query = """
SELECT s.nspname as table_schema,
c.relname as table_name,
- a.attname as column_name
+ a.attname as column_name,
+ a.atttypid::regtype::varchar as column_type
FROM pg_class c
JOIN pg_namespace s
ON c.relnamespace = s.oid
@@ -157,13 +167,16 @@ def _get_tables(self, schema):
ON a.attrelid = c.oid
AND a.attnum > 0
AND NOT a.attisdropped
+ JOIN pg_type t
+ ON a.atttypid = t.oid
WHERE c.relkind IN ('m', 'f', 'p')
UNION
SELECT table_schema,
table_name,
- column_name
+ column_name,
+ data_type as column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
@@ -227,6 +240,8 @@ def run_query(self, query, user):
class Redshift(PostgreSQL):
+
+ sample_query = "SELECT * FROM {table} LIMIT 1"
@classmethod
def type(cls):
return "redshift"
@@ -283,6 +298,16 @@ def configuration_schema(cls):
"title": "Query Group for Scheduled Queries",
"default": "default"
},
+ "toggle_table_string": {
+ "type": "string",
+ "title": "Toggle Table String",
+ "default": "_v",
+ "info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
+ },
+ "samples": {
+ "type": "boolean",
+ "title": "Show Data Samples"
+ },
},
"order": ['host', 'port', 'user', 'password', 'dbname', 'sslmode', 'adhoc_query_group', 'scheduled_query_group'],
"required": ["dbname", "user", "password", "host", "port"],
@@ -316,11 +341,12 @@ def _get_tables(self, schema):
SELECT DISTINCT table_name,
table_schema,
column_name,
+ data_type AS column_type,
ordinal_position AS pos
FROM svv_columns
WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema')
)
- SELECT table_name, table_schema, column_name
+ SELECT table_name, table_schema, column_name, column_type
FROM tables
WHERE
HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND
diff --git a/redash/query_runner/presto.py b/redash/query_runner/presto.py
index c144bd9927..2cf22a860a 100644
--- a/redash/query_runner/presto.py
+++ b/redash/query_runner/presto.py
@@ -31,6 +31,9 @@
class Presto(BaseQueryRunner):
noop_query = 'SHOW TABLES'
+ # This takes a 1% random sample from {table}, reducing
+ # the runtime and data scanned for the query
+ sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"
@classmethod
def configuration_schema(cls):
@@ -65,6 +68,10 @@ def configuration_schema(cls):
"default": "_v",
"info": "This string will be used to toggle visibility of tables in the schema browser when editing a query in order to remove non-useful tables from sight."
},
+ 'samples': {
+ 'type': 'boolean',
+ 'title': 'Show Data Samples'
+ },
},
'order': ['host', 'protocol', 'port', 'username', 'password', 'schema', 'catalog'],
'required': ['host']
@@ -81,13 +88,12 @@ def type(cls):
def get_schema(self, get_stats=False):
schema = {}
query = """
- SELECT table_schema, table_name, column_name
+ SELECT table_schema, table_name, column_name, data_type AS column_type
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
results, error = self.run_query(query, None)
-
if error is not None:
raise Exception("Failed getting schema.")
@@ -95,12 +101,14 @@ def get_schema(self, get_stats=False):
for row in results['rows']:
table_name = '{}.{}'.format(row['table_schema'], row['table_name'])
-
if table_name not in schema:
- schema[table_name] = {'name': table_name, 'columns': []}
+ schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
schema[table_name]['columns'].append(row['column_name'])
-
+ schema[table_name]['metadata'].append({
+ "name": row['column_name'],
+ "type": row['column_type'],
+ })
return schema.values()
def run_query(self, query, user):
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index 1a7154dfe6..516d5c73f9 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -51,8 +51,9 @@
QUERY_RESULTS_CLEANUP_COUNT = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_COUNT", "100"))
QUERY_RESULTS_CLEANUP_MAX_AGE = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_MAX_AGE", "7"))
-SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 30))
+SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 360))
SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "celery")
+SAMPLES_REFRESH_QUEUE = os.environ.get("REDASH_SAMPLES_REFRESH_QUEUE", "celery")
AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key")
INVITATION_TOKEN_MAX_AGE = int(os.environ.get("REDASH_INVITATION_TOKEN_MAX_AGE", 60 * 60 * 24 * 7))
@@ -353,6 +354,15 @@ def email_server_is_configured():
# Enhance schema fetching
SCHEMA_RUN_TABLE_SIZE_CALCULATIONS = parse_boolean(os.environ.get("REDASH_SCHEMA_RUN_TABLE_SIZE_CALCULATIONS", "false"))
+# Frequency of clearing out old schema metadata.
+SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60))
+
+# Frequency of schema samples updates
+SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS", 14))
+
+# Frequency of schema samples refresh when no samples are stored
+SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 2))
+
# kylin
KYLIN_OFFSET = int(os.environ.get('REDASH_KYLIN_OFFSET', 0))
KYLIN_LIMIT = int(os.environ.get('REDASH_KYLIN_LIMIT', 50000))
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index e1bccd8ee2..d710f899f5 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -1,4 +1,4 @@
from .general import record_event, version_check, send_mail, sync_user_details
-from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query, empty_schedules
+from .queries import QueryTask, refresh_queries, refresh_schemas, refresh_schema, cleanup_query_results, execute_query, update_sample, cleanup_schema_metadata, refresh_samples, empty_schedules
from .alerts import check_alerts_for_query
from .failure_report import notify_of_failure
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index d99700f0d4..1cc9d315f9 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -1,15 +1,22 @@
import logging
import signal
import time
+import datetime
+
import redis
+from celery import group
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
+from dateutil import parser
from six import text_type
+from sqlalchemy.orm import load_only
+from sqlalchemy import or_
-from redash import models, redis_connection, settings, statsd_client
+from redash import models, redis_connection, settings, statsd_client, utils
+from redash.models import TableMetadata, ColumnMetadata, db
from redash.models.parameterized_query import InvalidParameterError, QueryDetachedFromDataSourceError
-from redash.query_runner import InterruptException
+from redash.query_runner import InterruptException, NotSupported
from redash.tasks.alerts import check_alerts_for_query
from redash.tasks.failure_report import notify_of_failure
from redash.utils import gen_query_hash, json_dumps, utcnow, mustache_render
@@ -253,13 +260,284 @@ def cleanup_query_results():
logger.info("Deleted %d unused query results.", deleted_count)
-@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60)
+def truncate_long_string(original_str, max_length):
+ # Remove null characters so we can save as string to postgres
+ new_str = original_str.replace('\x00', '')
+
+ if new_str and len(new_str) > max_length:
+ new_str = u'{}...'.format(new_str[:max_length])
+ return new_str
+
+
+@celery.task(name="redash.tasks.update_sample")
+def update_sample(data_source_id, table_name, table_id, sample_updated_at):
+ """
+ For a given table, look up a sample row for it and update
+ the "example" fields for it in the column_metadata table.
+ """
+ logger.info(u"task=update_sample state=start table_name=%s", table_name)
+ start_time = time.time()
+ ds = models.DataSource.get_by_id(data_source_id)
+
+ persisted_columns = ColumnMetadata.query.filter(
+ ColumnMetadata.exists.is_(True),
+ ColumnMetadata.table_id == table_id,
+ ).options(load_only('id', 'name', 'example'))
+
+ DAYS_AGO = (
+ utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_UPDATE_FREQUENCY_DAYS))
+
+ first_column = persisted_columns.first()
+
+ if (first_column and
+ sample_updated_at and
+ first_column.example and
+ parser.parse(sample_updated_at) > DAYS_AGO):
+ # Look at the first example in the persisted columns.
+ # If this is *not* empty AND sample_updated_at is recent, don't update sample
+ logger.info(u"task=update_sample state=abort - recent sample exists table_name=%s",
+ table_name)
+ return
+
+ sample = None
+ try:
+ sample = ds.query_runner.get_table_sample(table_name)
+ except NotSupported:
+ logger.info(u"Unable to fetch samples for {}".format(table_name))
+
+ if not sample:
+ return
+
+ # If a column exists, add a sample to it.
+ column_examples = []
+ for persisted_column in persisted_columns.all():
+ column_example = sample.get(persisted_column.name, None)
+ column_example = column_example if isinstance(
+ column_example, unicode) else str(column_example) # noqa: F821
+ column_example = truncate_long_string(column_example, 4000)
+
+ column_examples.append({
+ "id": persisted_column.id,
+ "example": column_example
+ })
+
+ models.db.session.bulk_update_mappings(
+ ColumnMetadata,
+ column_examples
+ )
+ models.db.session.commit()
+ logger.info(u"task=update_sample state=finished table_name=%s runtime=%.2f",
+ table_name, time.time() - start_time)
+
+
+@celery.task(name="redash.tasks.refresh_samples")
+def refresh_samples(data_source_id, table_sample_limit):
+ """
+ For a given data source, refresh the data samples stored for each
+ table. This is done for tables with no samples or samples older
+ than DAYS_AGO
+ """
+ logger.info(u"task=refresh_samples state=start ds_id=%s", data_source_id)
+ ds = models.DataSource.get_by_id(data_source_id)
+
+ if not ds.query_runner.configuration.get('samples', False):
+ return
+
+ DAYS_AGO = (
+ utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS))
+
+ # Find all existing tables that have an empty or old sample_updated_at
+ tables_to_sample = TableMetadata.query.filter(
+ TableMetadata.exists.is_(True),
+ TableMetadata.data_source_id == data_source_id,
+ or_(
+ TableMetadata.sample_updated_at.is_(None),
+ TableMetadata.sample_updated_at < DAYS_AGO
+ )
+ ).limit(table_sample_limit).all()
+
+ tasks = []
+ for table in tables_to_sample:
+ tasks.append(
+ update_sample.signature(
+ args=(ds.id, table.name, table.id, table.sample_updated_at),
+ queue=settings.SCHEMAS_REFRESH_QUEUE
+ )
+ )
+ table.sample_updated_at = db.func.now()
+ models.db.session.add(table)
+
+ group(tasks).apply_async()
+ models.db.session.commit()
+
+
+def cleanup_data_in_table(table_model):
+ TTL_DAYS_AGO = (
+ utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_METADATA_TTL_DAYS))
+
+ table_model.query.filter(
+ table_model.exists.is_(False),
+ table_model.updated_at < TTL_DAYS_AGO
+ ).delete()
+
+ db.session.commit()
+
+
+@celery.task(name="redash.tasks.cleanup_schema_metadata")
+def cleanup_schema_metadata():
+ cleanup_data_in_table(TableMetadata)
+ cleanup_data_in_table(ColumnMetadata)
+
+
+def insert_or_update_table_metadata(data_source, existing_tables_set, table_data):
+ # Update all persisted tables that exist to reflect this.
+ persisted_tables = TableMetadata.query.filter(
+ TableMetadata.name.in_(existing_tables_set),
+ TableMetadata.data_source_id == data_source.id,
+ )
+ persisted_table_data = []
+ for persisted_table in persisted_tables:
+ # Add IDs to persisted table data so it can be used for updates.
+ table_data[persisted_table.name]['id'] = persisted_table.id
+ persisted_table_data.append(table_data[persisted_table.name])
+
+ models.db.session.bulk_update_mappings(
+ TableMetadata,
+ persisted_table_data
+ )
+
+ # Find the tables that need to be created by subtracting the sets:
+ persisted_table_set = set([col_data['name'] for col_data in persisted_table_data])
+ tables_to_create = existing_tables_set.difference(persisted_table_set)
+
+ table_metadata = [table_data[table_name] for table_name in tables_to_create]
+
+ models.db.session.bulk_insert_mappings(
+ TableMetadata,
+ table_metadata
+ )
+
+
+def insert_or_update_column_metadata(table, existing_columns_set, column_data):
+ persisted_columns = ColumnMetadata.query.filter(
+ ColumnMetadata.name.in_(existing_columns_set),
+ ColumnMetadata.table_id == table.id,
+ ).all()
+
+ persisted_column_data = []
+ for persisted_column in persisted_columns:
+ # Add IDs to persisted column data so it can be used for updates.
+ column_data[persisted_column.name]['id'] = persisted_column.id
+ persisted_column_data.append(column_data[persisted_column.name])
+
+ models.db.session.bulk_update_mappings(
+ ColumnMetadata,
+ persisted_column_data
+ )
+
+ # Find the columns that need to be created by subtracting the sets:
+ persisted_column_set = set([col_data['name'] for col_data in persisted_column_data])
+ columns_to_create = existing_columns_set.difference(persisted_column_set)
+
+ column_metadata = [column_data[col_name] for col_name in columns_to_create]
+
+ models.db.session.bulk_insert_mappings(
+ ColumnMetadata,
+ column_metadata
+ )
+
+
+@celery.task(name="redash.tasks.refresh_schema", time_limit=600, soft_time_limit=300)
def refresh_schema(data_source_id):
ds = models.DataSource.get_by_id(data_source_id)
logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id)
start_time = time.time()
+
+ MAX_TYPE_STRING_LENGTH = 250
try:
- ds.get_schema(refresh=True)
+ schema = ds.query_runner.get_schema(get_stats=True)
+
+ # Stores data from the updated schema that tells us which
+ # columns and which tables currently exist
+ existing_tables_set = set()
+ existing_columns_set = set()
+
+ # Stores data that will be inserted into postgres
+ table_data = {}
+ column_data = {}
+
+ new_column_names = {}
+ new_column_metadata = {}
+ for table in schema:
+ table_name = table['name']
+ existing_tables_set.add(table_name)
+
+ table_data[table_name] = {
+ 'org_id': ds.org_id,
+ 'name': table_name,
+ 'data_source_id': ds.id,
+ 'column_metadata': "metadata" in table,
+ 'exists': True
+ }
+ new_column_names[table_name] = table['columns']
+ new_column_metadata[table_name] = table.get('metadata', None)
+
+ insert_or_update_table_metadata(ds, existing_tables_set, table_data)
+ models.db.session.commit()
+
+ all_existing_persisted_tables = TableMetadata.query.filter(
+ TableMetadata.exists.is_(True),
+ TableMetadata.data_source_id == ds.id,
+ ).all()
+
+ for table in all_existing_persisted_tables:
+ for i, column in enumerate(new_column_names.get(table.name, [])):
+ existing_columns_set.add(column)
+ column_data[column] = {
+ 'org_id': ds.org_id,
+ 'table_id': table.id,
+ 'name': column,
+ 'type': None,
+ 'exists': True
+ }
+
+ if table.column_metadata:
+ column_type = new_column_metadata[table.name][i]['type']
+ column_type = truncate_long_string(column_type, MAX_TYPE_STRING_LENGTH)
+ column_data[column]['type'] = column_type
+
+ insert_or_update_column_metadata(table, existing_columns_set, column_data)
+ models.db.session.commit()
+
+ existing_columns_list = list(existing_columns_set)
+
+ # If a column did not exist, set the 'column_exists' flag to false.
+ ColumnMetadata.query.filter(
+ ColumnMetadata.exists.is_(True),
+ ColumnMetadata.table_id == table.id,
+ ~ColumnMetadata.name.in_(existing_columns_list),
+ ).update({
+ "exists": False,
+ "updated_at": db.func.now()
+ }, synchronize_session='fetch')
+
+ # Clear the set for the next round
+ existing_columns_set.clear()
+
+ # If a table did not exist in the get_schema() response above,
+ # set the 'exists' flag to false.
+ existing_tables_list = list(existing_tables_set)
+ TableMetadata.query.filter(
+ TableMetadata.exists.is_(True),
+ TableMetadata.data_source_id == ds.id,
+ ~TableMetadata.name.in_(existing_tables_list)
+ ).update({
+ "exists": False,
+ "updated_at": db.func.now()
+ }, synchronize_session='fetch')
+
+ models.db.session.commit()
+
logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
statsd_client.incr('refresh_schema.success')
except SoftTimeLimitExceeded:
@@ -276,6 +554,7 @@ def refresh_schemas():
"""
Refreshes the data sources schemas.
"""
+ TABLE_SAMPLE_LIMIT = 50
blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id]
global_start_time = time.time()
@@ -290,6 +569,7 @@ def refresh_schemas():
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id)
else:
refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
+ refresh_samples.apply_async(args=(ds.id, TABLE_SAMPLE_LIMIT), queue=settings.SAMPLES_REFRESH_QUEUE)
logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time)
diff --git a/redash/worker.py b/redash/worker.py
index e960c34fd7..a179ea12bf 100644
--- a/redash/worker.py
+++ b/redash/worker.py
@@ -42,7 +42,11 @@
'send_aggregated_errors': {
'task': 'redash.tasks.send_aggregated_errors',
'schedule': timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
- }
+ },
+ 'cleanup_schema_metadata': {
+ 'task': 'redash.tasks.cleanup_schema_metadata',
+ 'schedule': timedelta(days=3),
+ },
}
if settings.VERSION_CHECK:
diff --git a/tests/factories.py b/tests/factories.py
index 190418d1f0..54fa18a3a4 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -79,6 +79,15 @@ def __call__(self):
data_source=data_source_factory.create,
org_id=1)
+table_metadata_factory = ModelFactory(redash.models.TableMetadata,
+ data_source_id=1,
+ exists=True,
+ name='table')
+
+column_metadata_factory = ModelFactory(redash.models.ColumnMetadata,
+ table_id=1,
+ name='column')
+
query_with_params_factory = ModelFactory(redash.models.Query,
name='New Query with Params',
description='',
@@ -178,6 +187,12 @@ def create_org(self, **kwargs):
return org
+ def create_table_metadata(self, **kwargs):
+ return table_metadata_factory.create(**kwargs)
+
+ def create_column_metadata(self, **kwargs):
+ return column_metadata_factory.create(**kwargs)
+
def create_user(self, **kwargs):
args = {
'org': self.org,
diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py
index 37d4af663b..c7406bf3e9 100644
--- a/tests/models/test_data_sources.py
+++ b/tests/models/test_data_sources.py
@@ -1,5 +1,3 @@
-import mock
-from mock import patch
from tests import BaseTestCase
from redash.models import DataSource, Query, QueryResult
@@ -8,38 +6,39 @@
class DataSourceTest(BaseTestCase):
def test_get_schema(self):
- return_value = [{'name': 'table', 'columns': []}]
-
- with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- schema = self.factory.data_source.get_schema()
-
- self.assertEqual(return_value, schema)
-
- def test_get_schema_uses_cache(self):
- return_value = [{'name': 'table', 'columns': []}]
- with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- self.factory.data_source.get_schema()
- schema = self.factory.data_source.get_schema()
-
- self.assertEqual(return_value, schema)
- self.assertEqual(patched_get_schema.call_count, 1)
-
- def test_get_schema_skips_cache_with_refresh_true(self):
- return_value = [{'name': 'table', 'columns': []}]
- with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
- patched_get_schema.return_value = return_value
-
- self.factory.data_source.get_schema()
- new_return_value = [{'name': 'new_table', 'columns': []}]
- patched_get_schema.return_value = new_return_value
- schema = self.factory.data_source.get_schema(refresh=True)
+ data_source = self.factory.create_data_source()
- self.assertEqual(new_return_value, schema)
- self.assertEqual(patched_get_schema.call_count, 2)
+ # Create an existing table with a non-existing column
+ table_metadata = self.factory.create_table_metadata(
+ data_source_id=data_source.id,
+ org_id=data_source.org_id
+ )
+
+ # Create a non-existing table with an existing column
+ table_metadata2 = self.factory.create_table_metadata(
+ data_source_id=data_source.id,
+ org_id=data_source.org_id,
+ name='table_doesnt_exist',
+ exists=False
+ )
+ column_metadata = self.factory.create_column_metadata(
+ table_id=table_metadata2.id,
+ org_id=data_source.org_id,
+ type='boolean',
+ example=True,
+ )
+
+ return_value = [{
+ 'id': table_metadata.id,
+ 'name': 'table',
+ 'hasColumnMetadata': False,
+ 'exists': True,
+ 'columns': [],
+ 'visible': True,
+ 'description': None,
+ }]
+ schema = data_source.get_schema()
+ self.assertEqual(return_value, schema)
class TestDataSourceCreate(BaseTestCase):
@@ -97,10 +96,3 @@ def test_deletes_child_models(self):
data_source.delete()
self.assertIsNone(DataSource.query.get(data_source.id))
self.assertEqual(0, QueryResult.query.filter(QueryResult.data_source == data_source).count())
-
- @patch('redash.redis_connection.delete')
- def test_deletes_schema(self, mock_redis):
- data_source = self.factory.create_data_source()
- data_source.delete()
-
- mock_redis.assert_called_with(data_source._schema_key)
diff --git a/tests/query_runner/test_athena.py b/tests/query_runner/test_athena.py
index fe444de64f..7c7a139a13 100644
--- a/tests/query_runner/test_athena.py
+++ b/tests/query_runner/test_athena.py
@@ -72,7 +72,11 @@ def test_external_table(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
- assert query_runner.get_schema() == [{'columns': ['row_id'], 'name': 'test1.jdbc_table'}]
+ assert query_runner.get_schema() == [{
+ 'columns': ['row_id'],
+ 'name': 'test1.jdbc_table',
+ 'metadata': [{'type': 'int', 'name': 'row_id'}]
+ }]
def test_partitioned_table(self):
"""
@@ -118,7 +122,11 @@ def test_partitioned_table(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
- assert query_runner.get_schema() == [{'columns': ['sk', 'category'], 'name': 'test1.partitioned_table'}]
+ assert query_runner.get_schema() == [{
+ 'columns': ['sk', 'category'],
+ 'name': 'test1.partitioned_table',
+ 'metadata': [{'type': 'int', 'name': 'sk'}, {'type': 'int', 'name': 'category'}]
+ }]
def test_view(self):
query_runner = Athena({'glue': True, 'region': 'mars-east-1'})
@@ -150,7 +158,11 @@ def test_view(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
- assert query_runner.get_schema() == [{'columns': ['sk'], 'name': 'test1.view'}]
+ assert query_runner.get_schema() == [{
+ 'columns': ['sk'],
+ 'name': 'test1.view',
+ 'metadata': [{'type': 'int', 'name': 'sk'}]
+ }]
def test_dodgy_table_does_not_break_schema_listing(self):
"""
@@ -187,4 +199,8 @@ def test_dodgy_table_does_not_break_schema_listing(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
- assert query_runner.get_schema() == [{'columns': ['region'], 'name': 'test1.csv'}]
+ assert query_runner.get_schema() == [{
+ 'columns': ['region'],
+ 'name': 'test1.csv',
+ 'metadata': [{'type': 'string', 'name': 'region'}]
+ }]
diff --git a/tests/query_runner/test_bigquery.py b/tests/query_runner/test_bigquery.py
new file mode 100644
index 0000000000..cbfe9322f7
--- /dev/null
+++ b/tests/query_runner/test_bigquery.py
@@ -0,0 +1,76 @@
+from mock import patch
+from tests import BaseTestCase
+
+from redash.query_runner.big_query import BigQuery
+
+
+class TestBigQuery(BaseTestCase):
+
+ def test_get_table_sample_returns_expected_result(self):
+ SAMPLES_RESPONSE = {
+ 'rows': [
+ {'f': [
+ {
+ 'v': '2017-10-28'
+ }, {
+ 'v': '2019-03-28T18:57:04.485091'
+ }, {
+ 'v': '3341'
+ }, {
+ 'v': '2451'
+ }, {
+ 'v': 'Iran'
+ }
+ ]}
+ ]
+ }
+
+ SCHEMA_RESPONSE = {
+ 'id': 'project:dataset.table',
+ 'schema': {
+ 'fields': [{
+ 'type': 'DATE',
+ 'name': 'submission_date',
+ 'mode': 'NULLABLE'
+ }, {
+ 'type': 'DATETIME',
+ 'name': 'generated_time',
+ 'mode': 'NULLABLE'
+ }, {
+ 'type': 'INTEGER',
+ 'name': 'mau',
+ 'mode': 'NULLABLE'
+ }, {
+ 'type': 'INTEGER',
+ 'name': 'wau',
+ 'mode': 'NULLABLE'
+ }, {
+ 'type': 'STRING',
+ 'name': 'country',
+ 'mode': 'NULLABLE'
+ }]
+ }
+ }
+
+ EXPECTED_SAMPLES_DICT = {
+ 'submission_date': '2017-10-28',
+ 'country': 'Iran',
+ 'wau': '2451',
+ 'mau': '3341',
+ 'generated_time': '2019-03-28T18:57:04.485091'
+ }
+
+ with patch.object(BigQuery, '_get_bigquery_service') as get_bq_service:
+ tabledata_list = get_bq_service.return_value.tabledata.return_value.list
+ tabledata_list.return_value.execute.return_value = SAMPLES_RESPONSE
+
+ tables_get = get_bq_service.return_value.tables.return_value.get
+ tables_get.return_value.execute.return_value = SCHEMA_RESPONSE
+
+ query_runner = BigQuery({
+ 'loadSchema': True,
+ 'projectId': 'test_project'
+ })
+ table_sample = query_runner.get_table_sample("dataset.table")
+
+ self.assertEqual(table_sample, EXPECTED_SAMPLES_DICT)
diff --git a/tests/query_runner/test_get_schema_format.py b/tests/query_runner/test_get_schema_format.py
new file mode 100644
index 0000000000..7ebbf6eb56
--- /dev/null
+++ b/tests/query_runner/test_get_schema_format.py
@@ -0,0 +1,77 @@
+import json
+import mock
+
+from unittest import TestCase
+
+from redash.query_runner.presto import Presto
+from redash.query_runner.athena import Athena
+from redash.query_runner.mysql import Mysql
+from redash.query_runner.pg import PostgreSQL, Redshift
+
+class TestBaseQueryRunner(TestCase):
+ def setUp(self):
+ self.query_runners = [{
+ 'instance': Presto({}),
+ 'mock_location': 'presto.Presto'
+ }, {
+ 'instance': Athena({}),
+ 'mock_location': 'athena.Athena'
+ }, {
+ 'instance': Mysql({'db': None}),
+ 'mock_location': 'mysql.Mysql'
+ }, {
+ 'instance': PostgreSQL({}),
+ 'mock_location': 'pg.PostgreSQL'
+ }, {
+ 'instance': Redshift({}),
+ 'mock_location': 'pg.Redshift'
+ }]
+
+ def _setup_mock(self, function_to_patch):
+ patcher = mock.patch(function_to_patch)
+ patched_function = patcher.start()
+ self.addCleanup(patcher.stop)
+ return patched_function
+
+ def assert_correct_schema_format(self, query_runner, mock_location):
+ EXPECTED_SCHEMA_RESULT = [{
+ 'columns': ['created_date'],
+ 'metadata': [{
+ 'name': 'created_date',
+ 'type': 'varchar',
+ }],
+ 'name': 'default.table_name'
+ }]
+
+ get_schema_query_response = {
+ "rows": [{
+ "table_schema": "default",
+ "table_name": "table_name",
+ "column_type": "varchar",
+ "column_name": "created_date"
+ }]
+ }
+ get_samples_query_response = {
+ "rows": [{
+ "created_date": "2017-10-26"
+ }]
+ }
+
+ self.run_count = 0
+ def query_runner_resonses(query, user):
+ response = (json.dumps(get_schema_query_response), None)
+ if self.run_count > 0:
+ response = (json.dumps(get_samples_query_response), None)
+ self.run_count += 1
+ return response
+
+ self.patched_run_query = self._setup_mock(
+ 'redash.query_runner.{location}.run_query'.format(location=mock_location))
+ self.patched_run_query.side_effect = query_runner_resonses
+
+ schema = query_runner.get_schema()
+ self.assertEqual(schema, EXPECTED_SCHEMA_RESULT)
+
+ def test_get_schema_format(self):
+ for runner in self.query_runners:
+ self.assert_correct_schema_format(runner['instance'], runner['mock_location'])
diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py
index d542c3991e..bf6f229ee4 100644
--- a/tests/tasks/test_queries.py
+++ b/tests/tasks/test_queries.py
@@ -3,11 +3,14 @@
import uuid
import mock
+import datetime
from tests import BaseTestCase
-from redash import redis_connection, models
+from redash import redis_connection, models, utils
+from redash.models import TableMetadata
from redash.query_runner.pg import PostgreSQL
-from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query
+from redash.tasks.queries import (QueryExecutionError, enqueue_query,
+ execute_query, cleanup_data_in_table)
FakeResult = namedtuple('FakeResult', 'id')
@@ -124,3 +127,24 @@ def test_success_after_failure(self):
scheduled_query_id=q.id)
q = models.Query.get_by_id(q.id)
self.assertEqual(q.schedule_failures, 0)
+
+
+class TestPruneSchemaMetadata(BaseTestCase):
+
+ def test_cleanup_data_in_table(self):
+ data_source = self.factory.create_data_source()
+
+ # Create an existing table with a non-existing column
+ table_metadata = self.factory.create_table_metadata(
+ data_source_id=data_source.id,
+ org_id=data_source.org_id,
+ exists=False,
+ updated_at=(utils.utcnow() - datetime.timedelta(days=70))
+ )
+ all_tables = TableMetadata.query.all()
+ self.assertEqual(len(all_tables), 1)
+
+ cleanup_data_in_table(TableMetadata)
+
+ all_tables = TableMetadata.query.all()
+ self.assertEqual(len(all_tables), 0)
diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py
index df29f5f207..419f55676b 100644
--- a/tests/tasks/test_refresh_schemas.py
+++ b/tests/tasks/test_refresh_schemas.py
@@ -1,10 +1,54 @@
+import copy
+import datetime
+
from mock import patch
from tests import BaseTestCase
-from redash.tasks import refresh_schemas
+from redash import models, utils
+from redash.tasks import (refresh_schemas, refresh_schema,
+ update_sample, refresh_samples)
+from redash.models import TableMetadata, ColumnMetadata
class TestRefreshSchemas(BaseTestCase):
+ def setUp(self):
+ super(TestRefreshSchemas, self).setUp()
+
+ self.COLUMN_NAME = 'first_column'
+ self.COLUMN_TYPE = 'text'
+ self.COLUMN_EXAMPLE = 'some text for column value'
+ self.EXPECTED_COLUMN_METADATA = {
+ 'id': 1,
+ 'org_id': 1,
+ 'table_id': 1,
+ 'name': self.COLUMN_NAME,
+ 'type': self.COLUMN_TYPE,
+ 'example': self.COLUMN_EXAMPLE,
+ 'description': None,
+ 'exists': True,
+ }
+
+ get_schema_patcher = patch('redash.query_runner.pg.PostgreSQL.get_schema')
+ self.patched_get_schema = get_schema_patcher.start()
+ self.addCleanup(get_schema_patcher.stop)
+ self.default_schema_return_value = [{
+ 'name': 'table',
+ 'columns': [self.COLUMN_NAME],
+ 'metadata': [{
+ 'name': self.COLUMN_NAME,
+ 'type': self.COLUMN_TYPE,
+ }]
+ }]
+ self.patched_get_schema.return_value = self.default_schema_return_value
+
+ get_table_sample_patcher = patch('redash.query_runner.BaseQueryRunner.get_table_sample')
+ self.patched_get_table_sample = get_table_sample_patcher.start()
+ self.addCleanup(get_table_sample_patcher.stop)
+ self.patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}
+
+ def tearDown(self):
+ self.factory.data_source.query_runner.configuration['samples'] = False
+
def test_calls_refresh_of_all_data_sources(self):
self.factory.data_source # trigger creation
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
@@ -23,3 +67,303 @@ def test_skips_paused_data_sources(self):
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
refresh_schemas()
refresh_job.assert_called()
+
+
+ def test_refresh_schema_creates_tables(self):
+ EXPECTED_TABLE_METADATA = {
+ 'id': 1,
+ 'org_id': 1,
+ 'exists': True,
+ 'name': 'table',
+ 'visible': True,
+ 'sample_query': None,
+ 'description': None,
+ 'column_metadata': True,
+ 'data_source_id': 1,
+ 'sample_updated_at': None,
+ }
+
+ refresh_schema(self.factory.data_source.id)
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ "2019-05-09T17:07:52.386910Z"
+ )
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertEqual(table_metadata[0].to_dict(), EXPECTED_TABLE_METADATA)
+ self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
+
+ def test_refresh_schema_deleted_table_marked(self):
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertTrue(table_metadata[0].to_dict()['exists'])
+
+ # Table is gone, `exists` should be False.
+ self.patched_get_schema.return_value = []
+
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertFalse(table_metadata[0].to_dict()['exists'])
+
+ # Table is back, `exists` should be True again.
+ self.patched_get_schema.return_value = self.default_schema_return_value
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ self.assertTrue(table_metadata[0].to_dict()['exists'])
+
+ def test_refresh_schema_table_with_new_metadata_updated(self):
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertTrue(table_metadata[0].to_dict()['column_metadata'])
+
+ # Table has no metdata field, `column_metadata` should be False.
+ self.patched_get_schema.return_value = [{
+ 'name': 'table',
+ 'columns': [self.COLUMN_NAME],
+ }]
+
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertEqual(len(table_metadata), 1)
+ self.assertEqual(len(column_metadata), 1)
+ self.assertFalse(table_metadata[0].to_dict()['column_metadata'])
+
+ # Table metadata field is back, `column_metadata` should be True again.
+ self.patched_get_schema.return_value = self.default_schema_return_value
+ refresh_schema(self.factory.data_source.id)
+ table_metadata = TableMetadata.query.all()
+ self.assertTrue(table_metadata[0].to_dict()['column_metadata'])
+
+ def test_refresh_schema_delete_column(self):
+ NEW_COLUMN_NAME = 'new_column'
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+
+ self.assertTrue(column_metadata[0].to_dict()['exists'])
+
+ self.patched_get_schema.return_value = [{
+ 'name': 'table',
+ 'columns': [NEW_COLUMN_NAME],
+ 'metadata': [{
+ 'name': NEW_COLUMN_NAME,
+ 'type': self.COLUMN_TYPE,
+ }]
+ }]
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+ self.assertEqual(len(column_metadata), 2)
+
+ self.assertFalse(column_metadata[1].to_dict()['exists'])
+ self.assertTrue(column_metadata[0].to_dict()['exists'])
+
+ def test_refresh_schema_update_column(self):
+ UPDATED_COLUMN_TYPE = 'varchar'
+
+ refresh_schema(self.factory.data_source.id)
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ "2019-05-09T17:07:52.386910Z"
+ )
+ column_metadata = ColumnMetadata.query.all()
+ self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
+
+ updated_schema = copy.deepcopy(self.default_schema_return_value)
+ updated_schema[0]['metadata'][0]['type'] = UPDATED_COLUMN_TYPE
+ self.patched_get_schema.return_value = updated_schema
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.all()
+ self.assertNotEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
+ self.assertEqual(column_metadata[0].to_dict()['type'], UPDATED_COLUMN_TYPE)
+
+ def test_refresh_samples_rate_limits(self):
+ NEW_COLUMN_NAME = 'new_column'
+ NUM_TABLES = 105
+ tables = []
+
+ for i in range(NUM_TABLES):
+ tables.append({
+ 'name': 'table{}'.format(i),
+ 'columns': [NEW_COLUMN_NAME],
+ 'metadata': [{
+ 'name': NEW_COLUMN_NAME,
+ 'type': self.COLUMN_TYPE,
+ }]
+ })
+
+ self.patched_get_schema.return_value = tables
+ self.factory.data_source.query_runner.configuration['samples'] = True
+
+ refresh_schema(self.factory.data_source.id)
+ refresh_samples(self.factory.data_source.id, 50)
+
+ # There's a total of 105 tables
+ table_metadata = TableMetadata.query.count()
+ self.assertEqual(table_metadata, NUM_TABLES)
+
+ # 50 tables are processed on the first call
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 55)
+
+ # 50 more tables are processed on the second call
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 5)
+
+ # All tables are processed by the third call
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.is_(None)
+ ).all()
+ self.assertEqual(len(table_metadata), 0)
+
+ def test_refresh_samples_refreshes(self):
+ NEW_COLUMN_NAME = 'new_column'
+ NUM_TABLES = 5
+ TIME_BEFORE_UPDATE = utils.utcnow()
+ tables = []
+
+ for i in range(NUM_TABLES):
+ tables.append({
+ 'name': 'table{}'.format(i),
+ 'columns': [NEW_COLUMN_NAME],
+ 'metadata': [{
+ 'name': NEW_COLUMN_NAME,
+ 'type': self.COLUMN_TYPE,
+ }]
+ })
+
+ self.patched_get_schema.return_value = tables
+ self.factory.data_source.query_runner.configuration['samples'] = True
+
+ refresh_schema(self.factory.data_source.id)
+ refresh_samples(self.factory.data_source.id, 50)
+
+ # There's a total of 5 processed tables
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertEqual(table_metadata.count(), NUM_TABLES)
+ self.assertTrue(table_metadata.first().sample_updated_at > TIME_BEFORE_UPDATE)
+
+ table_metadata.update({
+ 'sample_updated_at': utils.utcnow() - datetime.timedelta(days=30)
+ })
+ models.db.session.commit()
+
+ TIME_BEFORE_UPDATE = utils.utcnow()
+ refresh_samples(self.factory.data_source.id, 50)
+ table_metadata_list = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertTrue(table_metadata_list.first().sample_updated_at > TIME_BEFORE_UPDATE)
+
+ def test_refresh_schema_doesnt_overwrite_samples(self):
+ self.factory.data_source.query_runner.configuration['samples'] = True
+
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, None)
+
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ "2019-05-09T17:07:52.386910Z"
+ )
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ # Check that a schema refresh doesn't overwrite examples
+ refresh_schema(self.factory.data_source.id)
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ def test_refresh_samples_applied_to_one_data_source(self):
+ ds1 = self.factory.create_data_source()
+ ds2 = self.factory.create_data_source()
+
+ ds1.query_runner.configuration['samples'] = True
+ ds2.query_runner.configuration['samples'] = True
+
+ refresh_schema(ds1.id)
+ refresh_schema(ds2.id)
+ refresh_samples(ds1.id, 50)
+
+ table_metadata = TableMetadata.query.filter(
+ TableMetadata.sample_updated_at.isnot(None)
+ )
+ self.assertEqual(table_metadata.count(), len(self.default_schema_return_value))
+
+ def test_recent_empty_sample_refreshs(self):
+ self.factory.data_source.query_runner.configuration['samples'] = True
+ refresh_schema(self.factory.data_source.id)
+
+ # Confirm no sample exists
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, None)
+
+ LAST_UPDATE = utils.utcnow() - datetime.timedelta(days=5)
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ LAST_UPDATE.isoformat()
+ )
+
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ def test_recent_non_empty_sample_doesnt_refresh(self):
+ self.factory.data_source.query_runner.configuration['samples'] = True
+ refresh_schema(self.factory.data_source.id)
+
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ None
+ )
+
+ # Confirm a sample was added
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
+
+ self.patched_get_table_sample.return_value = {self.COLUMN_NAME: "a new example"}
+ LAST_UPDATE = utils.utcnow() - datetime.timedelta(days=5)
+ update_sample(
+ self.factory.data_source.id,
+ 'table',
+ 1,
+ LAST_UPDATE.isoformat()
+ )
+
+ # The sample doesn't take on the new value that is returned.
+ column_metadata = ColumnMetadata.query.first()
+ self.assertEqual(column_metadata.example, self.COLUMN_EXAMPLE)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 0546272cda..3c927ba5ae 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -3,10 +3,11 @@
from click.testing import CliRunner
from tests import BaseTestCase
+from redash import utils
from redash.utils.configuration import ConfigurationContainer
from redash.query_runner import query_runners
from redash.cli import manager
-from redash.models import DataSource, Group, Organization, User, db
+from redash.models import TableMetadata, DataSource, Group, Organization, User, db
class DataSourceCommandTests(BaseTestCase):
@@ -16,7 +17,7 @@ def test_interactive_new(self):
result = runner.invoke(
manager,
['ds', 'new'],
- input="test\n%s\n\n\n\n\nexample.com\n\n\ntestdb\n" % (pg_i,))
+ input="test\n%s\ntestdb\n" % (pg_i,))
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(DataSource.query.count(), 1)
@@ -139,6 +140,16 @@ def test_connection_bad_delete(self):
self.assertIn("Couldn't find", result.output)
self.assertEqual(DataSource.query.count(), 1)
+ def test_refresh_samples(self):
+ ds = self.factory.create_data_source(
+ name='test1', type='sqlite',
+ options=ConfigurationContainer({"dbpath": "/tmp/test.db"}))
+ runner = CliRunner()
+ result = runner.invoke(manager, ['ds', 'refresh_data_samples', 'test1'])
+ self.assertFalse(result.exception)
+ self.assertEqual(result.exit_code, 0)
+ self.assertIn('Refreshing', result.output)
+
def test_options_edit(self):
self.factory.create_data_source(
name='test1', type='sqlite',
diff --git a/tests/test_models.py b/tests/test_models.py
index 47107761d5..373f3a0df9 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -247,7 +247,10 @@ def test_failure_extends_schedule(self):
Execution failures recorded for a query result in exponential backoff
for scheduling future execution.
"""
- query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None}, schedule_failures=4)
+
+ query = self.factory.create_query(schedule={'interval':'60', 'until':None, 'time': None, 'day_of_week':None})
+ # can't be set in create_query due to gen_query_hash
+ query.schedule_failures = 4
retrieved_at = utcnow() - datetime.timedelta(minutes=16)
query_result = self.factory.create_query_result(
retrieved_at=retrieved_at, query_text=query.query_text,