Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to improve performance? #164

Closed
leenux opened this issue Dec 14, 2018 · 4 comments
Closed

How to improve performance? #164

leenux opened this issue Dec 14, 2018 · 4 comments

Comments

@leenux
Copy link

leenux commented Dec 14, 2018

I found send 500 messages in 60 seconds.
Env:
cpu 3.2G 2-core
mem 8G
kafka 2.1.0 3-node

package saga

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/stretchr/testify/suite"
	"go.uber.org/zap"
	"strconv"
	"testing"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

type GokaTestSuite struct {
	suite.Suite
}

func (suite *GokaTestSuite) SetupTest() {

}

func (suite *GokaTestSuite) TestClicks() {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	go runEmitter(ctx)
	go runProcessor()
	runView(ctx)

	zap.L().Debug("Begin waiting for goka to finish", )
	chwt := make(chan bool)
	WaitTime(chwt, 15*time.Second)
}

func TestGokaTestSuite(t *testing.T) {
	var logger *zap.Logger
	logger, _ = zap.NewDevelopment()
	defer logger.Sync()

	zap.ReplaceGlobals(logger)
	suite.Run(t, new(GokaTestSuite))
}

var (
	brokers             = []string{"127.0.0.1:9092"}
	topic   goka.Stream = "user-click"
	group   goka.Group  = "mini-group"
)

// A user is the object that is stored in the processor's group table
type user struct {
	// number of clicks the user has performed.
	Clicks int
}

// This codec allows marshalling (encode) and unmarshalling (decode) the user to and from the
// group table.
type userCodec struct{}

// Encodes a user into []byte
func (jc *userCodec) Encode(value interface{}) ([]byte, error) {
	if _, isUser := value.(*user); !isUser {
		return nil, fmt.Errorf("Codec requires value *user, got %T", value)
	}
	return json.Marshal(value)
}

// Decodes a user from []byte to it's go representation.
func (jc *userCodec) Decode(data []byte) (interface{}, error) {
	var (
		c   user
		err error
	)
	err = json.Unmarshal(data, &c)
	if err != nil {
		return nil, fmt.Errorf("Error unmarshaling user: %v", err)
	}
	return &c, nil
}

func runEmitter(ctx context.Context) {
	emitter, err := goka.NewEmitter(brokers, topic,
		new(codec.String))
	if err != nil {
		panic(err)
	}
	defer emitter.Finish()

	//t := time.NewTicker(1 * time.Millisecond)
	//defer t.Stop()

	var i int
	for {
		select {
		case <-ctx.Done():
			zap.L().Debug("emit counter",
				zap.Any("i", i),
			)
			return
		default:
			key := fmt.Sprintf("user-%d", i%1000)
			value := fmt.Sprintf("%s", time.Now())
			emitter.EmitSync(key, value)
			i++
		}
	}
}

func process(ctx goka.Context, msg interface{}) {
	var u *user
	if val := ctx.Value(); val != nil {
		u = val.(*user)
	} else {
		u = new(user)
	}

	u.Clicks++
	ctx.SetValue(u)

	fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runProcessor() {
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), process),
		goka.Persist(new(userCodec)),
	)
	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		panic(err)
	}

	p.Run(context.Background())
}

func runView(ctx context.Context) {
	view, err := goka.NewView(brokers,
		goka.GroupTable(group),
		new(userCodec),
	)
	if err != nil {
		panic(err)
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				for i := 0; i < 1000; i++ {
					userName := "user-" + strconv.Itoa(i)
					value, _ := view.Get(userName)
					zap.L().Debug("view.Get",
						zap.Any("userName", userName),
						zap.Any("value", value),
					)
				}
				return
			default:
				for i := 0; i < 1000; i++ {
					userName := "user-" + strconv.Itoa(i)
					if _, err := view.Get(userName); err != nil {
						zap.L().Error("view.Get",
							zap.Any("userName", userName),
							zap.Any("err", err),
						)
					}
					time.Sleep(time.Millisecond)
				}
			}
		}
	}()
	view.Run(ctx)
}

func WaitTime(ch chan bool, timeout time.Duration) error {
	select {
	case <-ch:
		return nil
	case <-time.After(timeout):
	}
	return errors.New("time reached")
}

output:

... ...
=== RUN   TestGokaTestSuite
--- PASS: TestGokaTestSuite (75.41s)
=== RUN   TestGokaTestSuite/TestClicks
2018/12/14 11:16:06 Processor: starting
2018/12/14 11:16:06 Processor: creating consumer [mini-group]
2018/12/14 11:16:06 Processor: creating producer
2018/12/14 11:16:06 Table mini-group-table has 3 partitions
2018/12/14 11:16:06 Processor: rebalancing: map[]
2018/12/14 11:16:06 Processor: dispatcher started
2018/12/14 11:16:06 view: starting
2018/12/14 11:16:06 view: partition 2 started
2018/12/14 11:16:06 view: partition 0 started
2018/12/14 11:16:06 view: partition 1 started
2018/12/14 11:16:06 Processor: dispatcher stopped
2018/12/14 11:16:06 Processor: rebalancing: map[0:-1 1:-1 2:-1]
2018/12/14 11:16:06 Processor: dispatcher started
[proc] key: user-3 clicks: 5, msg: 2018-12-14 11:16:06.750737212 +0800 CST m=+0.332771512
... ...
[proc] key: user-575 clicks: 2, msg: 2018-12-14 11:17:06.264580942 +0800 CST m=+59.846615275
2018/12/14 11:17:06 view: partition 2 stopped
[proc] key: user-576 clicks: 2, msg: 2018-12-14 11:17:06.369440118 +0800 CST m=+59.951474420
2018-12-14T11:17:06.472+0800	DEBUG	saga/goka_test.go:100	emit counter	{"i": 577}
2018/12/14 11:17:06 view: partition 0 stopped
2018-12-14T11:17:06.658+0800	DEBUG	saga/goka_test.go:156	view.Get	{"userName": "user-0", "value": {"Clicks":5}}
... ...
2018-12-14T11:17:06.684+0800	DEBUG	saga/goka_test.go:156	view.Get	{"userName": "user-576", "value": {"Clicks":1}}
2018-12-14T11:17:06.684+0800	DEBUG	saga/goka_test.go:156	view.Get	{"userName": "user-577", "value": null}
2018-12-14T11:17:06.684+0800	DEBUG	saga/goka_test.go:156	view.Get	{"userName": "user-578", "value": null}
... ...
2018-12-14T11:17:06.693+0800	DEBUG	saga/goka_test.go:156	view.Get	{"userName": "user-999", "value": null}
2018/12/14 11:17:06 view: partition 1 stopped
2018/12/14 11:17:06 view: closing consumer
2018/12/14 11:17:06 view: stopped
2018-12-14T11:17:06.827+0800	DEBUG	saga/goka_test.go:34	Begin waiting for goka to finish
    --- PASS: TestGokaTestSuite/TestClicks (75.40s)
PASS
@leenux
Copy link
Author

leenux commented Dec 15, 2018

I test several case again.Send 1000 msgs and recv 1000 msgs same in all case
1 case:
sarama SyncProducer and sarama Consumer,used 2.5 seconds
2 case:
sarama AsyncProducer and sarama Consumer,used 5.1 seconds
3 case:
goka emitter+processor+leveldb,used 117 seconds
4 case:
goka emitter+processor+built-in memory,used 117 seconds

I compared these three approaches.Is the use of bsm/sarama-cluster the cause of slowness?
From sarama Version 1.19.0 (2018-09-27) can support higher-level consumer group.
Can use sarama group replace bsm/sarama-cluster?

@db7
Copy link
Collaborator

db7 commented Dec 16, 2018

AFAIR sarama-cluster is not used when emitting messages. You can replace the complete consumer/producer library if you like -- but you'll need to implement some stuff.

Are you measuring the startup time as part of your benchmark? Starting a goka processor takes much longer than simply starting a sarama producer/consumer:

  • the processor has first to rebalance and get the partition assignments
  • leveldb files have to be open
  • In some cases, ZooKeeper or Kafka is queried for first and last offset of the partitions (to recover)

Typically, Goka applications run continuously, quickly amortizing the startup time.

@leenux
Copy link
Author

leenux commented Dec 17, 2018

got it,thx

@leenux leenux closed this as completed Dec 17, 2018
@db7
Copy link
Collaborator

db7 commented Dec 17, 2018

I dont think we have any performance benchmarks yet. If you write some, we’d be happy if you could contribute them to the project. Thank you

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants