-
Notifications
You must be signed in to change notification settings - Fork 224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support jetStream #695
support jetStream #695
Conversation
Signed-off-by: zhaojizhuang <571130360@qq.com>
Signed-off-by: zhaojizhuang <571130360@qq.com>
@ripienaar jsm module has been dropped |
@wallyqs do you think you might have time to look this over? thanks! |
Signed-off-by: zhaojizhuang <571130360@qq.com>
Signed-off-by: zhaojizhuang <571130360@qq.com>
@wallyqs PR updated |
Signed-off-by: zhaojizhuang <571130360@qq.com>
Signed-off-by: zhaojizhuang <571130360@qq.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to see more unit tests, we have been attempting to hit the 80% coverage mark in the repo.
We also should enable some e2e tests to know this is being tested and working, there are several examples in the testing folder, as well as current nats/stan tests.
Does jetstream replace nats/stan?
protocol/jsm/v2/doc.go
Outdated
/* | ||
Package jsm implements the CloudEvent transport implementation using NATS JetStream. | ||
*/ | ||
package jsm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rather call this package nats_jetstream
, as we did for kafka_sarama: https://github.com/cloudevents/sdk-go/tree/main/protocol/kafka_sarama/v2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah,we can do that
protocol/jsm/v2/message.go
Outdated
|
||
var _ binding.Message = (*Message)(nil) | ||
|
||
func (m *Message) ReadEncoding() binding.Encoding { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document each exported method, even if it is just ReadEncoding implements bindings.interface
or whatever the real interface is.
protocol/jsm/v2/options.go
Outdated
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber") | ||
|
||
// NatsOptions is a helper function to group a variadic nats.ProtocolOption into | ||
// []stan.Option that can be used by either Sender, Consumer or Protocol |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment says stan
, but the import is nats
.
protocol/jsm/v2/message.go
Outdated
|
||
// NewMessage wraps an *nats.Msg in a binding.Message. | ||
// The returned message *can* be read several times safely | ||
func NewMessage(msg *nats.Msg) *Message { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be nice to have unit tests on this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
protocol/jsm/v2/protocol.go
Outdated
Conn *nats.Conn | ||
|
||
Consumer *Consumer | ||
consumerOptions []ConsumerOption |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is strange to store the connection options in the struct if you do not intend to use them again. It would be better to not need to store the options for consumerOptions
and senderOptions
as they give you no value past the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discarded
protocol/jsm/v2/protocol.go
Outdated
Sender *Sender | ||
senderOptions []SenderOption | ||
|
||
connOwned bool // whether this protocol created the stan connection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo? stan->nats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, typo...
protocol/jsm/v2/receiver.go
Outdated
return nil, err | ||
} | ||
|
||
c.connOwned = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be moved to line 70? if the NewConsumerFromConn
fails, we leak a connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can't, for c.connOwned = true
must be called after c,err:=NewConsumerFromConn
protocol/jsm/v2/receiver.go
Outdated
c.Conn.Close() | ||
} | ||
|
||
close(c.internalClose) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is strange to close a channel the *Consumer
resource did not create, that should be the responsibility of the observer, and doing this will case a panic if Close
is called again on c
.
protocol/jsm/v2/receiver.go
Outdated
incoming chan msgErr | ||
} | ||
|
||
func NewReceiver() *Receiver { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NewReceiver creates a new protocol.Receiver responsible for receiving messages, an implements of protocol.Receiver, used by Consumer
protocol/jsm/v2/receiver.go
Outdated
connOwned bool | ||
} | ||
|
||
func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when should I make a NewConsumer
vs NewReceiver
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NewConsumer is used by Nats subscriber,while NewReceiver is used by NewConsumer, like kafka_sarama
nats``stan
and so on
Signed-off-by: zhaojizhuang <571130360@qq.com>
@n3wscott done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for following up on so many review comments! nice work!!!
LGTM/APPROVE.
/retest |
Fix #694
add JetStream support in protocol