Skip to content
This repository was archived by the owner on Aug 31, 2022. It is now read-only.

bigquery operator #125

Merged
merged 2 commits into from
Aug 17, 2018
Merged

bigquery operator #125

merged 2 commits into from
Aug 17, 2018

Conversation

danielnorberg
Copy link
Contributor

@danielnorberg danielnorberg commented Aug 1, 2018

Usage:

Run a query and create table from results

final Task<TableId> task = Task.named("task")
    .ofType(TableId.class)
    .context(BigQueryContext.create(table))
    .context(BigQueryOperator.create())
    .process((stagingTable, bq) -> bq.job(
        JobInfo.of(QueryJobConfiguration.newBuilder("SELECT foo FROM input")
            .setDestinationTable(stagingTable.tableId())
            .build()))
        .success(response -> stagingTable.publish()));

Run a query and get the results directly

final Task<String> task = Task.named("task")
    .ofType(String.class)
    .context(BigQueryOperator.create())
    .process(bq -> bq.query("SELECT foo FROM input")
        .success(result -> "success!"));

Load data from GCS (via hades)

final Task<TableId> task = Task.named("load")
    .ofType(TableId.class)
    .input(() -> HadesTasks.lookup("src-endpoint", "src-partition"))
    .context(BigQueryContext.create(TableId.of("dst-project", "dst-dataset", "dst-table")))
    .operator(BigQueryOperator.create())
    .process((input, table, bq) -> bq
        .job(LoadJobConfiguration
            .newBuilder(table.tableId(), input.uri())
            .setSchema(Schema.of(...))) // configure avro schema?
        .success(job -> table.publish()));

@NatashaL
Copy link
Contributor

LGTM 👍

@danielnorberg danielnorberg merged commit abad2ad into master Aug 17, 2018
@danielnorberg danielnorberg deleted the bigquery-operator branch August 17, 2018 02:40
narape added a commit that referenced this pull request Nov 12, 2019
As described in #125 , the original
idea was to be able to write
final Task<TableId> task = Task.named("task")
    .ofType(TableId.class)
    .context(BigQueryContext.create(table))
    .context(BigQueryOperator.create())
    .process((stagingTable, bq) -> bq.job(
        JobInfo.of(QueryJobConfiguration.newBuilder("SELECT foo FROM input")
            .setDestinationTable(stagingTable.tableId())
            .build()))
        .success(response -> stagingTable.publish()));

But because success is package private this use case is only allowed in
the classes in the same package, like the unit test
@narape narape mentioned this pull request Nov 12, 2019
10 tasks
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants