Compaction

Henry Haiying Cai edited this page Jan 29, 2015 · 49 revisions
  • Author: Ziyang
  • Reviewer: Henry

One of the canonical use cases of Gobblin is periodically pulling data from data stores. Because full snapshots are large, we pull them at low frequency, and at higher frequency we pull deltas: the records inserted or updated since the last pull. Each time we pull a delta, we merge it with the latest snapshot we have, an operation called compaction. Since delta tables carry no information about deleted records, deletions become visible only the next time we pull a full snapshot.

One way to perform compaction is using Hive. Let us look at a few examples.

<stakiar> I suggest adding some more introduction to this area; briefly mention the different topics you are going to cover in this page and the goal of each section. </stakiar>

A Simple Example

<lqiao> There is a strong assumption on serial order of snapshot and delta, and snapshot and delta never overlap in time. We need to call it out in our deployment page to make sure that's the case. For the first release it's ok to live with that constraint. </lqiao>

<stakiar> Looks like the first part of the document talks about how to handle the case where the deltas only have (key, value). This would require snapshots and deltas to be taken sequentially. However, looks like the end of the document talks about how to do compaction with deltas (key, value, delta), which would allow deltas to be taken in parallel with snapshots. It would probably be a good idea to touch upon this point at the beginning of the document, that way it's clearer to the users what the tradeoffs are between having the delta schema as (key, value) vs. (key, value, delta). </stakiar>

<hcai> I think there might be two use cases: 1. apply delta on snapshot; 2. roll up hourly into daily. </hcai>

Suppose we have one snapshot table 'snapshot (key INT, value STRING)' and one delta table 'delta (key INT, value STRING)'. Both tables have been pulled and stored in avro format (e.g., snapshot.avro, delta.avro, schema.avsc).

We first create tables for snapshot and delta:

CREATE EXTERNAL TABLE snapshot
STORED AS AVRO
LOCATION '<path-to-snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');

The delta table can be created similarly. Note that if your Hive version is older than 0.14, the CREATE TABLE statement has to be written as

CREATE EXTERNAL TABLE snapshot
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS 
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '<path-to-snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');

Next we retrieve the records in the snapshot that are not updated in the delta, using a left outer join:

CREATE TEMPORARY TABLE not_updated AS
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key
WHERE delta.key IS NULL;

Note that delta.key IS NULL should be in the WHERE clause rather than the join condition (i.e., ON snapshot.key=delta.key AND delta.key IS NULL), since the outer join has to be evaluated on condition snapshot.key=delta.key first, before certain keys in delta become NULL.
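For contrast, the following variant (a sketch of the mistake described above) returns every row of snapshot: the condition snapshot.key=delta.key AND delta.key IS NULL can never be true, so the outer join pads every snapshot row with NULLs, and without the WHERE filter nothing is removed:

```sql
-- INCORRECT: the IS NULL test belongs in WHERE, not in the join condition.
-- This returns all snapshot rows, each padded with NULL delta columns.
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key AND delta.key IS NULL;
```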

If you are using a Hive version before 0.14, you may change TEMPORARY TABLE to TABLE or VIEW.

An alternative way to retrieve not_updated is using a NOT IN subquery:

CREATE TEMPORARY TABLE not_updated AS
SELECT * FROM snapshot
WHERE snapshot.key NOT IN
(SELECT delta.key FROM delta);

However, this only works if the primary key of snapshot and delta contains a single attribute.
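If the primary key is composite, a correlated NOT EXISTS subquery works instead (Hive supports correlated EXISTS/NOT EXISTS subqueries in the WHERE clause starting in 0.13). A sketch assuming a hypothetical two-column key (key1, key2):

```sql
-- Keep snapshot rows whose composite key does not appear in delta.
CREATE TEMPORARY TABLE not_updated AS
SELECT s.* FROM snapshot s
WHERE NOT EXISTS
  (SELECT 1 FROM delta d
   WHERE d.key1 = s.key1 AND d.key2 = s.key2);
```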

Finally, we do a UNION ALL on not_updated and delta, and insert the result into the new_snapshot table:

CREATE EXTERNAL TABLE new_snapshot
STORED AS AVRO
LOCATION '<path-to-new_snapshot.avro>'
TBLPROPERTIES ('avro.schema.url'='<path-to-schema.avsc>');

INSERT OVERWRITE TABLE new_snapshot
SELECT * FROM (
  SELECT * FROM not_updated
  UNION ALL
  SELECT * FROM delta
) unioned;

<lqiao> Does this generate avro files automatically or the data format needs to be specified? We need to make sure the compaction outcome is not just in hive but also in avro format (first release), so down stream can continue with Pig or M/R to process the data. </lqiao>

Multiple Deltas

A single extraction may produce multiple delta tables of a snapshot. Suppose we have a snapshot table and delta tables delta_1, ..., delta_k, ordered by timestamp (so if j > i, an update to a key in delta_j trumps an update to the same key in delta_i). We can first merge the delta tables into a single delta table with the following pseudocode:

CREATE TABLE tmp_delta (key INT, value STRING);
merged_delta = delta_1;
for (i from 2 to k)
    DROP TABLE IF EXISTS diff;
    CREATE TABLE diff AS
        SELECT merged_delta.key, merged_delta.value FROM
        merged_delta LEFT OUTER JOIN delta_i
        ON merged_delta.key=delta_i.key
        WHERE delta_i.key IS NULL;
    INSERT OVERWRITE TABLE tmp_delta
        SELECT * FROM (
          SELECT * FROM diff
          UNION ALL
          SELECT * FROM delta_i
        ) unioned;
    merged_delta = tmp_delta;

Then we can do the same thing as in the previous example.

Schema Changes

<stakiar> I know not all users may be using Avro, but does Avro provide any functionality to do schema evolution with Hive automatically? </stakiar>

Suppose we have a snapshot table and a delta table, with the same key attributes but different value attributes. Changing key attributes is out of the scope of this document, since it requires specific solutions depending on how the key attributes are changed.

New Attribute in Delta

Suppose the schema of snapshot is (key INT, value1 STRING) and the schema of delta is (key INT, value1 STRING, value2 STRING), and the default value of value2 is 'default'. After we obtain temporary table not_updated, we can add value2 to not_updated by

CREATE TABLE not_updated_with_new_attr
(key INT, value1 STRING, value2 STRING);

INSERT OVERWRITE TABLE not_updated_with_new_attr
SELECT key, value1, 'default' FROM not_updated;

Then we can union not_updated_with_new_attr with delta. If the default value of value2 is NULL, then we can simply do

ALTER TABLE not_updated
ADD COLUMNS (value2 STRING);

Removed Attribute in Delta

Suppose the schema of snapshot is (key INT, value1 STRING, value2 STRING) and the schema of delta is (key INT, value1 STRING). Then we can simply project value2 out in the UNION ALL statement:

INSERT OVERWRITE TABLE new_snapshot
SELECT * FROM (
  SELECT key, value1 FROM not_updated
  UNION ALL
  SELECT key, value1 FROM delta
) unioned;

Partitioned Table

Suppose the schema of snapshot and delta is (key INT, value STRING) partitioned by (year INT, month INT). To improve performance, in the left outer join we should include year and month in the join condition:

CREATE TEMPORARY TABLE not_updated AS
SELECT snapshot.key, snapshot.value FROM
snapshot LEFT OUTER JOIN delta
ON snapshot.key=delta.key
AND snapshot.year=delta.year
AND snapshot.month=delta.month
WHERE delta.key IS NULL;

This tells Hive to join corresponding partitions in snapshot and delta, rather than join the full tables.

Duplicate Keys in Delta

Suppose the schema of snapshot and delta is (key INT, value STRING, ts TIMESTAMP), and the delta table may contain multiple records with the same key if that key has been updated multiple times. In this case, only the record with the latest timestamp for each key should be merged with snapshot. To do so, we deduplicate the records in delta, keeping only the one with the latest timestamp:

SELECT t.*
FROM delta t WHERE t.ts IN
(SELECT MAX(ts) FROM delta s
 WHERE s.key=t.key);
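If your Hive version supports windowing functions (0.11 and later), a ROW_NUMBER-based sketch achieves the same deduplication and additionally guarantees exactly one row per key even when two updates share the same timestamp:

```sql
-- Rank each key's records by timestamp, newest first, and keep only the top one.
SELECT key, value, ts FROM (
  SELECT key, value, ts,
         ROW_NUMBER() OVER (PARTITION BY key ORDER BY ts DESC) AS rn
  FROM delta
) ranked
WHERE rn = 1;
```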

Overlap of Pulling Snapshot and Delta

In some applications the job pulling the snapshot and the job pulling the delta may run in parallel rather than sequentially. This means that it is possible that some records in delta are older than the corresponding records in snapshot, and such records in the snapshot should survive after merging snapshot and delta.

In this case each record should have a timestamp, and whenever a key is present in both snapshot and delta, the record with the latest timestamp wins. Suppose the schema of snapshot and delta is (key INT, value STRING, ts TIMESTAMP). We can first union snapshot and delta, then perform a deduplication similar to the previous query.

CREATE TABLE merged AS 
SELECT * FROM snapshot
UNION ALL
SELECT * FROM delta;

INSERT OVERWRITE TABLE new_snapshot
SELECT t.*
FROM merged t WHERE t.ts IN
(SELECT MAX(ts) FROM merged s
 WHERE s.key=t.key);