Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure topic exists as more flexible topic creation option #149

Merged
merged 1 commit into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kafka/mock/kazoo.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ func (_mr *_MockTopicManagerRecorder) EnsureStreamExists(arg0, arg1 interface{})
return _mr.mock.ctrl.RecordCall(_mr.mock, "EnsureStreamExists", arg0, arg1)
}

func (_m *MockTopicManager) EnsureTopicExists(topic string, npar int, rfactor int, config map[string]string) error {
ret := _m.ctrl.Call(_m, "EnsureTopicExists", topic, npar, rfactor, config)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockTopicManagerRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "EnsureTopicExists", arg0, arg1, arg2, arg3)
}

func (_m *MockTopicManager) Partitions(topic string) ([]int32, error) {
ret := _m.ctrl.Call(_m, "Partitions", topic)
ret0, _ := ret[0].([]int32)
Expand Down
30 changes: 26 additions & 4 deletions kafka/topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type TopicManager interface {
EnsureTableExists(topic string, npar int) error
// EnsureStreamExists checks that a stream topic exists, or create one if possible
EnsureStreamExists(topic string, npar int) error
// EnsureTopicExists checks that a topic exists, or create one if possible,
// enforcing the given configuration
EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error

// Partitions returns the number of partitions of a topic, that are assigned to the running
// instance, i.e. it doesn't represent all partitions of a topic.
Expand Down Expand Up @@ -66,6 +69,10 @@ func (m *saramaTopicManager) EnsureTableExists(topic string, npar int) error {
return nil
}

func (m *saramaTopicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error {
return fmt.Errorf("not implemented in SaramaTopicManager")
}

// TopicManagerConfig contains the configuration to access the Zookeeper servers
// as well as the desired options of to create tables and stream topics.
type TopicManagerConfig struct {
Expand Down Expand Up @@ -149,6 +156,13 @@ func (m *topicManager) EnsureStreamExists(topic string, npar int) error {
return m.checkPartitions(topic, npar)
}

func (m *topicManager) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error {
if err := checkTopic(m.zk, topic, npar, rfactor, config); err != nil {
return err
}
return m.checkPartitions(topic, npar)
}

func (m *topicManager) Partitions(topic string) ([]int32, error) {
tl, err := m.zk.Topics()
if err != nil {
Expand Down Expand Up @@ -176,14 +190,22 @@ func checkTopic(kz kzoo, topic string, npar int, rfactor int, cfg map[string]str
if err != nil {
return err
}
if ok {
return nil
if !ok {
err = kz.CreateTopic(topic, npar, rfactor, cfg)
if err != nil {
return err
}
}
err = kz.CreateTopic(topic, npar, rfactor, cfg)
// topic exists, check if config the same
c, err := kz.Topic(topic).Config()
if err != nil {
return err
}

for k, v := range cfg {
if c[k] != v {
return fmt.Errorf("expected %s=%s, but found %s", k, cfg[k], c[k])
}
}
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions mock/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func (_mr *_MockTopicManagerRecorder) EnsureTableExists(arg0, arg1 interface{})
return _mr.mock.ctrl.RecordCall(_mr.mock, "EnsureTableExists", arg0, arg1)
}

func (_m *MockTopicManager) EnsureTopicExists(_param0 string, _param1 int, _param2 int, _param3 map[string]string) error {
ret := _m.ctrl.Call(_m, "EnsureTopicExists", _param0, _param1, _param2, _param3)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockTopicManagerRecorder) EnsureTopicExists(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "EnsureTopicExists", arg0, arg1, arg2, arg3)
}

func (_m *MockTopicManager) Partitions(_param0 string) ([]int32, error) {
ret := _m.ctrl.Call(_m, "Partitions", _param0)
ret0, _ := ret[0].([]int32)
Expand Down
5 changes: 5 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ func (tm *topicMgrMock) EnsureStreamExists(topic string, npar int) error {
return nil
}

// EnsureTopicExists checks that a stream exists, or create one if possible
func (tm *topicMgrMock) EnsureTopicExists(topic string, npar, rfactor int, config map[string]string) error {
return nil
}

// Partitions returns the number of partitions of a topic, that are assigned to the running
// instance, i.e. it doesn't represent all partitions of a topic.
func (tm *topicMgrMock) Partitions(topic string) ([]int32, error) {
Expand Down
Loading