Clarifications for bigquery connector (#1492)
* some clarifications for bigquery connector

* rephrased API comment

* added reference to dataset.cache() API

* fixed typos in the doc
vlasenkoalexey authored Aug 5, 2021
1 parent 00512ab commit 9c2c9d6
Showing 2 changed files with 40 additions and 9 deletions.
38 changes: 33 additions & 5 deletions tensorflow_io/bigquery.md
@@ -20,7 +20,9 @@ Depending on environment you are using some prerequisites might be already met.
If you choose to use [service account](https://cloud.google.com/docs/authentication/production)
authentication, please make sure that the GOOGLE_APPLICATION_CREDENTIALS
environment variable is initialized with a path pointing to a JSON file that
contains your service account key.
contains your service account key. If you plan to run the BigQuery reader from a Google Cloud VM,
you can instead use the https://www.googleapis.com/auth/bigquery [scope](https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances#using) to make sure that
the VM has access to BigQuery.
4. [Enable BigQuery Storage API.](https://cloud.google.com/bigquery/docs/reference/storage/#enabling_the_api)
5. If you see errors related to the roots.pem file in the logs, you can solve them via either of the following approaches:

@@ -84,9 +86,15 @@ def main():
      row_restriction="num_characters > 1000",
      data_format=BigQueryClient.DataFormat.AVRO)
  dataset = read_session.parallel_read_rows()
  dataset = dataset.batch(10)
  dataset = dataset.prefetch(10)
  # You can optionally cache the dataset so that data is loaded from BigQuery only
  # once, during the first iteration. Subsequent iterations read data from the
  # cache. This is useful when training a model for multiple epochs.
  dataset = dataset.cache()

  row_index = 0
  for row in dataset.prefetch(10):
  for row in dataset:
    print("row %d: %s" % (row_index, row))
    row_index += 1

@@ -95,8 +103,28 @@ if __name__ == '__main__':

```
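
As a rough, hypothetical follow-up to the caching comment above (not part of the original sample), the cached dataset could be mapped into `(features, label)` pairs and trained with Keras for several epochs. The column names `title` and `num_characters` are assumed from the `row_restriction` above and may differ from the fields you actually select:

```python
import tensorflow as tf

def to_features_and_label(batch):
  # Hypothetical feature/label choice: predict num_characters from title length.
  features = tf.expand_dims(
      tf.cast(tf.strings.length(batch["title"]), tf.float32), axis=-1)
  label = tf.expand_dims(
      tf.cast(batch["num_characters"], tf.float32), axis=-1)
  return features, label

train_ds = dataset.map(to_features_and_label)

model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
model.compile(optimizer="adam", loss="mse")
# Because of dataset.cache(), BigQuery is read only during the first epoch;
# the remaining epochs are served from the local cache.
model.fit(train_ds, epochs=3)
```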

It also supports reading BigQuery column with repeated mode (each field contains array of values with primitive type: Integer, Float, Boolean, String, but RECORD is not supported). In this case, selected_fields needs be a dictionary in a
form like
In some cases, when a row is too wide (30+ columns) or when you are batching with large batch sizes, it might be beneficial to do batching before interleaving to get better performance. Here is an example showing how to do that:

```python
def read_rows(stream):
  # Batch rows from a single stream before the streams are interleaved.
  dataset = read_session.read_rows(stream)
  dataset = dataset.batch(batch_size)
  return dataset

client = BigQueryClient()
read_session = client.read_session(requested_streams=10, ...)
streams = read_session.get_streams()
streams_count = len(streams)
streams_ds = tf.data.Dataset.from_tensor_slices(streams)
dataset = streams_ds.interleave(
    read_rows,
    cycle_length=streams_count,
    num_parallel_calls=streams_count,
    deterministic=False)
...
```

The connector also supports reading BigQuery columns with repeated mode (each field contains an array of values of a primitive type: Integer, Float, Boolean or String; RECORD is not supported). In this case, selected_fields needs to be a dictionary in a form like this:

```python
{ "field_a_name": {"mode": BigQueryClient.FieldMode.REPEATED, output_type: dtypes.int64},
@@ -106,7 +134,7 @@ form like
}
```
"mode" is BigQuery column attribute concept, it can be 'repeated', 'nullable' or 'required' (enum BigQueryClient.FieldMode.REPEATED, NULLABLE, REQUIRED).The output field order is unrelated to the order of fields in
selected_fields. If "mode" not specified, defaults to "nullable". If "output_type" not specified, DT_STRING is implied for all Tensors.
selected_fields. If "mode" not specified, defaults to "nullable". If "output_type" not specified, DT_STRING is implied for all Tensors.

'repeated' is currently only supported when data_format = BigQueryClient.DataFormat.AVRO (which is the default).
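
For illustration only (this snippet is not part of the original doc), a dict-form selected_fields could be passed to read_session as sketched below. The identifiers and column names (`tags`, `score`, `id`) are hypothetical placeholders, and the positional arguments follow the pattern of the full example earlier in this document:

```python
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient

GCP_PROJECT_ID = "my-gcp-project"  # placeholder
DATASET_ID = "my_dataset"          # placeholder
TABLE_ID = "my_table"              # placeholder

client = BigQueryClient()
read_session = client.read_session(
    "projects/" + GCP_PROJECT_ID,
    GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
    selected_fields={
        # Repeated column: each row yields a variable-length list of strings.
        "tags": {"mode": BigQueryClient.FieldMode.REPEATED,
                 "output_type": dtypes.string},
        "score": {"mode": BigQueryClient.FieldMode.NULLABLE,
                  "output_type": dtypes.float64},
        # Defaults apply: mode "nullable", output type DT_STRING.
        "id": {},
    },
    requested_streams=2,
    data_format=BigQueryClient.DataFormat.AVRO)  # AVRO is required for REPEATED
dataset = read_session.parallel_read_rows()
```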

11 changes: 7 additions & 4 deletions tensorflow_io/python/ops/bigquery_dataset_ops.py
@@ -100,10 +100,10 @@ def read_session(
If not specified, DT_STRING is implied for all Tensors.
row_restriction: Optional. SQL text filtering statement, similar to a
WHERE clause in a query.
requested_streams: Initial number of streams. If unset or 0, we will
provide a value of streams so as to produce reasonable throughput.
Must be non-negative. The number of streams may be lower than the
requested number, depending on the amount parallelism that is reasonable
requested_streams: Desired number of streams that can be read in parallel.
Must be a positive number. The actual number of streams that the
BigQuery Storage API returns may be lower than this number,
depending on the amount of parallelism that is reasonable
for the table and the maximum amount of parallelism allowed by the
system.
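
# Illustrative sketch (not from the source file): the number of streams the
# backend actually grants can be checked after the read session is created:
#
#   read_session = client.read_session(..., requested_streams=10, ...)
#   streams = read_session.get_streams()
#   assert 0 < len(streams) <= 10  # may be fewer than requested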
@@ -131,6 +131,9 @@ def read_session(
    if not dataset_id:
        raise ValueError("`dataset_id` must be a set")

    if requested_streams is None or requested_streams <= 0:
        raise ValueError("`requested_streams` must be a positive number")

    if isinstance(selected_fields, list):
        if not isinstance(output_types, list):
            raise ValueError(
