Skip to content

Commit

Permalink
added filtering callback support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Payne committed Sep 28, 2011
1 parent 22f8f76 commit ef2c07b
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 177 deletions.
75 changes: 57 additions & 18 deletions src/Examples.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/
// no package

import java.util.List;
import java.util.ArrayList;

import org.hbase.async.HBaseClient;
Expand All @@ -34,6 +35,8 @@
import org.hbase.async.KeyValue;
import org.hbase.async.Scanner;

import com.stumbleupon.async.Callback;

//
// create 'hbase_async_test', {NAME => 'fam1'}, {NAME => 'fam2'}
//
Expand All @@ -44,10 +47,10 @@ public class Examples

// Prints the command-line synopsis for the -put, -get and -scan modes to stderr.
// NOTE(review): the printf group and the println group below describe the same
// three modes; they look like the removed and added sides of one diff hunk, so
// presumably only one group is meant to remain — confirm before relying on this.
public static void usage()
{
System.err.printf("usage:\n");
System.err.printf(" Examples <quorum> [-put <count>]\n");
System.err.printf(" Examples <quorum> [-get <row>]\n");
System.err.printf(" Examples <quorum> [-scan]\n");
System.err.println("usage:");
System.err.println(" Examples <quorum> -put <count>");
System.err.println(" Examples <quorum> -get <row> [-filter [keep]]");
System.err.println(" Examples <quorum> -scan [-filter [keep]]");
}

public static void main(String[] argv) throws Exception
Expand All @@ -61,13 +64,25 @@ public static void main(String[] argv) throws Exception

int result = 1;
HBaseClient client = new HBaseClient(quorum);

// create filter if specified
SimpleFilter filter = null;
for (int i = 0; i < argv.length; i++) {
String a = argv[i];
if (a.equals("-filter")) {
boolean keep = (++i < argv.length && argv[i].equals("keep"));
filter = new SimpleFilter(keep);
break;
}
}

try {
if (argv[1].equals("-put")) {
result = put(client, argv);
} else if (argv[1].equals("-get")) {
result = get(client, argv);
result = get(client, argv, filter);
} else if (argv[1].equals("-scan")) {
result = scan(client, argv);
result = scan(client, argv, filter);
} else {
System.err.printf("error: invalid arguments\n");
usage();
Expand All @@ -93,7 +108,7 @@ public static int put(HBaseClient client, String[] argv) throws Exception
client.put(new PutRequest(TABLE, "key" + i, "fam2", "col2", "value4"));
}
System.out.printf("put %,d values\n", n);

if (true) {
// REMIND: this should not be needed, but we need it because of a race condition in shutdown
Thread.sleep(5000);
Expand All @@ -102,20 +117,44 @@ public static int put(HBaseClient client, String[] argv) throws Exception
return 0;
}

public static int get(HBaseClient client, String[] argv) throws Exception
//
// Simple filter callback: prints every KeyValue it is handed (tagged as coming
// from the filter) and counts how many it has seen.  The KeyValue is handed
// back to the caller only when the filter was created with keep == true;
// otherwise call() returns null.
//
static class SimpleFilter implements Callback<KeyValue,KeyValue> {
  // Number of KeyValues passed to call() so far.
  int count;
  // Whether call() returns its argument (true) or null (false).
  boolean keep;

  SimpleFilter(boolean keep) {
    this.keep = keep;
  }

  public KeyValue call(KeyValue kv) {
    // Take the current index and bump the counter before printing.
    final int index = count;
    count = index + 1;
    print(true, index, kv);
    if (keep) {
      return kv;
    }
    return null;
  }
}

public static int get(HBaseClient client, String[] argv, SimpleFilter filter) throws Exception
{
List<KeyValue> values = client.get(new GetRequest(TABLE, argv[2]), filter).join();
int count = 0;
for (KeyValue val : client.get(new GetRequest(TABLE, argv[2])).join()) {
print(count++, val);
for (KeyValue val : values) {
if (filter == null) {
print(false, count++, val);
} else {
// the filter prints them
}
}
System.out.printf("got %,d values\n", count);
return count > 0 ? 0 : 1;
System.out.printf("got back %,d values\n", values.size());
if (filter != null) {
System.out.printf("filter processed %,d values\n", filter.count);
}
return values.size() > 0 ? 0 : 1;
}

public static int scan(HBaseClient client, String[] argv) throws Exception
public static int scan(HBaseClient client, String[] argv, SimpleFilter filter) throws Exception
{
int count = 0;
Scanner scanner = client.newScanner(TABLE);
Scanner scanner = client.newScanner(TABLE, filter);
try {
for (;;) {
ArrayList<ArrayList<KeyValue>> results = scanner.nextRows(5).join();
Expand All @@ -124,19 +163,19 @@ public static int scan(HBaseClient client, String[] argv) throws Exception
}
for (ArrayList<KeyValue> values : results) {
for (KeyValue val : values) {
print(count++, val);
print(false, count++, val);
}
}
}
System.out.printf("found %,d values\n", count);
} finally {
scanner.close();
}
System.out.printf("found %,d values\n", count);
return count > 0 ? 0 : 1;
}

static void print(int count, KeyValue val)
static void print(boolean fromFilter, int count, KeyValue val)
{
System.out.printf("%4d: %s, %s, %s, %s, %s\n", count, TABLE, new String(val.key()), new String(val.family()), new String(val.qualifier()), new String(val.value()));
System.out.printf("%4d: %s%s, %s, %s, %s, %s\n", count, fromFilter ? "[filter] " : "", TABLE, new String(val.key()), new String(val.family()), new String(val.qualifier()), new String(val.value()));
}
}
40 changes: 39 additions & 1 deletion src/HBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,19 @@ public Deferred<ArrayList<KeyValue>> get(final GetRequest request) {
return sendRpcToRegion(request).addCallbacks(got, Callback.PASSTHROUGH);
}

/**
* Retrieves data from HBase and invokes a filter callback for each KeyValue
* as they are processed. If the filter returns the KeyValue it will
* be included in the final result otherwise it is dropped. This is also a
* way to process items as they come in and free up the memory as you go.
* <p>
* The filter is stored on the request object itself (its
* {@code filteringCallback} field) before the RPC is sent, so passing
* {@code null} overwrites any filter previously stored on that request.
* @param request The {@code get} request.
* @param filter The {@code Callback} to invoke, null is OK.
* @return A {@link Deferred} list of the KeyValues obtained for this request.
*/
public Deferred<ArrayList<KeyValue>> get(final GetRequest request, Callback<KeyValue,KeyValue> filter) {
// Attach the filter to the request so it travels with the RPC.
request.filteringCallback = filter;
return sendRpcToRegion(request).addCallbacks(got, Callback.PASSTHROUGH);
}

/** Singleton callback to handle responses of "get" RPCs. */
private static final Callback<ArrayList<KeyValue>, Object> got =
new Callback<ArrayList<KeyValue>, Object>() {
Expand All @@ -744,14 +757,39 @@ public Scanner newScanner(final byte[] table) {
return new Scanner(this, table);
}

/**
 * Builds a new {@link Scanner} over the given table and attaches the
 * supplied filtering {@link Callback} to it.
 * @param table The name of the table you intend to scan.
 * @param filter The filtering Callback, null is OK.
 * @return A new scanner for this table.
 */
public Scanner newScanner(final byte[] table, Callback<KeyValue,KeyValue> filter) {
  final Scanner scanner = new Scanner(this, table);
  scanner.filteringCallback = filter;
  return scanner;
}

/**
* Creates a new {@link Scanner} for a particular table.
* @param table The name of the table you intend to scan.
* The string is assumed to use the platform's default charset.
* @return A new scanner for this table.
*/
public Scanner newScanner(final String table) {
// NOTE(review): two return statements in a row; the second can never execute.
// They look like the removed and added lines of the same diff hunk — confirm
// which one is meant to remain.
return new Scanner(this, table.getBytes());
return newScanner(table.getBytes(), null);
}

/**
 * Builds a new {@link Scanner} for a table given by name, with the supplied
 * filtering {@link Callback} attached.
 * @param table The name of the table you intend to scan.
 * The string is assumed to use the platform's default charset.
 * @param filter The filtering Callback, null is OK.
 * @return A new scanner for this table.
 */
public Scanner newScanner(final String table, Callback<KeyValue,KeyValue> filter) {
  // Convert the name with the platform's default charset, then delegate to
  // the byte[] overload.
  final byte[] tableName = table.getBytes();
  return newScanner(tableName, filter);
}

/**
Expand Down
20 changes: 20 additions & 0 deletions src/HBaseRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

/**
Expand Down Expand Up @@ -293,6 +294,13 @@ public interface HasValue {
*/
byte attempt; // package-private for RegionClient and HBaseClient only.

/**
* A filtering Callback instance. If it is non-null it will be invoked for
* each KeyValue encountered during a scan or get. If the filter returns
* its argument, the KeyValue will be included in the results.
*/
Callback<KeyValue,KeyValue> filteringCallback;

/**
* Package private constructor for RPCs that aren't for any region.
* @param method The name of the method to invoke on the RegionServer.
Expand Down Expand Up @@ -391,6 +399,18 @@ boolean versionSensitive() {
return false;
}

/**
 * Runs a KeyValue through the filtering callback, when one is set.
 * With no callback installed the KeyValue is returned unchanged; otherwise
 * the result is whatever the callback returns for it.
 * @param kv The KeyValue to filter.
 * @return {@code kv} itself when there is no filter, else the filter's result.
 * @throws Exception Whatever the filtering callback throws.
 * @see RegionClient
 */
KeyValue invokeFilter(KeyValue kv) throws Exception {
  if (filteringCallback == null) {
    return kv;
  }
  return filteringCallback.call(kv);
}

public String toString() {
// Try to rightsize the buffer.
final StringBuilder buf = new StringBuilder(16 + method.length + 2
Expand Down
Loading

0 comments on commit ef2c07b

Please sign in to comment.