Skip to content

Commit

Permalink
refine error handle to avoid blocking at sourceDB
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu committed Jul 9, 2019
1 parent 6b171c1 commit 11782cb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
38 changes: 29 additions & 9 deletions arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,25 @@ func (s *Server) Run() error {
wg.Done()
}()

res := make(chan error)
wg.Add(1)
var syncErr error = nil
go func() {
err := syncBinlogs(s.kafkaReader.Messages(), s.load)
wg.Done()
if err != nil {
s.Close()
defer wg.Done()
for syncErr = range res {
if syncErr != nil {
s.Close()
break
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
syncBinlogs(s.kafkaReader.Messages(), s.load, res)
}()

err := s.load.Run()
if err != nil {
s.Close()
Expand All @@ -200,6 +210,10 @@ func (s *Server) Run() error {
return errors.Trace(err)
}

if syncErr != nil {
return errors.Trace(syncErr)
}

if err = s.saveFinishTS(StatusNormal); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -265,14 +279,20 @@ func (s *Server) loadStatus() (int, error) {
return status, errors.Trace(err)
}

func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) {
func syncBinlogs(source <-chan *reader.Message, ld loader.Loader, res chan error) {
dest := ld.Input()
var txn *loader.Txn
var err error = nil
for msg := range source {
log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset))
txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.Error("transfer binlog failed", zap.Error(err))
return err
continue
}
txn, err = loader.SlaveBinlogToTxn(msg.Binlog)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
res <- err
continue
}
txn.Metadata = msg
dest <- txn
Expand All @@ -281,5 +301,5 @@ func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) {
queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest)))
}
ld.Close()
return nil
close(res)
}
12 changes: 11 additions & 1 deletion arbiter/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,17 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) {
}()
ld := dummyLoader{input: dest}

err := syncBinlogs(source, &ld)
res := make(chan error)
var err error = nil
go func() {
for err := range res {
if err != nil {
break
}
}
}()

syncBinlogs(source, &ld, res)
c.Assert(err, IsNil)

c.Assert(len(dest), Equals, 2)
Expand Down

0 comments on commit 11782cb

Please sign in to comment.