Skip to content

Commit

Permalink
[BEAM-11104] Add self-checkpointing integration test (#17590)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrmccluskey committed May 11, 2022
1 parent 5674f18 commit df67c81
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 0 deletions.
8 changes: 8 additions & 0 deletions sdks/go/pkg/beam/testing/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func CreateList2(a, b interface{}) (*beam.Pipeline, beam.Scope, beam.PCollection
var (
Runner = runners.Runner
defaultRunner = "direct"
mainCalled = false
)

func getRunner() string {
Expand All @@ -81,6 +82,11 @@ func DefaultRunner() string {
return defaultRunner
}

// MainCalled returns true iff Main or MainRet has been called.
func MainCalled() bool {
return mainCalled
}

// Run runs a pipeline for testing. The semantics of the pipeline is expected
// to be verified through passert.
func Run(p *beam.Pipeline) error {
Expand Down Expand Up @@ -124,6 +130,7 @@ func Main(m *testing.M) {
// pipelines on runners other than the direct runner, while setting the default
// runner to use.
func MainWithDefault(m *testing.M, runner string) {
mainCalled = true
defaultRunner = runner
if !flag.Parsed() {
flag.Parse()
Expand All @@ -146,6 +153,7 @@ func MainRet(m *testing.M) int {
// MainRetWithDefault is equivelant to MainWithDefault but returns an exit code
// to pass to os.Exit().
func MainRetWithDefault(m *testing.M, runner string) int {
mainCalled = true
defaultRunner = runner
if !flag.Parsed() {
flag.Parse()
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ var directFilters = []string{
// (BEAM-13075): The direct runner does not currently support multimap side inputs
"TestParDoMultiMapSideInput",
"TestLargeWordcount_Loopback",
// The direct runner does not support self-checkpointing
"TestCheckpointing",
}

var portableFilters = []string{
Expand All @@ -87,6 +89,8 @@ var portableFilters = []string{
"TestPanes",
// TODO(BEAM-12797): Python portable runner times out on Kafka reads.
"TestKafkaIO.*",
// The portable runner does not support self-checkpointing
"TestCheckpointing",
}

var flinkFilters = []string{
Expand All @@ -110,6 +114,8 @@ var samzaFilters = []string{
"TestPanes",
// TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount
"TestWordCount.*",
// The Samza runner does not support self-checkpointing
"TestCheckpointing",
}

var sparkFilters = []string{
Expand All @@ -124,6 +130,8 @@ var sparkFilters = []string{
"TestPanes",
// [BEAM-13921]: Spark doesn't support side inputs to executable stages
"TestDebeziumIO_BasicRead",
// The spark runner does not support self-checkpointing
"TestCheckpointing",
}

var dataflowFilters = []string{
Expand All @@ -143,6 +151,9 @@ var dataflowFilters = []string{
// Dataflow doesn't support any test that requires loopback.
// Eg. For FileIO examples.
".*Loopback.*",
// Dataflow does not automatically terminate the TestCheckpointing pipeline when
// complete.
"TestCheckpointing",
}

// CheckFilters checks if an integration test is filtered to be skipped, either
Expand All @@ -151,6 +162,10 @@ var dataflowFilters = []string{
// t.Run is used, CheckFilters should be called within the t.Run callback, so
// that sub-tests can be skipped individually.
func CheckFilters(t *testing.T) {
if !ptest.MainCalled() {
panic("ptest.Main() has not been called: please override TestMain to ensure that the integration test runs properly.")
}

// Check for sickbaying first.
n := t.Name()
for _, f := range sickbay {
Expand Down
94 changes: 94 additions & 0 deletions sdks/go/test/integration/primitives/checkpointing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package primitives

import (
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)

func init() {
beam.RegisterType(reflect.TypeOf((*selfCheckpointingDoFn)(nil)).Elem())
}

type selfCheckpointingDoFn struct{}

// CreateInitialRestriction creates the restriction being used by the SDF. In this case, the range
// of values produced by the restriction is [Start, End).
func (fn *selfCheckpointingDoFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
return offsetrange.Restriction{
Start: int64(0),
End: int64(10),
}
}

// CreateTracker wraps the fiven restriction into a LockRTracker type.
func (fn *selfCheckpointingDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

// RestrictionSize returns the size of the current restriction
func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 {
return rest.Size()
}

// SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction
// at the end of execution.
func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
size := int64(1)
s := rest.Start
var splits []offsetrange.Restriction
for e := s + size; e <= rest.End; s, e = e, e+size {
splits = append(splits, offsetrange.Restriction{Start: s, End: e})
}
splits = append(splits, offsetrange.Restriction{Start: s, End: rest.End})
return splits
}

// ProcessElement continually gets the start position of the restriction and emits it as an int64 value before checkpointing.
// This causes the restriction to be split after the claimed work and produce no primary roots.
func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
position := rt.GetRestriction().(offsetrange.Restriction).Start

for {
if rt.TryClaim(position) {
// Successful claim, emit the value and move on.
emit(position)
position += 1
return sdf.ResumeProcessingIn(1 * time.Second)
} else if rt.GetError() != nil || rt.IsDone() {
// Stop processing on error or completion
return sdf.StopProcessing()
} else {
// Failed to claim but no error, resume later.
return sdf.ResumeProcessingIn(5 * time.Second)
}
}
}

// Checkpoints is a small test pipeline to establish the correctness of the simple test case.
func Checkpoints(s beam.Scope) {
beam.Init()

s.Scope("checkpoint")
out := beam.ParDo(s, &selfCheckpointingDoFn{}, beam.Impulse(s))
passert.Count(s, out, "num ints", 10)
}
32 changes: 32 additions & 0 deletions sdks/go/test/integration/primitives/checkpointing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package primitives

import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)

func TestCheckpointing(t *testing.T) {
integration.CheckFilters(t)

p, s := beam.NewPipelineWithRoot()
Checkpoints(s)
ptest.RunAndValidate(t, p)
}

0 comments on commit df67c81

Please sign in to comment.