diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index 8aaeb92c38a33..54e3a14acf44f 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -66,6 +66,7 @@ func CreateList2(a, b interface{}) (*beam.Pipeline, beam.Scope, beam.PCollection var ( Runner = runners.Runner defaultRunner = "direct" + mainCalled = false ) func getRunner() string { @@ -81,6 +82,11 @@ func DefaultRunner() string { return defaultRunner } +// MainCalled reports whether MainWithDefault or MainRetWithDefault has been called, either directly or (presumably) through Main or MainRet. +func MainCalled() bool { + return mainCalled +} + // Run runs a pipeline for testing. The semantics of the pipeline is expected // to be verified through passert. func Run(p *beam.Pipeline) error { @@ -124,6 +130,7 @@ func Main(m *testing.M) { // pipelines on runners other than the direct runner, while setting the default // runner to use. func MainWithDefault(m *testing.M, runner string) { + mainCalled = true defaultRunner = runner if !flag.Parsed() { flag.Parse() @@ -146,6 +153,7 @@ func MainRet(m *testing.M) int { // MainRetWithDefault is equivelant to MainWithDefault but returns an exit code // to pass to os.Exit(). 
func MainRetWithDefault(m *testing.M, runner string) int { + mainCalled = true defaultRunner = runner if !flag.Parsed() { flag.Parse() diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 265556101a614..9b6e5058409e7 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -77,6 +77,8 @@ var directFilters = []string{ // (BEAM-13075): The direct runner does not currently support multimap side inputs "TestParDoMultiMapSideInput", "TestLargeWordcount_Loopback", + // The direct runner does not support self-checkpointing + "TestCheckpointing", } var portableFilters = []string{ @@ -87,6 +89,8 @@ var portableFilters = []string{ "TestPanes", // TODO(BEAM-12797): Python portable runner times out on Kafka reads. "TestKafkaIO.*", + // The portable runner does not support self-checkpointing + "TestCheckpointing", } var flinkFilters = []string{ @@ -110,6 +114,8 @@ var samzaFilters = []string{ "TestPanes", // TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount "TestWordCount.*", + // The Samza runner does not support self-checkpointing + "TestCheckpointing", } var sparkFilters = []string{ @@ -124,6 +130,8 @@ var sparkFilters = []string{ "TestPanes", // [BEAM-13921]: Spark doesn't support side inputs to executable stages "TestDebeziumIO_BasicRead", + // The spark runner does not support self-checkpointing + "TestCheckpointing", } var dataflowFilters = []string{ @@ -143,6 +151,9 @@ var dataflowFilters = []string{ // Dataflow doesn't support any test that requires loopback. // Eg. For FileIO examples. ".*Loopback.*", + // Dataflow does not automatically terminate the TestCheckpointing pipeline when + // complete. 
+ "TestCheckpointing", } // CheckFilters checks if an integration test is filtered to be skipped, either @@ -151,6 +162,10 @@ var dataflowFilters = []string{ // t.Run is used, CheckFilters should be called within the t.Run callback, so // that sub-tests can be skipped individually. func CheckFilters(t *testing.T) { + if !ptest.MainCalled() { + panic("ptest.Main() has not been called: please override TestMain to ensure that the integration test runs properly.") + } + // Check for sickbaying first. n := t.Name() for _, f := range sickbay { diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go new file mode 100644 index 0000000000000..6da0610c4976b --- /dev/null +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package primitives + +import ( + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*selfCheckpointingDoFn)(nil)).Elem()) +} + +type selfCheckpointingDoFn struct{} + +// CreateInitialRestriction creates the restriction being used by the SDF. In this case, the range +// of values produced by the restriction is [Start, End), i.e. [0, 10). +func (fn *selfCheckpointingDoFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction { + return offsetrange.Restriction{ + Start: int64(0), + End: int64(10), + } +} + +// CreateTracker wraps the given restriction into a LockRTracker type. +func (fn *selfCheckpointingDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +// RestrictionSize returns the size of the current restriction. +func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// SplitRestriction splits rest into unit-sized restrictions and then appends one more restriction, [s, rest.End), +// so that a size-zero restriction is produced at the end whenever rest.End >= rest.Start. +func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction { + size := int64(1) + s := rest.Start + var splits []offsetrange.Restriction + for e := s + size; e <= rest.End; s, e = e, e+size { + splits = append(splits, offsetrange.Restriction{Start: s, End: e}) + } + splits = append(splits, offsetrange.Restriction{Start: s, End: rest.End}) + return splits +} + +// ProcessElement continually gets the start position of the restriction and emits it as an int64 value before checkpointing. +// This causes the restriction to be split after the claimed work and produce no primary roots. 
+func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation { + // The restriction's Start is the only position this invocation tries to claim. + position := rt.GetRestriction().(offsetrange.Restriction).Start + + if rt.TryClaim(position) { + // Successful claim: emit the value, then return a resume request instead of + // claiming further positions in this invocation. + emit(position) + return sdf.ResumeProcessingIn(1 * time.Second) + } + if rt.GetError() != nil || rt.IsDone() { + // Stop processing on error or completion. + return sdf.StopProcessing() + } + + // Failed to claim but no error, resume later. + return sdf.ResumeProcessingIn(5 * time.Second) +} + +// Checkpoints is a small test pipeline to establish the correctness of the simple test case. +func Checkpoints(s beam.Scope) { + beam.Init() + // Scope returns the named sub-scope rather than modifying s, so keep the result. + s = s.Scope("checkpoint") + out := beam.ParDo(s, &selfCheckpointingDoFn{}, beam.Impulse(s)) + passert.Count(s, out, "num ints", 10) +} diff --git a/sdks/go/test/integration/primitives/checkpointing_test.go b/sdks/go/test/integration/primitives/checkpointing_test.go new file mode 100644 index 0000000000000..e7c08d1ec5474 --- /dev/null +++ b/sdks/go/test/integration/primitives/checkpointing_test.go @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package primitives + +import ( + "testing" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/v2/go/test/integration" +) + +// TestCheckpointing builds the Checkpoints pipeline and validates it via ptest, after applying the integration filters. +func TestCheckpointing(t *testing.T) { + integration.CheckFilters(t) + pipeline, root := beam.NewPipelineWithRoot() + Checkpoints(root) + ptest.RunAndValidate(t, pipeline) +}