The Hive-BigQuery Connector is a Hive storage handler that enables Hive to interact with BigQuery's storage layer. It allows you to run queries in Hive using the HiveQL dialect to read from and write to BigQuery.
See the details in CHANGES.md.
This connector supports Dataproc 2.0, 2.1, and 2.2.
For Hadoop clusters other than Dataproc, the connector has been tested with the following software versions:
- Hive 1.2.1, 2.3.6, 2.3.9, 3.1.2, and 3.1.3.
- Hadoop 2.6.4, 2.7.0, 2.10.2, 3.2.3, and 3.3.3.
- Tez 0.9.2 on Hadoop 2, and Tez 0.10.1 on Hadoop 3.
- Pig 0.16.0, 0.17.0.
Make sure you have the BigQuery Storage API enabled in your GCP project. Follow these instructions.
For Dataproc clusters, the most convenient way to install the Hive-BigQuery connector is to use the connectors init action.
You can also download an official release JAR from Maven Central.
Alternately, you can build a JAR from source:
-
Clone this repository:
git clone https://github.com/GoogleCloudPlatform/hive-bigquery-connector cd hive-bigquery-connector
-
Compile and package the jar:
- For Hive 1:
./mvnw package -DskipTests -P hive1-generic
-
For Hive 2:
./mvnw package -DskipTests -P hive2-generic
The packaged jar is now available at:
hive-2-bigquery-connector/target/hive-2-bigquery-connector-<version>.jar
-
For Hive 3:
./mvnw package -DskipTests -P hive3-generic
The packaged jar is now available at:
hive-3-bigquery-connector/target/hive-3-bigquery-connector-<version>.jar
Once you have the connector JAR, deploy the JAR to the classpath of all nodes in your Hive cluster.
You can also provide the JAR as a parameter when starting a Hive or Beeline session:
hive --auxpath <jar path>/hive-bigquery-connector-<version>.jar
beeline > add jar <jar path>/hive-bigquery-connector-<version>.jar;
Hive can have two types of tables:
- Managed tables, sometimes referred to as internal tables.
- External tables.
The Hive BigQuery connector supports both types in the following ways.
When you create a managed table using the CREATE TABLE
statement, the connector creates both
the table metadata in the Hive Metastore and a new table in BigQuery with the same schema.
Here's an example:
CREATE TABLE mytable (word_count BIGINT, word STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mytable'
);
When you drop a managed table using the DROP TABLE
statement, the connector drops both the table
metadata from the Hive Metastore and the BigQuery table (including all of its data).
For Hive-3.x, create a managed table with NOT NULL
column restraint will not create the BigQuery table
with corresponding NOT NULL
restraint. The NOT NULL
restraint is still enforced by Hive at runtime.
It is recommended to use external table if user needs to have the NOT NULL
restraint on the BigQuery table.
When you create an external table using the CREATE EXTERNAL TABLE
statement, the connector only
creates the table metadata in the Hive Metastore. It assumes that the corresponding table in
BigQuery already exists.
Here's an example:
CREATE EXTERNAL TABLE mytable (word_count BIGINT, word STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mytable'
);
When you drop an external table using the DROP TABLE
statement, the connector only drops the table
metadata from the Hive Metastore. The corresponding BigQuery table remains unaffected.
It is recommended to collect statistics
(e.g. the number of rows, raw data size, etc) to help Hive to optimize query plan,
therefore improving performance. Follow these steps to collect statistics for a table:
(replace <table_name>
with your table name)
If your Hive has HIVE-24928 (this is applied on Dataproc), statistics can be collected by a quick metadata collection operation through
ANALYZE TABLE <table_name> COMPUTE STATISTICS;
To verify if Hive has reasonable statistics on numRows
and rawDataSize
for your table.
DESCRIBE FORMATTED <table_name>;
Example output:
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
numFiles 0
numRows 12345
rawDataSize 67890
totalSize 34567
Workaround for without HIVE-24928:
User can run
ANALYZE TABLE <table_name> COMPUTE STATISTICS FOR COLUMNS;
to collect column stats and run query with the assistance of column stats.
SET hive.stats.fetch.column.stats=true;
Column stats collection is more expensive operation, so it is recommended to have HIVE-24928 applied.
As Hive's partitioning and BigQuery's partitioning inherently work in different ways, the Hive
PARTITIONED BY
clause is not supported. However, you can still leverage BigQuery's native
partitioning by specifying some table properties. Two types of BigQuery native partitioning are
currently supported: time-unit column partitioning and ingestion time partitioning.
Note: This section is about BigQuery native partitioning. To learn about integrating with partitioned tables in open formats like Parquet or ORC, refer to the section on BigLake integration.
You can partition a BigQuery table on a column of BigQuery types DATE
, DATETIME
, or
TIMESTAMP
, which respectively correspond to the Hive types DATE
, TIMESTAMP
, and
TIMESTAMPLOCALTZ
. When you write data to the table, BigQuery automatically puts the data into the
correct partition based on the values in the column.
For the DATETIME
and TIMESTAMP
BigQuery types, the partitions can have either hourly, daily,
monthly, or yearly granularity. For the DATE
type, the partitions can have daily, monthly, or
yearly granularity. Partition boundaries are based on UTC time.
To create a table partitioned by a time-unit column, you must set the bq.time.partition.field
table property to the column's name.
Here's an example:
CREATE TABLE mytable (int_val BIGINT, ts TIMESTAMP)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mytable',
'bq.time.partition.field'='ts',
'bq.time.partition.type'='MONTH'
);
Check out the official BigQuery documentation about Time-unit column partitioning to learn more.
When you create a table partitioned by ingestion time, BigQuery automatically assigns rows to partitions based on the time when BigQuery ingests the data. You can choose hourly, daily, monthly, or yearly boundaries for the partitions. Partitions boundaries are based on UTC time.
An ingestion time partitioned table also has two pseudo columns:
_PARTITIONTIME
: ingestion time for each row, truncated to the partition boundary (such as hourly or daily). This column has theDATE
Hive type, which corresponds to the BigQueryDATE
type._PARTITIONDATE
: UTC date corresponding to the value in the_PARTITIONTIME
pseudo column. This column has theTIMESTAMPLOCALTZ
Hive type, which corresponds to the BigQueryTIMESTAMP
type.
To create a table partitioned by ingestion time, you must set the bq.time.partition.type
table
property to the partition boundary of your choice (HOUR
, DAY
, MONTH
, or YEAR
).
Here's an example:
CREATE TABLE mytable (int_val BIGINT)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mytable',
'bq.time.partition.type'='DAY'
);
Note: Ingestion time partitioning is currently supported only for read operations.
Check out the official BigQuery documentation about Ingestion time partitioning to learn more.
As Hive's clustering and BigQuery's clustering inherently work in different ways, the Hive
CLUSTERED BY
clause is not supported. However, you can still leverage BigQuery's native clustering
by setting the bq.clustered.fields
table property to a comma-separated list of the columns to
cluster the table by.
Here's an example:
CREATE TABLE mytable (int_val BIGINT, text STRING, purchase_date DATE)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mytable',
'bq.clustered.fields'='int_val,text'
);
Check out the official BigQuery documentation about Clustering to learn more.
You can use the following properties in the TBLPROPERTIES
clause when you create a new table:
Property | Description |
---|---|
bq.table |
Always required. BigQuery table name in the format of project.dataset.table |
bq.time.partition.type |
Time partitioning granularity. Possible values: HOUR , DAY , MONTH , YEAR |
bq.time.partition.field |
Name of a DATE or TIMESTAMP column to partition the table by |
bq.time.partition.expiration.ms |
Partition expiration time in milliseconds |
bq.time.partition.require.filter |
Set it to true to require that all queries on the table must include a predicate filter (a WHERE clause) that filters on the partitioning column. Defaults to false . |
bq.clustered.fields |
Comma-separated list of fields to cluster the table by |
You can set the following Hive/Hadoop configuration properties in your environment:
Property | Default value | Description |
---|---|---|
bq.read.data.format |
arrow |
Data format used for reads from BigQuery. Possible values: arrow , avro . |
bq.temp.gcs.path |
GCS location for storing temporary Avro files when using the indirect write method |
|
bq.write.method |
direct |
Indicates how to write data to BigQuery. Possible values: direct (to directly write to the BigQuery storage API), indirect (to stage temporary Avro files to GCS before loading into BigQuery). |
bq.work.dir.parent.path |
${hadoop.tmp.dir} |
Parent path on HDFS where each job creates its temporary work directory |
bq.work.dir.name.prefix |
hive-bq- |
Prefix used for naming the jobs' temporary directories. |
bq.destination.table.kms.key.name |
Cloud KMS encryption key used to protect the job's destination BigQuery table. Read more in the section on customer-managed encryption keys | |
materializationProject |
Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. | |
materializationDataset |
Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. | |
maxParallelism |
Maximum initial number of read streams | |
viewsEnabled |
false |
Set it to true to enable reading views. |
Add links to Hive & BQ types doc.
Hive | Hive type description | BigQuery | BigQuery type description |
---|---|---|---|
TINYINT |
1-byte signed integer | INT64 |
|
SMALLINT |
2-byte signed integer | INT64 |
|
INT |
4-byte signed integer | INT64 |
|
BIGINT |
8-byte signed integer | INT64 |
|
FLOAT |
4-byte single precision floating point number | FLOAT64 |
|
DOUBLE |
8-byte double precision floating point number | FLOAT64 |
|
DECIMAL |
Alias of NUMERIC . Precision: 38. Scale: 38 |
DECIMAL |
Alias of NUMERIC . Precision: 38. Scale: 9. |
DATE |
Format: YYYY-MM-DD |
DATE |
Format: YYYY-[M]M-[D]D . Supported range: 0001-01-01 to 9999-12-31 |
TIMESTAMP |
Timezone-less timestamp stored as an offset from the UNIX epoch | DATETIME |
A date and time, as they might be displayed on a watch, independent of time zone. |
TIMESTAMPLOCALTZ |
Timezoned timestamp stored as an offset from the UNIX epoch | TIMESTAMP |
Absolute point in time, independent of any time zone or convention such as Daylight Savings Time |
BOOLEAN |
Boolean values are represented by the keywords TRUE and FALSE | BOOLEAN |
|
CHAR |
Variable-length character data | STRING |
|
VARCHAR |
Variable-length character data | STRING |
|
STRING |
Variable-length character data | STRING |
|
BINARY |
Variable-length binary data | BYTES |
|
ARRAY |
Represents repeated values | ARRAY |
|
STRUCT |
Represents nested structures | STRUCT |
|
MAP |
Dictionary of keys and values. Keys must be of primitive type, whereas values can be of any type. | ARRAY<STRUCT> |
BigQuery doesn't support Maps natively. The connector implements it as a list of structs, where each struct has two columns: name and value . |
The BigQuery storage handler supports both the MapReduce and Tez execution engines. Tez is recommended for better
performance -- you can use it by setting the hive.execution.engine=tez
configuration property.
Since BigQuery is backed by a columnar datastore, it can efficiently stream data without reading all columns.
The connector supports predicate pushdowns to the BigQuery Storage Read API. This allows to filter data at the BigQuery storage layer, which reduces the amount of data traversing the network and improves overall performance.
Many built-in Hive UDFs and operators (e.g. AND
, OR
, ABS
, TRIM
, BETWEEN
...) are identical
in BigQuery, so the connector pushes those as-is to BigQuery.
However, some Hive UDFs and operators are different in BigQuery. So the connector automatically converts those to the equivalent functions in BigQuery. Below is the list of UDFs and operators that are automatically converted:
Hive generic UDF | BigQuery function | Notes |
---|---|---|
% |
MOD |
BigQuery currently supports MOD only for the INT64 , NUMERIC , and BIGNUMERIC types |
TO_DATE |
DATE |
|
DATE_ADD |
DATE_ADD |
|
DATE_SUB |
DATE_SUB |
|
DATEDIFF |
DATE_DIFF |
|
DATEDIFF |
DATE_DIFF |
|
HEX |
TO_HEX |
|
UNHEX |
FROM_HEX |
|
NVL |
IFNULL |
|
RLIKE |
REGEXP_CONTAINS |
|
SHIFTLEFT |
<< |
|
SHIFTRIGHT |
>> |
|
YEAR |
EXTRACT(YEAR FROM ...) |
|
MONTH |
EXTRACT(MONTH FROM ...) |
|
DAY |
EXTRACT(DAY FROM ...) |
|
HOUR |
EXTRACT(HOUR FROM ...) |
|
MINUTE |
EXTRACT(MINUTE FROM ...) |
|
SECOND |
EXTRACT(SECOND FROM ...) |
|
DAYOFWEEK |
EXTRACT(DAYOFWEEK FROM ...) |
|
WEEKOFYEAR |
EXTRACT(WEEK FROM ...) |
|
QUARTER |
EXTRACT(QUARTER FROM ...) |
Note: Custom Hive UDFs (aka Hive
plugins) are currently not supported in predicate pushdowns. If a query contains a custom Hive UDF
in a WHERE
filter, then the custom UDF will not be pushed down to BigQuery and will instead be
processed by the Hive query engine.
The connector allows parallel reads from BigQuery by using the BigQuery Storage API.
You can set the preferredMinParallelism
configuration property, which the connector passes to
CreateReadSessionRequest.setPreferredMinStreamCount()
when it creates the BigQuery read session. This parameter can be used to inform the BigQuery service that there is a
desired lower bound on the number of streams. This is typically the target parallelism of the client (e.g. a Hive
cluster with N-workers would set this to a low multiple of N to ensure good cluster utilization). The BigQuery backend
makes a best effort to provide at least this number of streams, but in some cases might provide less.
Additionally, you can set the maxParallelism
configuration property, which the connector passes to
CreateReadSessionRequest.setMaxStreamCount().
If unset or zero, the BigQuery backend server will provide a value of streams to produce reasonable throughput. The
number of streams may be lower than the requested number, depending on the amount parallelism that is reasonable for
the table. There is a default system max limit of 1,000. This must be greater than or equal to the number of MapReduce
splits. Typically, a client should either leave this unset to let the system determine an upper bound or set this as the
maximum "units of work" that the client can gracefully handle.
The connector supports both the Arrow
and Avro read formats. You can specify
which format the connector should use by setting the bq.read.data.format
configuration property to either arrow
or
avro
. The connector uses Arrow by default as Arrow generally performs better than Avro.
The connector supports two methods for writing to BigQuery: direct writes and indirect writes.
The direct write method consists of writing directly to BigQuery by using the BigQuery Write API in "pending" mode.
The indirect write method consists of the following steps:
- During the execution of a write job, each mapper task creates its own BigQuery write stream and writes directly to BigQuery in parallel.
- At the end of the job, the connector commits all the streams together atomically. If the commit succeeds, then the newly written data instantly becomes available for reading.
If for some reason the job fails, all the writes that may have been done through the various open streams are automatically garbage-collected by the BigQuery backend and none of the data ends up persisting in the target table.
The direct write method is generally faster than the indirect write method but incurs costs associated with usage of the BigQuery Storage Write API.
The connector uses this method by default.
The indirect write method consists of the following steps:
- During the execution of a write job, each mapper task creates its own temporary output Avro file and stages it to GCS.
- At the end of the job, the connector commits the writes by executing a BigQuery load job with all the Avro files.
- Once the job is complete, the connector deletes the temporary Avro files from GCS.
This method is generally costs less than the direct write method as BigQuery load jobs are free and this method only incur costs related to GCS write operations and storage. However, this method is also generally much slower due to its multi-stage nature and data being routed through GCS. Learn more about other limitations.
The connector uses the direct write method by default. To let it use the indirect method instead,
set the bq.write.method
configuration property to indirect
, and set the bq.temp.gcs.path
property to indicate where to store the temporary Avro files in GCS.
The connector has preliminary support for reading from BigQuery logical views and BigQuery materialized views.
Please note there are a few caveats:
- The Storage Read API operates on storage directly, so the API cannot be used to read logical or materialized views. To get around this limitation, the connector materializes the views into temporary tables before it can read them. This materialization process can affect overall read performance and incur additional costs to your BigQuery bill.
- By default, the materialized views are created in the same project and dataset. Those can be configured by the
optional
materializationProject
andmaterializationDataset
Hive configuration properties or table properties, respectively. - As mentioned in the BigQuery documentation,
the
materializationDataset
should be in same location as the view. - Reading from views is disabled by default. In order to enable it, set the
viewsEnabled
configuration property totrue
.
The connector supports reading from BigQuery table snapshots.
A BigQuery table snapshot preserves the contents of a table (called the base table) at a particular time. You can save a snapshot of a current table, or create a snapshot of a table as it was at any time in the past seven days.
To link a Hive table to a BigQuery table snapshot, simply specify the snapshot's name in the
bq.table
table property, for example:
CREATE TABLE mytable (abc BIGINT, xyz STRING)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table'='myproject.mydataset.mysnapshot'
);
As stated in the BigQuery Storage API documentation,
read sessions are based on a snapshot isolation model (Note: this is unrelated to
BigQuery table snapshots). All
consumers read based on a specific point in time. The snapshot time is based on the read
session creation time (i.e. when the SELECT
query is initiated).
Note that this consistency model currently only applies to the table data, not its metadata.
Dataproc uses a patched version of Spark that automatically detects a table that has the bq.table
table property, in which case Spark will use the Spark-BigQuery Connector
to access the table's data. This means that on Dataproc you actually do not need to use the
Hive-BigQuery Connector for Spark SQL.
Java example:
SparkConf sparkConf = new SparkConf().setMaster("local");
SparkSession spark =
SparkSession.builder()
.appName("example")
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
Dataset<Row> ds = spark.sql("SELECT * FROM mytable");
Row[] rows = ds.collect();
Python example:
spark = SparkSession.builder \
.appName("example") \
.config("spark.master", "local") \
.enableHiveSupport() \
.getOrCreate()
df = spark.sql("SELECT * FROM mytable")
rows = df.collect()
The connector supports Apache Pig via HCatalog.
Here's an example reading from a BigQuery table and writing to another, assuming that
my-database.my-table
and my-database.my-other-table
have been registered as BigQuery tables:
some_data = LOAD 'my-database.my-table' USING org.apache.hive.hcatalog.pig.HCatLoader();
STORE some_data INTO 'my-database.my-other-table' USING org.apache.hive.hcatalog.pig.HCatStorer();
Notes:
- Pig only supports
datetime
types with milliseconds precision, so you may encounter precision loss if you have values with nanoseconds in Hive or BigQuery. Learn more in the HCatalog documentation on data type mappings.
BigLake allows you to store your data in open formats (e.g Parquet, ORC) in an object store like GCS instead of in BigQuery's native storage called Capacitor, but still leverage advanced BigQuery features like metadata caching for query performance, or column-level access control and dynamic data masking for security and governance at scale.
To integrate Hive with BigLake tables backed by GCS, you need to:
- Create a Cloud resource connection.
- Upload your data files to a GCS bucket.
- Create an external table definition in BigQuery that points to the data file URIs in GCS.
- Create an external table definition in Hive that points to the BigQuery table definition.
In this example, we create simple table backed by some CSV files.
The files have the following layout in a GCS bucket:
gs://mybucket/warehouse/my_csv_table/file1.csv
gs://mybucket/warehouse/my_csv_table/file2.csv
External BigQuery table definition:
CREATE EXTERNAL TABLE `myproject.mydataset.my_csv_table`
WITH CONNECTION `myproject.us.myconnection`
OPTIONS (
format = 'CSV',
uris = ['gs://mybucket/warehouse/my_csv_table/*.csv']
)
External Hive table definition:
CREATE EXTERNAL TABLE mydb.my_csv_table (
abc BIGINT,
xyz STRING
)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table' = 'myproject.mydataset.my_csv_table'
)
Sample Hive query to read data:
SELECT abc
FROM mydb.my_csv_table
WHERE xyz = "some text"
In this example, we integrate a hive-partitioned Parquet table with BigLake.
The Parquet files have two columns:
id
: 64-bit integername
: string
The Parquet files are partitioned by city
and have the following layout:
gs://mybucket/warehouse/my_parquet_table/city=Paris/000000_0.parquet
gs://mybucket/warehouse/my_parquet_table/city=Paris/000001_0.parquet
gs://mybucket/warehouse/my_parquet_table/city=London/000000_0.parquet
gs://mybucket/warehouse/my_parquet_table/city=London/000001_0.parquet
gs://mybucket/warehouse/my_parquet_table/city=London/000002_0.parquet
External BigQuery table definition:
CREATE EXTERNAL TABLE `myproject.mydataset.my_parquet_table`
WITH PARTITION COLUMNS
WITH CONNECTION `myproject.us.myconnection`
OPTIONS (
format = 'PARQUET',
uris = ['gs://mybucket/warehouse/my_parquet_table/*'],
hive_partition_uri_prefix = 'gs://mybucket/warehouse/my_parquet_table',
require_hive_partition_filter = true
)
Notes about the above statement:
- The
WITH PARTITION COLUMNS
clause exposes thecity
partition as a column in the BigQuery table schema. - The
require_hive_partition_filter=true
option means that all queries over this table require a partition filter that can be used to eliminate partitions when reading data.
External Hive table definition that matches the BigQuery table's schema:
CREATE EXTERNAL TABLE mydb.my_parquet_table (
id BIGINT,
name STRING
city STRING
)
STORED BY 'com.google.cloud.hive.bigquery.connector.BigQueryStorageHandler'
TBLPROPERTIES (
'bq.table' = 'myproject.mydataset.my_parquet_table'
)
Sample Hive query to read data:
SELECT *
FROM mydb.my_parquet_table
WHERE city = "Paris"
For more information, refer to the official BigQuery documentation on Creating external tables on partitioned data and Creating Cloud Storage BigLake tables.
The connector needs an instance of a GoogleCredentials
in order to connect to the BigQuery APIs.
By default, on Dataproc, the connector automatically uses the cluster service account's credentials.
There are multiple options to override the default behavior and to provide custom credentials:
-
Set the path to a service account's JSON private key in the
GOOGLE_APPLICATION_CREDENTIALS
environment variable. -
Set the path to a service account's JSON private key in the
bq.credentials.file
configuration property. -
Set a base64-encoded service account JSON private key in the
bq.credentials.key
configuration property. -
Set the fully qualified class name of a custom access token provider implementation in the
bq.access.token.provider.fqcn
configuration property. The class must be implemented in Java or other JVM languages such as Scala or Kotlin. It must either have a no-arg constructor or a constructor accepting a singlejava.util.String
parameter. This parameter can be supplied using thebq.access.token.provider.config
configuration property. If the property is not set then the no-arg constructor will be called. The JAR containing the implementation class should be on the cluster's classpath. -
Define service account impersonation for specific users, specific groups, or for all users that run the Hive query by default using the below properties:
-
bq.impersonation.service.account.for.user.<USER_NAME>
(not set by default)The service account to be impersonated for a specific user. You can specify multiple properties using that pattern for multiple users.
-
bq.impersonation.service.account.for.group.<GROUP_NAME>
(not set by default)The service account to be impersonated for a specific group. You can specify multiple properties using that pattern for multiple groups.
-
bq.impersonation.service.account
(not set by default)Default service account to be impersonated for all users.
If any of the above properties are set then the service account specified will be impersonated by generating a short-lived credentials when accessing BigQuery.
If more than one property is set then the service account associated with the username will take precedence over the service account associated with the group name for a matching user and group, which in turn will take precedence over default service account impersonation.
The impersonator service account must have the
serviceAccountTokenCreator
IAM role set on the impersonated service account. -
-
For simpler applications where access token refresh is not required, pass the access token itself with the
bq.access.token
configuration property. You can generate an access token by runninggcloud auth application-default print-access-token
.
You can provide a Cloud KMS key to be used to encrypt the destination table, for example when you
run a CREATE TABLE
statement for a managed table, or when you insert data into a table that
doesn't exist yet. To do so, set the bq.destination.table.kms.key.name
property with the
fully-qualified named of the desired Cloud KMS key in the form:
projects/<KMS_PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY>
The BigQuery service account associated with your project requires access to this encryption key.
The table will be encrypted by the key only if it created by the connector. A pre-existing unencrypted table won't be encrypted just by setting this option.
For further information about using customer-managed encryption keys (CMEK) with BigQuery, see here.
- The
UPDATE
,MERGE
, andDELETE
, andALTER TABLE
statements are currently not supported. - The
PARTITIONED BY
,CLUSTERED BY
,INSERT INTO PARTITION
, andINSERT INTO PARTITION OVERWRITE
statements are currently not supported. Note, however, that partitioning and clustering in BigQuery are supported viaTBLPROPERTIES
. See the corresponding sections on partitioning and clustering. - CTAS (aka
CREATE TABLE AS SELECT
) and CTLT (CREATE TABLE LIKE TABLE
) statements are currently not supported. - If you use the Hive
MAP
type, then the map's key must be ofSTRING
type if you use the Avro format for reading or the indirect method for writing. This is because Avro requires keys to be strings. If you use the Arrow format for reading (default) and the direct method for writing (also default), then there are no type limitations for the keys. - Hive
DECIMAL
data type has precision of 38 and scale of 38, while BigQuery'sNUMERIC
/BIGNUMERIC
have different precisions and scales. (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types), If the BigQuery data ofBIGNUMERIC
's precision and scale out of Hive'sDECIMAL
range, data can show up as NULL. - Custom Hive UDFs (aka Hive plugins) are currently not supported.
- BigQuery ingestion time partitioning is currently supported only for read operations.
- BigQuery integer range partitioning is currently not supported.
To standardize the code's format, use Spotless:
./mvnw spotless:apply
Create a service account and give the following roles in your project:
- BigQuery Admin
- Storage Admin
Download a JSON private key for the service account, and set the GOOGLE_APPLICATION_CREDENTIALS
environment
variable:
export GOOGLE_APPLICATION_CREDENTIALS=<path/to/your/key.json>
Enable the following APIs:
gcloud services enable \
bigquerystorage.googleapis.com \
bigqueryconnection.googleapis.com \
cloudkms.googleapis.com
Define environment variables:
export PROJECT=<my-gcp-project>
export BIGLAKE_LOCATION=us
export BIGLAKE_REGION=us-central1
export BIGLAKE_CONNECTION=hive-integration-tests
export BIGLAKE_BUCKET=${PROJECT}-biglake-tests
Create the test BigLake connection:
bq mk \
--connection \
--project_id="${PROJECT}" \
--location="${BIGLAKE_LOCATION}" \
--connection_type=CLOUD_RESOURCE \
"${BIGLAKE_CONNECTION}"
Create the bucket to host BigLake datasets:
gsutil mb -l "${BIGLAKE_REGION}" "gs://${BIGLAKE_BUCKET}"
Give the BigLake connection's service account access to the bucket:
export BIGLAKE_SA=$(bq show --connection --format json "${PROJECT}.${BIGLAKE_LOCATION}.${BIGLAKE_CONNECTION}" \
| jq -r .cloudResource.serviceAccountId)
gsutil iam ch serviceAccount:${BIGLAKE_SA}:objectViewer gs://${BIGLAKE_BUCKET}
Create a KMS keyring:
gcloud kms keyrings create \
integration_tests_keyring \
--location us
gcloud kms keys create integration_tests_key \
--keyring integration_tests_keyring \
--location us \
--purpose "encryption"
Obtain the BigQuery service account name:
BQ_SERVICE_ACCOUNT=$(bq show --encryption_service_account --format json | jq -r ".ServiceAccountID")
Assign the Encrypter/Decrypter role to the BigQuery service account:
gcloud kms keys add-iam-policy-binding \
--project=${PROJECT} \
--member serviceAccount:${BQ_SERVICE_ACCOUNT} \
--role roles/cloudkms.cryptoKeyEncrypterDecrypter \
--location=us \
--keyring=integration_tests_keyring \
integration_tests_key
You must use Java version 8, as it's the version that Hive itself uses. Make sure that JAVA_HOME
points to the Java
8's base directory.
-
To run the integration tests:
./mvnw verify -Pdataproc22,integration
-
To run a single integration test class:
./mvnw verify -Pdataproc22,integration -Dit.test="BigLakeIntegrationTests"
-
To run a specific integration test method:
./mvnw verify -Pdataproc22,integration -Dit.test="BigLakeIntegrationTests#testReadBigLakeTable"
-
To debug the tests, add the
-Dmaven.failsafe.debug
property:./mvnw verify -Pdataproc22,integration -Dmaven.failsafe.debug
... then run a remote debugger in IntelliJ at port
5005
. Read more about debugging with FailSafe here: https://maven.apache.org/surefire/maven-failsafe-plugin/examples/debugging.html
Acceptance tests create Dataproc clusters with the connector and run jobs to verify it.
The following environment variables must be set and exported first.
GOOGLE_APPLICATION_CREDENTIALS
- the full path to a credentials JSON, either a service account or the result of agcloud auth login
runGOOGLE_CLOUD_PROJECT
- The Google cloud platform project used to test the connectorTEST_BUCKET
- The GCS bucked used to test writing to BigQuery during the integration testsACCEPTANCE_TEST_BUCKET
- The GCS bucked used to test writing to BigQuery during the acceptance tests
To run the acceptance tests:
./mvnw verify -Pdataproc22,acceptance
If you want to avoid rebuilding the shaded-deps-dataproc22
and
shaded-acceptance-tests-dependencies
modules if they have no changes, you can break it down into
the following steps:
# Install hive-bigquery-parent/pom.xml to the Maven local repo
./mvnw install:install-file -Dpackaging=pom -Dfile=hive-bigquery-parent/pom.xml -DpomFile=hive-bigquery-parent/pom.xml
# Build and install the module JARs to the Maven local repo
./mvnw clean install -pl shaded-deps-dataproc22,shaded-acceptance-tests-dependencies -Pdataproc22 -DskipTests
At that point you can just run the tests without rebuilding the modules:
./mvnw clean verify -pl hive-bigquery-connector-common,hive-3-bigquery-connector -Pdataproc22,acceptance
To run the tests for Hadoop 2, pass the -Phadoop2
parameter to the mvnw verify
command to
activate the hadoop2
Maven profile. For Hadoop 3, pass -Phadoop3
instead.
Before you can run the tests with Hadoop 3, you also must install Tez's latest (unreleased) 0.9.3:
-
Install Protobuf v2.5.0:
If you're on MacOS, install these packages:
brew install automake libtool wget
Then compile Protobuf from source:
cd ~ wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.bz2 tar -xvjf protobuf-2.5.0.tar.bz2 rm protobuf-2.5.0.tar.bz2 cd protobuf-2.5.0 ./autogen.sh ./configure --prefix=$(PWD) make; make check make install
-
Get the Tez source:
cd ~ git clone cd git@github.com:apache/tez.git cd tez git checkout origin/branch-0.9
-
Compile and install Tez:
export PATH=${HOME}/protobuf-2.5.0/bin:${PATH} mvn clean install \ --projects=tez-api,tez-common,tez-mapreduce,tez-dag,hadoop-shim,tez-runtime-library,tez-runtime-internals \ -DskipTests=true -Dmaven.javadoc.skip=true \ -Dprotoc.path=${HOME}/protobuf-2.5.0/bin/protoc -Dhadoop.version=3.2.3
If all steps have succeeded, then Tez's
0.9.2-SNAPSHOT
packages should be installed in your local Maven repository and you should be able to run the tests with Hadoop 3 by using the-Phadoop3
argument.