Skip to content

Commit

Permalink
[ML] Add an ingest pipeline definition to structure finder (#34350)
Browse files Browse the repository at this point in the history
The ingest pipeline that is produced is very simple.  It
contains a grok processor if the format is semi-structured
text, a date processor if the format contains a timestamp,
and a remove processor if required to remove the interim
timestamp field parsed out of semi-structured text.

Eventually the UI should offer the option to customize the
pipeline with additional processors to perform other data
preparation steps before ingesting data to an index.
  • Loading branch information
droberts195 authored and kcm committed Oct 30, 2018
1 parent a13d521 commit 186ab38
Show file tree
Hide file tree
Showing 10 changed files with 254 additions and 7 deletions.
68 changes: 68 additions & 0 deletions docs/reference/ml/apis/find-file-structure.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,20 @@ If the request does not encounter errors, you receive the following result:
"type" : "double"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"date" : {
"field" : "tpep_pickup_datetime",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"YYYY-MM-dd HH:mm:ss"
]
}
}
]
},
"field_stats" : {
"DOLocationID" : {
"count" : 19998,
Expand Down Expand Up @@ -1366,6 +1380,33 @@ this:
"type" : "text"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel}.*"
]
}
},
{
"date" : {
"field" : "timestamp",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"ISO8601"
]
}
},
{
"remove" : {
"field" : "timestamp"
}
}
]
},
"field_stats" : {
"loglevel" : {
"count" : 53,
Expand Down Expand Up @@ -1499,6 +1540,33 @@ this:
"type" : "keyword"
}
},
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"grok" : {
"field" : "message",
"patterns" : [
"\\[%{TIMESTAMP_ISO8601:timestamp}\\]\\[%{LOGLEVEL:loglevel} *\\]\\[%{JAVACLASS:class} *\\] \\[%{HOSTNAME:node}\\] %{JAVALOGMESSAGE:message}"
]
}
},
{
"date" : {
"field" : "timestamp",
"timezone" : "{{ beat.timezone }}",
"formats" : [
"ISO8601"
]
}
},
{
"remove" : {
"field" : "timestamp"
}
}
]
},
"field_stats" : { <2>
"class" : {
"count" : 53,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -103,6 +104,7 @@ public String toString() {
public static final ParseField JAVA_TIMESTAMP_FORMATS = new ParseField("java_timestamp_formats");
public static final ParseField NEED_CLIENT_TIMEZONE = new ParseField("need_client_timezone");
public static final ParseField MAPPINGS = new ParseField("mappings");
public static final ParseField INGEST_PIPELINE = new ParseField("ingest_pipeline");
public static final ParseField FIELD_STATS = new ParseField("field_stats");
public static final ParseField EXPLANATION = new ParseField("explanation");

Expand All @@ -128,6 +130,7 @@ public String toString() {
PARSER.declareStringArray(Builder::setJavaTimestampFormats, JAVA_TIMESTAMP_FORMATS);
PARSER.declareBoolean(Builder::setNeedClientTimezone, NEED_CLIENT_TIMEZONE);
PARSER.declareObject(Builder::setMappings, (p, c) -> new TreeMap<>(p.map()), MAPPINGS);
PARSER.declareObject(Builder::setIngestPipeline, (p, c) -> p.mapOrdered(), INGEST_PIPELINE);
PARSER.declareObject(Builder::setFieldStats, (p, c) -> {
Map<String, FieldStats> fieldStats = new TreeMap<>();
while (p.nextToken() == XContentParser.Token.FIELD_NAME) {
Expand Down Expand Up @@ -157,15 +160,16 @@ public String toString() {
private final String timestampField;
private final boolean needClientTimezone;
private final SortedMap<String, Object> mappings;
private final Map<String, Object> ingestPipeline;
private final SortedMap<String, FieldStats> fieldStats;
private final List<String> explanation;

public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampleStart, String charset, Boolean hasByteOrderMarker,
Format format, String multilineStartPattern, String excludeLinesPattern, List<String> columnNames,
Boolean hasHeaderRow, Character delimiter, Character quote, Boolean shouldTrimFields, String grokPattern,
String timestampField, List<String> jodaTimestampFormats, List<String> javaTimestampFormats,
boolean needClientTimezone, Map<String, Object> mappings, Map<String, FieldStats> fieldStats,
List<String> explanation) {
boolean needClientTimezone, Map<String, Object> mappings, Map<String, Object> ingestPipeline,
Map<String, FieldStats> fieldStats, List<String> explanation) {

this.numLinesAnalyzed = numLinesAnalyzed;
this.numMessagesAnalyzed = numMessagesAnalyzed;
Expand All @@ -188,6 +192,7 @@ public FileStructure(int numLinesAnalyzed, int numMessagesAnalyzed, String sampl
(javaTimestampFormats == null) ? null : Collections.unmodifiableList(new ArrayList<>(javaTimestampFormats));
this.needClientTimezone = needClientTimezone;
this.mappings = Collections.unmodifiableSortedMap(new TreeMap<>(mappings));
this.ingestPipeline = (ingestPipeline == null) ? null : Collections.unmodifiableMap(new LinkedHashMap<>(ingestPipeline));
this.fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(fieldStats));
this.explanation = Collections.unmodifiableList(new ArrayList<>(explanation));
}
Expand All @@ -212,6 +217,7 @@ public FileStructure(StreamInput in) throws IOException {
timestampField = in.readOptionalString();
needClientTimezone = in.readBoolean();
mappings = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap()));
ingestPipeline = in.readBoolean() ? Collections.unmodifiableMap(in.readMap()) : null;
fieldStats = Collections.unmodifiableSortedMap(new TreeMap<>(in.readMap(StreamInput::readString, FieldStats::new)));
explanation = Collections.unmodifiableList(in.readList(StreamInput::readString));
}
Expand Down Expand Up @@ -262,6 +268,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(timestampField);
out.writeBoolean(needClientTimezone);
out.writeMap(mappings);
if (ingestPipeline == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeMap(ingestPipeline);
}
out.writeMap(fieldStats, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
out.writeCollection(explanation, StreamOutput::writeString);
}
Expand Down Expand Up @@ -342,6 +354,10 @@ public SortedMap<String, Object> getMappings() {
return mappings;
}

/**
 * @return The ingest pipeline definition held by this file structure as an unmodifiable map,
 *         or <code>null</code> if no ingest pipeline was supplied.
 */
public Map<String, Object> getIngestPipeline() {
    return this.ingestPipeline;
}

public SortedMap<String, FieldStats> getFieldStats() {
return fieldStats;
}
Expand Down Expand Up @@ -397,6 +413,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.field(NEED_CLIENT_TIMEZONE.getPreferredName(), needClientTimezone);
builder.field(MAPPINGS.getPreferredName(), mappings);
if (ingestPipeline != null) {
builder.field(INGEST_PIPELINE.getPreferredName(), ingestPipeline);
}
if (fieldStats.isEmpty() == false) {
builder.startObject(FIELD_STATS.getPreferredName());
for (Map.Entry<String, FieldStats> entry : fieldStats.entrySet()) {
Expand Down Expand Up @@ -476,6 +495,7 @@ public static class Builder {
private List<String> javaTimestampFormats;
private boolean needClientTimezone;
private Map<String, Object> mappings;
private Map<String, Object> ingestPipeline;
private Map<String, FieldStats> fieldStats = Collections.emptyMap();
private List<String> explanation;

Expand Down Expand Up @@ -582,6 +602,11 @@ public Builder setMappings(Map<String, Object> mappings) {
return this;
}

/**
 * Set the ingest pipeline definition to be included in the built file structure.
 * @param ingestPipeline The ingest pipeline definition.  Stored as supplied, without a null check.
 * @return This builder, to allow calls to be chained.
 */
public Builder setIngestPipeline(Map<String, Object> ingestPipeline) {
    final Builder self = this;
    self.ingestPipeline = ingestPipeline;
    return self;
}

public Builder setFieldStats(Map<String, FieldStats> fieldStats) {
this.fieldStats = Objects.requireNonNull(fieldStats);
return this;
Expand Down Expand Up @@ -708,7 +733,8 @@ public FileStructure build() {

return new FileStructure(numLinesAnalyzed, numMessagesAnalyzed, sampleStart, charset, hasByteOrderMarker, format,
multilineStartPattern, excludeLinesPattern, columnNames, hasHeaderRow, delimiter, quote, shouldTrimFields, grokPattern,
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, fieldStats, explanation);
timestampField, jodaTimestampFormats, javaTimestampFormats, needClientTimezone, mappings, ingestPipeline, fieldStats,
explanation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -74,6 +75,14 @@ public static FileStructure createTestFileStructure() {
}
builder.setMappings(mappings);

// Randomly include an ingest pipeline so that both the present and absent cases are exercised.
// The map must be passed to setIngestPipeline(); passing it to setMappings() would replace the
// mappings built above and leave the ingest pipeline permanently unset.
if (randomBoolean()) {
    Map<String, Object> ingestPipeline = new LinkedHashMap<>();
    for (String field : generateRandomStringArray(5, 20, false, false)) {
        ingestPipeline.put(field, Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(10)));
    }
    builder.setIngestPipeline(ingestPipeline);
}

if (randomBoolean()) {
Map<String, FieldStats> fieldStats = new TreeMap<>();
for (String field : generateRandomStringArray(5, 20, false, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,14 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.collect(Collectors.joining(",")));
}

boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone))
.setMultilineStartPattern(timeLineRegex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
import org.elasticsearch.xpack.ml.filestructurefinder.TimestampFormatFinder.TimestampMatch;

Expand All @@ -15,6 +16,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,6 +39,8 @@ public final class FileStructureUtils {
private static final int KEYWORD_MAX_LEN = 256;
private static final int KEYWORD_MAX_SPACES = 5;

// Field name wrapped in "{{ ... }}" to form the "timezone" setting of the generated date processor
// when the client's timezone is needed to parse the timestamp
private static final String BEAT_TIMEZONE_FIELD = "beat.timezone";

private FileStructureUtils() {
}

Expand Down Expand Up @@ -306,4 +310,53 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
int length = str.length();
return length > KEYWORD_MAX_LEN || length - str.replaceAll("\\s", "").length() > KEYWORD_MAX_SPACES;
}

/**
 * Build the definition of an ingest pipeline suited to the detected file structure.
 * The processors are added in this order: a <code>grok</code> processor (only when a Grok pattern is supplied),
 * a <code>date</code> processor (only when a timestamp field is supplied) and a <code>remove</code> processor
 * for the timestamp field (only when both are supplied).
 * @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
 *                    fully structured formats.
 * @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
 *                       <code>null</code> if there is no timestamp.
 * @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
 *                         May be <code>null</code> if {@code timestampField} is also <code>null</code>.
 * @param needClientTimezone Is the timezone of the client supplying data to ingest required to uniquely parse the timestamp?
 * @return The ingest pipeline definition, or <code>null</code> if none is required.
 */
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, String timestampField, List<String> timestampFormats,
                                                               boolean needClientTimezone) {

    final boolean hasGrokPattern = grokPattern != null;
    final boolean hasTimestampField = timestampField != null;

    // With neither a Grok pattern nor a timestamp there is nothing for a pipeline to do
    if (hasGrokPattern == false && hasTimestampField == false) {
        return null;
    }

    List<Map<String, Object>> processorList = new ArrayList<>();

    if (hasGrokPattern) {
        Map<String, Object> grokSettings = new LinkedHashMap<>();
        grokSettings.put("field", "message");
        grokSettings.put("patterns", Collections.singletonList(grokPattern));
        processorList.add(Collections.singletonMap("grok", grokSettings));
    }

    if (hasTimestampField) {
        Map<String, Object> dateSettings = new LinkedHashMap<>();
        dateSettings.put("field", timestampField);
        if (needClientTimezone) {
            dateSettings.put("timezone", "{{ " + BEAT_TIMEZONE_FIELD + " }}");
        }
        dateSettings.put("formats", timestampFormats);
        processorList.add(Collections.singletonMap("date", dateSettings));

        // For semi-structured text the timestamp field is only an interim field extracted by Grok,
        // so it is removed once the date processor has consumed it
        if (hasGrokPattern) {
            processorList.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
        }
    }

    // Insertion order is kept so the description precedes the processors in the definition
    Map<String, Object> definition = new LinkedHashMap<>();
    definition.put(Pipeline.DESCRIPTION_KEY, "Ingest pipeline created by file structure finder");
    definition.put(Pipeline.PROCESSORS_KEY, processorList);
    return definition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ static JsonFileStructureFinder makeJsonFileStructureFinder(List<String> explanat
Tuple<String, TimestampMatch> timeField =
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
if (timeField != null) {
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,16 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
}
}

boolean needClientTimeZone = bestTimestamp.v1().hasTimezoneDependentParsing();

FileStructure structure = structureBuilder
.setTimestampField(interimTimestampField)
.setJodaTimestampFormats(bestTimestamp.v1().jodaTimestampFormats)
.setJavaTimestampFormats(bestTimestamp.v1().javaTimestampFormats)
.setNeedClientTimezone(bestTimestamp.v1().hasTimezoneDependentParsing())
.setNeedClientTimezone(needClientTimeZone)
.setGrokPattern(grokPattern)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, interimTimestampField,
bestTimestamp.v1().jodaTimestampFormats, needClientTimeZone))
.setMappings(mappings)
.setFieldStats(fieldStats)
.setExplanation(explanation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,14 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
Tuple<String, TimestampMatch> timeField =
FileStructureUtils.guessTimestampField(explanation, sampleRecords, overrides, timeoutChecker);
if (timeField != null) {
boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().jodaTimestampFormats)
.setJavaTimestampFormats(timeField.v2().javaTimestampFormats)
.setNeedClientTimezone(timeField.v2().hasTimezoneDependentParsing());
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, topLevelTag + "." + timeField.v1(),
timeField.v2().jodaTimestampFormats, needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
Expand Down
Loading

0 comments on commit 186ab38

Please sign in to comment.