Skip to content

Commit

Permalink
optimization: Unified Sorter (#972) (#1122)
Browse files Browse the repository at this point in the history
* cherry pick #972 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Co-authored-by: Zixiong Liu <liuzixiong@pingcap.com>
  • Loading branch information
ti-srebot and liuzix authored Nov 26, 2020
1 parent 9995596 commit 3221296
Show file tree
Hide file tree
Showing 36 changed files with 2,845 additions and 70 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ ignore:
- "*.toml"
- "*.md"
- "docs/.*"
- "testing_utils/.*"
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto|ticdc\/tests|integration'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'kv_gen')
CDC_PKG := github.com/pingcap/ticdc
FAILPOINT_DIR := $$(for p in $(PACKAGES); do echo $${p\#"github.com/pingcap/$(PROJECT)/"}|grep -v "github.com/pingcap/$(PROJECT)"; done)
FAILPOINT := bin/failpoint-ctl
Expand Down Expand Up @@ -105,6 +105,8 @@ integration_test_build: check_failpoint_ctl
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)

integration_test: integration_test_mysql
Expand Down Expand Up @@ -143,8 +145,8 @@ check: check-copyright fmt lint check-static tidy errdoc

coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
@goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
Expand All @@ -156,7 +158,7 @@ else
endif

check-static: tools/bin/golangci-lint
tools/bin/golangci-lint run --timeout 10m0s
tools/bin/golangci-lint run --timeout 10m0s --skip-files kv_gen

clean:
go clean -i ./...
Expand Down
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cdc

import (
"github.com/pingcap/ticdc/cdc/puller/sorter"
"time"

"github.com/pingcap/ticdc/cdc/entry"
Expand All @@ -37,5 +38,6 @@ func init() {
puller.InitMetrics(registry)
sink.InitMetrics(registry)
entry.InitMetrics(registry)
sorter.InitMetrics(registry)
initProcessorMetrics(registry)
}
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SortEngine string
const (
SortInMemory SortEngine = "memory"
SortInFile SortEngine = "file"
SortUnified SortEngine = "unified"
)

// FeedState represents the running state of a changefeed
Expand Down
18 changes: 11 additions & 7 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate msgp

package model

import (
Expand All @@ -32,6 +34,7 @@ const (

// RegionFeedEvent from the kv layer.
// Only one of the event will be setted.
//msgp:ignore RegionFeedEvent
type RegionFeedEvent struct {
Val *RawKVEntry
Resolved *ResolvedSpan
Expand All @@ -53,25 +56,26 @@ func (e *RegionFeedEvent) GetValue() interface{} {

// ResolvedSpan guarantees all the KV value event
// with commit ts less than ResolvedTs has been emitted.
//msgp:ignore ResolvedSpan
type ResolvedSpan struct {
Span regionspan.ComparableSpan
ResolvedTs uint64
}

// RawKVEntry notify the KV operator
type RawKVEntry struct {
OpType OpType
Key []byte
OpType OpType `msg:"op_type"`
Key []byte `msg:"key"`
// nil for delete type
Value []byte
Value []byte `msg:"value"`
// nil for insert type
OldValue []byte
StartTs uint64
OldValue []byte `msg:"old_value"`
StartTs uint64 `msg:"start_ts"`
// Commit or resolved TS
CRTs uint64
CRTs uint64 `msg:"crts"`

// Additonal debug info
RegionID uint64
RegionID uint64 `msg:"region_id"`
}

func (v *RawKVEntry) String() string {
Expand Down
Loading

0 comments on commit 3221296

Please sign in to comment.