Skip to content

Commit

Permalink
[ML Data Frame] Persist and restore checkpoint and position (elastic#…
Browse files Browse the repository at this point in the history
…41942)

Persist and restore Data frame's current checkpoint and position
  • Loading branch information
davidkyle authored and Gurkan Kaymak committed May 27, 2019
1 parent e59d8e2 commit 4688a9d
Show file tree
Hide file tree
Showing 17 changed files with 288 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(transformId);
}

/**
* Get the persisted stats document name from the Data Frame Transformer Id.
*
* @return The id of document the where the transform stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}

@Nullable
public String getTransformId() {
return transformId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

public class DataFrameTransformProgress implements Writeable, ToXContentObject {

private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
private static final String PERCENT_COMPLETE = "percent_complete";
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final String PERCENT_COMPLETE = "percent_complete";

public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private final String reason;

private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -14,6 +15,7 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;

Expand All @@ -22,7 +24,7 @@

public class DataFrameTransformStateAndStats implements Writeable, ToXContentObject {

private static final String NAME = "data_frame_transform_state_and_stats";
public static final String NAME = "data_frame_transform_state_and_stats";
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");

Expand All @@ -47,6 +49,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}

public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
}
Expand All @@ -58,6 +64,15 @@ public static DataFrameTransformStateAndStats initialStateAndStats(String id, Da
DataFrameTransformCheckpointingInfo.EMPTY);
}

/**
* Get the persisted state and stats document name from the Data Frame Transform Id.
*
* @return The id of document the where the transform stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}

public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
Expand All @@ -73,13 +88,21 @@ public DataFrameTransformStateAndStats(StreamInput in) throws IOException {
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
}

@Nullable
public String getTransformId() {
return transformStats.getTransformId();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -94,16 +95,21 @@ public synchronized IndexerState start() {
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});

if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public static void removeIndices() throws Exception {
wipeIndices();
}

public void wipeDataFrameTransforms() throws IOException, InterruptedException {
public void wipeDataFrameTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;

Expand Down Expand Up @@ -72,7 +72,7 @@ public void testUsage() throws Exception {
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameIndexerTransformStats.NAME);
DataFrameTransformStateAndStats.NAME);
// Verify that we have our two stats documents
assertBusy(() -> {
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
Expand Down Expand Up @@ -100,7 +100,6 @@ public void testUsage() throws Exception {
expectedStats.merge(statName, statistic, Integer::sum);
}


usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));

usageAsMap = entityAsMap(usageResponse);
Expand All @@ -109,7 +108,8 @@ public void testUsage() throws Exception {
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
for(String statName : PROVIDED_STATS) {
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
assertEquals("Incorrect stat " + statName,
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;

Expand Down Expand Up @@ -176,6 +177,7 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo

for(String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);

if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
} else {
Expand All @@ -197,14 +199,15 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameIndexerTransformStats.NAME)));
DataFrameTransformStateAndStats.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setSize(0)
.setQuery(queryBuilder);

final String path = DataFrameField.STATS_FIELD.getPreferredName() + ".";
for(String statName : PROVIDED_STATS) {
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
}

ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
Expand All @@ -213,6 +216,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
logger.error("statistics summations search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}

statsListener.onResponse(parseSearchAggs(searchResponse));
},
failure -> {
Expand Down
Loading

0 comments on commit 4688a9d

Please sign in to comment.