Skip to content

Commit

Permalink
[BEAM-13903] Improve coverage of metricsx package (apache#16994)
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm authored and nancyxu123 committed Mar 9, 2022
1 parent 3748dd5 commit ee7a1fb
Showing 1 changed file with 181 additions and 3 deletions.
184 changes: 181 additions & 3 deletions sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Counters()
size := len(got)
if size < 1 {
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
Expand All @@ -68,6 +68,184 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
}
}

func TestFromMonitoringInfos_Msec(t *testing.T) {
want := metrics.MsecResult{
Attempted: metrics.MsecValue{
Start: 15 * time.Millisecond,
Process: 20 * time.Millisecond,
Finish: 40 * time.Millisecond,
Total: 25 * time.Millisecond,
},
Committed: metrics.MsecValue{
Start: 0 * time.Millisecond,
Process: 0 * time.Millisecond,
Finish: 0 * time.Millisecond,
Total: 0 * time.Millisecond,
},
Key: metrics.StepKey{
Step: "main.customDoFn",
Name: "customCounter",
Namespace: "customDoFn",
},
}

labels := map[string]string{
"PTRANSFORM": "main.customDoFn",
"NAMESPACE": "customDoFn",
"NAME": "customCounter",
}

startValue, err := Int64Counter(int64(15))
if err != nil {
t.Fatalf("Failed to encode Int64Counter: %v", err)
}
processValue, err := Int64Counter(int64(20))
if err != nil {
t.Fatalf("Failed to encode Int64Counter: %v", err)
}
finishValue, err := Int64Counter(int64(40))
if err != nil {
t.Fatalf("Failed to encode Int64Counter: %v", err)
}
totalValue, err := Int64Counter(int64(25))
if err != nil {
t.Fatalf("Failed to encode Int64Counter: %v", err)
}
mStartBundleInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(ExecutionMsecUrn(0)),
Type: UrnToType(ExecutionMsecUrn(0)),
Labels: labels,
Payload: startValue,
}
mProcessBundleInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(ExecutionMsecUrn(1)),
Type: UrnToType(ExecutionMsecUrn(1)),
Labels: labels,
Payload: processValue,
}
mFinishBundleInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(ExecutionMsecUrn(2)),
Type: UrnToType(ExecutionMsecUrn(2)),
Labels: labels,
Payload: finishValue,
}
mTotalTimeInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(ExecutionMsecUrn(3)),
Type: UrnToType(ExecutionMsecUrn(3)),
Labels: labels,
Payload: totalValue,
}

attempted := []*pipepb.MonitoringInfo{mStartBundleInfo, mProcessBundleInfo, mFinishBundleInfo, mTotalTimeInfo}
committed := []*pipepb.MonitoringInfo{}
p := &pipepb.Pipeline{}

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Msecs()
size := len(got)
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
t.Fatalf("Invalid MsecResult: got: %v, want: %v, diff(-want,+got):\n %v",
got[0], want, d)
}
}

func TestFromMonitoringInfos_PColCounters(t *testing.T) {
var value int64 = 15
want := metrics.PColResult{
Attempted: metrics.PColValue{
ElementCount: 15,
},
Key: metrics.StepKey{
Step: "main.customDoFn",
Name: "customCounter",
Namespace: "customDoFn",
}}

payload, err := Int64Counter(value)
if err != nil {
t.Fatalf("Failed to encode Int64Counter: %v", err)
}

labels := map[string]string{
"PTRANSFORM": "main.customDoFn",
"NAMESPACE": "customDoFn",
"NAME": "customCounter",
}

mInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(UrnElementCount),
Type: UrnToType(UrnElementCount),
Labels: labels,
Payload: payload,
}

attempted := []*pipepb.MonitoringInfo{mInfo}
committed := []*pipepb.MonitoringInfo{}
p := &pipepb.Pipeline{}

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
size := len(got)
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
got[0], want, d)
}
}

func TestFromMonitoringInfos_SampledByteSize(t *testing.T) {
want := metrics.PColResult{
Attempted: metrics.PColValue{
SampledByteSize: metrics.DistributionValue{
Count: 100,
Sum: 5,
Min: -12,
Max: 30,
},
},
Key: metrics.StepKey{
Step: "main.customDoFn",
Name: "customCounter",
Namespace: "customDoFn",
}}

var count, sum, min, max int64 = 100, 5, -12, 30
payload, err := Int64Distribution(count, sum, min, max)
if err != nil {
t.Fatalf("Failed to encode Int64Distribution: %v", err)
}

labels := map[string]string{
"PTRANSFORM": "main.customDoFn",
"NAMESPACE": "customDoFn",
"NAME": "customCounter",
}

mInfo := &pipepb.MonitoringInfo{
Urn: UrnToString(UrnSampledByteSize),
Type: UrnToType(UrnSampledByteSize),
Labels: labels,
Payload: payload,
}

attempted := []*pipepb.MonitoringInfo{mInfo}
committed := []*pipepb.MonitoringInfo{}
p := &pipepb.Pipeline{}

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
size := len(got)
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, FromMonitoringInfos(p, attempted, committed).AllMetrics())
}
if d := cmp.Diff(want, got[0]); d != "" {
t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
got[0], want, d)
}
}

func TestFromMonitoringInfos_Distributions(t *testing.T) {
var count, sum, min, max int64 = 100, 5, -12, 30

Expand Down Expand Up @@ -109,7 +287,7 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) {

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Distributions()
size := len(got)
if size < 1 {
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
Expand Down Expand Up @@ -159,7 +337,7 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) {

got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Gauges()
size := len(got)
if size < 1 {
if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
Expand Down

0 comments on commit ee7a1fb

Please sign in to comment.