Skip to content

Commit

Permalink
add ctx
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed May 6, 2023
1 parent 12b1b1f commit b300e75
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func checkEtcdWithHangLeader(t *testing.T) error {
// Create a proxy to etcd1.
proxyAddr := tempurl.Alloc()
var enableDiscard atomic.Bool
go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard)
go proxyWithDiscard(context.Background(), re, ep1, proxyAddr, &enableDiscard)

// Create a etcd client with etcd1 as endpoint.
urls, err := types.NewURLs([]string{proxyAddr})
Expand Down Expand Up @@ -402,7 +402,7 @@ func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embe
}
}

func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
func proxyWithDiscard(ctx context.Context, re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
server = strings.TrimPrefix(server, "http://")
proxy = strings.TrimPrefix(proxy, "http://")
l, err := net.Listen("tcp", proxy)
Expand All @@ -413,29 +413,34 @@ func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscar
go func(connect net.Conn) {
serverConnect, err := net.Dial("tcp", server)
re.NoError(err)
pipe(connect, serverConnect, enableDiscard)
pipe(ctx, connect, serverConnect, enableDiscard)
}(connect)
}
}

func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
func pipe(ctx context.Context, src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
errChan := make(chan error, 1)
go func() {
err := ioCopy(src, dst, enableDiscard)
err := ioCopy(ctx, src, dst, enableDiscard)
errChan <- err
}()
go func() {
err := ioCopy(dst, src, enableDiscard)
err := ioCopy(ctx, dst, src, enableDiscard)
errChan <- err
}()
<-errChan
dst.Close()
src.Close()
}

func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
func ioCopy(ctx context.Context, dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error) {
buffer := make([]byte, 32*1024)
for {
select {
case <-ctx.Done():
return nil
default:
}
if enableDiscard.Load() {
io.Copy(io.Discard, src)
continue
Expand Down

0 comments on commit b300e75

Please sign in to comment.