This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Support better integration with Map-reduce #556

Open
jtaylor-sfdc opened this issue Nov 15, 2013 · 8 comments

@jtaylor-sfdc
Contributor

Idea from the mailing list here and here, to support a SELECT query to determine the output of the Map function and/or an UPSERT SELECT in the Reduce function.

Given that Phoenix already parallelizes your query in a similar way to a Map-reduce job, it's unclear to me what benefit this functionality would provide. Would the SELECT statement be running over only a single region's worth of data? What can you not do with an UPSERT SELECT that you can do with Map-reduce? I'm sure there are plenty of things, but it would be good to list the top ten to identify what's missing from Phoenix.

@mravi
Contributor

mravi commented Nov 21, 2013

James,
a) It would be nice to provide a minimal API, similar to HCatalog, for users to access the metadata of a table and construct a Put based on that metadata from within a MapReduce program.
b) For applications that ingest large volumes of data into HBase using MapReduce, it is cumbersome to first produce a CSV of the data and then bulk load it into HBase, for example when ingesting data from a database. Though this can be done using Sqoop, we lose the ability to apply validations and decorate the data before it is ingested into HBase.
c) In cases where we would like to build various dimensional data sets from a given fact table in HBase, a simple UPSERT SELECT becomes difficult because much of the logic cannot be put into a custom function. The other approach would be to load the data into memory, apply the logic, and then UPSERT into the target table, but that invites memory issues. Here too, an MR job would be a good fit: all users would need to do is construct the start and end rows of the SCAN for the job using the Phoenix API, write their logic in the mapper or reducer, and finally write to HBase.

@jtaylor-sfdc
Contributor Author

We already have APIs to access metadata: the standard JDBC DatabaseMetaData interface. My preference would be that we expose access to these when running map-reduce.
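
For example, a minimal sketch of listing a table's columns through that standard JDBC interface; the jdbc:phoenix:localhost URL and the FOO.BAR table name are placeholders:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ListColumns {
    public static void main(String[] args) throws Exception {
        // Placeholder ZooKeeper quorum; the Phoenix JDBC URL is "jdbc:phoenix:<quorum>"
        Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        DatabaseMetaData md = conn.getMetaData();
        // List the columns of a hypothetical table BAR in schema FOO
        ResultSet rs = md.getColumns(null, "FOO", "BAR", null);
        while (rs.next()) {
            System.out.println(rs.getString("COLUMN_NAME") + " " + rs.getString("TYPE_NAME"));
        }
        conn.close();
    }
}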

I agree that for (b), the CSV bulk loader only solves a subset of the problem. One thing that might help is making it easy to create built-in functions. Also, using Pig instead of Map-reduce may be a better model.

I agree that for (c), we can't rely on holding everything in memory.

@mravi
Contributor

mravi commented Dec 3, 2013

Hi James,
Is there a way to construct the row key for a Put instance, given the set of column values that form the primary key of the table? Since Phoenix uses a key separator between the primary key columns, providing end users with a builder class that takes in the column values and gives back a byte[], similar to RowKeySchemaBuilder, would suffice.

From what I see, all requirements fall under three categories:
a) Phoenix HBase table as a source.
b) Phoenix HBase table as a target.
c) Phoenix HBase table as both source and target.

For a), we just need to give the job a Scan instance via TableMapReduceUtil, which we can construct using the builder class; a sketch follows these two points.
For b), from within the Reducer, users would use the builder to construct the row key for the Put.
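
A minimal sketch of what a) might look like using HBase's TableMapReduceUtil; the table name, mapper, and row key bounds are placeholders, and the bounds would come from such a builder class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

// Sketch for a): a Phoenix-backed HBase table as the MR source.
// MY_TABLE, IdentityMapper and the start/stop keys are placeholders.
public class PhoenixSourceJob {

    // Identity mapper: passes the row key and Result straight through.
    static class IdentityMapper extends TableMapper<ImmutableBytesWritable, Result> {
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(rowKey, columns);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "phoenix-table-source");
        job.setJarByClass(PhoenixSourceJob.class);

        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("startKey")); // placeholder; start is inclusive
        scan.setStopRow(Bytes.toBytes("stopKey"));   // placeholder; stop is exclusive
        scan.setCacheBlocks(false);                  // recommended for MR scans

        TableMapReduceUtil.initTableMapperJob("MY_TABLE", scan, IdentityMapper.class,
                ImmutableBytesWritable.class, Result.class, job);
        job.setOutputFormatClass(NullOutputFormat.class); // map-only sketch, no output
        job.setNumReduceTasks(0);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}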

Regards
Ravi

@jtaylor-sfdc
Contributor Author

To construct the row key for a table, you can do the following:

/**
 * Return the row key bytes given a set of primary key column values.
 * @param conn the database connection
 * @param fullTableName the HBase table name
 * @param pkColumnValues the byte[] of each primary key column in primary
 *  key constraint order (i.e. table.getPKColumns()). Note that if the table is
 *  salted (table.getBucketNum() != null), then pkColumnValues[0] would be ignored
 *  and the column values would start at pkColumnValues[1]. The returned bytes
 *  would have the correct initial salt byte.
 * @return the bytes of the row key
 */
public static byte[] getRowKey(Connection conn, String fullTableName, byte[][] pkColumnValues) throws SQLException {
    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
    PTable table = conn.unwrap(PhoenixConnection.class).getPMetaData().getTable(fullTableName);
    table.newRow(ptr, pkColumnValues);
    return ByteUtil.copyKeyBytesIfNecessary(ptr);
}
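
A hypothetical usage of the above, building Scan boundaries from primary key values. The table, key columns, and values are made up, and VARCHAR key columns are assumed so that Bytes.toBytes(..) matches the stored encoding:

import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical table MY_SCHEMA.EVENTS with primary key (HOST VARCHAR, EVENT_ID VARCHAR).
// Note: the client-side table cache may need to be populated first; see the
// updateCache(..) call discussed further down in this thread.
Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
byte[] startRow = getRowKey(conn, "MY_SCHEMA.EVENTS",
        new byte[][] { Bytes.toBytes("host1"), Bytes.toBytes("e-000") });
byte[] stopRow = getRowKey(conn, "MY_SCHEMA.EVENTS",
        new byte[][] { Bytes.toBytes("host1"), Bytes.toBytes("e-999") });
Scan scan = new Scan(startRow, stopRow); // start inclusive, stop exclusive
conn.close();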

@ghost assigned mravi Dec 4, 2013
@mravi
Contributor

mravi commented Dec 4, 2013

Hi James,
Thanks for the update. I believe the above code can be used to construct the start and stop rows for the Scan instance passed to the job; the connection initialization would be a one-time cost there.
However, to meet b), we would be forced to initialize a connection in every reducer's setup method. Is there a way we can avoid that?

Regards
Ravi

@jtaylor-sfdc
Contributor Author

Not sure I follow about forming the start/stop key of the Scan. Is this based on a SQL query? There's quite a bit of code in Phoenix that figures this out.

For (b), it's fine to initialize a connection in the reducer, as our driver is embedded. All connections on the same JVM share the same HConnection between the HBase client and server, so creating a new connection is just a few instantiations.
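
For instance, a rough sketch of a reducer that does this; the quorum, key/value types, and table details are placeholders:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch for b): each reducer opens its own Phoenix connection in setup().
// Because the driver is embedded and the HConnection is shared per JVM, this is cheap.
public class PhoenixTargetReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {

    private Connection conn;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            // "jdbc:phoenix:<zookeeper quorum>"; the Phoenix driver is assumed to be on the classpath
            conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Build the row key with getRowKey(conn, ...) from the earlier snippet,
        // construct a Put, and write it out to the target table.
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (conn != null) conn.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}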

One other call you'll likely need to make is the following:

  MetaDataClient client = new MetaDataClient(connection);
  // Where with "FOO.BAR", the schemaName is "FOO" and the tableName is "BAR"
  client.updateCache(schemaName, tableName); 

This will populate the client-side cache that you're using to retrieve the PTable from the server-side definition in SYSTEM.TABLE. If you know that the schema of the table won't be changing while your map-reduce job is running, this would be a one-time thing. Otherwise, you'd need to do it prior to forming each row key to ensure you're working with the latest table definition.
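
Roughly, that might look like the following; the package names for PhoenixConnection and MetaDataClient are assumed from this repository's com.salesforce.phoenix namespace, and the checked exception is a guess:

import java.sql.Connection;
import java.sql.SQLException;

import com.salesforce.phoenix.jdbc.PhoenixConnection;  // assumed package
import com.salesforce.phoenix.schema.MetaDataClient;   // assumed package

// Refresh the client-side cache of the table definition from SYSTEM.TABLE.
// Call once in setup() if the schema is stable for the life of the job,
// or before each getRowKey(..) call if the schema may change underneath you.
public static void refreshTableCache(Connection conn, String schemaName, String tableName)
        throws SQLException {
    MetaDataClient client = new MetaDataClient(conn.unwrap(PhoenixConnection.class));
    client.updateCache(schemaName, tableName);
}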

@jtaylor-sfdc
Contributor Author

@mravi - did anything come out of your effort here?

@mravi
Contributor

mravi commented Dec 29, 2013

Hi James,
I got an MR job running, using getRowKey(..) to construct the row key. I will write up a simple document on the three use cases discussed above, with simple MR jobs, to help others if required.
