diff --git a/src/Scanner.java b/src/Scanner.java
index 9d8dd983..7585afde 100644
--- a/src/Scanner.java
+++ b/src/Scanner.java
@@ -29,7 +29,12 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.hbase.async.generated.HBasePB;
+import org.hbase.async.generated.MapReducePB;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
@@ -198,6 +203,12 @@ public final class Scanner {
private boolean moreRows;
private boolean scannerClosedOnServer;
+ private boolean scan_metrics_enabled = false;
+
+ private long last_next_timestamp = System.currentTimeMillis();
+
+ private ScanMetrics scanMetrics = new ScanMetrics();
+
/**
* Constructor.
* This byte array will NOT be copied.
@@ -701,6 +712,18 @@ public void setTimeRange(final long min_timestamp, final long max_timestamp) {
this.max_timestamp = max_timestamp;
}
+ public void setScanMetricsEnabled(final boolean enabled) {
+ scan_metrics_enabled = enabled;
+ }
+
+ public boolean isScanMetricsEnabled() {
+ return scan_metrics_enabled;
+ }
+
+ public ScanMetrics getScanMetrics() {
+ return scanMetrics;
+ }
+
/**
* Scans a number of rows. Calling this method is equivalent to:
*
@@ -740,6 +763,7 @@ public Deferred>> nextRows() {
if (region == DONE) { // We're already done scanning.
return Deferred.fromResult(null);
} else if (region == null) { // We need to open the scanner first.
+ incRPCcallsMetrics();
if (this.isReversed() && !this.isFirstReverseRegion()){
return client.openReverseScanner(this)
.addCallbackDeferring(opened_scanner);
@@ -754,6 +778,7 @@ public Deferred>> nextRows() {
if(scannerClosedOnServer) {
return scanFinished(moreRows);
}
+ incRPCcallsMetrics();
// Need to silence this warning because the callback `got_next_row'
// declares its return type to be Object, because its return value
// may or may not be deferred.
@@ -770,6 +795,8 @@ public Deferred>> nextRows() {
opened_scanner =
new Callback>>, Object>() {
public Deferred>> call(final Object arg) {
+ long currentTime = System.currentTimeMillis();
+ updateSumOfMillisSecBetweenNexts(currentTime);
final Response resp;
if (arg instanceof Long) {
scanner_id = (Long) arg;
@@ -788,9 +815,11 @@ public Deferred>> call(final Object arg) {
LOG.debug("Scanner " + Bytes.hex(scanner_id) + " opened on " + region);
}
if (resp != null) {
+ updateServerSideMetrics(resp.metrics);
if (resp.rows == null) {
return scanFinished(!resp.more);
}
+ updateResultsMetrics(resp.rows);
return Deferred.fromResult(resp.rows);
}
return nextRows(); // Restart the call.
@@ -809,6 +838,8 @@ public String toString() {
private final Callback got_next_row =
new Callback() {
public Object call(final Object response) {
+ long currentTime = System.currentTimeMillis();
+ updateSumOfMillisSecBetweenNexts(currentTime);
ArrayList> rows = null;
Response resp = null;
if (response instanceof Response) { // HBase 0.95 and up
@@ -816,6 +847,7 @@ public Object call(final Object response) {
rows = resp.rows;
scannerClosedOnServer = resp.scannerClosedOnServer;
moreRows = resp.more;
+ updateServerSideMetrics(resp.metrics);
} else if (response instanceof ArrayList) { // HBase 0.94 and before.
@SuppressWarnings("unchecked") // I 3>> generics.
final ArrayList> r =
@@ -829,6 +861,8 @@ public Object call(final Object response) {
return scanFinished(resp != null && !resp.more);
}
+ updateResultsMetrics(rows);
+
final ArrayList lastrow = rows.get(rows.size() - 1);
start_key = lastrow.get(0).key();
return rows;
@@ -844,9 +878,13 @@ public String toString() {
private final Callback nextRowErrback() {
return new Callback() {
public Object call(final Object error) {
+ long currentTime = System.currentTimeMillis();
+ updateSumOfMillisSecBetweenNexts(currentTime);
final RegionInfo old_region = region; // Save before invalidate().
invalidate(); // If there was an error, don't assume we're still OK.
if (error instanceof NotServingRegionException) {
+ incCountOfNSRE();
+ incCountOfRPCRetries();
// We'll resume scanning on another region, and we want to pick up
// right after the last key we successfully returned. Padding the
// last key with an extra 0 gives us the next possible key.
@@ -871,6 +909,7 @@ public Object call(final Object error) {
+ " been holding the scanner open and idle for too long (possibly"
+ " due to a long GC pause on your side or in the RegionServer)",
error);
+ incCountOfRPCRetries();
// Let's re-open ourselves and keep scanning.
return nextRows(); // XXX dangerous endless retry
}
@@ -894,6 +933,7 @@ public Deferred close() {
if (region == null || region == DONE) {
return Deferred.fromResult(null);
}
+ incRPCcallsMetrics();
return client.closeScanner(this).addBoth(closedCallback());
}
@@ -901,6 +941,8 @@ public Deferred close() {
private Callback closedCallback() {
return new Callback() {
public Object call(Object arg) {
+ long currentTime = System.currentTimeMillis();
+ updateSumOfMillisSecBetweenNexts(currentTime);
if (arg instanceof Exception) {
final Exception error = (Exception) arg;
// NotServingRegionException:
@@ -992,6 +1034,7 @@ private Deferred>> continueScanOnNextRegion() {
LOG.debug("Scanner " + Bytes.hex(old_scanner_id) + " done scanning "
+ old_region);
}
+ incRPCcallsMetrics();
client.closeScanner(this).addCallback(new Callback() {
public Object call(final Object arg) {
if(LOG.isDebugEnabled()) {
@@ -1015,6 +1058,7 @@ public String toString() {
scanner_id = 0xDEAD000AA000DEADL; // Make debugging easier.
invalidate();
+ incCountOfRegions();
return nextRows();
}
@@ -1127,7 +1171,7 @@ RegionInfo currentRegion() {
*/
HBaseRpc getNextRowsRequest() {
if (get_next_rows_request == null) {
- get_next_rows_request = new GetNextRowsRequest();
+ get_next_rows_request = new GetNextRowsRequest().withMetricsEnabled(this.isScanMetricsEnabled());
}
return get_next_rows_request;
}
@@ -1136,7 +1180,7 @@ HBaseRpc getNextRowsRequest() {
* Returns an RPC to open this scanner.
*/
HBaseRpc getOpenRequest() {
- return new OpenScannerRequest();
+ return new OpenScannerRequest().withMetricsEnabled(this.isScanMetricsEnabled());
}
/**
@@ -1146,7 +1190,7 @@ HBaseRpc getOpenRequest() {
* @param region_start_key region's start key
*/
HBaseRpc getOpenRequestForReverseScan(final byte[] region_start_key) {
- return new OpenScannerRequest(table, region_start_key);
+ return new OpenScannerRequest(table, region_start_key).withMetricsEnabled(this.isScanMetricsEnabled());
}
/**
@@ -1187,6 +1231,8 @@ final static class Response {
private final boolean scannerClosedOnServer;
+ private final Map metrics;
+
Response(final long scanner_id,
final ArrayList> rows,
final boolean more, final boolean scannerClosedOnServer) {
@@ -1194,6 +1240,18 @@ final static class Response {
this.rows = rows;
this.more = more;
this.scannerClosedOnServer = scannerClosedOnServer;
+ this.metrics = new HashMap();
+ }
+
+ Response(final long scanner_id,
+ final ArrayList> rows,
+ final boolean more, final boolean scannerClosedOnServer,
+ final Map metrics) {
+ this.scanner_id = scanner_id;
+ this.rows = rows;
+ this.more = more;
+ this.scannerClosedOnServer = scannerClosedOnServer;
+ this.metrics = metrics;
}
public String toString() {
@@ -1255,6 +1313,8 @@ private ArrayList> getRows(final ScanResponse resp,
*/
final class OpenScannerRequest extends HBaseRpc {
+ boolean metrics_enabled = false;
+
/**
* Default constructor that is used for every forward Scanner and
* for the first Scanner in a reverse Scan
@@ -1385,6 +1445,7 @@ ChannelBuffer serialize(final byte server_version) {
.setRegion(region.toProtobuf())
.setScan(scan.build())
.setNumberOfRows(max_num_rows)
+ .setTrackScanMetrics(metrics_enabled)
// Hardcoded these parameters to false since AsyncHBase cannot support them
.setClientHandlesHeartbeats(false)
.setClientHandlesPartials(false)
@@ -1463,10 +1524,38 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
throw new InvalidResponseException("Scan RPC response doesn't contain a"
+ " scanner ID", resp);
}
+ Map metrics = getServerSideScanMetrics(resp);
final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
return new Response(resp.getScannerId(),
getRows(resp, buf, cell_size),
- resp.getMoreResults(), scannerClosedOnServer);
+ resp.getMoreResults(), scannerClosedOnServer, metrics);
+ }
+
+ public OpenScannerRequest withMetricsEnabled(boolean enabled) {
+ this.metrics_enabled = enabled;
+ return this;
+ }
+
+ private Map getServerSideScanMetrics(ScanResponse response) {
+ Map metricMap = new HashMap();
+ if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+ return metricMap;
+ }
+
+ MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+ int numberOfMetrics = metrics.getMetricsCount();
+ for (int i = 0; i < numberOfMetrics; i++) {
+ HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+ if (metricPair != null) {
+ String name = metricPair.getName();
+ Long value = metricPair.getValue();
+ if (name != null && value != null) {
+ metricMap.put(name, value);
+ }
+ }
+ }
+
+ return metricMap;
}
public String toString() {
@@ -1482,6 +1571,8 @@ public String toString() {
*/
final class GetNextRowsRequest extends HBaseRpc {
+ boolean metrics_enabled = false;
+
@Override
byte[] method(final byte server_version) {
return (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE
@@ -1503,6 +1594,7 @@ ChannelBuffer serialize(final byte server_version) {
final ScanRequest req = ScanRequest.newBuilder()
.setScannerId(scanner_id)
.setNumberOfRows(max_num_rows)
+ .setTrackScanMetrics(metrics_enabled)
// Hardcoded these parameters to false since AsyncHBase cannot support them
.setClientHandlesHeartbeats(false)
.setClientHandlesPartials(false)
@@ -1523,8 +1615,36 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
if (rows == null) {
return null;
}
+ Map metrics = getServerSideScanMetrics(resp);
final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
- return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer);
+ return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer, metrics);
+ }
+
+ public GetNextRowsRequest withMetricsEnabled(boolean enabled) {
+ this.metrics_enabled = enabled;
+ return this;
+ }
+
+ private Map getServerSideScanMetrics(ScanResponse response) {
+ Map metricMap = new HashMap();
+ if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+ return metricMap;
+ }
+
+ MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+ int numberOfMetrics = metrics.getMetricsCount();
+ for (int i = 0; i < numberOfMetrics; i++) {
+ HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+ if (metricPair != null) {
+ String name = metricPair.getName();
+ Long value = metricPair.getValue();
+ if (name != null && value != null) {
+ metricMap.put(name, value);
+ }
+ }
+ }
+
+ return metricMap;
}
public String toString() {
@@ -1597,4 +1717,169 @@ public String toString() {
}
+ private void updateServerSideMetrics(Map metrics) {
+ if (!metrics.isEmpty()) {
+ for (Map.Entry e : metrics.entrySet()) {
+ if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)) {
+ scanMetrics.count_of_rows_scanned += e.getValue();
+ } else if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)) {
+ scanMetrics.count_of_rows_filtered += e.getValue();
+ }
+ }
+ }
+ }
+
+ private void incRPCcallsMetrics() {
+ if (isScanMetricsEnabled()) {
+ this.scanMetrics.count_of_rpc_calls += 1;
+ }
+ }
+
+ private void updateResultsMetrics(ArrayList> rows) {
+ if (isScanMetricsEnabled()) {
+ long resultSize = 0;
+ for (ArrayList row: rows) {
+ for (KeyValue cell: row) {
+ resultSize += cell.predictSerializedSize();
+ }
+ }
+ this.scanMetrics.count_of_bytes_in_results += resultSize;
+ }
+ }
+
+ private void incCountOfNSRE() {
+ if (isScanMetricsEnabled()) {
+ this.scanMetrics.count_of_nsre += 1;
+ }
+ }
+
+ private void incCountOfRPCRetries() {
+ if (isScanMetricsEnabled()) {
+ this.scanMetrics.count_of_rpc_retries += 1;
+ }
+ }
+
+ private void incCountOfRegions() {
+ if (isScanMetricsEnabled()) {
+ this.scanMetrics.count_of_regions += 1;
+ }
+ }
+
+ private void updateSumOfMillisSecBetweenNexts(long currentTime) {
+ if (isScanMetricsEnabled()) {
+ this.scanMetrics.sum_of_millis_sec_between_nexts += (currentTime - last_next_timestamp);
+ last_next_timestamp = currentTime;
+ }
+ }
+
+ /**
+ * Server-side metrics.
+ * Only supported for HBase 0.95 and above.
+ */
+ private static class ServerSideScanMetrics {
+
+ public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+ public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+
+ protected long count_of_rows_scanned = 0;
+ protected long count_of_rows_filtered = 0;
+
+ /**
+ * Number of rows scanned during scan RPC.
+ * Not every row scanned will be returned to the client
+ * since rows may be filtered.
+ * Always returns 0 if HBase < 0.95.
+ */
+ public long getCountOfRowsScanned() { return count_of_rows_scanned; };
+
+ /**
+ * Number of rows filtered during scan RPC.
+ * Always returns 0 if HBase < 0.95.
+ */
+ public long getCountOfRowsFiltered() { return count_of_rows_filtered; }
+
+ }
+
+ /**
+ * Client-side metrics.
+ *
+ * This class is immutable. You can get updated values by calling
+ * this function again once RPC is completed.
+ * This class is not synchronized because
+ * fields of this class is updated only inside the Deferred callbacks
+ * by {@code Scanner}.
+ *
+ */
+ public static class ScanMetrics extends ServerSideScanMetrics {
+
+ public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS";
+ public static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS";
+ public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION";
+ public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS";
+ public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED";
+ public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES";
+
+ private long count_of_rpc_calls = 0;
+
+ private long sum_of_millis_sec_between_nexts = 0;
+
+ private long count_of_nsre = 0;
+
+ private long count_of_bytes_in_results = 0;
+
+ private long count_of_rpc_retries = 0;
+
+ /**
+ * Starts with 1 because it is incremented when a scanner switches to a next region.
+ */
+ private long count_of_regions = 1;
+
+ /**
+ * Number of RPC calls.
+ */
+ public long getCountOfRPCcalls() { return count_of_rpc_calls; }
+
+ /**
+ * Sum of milliseconds between sequential next calls.
+ */
+ public long getSumOfMillisSecBetweenNexts() { return sum_of_millis_sec_between_nexts; }
+
+ /**
+ * Number of NotServingRegionException caught.
+ */
+ public long getCountOfNSRE() { return count_of_nsre; }
+
+ /**
+ * Number of bytes in Result objects from region servers.
+ */
+ public long getCountOfBytesInResults() { return count_of_bytes_in_results; }
+
+ /**
+ * Number of regions.
+ */
+ public long getCountOfRegions() { return count_of_regions; };
+
+ /**
+ * number of RPC retries
+ */
+ public long getCountOfRPCRetries() { return count_of_rpc_retries; }
+
+ public ScanMetrics() {
+ }
+
+ public Map getMetricsMap() {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ builder.put(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, count_of_rows_scanned);
+ builder.put(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, count_of_rows_filtered);
+ builder.put(RPC_CALLS_METRIC_NAME, count_of_rpc_calls);
+ builder.put(MILLIS_BETWEEN_NEXTS_METRIC_NAME, sum_of_millis_sec_between_nexts);
+ builder.put(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, count_of_nsre);
+ builder.put(BYTES_IN_RESULTS_METRIC_NAME, count_of_bytes_in_results);
+ builder.put(REGIONS_SCANNED_METRIC_NAME, count_of_regions);
+ builder.put(RPC_RETRIES_METRIC_NAME, count_of_rpc_retries);
+ return builder.build();
+ }
+ }
+
+
}
diff --git a/test/TestIntegration.java b/test/TestIntegration.java
index c04c9bfd..9219de24 100644
--- a/test/TestIntegration.java
+++ b/test/TestIntegration.java
@@ -33,7 +33,6 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -2175,6 +2174,163 @@ public void reverseAndForwardScanMoreThanMaxKVs() throws Exception{
}
+ /** Scan metrics tests. */
+ @Test
+ public void scanMetrics() throws Exception {
+ final String table6 = args[0] + "6";
+ createOrTruncateTable(client, table6, family);
+
+ // Split into 4 regions.
+ splitTable(table6, "aaa");
+ splitTable(table6, "bbb");
+ splitTable(table6, "ccc");
+ alterTableStatus(table6);
+
+ // Put 5 rows to 3 regions.
+ client.setFlushInterval(FAST_FLUSH);
+ final PutRequest put_a1 = new PutRequest(table6, "aaa1", family, "testQualifier", "testValue");
+ client.put(put_a1).join();
+ client.setFlushInterval(FAST_FLUSH);
+ final PutRequest put_a2 = new PutRequest(table6, "aaa2", family, "testQualifier", "testValue");
+ client.put(put_a2).join();
+ final PutRequest put_b1 = new PutRequest(table6, "bbb1", family, "testQualifier", "testValue");
+ client.put(put_b1).join();
+ final PutRequest put_b2 = new PutRequest(table6, "bbb2", family, "testQualifier", "testValue");
+ client.put(put_b2).join();
+ final PutRequest put3 = new PutRequest(table6, "ccc", family, "testQualifier", "testValue");
+ client.put(put3).join();
+
+ // Create forward scanner with metrics enabled.
+ Scanner scanner = client.newScanner(table6);
+ scanner.setScanMetricsEnabled(true);
+ scanner.setStartKey("aaa");
+ scanner.setStopKey("ddd");
+
+ long prevBytesInResult = 0;
+
+ ArrayList> row_a1 = scanner.nextRows(1).join();
+ assertSizeIs(1, row_a1);
+ Scanner.ScanMetrics metrics_a1 = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 1, metrics_a1.getCountOfRegions());
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_a1.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned
+ }
+ assertEquals("incorrect count of RPC calls", 1, metrics_a1.getCountOfRPCcalls()); // + 1 open
+ assertTrue("incorrect count of bytes in results", metrics_a1.getCountOfBytesInResults() > prevBytesInResult);
+ prevBytesInResult = metrics_a1.getCountOfBytesInResults();
+
+ ArrayList> row_a2 = scanner.nextRows(1).join();
+ assertSizeIs(1, row_a2);
+ Scanner.ScanMetrics metrics_a2 = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 1, metrics_a2.getCountOfRegions());
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_a2.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 2, metrics_a2.getCountOfRowsScanned()); // + 1 row scanned
+ }
+ assertEquals("incorrect count of RPC calls", 2, metrics_a2.getCountOfRPCcalls()); // + 1 next
+ assertTrue("incorrect count of bytes in results", metrics_a2.getCountOfBytesInResults() > prevBytesInResult);
+ prevBytesInResult = metrics_a2.getCountOfBytesInResults();
+
+ ArrayList> row_b1 = scanner.nextRows(1).join();
+ assertSizeIs(1, row_b1);
+ Scanner.ScanMetrics metrics_b1 = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 2, metrics_b1.getCountOfRegions()); // + next region
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_b1.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 3, metrics_b1.getCountOfRowsScanned()); // + 1 row scanned
+ }
+ assertEquals("incorrect count of RPC calls", 5, metrics_b1.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open
+ assertTrue("incorrect count of bytes in results", metrics_b1.getCountOfBytesInResults() > prevBytesInResult);
+ prevBytesInResult = metrics_b1.getCountOfBytesInResults();
+
+ ArrayList> row_b2 = scanner.nextRows(1).join();
+ assertSizeIs(1, row_b2);
+ Scanner.ScanMetrics metrics_b2 = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 2, metrics_b2.getCountOfRegions());
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_b2.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 4, metrics_b2.getCountOfRowsScanned()); // + 1 row scanned
+ }
+ assertEquals("incorrect count of RPC calls", 6, metrics_b2.getCountOfRPCcalls()); // + 1 next
+ assertTrue("incorrect count of bytes in results", metrics_b2.getCountOfBytesInResults() > prevBytesInResult);
+ prevBytesInResult = metrics_b2.getCountOfBytesInResults();
+
+ ArrayList> row_c = scanner.nextRows().join();
+ assertSizeIs(1, row_c);
+ Scanner.ScanMetrics metrics_c = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 3, metrics_c.getCountOfRegions()); // + next region
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_c.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 5, metrics_c.getCountOfRowsScanned()); // + 1 row scanned
+ }
+ assertEquals("incorrect count of RPC calls", 9, metrics_c.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open
+ assertTrue("incorrect count of bytes in results", metrics_c.getCountOfBytesInResults() > prevBytesInResult);
+ prevBytesInResult = metrics_c.getCountOfBytesInResults();
+
+ scanner.close().join();
+ Scanner.ScanMetrics metrics_final = scanner.getScanMetrics();
+ assertEquals("incorrect count of regions", 3, metrics_final.getCountOfRegions());
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics_final.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 5, metrics_final.getCountOfRowsScanned());
+ }
+ assertEquals("incorrect count of RPC calls", 10, metrics_final.getCountOfRPCcalls()); // + 1 close
+ assertTrue("incorrect count of bytes in results", metrics_final.getCountOfBytesInResults() == prevBytesInResult);
+ }
+
+ /** Scan metrics of filtered rows tests. */
+ @Test
+ public void scanMetricsWithFilter() throws Exception {
+ client.setFlushInterval(FAST_FLUSH);
+ // Keep only rows with a column qualifier that starts with "qa".
+ final PutRequest put1 = new PutRequest(table, "cpf1", family, "qa1", "v1");
+ final PutRequest put2 = new PutRequest(table, "cpf2", family, "qb2", "v2");
+ final PutRequest put3 = new PutRequest(table, "cpf3", family, "qb3", "v3");
+ final PutRequest put4 = new PutRequest(table, "cpf4", family, "qa4", "v4");
+ Deferred.group(Deferred.group(client.put(put1), client.put(put2)),
+ Deferred.group(client.put(put3), client.put(put4))).join();
+ final Scanner scanner = client.newScanner(table);
+ scanner.setFamily(family);
+ scanner.setStartKey("cpf1");
+ scanner.setStopKey("cpf5");
+ scanner.setFilter(new ColumnPrefixFilter("qa"));
+ scanner.setScanMetricsEnabled(true);
+ final ArrayList> rows = scanner.nextRows().join();
+
+ assertSizeIs(2, rows);
+ ArrayList kvs1 = rows.get(0);
+ assertSizeIs(1, kvs1);
+ assertEq("v1", kvs1.get(0).value());
+ ArrayList kvs4 = rows.get(1);
+ assertSizeIs(1, kvs4);
+ assertEq("v4", kvs4.get(0).value());
+
+ Scanner.ScanMetrics metrics = scanner.getScanMetrics();
+ // server-side metrics works for HBase > 0.94 only
+ if (metrics.getCountOfRowsScanned() == 0) {
+ LOG.warn("Skipping a server-side metrics check for HBase < 0.95");
+ } else {
+ assertEquals("incorrect count of rows scanned", 4, metrics.getCountOfRowsScanned());
+ assertEquals("incorrect count of rows filtered", 2, metrics.getCountOfRowsFiltered());
+ }
+ assertEquals("incorrect count of RPC calls", 1, metrics.getCountOfRPCcalls()); // 1 open
+
+ scanner.close().join();
+ }
+
/** Regression test for issue #2. */
@Test
public void regression2() throws Exception {