Skip to content

Commit

Permalink
proxy/client: do not print error on EOF (pingcap#90)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox committed Mar 7, 2023
1 parent 891bd6f commit 65775be
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
12 changes: 10 additions & 2 deletions pkg/proxy/client/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"context"
"crypto/tls"
"io"
"net"

"github.com/pingcap/TiProxy/lib/util/errors"
Expand All @@ -27,6 +28,10 @@ import (
"go.uber.org/zap"
)

var (
ErrClientConn = errors.New("this is an error from client")
)

type ClientConnection struct {
logger *zap.Logger
frontendTLSConfig *tls.Config // the TLS config to connect to clients.
Expand All @@ -38,7 +43,7 @@ type ClientConnection struct {
}

func NewClientConnection(logger *zap.Logger, conn net.Conn, frontendTLSConfig *tls.Config, backendTLSConfig *tls.Config, nsmgr *namespace.NamespaceManager, bemgr *backend.BackendConnManager) *ClientConnection {
pkt := pnet.NewPacketIO(conn)
pkt := pnet.NewPacketIOWrapErr(conn, ErrClientConn)
return &ClientConnection{
logger: logger,
frontendTLSConfig: frontendTLSConfig,
Expand Down Expand Up @@ -77,7 +82,10 @@ func (cc *ClientConnection) Run(ctx context.Context) {
}

if err := cc.processMsg(ctx); err != nil {
cc.logger.Info("process message fails", zap.String("remoteAddr", cc.Addr()), zap.Error(err))
clientErr := errors.Is(err, ErrClientConn)
if !(clientErr && errors.Is(err, io.EOF)) {
cc.logger.Info("process message fails", zap.String("remoteAddr", cc.Addr()), zap.Error(err), zap.Bool("clientErr", clientErr), zap.Bool("serverErr", !clientErr))
}
}
}

Expand Down
34 changes: 25 additions & 9 deletions pkg/proxy/net/packetio.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,15 @@ type PacketIO struct {
buf *bufio.ReadWriter
proxyInited *atomic.Bool
proxy *Proxy
wrap error
sequence uint8
}

func NewPacketIO(conn net.Conn) *PacketIO {
return NewPacketIOWrapErr(conn, nil)
}

func NewPacketIOWrapErr(conn net.Conn, wrap error) *PacketIO {
buf := bufio.NewReadWriter(
bufio.NewReaderSize(conn, defaultReaderSize),
bufio.NewWriterSize(conn, defaultWriterSize),
Expand All @@ -91,6 +96,7 @@ func NewPacketIO(conn net.Conn) *PacketIO {
conn,
buf.Reader,
},
wrap: wrap,
sequence: 0,
// TODO: disable it by default now
proxyInited: atomic.NewBool(true),
Expand All @@ -99,6 +105,13 @@ func NewPacketIO(conn net.Conn) *PacketIO {
return p
}

func (p *PacketIO) wrapErr(err error) error {
if p.wrap != nil {
return errors.Wrap(p.wrap, err)
}
return err
}

// Proxy returned parsed proxy header from clients if any.
func (p *PacketIO) Proxy() *Proxy {
if p.proxyInited.Load() {
Expand All @@ -124,7 +137,7 @@ func (p *PacketIO) GetSequence() uint8 {
return p.sequence
}

func (p *PacketIO) ReadOnePacket() ([]byte, bool, error) {
func (p *PacketIO) readOnePacket() ([]byte, bool, error) {
var header [4]byte

if _, err := io.ReadFull(p.conn, header[:]); err != nil {
Expand Down Expand Up @@ -172,9 +185,9 @@ func (p *PacketIO) ReadOnePacket() ([]byte, bool, error) {
func (p *PacketIO) ReadPacket() ([]byte, error) {
var data []byte
for {
buf, more, err := p.ReadOnePacket()
buf, more, err := p.readOnePacket()
if err != nil {
return nil, err
return nil, p.wrapErr(err)
}

data = append(data, buf...)
Expand All @@ -186,7 +199,7 @@ func (p *PacketIO) ReadPacket() ([]byte, error) {
return data, nil
}

func (p *PacketIO) WriteOnePacket(data []byte) (int, bool, error) {
func (p *PacketIO) writeOnePacket(data []byte) (int, bool, error) {
more := false
length := len(data)
if length >= mysql.MaxPayloadLen {
Expand Down Expand Up @@ -216,17 +229,17 @@ func (p *PacketIO) WriteOnePacket(data []byte) (int, bool, error) {
func (p *PacketIO) WritePacket(data []byte, flush bool) error {
// The original data might be empty.
for {
n, more, err := p.WriteOnePacket(data)
n, more, err := p.writeOnePacket(data)
if err != nil {
return err
return p.wrapErr(err)
}
data = data[n:]
// if the last packet ends with a length of MaxPayloadLen exactly
// we need another zero-length packet to end it
if len(data) == 0 {
if more {
if _, _, err := p.WriteOnePacket(nil); err != nil {
return err
if _, _, err := p.writeOnePacket(nil); err != nil {
return p.wrapErr(err)
}
}
break
Expand All @@ -240,13 +253,16 @@ func (p *PacketIO) WritePacket(data []byte, flush bool) error {

func (p *PacketIO) Flush() error {
if err := p.buf.Flush(); err != nil {
return errors.Wrap(ErrFlushConn, err)
return p.wrapErr(errors.Wrap(ErrFlushConn, err))
}
return nil
}

func (p *PacketIO) Close() error {
var errs []error
if p.wrap != nil {
errs = append(errs, p.wrap)
}
/*
TODO: flush when we want to smoothly exit
if err := p.Flush(); err != nil {
Expand Down

0 comments on commit 65775be

Please sign in to comment.