From 11782cb3b0857942d8c9b0e699c6086d9b38ec39 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 9 Jul 2019 11:38:20 +0800 Subject: [PATCH] refine error handle to avoid blocking at sourceDB --- arbiter/server.go | 38 +++++++++++++++++++++++++++++--------- arbiter/server_test.go | 12 +++++++++++- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/arbiter/server.go b/arbiter/server.go index 2cc60df81..ff1e56bf4 100644 --- a/arbiter/server.go +++ b/arbiter/server.go @@ -180,15 +180,25 @@ func (s *Server) Run() error { wg.Done() }() + res := make(chan error) wg.Add(1) + var syncErr error = nil go func() { - err := syncBinlogs(s.kafkaReader.Messages(), s.load) - wg.Done() - if err != nil { - s.Close() + defer wg.Done() + for syncErr = range res { + if syncErr != nil { + s.Close() + break + } } }() + wg.Add(1) + go func() { + defer wg.Done() + syncBinlogs(s.kafkaReader.Messages(), s.load, res) + }() + err := s.load.Run() if err != nil { s.Close() @@ -200,6 +210,10 @@ func (s *Server) Run() error { return errors.Trace(err) } + if syncErr != nil { + return errors.Trace(syncErr) + } + if err = s.saveFinishTS(StatusNormal); err != nil { return errors.Trace(err) } @@ -265,14 +279,20 @@ func (s *Server) loadStatus() (int, error) { return status, errors.Trace(err) } -func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) { +func syncBinlogs(source <-chan *reader.Message, ld loader.Loader, res chan error) { dest := ld.Input() + var txn *loader.Txn + var err error = nil for msg := range source { log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) - txn, err := loader.SlaveBinlogToTxn(msg.Binlog) if err != nil { - log.Error("transfer binlog failed", zap.Error(err)) - return err + continue + } + txn, err = loader.SlaveBinlogToTxn(msg.Binlog) + if err != nil { + log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err)) + res <- err + continue } txn.Metadata = msg dest <- txn @@ -281,5 +301,5 @@ func syncBinlogs(source <-chan *reader.Message, ld loader.Loader) (err error) { queueSizeGauge.WithLabelValues("loader_input").Set(float64(len(dest))) } ld.Close() - return nil + close(res) } diff --git a/arbiter/server_test.go b/arbiter/server_test.go index dbf0a4199..8bb0d497c 100644 --- a/arbiter/server_test.go +++ b/arbiter/server_test.go @@ -407,7 +407,17 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) { }() ld := dummyLoader{input: dest} - err := syncBinlogs(source, &ld) + res := make(chan error) + var err error = nil + go func() { + for err := range res { + if err != nil { + break + } + } + }() + + syncBinlogs(source, &ld, res) c.Assert(err, IsNil) c.Assert(len(dest), Equals, 2)