Support for ingestion time partition table on BigQuery as incremental materialization #75

Closed
github-christophe-oudar opened this issue Nov 28, 2021 · 6 comments · Fixed by #136
Labels
help_wanted Extra attention is needed type:enhancement New feature or request

Comments

@github-christophe-oudar
Contributor

Describe the feature

This is a follow-up from a discussion over the topic with @jtcohen6.

The issues with the current incremental materialization implementation

Ingestion-time partitioned tables are currently supported by the table materialization, but that support is meant to be deprecated.
What is specific to those tables is that the partitioning field is _PARTITIONTIME or _PARTITIONDATE, a "pseudo column".
Since that column doesn't really exist as a column within the table, Google doesn't allow the same kind of operations on the table.
Indeed, if you would like to use

partition_by={
 "field": "_PARTITIONTIME",
 "data_type": "timestamp"
}

BigQuery doesn't let you create an ingestion-time partitioned table using the usual dbt approach (https://cloud.google.com/bigquery/docs/creating-partitioned-tables#create_an_ingestion-time_partitioned_table):

CREATE TABLE
  mydataset.newtable (transaction_id INT64)
PARTITION BY
  DATE_TRUNC(_PARTITIONTIME, DAY)
AS (
SELECT TIMESTAMP("2021-11-01") as _PARTITIONTIME, 1 as transaction_id
)

will fail as _PARTITIONTIME is not directly in the defined columns.


Working approach

So the required approach is to first create the table without a select:

CREATE TABLE
  mydataset.newtable (transaction_id INT64)
PARTITION BY
  DATE_TRUNC(_PARTITIONTIME, DAY)

And then:

INSERT INTO mydataset.newtable (_PARTITIONTIME, transaction_id)
SELECT TIMESTAMP("2021-11-01"), 1 
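The two-step pattern above (create the table, then insert with an explicit _PARTITIONTIME) can be sketched as a small statement builder. This is only an illustration under assumptions: `build_create_and_insert` is a hypothetical helper, not part of dbt or of the BigQuery client, and the partition expression is simply passed through as given in the example above.

```python
from typing import Sequence, Tuple


def build_create_and_insert(
    table: str,
    columns: Sequence[Tuple[str, str]],
    partition_expr: str,
    select_sql: str,
) -> Tuple[str, str]:
    """Return (create_sql, insert_sql) for an ingestion-time partitioned table.

    `columns` holds (name, BigQuery type) pairs and must not declare the
    _PARTITIONTIME pseudo column. `select_sql` is expected to return the
    partition timestamp first, then the columns in the same order.
    """
    if any(name.upper() == "_PARTITIONTIME" for name, _ in columns):
        raise ValueError("_PARTITIONTIME is a pseudo column and cannot be declared")
    column_defs = ", ".join(f"{name} {dtype}" for name, dtype in columns)
    column_names = ", ".join(name for name, _ in columns)
    # Step 1: plain CREATE TABLE, without an AS SELECT clause.
    create_sql = f"CREATE TABLE {table} ({column_defs})\nPARTITION BY {partition_expr}"
    # Step 2: INSERT that names the pseudo column explicitly.
    insert_sql = f"INSERT INTO {table} (_PARTITIONTIME, {column_names})\n{select_sql}"
    return create_sql, insert_sql


create_sql, insert_sql = build_create_and_insert(
    "mydataset.newtable",
    [("transaction_id", "INT64")],
    "DATE_TRUNC(_PARTITIONTIME, DAY)",
    'SELECT TIMESTAMP("2021-11-01"), 1',
)
print(create_sql)
print(insert_sql)
```

The point of keeping the two statements separate is that the pseudo column only appears in the INSERT column list, never in the table definition.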

Once we move to the merge part, it's indeed possible to insert data with

merge into mydataset.newtable as DBT_INTERNAL_DEST
    using (
    SELECT TIMESTAMP("2021-11-03") as _PARTITIONTIME, 3 as transaction_id
    ) as DBT_INTERNAL_SOURCE
    on FALSE

when not matched by source
        and DBT_INTERNAL_DEST._PARTITIONTIME in unnest([TIMESTAMP("2021-11-03")])
    then delete

when not matched then insert
    (`_PARTITIONTIME`, `transaction_id`)
values
    (`_PARTITIONTIME`, `transaction_id`)
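For illustration, the merge above could be generated from a list of partitions to replace. This is a rough sketch, not the code dbt uses: `build_partition_overwrite_merge` is a hypothetical name, and the partition values are interpolated without any escaping, which a real implementation would need to handle.

```python
from typing import Sequence


def build_partition_overwrite_merge(
    table: str,
    source_sql: str,
    columns: Sequence[str],
    partitions: Sequence[str],
) -> str:
    """Build a MERGE that deletes the listed partitions and inserts new rows."""
    if not partitions:
        raise ValueError("at least one partition is required")
    # The pseudo column must be listed explicitly next to the real columns.
    column_list = ", ".join(f"`{c}`" for c in ["_PARTITIONTIME", *columns])
    partition_list = ", ".join(f'TIMESTAMP("{p}")' for p in partitions)
    return "\n".join(
        [
            f"merge into {table} as DBT_INTERNAL_DEST",
            f"using (\n{source_sql}\n) as DBT_INTERNAL_SOURCE",
            "on FALSE",
            "when not matched by source",
            f"    and DBT_INTERNAL_DEST._PARTITIONTIME in unnest([{partition_list}])",
            "    then delete",
            f"when not matched then insert ({column_list})",
            f"values ({column_list})",
        ]
    )


merge_sql = build_partition_overwrite_merge(
    "mydataset.newtable",
    'SELECT TIMESTAMP("2021-11-03") as _PARTITIONTIME, 3 as transaction_id',
    ["transaction_id"],
    ["2021-11-03"],
)
print(merge_sql)
```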

In practice, it won't be 100% straightforward, as it requires using
SELECT _PARTITIONTIME, * FROM mydataset.newtable__dbt_tmp, since _PARTITIONTIME is not directly a column and is therefore not included in SELECT *, but otherwise it appears to work.

Practically, I think everything can be done within incremental.sql

Describe alternatives you've considered

Alternatives are:

  • Working with column-type partitioned tables, with a tradeoff on performance
  • Working with ingestion-time partitioned tables and partition decorators, which is more efficient but also trickier to implement (I'll open another issue on that topic later)

Additional context

At Teads, we use a lot of ingestion-time partitioned tables, as they were created before the column-type partitioned table feature existed on BigQuery. Migrating would be an option, but the overhead introduced when selecting data from column-type partitioned tables makes that move "a drag".

Who will this benefit?

It would benefit anyone using ingestion-time partitioned tables.

Are you interested in contributing this feature?

Yes

@jtcohen6
Contributor

@github-christophe-oudar So sorry again for the delayed response here!

On the fourth or fifth read through, the proposal finally makes sense to me. I'm excited that you've identified a pure-SQL way to handle the creation of the ingestion-time-partitioned table (create table + insert).

One specific part just clicked for me: How does dbt "sort" each partition of data into its appropriate bucket, during the first / full-refresh table creation? The _partitiontime is going to be derived from the actual underlying data returned by the model SQL. In the model, the user specifies something like:

{{ config(
    materialized = 'incremental',
    partition_by={
      "field": "_PARTITIONTIME",
      "data_type": "timestamp"
    }
) }}

select
    transaction_date as _partitiontime,
    transaction_id,
    transaction_date
from {{ source('order_system', 'transactions') }}

Then dbt performs the initial backfill (as well as subsequent merges) using that "derived" _partitiontime, to land the data in the appropriate partition:

insert into mydataset.newtable (_partitiontime, transaction_id, transaction_date)
select
    transaction_date as _partitiontime,
    transaction_id,
    transaction_date
from source_order_system.transactions

And then, amazingly, this works:

#legacySQL
select * from dbt_jcohen.my_incremental_model$20210101

So, what's tricky?

  • If the user specifies _PARTITIONTIME, we'll need to identify that, and to split the usual create table as into create table + insert
  • We'll also need the column schema for use in that create table — not a huge deal, we have ways to get that (via temp table or limit 0 where false query)
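The first point, identifying that a model targets the pseudo column, could look something like the sketch below. It assumes the `partition_by` config arrives as a plain dict shaped like the examples in this thread; the function name and the constant are hypothetical, not dbt APIs.

```python
from typing import Mapping, Optional

# Pseudo columns BigQuery exposes on ingestion-time partitioned tables.
PSEUDO_COLUMNS = frozenset({"_PARTITIONTIME", "_PARTITIONDATE"})


def uses_ingestion_time_partitioning(partition_by: Optional[Mapping[str, str]]) -> bool:
    """Return True when the configured partition field is a pseudo column."""
    if not partition_by:
        return False
    field = str(partition_by.get("field", ""))
    # The thread uses both _PARTITIONTIME and _partitiontime, so compare
    # case-insensitively.
    return field.upper() in PSEUDO_COLUMNS


needs_split = uses_ingestion_time_partitioning(
    {"field": "_PARTITIONTIME", "data_type": "timestamp"}
)
print(needs_split)
```

When this returns True, the materialization would take the create table + insert path instead of create table as.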

So, I'm happy with the proposal outlined here—and excited for what we can do performance-wise, in concert with the proposal in #77! I know it's my fault for getting back to you a few months late, and I know you've got other work on your plate these days. If another community member is interested and able to contribute this, we'd be happy to support you.

@jtcohen6 jtcohen6 added good_first_issue Good for newcomers and removed triage:product labels Jan 25, 2022
@github-christophe-oudar
Contributor Author

github-christophe-oudar commented Feb 21, 2022

@jtcohen6 I had a look at it last night but it's actually not so straightforward:

  • I tried to create the table without any schema (just the partitioning) through the BQ CLI:
bq mk -t \
  --time_partitioning_type=DAY \
  project:dbt_cou.test_partitioned

Then I tried to insert data into the table through a SELECT and got the following error:
Cannot execute DML statements over table project:dbt_cou.test_partitioned without schema
So it looks like Google prevents the schema from being inferred from the SELECT here (though I don't really think that should be a restriction).

However, as the query

select
    transaction_date as _partitiontime,
    transaction_id,
    transaction_date
from {{ source('order_system', 'transactions') }}

is returning a _partitiontime column, it isn't valid to materialize it as a temporary table either.

It's a pretty thorny issue. It feels like it requires:

  • to create a table with an empty schema
  • to run the query as a CTE on which we do something along the lines of
WITH
  base AS (
    SELECT
      DATE("2022-02-20") AS _PARTITIONTIME,
      1 AS transaction_id
  )
SELECT
  _PARTITIONTIME AS `partition`,
  * EXCEPT(_PARTITIONTIME)
FROM
  base

and save it to the "temp" table

  • copy the schema from that table to the target
  • insert into the table from the temp table
insert into mydataset.newtable (_partitiontime, transaction_id, transaction_date)
select
    `partition` as _partitiontime,
    transaction_id,
    transaction_date
from mydataset.newtable__dbt_tmp

And here we succeeded I guess 🙌

Once the table has been created, we should do something similar:
as _PARTITIONTIME is a restricted keyword, we again need to "wrap it" to create the temp table before the MERGE statement.

@jtcohen6
Contributor

jtcohen6 commented Feb 22, 2022

@github-christophe-oudar Ah, I see what you're saying. The limitation is, BigQuery won't let us run a query or create a temp table that contains a column named _partitiontime.

The downer is that, because this model's SQL returns a "forbidden" column name (_partitiontime), it wouldn't be possible to run it outside the context of a create table or insert statement (e.g. to preview data in the dbt Cloud IDE). I like your thought of offering a different convention, either:

  1. Users name this column something else, such as partitiontime (without the underscore). This gets users closer to what's actually happening behind the scenes.
  2. Users configure partition_by to include both the real column name and the fact that they want to use ingestion-time partitioning with a _partitiontime pseudo-column. This asks dbt to abstract/obfuscate away more inner workings, to offer a nicer user experience.

Revisiting the example above:

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by={
      "field": "transaction_date",
      "data_type": "date",
      "use_partitiontime": true    # something like this?
    }
) }}

select
    transaction_id,
    transaction_date
from {{ source('order_system', 'transactions') }}

Then, we handle the rest:

-- this is the DML that dbt encodes into the materialization, using metadata from preexisting table
insert into mydataset.newtable (_partitiontime, transaction_id, transaction_date)
select timestamp(transaction_date) as _partitiontime, * from (

    -- this is the user-provided model SQL, templated here directly without first saving to temp table
    select
        transaction_id,
        transaction_date
    from source_order_system.transactions
)

Put another way:

{% set partition_time_exp = 
    partition_by.field if partition_by.data_type == 'timestamp' 
    else 'timestamp(' + partition_by.field + ')' %}

insert into {{ target_relation }} (_partitiontime, {{ dest_columns_csv }})
select {{ partition_time_exp }} as _partitiontime, * from (
    {{ sql }}
)
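The same templating logic can be written out in Python to make the two branches explicit. This is a sketch under assumptions: the function names are made up for illustration, `partition_by` is treated as a plain dict with `field` and `data_type` keys, and the select mirrors the earlier DML that returns `timestamp(transaction_date) as _partitiontime, *`.

```python
from typing import Mapping, Sequence


def partition_time_expression(field: str, data_type: str) -> str:
    """Use the field directly if it is a timestamp, otherwise cast it."""
    if data_type.lower() == "timestamp":
        return field
    return f"timestamp({field})"


def build_ingestion_insert(
    target: str,
    dest_columns: Sequence[str],
    partition_by: Mapping[str, str],
    model_sql: str,
) -> str:
    """Wrap the user's model SQL so rows land in the derived partition."""
    expr = partition_time_expression(partition_by["field"], partition_by["data_type"])
    columns_csv = ", ".join(dest_columns)
    return (
        f"insert into {target} (_partitiontime, {columns_csv})\n"
        f"select {expr} as _partitiontime, * from (\n{model_sql}\n)"
    )


insert_sql = build_ingestion_insert(
    "mydataset.newtable",
    ["transaction_id", "transaction_date"],
    {"field": "transaction_date", "data_type": "date"},
    "select transaction_id, transaction_date from source_order_system.transactions",
)
print(insert_sql)
```

Because the model SQL itself never returns a column named _partitiontime, it stays runnable on its own, for example when previewing data.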

We'd still need a way to get the column schema from the "source" (model SQL) query, in order to template the column list in the initial create table statement, and to power on_schema_change in subsequent incremental runs. If we take this approach, we could either:

  1. Still create a "temp" table from the source query (it doesn't return a column named _partitiontime)
  2. Use get_columns_in_query, which would have the effect of running:
select * from (
    select
        transaction_id,
        transaction_date
    from source_order_system.transactions
) as __dbt_sbq
where false
limit 0
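As a rough illustration of the second option, the wrapper query can be produced by a few lines of string handling. `build_schema_probe` is a hypothetical helper written for this example and is not dbt's `get_columns_in_query` implementation; it only reproduces the query shape shown above.

```python
def build_schema_probe(model_sql: str, alias: str = "__dbt_sbq") -> str:
    """Wrap model SQL in a query that returns no rows, only column metadata."""
    # A trailing semicolon would break the subquery, so drop it first.
    inner = model_sql.strip().rstrip(";").rstrip()
    return f"select * from (\n{inner}\n) as {alias}\nwhere false\nlimit 0"


probe_sql = build_schema_probe(
    "select transaction_id, transaction_date from source_order_system.transactions;"
)
print(probe_sql)
```

Running such a query returns an empty result whose schema describes the model's columns, which is what the initial create table statement needs.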

The temp table approach might be preferable, since it's easier to reason about, and more similar to the logic that we have in the materialization today.

@github-christophe-oudar
Contributor Author

I agree with you! I think the approach definitely makes sense! 🙌
I'd suggest something like "time_ingestion_partitioning": true maybe?
Anyway a configuration key that's explicit enough and defaults to false.
Thank you for the pointers, it will definitely save me some time!

I'll take another shot at it over the weekend (maybe earlier if I can find some time to do so 🤞).
It's pretty much the last blocker (along with the linked #77) for wider dbt adoption at my current company. It's about time to tackle it 💪.

@github-christophe-oudar
Contributor Author

github-christophe-oudar commented Mar 13, 2022

Hello, just to let you know that I've started working on this. I'm a bit slow, as I only have some time on weekends lately.
I created a draft as it's still WIP (not working yet): #136

@github-actions
Contributor

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

2 participants