Skip to content

Commit

Permalink
add initial version of a TableInputFormat for hadoop integration, nee…
Browse files Browse the repository at this point in the history
…ds more work
  • Loading branch information
avh committed Sep 27, 2011
1 parent 653ca4b commit f02ca57
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,14 @@ hbaseasync_SOURCES = \
src/TableNotFoundException.java \
src/UnknownRowLockException.java \
src/UnknownScannerException.java \
src/TableInputFormat.java \

hbaseasync_LIBADD := \
$(NETTY) \
$(SLF4J_API) \
$(ZOOKEEPER) \
$(SUASYNC) \
$(HADOOP) \

test_SOURCES = src/Test.java
unittest_SRC = \
Expand Down
289 changes: 289 additions & 0 deletions src/TableInputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
* Copyright (c) 2010, 2011 StumbleUpon, Inc. All rights reserved.
* This file is part of Async HBase.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of the StumbleUpon nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package org.hbase.async;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.stumbleupon.async.Deferred;

// REMIND: need faster scanner, using large streaming reads and callbacks
// REMIND: support scanning multiple column families
// REMIND: support scanning time ranges

/**
* Input format for map reduce jobs.
*
* <ul>
* <li><b>hbase.async.cluster</b> - quorum spec for the cluster
* <li><b>hbase.mapreduce.inputtable</b> - the name of the input table
* <li><b>hbase.mapreduce.scan.families</b> - column families to scan, separated by spaces or commas
* <li><b>hbase.mapreduce.scan.columns</b> - columns to scan, each prefixed by its family, separated by spaces or commas
* </ul>
*
* @author Arthur van Hoff
*/
public class TableInputFormat extends InputFormat<BytesWritable,List<KeyValue>> implements Configurable
{
// Name of the catalog table, and the family/qualifier within it, that
// getSplits() scans to find the server hosting each region of the input table.
// Encoded with Bytes.UTF8 (as the table name is) instead of String.getBytes(),
// whose result depends on the platform default charset.
private static final byte[] META = Bytes.UTF8(".META.");
private static final byte[] INFO = Bytes.UTF8("info");
private static final byte[] SERVER = Bytes.UTF8("server");

// Job configuration; assigned by setConf() and read when computing splits and
// creating record readers.
Configuration conf;

/**
 * Builds the record reader that scans the rows of one split.
 *
 * @param split the split to read; it is cast to {@link TableSplit}, so any
 *        other type fails with a ClassCastException
 * @param context the task attempt context (not used here)
 * @return a new {@link TableRecordReader} built from this format's configuration
 * @throws IOException if the reader cannot be constructed
 */
@Override
public RecordReader<BytesWritable,List<KeyValue>> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException
{
    final TableSplit tableSplit = (TableSplit) split;
    return new TableRecordReader(conf, tableSplit);
}

public @Override List<InputSplit> getSplits(JobContext context) throws IOException
{
List<InputSplit> list = new ArrayList<InputSplit>();
byte[] table = Bytes.UTF8(conf.get("hbase.mapreduce.inputtable"));
HBaseClient client = new HBaseClient(conf.get("hbase.async.cluster"));
try {
TableSplit prev = null;
final Scanner scanner = client.newScanner(META);
try {
scanner.setStartKey(table);
scanner.setFamily(INFO);
scanner.setQualifier(SERVER);

String prefix = new String(table) + ",";

try {
for (int b = 0 ; ; b++) {
Deferred<ArrayList<ArrayList<KeyValue>>> rows = scanner.nextRows(1000);
ArrayList<ArrayList<KeyValue>> results = rows.join();

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

You can skip the creation of the local variable rows and do directly scanner.nextRows().join(), this is a lot more concise (as far as conciseness exists in Java).

If you always give the same number of rows to nextRows(), you may as well just call setMaxNumRows(1000) once out of the loop and then call nextRows() without argument.

if (results == null || results.size() == 0) {

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

Technically, this should only check against null as the API documentation says. Checking against the empty array will break with Jonathan's subsequent changes.

break;
}
for (ArrayList<KeyValue> v : results) {
for (KeyValue kv : v) {
byte[] key = kv.key();
int i = 0;
for (; i < table.length && table[i] == key[i] ; i++);
if (i == table.length && key[i] == ',') {
int j = key.length - 1;
for (i++; key[j] != ',' ; j--);
TableSplit next = new TableSplit();
next.table = table;
String hostport = new String(kv.value());
next.host = hostport.substring(0, hostport.indexOf(':'));
if (j - i > 0) {
next.start = new byte[j - i];
System.arraycopy(key, i, next.start, 0, next.start.length);
}
if (prev != null) {
prev.end = next.start;
}
list.add(next);
prev = next;
}

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

It would be nice if the client itself had a bit of API to tell you what are the region boundaries in between a certain key range. This way you could call this API here.

}
}
}
} catch (Exception e) {
throw new IOException(e);
}

} finally {
try {
scanner.close().join();
} catch (Exception e) {
throw new IOException(e);
}
}

} finally {
client.shutdown();

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

Because your code is synchronous, you need to .join() the Deferred returned by shutdown().

}
return list;
}

/**
 * Returns the configuration previously supplied through setConf().
 *
 * @return the stored configuration, or null if none has been set
 */
@Override
public Configuration getConf()
{
    return this.conf;
}
/**
 * Stores the configuration used later to compute splits and create readers.
 *
 * @param conf the job configuration to remember
 */
@Override
public void setConf(final Configuration conf)
{
    this.conf = conf;
}

//
// Table split
//
public static class TableSplit extends InputSplit implements Writable, Comparable<TableSplit>
{
// Name of the table this split belongs to; assigned by getSplits() and
// round-tripped through write()/readFields().
public byte[] table;

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

public? not final?

// Start key of the split, taken from the region name in .META.; getSplits()
// leaves it null when the region name carries an empty start key.
public byte[] start;
// End key of the split: getSplits() sets it to the start key of the following
// split, so it stays null for the last split it produces.
public byte[] end;
// Host part (text before the ':') of the server value read from .META.;
// reported to the framework by getLocations().
public String host;

/**
 * Reports where the data of this split lives.
 *
 * @return a one-element array holding {@code host}
 */
@Override
public String[] getLocations()
{
    final String[] locations = new String[1];
    locations[0] = host;
    return locations;
}
/**
 * Reports the size of this split.
 *
 * @return always zero; no size is computed for a split
 */
@Override
public long getLength()
{
    return 0L;
}
/**
 * Reads a byte array stored as a 4-byte length followed by that many bytes
 * (the layout produced by writeByteArray()).
 *
 * @param in the stream to read from
 * @return the decoded array, possibly empty, never null
 * @throws IOException if the length prefix is negative or the stream ends
 *         before the announced number of bytes has been read
 */
private byte[] readByteArray(DataInput in) throws IOException
{
    final int length = in.readInt();
    if (length < 0) {
        // A corrupt stream would otherwise surface as NegativeArraySizeException.
        throw new IOException("Invalid byte array length: " + length);
    }
    final byte[] data = new byte[length];
    in.readFully(data);
    return data;
}
/**
 * Restores this split from the form produced by write(): table, start key,
 * end key and host, each as a length-prefixed byte array, in that order.
 *
 * @param in the stream to read from
 * @throws IOException if the stream cannot be read
 */
@Override
public void readFields(DataInput in) throws IOException
{
    table = readByteArray(in);
    start = readByteArray(in);
    end = readByteArray(in);
    // write() encodes the host with Bytes.UTF8, so decode it explicitly as
    // UTF-8 rather than with the platform default charset.
    host = new String(readByteArray(in), "UTF-8");
}
/**
 * Writes a byte array as a 4-byte length followed by its bytes.
 *
 * A null array is written as length zero. getSplits() leaves the start key of
 * a split null when the region's start key is empty and never assigns an end
 * key to the last split, so null must be serializable; readByteArray() will
 * decode such a field as an empty array.
 *
 * @param out the stream to write to
 * @param data the bytes to write, or null for "empty"
 * @throws IOException if the stream cannot be written
 */
private void writeByteArray(DataOutput out, byte[] data) throws IOException
{
    if (data == null) {
        out.writeInt(0);
        return;
    }
    out.writeInt(data.length);
    out.write(data, 0, data.length);
}
/**
 * Serializes this split as four length-prefixed byte arrays: table, start
 * key, end key, then the host encoded with Bytes.UTF8.
 *
 * @param out the stream to write to
 * @throws IOException if the stream cannot be written
 */
@Override
public void write(DataOutput out) throws IOException
{
    final byte[][] keyFields = { table, start, end };
    for (final byte[] field : keyFields) {
        writeByteArray(out, field);
    }
    final byte[] hostBytes = Bytes.UTF8(host);
    writeByteArray(out, hostBytes);
}
/**
 * Formats this split as {@code TableSplit[table,start,end,host]} for logs.
 *
 * Null byte-array fields are rendered as empty strings: getSplits() produces
 * splits whose start and/or end key is null, and new String((byte[]) null)
 * would throw.
 *
 * @return a human-readable description of this split
 */
@Override
public String toString()
{
    return String.format("TableSplit[%s,%s,%s,%s]",
        table == null ? "" : new String(table),
        start == null ? "" : new String(start),
        end == null ? "" : new String(end),
        host);
}
/**
 * Orders splits by start key, using Bytes.memcmp on the key bytes.
 *
 * A null start key is treated as the empty key, because getSplits() leaves
 * the start null for a region whose start key is empty.
 * NOTE(review): Bytes.memcmp is not visible in this file; presumably it
 * dereferences both arguments and would fail on null - confirm.
 *
 * @param split the split to compare against
 * @return the result of comparing the two start keys
 */
@Override
public int compareTo(TableSplit split)
{
    final byte[] mine = start != null ? start : new byte[0];
    final byte[] theirs = split.start != null ? split.start : new byte[0];
    return Bytes.memcmp(mine, theirs);
}
/**
 * Two splits are equal when their table, start key, end key and host match.
 *
 * @param other the object to compare with
 * @return true if {@code other} is a TableSplit with the same four fields
 */
@Override
public boolean equals(Object other)
{
    if (this == other) {
        return true;
    }
    if (!(other instanceof TableSplit)) {
        return false;
    }
    final TableSplit o = (TableSplit) other;
    return Bytes.equals(table, o.table)
        && Bytes.equals(start, o.start)
        && Bytes.equals(end, o.end)
        // host may be unset; compare without dereferencing null
        && (host == null ? o.host == null : host.equals(o.host));
}

/**
 * Hash code over the same four fields as equals(), so that splits behave
 * correctly in hash-based collections.
 * NOTE(review): assumes Bytes.equals compares array contents, matching
 * java.util.Arrays.hashCode - confirm.
 *
 * @return a content-based hash of table, start, end and host
 */
@Override
public int hashCode()
{
    int h = java.util.Arrays.hashCode(table);
    h = 31 * h + java.util.Arrays.hashCode(start);
    h = 31 * h + java.util.Arrays.hashCode(end);
    h = 31 * h + (host == null ? 0 : host.hashCode());
    return h;
}
}

//
// Record Reader
//
public static class TableRecordReader extends RecordReader<BytesWritable,List<KeyValue>>
{
// Client created by the constructor from "hbase.async.cluster"; shut down in close().
HBaseClient client;
// Name of the scanned table, from "hbase.mapreduce.inputtable".
byte[] table;
// The split handed to the constructor.
TableSplit split;
// Scanner over the table, created by the constructor.
Scanner scanner;
// Number of rows requested per nextRows() call ("hbase.async.chunk.size", default 1000).
int chunkSize;
// Key of the current record: the row key of the first KeyValue of the current row.
BytesWritable key;
// Position within values of the next row to hand out.
int index;
// Most recent batch of rows returned by the scanner; null before the first
// fetch and when the scanner returns null.
ArrayList<ArrayList<KeyValue>> values;
// Row most recently returned by nextKeyValue().
ArrayList<KeyValue> current;

TableRecordReader(Configuration conf, TableSplit split) throws IOException
{
this.client = new HBaseClient(conf.get("hbase.async.cluster"));
this.chunkSize = Integer.valueOf(conf.get("hbase.async.chunk.size", "1000"));
this.table = Bytes.UTF8(conf.get("hbase.mapreduce.inputtable"));
this.split = split;
this.scanner = client.newScanner(table);
this.key = new BytesWritable();
// REMIND: set additional constraints
if (conf.get("hbase.mapreduce.scan.family") != null) {
for (String fam : conf.get("hbase.mapreduce.scan.families").split("[ ,]")) {
scanner.setFamily(fam);
}
}
if (conf.get("hbase.mapreduce.scan.columns") != null) {
for (String col : conf.get("hbase.mapreduce.scan.columns").split("[ ,]")) {
//scanner.addFilter(col);
scanner.setFamily(col.substring(0, col.indexOf(':')));
scanner.setQualifier(col.substring(col.indexOf(':')+1));
}
}
}
/**
 * Does nothing: all setup is performed by the constructor.
 *
 * @param inputsplit ignored
 * @param context ignored
 */
@Override
public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException
{
    // intentionally empty
}
/**
 * Returns the key of the current record.
 *
 * @return the key set by the last successful nextKeyValue(), or the empty
 *         key created by the constructor if no record has been read yet
 */
@Override
public BytesWritable getCurrentKey() throws IOException
{
    return this.key;
}
/**
 * Returns the value of the current record.
 *
 * @return the row (list of KeyValue) selected by the last successful
 *         nextKeyValue(), or null if no record has been read yet
 */
@Override
public List<KeyValue> getCurrentValue() throws IOException, InterruptedException
{
    return this.current;
}
public @Override boolean nextKeyValue() throws IOException, InterruptedException
{
if (values == null || index == values.size()) {
try {
index = 0;
values = scanner.nextRows(chunkSize).join();

This comment has been minimized.

Copy link
@tsuna

tsuna Sep 29, 2011

Similarly, no need to pass chunkSize all the time to nextRows().

} catch (Exception e) {
throw new IOException(e);
}
}
if (values != null && index < values.size()) {
current = values.get(index++);
key = new BytesWritable(current.get(0).key());
return true;
}
return false;
}
/**
 * Reports scan progress.
 *
 * @return always zero; progress is not tracked
 */
@Override
public float getProgress()
{
    return 0.0f;
}
/**
 * Releases the scanner and then shuts the client down, waiting for both.
 *
 * Cleanup is best effort: a failure is printed and does not propagate, and a
 * failure to close the scanner does not prevent the client shutdown. If the
 * thread is interrupted while waiting, its interrupt status is restored.
 */
@Override
public void close()
{
    // Close the scanner first, as getSplits() does for its own scanner.
    try {
        scanner.close().join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        e.printStackTrace();
    }
    try {
        client.shutdown().join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
}
1 change: 1 addition & 0 deletions third_party/hadoop/hadoop-core-0.20.2.jar.md5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
18ca8fbbac2df8f438885ac4eb5a22df
33 changes: 33 additions & 0 deletions third_party/hadoop/include.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) 2011 StumbleUpon, Inc. All rights reserved.
# This file is part of Async HBase.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# - Neither the name of the StumbleUpon nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

HADOOP_VERSION := 0.20.2
HADOOP := third_party/hadoop/hadoop-core-$(HADOOP_VERSION).jar
HADOOP_BASE_URL := http://hadoop-via-maven.googlecode.com/svn-history/r3/trunk/repo/org/apache/hadoop/hadoop-core/0.20.2/

$(HADOOP): $(HADOOP).md5
set dummy "$(HADOOP_BASE_URL)" "$(HADOOP)"; shift; $(FETCH_DEPENDENCY)

THIRD_PARTY += $(HADOOP)
1 change: 1 addition & 0 deletions third_party/include.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ include third_party/powermock/include.mk
include third_party/slf4j/include.mk
include third_party/suasync/include.mk
include third_party/zookeeper/include.mk
include third_party/hadoop/include.mk

0 comments on commit f02ca57

Please sign in to comment.