Refactoring table creation in Big Query Pipeline #343
base: main
Conversation
Force-pushed from 6089a99 to e3ca3b1
@@ -130,6 +131,12 @@ def validate_arguments(cls, known_args: argparse.Namespace, pipeline_args: t.Lis
         pipeline_options = PipelineOptions(pipeline_args)
         pipeline_options_dict = pipeline_options.get_all_options()

+        if known_args.output_table:
+            # checking if the output table is in format (<project>.<dataset>.<table>).
+            output_table_pattern = r'^[\w-]+\.[\w-]+\.[\w-]+$'
Are the `[]`s necessary? Can it just be `r'^\w+\.\w+\.\w+$'`? (Please test my hand-written regex :))
Project IDs can contain hyphens: https://cloud.google.com/resource-manager/docs/creating-managing-projects. `\w+` doesn't match when the word is like `my-project`.
Thanks, that makes sense! Then, would it be `[\w\-]+`? IIRC, `[]` and `-` together are often used to express a range.
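A quick sketch to settle the regex question above: inside a character class, a trailing (or escaped) hyphen is literal, so `[\w-]+` accepts hyphenated project IDs while plain `\w+` does not. The helper name and sample table strings here are illustrative, not from the PR.

```python
import re

# Pattern under discussion: each dot-separated component of
# <project>.<dataset>.<table> may contain letters, digits, _ and -.
output_table_pattern = r'^[\w-]+\.[\w-]+\.[\w-]+$'

def is_valid_output_table(output_table: str) -> bool:
    """Return True if the string looks like <project>.<dataset>.<table>."""
    return re.match(output_table_pattern, output_table) is not None

# Hyphenated project IDs only match thanks to the character class;
# the simpler \w+ version rejects them.
assert is_valid_output_table('my-project.my_dataset.my_table')
assert not re.match(r'^\w+\.\w+\.\w+$', 'my-project.my_dataset.my_table')
assert not is_valid_output_table('only.two')  # too few components
```

Note that `[\w-]` and `[\w\-]` are equivalent; the escape only matters when the hyphen sits between two characters and would otherwise be read as a range.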
weather_mv/loader_pipeline/bq.py
Outdated
"""Initializes Sink by creating a BigQuery table based on user input.""" | ||
"""Initializes BigQuery table based on user input.""" | ||
self.project, self.dataset_id, self.table_id = self.output_table.split('.') | ||
self.table = None |
Extra space.
-            table=self.table.table_id,
+            project=self.project,
+            dataset=self.dataset_id,
+            table=self.table_id,
If we wanted to create the BQ table in the pipeline, a simpler solution would be to change this step's disposition: https://beam.apache.org/documentation/io/built-in/google-bigquery/#create-disposition
On second thought, we may want to keep your step since we create an opinionated schema...
@alxmrs I tried putting the create-table step as a transform in the pipeline, but faced several issues:
- I tried the `sample` transform as you suggested, but there is no way to ensure table creation happens before the usual pipeline flow (the apache_beam Python SDK doesn't have support for `Wait.on`, which could have made this possible).
- I also tried stateful processing, but since we are windowing our Pub/Sub reads and a state is discarded when the window expires, that solution didn't work.

I think there is potential in using the `create-disposition` flag in `WriteToBigQuery`. Can you please elaborate on what you mean by an opinionated schema?
cc: @mahrsee1997
Now that you mention it, I agree that using the create disposition is easiest! We need to make sure that we pass our computed schema (say, from the `init`) into this transform instead of having it make the schema automatically. Thinking it over now, that's what my concern was about: I wasn't sure we'd get the schema we wanted if it were computed automatically.
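A minimal sketch of the direction agreed on here, with hypothetical field names and a hypothetical helper: convert an inferred name-to-type mapping into the JSON-style schema dict that `WriteToBigQuery` accepts, so `CREATE_IF_NEEDED` creates the table with the computed schema rather than an auto-derived one.

```python
def to_bq_schema(columns: dict) -> dict:
    """Build a WriteToBigQuery-compatible schema dict.

    `columns` maps field name -> BigQuery type, e.g. {'temperature': 'FLOAT64'}.
    The keys 'name', 'type', and 'mode' follow BigQuery's JSON schema format.
    """
    return {
        'fields': [
            {'name': name, 'type': bq_type, 'mode': 'NULLABLE'}
            for name, bq_type in columns.items()
        ]
    }

schema = to_bq_schema({'timestamp': 'TIMESTAMP', 'temperature': 'FLOAT64'})
assert schema['fields'][0] == {
    'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
# The dict would then be passed along the lines of:
#   beam.io.WriteToBigQuery(
#       table=known_args.output_table,
#       schema=schema,
#       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```

The Beam call at the end is shown only as a comment since the surrounding pipeline wiring lives elsewhere in the PR.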
Was surfing the web for some other issue and found this; it might be helpful.
https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#schemas
@@ -243,6 +261,7 @@ def expand(self, paths):
         """Extract rows of variables from data paths into a BigQuery table."""
         extracted_rows = (
             paths
+            | 'CreateTable' >> beam.Map(self.create_bq_table)
Concern: I am pretty sure this will try to create a BQ table for every element of `paths`. Remember, `self.tables` won't refer to the state of the class, since global state is not really a thing for parallel steps like this.
Consider using something like `Sample`: https://beam.apache.org/documentation/transforms/python/aggregation/sample/
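A plain-Python illustration (no Beam, hypothetical names) of what sampling buys here: pick one representative element from the collection and run table creation exactly once, instead of once per element as `beam.Map(self.create_bq_table)` would.

```python
import random

def create_table_once(paths, create_fn):
    """Run table creation for a single sampled path, then pass all paths on."""
    sample = random.choice(list(paths))
    create_fn(sample)  # table creation runs exactly once for the collection
    return paths

calls = []
create_table_once(['a.nc', 'b.nc', 'c.nc'], calls.append)
assert len(calls) == 1  # one creation call, regardless of collection size
```

In Beam terms this corresponds to sampling the PCollection down to one element before mapping the creation step over it, though (as noted earlier in the thread) ordering that step before the main flow remains the hard part.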
weather-mv
fixes: #311
As mentioned in this PR #250 (comment), skipping file-prefix validation does not work in the case of BigQuery, as it depended on the first URI to infer the schema for the BigQuery table.
Fixed this issue by creating a pipeline stage that infers the schema and creates the BigQuery table using the first URI. This stage is skipped for all subsequent URIs.
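A hedged sketch (hypothetical class and callback names) of the "create once, skip afterwards" stage the description outlines: a guard that infers the schema and creates the table for the first URI it sees, then becomes a no-op. Per the review discussion above, in a distributed runner this instance state is per worker rather than global, so the creation call still needs to be idempotent.

```python
class SchemaInferringTableCreator:
    """Creates the BigQuery table from the first URI seen; no-op afterwards.

    Caveat from the review: each worker holds its own copy of this state,
    so `create_table_fn` must tolerate being called more than once overall.
    """

    def __init__(self, create_table_fn):
        self.create_table_fn = create_table_fn
        self.table_created = False

    def process(self, uri: str) -> str:
        if not self.table_created:
            self.create_table_fn(uri)  # infer schema + create table, once
            self.table_created = True
        return uri  # URIs flow through to the rest of the pipeline unchanged
```

Usage: wrap the creation callback and feed URIs through `process`; only the first triggers creation.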