Skip to content

Commit

Permalink
Merge branch 'master' into e2e-add-chaos-test-part-two
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored May 7, 2022
2 parents 635c95c + 80697fd commit ef34260
Show file tree
Hide file tree
Showing 67 changed files with 956 additions and 743 deletions.
41 changes: 23 additions & 18 deletions executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"
"encoding/json"
"strings"
"time"

Expand Down Expand Up @@ -37,10 +38,10 @@ import (
extkv "github.com/hanfei1991/microcosm/pkg/meta/extension"
"github.com/hanfei1991/microcosm/pkg/meta/kvclient"
"github.com/hanfei1991/microcosm/pkg/meta/metaclient"
pkgOrm "github.com/hanfei1991/microcosm/pkg/orm"
"github.com/hanfei1991/microcosm/pkg/p2p"
"github.com/hanfei1991/microcosm/pkg/rpcutil"
"github.com/hanfei1991/microcosm/pkg/serverutils"
"github.com/hanfei1991/microcosm/pkg/tenant"
"github.com/hanfei1991/microcosm/test"
"github.com/hanfei1991/microcosm/test/mock"
)
Expand All @@ -66,8 +67,8 @@ type Server struct {
// etcdCli connects to server master embed etcd, it should be used in service
// discovery only.
etcdCli *clientv3.Client
// framework metastore prefix kvclient
metaKVClient metaclient.KVClient
// framework metastore client
frameMetaClient pkgOrm.Client
// user metastore raw kvclient(reuse for all workers)
userRawKVClient extkv.KVClientEx
p2pMsgRouter p2pImpl.MessageRouter
Expand Down Expand Up @@ -100,8 +101,8 @@ func (s *Server) buildDeps() (*deps.Deps, error) {
return nil, err
}

err = deps.Provide(func() metaclient.KVClient {
return s.metaKVClient
err = deps.Provide(func() pkgOrm.Client {
return s.frameMetaClient
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,7 +155,9 @@ func (s *Server) makeTask(
dctx.Environ.NodeID = p2p.NodeID(s.info.ID)
dctx.Environ.Addr = s.info.Addr

// NOTICE: only take effect when job type is job master
masterMeta := &libModel.MasterMetaKVData{
// TODO: ProjectID
ID: workerID,
Tp: workerType,
Config: workerConfig,
Expand Down Expand Up @@ -236,8 +239,8 @@ func (s *Server) Stop() {
}
}

if s.metaKVClient != nil {
err := s.metaKVClient.Close()
if s.frameMetaClient != nil {
err := s.frameMetaClient.Close()
if err != nil {
log.L().Warn("failed to close connection to framework metastore", zap.Error(err))
}
Expand Down Expand Up @@ -447,21 +450,22 @@ func (s *Server) fetchMetaStore(ctx context.Context) error {
s.cfg.RPCTimeout,
)
if err != nil {
log.L().Error("query framework metastore fail")
return err
}
log.L().Info("update framework metastore", zap.String("addr", resp.Address))

conf := metaclient.StoreConfigParams{
Endpoints: []string{resp.Address},
var conf metaclient.StoreConfigParams
err = json.Unmarshal([]byte(resp.Address), &conf)
if err != nil {
log.L().Error("unmarshal framework metastore config fail", zap.String("conf", resp.Address), zap.Error(err))
return err
}

cliEx, err := kvclient.NewKVClient(&conf)
// TODO: replace the default DB config
s.frameMetaClient, err = pkgOrm.NewClient(conf, pkgOrm.NewDefaultDBConfig())
if err != nil {
log.L().Error("access framework metastore fail", zap.Any("store-conf", conf), zap.Error(err))
log.L().Error("connect to framework metastore fail", zap.Any("conf", conf), zap.Error(err))
return err
}
// [TODO] use FrameTenantID here if support multi-tenant
s.metaKVClient = kvclient.NewPrefixKVClient(cliEx, tenant.DefaultUserTenantID)
log.L().Info("update framework metastore successful", zap.String("addr", resp.Address))

// fetch user metastore connection endpoint
resp, err = s.masterClient.QueryMetaStore(
Expand All @@ -470,18 +474,19 @@ func (s *Server) fetchMetaStore(ctx context.Context) error {
s.cfg.RPCTimeout,
)
if err != nil {
log.L().Error("query user metastore fail")
return err
}
log.L().Info("update user metastore", zap.String("addr", resp.Address))

conf = metaclient.StoreConfigParams{
Endpoints: []string{resp.Address},
}
s.userRawKVClient, err = kvclient.NewKVClient(&conf)
if err != nil {
log.L().Error("access user metastore fail", zap.Any("store-conf", conf), zap.Error(err))
log.L().Error("connect to user metastore fail", zap.Any("store-conf", conf), zap.Error(err))
return err
}
log.L().Info("update user metastore successful", zap.String("addr", resp.Address))

return nil
}
Expand Down
18 changes: 13 additions & 5 deletions jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/log"

Expand All @@ -29,9 +29,9 @@ import (
dmpkg "github.com/hanfei1991/microcosm/pkg/dm"
"github.com/hanfei1991/microcosm/pkg/externalresource/broker"
extkv "github.com/hanfei1991/microcosm/pkg/meta/extension"

kvmock "github.com/hanfei1991/microcosm/pkg/meta/kvclient/mock"
"github.com/hanfei1991/microcosm/pkg/meta/metaclient"
pkgOrm "github.com/hanfei1991/microcosm/pkg/orm"
"github.com/hanfei1991/microcosm/pkg/p2p"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -62,21 +62,24 @@ type masterParamListForTest struct {

MessageHandlerManager p2p.MessageHandlerManager
MessageSender p2p.MessageSender
MetaKVClient metaclient.KVClient
FrameMetaClient pkgOrm.Client
UserRawKVClient extkv.KVClientEx
ExecutorClientManager client.ClientsManager
ServerMasterClient client.MasterClient
ResourceBroker broker.Broker
}

// Init -> Poll -> Close
func (t *testDMJobmasterSuite) TestRunDMJobMaster() {
// FIXME: database is closed ???
func (t *testDMJobmasterSuite) testRunDMJobMaster() {
cli, err := pkgOrm.NewMockClient()
require.NoError(t.T(), err)
mockServerMasterClient := &client.MockServerMasterClient{}
mockExecutorClient := client.NewClientManager()
depsForTest := masterParamListForTest{
MessageHandlerManager: p2p.NewMockMessageHandlerManager(),
MessageSender: p2p.NewMockMessageSender(),
MetaKVClient: kvmock.NewMetaMock(),
FrameMetaClient: cli,
UserRawKVClient: kvmock.NewMetaMock(),
ExecutorClientManager: mockExecutorClient,
ServerMasterClient: mockServerMasterClient,
Expand Down Expand Up @@ -113,6 +116,11 @@ func (t *testDMJobmasterSuite) TestRunDMJobMaster() {
}))
dctx = dctx.WithDeps(dp)
require.NoError(t.T(), jobmaster.Close(context.Background()))

// FIXME: seems that mock db close unexpected here
cli, err = pkgOrm.NewMockClient()
require.NoError(t.T(), err)
depsForTest.FrameMetaClient = cli
jobmaster, err = registry.GlobalWorkerRegistry().CreateWorker(dctx, lib.DMJobMaster, "dm-jobmaster", libMetadata.JobManagerUUID, cfgBytes)
require.NoError(t.T(), err)
require.NoError(t.T(), jobmaster.Init(context.Background()))
Expand Down
2 changes: 1 addition & 1 deletion jobmaster/dm/message_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pkg/clock"
dmpkg "github.com/hanfei1991/microcosm/pkg/dm"
"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
"github.com/pingcap/errors"
)

Expand Down
2 changes: 1 addition & 1 deletion jobmaster/dm/message_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"

"github.com/hanfei1991/microcosm/lib/master"
"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"

"github.com/stretchr/testify/require"

Expand Down
4 changes: 3 additions & 1 deletion jobmaster/dm/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dm

import "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
import (
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
)

// NewDMResourceID returns a ResourceID in DM's style. Currently only support local resource.
func NewDMResourceID(taskName, sourceName string) resourcemeta.ResourceID {
Expand Down
2 changes: 1 addition & 1 deletion jobmaster/dm/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
dmconfig "github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/log"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion jobmaster/dm/worker_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
"github.com/pingcap/errors"
dmconfig "github.com/pingcap/tiflow/dm/dm/config"
"github.com/stretchr/testify/require"
Expand Down
3 changes: 2 additions & 1 deletion lib/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/hanfei1991/microcosm/pkg/errctx"
"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
Expand Down Expand Up @@ -91,6 +91,7 @@ func NewBaseJobMaster(
baseMaster := NewBaseMaster(
ctx, &jobMasterImplAsMasterImpl{jobMasterImpl}, workerID)
baseWorker := NewBaseWorker(
// TODO: need worker_type
ctx, &jobMasterImplAsWorkerImpl{jobMasterImpl}, workerID, masterID)
errCenter := errctx.NewErrCenter()
baseMaster.(*DefaultBaseMaster).errCenter = errCenter
Expand Down
9 changes: 7 additions & 2 deletions lib/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
dcontext "github.com/hanfei1991/microcosm/pkg/context"
"github.com/hanfei1991/microcosm/pkg/deps"
mockkv "github.com/hanfei1991/microcosm/pkg/meta/kvclient/mock"
pkgOrm "github.com/hanfei1991/microcosm/pkg/orm"
"github.com/hanfei1991/microcosm/pkg/p2p"
)

Expand Down Expand Up @@ -135,16 +136,20 @@ func (m *testJobMasterImpl) Status() libModel.WorkerStatus {
}

func newBaseJobMasterForTests(impl JobMasterImpl) *DefaultBaseJobMaster {
cli, err := pkgOrm.NewMockClient()
if err != nil {
panic(err)
}
params := masterParamListForTest{
MessageHandlerManager: p2p.NewMockMessageHandlerManager(),
MessageSender: p2p.NewMockMessageSender(),
MetaKVClient: mockkv.NewMetaMock(),
FrameMetaClient: cli,
UserRawKVClient: mockkv.NewMetaMock(),
ExecutorClientManager: client.NewClientManager(),
ServerMasterClient: &client.MockServerMasterClient{},
}
dp := deps.NewDeps()
err := dp.Provide(func() masterParamListForTest {
err = dp.Provide(func() masterParamListForTest {
return params
})
if err != nil {
Expand Down
31 changes: 16 additions & 15 deletions lib/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"github.com/hanfei1991/microcosm/pkg/deps"
"github.com/hanfei1991/microcosm/pkg/errctx"
derror "github.com/hanfei1991/microcosm/pkg/errors"
"github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta"
extKV "github.com/hanfei1991/microcosm/pkg/meta/extension"
resourcemeta "github.com/hanfei1991/microcosm/pkg/externalresource/resourcemeta/model"
extkv "github.com/hanfei1991/microcosm/pkg/meta/extension"
"github.com/hanfei1991/microcosm/pkg/meta/kvclient"
"github.com/hanfei1991/microcosm/pkg/meta/metaclient"
pkgOrm "github.com/hanfei1991/microcosm/pkg/orm"
"github.com/hanfei1991/microcosm/pkg/p2p"
"github.com/hanfei1991/microcosm/pkg/quota"
"github.com/hanfei1991/microcosm/pkg/tenant"
Expand Down Expand Up @@ -111,10 +112,10 @@ type DefaultBaseMaster struct {
// dependencies
messageHandlerManager p2p.MessageHandlerManager
messageSender p2p.MessageSender
// framework metastore prefix kvclient
metaKVClient metaclient.KVClient
// framework metastore client
frameMetaClient pkgOrm.Client
// user metastore raw kvclient
userRawKVClient extKV.KVClientEx
userRawKVClient extkv.KVClientEx
executorClientManager client.ClientsManager
serverMasterClient client.MasterClient

Expand Down Expand Up @@ -157,10 +158,10 @@ type masterParams struct {

MessageHandlerManager p2p.MessageHandlerManager
MessageSender p2p.MessageSender
// framework metastore prefix kvclient
MetaKVClient metaclient.KVClient
// framework metastore client
FrameMetaClient pkgOrm.Client
// user metastore raw kvclient
UserRawKVClient extKV.KVClientEx
UserRawKVClient extkv.KVClientEx
ExecutorClientManager client.ClientsManager
ServerMasterClient client.MasterClient
}
Expand Down Expand Up @@ -195,7 +196,7 @@ func NewBaseMaster(
Impl: impl,
messageHandlerManager: params.MessageHandlerManager,
messageSender: params.MessageSender,
metaKVClient: params.MetaKVClient,
frameMetaClient: params.FrameMetaClient,
userRawKVClient: params.UserRawKVClient,
executorClientManager: params.ExecutorClientManager,
serverMasterClient: params.ServerMasterClient,
Expand Down Expand Up @@ -259,7 +260,7 @@ func (m *DefaultBaseMaster) doInit(ctx context.Context) (isFirstStartUp bool, er
m.workerManager = master.NewWorkerManager(
m.id,
epoch,
m.metaKVClient,
m.frameMetaClient,
m.messageSender,
func(_ context.Context, handle master.WorkerHandle) error {
return m.Impl.OnWorkerOnline(handle)
Expand Down Expand Up @@ -414,14 +415,14 @@ func (m *DefaultBaseMaster) OnError(err error) {
// master meta is persisted before it is created, in this function we update some
// fileds to the current value, including epoch, nodeID and advertiseAddr.
func (m *DefaultBaseMaster) refreshMetadata(ctx context.Context) (isInit bool, epoch libModel.Epoch, err error) {
metaClient := metadata.NewMasterMetadataClient(m.id, m.metaKVClient)
metaClient := metadata.NewMasterMetadataClient(m.id, m.frameMetaClient)

masterMeta, err := metaClient.Load(ctx)
if err != nil {
return false, 0, err
}

epoch, err = m.metaKVClient.GenEpoch(ctx)
epoch, err = m.frameMetaClient.GenEpoch(ctx)
if err != nil {
return false, 0, err
}
Expand All @@ -431,7 +432,7 @@ func (m *DefaultBaseMaster) refreshMetadata(ctx context.Context) (isInit bool, e
masterMeta.Addr = m.advertiseAddr
masterMeta.NodeID = m.nodeID

if err := metaClient.Store(ctx, masterMeta); err != nil {
if err := metaClient.Update(ctx, masterMeta); err != nil {
return false, 0, errors.Trace(err)
}

Expand All @@ -445,14 +446,14 @@ func (m *DefaultBaseMaster) refreshMetadata(ctx context.Context) (isInit bool, e
func (m *DefaultBaseMaster) markStatusCodeInMetadata(
ctx context.Context, code libModel.MasterStatusCode,
) error {
metaClient := metadata.NewMasterMetadataClient(m.id, m.metaKVClient)
metaClient := metadata.NewMasterMetadataClient(m.id, m.frameMetaClient)
masterMeta, err := metaClient.Load(ctx)
if err != nil {
return errors.Trace(err)
}

masterMeta.StatusCode = code
return metaClient.Store(ctx, masterMeta)
return metaClient.Update(ctx, masterMeta)
}

// prepareWorkerConfig extracts information from WorkerConfig into detail fields.
Expand Down
4 changes: 2 additions & 2 deletions lib/master/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/hanfei1991/microcosm/pkg/clock"
"github.com/hanfei1991/microcosm/pkg/errctx"
derror "github.com/hanfei1991/microcosm/pkg/errors"
"github.com/hanfei1991/microcosm/pkg/meta/metaclient"
pkgOrm "github.com/hanfei1991/microcosm/pkg/orm"
"github.com/hanfei1991/microcosm/pkg/p2p"
)

Expand Down Expand Up @@ -67,7 +67,7 @@ const (
func NewWorkerManager(
masterID libModel.MasterID,
epoch libModel.Epoch,
meta metaclient.KVClient,
meta pkgOrm.Client,
messageSender p2p.MessageSender,
onWorkerOnline Callback,
onWorkerOffline CallbackWithError,
Expand Down
Loading

0 comments on commit ef34260

Please sign in to comment.