diff --git a/redash/query_runner/big_query.py b/redash/query_runner/big_query.py index f765978585..61e1b38acd 100644 --- a/redash/query_runner/big_query.py +++ b/redash/query_runner/big_query.py @@ -68,12 +68,15 @@ def _load_key(filename): f.close() -def _get_query_results(jobs, project_id, job_id, start_index): - query_reply = jobs.getQueryResults(projectId=project_id, jobId=job_id, startIndex=start_index).execute() +def _get_query_results(jobs, project_id, location, job_id, start_index): + query_reply = jobs.getQueryResults(projectId=project_id, + location=location, + jobId=job_id, + startIndex=start_index).execute() logging.debug('query_reply %s', query_reply) if not query_reply['jobComplete']: time.sleep(10) - return _get_query_results(jobs, project_id, job_id, start_index) + return _get_query_results(jobs, project_id, location, job_id, start_index) return query_reply @@ -110,6 +113,11 @@ def configuration_schema(cls): "type": "boolean", 'title': "Use Standard SQL (Beta)", }, + 'location': { + "type": "string", + "title": "Processing Location", + "default": "US", + }, 'loadSchema': { "type": "boolean", "title": "Load Schema" @@ -120,7 +128,7 @@ def configuration_schema(cls): } }, 'required': ['jsonKeyFile', 'projectId'], - "order": ['projectId', 'jsonKeyFile', 'loadSchema', 'useStandardSql', 'totalMBytesProcessedLimit', 'maximumBillingTier', 'userDefinedFunctionResourceUri'], + "order": ['projectId', 'jsonKeyFile', 'loadSchema', 'useStandardSql', 'location', 'totalMBytesProcessedLimit', 'maximumBillingTier', 'userDefinedFunctionResourceUri'], 'secret': ['jsonKeyFile'] } @@ -148,10 +156,14 @@ def _get_bigquery_service(self): def _get_project_id(self): return self.configuration["projectId"] + def _get_location(self): + return self.configuration.get("location", "US") + def _get_total_bytes_processed(self, jobs, query): job_data = { "query": query, "dryRun": True, + 'location': self._get_location() } if self.configuration.get('useStandardSql', False): @@ -169,6 +181,9 @@ def _get_query_result(self, jobs, query): } } } + job_data['jobReference'] = { + 'location': self._get_location() + } if self.configuration.get('useStandardSql', False): job_data['configuration']['query']['useLegacySql'] = False @@ -183,7 +198,7 @@ def _get_query_result(self, jobs, query): insert_response = jobs.insert(projectId=project_id, body=job_data).execute() current_row = 0 - query_reply = _get_query_results(jobs, project_id=project_id, + query_reply = _get_query_results(jobs, project_id=project_id, location=self._get_location(), job_id=insert_response['jobReference']['jobId'], start_index=current_row) logger.debug("bigquery replied: %s", query_reply) @@ -195,7 +210,9 @@ def _get_query_result(self, jobs, query): rows.append(transform_row(row, query_reply["schema"]["fields"])) current_row += len(query_reply['rows']) - query_reply = jobs.getQueryResults(projectId=project_id, jobId=query_reply['jobReference']['jobId'], + query_reply = jobs.getQueryResults(projectId=project_id, + location=self._get_location(), + jobId=query_reply['jobReference']['jobId'], startIndex=current_row).execute() columns = [{'name': f["name"], @@ -299,6 +316,11 @@ def configuration_schema(cls): "type": "boolean", 'title': "Use Standard SQL (Beta)", }, + 'location': { + "type": "string", + "title": "Processing Location", + "default": "US", + }, 'loadSchema': { "type": "boolean", "title": "Load Schema"