From 4f4dca216e17fc96dd536d2900b123a050a3bdc4 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 21 Jul 2021 09:08:06 +0800 Subject: [PATCH] ingest: introduce apm_data_stream_migration (#5768) --- .../apm_data_stream_migration.json | 30 ++++++++ .../apm_data_stream_migration.json | 30 ++++++++ .../apm_data_stream_migration.json | 30 ++++++++ .../apm_data_stream_migration.json | 30 ++++++++ .../apm_data_stream_migration.json | 30 ++++++++ changelogs/head.asciidoc | 1 + ingest/pipeline/definition.json | 33 +++++++++ ingest/pipeline/definition.yml | 32 +++++++++ systemtest/elasticsearch.go | 10 ++- systemtest/ingest_test.go | 70 +++++++++++++++++++ 10 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 apmpackage/apm/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json create mode 100644 apmpackage/apm/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_data_stream_migration.json create mode 100644 apmpackage/apm/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json create mode 100644 apmpackage/apm/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json create mode 100644 apmpackage/apm/data_stream/traces/elasticsearch/ingest_pipeline/apm_data_stream_migration.json diff --git a/apmpackage/apm/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json b/apmpackage/apm/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json new file mode 100644 index 00000000000..24218404aaf --- /dev/null +++ b/apmpackage/apm/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json @@ -0,0 +1,30 @@ +{ + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_data_stream_migration.json b/apmpackage/apm/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_data_stream_migration.json new file mode 100644 index 00000000000..24218404aaf --- /dev/null +++ b/apmpackage/apm/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_data_stream_migration.json @@ -0,0 +1,30 @@ +{ + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json b/apmpackage/apm/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json new file mode 100644 index 00000000000..24218404aaf --- /dev/null +++ b/apmpackage/apm/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json @@ -0,0 +1,30 @@ +{ + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json b/apmpackage/apm/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json new file mode 100644 index 00000000000..24218404aaf --- /dev/null +++ b/apmpackage/apm/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_data_stream_migration.json @@ -0,0 +1,30 @@ +{ + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/data_stream/traces/elasticsearch/ingest_pipeline/apm_data_stream_migration.json b/apmpackage/apm/data_stream/traces/elasticsearch/ingest_pipeline/apm_data_stream_migration.json new file mode 100644 index 00000000000..24218404aaf --- /dev/null +++ b/apmpackage/apm/data_stream/traces/elasticsearch/ingest_pipeline/apm_data_stream_migration.json @@ -0,0 +1,30 @@ +{ + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] +} \ No newline at end of file diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 572a425e8b2..e77bfe7652f 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -43,6 +43,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] * Add HTTP span fields as top level ECS fields {pull}5396[5396] * Introduce `apm-server.auth.anonymous.*` config {pull}5623[5623] * Upgrade Go to 1.16.6 {pull}5754[5754] +* Introduce ingest pipeline `apm_data_stream_migration` for migrating pre-data stream indices {5768}[5768] [float] ==== Deprecated diff --git a/ingest/pipeline/definition.json b/ingest/pipeline/definition.json index be18aa91152..7a36b5c597a 100644 --- a/ingest/pipeline/definition.json +++ b/ingest/pipeline/definition.json @@ -44,6 +44,39 @@ ] } }, + { + "id": "apm_data_stream_migration", + "body": { + "description": "Migrate APM events to data streams", + "processors": [ + { + "script": { + "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", + "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'error'", + "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" + } + }, + { + "script": { + "if": "ctx.processor?.event == 'metric'", + "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" + } + }, + { + "set": { + "if": "ctx.data_stream != null", + "field": "_index", + "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + } + } + ] + } + }, { "id": "apm_user_agent", "body": { diff --git a/ingest/pipeline/definition.yml b/ingest/pipeline/definition.yml index 079c6869cc5..6a7dd045f58 100644 --- a/ingest/pipeline/definition.yml +++ b/ingest/pipeline/definition.yml @@ -21,6 +21,38 @@ apm: name: apm_metrics_dynamic_template if: ctx.processor?.event == 'metric' +# apm_data_stream_migration is not used in the main apm pipeline, +# it is installed for migrating legacy indices to data streams, +# e.g. using the Kibana Upgrade Assistant. +apm_data_stream_migration: + description: Migrate APM events to data streams + processors: + - script: + if: ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction' + source: | + ctx.data_stream = ["type": "traces", "dataset": "apm", "namespace": "migrated"] + - script: + if: ctx.processor?.event == 'error' + source: | + ctx.data_stream = ["type": "logs", "dataset": "apm.error", "namespace": "migrated"] + - script: + if: ctx.processor?.event == 'metric' + source: | + String dataset; + if (ctx["metricset.name"] != "app") { + dataset = "apm.internal"; + } else { + String serviceName = ctx.service.name; + serviceName = serviceName.toLowerCase(); + serviceName = /[\\\/*?"<>| ,#:-]/.matcher(serviceName).replaceAll('_'); + dataset = "apm.app." + serviceName; + } + ctx.data_stream = ["type": "metrics", "dataset": dataset, "namespace": "migrated"]; + - set: + if: ctx.data_stream != null + field: _index + value: "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" + apm_user_agent: description: Add user agent information for APM events processors: diff --git a/systemtest/elasticsearch.go b/systemtest/elasticsearch.go index 5b770f7031d..d73095a1f08 100644 --- a/systemtest/elasticsearch.go +++ b/systemtest/elasticsearch.go @@ -119,7 +119,15 @@ func CleanupElasticsearch(t testing.TB) { } // Delete indices, data streams, and ingest pipelines. - if err := doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}); err != nil { + if err := doReq(esapi.IndicesDeleteRequest{Index: []string{ + legacyPrefix, + // traces-apm*, metrics-apm*, and logs-apm* could get created + // as indices instead of data streams in some tests, so issue + // index delete requests for those too. + apmTracesPrefix, + apmMetricsPrefix, + apmLogsPrefix, + }}); err != nil { t.Fatal(err) } doParallel( diff --git a/systemtest/ingest_test.go b/systemtest/ingest_test.go index c913d84af38..78a5119b2d9 100644 --- a/systemtest/ingest_test.go +++ b/systemtest/ingest_test.go @@ -18,12 +18,19 @@ package systemtest_test import ( + "context" + "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" + "go.elastic.co/apm" + + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/elastic/go-elasticsearch/v7/esutil" + "github.com/elastic/apm-server/systemtest" "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/estest" @@ -53,3 +60,66 @@ func TestIngestPipeline(t *testing.T) { assert.True(t, destinationIP.Exists()) assert.Equal(t, "::1", destinationIP.String()) } + +func TestDataStreamMigrationIngestPipeline(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewServer(t) + + // Send a transaction, span, error, and metrics. + tracer := srv.Tracer() + tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(func(ctx context.Context, m *apm.Metrics) error { + m.Add("custom_metric", nil, 123) + return nil + })) + tx := tracer.StartTransaction("name", "type") + span := tx.StartSpan("name", "type", nil) + tracer.NewError(errors.New("boom")).Send() + span.End() + tx.End() + tracer.Flush(nil) + tracer.SendMetrics(nil) + + // We expect at least 6 events: + // - onboarding + // - transaction + // - span + // - error + // - internal metricset + // - app metricset + for _, query := range []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "onboarding"}, + estest.TermQuery{Field: "processor.event", Value: "transaction"}, + estest.TermQuery{Field: "processor.event", Value: "span"}, + estest.TermQuery{Field: "processor.event", Value: "error"}, + estest.TermQuery{Field: "metricset.name", Value: "transaction_breakdown"}, + estest.TermQuery{Field: "metricset.name", Value: "app"}, + } { + systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query) + } + + refresh := true + _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.ReindexRequest{ + Refresh: &refresh, + Body: esutil.NewJSONReader(map[string]interface{}{ + "source": map[string]interface{}{ + "index": "apm-*", + }, + "dest": map[string]interface{}{ + "index": "apm-migration", + "pipeline": "apm_data_stream_migration", + "op_type": "create", + }, + }), + }, nil) + require.NoError(t, err) + + // There should only be an onboarding doc in "apm-migration". + result := systemtest.Elasticsearch.ExpectDocs(t, "apm-migration", nil) + require.Len(t, result.Hits.Hits, 1) + assert.Equal(t, "onboarding", gjson.GetBytes(result.Hits.Hits[0].RawSource, "processor.event").String()) + + systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-migrated", nil) // transaction, span + systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-migrated", nil) + systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.internal-migrated", nil) + systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.app.systemtest-migrated", nil) +}