
Refactoring table creation in Big Query Pipeline #343

Open
wants to merge 12 commits into main

Conversation

aniketsinghrawat
Contributor

fixes: #311
As mentioned in PR #250 (comment), skipping file prefix validation does not work for BigQuery, because the pipeline depended on first_uri to infer the BigQuery schema.

Fixed this by adding a pipeline stage that infers the schema and creates the BigQuery table from the first URI. The stage is skipped for all subsequent URIs.
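For illustration only (not the code in this diff): the inference could look roughly like the sketch below, mapping each variable of the first dataset to a BigQuery column. The dtype-to-type mapping and helper name are assumptions.

```python
# Illustrative sketch only -- the PR's actual inference lives in weather_mv/loader_pipeline/bq.py.
import xarray as xr
from google.cloud import bigquery

# Assumed mapping from numpy dtype kinds to BigQuery column types.
_DTYPE_TO_BQ = {'f': 'FLOAT64', 'i': 'INT64', 'b': 'BOOL', 'M': 'TIMESTAMP'}

def infer_schema(first_uri: str) -> list:
    """Open the first dataset and map each variable to a BigQuery column."""
    ds = xr.open_dataset(first_uri)
    return [
        bigquery.SchemaField(str(name), _DTYPE_TO_BQ.get(var.dtype.kind, 'STRING'))
        for name, var in ds.variables.items()
    ]
```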

@aniketsinghrawat marked this pull request as draft June 8, 2023 08:59
@aniketsinghrawat marked this pull request as ready for review June 9, 2023 07:18
@mahrsee1997 requested a review from alxmrs June 12, 2023 07:47
@@ -130,6 +131,12 @@ def validate_arguments(cls, known_args: argparse.Namespace, pipeline_args: t.Lis
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options_dict = pipeline_options.get_all_options()

if known_args.output_table:
# checking if the output table is in format (<project>.<dataset>.<table>).
output_table_pattern = r'^[\w-]+\.[\w-]+\.[\w-]+$'
Collaborator

Are the []s necessary? Can it just be r'^\w+\.\w+\.\w+$'? (Please test my hand-written regex :))

Contributor Author

project_ids can contain hyphens: https://cloud.google.com/resource-manager/docs/creating-managing-projects
\w+ doesn't match a word like my-project.

Collaborator

Thanks, that makes sense! Then, would it be [\w\-]+? IIRC, [] and - together are often used to express a range.
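For reference, a quick check of both patterns with Python's re module (the table name here is made up):

```python
import re

table = 'my-project.my_dataset.my_table'  # made-up example

# \w only matches [A-Za-z0-9_], so the hyphen in 'my-project' breaks the simpler pattern:
re.match(r'^\w+\.\w+\.\w+$', table)            # -> None
# Inside a character class, a trailing '-' is a literal hyphen, not a range:
re.match(r'^[\w-]+\.[\w-]+\.[\w-]+$', table)   # -> <re.Match object ...>
```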

"""Initializes Sink by creating a BigQuery table based on user input."""
"""Initializes BigQuery table based on user input."""
self.project, self.dataset_id, self.table_id = self.output_table.split('.')
self.table = None
Collaborator

extra space.

-table=self.table.table_id,
+project=self.project,
+dataset=self.dataset_id,
+table=self.table_id,
Collaborator

If we wanted to create the BQ table in the pipeline, a simpler solution would be to change this step's disposition: https://beam.apache.org/documentation/io/built-in/google-bigquery/#create-disposition

Collaborator

On second thought, we may want to keep your step since we create an opinionated schema...

Contributor Author

@alxmrs
I tried putting the table creation as a transform in the pipeline, but faced several issues:

  • I tried the sample transform as you suggested, but there is no way to ensure table creation happens before the usual pipeline flow (the apache_beam Python SDK doesn't support Wait.on, which could have made this possible).
  • I also tried stateful processing, but since we window our Pub/Sub reads and state is discarded when a window expires, that solution didn't work either.

I think there is potential in using the create-disposition flag in WriteToBigQuery. Can you please elaborate on what you mean by an opinionated schema?

cc: @mahrsee1997

Collaborator

Now that you mention it, I agree that using the create disposition is easiest! We need to make sure that we pass in our computed schema (say, from the init) into this transform instead of having it automatically make the schema. Thinking it over now, that's what my concern was about: I wasn't sure if we'd get the schema we wanted if it computed it automatically.
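Roughly what that would look like (a sketch; the schema dict, field names, and table name below are placeholders for the pipeline's computed schema, not this repo's actual fields):

```python
import apache_beam as beam

# Placeholder for the schema computed during pipeline initialization.
computed_schema = {
    'fields': [
        {'name': 'temperature', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        {'name': 'time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    ]
}

with beam.Pipeline() as p:
    rows = p | beam.Create([{'temperature': 21.5, 'time': '2023-06-08T00:00:00'}])  # stand-in rows
    rows | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_table',
        schema=computed_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # create the table with our schema
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
```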

Collaborator
@mahrsee1997 Jul 4, 2023

Was surfing the web for some other issue and found this. It might be helpful:

https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.bigquery.html#schemas

weather_mv/loader_pipeline/bq.py
@@ -243,6 +261,7 @@ def expand(self, paths):
"""Extract rows of variables from data paths into a BigQuery table."""
extracted_rows = (
paths
| 'CreateTable' >> beam.Map(self.create_bq_table)
Collaborator

Concern: I am pretty sure this will try to create a BQ table for every element of paths. Remember, self.tables won't refer to the state of the class, since global state is not really a thing for parallel steps like this.
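One way to make that step safe even if it does run once per element is to check for the table instead of relying on instance state. A sketch (names and the schema argument are illustrative, not this PR's code):

```python
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

def create_bq_table_idempotent(uri: str, output_table: str, schema) -> str:
    """Create the output table only if it does not exist yet; safe to call once per element."""
    client = bigquery.Client()
    try:
        client.get_table(output_table)  # already created by an earlier element / another worker
    except NotFound:
        client.create_table(bigquery.Table(output_table, schema=schema), exists_ok=True)
    return uri  # pass the path through unchanged for the rest of the pipeline
```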


@aniketsinghrawat changed the title from "Streaming in weather-mv" to "Refactoring table creation in Big Query Pipeline" on Jun 15, 2023
Successfully merging this pull request may close these issues.

When streaming jobs with weather-mv, should not require URIs exist before launching the streaming