Skip to content

Commit

Permalink
Refactoring bulk stats test and add some java docs as mentioned in th…
Browse files Browse the repository at this point in the history
…e review.
  • Loading branch information
zhichen committed Feb 15, 2020
1 parent 270e8d5 commit 0d05e87
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned the specific metrics."
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@
"segments",
"store",
"warmer",
"suggest"
"suggest",
"bulk"
],
"description":"Limit the information returned for `indices` metric to the specific index metrics. Isn't used if `indices` (or `all`) metric isn't specified."
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
bulk.total_operations .+ \n
bulk.total_time .+ \n
bulk.total_size_in_bytes .+ \n
$/
---
"Test cat shards output":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected void doRun() throws Exception {
}
// We're done, there's no more operations to execute so we resolve the wrapped listener
finishRequest();
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
}

@Override
Expand All @@ -189,7 +190,6 @@ private void finishRequest() {
() -> new WritePrimaryResult<>(
context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), null,
context.getPrimary(), logger));
primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
}
}.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Bulk related statistics, including the time and size of shard bulk requests,
* starting at the shard level and allowing aggregation to indices and node level
*/
public class BulkStats implements Writeable, ToXContentFragment {

private long total = 0;
private long totalOperations = 0;
private long totalTimeInMillis = 0;
private long totalSizeInBytes = 0;

Expand All @@ -40,13 +45,13 @@ public BulkStats() {
}

public BulkStats(StreamInput in) throws IOException {
total = in.readVLong();
totalOperations = in.readVLong();
totalTimeInMillis = in.readVLong();
totalSizeInBytes = in.readVLong();
}

public BulkStats(long total, long totalTimeInMillis, long totalSizeInBytes) {
this.total = total;
public BulkStats(long totalOperations, long totalTimeInMillis, long totalSizeInBytes) {
this.totalOperations = totalOperations;
this.totalTimeInMillis = totalTimeInMillis;
this.totalSizeInBytes = totalSizeInBytes;
}
Expand All @@ -59,7 +64,7 @@ public void addTotals(BulkStats bulkStats) {
if (bulkStats == null) {
return;
}
this.total += bulkStats.total;
this.totalOperations += bulkStats.totalOperations;
this.totalTimeInMillis += bulkStats.totalTimeInMillis;
this.totalSizeInBytes += bulkStats.totalSizeInBytes;
}
Expand All @@ -68,8 +73,8 @@ public long getTotalSizeInBytes() {
return totalSizeInBytes;
}

public long getTotal() {
return total;
public long getTotalOperations() {
return totalOperations;
}

public TimeValue getTotalTime() {
Expand All @@ -80,24 +85,46 @@ public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalOperations);
out.writeVLong(totalTimeInMillis);
out.writeVLong(totalSizeInBytes);
}

@Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.BULK);
builder.field(Fields.TOTAL, total);
builder.field(Fields.TOTAL_OPERATIONS, totalOperations);
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

final BulkStats that = (BulkStats) o;
return Objects.equals(this.totalOperations, that.totalOperations)
&& Objects.equals(this.totalTimeInMillis, that.totalTimeInMillis)
&& Objects.equals(this.totalSizeInBytes, that.totalSizeInBytes);
}

@Override
public int hashCode() {
return Objects.hash(totalOperations, totalTimeInMillis, totalSizeInBytes);
}

static final class Fields {
static final String BULK = "bulk";
static final String TOTAL = "total";
static final String TOTAL_OPERATIONS = "total_operations";
static final String TOTAL_TIME = "total_time";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.index.shard.IndexShard;

import java.util.concurrent.TimeUnit;

/**
* Internal class that maintains relevant shard bulk statistics / metrics.
* @see IndexShard
*/
public class ShardBulkStats implements BulkOperationListener {

private final StatsHolder totalStats = new StatsHolder();
Expand All @@ -32,7 +37,8 @@ public BulkStats stats() {
return totalStats.stats();
}

@Override public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) {
@Override
public void afterBulk(long shardBulkSizeInBytes, long tookInNanos) {
totalStats.totalSizeInBytes.inc(shardBulkSizeInBytes);
totalStats.shardBulkMetric.inc(tookInNanos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,9 @@ protected Table getTableWithHeader(final RestRequest request) {

table.addCell("search.throttled", "alias:sth;default:false;desc:indicates if the index is search throttled");

table.addCell("bulk.total",
"sibling:pri;alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("bulk.total_operations",
"sibling:pri;alias:bto,bulkTotalOperation;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("pri.bulk.total_operations", "default:false;text-align:right;desc:number of bulk shard ops");

table.addCell("bulk.total_time", "sibling:pri;alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk");
table.addCell("pri.bulk.shard_bulk_time", "default:false;text-align:right;desc:time spend in shard bulk");
Expand Down Expand Up @@ -756,8 +756,8 @@ Table buildTable(final RestRequest request,

table.addCell(searchThrottled);

table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotal());
table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotal());
table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalOperations());
table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalOperations());

table.addCell(totalStats.getBulk() == null ? null : totalStats.getBulk().getTotalTime());
table.addCell(primaryStats.getBulk() == null ? null : primaryStats.getBulk().getTotalTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("suggest.time", "alias:suti,suggestTime;default:false;text-align:right;desc:time spend in suggest");
table.addCell("suggest.total", "alias:suto,suggestTotal;default:false;text-align:right;desc:number of suggest ops");

table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk");
table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk");

Expand Down Expand Up @@ -422,7 +422,7 @@ Table buildTable(boolean fullId, RestRequest req, ClusterStateResponse state, No
table.addCell(searchStats == null ? null : searchStats.getTotal().getSuggestCount());

BulkStats bulkStats = indicesStats == null ? null : indicesStats.getBulk();
table.addCell(bulkStats == null ? null : bulkStats.getTotal());
table.addCell(bulkStats == null ? null : bulkStats.getTotalOperations());
table.addCell(bulkStats == null ? null : bulkStats.getTotalTime());
table.addCell(bulkStats == null ? null : bulkStats.getTotalSizeInBytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected Table getTableWithHeader(final RestRequest request) {
table.addCell("warmer.total", "alias:wto,warmerTotal;default:false;text-align:right;desc:total warmer ops");
table.addCell("warmer.total_time", "alias:wtt,warmerTotalTime;default:false;text-align:right;desc:time spent in warmers");

table.addCell("bulk.total", "alias:bto,bulkTotal;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("bulk.total_operations", "alias:bto,bulkTotalOperations;default:false;text-align:right;desc:number of bulk shard ops");
table.addCell("bulk.total_time", "alias:btti,bulkTotalTime;default:false;text-align:right;desc:time spend in shard bulk");
table.addCell("bulk.total_size_in_bytes", "alias:btsi,bulkTotalSizeInBytes;default:false;text-align:right;desc:total size in bytes of shard bulk");

Expand Down Expand Up @@ -355,7 +355,7 @@ private Table buildTable(RestRequest request, ClusterStateResponse state, Indice
table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::total));
table.addCell(getOrNull(commonStats, CommonStats::getWarmer, WarmerStats::totalTime));

table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotal));
table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalOperations));
table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalTime));
table.addCell(getOrNull(commonStats, CommonStats::getBulk, BulkStats::getTotalSizeInBytes));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,34 @@

package org.elasticsearch.index.bulk.stats;

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

public class BulkStatsTests extends ESTestCase {

public void testSerialize() throws IOException {
BulkStats stats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
BytesStreamOutput out = new BytesStreamOutput();
stats.writeTo(out);
StreamInput input = out.bytes().streamInput();
BulkStats read = new BulkStats(input);
assertEquals(-1, input.read());
assertEquals(stats.getTotal(), read.getTotal());
assertEquals(stats.getTotalTime(), read.getTotalTime());
assertEquals(stats.getTotalSizeInBytes(), read.getTotalSizeInBytes());
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

public class BulkStatsTests extends AbstractWireSerializingTestCase<BulkStats> {

@Override
protected Writeable.Reader<BulkStats> instanceReader() {
return BulkStats::new;
}

@Override
protected BulkStats createTestInstance() {
return new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}

@Override
protected BulkStats mutateInstance(BulkStats instance) {
BulkStats mutateBulkStats = new BulkStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
switch (between(0, 1)) {
case 0:
break;
case 1:
mutateBulkStats.add(instance);
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return mutateBulkStats;
}

public void testAddTotals() {
Expand All @@ -54,9 +64,10 @@ public void testAddTotals() {
}

private static void assertStats(BulkStats stats, long equalTo) {
assertEquals(equalTo, stats.getTotal());
assertEquals(equalTo, stats.getTotalOperations());
assertEquals(equalTo, stats.getTotalTimeInMillis());
assertEquals(equalTo, stats.getTotalSizeInBytes());
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -1095,11 +1095,11 @@ public void testBulkStats() throws Exception {
}
IndicesStatsResponse stats = client().admin().indices().prepareStats(index).setBulk(true).get();

assertThat(stats.getTotal().bulk.getTotal(), equalTo(4L));
assertThat(stats.getTotal().bulk.getTotalOperations(), equalTo(4L));
assertThat(stats.getTotal().bulk.getTotalTimeInMillis(), greaterThan(0L));
assertThat(stats.getTotal().bulk.getTotalSizeInBytes(), greaterThan(0L));

assertThat(stats.getPrimaries().bulk.getTotal(), equalTo(2L));
assertThat(stats.getPrimaries().bulk.getTotalOperations(), equalTo(2L));
assertThat(stats.getPrimaries().bulk.getTotalTimeInMillis(), greaterThan(0L));
assertThat(stats.getPrimaries().bulk.getTotalSizeInBytes(), greaterThan(0L));
}
Expand Down

0 comments on commit 0d05e87

Please sign in to comment.