Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fetch sourcemaps from Elasticsearch #9722

Merged
merged 152 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
152 commits
Select commit Hold shift + click to select a range
8bcf7b4
feat: fetch sourcemaps from Elasticsearch
kruskall Dec 2, 2022
3d61fc1
lint: fix linter issues
kruskall Dec 2, 2022
d18afa0
lint: fix staticcheck issue
kruskall Dec 4, 2022
6b13bb3
feat: update code to support latest kibana PR
kruskall Dec 12, 2022
27883df
test: udpdate sourcemap test to use the kibana sourcemap API
kruskall Dec 12, 2022
4d8c011
feat: implement cache invalidation in sourcemap cache
kruskall Dec 12, 2022
f04473e
feat: remove fleet and kibana sourcemap fetchers
kruskall Dec 12, 2022
7a64c06
refactor: code cleanup
kruskall Dec 13, 2022
c96587d
fix: do not try to decode the body if the sourcemap was not found
kruskall Dec 13, 2022
855cb02
test: update tests for sourcemap changes
kruskall Dec 13, 2022
cd01a10
lint: revert update task changes
kruskall Dec 13, 2022
9890493
Merge remote-tracking branch 'upstream/main' into feat/es-sourcemap
kruskall Dec 19, 2022
9518384
build: remove unused imports
kruskall Dec 19, 2022
761802b
Merge remote-tracking branch 'upstream/main' into feat/es-sourcemap
kruskall Dec 19, 2022
ac529cd
lint: fix linter issues
kruskall Dec 19, 2022
a9957ea
feat: add support for relative path
kruskall Dec 19, 2022
260d3d5
Merge branch 'main' into feat/es-sourcemap
kruskall Dec 22, 2022
995e254
feat: add support for injected api key
kruskall Dec 23, 2022
cf86bb3
lint: fix formatting issue
kruskall Dec 23, 2022
813cdfb
Merge branch 'main' into feat/es-sourcemap
kruskall Dec 29, 2022
6f0fb18
refactor: use meaningful variable names
kruskall Dec 29, 2022
094dbfc
lint: fix linter issues
kruskall Dec 29, 2022
3f0c2be
refactor: use map data structure to reduce iteration count and improv…
kruskall Dec 29, 2022
cc35612
refactor: do not block when retrieving a sourcemap before the cache i…
kruskall Dec 29, 2022
a3e65cb
refactor: reduce diff noise
kruskall Dec 29, 2022
b1a2bf2
fix: actually add new sourcemaps to the metadata cache
kruskall Dec 29, 2022
0f4b3fa
refactor: don't send the whole metadata as part of the update
kruskall Dec 29, 2022
a5ea7bf
Merge remote-tracking branch 'upstream/main' into feat/es-sourcemap
kruskall Jan 9, 2023
3041003
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 16, 2023
9486691
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 16, 2023
b0b8646
feat: use sourcemapping rum es api_key
kruskall Jan 16, 2023
0942d3d
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 18, 2023
3d72922
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 18, 2023
77ed8f1
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 18, 2023
5e80d77
fix: forward the request to the backend fetcher if the metadata cache…
kruskall Jan 18, 2023
61fed2d
fix: update ES sourcemap result format
kruskall Jan 18, 2023
2131a33
refactor: do not sort by score if we limited the size to 1
kruskall Jan 18, 2023
4d66334
test: always wait for document to be indexed when creating sourcemaps
kruskall Jan 18, 2023
16839e8
test: create sourcemap in ES before starting APM server
kruskall Jan 18, 2023
1031ac2
test: add read privilege for .apm-source-map index to apm_server role
kruskall Jan 18, 2023
413385a
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 19, 2023
4532dfb
fix: ignore query and fragment on bundlefilepath and avoid double fet…
kruskall Jan 19, 2023
cff8dd0
test: empty sourcemap index as part of the cleanup process
kruskall Jan 19, 2023
7946c09
feat: add debug log statement when forwarding requesting before init
kruskall Jan 19, 2023
39510c4
fix: try with both names when forwarding requests
kruskall Jan 19, 2023
020f869
test: update RUM test to use standalone APM server
kruskall Jan 19, 2023
870b635
fix: properly support aliases
kruskall Jan 19, 2023
06a59b1
test: clean path before waiting for sourcemap to be indexed
kruskall Jan 19, 2023
b881da3
test: only run absolute and relative bundle filepath test on standalone
kruskall Jan 19, 2023
df99000
refactor: add more debug messages for empty search results
kruskall Jan 19, 2023
ffa8941
fix: do not include url path twice in identifiers
kruskall Jan 19, 2023
c5508d9
fix: refactor aliases handling and avoid init deadlock
kruskall Jan 19, 2023
b342665
fix: do not return duplicate alias if bundlefilepath is a full url
kruskall Jan 19, 2023
695c495
refactor: add more verbose debug messages to make metadata fetcher co…
kruskall Jan 19, 2023
b3c5792
refactor: clarify cache size log message
kruskall Jan 19, 2023
b48a882
fix: try to lookup the aliases in the metadata cache to account for e…
kruskall Jan 19, 2023
7aded27
refactor: cleanup and reduce duplicate code
kruskall Jan 19, 2023
ba117b4
lint: fix linter issues
kruskall Jan 19, 2023
d4fccb2
fix: use correct log format method
kruskall Jan 19, 2023
54ab5b9
fix: readd support and test for indexpattern
kruskall Jan 19, 2023
1aa9654
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 19, 2023
529b129
test: add back system tests for apm integration
kruskall Jan 21, 2023
778ec6d
feat: bump sourcemap error log message level to error
kruskall Jan 21, 2023
69e2859
fix: prevent sourcemapping specific ES config from being overwritten
kruskall Jan 21, 2023
faece2a
fix: defer closing the init channel to avoid potential deadlock
kruskall Jan 21, 2023
8c03743
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 21, 2023
878b025
refactor: reduce diff noise
kruskall Jan 21, 2023
558a473
refactor: remove unused approval document
kruskall Jan 21, 2023
7df5419
lint: remove unused methods
kruskall Jan 21, 2023
e186512
fix: ignore error when sourcemapping ES config is missing
kruskall Jan 21, 2023
c194073
test: update config test for sourcemap indexpattern changes
kruskall Jan 22, 2023
b56a1ea
refactor: improve metadata fetcher
kruskall Jan 22, 2023
7757b55
fix: document edge cases and try to maximize cache hits
kruskall Jan 23, 2023
0b38f0d
refactor: improve sourcemap es query
kruskall Jan 23, 2023
a890a2f
feat: add back kibana fetcher as fallback
kruskall Jan 23, 2023
174e87e
test: add systemtest for kibana fetcher
kruskall Jan 23, 2023
d12ebdf
fix: do not leak sourcemap sync goroutine
kruskall Jan 23, 2023
90bc423
fix: use scroll search to populate metadata cache
kruskall Jan 23, 2023
88f78a0
refactor: cleanup search query functions
kruskall Jan 23, 2023
873b4c6
refactor: reduce duplicate code
kruskall Jan 23, 2023
c919996
docs: add documentation on why we need to renew scrollID
kruskall Jan 23, 2023
b674d5d
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 23, 2023
4bc95f7
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 25, 2023
e13870b
docs: fix typo
kruskall Jan 25, 2023
3066ce0
refactor: do not encode scroll id in the body manually
kruskall Jan 25, 2023
42bc926
refactor: use result hits instead of manual comparison
kruskall Jan 25, 2023
f258815
docs: fix unicode chars
kruskall Jan 25, 2023
0c34f3c
fix: close invalidation channel
kruskall Jan 25, 2023
444c55b
refactor: add missing json struct tag to essourcemapresponse fields
kruskall Jan 25, 2023
b7389a6
refactor: remove unused fleetcfg parameter
kruskall Jan 25, 2023
4610931
refactor: rename sourcemapfetcher cancel func for clarity
kruskall Jan 25, 2023
ed99eb5
feat: remove sourcemapping metadata option
kruskall Jan 26, 2023
29d329d
lint: fix linter issues
kruskall Jan 26, 2023
cd6ae96
test: improve TestStoreUsesRUMElasticsearchConfig to make sure the co…
kruskall Jan 26, 2023
e2b24a0
feat: return an error if sourcemap is missing
kruskall Jan 26, 2023
0c3eef4
test: update approvals document
kruskall Jan 26, 2023
1d4d261
refactor: rework sync logic and decouple fetchers
kruskall Jan 27, 2023
240da96
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 27, 2023
26ecdfe
fix: prevent race condition between sync goroutines
kruskall Jan 27, 2023
cf94883
refactor: abstract alias retrieval
kruskall Jan 27, 2023
5c92302
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 27, 2023
c8ca86d
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 28, 2023
c593dfc
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 29, 2023
c87ce27
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 30, 2023
cc5a01a
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 30, 2023
f353cd1
refactor: rework fetchers and improve errors
kruskall Jan 31, 2023
04dc1fb
Merge branch 'main' into feat/es-sourcemap
kruskall Jan 31, 2023
88c8011
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 2, 2023
824c68f
refactor: print routine completion message on success
kruskall Feb 2, 2023
9efecb9
lint: fix variable name typo
kruskall Feb 2, 2023
d466045
feat: remove index pattern setting
kruskall Feb 2, 2023
ed6c474
docs remove outdated comment
kruskall Feb 2, 2023
ffaef7f
refactor: reduce data type visibility
kruskall Feb 2, 2023
c4e9407
lint: fix linter issues
kruskall Feb 2, 2023
6d1472c
fix: update sourcemap hash if changed
kruskall Feb 2, 2023
ab147c5
test: add es unavailable sourcemap test
kruskall Feb 2, 2023
dccaec4
fix: handle es unreachable and clarify status code handling code
kruskall Feb 2, 2023
067202b
test: fix kibana sourcemap test
kruskall Feb 2, 2023
5ff04ac
fix: handle http status 401 correctly
kruskall Feb 2, 2023
e64ddb3
refactor: simplify chained fetcher logic
kruskall Feb 2, 2023
a65af2a
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 2, 2023
0db75d9
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 5, 2023
711ed63
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 6, 2023
9c19470
refactor: update invalidation goroutine log message level
kruskall Feb 6, 2023
55f95c9
fix: create a new context when falling back in the chained fetcher
kruskall Feb 6, 2023
868eb55
refactor: update elasticsearch fetcher error message
kruskall Feb 6, 2023
be3ea36
lint: fix method name typo
kruskall Feb 6, 2023
54be00c
refactor: avoid additional gorutine
kruskall Feb 6, 2023
17b485b
refactor: remove isdebug check
kruskall Feb 6, 2023
557e4a3
refactor: update es fetcher to use get api
kruskall Feb 6, 2023
af66eda
refactor: use es library source field instead of encoding the request…
kruskall Feb 6, 2023
6eb7746
refactor: remove unused search.go file
kruskall Feb 6, 2023
2ac90e0
fix: encode document id when building the search query
kruskall Feb 6, 2023
a44b82a
feat: block until the metadata cache is populated
kruskall Feb 7, 2023
ee25e37
fix: handle 401 unauthorized in metadata fetcher
kruskall Feb 7, 2023
11c097b
fix: set the init error during init
kruskall Feb 7, 2023
3530b49
refactor: cleanup metadata updating to avoid useless map operations
kruskall Feb 7, 2023
14d230a
fix: do not block forever when invalidating sourcemaps
kruskall Feb 7, 2023
d8924cc
lint: fix linter issues
kruskall Feb 7, 2023
3ef10e5
test: refactor sourcemap test for clarity and duplicate code
kruskall Feb 7, 2023
bbebe6a
test: update unit test for new es response format
kruskall Feb 7, 2023
c24db8d
docs: remove stale comment
kruskall Feb 7, 2023
08ae602
fix: close ping request response body
kruskall Feb 7, 2023
5b0dfb6
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 7, 2023
2e814f4
fix: do not send empty invalidations
kruskall Feb 7, 2023
53c7d97
test: add metadata fetcher tests
kruskall Feb 7, 2023
8028c46
test: add sourcemap fetcher tests
kruskall Feb 7, 2023
8c61b76
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 7, 2023
4de127a
docs: remove todo
kruskall Feb 7, 2023
7b75446
feat: remove cache expiration option
kruskall Feb 7, 2023
a182dd2
Merge branch 'main' into feat/es-sourcemap
kruskall Feb 7, 2023
1d74a28
lint: fix linter issues
kruskall Feb 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,7 @@ func (s *Runner) Run(ctx context.Context) error {
if err != nil {
return err
}
cachingFetcher, err := sourcemap.NewCachingFetcher(
fetcher, s.config.RumConfig.SourceMapping.Cache.Expiration,
)
if err != nil {
return err
}
sourcemapFetcher = cachingFetcher
sourcemapFetcher = fetcher
}

// Create the runServer function. We start with newBaseRunServer, and then
Expand Down Expand Up @@ -827,26 +821,49 @@ func newSourcemapFetcher(
}
}

return sourcemap.NewFleetFetcher(
ff, err := sourcemap.NewFleetFetcher(
&client,
fleetCfg.AccessAPIKey,
fleetServerURLs,
artifactRefs,
)
if err != nil {
return nil, err
}

cachingFetcher, err := sourcemap.NewCachingFetcher(ff, cfg.Cache.Expiration)
if err != nil {
return nil, err
}

return cachingFetcher, nil
}

// For standalone, we query both Kibana and Elasticsearch for backwards compatibility.
var chained sourcemap.ChainedFetcher
if kibanaClient != nil {
chained = append(chained, sourcemap.NewKibanaFetcher(kibanaClient))
cachingFetcher, err := sourcemap.NewCachingFetcher(sourcemap.NewKibanaFetcher(kibanaClient), cfg.Cache.Expiration)
if err != nil {
return nil, err
}

chained = append(chained, cachingFetcher)
}
esClient, err := newElasticsearchClient(cfg.ESConfig)
if err != nil {
return nil, err
}
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", version.Version)
esFetcher := sourcemap.NewElasticsearchFetcher(esClient, index)
chained = append(chained, esFetcher)
cachingFetcher, err := sourcemap.NewCachingFetcher(esFetcher, cfg.Cache.Expiration)
if err != nil {
return nil, err
}
metadataCachingFetcher := sourcemap.NewMetadataCachingFetcher(esClient, cachingFetcher, index)
metadataCachingFetcher.StartBackgroundSync()

chained = append(chained, metadataCachingFetcher)

return chained, nil
}

Expand Down
7 changes: 6 additions & 1 deletion internal/sourcemap/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ type esSourcemapResponse struct {
Hits []struct {
Source struct {
Sourcemap struct {
Sourcemap string
Service struct {
Name string
Version string
}
BundleFilepath string `json:"bundle_filepath"`
Sourcemap string
}
} `json:"_source"`
}
Expand Down
206 changes: 206 additions & 0 deletions internal/sourcemap/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package sourcemap

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/go-sourcemap/sourcemap"
"github.com/pkg/errors"

"github.com/elastic/apm-server/internal/elasticsearch"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

type Identifier struct {
name string
version string
path string
}

type Metadata struct {
id Identifier
contentHash string
}

type MetadataCachingFetcher struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any tests for the MetadataCachingFetcher logic? I was expecting a fair amount of unit tests covering the cache update logic etc.
I might be missing something here, but with the escape hatch to fetch the body directly from ES, if the metadata cache is not ready, the system tests might still be running as expected even if the metadata cache is not populated.

esClient elasticsearch.Client
set map[Identifier]bool
mu sync.RWMutex
backend Fetcher
logger *logp.Logger
once sync.Once
index string
}

func NewMetadataCachingFetcher(
c elasticsearch.Client,
backend Fetcher,
index string,
) *MetadataCachingFetcher {
return &MetadataCachingFetcher{
esClient: c,
index: index,
set: make(map[Identifier]bool),
backend: backend,
logger: logp.NewLogger(logs.Sourcemap),
}
}

func (s *MetadataCachingFetcher) Fetch(ctx context.Context, name, version, path string) (*sourcemap.Consumer, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MetadataCachingFetcher should always only run as a background job, fetching metadata and populating a cache. Whereas the Cache and Elasticsearch Fetcher are responsible for synchr. fetching soucemap bodies from either the cache or ES. The Fetch method is called sync when processing incoming requests. Why exposing this Fetch from the MetadataCachingFetcher? This causes quite a strong coupling between the fetchers, especially the MetadataCachingFetcher doesn't need to know anything about the sourcemap body fetcher. It only needs to expose a way to access the cache with the metadata.
I guess I had rather expected it the other way around, that the MetadataCachingFetcher would expose a public method GetID and the fetcher would be passed as a parameter to the ElasticsearchFetcher.

Nit: How about also renaming the ElasticsearchFetcher to something indicating what it fetches from ES, as the MetadataCachingFetcher also fetches from ES.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved the logic for syncing and sending updates to a separate file and decoupled the fetchers.

Only the sync worker and the Elasticsearch fetchers are coupled/knows about ES. The fetchers have been renamed for clarity and they are decoupled from each other.
I'm not sure I fully understand where you are heading, do you want to merge Metadata and Body Caching fetchers ?

if len(s.set) == 0 {
// First run, populate cache
s.once.Do(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if ES is not yet reachable at this point but later?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking back at this, I don't think it's a good idea, this will basically block until the cache is populated since we cannot retrieve sourcemaps until then with the current implementation.

I'm not sure what's the best way to proceed here. Returning nothing might be misleading. Should we bypass the cache and send the request to ES if the cache is not populated yet ?

Copy link
Contributor

@simitt simitt Dec 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be if the metadata cache is not ready. If a sourcemap is part of the metadata cache but the body is not yet cached, it will be fetched from ES on request.
So let's take a look at the metadata cache. What are the scenarios for when the cache might not be ready?

  • ES is not reachable, due to a restart, misconfiguration, etc.: in this case, trying to fetch directly from ES would not succeed either.
  • Fetching metadata from ES is ongoing. The fetching and cache update for metadata can be expected to be fairly quick, even when many sourcemaps are stored. Bypassing the metadata cache would work here, but seems like a hack. I'd rather either block the request processing (and collect logs & metrics so we get an understanding how long this blocking period is) or treat it as no sourcemap is available (and again, collect logs & metrics for the scenario). The third option would be to block the APM Server listeners until everything is readily set up, but this would definitely lead to apm event loss, which is definitely worse.

Copy link
Member Author

@kruskall kruskall Dec 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ES is not reachable, due to a restart, misconfiguration, etc.: in this case, trying to fetch directly from ES would not succeed either.

I don't think the metadata cache should be concerned If ES is not ready/reachable/available. A failure in that scenario would be "working as intended" imo because the metadata cache is proxying what it got from the ES fetcher (error, timeout, etc.).

Fetching metadata from ES is ongoing. The fetching and cache update for metadata can be expected to be fairly quick, even when many sourcemaps are stored. Bypassing the metadata cache would work here, but seems like a hack. I'd rather either block the request processing (and collect logs & metrics so we get an understanding how long this blocking period is) or treat it as no sourcemap is available (and again, collect logs & metrics for the scenario). The third option would be to block the APM Server listeners until everything is readily set up, but this would definitely lead to apm event loss, which is definitely worse.

I was thinking of "if init is not completed, forward the request to the backend fetcher". In theory the first request for each sourcemap would be forwarded to ES anyway so this wouldn't create additional work unless we get a request for a missing sourcemap on ES during init. The reasoning behind this was that when the metadata cache is not initialized:

  • if the sourcemap exists on ES: the metadata fetcher will send the request to the es fetcher anyway. No additional time/work/blocking: this would be the same as the metadata cache receiving a request for the first time.
  • if the sourcemap does not exist on ES: the metadata will send the request to the ES fetcher, creating additional work. The additional processing time is offset by the fact we would have had to wait for the init phase to finish if we had blocked so my assumption was that there wasn't gonna be a time difference between the two in the end.

WDYT ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is really used for triggering the first fetch, in case it hasn't yet been initiated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic has been reworked and now we forward the request to ES if the metadata cache is not ready.

This does not create additional costs since the response is cached in the lru cache.

s.sync(ctx)
})
}

key := Identifier{
name: name,
version: version,
path: path,
}

s.mu.RLock()
defer s.mu.RUnlock()

if _, found := s.set[key]; found {
// Only fetch from ES if the sourcemap id exists
return s.backend.Fetch(ctx, name, version, path)
}

return nil, nil
}

func (s *MetadataCachingFetcher) Update(ids []Identifier) {
s.mu.Lock()
defer s.mu.Unlock()

for k := range s.set {
delete(s.set, k)
}

for _, k := range ids {
s.set[k] = true
}
}

func (s *MetadataCachingFetcher) StartBackgroundSync() {
go func() {
// TODO make this a config option ?
t := time.NewTicker(30 * time.Second)
defer t.Stop()

for {
select {
case <-t.C:
ctx, cleanup := context.WithTimeout(context.Background(), 10*time.Second)
s.sync(ctx)
cleanup()
}
}
}()
}

func (s *MetadataCachingFetcher) sync(ctx context.Context) error {
resp, err := s.runSearchQuery(ctx)
if err != nil {
return errors.Wrap(err, errMsgESFailure)
}
defer resp.Body.Close()

// handle error response
if resp.StatusCode >= http.StatusMultipleChoices {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it >=300 instead of >=400?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what the current elasticsearch fetcher is using:

if resp.StatusCode >= http.StatusMultipleChoices {

if resp.StatusCode == http.StatusNotFound {
return nil
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return errors.Wrap(err, errMsgParseSourcemap)
}
return errors.New(fmt.Sprintf("%s %s", errMsgParseSourcemap, b))
}

// parse response
body, err := parseResponse(resp.Body, s.logger)
if err != nil {
return err
}

var ids []Identifier
for _, v := range body.Hits.Hits {
id := Identifier{
name: v.Source.Sourcemap.Service.Name,
version: v.Source.Sourcemap.Service.Version,
path: v.Source.Sourcemap.BundleFilepath,
}

ids = append(ids, id)
}

// Update cache
s.Update(ids)
return nil
}

func (s *MetadataCachingFetcher) runSearchQuery(ctx context.Context) (*esapi.Response, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(queryMetadata()); err != nil {
return nil, err
}

req := esapi.SearchRequest{
Index: []string{s.index},
Body: &buf,
TrackTotalHits: true,
}
return req.Do(ctx, s.esClient)
}

func queryMetadata() map[string]interface{} {
return map[string]interface{}{
"_source": []string{"sourcemap.service.*", "sourcemap.bundle_filepath"},
}
}

func parseResponse(body io.ReadCloser, logger *logp.Logger) (esSourcemapResponse, error) {
b, err := io.ReadAll(body)
if err != nil {
return esSourcemapResponse{}, err
}

var esSourcemapResponse esSourcemapResponse
if err := json.Unmarshal(b, &esSourcemapResponse); err != nil {
return esSourcemapResponse, err
}
hits := esSourcemapResponse.Hits.Total.Value
if hits == 0 || len(esSourcemapResponse.Hits.Hits) == 0 {
return esSourcemapResponse, nil
}

return esSourcemapResponse, nil
}
76 changes: 76 additions & 0 deletions systemtest/sourcemap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package systemtest_test

import (
"bytes"
"context"
"encoding/json"
"io"
"os"
"testing"

Expand All @@ -27,6 +31,7 @@ import (
"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
"github.com/elastic/go-elasticsearch/v8/esapi"
)

func TestRUMErrorSourcemapping(t *testing.T) {
Expand Down Expand Up @@ -131,6 +136,77 @@ func TestNoMatchingSourcemap(t *testing.T) {
)
}

func TestSourcemapElasticsearch(t *testing.T) {
t.Skip("DISABLED FOR NOW")

systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServerTB(t)
srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true}
err := srv.Start()
require.NoError(t, err)

sourcemap, err := os.ReadFile("../testdata/sourcemap/bundle.js.map")
require.NoError(t, err)

type Source struct {
Timestamp string `json:"@timestamp"`
Processor struct {
Name string `json:"name"`
} `json:"processor"`
Sourcemap struct {
Service struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"service"`
BundleFilepath string `json:"bundle_filepath"`
Sourcemap string `json:"sourcemap"`
} `json:"sourcemap"`
}

s := Source{
Timestamp: "1970-01-01T00:00:01.000Z",
Processor: struct {
Name string `json:"name"`
}{
Name: "sourcemap",
},
Sourcemap: struct {
Service struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"service"`
BundleFilepath string `json:"bundle_filepath"`
Sourcemap string `json:"sourcemap"`
}{
Service: struct {
Name string `json:"name"`
Version string `json:"version"`
}{
Name: "apm-agent-js",
Version: "1.0.1",
},
BundleFilepath: "http://localhost:8000/test/e2e/general-usecase/bundle.js.map",
Sourcemap: string(sourcemap),
},
}

b, err := json.Marshal(s)
require.NoError(t, err)

_, err = systemtest.Elasticsearch.Do(context.Background(), &esapi.IndexRequest{
Index: "apm-test-sourcemap",
Body: bytes.NewReader(b),
}, nil)
require.NoError(t, err)

systemtest.Elasticsearch.ExpectMinDocs(t, 1, "apm-*-sourcemap", nil)

// Index an error, applying source mapping and caching the source map in the process.
systemtest.SendRUMEventsPayload(t, srv.URL, "../testdata/intake-v2/errors_rum.ndjson")
result := systemtest.Elasticsearch.ExpectDocs(t, "logs-apm.error-*", nil)
assertSourcemapUpdated(t, result, true)
}

func TestSourcemapCaching(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServerTB(t)
Expand Down