How can I debug a "negative WaitGroup counter" in a processor? #198

Closed
activeshadow opened this issue Jul 2, 2019 · 6 comments

Comments

@activeshadow

I've got a processor that keeps panicking with the following:

panic: sync: negative WaitGroup counter

goroutine 100 [running]:
sync.(*WaitGroup).Add(0xc00003e030, 0xffffffffffffffff)
        /home/foobar/.asdf/installs/golang/1.12.2/go/src/sync/waitgroup.go:74 +0x135
sync.(*WaitGroup).Done(...)
        /home/foobar/.asdf/installs/golang/1.12.2/go/src/sync/waitgroup.go:99
github.com/lovoo/goka.(*cbContext).tryCommit(0xc0003ea6e0, 0x0, 0x0)
        /home/foobar/develop/darkcubed/goka/context.go:348 +0x9a
github.com/lovoo/goka.(*cbContext).emitDone(0xc0003ea6e0, 0x0, 0x0)
        /home/foobar/develop/darkcubed/goka/context.go:312 +0x89
github.com/lovoo/goka.(*cbContext).emit.func1(0x0, 0x0)
        /home/foobar/develop/darkcubed/goka/context.go:147 +0x55
github.com/lovoo/goka/kafka.(*Promise).executeCallbacks(0xc000ec48c0)
        /home/foobar/develop/darkcubed/goka/kafka/promise.go:27 +0x64
github.com/lovoo/goka/kafka.(*Promise).Finish(0xc000ec48c0, 0x0, 0x0, 0x0)
        /home/foobar/develop/darkcubed/goka/kafka/promise.go:55 +0x89
github.com/lovoo/goka/kafka.(*producer).run(0xc0000a3900)
        /home/foobar/develop/darkcubed/goka/kafka/producer.go:71 +0x1eb
created by github.com/lovoo/goka/kafka.NewProducer
        /home/foobar/develop/darkcubed/goka/kafka/producer.go:35 +0x20e

How can I debug this, given the stack trace doesn't show what part of my code is making the call? I suspect it has something to do with using the loopback table, since this is the only processor I have that uses it, and it's the only processor I have that's panicking like this.

Note that I'm using a local copy of the goka library with a few edits for unit testing.

@db7
Collaborator

db7 commented Jul 2, 2019

I could imagine that happening if you are starting goroutines inside the process callback, or using the ctx object after the process callback has returned.
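
To make the panic itself concrete, here is a standalone sketch using only the standard library. It is not goka's code: `lateDone` is a made-up name, and the assumption is simply that a context used after its callback has returned ends up signalling completion once more than work was registered.

```go
package main

import (
	"fmt"
	"sync"
)

// lateDone mimics the failure mode: one unit of work is registered and
// completed normally, then a second Done arrives with no matching Add.
// It returns the recovered panic message, or "" if nothing panicked.
func lateDone() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1) // work registered while the callback is running
	wg.Done() // work completed
	wg.Wait() // callback has returned; the counter is back to zero

	wg.Done() // late completion from a stray goroutine: counter would be -1
	return ""
}

func main() {
	fmt.Println(lateDone()) // prints "sync: negative WaitGroup counter"
}
```

The stack trace in the report has the same shape: the `Done` call comes from the producer's goroutine, which is why none of the processor's own code shows up in it.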

@activeshadow
Author

Dang... you're right @db7, I do call ctx.Loopback inside of a Goroutine. I'm an idiot. Thank you for pointing this out!

I'm calling ctx.Loopback in a Goroutine in order to delay via exponential backoff to avoid a thundering herd scenario. Any other ideas on how to add a delay?

@db7
Collaborator

db7 commented Jul 2, 2019

@activeshadow. Glad it was the right guess.

If you want to use a goroutine, it's better to create a producer and pass the producer and the message to the goroutine (but not the goka.Context). It may also be safer to clone the message: depending on how your codec works, it could reuse message buffers in upcoming process callbacks, which causes weird behaviour if the message is not cloned.

This approach has a problem, however: you lose at-least-once processing. If your process crashes after you have "decoupled" the message from the context, you won't see the message again. Normally the input message is committed only once the callback has returned and all emits/loopbacks/sets of the context have completed; with a decoupled goroutine that guarantee is gone. So only use this if that is OK for you.


Other ideas:

1. Stream delay

If you need to slow down the complete stream, the best way is to "put the callbacks to sleep", i.e.:

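func mycallback(ctx goka.Context, m interface{}) {
	msg := m.(*Message) // Message is a placeholder type with a TriggerTime field
	if d := time.Until(msg.TriggerTime); d > 0 {
		time.Sleep(d) // block this partition until the trigger time has passed
	}
	// do whatever with msg
}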

Since each partition is processed by a single goroutine, and in FIFO order, this will effectively slow down the complete stream. I've used this in an application where a stream of events should always be processed 1h later.

2. Selective delay with loopback

If you just want to slow down the processing of a few messages, then you can use loopback and Kafka itself.

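// callback for the input topic
func mycallback(ctx goka.Context, m interface{}) {
	if isSpecial(m) { // isSpecial is a placeholder for your own check
		ctx.Loopback(ctx.Key(), m)
		return
	}
	// otherwise do whatever with m
}

// callback for the loopback topic
func myloopcallback(ctx goka.Context, m interface{}) {
	msg := m.(*Message) // Message is a placeholder type with a TriggerTime field
	if time.Now().Before(msg.TriggerTime) {
		ctx.Loopback(ctx.Key(), m) // keep looping until the time has passed
		return
	}
	// do whatever with msg
}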

Depending on your traffic, this may or may not be OK, since a delayed message is written back to Kafka on every pass until its time has come.

3. Selective stream delay

Another idea is to combine both ideas (1) and (2).

  • create a few topics: delay-by-1s, delay-by-1m, delay-by-10m, delay-by-1h
  • for each of these topics create a callback that delays the stream by the specified time as in (1).
  • once a message arrives in the input topic(s), decide whether to process it or to delay it as in (2).
  • if you delay it, wrap the message with additional information (number of delays, timestamp, etc)
  • ctx.Emit the wrapped message to the topic with the selected delay, e.g., delay-by-1s
  • all messages in the topic delay-by-1s are delayed by one second; after the delay, send the message back to the main input topic
  • if the message has to be delayed again, ctx.Emit it to the delay-by-1m instead, and so on.
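
The wrapping and topic-selection steps above could look roughly like this. The `delayed` type, its fields, and the helper names are all hypothetical; only the topic names come from the list. Emitting to the chosen topic and the per-topic sleeping callbacks are left out.

```go
package main

import (
	"fmt"
	"time"
)

// delayed is a hypothetical wrapper around the original message.
type delayed struct {
	Attempts  int       // how many times the message has been delayed so far
	FirstSeen time.Time // when the message first arrived on the input topic
	Payload   []byte    // the original, still-encoded message
}

// delayTopics lists the delay topics from shortest to longest delay.
var delayTopics = []string{"delay-by-1s", "delay-by-1m", "delay-by-10m", "delay-by-1h"}

// nextDelayTopic picks the topic for a message that has already been delayed
// `attempts` times, staying on the longest delay once the list is exhausted.
func nextDelayTopic(attempts int) string {
	if attempts < 0 {
		attempts = 0
	}
	if attempts >= len(delayTopics) {
		attempts = len(delayTopics) - 1
	}
	return delayTopics[attempts]
}

// delayAgain returns the updated wrapper and the topic to emit it to.
func delayAgain(d delayed) (delayed, string) {
	topic := nextDelayTopic(d.Attempts)
	d.Attempts++
	return d, topic
}

func main() {
	d := delayed{FirstSeen: time.Now(), Payload: []byte("event")}
	for i := 0; i < 5; i++ {
		var topic string
		d, topic = delayAgain(d)
		fmt.Println(d.Attempts, topic)
	}
}
```

Keeping the attempt count inside the wrapper means the input callback needs no extra state to decide which delay topic comes next.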

Let me know if any of these suggestions make sense.

@activeshadow
Author

Thanks @db7. I think I'll start with option 2. I was using the same handler for the loopback as for the input, so I'll just create a new handler for the loopback that does exactly what the input handler does, but with a delay first.

@db7
Collaborator

db7 commented Aug 7, 2019

@activeshadow can I close this issue? If you are still facing related problems, please let us know.

@activeshadow
Author

@db7 I can close it! No longer having this issue based on your suggestions.
