consumer: sometimes got heartbeat but not message #206
Comments
Did some digging and found that this check returns early:

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
	return
}
func (r *Consumer) redistributeRDY() {
	r.log(LogLevelInfo, "in func rdyLoop redistributeRDY")
	if r.inBackoffTimeout() {
		r.log(LogLevelInfo, "rdyLoop redistributeRDY: inBackoffTimeout")
		return
	}

	// if an external heuristic set needRDYRedistributed we want to wait
	// until we can actually redistribute to proceed
	conns := r.conns()
	if len(conns) == 0 {
		r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns=0")
		return
	}
	r.log(LogLevelInfo, "rdyLoop redistributeRDY: conns!=0")

	maxInFlight := r.getMaxInFlight()
	if len(conns) > int(maxInFlight) {
		r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
			len(conns), maxInFlight)
		atomic.StoreInt32(&r.needRDYRedistributed, 1)
	}
	r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

	if r.inBackoff() && len(conns) > 1 {
		r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
		atomic.StoreInt32(&r.needRDYRedistributed, 1)
	}
	r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")

	if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
		r.log(LogLevelInfo, "rdyLoop redistributeRDY: CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) not true")
		return
	}
	r.log(LogLevelInfo, "rdyLoop redistributeRDY: 3333")
The same section again, with extra logging added around the max_in_flight and backoff checks:

maxInFlight := r.getMaxInFlight()
r.log(LogLevelInfo, "count max in flight=%d count conns=%d", int(maxInFlight), len(conns))
if len(conns) > int(maxInFlight) {
	r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
		len(conns), maxInFlight)
	atomic.StoreInt32(&r.needRDYRedistributed, 1)
}
r.log(LogLevelInfo, "rdyLoop redistributeRDY: 1111")

r.log(LogLevelInfo, "in backoff? %v", r.inBackoff())
if r.inBackoff() && len(conns) > 1 {
	r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
	atomic.StoreInt32(&r.needRDYRedistributed, 1)
}
r.log(LogLevelInfo, "rdyLoop redistributeRDY: 2222")
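For context on the early return those log lines trace: redistributeRDY only proceeds past the compare-and-swap when one of the earlier branches set needRDYRedistributed to 1; otherwise the swap fails and the function returns. A minimal standalone sketch of that guard pattern (not go-nsq code; the flag name here is illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var needRedistribute int32 // 0 = nothing requested, 1 = redistribution requested

	// flag not set: CAS(1 -> 0) fails, so the real function returns early,
	// which corresponds to the "... not true" log line in the instrumented code above
	fmt.Println(atomic.CompareAndSwapInt32(&needRedistribute, 1, 0)) // false

	// one of the earlier branches (conns > max_in_flight, or in backoff) sets the flag
	atomic.StoreInt32(&needRedistribute, 1)

	// flag set: exactly one CAS succeeds, clearing the flag and allowing one pass
	fmt.Println(atomic.CompareAndSwapInt32(&needRedistribute, 1, 0)) // true
	fmt.Println(atomic.CompareAndSwapInt32(&needRedistribute, 1, 0)) // false again
}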
Seems any "reproducted" channel never calls updateRDY, so its RDY count stays at 0 and nsqd only sends heartbeats, never messages. Digging into why.
What do you mean by "reproducted"?
Sorry for that, @ploxiln. I mean that when this issue appears, the associated consumer will not call the function
When I make nsqd logging verbose, I see the channel being created, but when I curl
When it got the NOP command, the conn state is
Seems every connection registered itself to
But when I print channel.clients in nsqd, the queue is gone.
This sounds like a race in creating ephemeral channels - like the channel is cleaned up before the client is fully connected (because ephemeral channels with no connections are removed). (But I haven't really investigated myself yet.)
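To make the described race concrete, here is a minimal, self-contained sketch (assumed structure and names, not nsqd's actual code): when the last client leaves, the empty ephemeral channel's cleanup is scheduled asynchronously and does not re-check for new subscribers, so a quickly reconnecting client can end up subscribed to a channel the topic has already dropped.

package main

import (
	"fmt"
	"sync"
	"time"
)

// topic is a toy stand-in for nsqd's Topic; field and method names are assumptions.
type topic struct {
	mu       sync.Mutex
	channels map[string]map[int64]bool // channel name -> subscribed client IDs
}

func (t *topic) subscribe(channel string, clientID int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.channels[channel] == nil {
		t.channels[channel] = make(map[int64]bool)
	}
	t.channels[channel][clientID] = true
}

func (t *topic) removeClient(channel string, clientID int64) {
	t.mu.Lock()
	clients := t.channels[channel]
	delete(clients, clientID)
	empty := len(clients) == 0
	t.mu.Unlock()

	if empty {
		// like nsqd's deferred `go c.deleter.Do(...)`: the cleanup runs later
		// and does not re-check whether a new client subscribed in the meantime
		go func() {
			t.mu.Lock()
			delete(t.channels, channel)
			t.mu.Unlock()
		}()
	}
}

func main() {
	t := &topic{channels: map[string]map[int64]bool{}}

	t.subscribe("detect#ephemeral", 1)
	t.removeClient("detect#ephemeral", 1) // old connection drops; async cleanup is scheduled
	t.subscribe("detect#ephemeral", 2)    // reconnecting client SUBs successfully...

	time.Sleep(10 * time.Millisecond) // ...but the deferred cleanup then drops the channel

	t.mu.Lock()
	fmt.Println("channel known to topic:", t.channels["detect#ephemeral"] != nil) // typically false
	t.mu.Unlock()
}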
Thanks @ploxiln. I've printed the address of
It's really likely as you said.
Seems like specifically a race in
@slayercat thanks for the report; at this point it seems like a bug in
Thanks, @mreiferson. Any workaround here? Like masking the cleanup process?
I suspect that the fix in
I masked the delete callbacks of channel & topic, and it seems to work for me.

// RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(clientID int64) {
	c.Lock()
	defer c.Unlock()

	_, ok := c.clients[clientID]
	if !ok {
		return
	}
	delete(c.clients, clientID)

	if len(c.clients) == 0 && c.ephemeral == true {
		//go c.deleter.Do(func() { c.deleteCallback(c) })
		////////mask for https://github.com/nsqio/go-nsq/issues/206
	}
}
// DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error {
	t.Lock()
	channel, ok := t.channelMap[channelName]
	if !ok {
		t.Unlock()
		return errors.New("channel does not exist")
	}
	delete(t.channelMap, channelName)
	// not defered so that we can continue while the channel async closes
	numChannels := len(t.channelMap)
	t.Unlock()

	t.ctx.nsqd.logf("TOPIC(%s): deleting channel %s", t.name, channel.name)

	// delete empties the channel before closing
	// (so that we dont leave any messages around)
	channel.Delete()

	// update messagePump state
	select {
	case t.channelUpdateChan <- 1:
	case <-t.exitChan:
	}

	if numChannels == 0 && t.ephemeral == true {
		//go t.deleter.Do(func() { t.deleteCallback(t) })
		////////mask for https://github.com/nsqio/go-nsq/issues/206
	}

	return nil
}
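A side note on the masked lines: deleter.Do(...) follows Go's sync.Once pattern, so the delete callback can fire at most once per channel or topic even if the client count drops to zero repeatedly. A minimal sketch of that pattern (assumed names, not the nsqd code itself):

package main

import (
	"fmt"
	"sync"
)

// channel is a toy stand-in; only the deleter/deleteCallback shape mirrors the code above.
type channel struct {
	name           string
	deleter        sync.Once // guarantees the delete callback runs at most once
	deleteCallback func(*channel)
}

func (c *channel) removeLastClient() {
	// nsqd schedules this in a goroutine; shown synchronously here for clarity
	c.deleter.Do(func() { c.deleteCallback(c) })
}

func main() {
	c := &channel{name: "detect#ephemeral"}
	c.deleteCallback = func(ch *channel) { fmt.Println("deleting", ch.name) }

	c.removeLastClient() // prints "deleting detect#ephemeral"
	c.removeLastClient() // no-op: the sync.Once already fired
}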
Your workaround is equivalent to using a normal non-ephemeral topic and channel. Usually that's what you really want anyway :) |
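For reference, a minimal sketch of that suggestion using go-nsq's public API (topic, channel, and address values below are illustrative, not taken from this issue): a non-ephemeral channel is not garbage-collected by nsqd when it has no connected clients, so the cleanup race described above cannot remove it.

package main

import (
	"log"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	cfg := nsq.NewConfig()

	// "detect" topic / "worker" channel are assumed example names (no "#ephemeral"
	// suffix), so nsqd keeps the channel around even with zero connected clients
	consumer, err := nsq.NewConsumer("detect", "worker", cfg)
	if err != nil {
		log.Fatal(err)
	}

	consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		log.Printf("got message: %s", m.Body)
		return nil
	}))

	// address is illustrative; ConnectToNSQLookupd is the more common choice
	if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}

	<-consumer.StopChan
}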
Thanks, @ploxiln. An ephemeral channel won't write messages to disk, so there is some difference :) I'll try to fix the issue when my schedule allows. Thank you again.
continue in nsqio/nsq#883 please, thanks! |
…ent. It's possible for a client reconnecting quickly and subscribed to an ephemeral channel to race with nsqd's cleanup of said ephemeral channel, as documented in nsqio/go-nsq#206. Fixes nsqio#883
1. env
nsqd --version
nsqd v1.0.0-compat (built w/go1.8)
go-nsq version
commit b9762cd
Tue Feb 14 16:13:23 2017 -0800
# go version
go version go1.8 linux/amd64
2. what I encounter
http://127.0.0.1:4151/stats
[d.c2.cp#ephemeral/detect#ephemeral] (127.0.0.3:4150) heartbeat received
but it seems it never gets any other nsq message. Code snip:
3. how to reproduce