diff --git a/src/Scanner.java b/src/Scanner.java index 9d8dd983..7585afde 100644 --- a/src/Scanner.java +++ b/src/Scanner.java @@ -29,7 +29,12 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import org.hbase.async.generated.HBasePB; +import org.hbase.async.generated.MapReducePB; import org.jboss.netty.buffer.ChannelBuffer; import org.slf4j.Logger; @@ -198,6 +203,12 @@ public final class Scanner { private boolean moreRows; private boolean scannerClosedOnServer; + private boolean scan_metrics_enabled = false; + + private long last_next_timestamp = System.currentTimeMillis(); + + private ScanMetrics scanMetrics = new ScanMetrics(); + /** * Constructor. * This byte array will NOT be copied. @@ -701,6 +712,18 @@ public void setTimeRange(final long min_timestamp, final long max_timestamp) { this.max_timestamp = max_timestamp; } + public void setScanMetricsEnabled(final boolean enabled) { + scan_metrics_enabled = enabled; + } + + public boolean isScanMetricsEnabled() { + return scan_metrics_enabled; + } + + public ScanMetrics getScanMetrics() { + return scanMetrics; + } + /** * Scans a number of rows. Calling this method is equivalent to: *
@@ -740,6 +763,7 @@ public Deferred>> nextRows() {
     if (region == DONE) {  // We're already done scanning.
       return Deferred.fromResult(null);
     } else if (region == null) {  // We need to open the scanner first.
+      incRPCcallsMetrics();
       if (this.isReversed() && !this.isFirstReverseRegion()){
         return client.openReverseScanner(this)
                 .addCallbackDeferring(opened_scanner);
@@ -754,6 +778,7 @@ public Deferred>> nextRows() {
     if(scannerClosedOnServer) {
       return scanFinished(moreRows);
     }
+    incRPCcallsMetrics();
     // Need to silence this warning because the callback `got_next_row'
     // declares its return type to be Object, because its return value
     // may or may not be deferred.
@@ -770,6 +795,8 @@ public Deferred>> nextRows() {
     opened_scanner =
       new Callback>>, Object>() {
           public Deferred>> call(final Object arg) {
+            long currentTime = System.currentTimeMillis();
+            updateSumOfMillisSecBetweenNexts(currentTime);
             final Response resp;
             if (arg instanceof Long) {
               scanner_id = (Long) arg;
@@ -788,9 +815,11 @@ public Deferred>> call(final Object arg) {
               LOG.debug("Scanner " + Bytes.hex(scanner_id) + " opened on " + region);
             }
             if (resp != null) {
+              updateServerSideMetrics(resp.metrics);
               if (resp.rows == null) {
                 return scanFinished(!resp.more);
               }
+              updateResultsMetrics(resp.rows);
               return Deferred.fromResult(resp.rows);
             }
             return nextRows();  // Restart the call.
@@ -809,6 +838,8 @@ public String toString() {
   private final Callback got_next_row =
     new Callback() {
       public Object call(final Object response) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         ArrayList> rows = null;
         Response resp = null;
         if (response instanceof Response) {  // HBase 0.95 and up
@@ -816,6 +847,7 @@ public Object call(final Object response) {
           rows = resp.rows;
           scannerClosedOnServer = resp.scannerClosedOnServer;
           moreRows = resp.more;
+          updateServerSideMetrics(resp.metrics);
         } else if (response instanceof ArrayList) {  // HBase 0.94 and before.
           @SuppressWarnings("unchecked")  // I 3>> generics.
           final ArrayList> r =
@@ -829,6 +861,8 @@ public Object call(final Object response) {
           return scanFinished(resp != null && !resp.more);
         }
 
+        updateResultsMetrics(rows);
+
         final ArrayList lastrow = rows.get(rows.size() - 1);
         start_key = lastrow.get(0).key();
         return rows;
@@ -844,9 +878,13 @@ public String toString() {
   private final Callback nextRowErrback() {
     return new Callback() {
       public Object call(final Object error) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         final RegionInfo old_region = region;  // Save before invalidate().
         invalidate();  // If there was an error, don't assume we're still OK.
         if (error instanceof NotServingRegionException) {
+          incCountOfNSRE();
+          incCountOfRPCRetries();
           // We'll resume scanning on another region, and we want to pick up
           // right after the last key we successfully returned.  Padding the
           // last key with an extra 0 gives us the next possible key.
@@ -871,6 +909,7 @@ public Object call(final Object error) {
             + " been holding the scanner open and idle for too long (possibly"
             + " due to a long GC pause on your side or in the RegionServer)",
             error);
+          incCountOfRPCRetries();
           // Let's re-open ourselves and keep scanning.
           return nextRows();  // XXX dangerous endless retry
         }
@@ -894,6 +933,7 @@ public Deferred close() {
     if (region == null || region == DONE) {
       return Deferred.fromResult(null);
     }
+    incRPCcallsMetrics();
     return client.closeScanner(this).addBoth(closedCallback());
   }
 
@@ -901,6 +941,8 @@ public Deferred close() {
   private Callback closedCallback() {
     return new Callback() {
       public Object call(Object arg) {
+        long currentTime = System.currentTimeMillis();
+        updateSumOfMillisSecBetweenNexts(currentTime);
         if (arg instanceof Exception) {
           final Exception error = (Exception) arg;
           // NotServingRegionException:
@@ -992,6 +1034,7 @@ private Deferred>> continueScanOnNextRegion() {
       LOG.debug("Scanner " + Bytes.hex(old_scanner_id) + " done scanning "
               + old_region);
     }
+    incRPCcallsMetrics();
     client.closeScanner(this).addCallback(new Callback() {
       public Object call(final Object arg) {
         if(LOG.isDebugEnabled()) {
@@ -1015,6 +1058,7 @@ public String toString() {
 
     scanner_id = 0xDEAD000AA000DEADL;   // Make debugging easier.
     invalidate();
+    incCountOfRegions();
     return nextRows();
   }
 
@@ -1127,7 +1171,7 @@ RegionInfo currentRegion() {
    */
   HBaseRpc getNextRowsRequest() {
     if (get_next_rows_request == null) {
-      get_next_rows_request = new GetNextRowsRequest();
+      get_next_rows_request = new GetNextRowsRequest().withMetricsEnabled(this.isScanMetricsEnabled());
     }
     return get_next_rows_request;
   }
@@ -1136,7 +1180,7 @@ HBaseRpc getNextRowsRequest() {
    * Returns an RPC to open this scanner.
    */
   HBaseRpc getOpenRequest() {
-    return new OpenScannerRequest();
+    return new OpenScannerRequest().withMetricsEnabled(this.isScanMetricsEnabled());
   }
 
   /**
@@ -1146,7 +1190,7 @@ HBaseRpc getOpenRequest() {
    * @param region_start_key region's start key
    */
   HBaseRpc getOpenRequestForReverseScan(final byte[] region_start_key) {
-    return new OpenScannerRequest(table, region_start_key);
+    return new OpenScannerRequest(table, region_start_key).withMetricsEnabled(this.isScanMetricsEnabled());
   }
 
   /**
@@ -1187,6 +1231,8 @@ final static class Response {
 
     private final boolean scannerClosedOnServer;
 
+    private final Map metrics;
+
     Response(final long scanner_id,
              final ArrayList> rows,
              final boolean more, final boolean scannerClosedOnServer) {
@@ -1194,6 +1240,18 @@ final static class Response {
       this.rows = rows;
       this.more = more;
       this.scannerClosedOnServer = scannerClosedOnServer;
+      this.metrics = new HashMap();
+    }
+
+    Response(final long scanner_id,
+             final ArrayList> rows,
+             final boolean more, final boolean scannerClosedOnServer,
+             final Map metrics) {
+      this.scanner_id = scanner_id;
+      this.rows = rows;
+      this.more = more;
+      this.scannerClosedOnServer = scannerClosedOnServer;
+      this.metrics = metrics;
     }
 
     public String toString() {
@@ -1255,6 +1313,8 @@ private ArrayList> getRows(final ScanResponse resp,
    */
   final class OpenScannerRequest extends HBaseRpc {
 
+    boolean metrics_enabled = false;
+
     /**
      * Default constructor that is used for every forward Scanner and
      * for the first Scanner in a reverse Scan
@@ -1385,6 +1445,7 @@ ChannelBuffer serialize(final byte server_version) {
         .setRegion(region.toProtobuf())
         .setScan(scan.build())
         .setNumberOfRows(max_num_rows)
+        .setTrackScanMetrics(metrics_enabled)
         // Hardcoded these parameters to false since AsyncHBase cannot support them
         .setClientHandlesHeartbeats(false)
         .setClientHandlesPartials(false)
@@ -1463,10 +1524,38 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
         throw new InvalidResponseException("Scan RPC response doesn't contain a"
                                            + " scanner ID", resp);
       }
+      Map metrics = getServerSideScanMetrics(resp);
       final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
       return new Response(resp.getScannerId(),
                           getRows(resp, buf, cell_size),
-                          resp.getMoreResults(), scannerClosedOnServer);
+                          resp.getMoreResults(), scannerClosedOnServer, metrics);
+    }
+
+    public OpenScannerRequest withMetricsEnabled(boolean enabled) {
+      this.metrics_enabled = enabled;
+      return this;
+    }
+
+    private Map getServerSideScanMetrics(ScanResponse response) {
+      Map metricMap = new HashMap();
+      if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+        return metricMap;
+      }
+
+      MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+      int numberOfMetrics = metrics.getMetricsCount();
+      for (int i = 0; i < numberOfMetrics; i++) {
+        HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+        if (metricPair != null) {
+          String name = metricPair.getName();
+          Long value = metricPair.getValue();
+          if (name != null && value != null) {
+            metricMap.put(name, value);
+          }
+        }
+      }
+
+      return metricMap;
     }
 
     public String toString() {
@@ -1482,6 +1571,8 @@ public String toString() {
    */
   final class GetNextRowsRequest extends HBaseRpc {
 
+    boolean metrics_enabled = false;
+
     @Override
     byte[] method(final byte server_version) {
       return (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE
@@ -1503,6 +1594,7 @@ ChannelBuffer serialize(final byte server_version) {
       final ScanRequest req = ScanRequest.newBuilder()
         .setScannerId(scanner_id)
         .setNumberOfRows(max_num_rows)
+        .setTrackScanMetrics(metrics_enabled)
         // Hardcoded these parameters to false since AsyncHBase cannot support them
         .setClientHandlesHeartbeats(false)
         .setClientHandlesPartials(false)
@@ -1523,8 +1615,36 @@ Response deserialize(final ChannelBuffer buf, final int cell_size) {
       if (rows == null) {
         return null;
       }
+      Map metrics = getServerSideScanMetrics(resp);
       final boolean scannerClosedOnServer = resp.hasMoreResultsInRegion() && !resp.getMoreResultsInRegion();
-      return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer);
+      return new Response(resp.getScannerId(), rows, resp.getMoreResults(), scannerClosedOnServer, metrics);
+    }
+
+    public GetNextRowsRequest withMetricsEnabled(boolean enabled) {
+      this.metrics_enabled = enabled;
+      return this;
+    }
+
+    private Map getServerSideScanMetrics(ScanResponse response) {
+      Map metricMap = new HashMap();
+      if (response == null || !response.hasScanMetrics() || response.getScanMetrics() == null) {
+        return metricMap;
+      }
+
+      MapReducePB.ScanMetrics metrics = response.getScanMetrics();
+      int numberOfMetrics = metrics.getMetricsCount();
+      for (int i = 0; i < numberOfMetrics; i++) {
+        HBasePB.NameInt64Pair metricPair = metrics.getMetrics(i);
+        if (metricPair != null) {
+          String name = metricPair.getName();
+          Long value = metricPair.getValue();
+          if (name != null && value != null) {
+            metricMap.put(name, value);
+          }
+        }
+      }
+
+      return metricMap;
     }
 
     public String toString() {
@@ -1597,4 +1717,169 @@ public String toString() {
 
   }
 
+  private void updateServerSideMetrics(Map metrics) {
+    if (!metrics.isEmpty()) {
+      for (Map.Entry e : metrics.entrySet()) {
+        if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)) {
+          scanMetrics.count_of_rows_scanned += e.getValue();
+        } else if (e.getKey().equals(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)) {
+          scanMetrics.count_of_rows_filtered += e.getValue();
+        }
+      }
+    }
+  }
+
+  private void incRPCcallsMetrics() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.count_of_rpc_calls += 1;
+    }
+  }
+
+  private void updateResultsMetrics(ArrayList> rows) {
+    if (isScanMetricsEnabled()) {
+      long resultSize = 0;
+      for (ArrayList row: rows) {
+        for (KeyValue cell: row) {
+          resultSize += cell.predictSerializedSize();
+        }
+      }
+      this.scanMetrics.count_of_bytes_in_results += resultSize;
+    }
+  }
+
+  private void incCountOfNSRE() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.count_of_nsre += 1;
+    }
+  }
+
+  private void incCountOfRPCRetries() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.count_of_rpc_retries += 1;
+    }
+  }
+
+  private void incCountOfRegions() {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.count_of_regions += 1;
+    }
+  }
+
+  private void updateSumOfMillisSecBetweenNexts(long currentTime) {
+    if (isScanMetricsEnabled()) {
+      this.scanMetrics.sum_of_millis_sec_between_nexts += (currentTime - last_next_timestamp);
+      last_next_timestamp = currentTime;
+    }
+  }
+
+  /**
+   * Server-side metrics.
+   * Only supported for HBase 0.95 and above.
+   */
+  private static class ServerSideScanMetrics {
+
+    public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
+    public static final String COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME = "ROWS_FILTERED";
+
+    protected long count_of_rows_scanned = 0;
+    protected long count_of_rows_filtered = 0;
+
+    /**
+     * Number of rows scanned during scan RPC.
+     * Not every row scanned will be returned to the client
+     * since rows may be filtered.
+     * Always returns 0 if HBase < 0.95.
+     */
+    public long getCountOfRowsScanned() { return count_of_rows_scanned; };
+
+    /**
+     * Number of rows filtered during scan RPC.
+     * Always returns 0 if HBase < 0.95.
+     */
+    public long getCountOfRowsFiltered() { return count_of_rows_filtered; }
+
+  }
+
+  /**
+   * Client-side metrics.
+   * 

+ * This class is immutable. You can get updated values by calling + * this function again once RPC is completed. + * This class is not synchronized because + * fields of this class is updated only inside the Deferred callbacks + * by {@code Scanner}. + *

+ */ + public static class ScanMetrics extends ServerSideScanMetrics { + + public static final String RPC_CALLS_METRIC_NAME = "RPC_CALLS"; + public static final String MILLIS_BETWEEN_NEXTS_METRIC_NAME = "MILLIS_BETWEEN_NEXTS"; + public static final String NOT_SERVING_REGION_EXCEPTION_METRIC_NAME = "NOT_SERVING_REGION_EXCEPTION"; + public static final String BYTES_IN_RESULTS_METRIC_NAME = "BYTES_IN_RESULTS"; + public static final String REGIONS_SCANNED_METRIC_NAME = "REGIONS_SCANNED"; + public static final String RPC_RETRIES_METRIC_NAME = "RPC_RETRIES"; + + private long count_of_rpc_calls = 0; + + private long sum_of_millis_sec_between_nexts = 0; + + private long count_of_nsre = 0; + + private long count_of_bytes_in_results = 0; + + private long count_of_rpc_retries = 0; + + /** + * Starts with 1 because it is incremented when a scanner switches to a next region. + */ + private long count_of_regions = 1; + + /** + * Number of RPC calls. + */ + public long getCountOfRPCcalls() { return count_of_rpc_calls; } + + /** + * Sum of milliseconds between sequential next calls. + */ + public long getSumOfMillisSecBetweenNexts() { return sum_of_millis_sec_between_nexts; } + + /** + * Number of NotServingRegionException caught. + */ + public long getCountOfNSRE() { return count_of_nsre; } + + /** + * Number of bytes in Result objects from region servers. + */ + public long getCountOfBytesInResults() { return count_of_bytes_in_results; } + + /** + * Number of regions. + */ + public long getCountOfRegions() { return count_of_regions; }; + + /** + * number of RPC retries + */ + public long getCountOfRPCRetries() { return count_of_rpc_retries; } + + public ScanMetrics() { + } + + public Map getMetricsMap() { + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, count_of_rows_scanned); + builder.put(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, count_of_rows_filtered); + builder.put(RPC_CALLS_METRIC_NAME, count_of_rpc_calls); + builder.put(MILLIS_BETWEEN_NEXTS_METRIC_NAME, sum_of_millis_sec_between_nexts); + builder.put(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, count_of_nsre); + builder.put(BYTES_IN_RESULTS_METRIC_NAME, count_of_bytes_in_results); + builder.put(REGIONS_SCANNED_METRIC_NAME, count_of_regions); + builder.put(RPC_RETRIES_METRIC_NAME, count_of_rpc_retries); + return builder.build(); + } + } + + } diff --git a/test/TestIntegration.java b/test/TestIntegration.java index c04c9bfd..9219de24 100644 --- a/test/TestIntegration.java +++ b/test/TestIntegration.java @@ -33,7 +33,6 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -2175,6 +2174,163 @@ public void reverseAndForwardScanMoreThanMaxKVs() throws Exception{ } + /** Scan metrics tests. */ + @Test + public void scanMetrics() throws Exception { + final String table6 = args[0] + "6"; + createOrTruncateTable(client, table6, family); + + // Split into 4 regions. + splitTable(table6, "aaa"); + splitTable(table6, "bbb"); + splitTable(table6, "ccc"); + alterTableStatus(table6); + + // Put 5 rows to 3 regions. + client.setFlushInterval(FAST_FLUSH); + final PutRequest put_a1 = new PutRequest(table6, "aaa1", family, "testQualifier", "testValue"); + client.put(put_a1).join(); + client.setFlushInterval(FAST_FLUSH); + final PutRequest put_a2 = new PutRequest(table6, "aaa2", family, "testQualifier", "testValue"); + client.put(put_a2).join(); + final PutRequest put_b1 = new PutRequest(table6, "bbb1", family, "testQualifier", "testValue"); + client.put(put_b1).join(); + final PutRequest put_b2 = new PutRequest(table6, "bbb2", family, "testQualifier", "testValue"); + client.put(put_b2).join(); + final PutRequest put3 = new PutRequest(table6, "ccc", family, "testQualifier", "testValue"); + client.put(put3).join(); + + // Create forward scanner with metrics enabled. + Scanner scanner = client.newScanner(table6); + scanner.setScanMetricsEnabled(true); + scanner.setStartKey("aaa"); + scanner.setStopKey("ddd"); + + long prevBytesInResult = 0; + + ArrayList> row_a1 = scanner.nextRows(1).join(); + assertSizeIs(1, row_a1); + Scanner.ScanMetrics metrics_a1 = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 1, metrics_a1.getCountOfRegions()); + // server-side metrics works for HBase > 0.94 only + if (metrics_a1.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 1, metrics_a1.getCountOfRowsScanned()); // + 1 row scanned + } + assertEquals("incorrect count of RPC calls", 1, metrics_a1.getCountOfRPCcalls()); // + 1 open + assertTrue("incorrect count of bytes in results", metrics_a1.getCountOfBytesInResults() > prevBytesInResult); + prevBytesInResult = metrics_a1.getCountOfBytesInResults(); + + ArrayList> row_a2 = scanner.nextRows(1).join(); + assertSizeIs(1, row_a2); + Scanner.ScanMetrics metrics_a2 = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 1, metrics_a2.getCountOfRegions()); + // server-side metrics works for HBase > 0.94 only + if (metrics_a2.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 2, metrics_a2.getCountOfRowsScanned()); // + 1 row scanned + } + assertEquals("incorrect count of RPC calls", 2, metrics_a2.getCountOfRPCcalls()); // + 1 next + assertTrue("incorrect count of bytes in results", metrics_a2.getCountOfBytesInResults() > prevBytesInResult); + prevBytesInResult = metrics_a2.getCountOfBytesInResults(); + + ArrayList> row_b1 = scanner.nextRows(1).join(); + assertSizeIs(1, row_b1); + Scanner.ScanMetrics metrics_b1 = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 2, metrics_b1.getCountOfRegions()); // + next region + // server-side metrics works for HBase > 0.94 only + if (metrics_b1.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 3, metrics_b1.getCountOfRowsScanned()); // + 1 row scanned + } + assertEquals("incorrect count of RPC calls", 5, metrics_b1.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open + assertTrue("incorrect count of bytes in results", metrics_b1.getCountOfBytesInResults() > prevBytesInResult); + prevBytesInResult = metrics_b1.getCountOfBytesInResults(); + + ArrayList> row_b2 = scanner.nextRows(1).join(); + assertSizeIs(1, row_b2); + Scanner.ScanMetrics metrics_b2 = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 2, metrics_b2.getCountOfRegions()); + // server-side metrics works for HBase > 0.94 only + if (metrics_b2.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 4, metrics_b2.getCountOfRowsScanned()); // + 1 row scanned + } + assertEquals("incorrect count of RPC calls", 6, metrics_b2.getCountOfRPCcalls()); // + 1 next + assertTrue("incorrect count of bytes in results", metrics_b2.getCountOfBytesInResults() > prevBytesInResult); + prevBytesInResult = metrics_b2.getCountOfBytesInResults(); + + ArrayList> row_c = scanner.nextRows().join(); + assertSizeIs(1, row_c); + Scanner.ScanMetrics metrics_c = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 3, metrics_c.getCountOfRegions()); // + next region + // server-side metrics works for HBase > 0.94 only + if (metrics_c.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 5, metrics_c.getCountOfRowsScanned()); // + 1 row scanned + } + assertEquals("incorrect count of RPC calls", 9, metrics_c.getCountOfRPCcalls()); // + 1 empty next + 1 close + 1 open + assertTrue("incorrect count of bytes in results", metrics_c.getCountOfBytesInResults() > prevBytesInResult); + prevBytesInResult = metrics_c.getCountOfBytesInResults(); + + scanner.close().join(); + Scanner.ScanMetrics metrics_final = scanner.getScanMetrics(); + assertEquals("incorrect count of regions", 3, metrics_final.getCountOfRegions()); + // server-side metrics works for HBase > 0.94 only + if (metrics_final.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 5, metrics_final.getCountOfRowsScanned()); + } + assertEquals("incorrect count of RPC calls", 10, metrics_final.getCountOfRPCcalls()); // + 1 close + assertTrue("incorrect count of bytes in results", metrics_final.getCountOfBytesInResults() == prevBytesInResult); + } + + /** Scan metrics of filtered rows tests. */ + @Test + public void scanMetricsWithFilter() throws Exception { + client.setFlushInterval(FAST_FLUSH); + // Keep only rows with a column qualifier that starts with "qa". + final PutRequest put1 = new PutRequest(table, "cpf1", family, "qa1", "v1"); + final PutRequest put2 = new PutRequest(table, "cpf2", family, "qb2", "v2"); + final PutRequest put3 = new PutRequest(table, "cpf3", family, "qb3", "v3"); + final PutRequest put4 = new PutRequest(table, "cpf4", family, "qa4", "v4"); + Deferred.group(Deferred.group(client.put(put1), client.put(put2)), + Deferred.group(client.put(put3), client.put(put4))).join(); + final Scanner scanner = client.newScanner(table); + scanner.setFamily(family); + scanner.setStartKey("cpf1"); + scanner.setStopKey("cpf5"); + scanner.setFilter(new ColumnPrefixFilter("qa")); + scanner.setScanMetricsEnabled(true); + final ArrayList> rows = scanner.nextRows().join(); + + assertSizeIs(2, rows); + ArrayList kvs1 = rows.get(0); + assertSizeIs(1, kvs1); + assertEq("v1", kvs1.get(0).value()); + ArrayList kvs4 = rows.get(1); + assertSizeIs(1, kvs4); + assertEq("v4", kvs4.get(0).value()); + + Scanner.ScanMetrics metrics = scanner.getScanMetrics(); + // server-side metrics works for HBase > 0.94 only + if (metrics.getCountOfRowsScanned() == 0) { + LOG.warn("Skipping a server-side metrics check for HBase < 0.95"); + } else { + assertEquals("incorrect count of rows scanned", 4, metrics.getCountOfRowsScanned()); + assertEquals("incorrect count of rows filtered", 2, metrics.getCountOfRowsFiltered()); + } + assertEquals("incorrect count of RPC calls", 1, metrics.getCountOfRPCcalls()); // 1 open + + scanner.close().join(); + } + /** Regression test for issue #2. */ @Test public void regression2() throws Exception {