Merge pull request #2513 from kyoshidajp/support_location_in_bq_dataset
Add location property to BigQuery data source settings
arikfr authored Jul 31, 2018
2 parents 85230e3 + 5ca59a5 commit c6f75dd
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions redash/query_runner/big_query.py
@@ -68,12 +68,15 @@ def _load_key(filename):
         f.close()


-def _get_query_results(jobs, project_id, job_id, start_index):
-    query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute()
+def _get_query_results(jobs, project_id, location, job_id, start_index):
+    query_reply = jobs.getQueryResults(projectId=project_id,
+                                       location=location,
+                                       jobId=job_id,
+                                       startIndex=start_index).execute()
     logging.debug('query_reply %s', query_reply)
     if not query_reply['jobComplete']:
         time.sleep(10)
-        return _get_query_results(jobs, project_id, job_id, start_index)
+        return _get_query_results(jobs, project_id, location, job_id, start_index)

     return query_reply
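For readers less familiar with the BigQuery v2 REST API, here is a minimal sketch of how the updated helper is meant to be called. The credentials handling, project id, region, and job id are illustrative assumptions, not values taken from this commit.

# Sketch only: build a jobs resource and poll a job in a specific region.
from googleapiclient.discovery import build

service = build("bigquery", "v2")  # assumes application default credentials are available
jobs = service.jobs()

# With this change the dataset's processing location travels with every poll;
# without it, jobs running outside the US multi-region may not be found.
reply = _get_query_results(jobs,
                           project_id="my-project",  # example project
                           location="EU",            # example region
                           job_id="job_abc123",      # example job id
                           start_index=0)
print(reply["totalRows"])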

@@ -110,6 +113,11 @@ def configuration_schema(cls):
                     "type": "boolean",
                     'title': "Use Standard SQL (Beta)",
                 },
+                'location': {
+                    "type": "string",
+                    "title": "Processing Location",
+                    "default": "US",
+                },
                 'loadSchema': {
                     "type": "boolean",
                     "title": "Load Schema"
@@ -120,7 +128,7 @@ def configuration_schema(cls):
                 }
             },
             'required': ['jsonKeyFile', 'projectId'],
-            "order": ['projectId', 'jsonKeyFile', 'loadSchema', 'useStandardSql', 'totalMBytesProcessedLimit', 'maximumBillingTier', 'userDefinedFunctionResourceUri'],
+            "order": ['projectId', 'jsonKeyFile', 'loadSchema', 'useStandardSql', 'location', 'totalMBytesProcessedLimit', 'maximumBillingTier', 'userDefinedFunctionResourceUri'],
             'secret': ['jsonKeyFile']
         }
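Put together, a BigQuery data source saved against this schema could carry options roughly like the hypothetical example below; only jsonKeyFile and projectId are required, and location falls back to "US" when left empty.

# Hypothetical data source options after this change (all values are examples).
options = {
    "projectId": "my-project",
    "jsonKeyFile": "<base64-encoded service account key>",
    "useStandardSql": True,
    "loadSchema": True,
    "location": "asia-northeast1",  # region of the queried datasets
}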

@@ -148,10 +156,14 @@ def _get_bigquery_service(self):
     def _get_project_id(self):
         return self.configuration["projectId"]

+    def _get_location(self):
+        return self.configuration.get("location", "US")
+
     def _get_total_bytes_processed(self, jobs, query):
         job_data = {
             "query": query,
             "dryRun": True,
+            'location': self._get_location()
         }

         if self.configuration.get('useStandardSql', False):
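In the BigQuery v2 API a cost estimate can be obtained with a dry-run query request, and that request accepts the same location field. A hedged sketch of such a call, using example values (the surrounding Redash method is not shown in this hunk):

# Sketch: dry-run a query in a specific region to learn how much data it would scan.
response = jobs.query(
    projectId="my-project",  # example project
    body={
        "query": "SELECT COUNT(*) FROM my_dataset.events",  # example query
        "dryRun": True,
        "location": "EU",  # example region
    },
).execute()
print(int(response["totalBytesProcessed"]))  # bytes the query would process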
@@ -169,6 +181,9 @@ def _get_query_result(self, jobs, query):
                 }
             }
         }
+        job_data['jobReference'] = {
+            'location': self._get_location()
+        }

         if self.configuration.get('useStandardSql', False):
             job_data['configuration']['query']['useLegacySql'] = False
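In the jobs.insert payload the processing location belongs to the job reference rather than to the query configuration, so after this hunk job_data ends up shaped roughly as below (query text and region are illustrative values):

# Approximate shape of job_data once _get_query_result has assembled it.
job_data = {
    "configuration": {
        "query": {
            "query": "SELECT COUNT(*) FROM my_dataset.events",  # example query
        }
    },
    "jobReference": {
        "location": "EU",  # example region, read from the data source settings
    },
}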
@@ -183,7 +198,7 @@ def _get_query_result(self, jobs, query):

         insert_response = jobs.insert(projectId=project_id, body=job_data).execute()
         current_row = 0
-        query_reply = _get_query_results(jobs, project_id=project_id,
+        query_reply = _get_query_results(jobs, project_id=project_id, location=self._get_location(),
                                          job_id=insert_response['jobReference']['jobId'], start_index=current_row)

         logger.debug("bigquery replied: %s", query_reply)
@@ -195,7 +210,9 @@ def _get_query_result(self, jobs, query):
                 rows.append(transform_row(row, query_reply["schema"]["fields"]))

             current_row += len(query_reply['rows'])
-            query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'],
+            query_reply = jobs.getQueryResults(projectId=project_id,
+                                               location=self._get_location(),
+                                               jobId=query_reply['jobReference']['jobId'],
                                                startIndex=current_row).execute()

         columns = [{'name': f["name"],
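Each page fetch now repeats the location as well. A condensed, illustrative view of the paging pattern this hunk modifies (names and values are examples, not code lifted from the file):

# Sketch: page through a finished job's results, passing the region every time.
# `reply` is assumed to hold the first page from an earlier getQueryResults call.
current_row = 0
while "rows" in reply and current_row < int(reply["totalRows"]):
    current_row += len(reply["rows"])
    reply = jobs.getQueryResults(projectId="my-project",
                                 location="EU",
                                 jobId=reply["jobReference"]["jobId"],
                                 startIndex=current_row).execute()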
@@ -320,6 +337,11 @@ def configuration_schema(cls):
                     "type": "boolean",
                     'title': "Use Standard SQL (Beta)",
                 },
+                'location': {
+                    "type": "string",
+                    "title": "Processing Location",
+                    "default": "US",
+                },
                 'loadSchema': {
                     "type": "boolean",
                     "title": "Load Schema"