Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Feb 3, 2025
1 parent 4124762 commit b92e069
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 34 deletions.
3 changes: 2 additions & 1 deletion Makefile.d/k3d.mk
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ k3d/start:
$(K3D_COMMAND) cluster create $(K3D_CLUSTER_NAME) \
--agents $(K3D_NODES) \
--image docker.io/rancher/k3s:$(K3S_VERSION) \
--network=$(K3D_NETWORK) \
--host-pid-mode=$(K3D_HOST_PID_MODE) \
--api-port $(K3D_HOST):$(K3D_PORT) \
-v "/lib/modules:/lib/modules" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
$(K3D_OPTIONS)
@make k3d/config

Expand Down
80 changes: 48 additions & 32 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
p.addr = net.JoinHostPort(p.host, p.port)
}

log.Debugf("initial connection will try dialing to %s, host: %s, port:%d, is IP Conn: %t", p.addr, p.host, p.port, p.isIP)
conn, err := p.dial(ctx, p.addr)
if err != nil {
log.Warnf("grpc.New initial Dial check to %s returned error: %v", p.addr, err)
Expand All @@ -155,6 +156,7 @@ func New(ctx context.Context, opts ...Option) (c Conn, err error) {
}
p.port = port
p.addr = net.JoinHostPort(p.host, p.port)
log.Debugf("fallback initial connection will try dialing to %s, host: %s, port:%d, is IP Conn: %t", p.addr, p.host, p.port, p.isIP)
conn, err := p.dial(ctx, p.addr)
if err != nil {
if conn != nil {
Expand Down Expand Up @@ -188,6 +190,7 @@ func (p *pool) init(force bool) {
p.size.Store(defaultPoolSize)
}
p.pmu.RLock()
log.Debug("initializing connection pool, pool len: %d, size: %d", len(p.pool), p.Size())
if force || p.pool == nil || cap(p.pool) == 0 || len(p.pool) == 0 {
p.pmu.RUnlock()
p.pmu.Lock()
Expand Down Expand Up @@ -349,6 +352,8 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
}
p.init(false)

log.Debug("Connecting...")

if p.isIP || !p.resolveDNS {
return p.singleTargetConnect(ctx)
}
Expand All @@ -360,10 +365,16 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
}

func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) {
if p == nil || p.closing.Load() {
return p, nil
}

if uint64(len(ips)) > p.Size() {
p.grow(uint64(len(ips)))
}

log.Debugf("connecting to ips: %v, port: %d", ips, p.port)

err = p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
addr := net.JoinHostPort(ips[idx%len(ips)], p.port)
ierr := p.refreshConn(ctx, idx, pc, addr)
Expand All @@ -389,38 +400,13 @@ func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) {
return p, nil
}

func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) {
if p == nil || p.closing.Load() {
return p, nil
}

hash := p.reconnectHash.Load()
if force || hash == nil || *hash == "" {
return p.Connect(ctx)
}

healthy := p.IsHealthy(ctx)
if healthy {
if !p.isIP && p.resolveDNS && hash != nil && *hash != "" {
ips, err := p.lookupIPAddr(ctx)
if err != nil {
return p, nil
}
if *hash != strings.Join(ips, "-") {
return p.connect(ctx, ips...)
}
}
return p, nil
}

return p.Connect(ctx)
}

func (p *pool) singleTargetConnect(ctx context.Context) (c Conn, err error) {
if p == nil || p.closing.Load() {
return p, nil
}

log.Debugf("connecting to single target addr: %s, host: %s, port: %d", p.addr, p.host, p.port)

failCnt := 0
err = p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
ierr := p.refreshConn(ctx, idx, pc, p.addr)
Expand Down Expand Up @@ -448,7 +434,37 @@ func (p *pool) singleTargetConnect(ctx context.Context) (c Conn, err error) {
return p, nil
}

func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) {
if p == nil || p.closing.Load() {
return p, nil
}

log.Debug("Re-Connecting...")

hash := p.reconnectHash.Load()
if force || hash == nil || *hash == "" {
return p.Connect(ctx)
}

healthy := p.IsHealthy(ctx)
if healthy {
if !p.isIP && p.resolveDNS && hash != nil && *hash != "" {
ips, err := p.lookupIPAddr(ctx)
if err != nil {
return p, nil
}
if *hash != strings.Join(ips, "-") {
return p.connect(ctx, ips...)
}
}
return p, nil
}

return p.Connect(ctx)
}

func (p *pool) Disconnect(ctx context.Context) (err error) {
log.Debug("Disconnecting...")
p.closing.Store(true)
defer p.closing.Store(false)
emap := make(map[string]error, p.len())
Expand Down Expand Up @@ -479,6 +495,7 @@ func (p *pool) dial(ctx context.Context, addr string) (conn *ClientConn, err err
do := func() (conn *ClientConn, err error) {
ctx, cancel := context.WithTimeout(ctx, p.dialTimeout)
defer cancel()
log.Debugf("dialing to %s with timeout %s", addr, p.dialTimeout.String())
conn, err = grpc.NewClient(addr, p.dopts...)
if err != nil {
if conn != nil {
Expand Down Expand Up @@ -523,6 +540,7 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
if p == nil || p.closing.Load() {
return false
}
log.Debug("Checking health...")
var cnt, unhealthy int
pl := p.len()
err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
Expand Down Expand Up @@ -832,6 +850,7 @@ func isHealthy(ctx context.Context, conn *ClientConn) bool {
state := conn.GetState()
switch state {
case connectivity.Ready:
log.Debugf("gRPC target %s's connection status is Ready\tstatus: %s", conn.Target(), state.String())
return true
case connectivity.Connecting:
log.Debugf("gRPC target %s's connection status will be Ready soon\tstatus: %s", conn.Target(), state.String())
Expand All @@ -840,12 +859,9 @@ func isHealthy(ctx context.Context, conn *ClientConn) bool {
log.Debugf("gRPC target %s's connection status is waiting for target\tstatus: %s\ttrying to re-connect...", conn.Target(), state.String())
conn.Connect()
if conn.WaitForStateChange(ctx, state) {
state = conn.GetState()
if state == connectivity.Ready || state == connectivity.Connecting {
log.Debugf("gRPC target %s's connection status enabled for target\tstatus: %s", conn.Target(), state.String())
return true
}
return isHealthy(ctx, conn)
}
log.Errorf("gRPC target %s's connection status is not recovered\tstatus: %s", conn.Target(), state.String())
return false
case connectivity.Shutdown, connectivity.TransientFailure:
log.Errorf("gRPC target %s's connection status is unhealthy\tstatus: %s", conn.Target(), state.String())
Expand Down
2 changes: 1 addition & 1 deletion versions/K3S_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.31.3-k3s1
v1.31.4-k3s1

0 comments on commit b92e069

Please sign in to comment.