From 23a5b4e82502b4f8403dde9901536d3816be9296 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 11:02:21 -0700 Subject: [PATCH 1/7] Allow missing struct wrapper methods --- sdks/go/pkg/beam/core/graph/fn.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 931458d922d2b..0dc19e1766659 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -122,7 +122,6 @@ func NewFn(fn interface{}) (*Fn, error) { } methods[name] = f } - return &Fn{Recv: fn, methods: methods, annotations: annotations}, nil } // TODO(lostluck): Consider moving this into the reflectx package. for i := 0; i < val.Type().NumMethod(); i++ { @@ -133,6 +132,9 @@ func NewFn(fn interface{}) (*Fn, error) { if m.Name == "String" { continue // skip: harmless } + if _, ok := methods[m.Name]; ok { + continue // skip : already wrapped + } // CAVEAT(herohde) 5/22/2017: The type val.Type.Method.Type is not // the same as val.Method.Type: the former has the explicit receiver. From d072de65a7340172469ae2f6f1262f5afc55c5cb Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 11:45:01 -0700 Subject: [PATCH 2/7] optimize loadtest harness, add local time logging --- sdks/go/test/load/util.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/go/test/load/util.go b/sdks/go/test/load/util.go index 96436d0d67820..0ed64e5ae0aa1 100644 --- a/sdks/go/test/load/util.go +++ b/sdks/go/test/load/util.go @@ -24,12 +24,12 @@ import ( "log" "net/http" "os" - "reflect" "strings" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) const ( @@ -60,7 +60,8 @@ var ( ) func init() { - beam.RegisterType(reflect.TypeOf((*RuntimeMonitor)(nil)).Elem()) + register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*RuntimeMonitor)(nil)) + register.Emitter2[[]byte, []byte]() } // RuntimeMonitor is a DoFn to record processing time in the pipeline. @@ -132,10 +133,16 @@ func newLoadTestResult(value float64) loadTestResult { // PublishMetrics calculates the runtime and sends the result to InfluxDB database. func PublishMetrics(results metrics.QueryResults) { options := newInfluxDBOptions() - if options.validate() { - if res := toLoadTestResults(results); len(res) > 0 { - publishMetricstoInfluxDB(options, toLoadTestResults(results)) - } + ress := toLoadTestResults(results) + for _, res := range ress { + log.Printf("%s %v", res.metric, time.Duration(float64(time.Second)*res.value)) + } + if len(ress) == 0 { + log.Print("No metrics returned.") + return + } + if options.validate() && len(ress) > 0 { + publishMetricstoInfluxDB(options, ress) } else { log.Print("Missing InfluxDB options. Metrics will not be published to InfluxDB") } @@ -212,8 +219,8 @@ func publishMetricstoInfluxDB(options *influxDBOptions, results []loadTestResult if resp.StatusCode != 204 { jsonData := make(map[string]string) json.Unmarshal(body, &jsonData) - log.Print(fmt.Errorf("Failed to publish metrics to InfluxDB. Received status code %v "+ - "with an error message: %v", resp.StatusCode, jsonData["error"])) + log.Printf("Failed to publish metrics to InfluxDB. Received status code %v "+ + "with an error message: %v", resp.StatusCode, jsonData["error"]) } } From e0ba3bbb48ec167f3ab4909d52848f19182f5166 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 11:45:29 -0700 Subject: [PATCH 3/7] optimize synthetic batch source --- sdks/go/pkg/beam/io/synthetic/source.go | 8 +++++--- sdks/go/pkg/beam/io/synthetic/step.go | 7 ++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go index 822c416f6969e..1aed7f235bf7f 100644 --- a/sdks/go/pkg/beam/io/synthetic/source.go +++ b/sdks/go/pkg/beam/io/synthetic/source.go @@ -27,17 +27,19 @@ import ( "encoding/json" "fmt" "math/rand" - "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) func init() { - beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem()) - beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem()) + // beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem()) + // beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem()) + register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func([]byte, []byte), error]((*sourceFn)(nil)) + register.Emitter2[[]byte, []byte]() } // Source creates a synthetic source transform that emits randomly diff --git a/sdks/go/pkg/beam/io/synthetic/step.go b/sdks/go/pkg/beam/io/synthetic/step.go index 3691fec9b4fe0..d800f5a054d38 100644 --- a/sdks/go/pkg/beam/io/synthetic/step.go +++ b/sdks/go/pkg/beam/io/synthetic/step.go @@ -18,18 +18,19 @@ package synthetic import ( "fmt" "math/rand" - "reflect" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" ) func init() { - beam.RegisterType(reflect.TypeOf((*stepFn)(nil)).Elem()) - beam.RegisterType(reflect.TypeOf((*sdfStepFn)(nil)).Elem()) + register.DoFn3x0[[]byte, []byte, func([]byte, []byte)]((*stepFn)(nil)) + register.DoFn4x0[*sdf.LockRTracker, []byte, []byte, func([]byte, []byte)]((*sdfStepFn)(nil)) + register.Emitter2[[]byte, []byte]() } // Step creates a synthetic step transform that receives KV<[]byte, []byte> From f4fc872613c442555cf03644af3a7f2e13177dd7 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 15:39:31 -0700 Subject: [PATCH 4/7] fix side input test measurement. --- .../jenkins/job_LoadTests_SideInput_Go.groovy | 8 +++--- .../SideInput_Load_Tests.json | 2 +- sdks/go/test/load/sideinput/sideinput.go | 28 +++++++++++++------ 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy index 225bbc799989f..55809ac7a103b 100644 --- a/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy +++ b/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy @@ -29,7 +29,7 @@ String now = new Date().format('MMddHHmmss', TimeZone.getTimeZone('UTC')) def batchScenarios = { [ [ - title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-first-iterable', + title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-first-iterable', test : 'sideinput', runner : CommonTestProperties.Runner.DATAFLOW, pipelineOptions: [ @@ -41,7 +41,7 @@ def batchScenarios = { influx_namespace : 'dataflow', influx_measurement : 'go_batch_sideinput_3', input_options : '\'{' + - '"num_records": 400000,' + + '"num_records": 10000000,' + '"key_size": 100,' + '"value_size": 900}\'', access_percentage: 1, @@ -52,7 +52,7 @@ def batchScenarios = { ] ], [ - title : 'SideInput Go Load test: 400mb-1kb-10workers-1window-iterable', + title : 'SideInput Go Load test: 10gb-1kb-10workers-1window-iterable', test : 'sideinput', runner : CommonTestProperties.Runner.DATAFLOW, pipelineOptions: [ @@ -64,7 +64,7 @@ def batchScenarios = { influx_namespace : 'dataflow', influx_measurement : 'go_batch_sideinput_4', input_options : '\'{' + - '"num_records": 400000,' + + '"num_records": 10000000,' + '"key_size": 100,' + '"value_size": 900}\'', num_workers : 10, diff --git a/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json b/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json index 62616951c808d..cee6bb7057337 100644 --- a/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json +++ b/.test-infra/metrics/grafana/dashboards/perftests_metrics/SideInput_Load_Tests.json @@ -21,7 +21,7 @@ "links": [], "panels": [ { - "content": "The following options should be used by default:\n* key size: 100B\n* value size: 900B\n* number of workers: 10\n* size of the window (if fixed windows are used): 1 second\n\n[Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)\n\nUntil the issue [BEAM-11427](https://issues.apache.org/jira/browse/BEAM-11427) in Go SDK is resolved, sideinput iteration test have 400MB, instead of 10GB.", + "content": "Jenkins job definition (Python, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Python.groovy) [Jenkins job definition (Go, Flink)](https://github.com/apache/beam/tree/master/.test-infra/jenkins/job_LoadTests_SideInput_Flink_Go.groovy) [Jenkins job definition (Go, Dataflow)](https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_LoadTests_SideInput_Go.groovy)", "datasource": null, "gridPos": { "h": 8, diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go index 6f7cb6f2d4192..3f95bdfec54d5 100644 --- a/sdks/go/test/load/sideinput/sideinput.go +++ b/sdks/go/test/load/sideinput/sideinput.go @@ -18,17 +18,20 @@ package main import ( "context" "flag" - "reflect" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/test/load" ) func init() { - beam.RegisterDoFn(reflect.TypeOf((*doFn)(nil))) + register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*doFn)(nil)) + register.Emitter2[[]byte, []byte]() + register.Iter2[[]byte, []byte]() + register.Function2x0(impToKV) } var ( @@ -51,11 +54,17 @@ func parseSyntheticConfig() synthetic.SourceConfig { } } +// impToKV just turns an impulse signal into a KV instead of +// adding a single value input version of RuntimeMonitor +func impToKV(imp []byte, emit func([]byte, []byte)) { + emit(imp, imp) +} + type doFn struct { ElementsToAccess int64 } -func (fn *doFn) ProcessElement(_ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) { +func (fn *doFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) { var key, value []byte var i int64 for values(&key, &value) { @@ -74,18 +83,21 @@ func main() { p, s := beam.NewPipelineWithRoot() syntheticConfig := parseSyntheticConfig() - elementsToAccess := syntheticConfig.NumElements * int64(*accessPercentage/100) + elementsToAccess := syntheticConfig.NumElements * int64(float64(*accessPercentage)/float64(100)) src := synthetic.SourceSingle(s, syntheticConfig) - src = beam.ParDo(s, &load.RuntimeMonitor{}, src) - src = beam.ParDo( + imp := beam.Impulse(s) + impKV := beam.ParDo(s, impToKV, imp) + monitored := beam.ParDo(s, &load.RuntimeMonitor{}, impKV) + + useSide := beam.ParDo( s, &doFn{ElementsToAccess: elementsToAccess}, - beam.Impulse(s), + monitored, beam.SideInput{Input: src}) - beam.ParDo(s, &load.RuntimeMonitor{}, src) + beam.ParDo(s, &load.RuntimeMonitor{}, useSide) presult, err := beamx.RunWithMetrics(ctx, p) if err != nil { From 8d0b3a30d346836b4d4b24879ead9032fe6070ad Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 15:41:12 -0700 Subject: [PATCH 5/7] [BEAM-14470] Optimize remaining LoadTests --- sdks/go/test/load/cogbk/cogbk.go | 8 +++- sdks/go/test/load/combine/combine.go | 8 ++++ .../go/test/load/group_by_key/group_by_key.go | 38 +++++++++++++------ sdks/go/test/load/pardo/pardo.go | 10 +++-- 4 files changed, 48 insertions(+), 16 deletions(-) diff --git a/sdks/go/test/load/cogbk/cogbk.go b/sdks/go/test/load/cogbk/cogbk.go index 77b196f1620fe..eefd0bddff1cf 100644 --- a/sdks/go/test/load/cogbk/cogbk.go +++ b/sdks/go/test/load/cogbk/cogbk.go @@ -18,11 +18,11 @@ package main import ( "context" "flag" - "reflect" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/test/load" ) @@ -43,7 +43,9 @@ var ( ) func init() { - beam.RegisterType(reflect.TypeOf((*ungroupAndReiterateFn)(nil)).Elem()) + register.DoFn4x0[[]byte, func(*[]byte) bool, func(*[]byte) bool, func([]byte, []byte)]((*ungroupAndReiterateFn)(nil)) + register.Emitter2[[]byte, []byte]() + register.Iter1[[]byte]() } // ungroupAndReiterateFn reiterates given number of times over CoGBK's output. @@ -51,6 +53,8 @@ type ungroupAndReiterateFn struct { Iterations int } +// TODO use re-iterators once supported. + func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, p1values, p2values func(*[]byte) bool, emit func([]byte, []byte)) { var value []byte for i := 0; i < fn.Iterations; i++ { diff --git a/sdks/go/test/load/combine/combine.go b/sdks/go/test/load/combine/combine.go index 32d46a1d94a73..9f00bbf799099 100644 --- a/sdks/go/test/load/combine/combine.go +++ b/sdks/go/test/load/combine/combine.go @@ -23,6 +23,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/test/load" @@ -52,6 +53,12 @@ func parseSyntheticConfig() synthetic.SourceConfig { } } +func init() { + register.Function2x1(compareLess) + register.Function3x0(getElement) + register.Emitter2[[]byte, []byte]() +} + func compareLess(key []byte, value []byte) bool { return bytes.Compare(key, value) < 0 } @@ -73,6 +80,7 @@ func main() { pcoll := top.LargestPerKey(s, src, *topCount, compareLess) pcoll = beam.ParDo(s, getElement, pcoll) pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll) + _ = pcoll } presult, err := beamx.RunWithMetrics(ctx, p) diff --git a/sdks/go/test/load/group_by_key/group_by_key.go b/sdks/go/test/load/group_by_key/group_by_key.go index 78871c96d6f94..645afabedef49 100644 --- a/sdks/go/test/load/group_by_key/group_by_key.go +++ b/sdks/go/test/load/group_by_key/group_by_key.go @@ -24,6 +24,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/test/load" ) @@ -52,6 +53,30 @@ func parseSyntheticConfig() synthetic.SourceConfig { } } +func init() { + register.DoFn2x2[[]byte, func(*[]byte) bool, []byte, []byte]((*ungroupAndReiterateFn)(nil)) + register.Iter1[[]byte]() +} + +// ungroupAndReiterateFn reiterates given number of times over GBK's output. +type ungroupAndReiterateFn struct { + Iterations int +} + +// TODO use re-iterators once supported. + +func (fn *ungroupAndReiterateFn) ProcessElement(key []byte, values func(*[]byte) bool) ([]byte, []byte) { + var value []byte + for i := 0; i < fn.Iterations; i++ { + for values(&value) { + if i == fn.Iterations-1 { + return key, value + } + } + } + return key, []byte{0} +} + func main() { flag.Parse() beam.Init() @@ -63,18 +88,9 @@ func main() { src = beam.ParDo(s, &load.RuntimeMonitor{}, src) for i := 0; i < *fanout; i++ { pcoll := beam.GroupByKey(s, src) - pcoll = beam.ParDo(s, func(key []byte, values func(*[]byte) bool) ([]byte, []byte) { - for i := 0; i < *iterations; i++ { - var value []byte - for values(&value) { - if i == *iterations-1 { - return key, value - } - } - } - return key, []byte{0} - }, pcoll) + pcoll = beam.ParDo(s, &ungroupAndReiterateFn{*iterations}, pcoll) pcoll = beam.ParDo(s, &load.RuntimeMonitor{}, pcoll) + _ = pcoll } presult, err := beamx.RunWithMetrics(ctx, p) diff --git a/sdks/go/test/load/pardo/pardo.go b/sdks/go/test/load/pardo/pardo.go index 8114ac8a9060d..eceae82bc5d05 100644 --- a/sdks/go/test/load/pardo/pardo.go +++ b/sdks/go/test/load/pardo/pardo.go @@ -19,11 +19,11 @@ import ( "context" "flag" "fmt" - "reflect" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/synthetic" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/test/load" ) @@ -48,7 +48,8 @@ var ( ) func init() { - beam.RegisterType(reflect.TypeOf((*counterOperationFn)(nil)).Elem()) + register.DoFn4x0[context.Context, []byte, []byte, func([]byte, []byte)]((*counterOperationFn)(nil)) + register.Emitter2[[]byte, []byte]() } type counterOperationFn struct { @@ -57,7 +58,10 @@ type counterOperationFn struct { } func newCounterOperationFn(operations, numCounters int) *counterOperationFn { - return &counterOperationFn{operations, numCounters, nil} + return &counterOperationFn{ + Operations: operations, + NumCounters: numCounters, + } } func (fn *counterOperationFn) Setup() { From 8d7f98895f3fbc80469face7490f6a489d2850da Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 13 May 2022 15:49:08 -0700 Subject: [PATCH 6/7] typo --- sdks/go/pkg/beam/io/synthetic/source.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go index 1aed7f235bf7f..d80a89c931c52 100644 --- a/sdks/go/pkg/beam/io/synthetic/source.go +++ b/sdks/go/pkg/beam/io/synthetic/source.go @@ -36,8 +36,6 @@ import ( ) func init() { - // beam.RegisterType(reflect.TypeOf((*sourceFn)(nil)).Elem()) - // beam.RegisterType(reflect.TypeOf((*SourceConfig)(nil)).Elem()) register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func([]byte, []byte), error]((*sourceFn)(nil)) register.Emitter2[[]byte, []byte]() } From e09d3925947352f2e79072bda1158cc5f2ce8d0f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Sat, 14 May 2022 15:43:02 -0700 Subject: [PATCH 7/7] review comments --- sdks/go/test/load/sideinput/sideinput.go | 8 ++++---- sdks/go/test/load/util.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/go/test/load/sideinput/sideinput.go b/sdks/go/test/load/sideinput/sideinput.go index 3f95bdfec54d5..c57ed9f0b2359 100644 --- a/sdks/go/test/load/sideinput/sideinput.go +++ b/sdks/go/test/load/sideinput/sideinput.go @@ -28,7 +28,7 @@ import ( ) func init() { - register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*doFn)(nil)) + register.DoFn4x0[[]byte, []byte, func(*[]byte, *[]byte) bool, func([]byte, []byte)]((*iterSideInputFn)(nil)) register.Emitter2[[]byte, []byte]() register.Iter2[[]byte, []byte]() register.Function2x0(impToKV) @@ -60,11 +60,11 @@ func impToKV(imp []byte, emit func([]byte, []byte)) { emit(imp, imp) } -type doFn struct { +type iterSideInputFn struct { ElementsToAccess int64 } -func (fn *doFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) { +func (fn *iterSideInputFn) ProcessElement(_, _ []byte, values func(*[]byte, *[]byte) bool, emit func([]byte, []byte)) { var key, value []byte var i int64 for values(&key, &value) { @@ -93,7 +93,7 @@ func main() { useSide := beam.ParDo( s, - &doFn{ElementsToAccess: elementsToAccess}, + &iterSideInputFn{ElementsToAccess: elementsToAccess}, monitored, beam.SideInput{Input: src}) diff --git a/sdks/go/test/load/util.go b/sdks/go/test/load/util.go index 0ed64e5ae0aa1..4cbfda8e0ba9d 100644 --- a/sdks/go/test/load/util.go +++ b/sdks/go/test/load/util.go @@ -141,7 +141,7 @@ func PublishMetrics(results metrics.QueryResults) { log.Print("No metrics returned.") return } - if options.validate() && len(ress) > 0 { + if options.validate() { publishMetricstoInfluxDB(options, ress) } else { log.Print("Missing InfluxDB options. Metrics will not be published to InfluxDB")