Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
add temporaltest package
Browse files Browse the repository at this point in the history
  • Loading branch information
jlegrone committed Jul 17, 2021
1 parent cf5e093 commit f81b595
Show file tree
Hide file tree
Showing 10 changed files with 535 additions and 86 deletions.
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module github.com/DataDog/temporalite
go 1.16

require (
github.com/iancoleman/strcase v0.1.2
github.com/jmoiron/sqlx v1.2.1-0.20200615141059-0794cb1f47ee
github.com/mattn/go-sqlite3 v1.10.0
github.com/iancoleman/strcase v0.1.3
github.com/jmoiron/sqlx v1.3.4
github.com/mattn/go-sqlite3 v1.14.7
github.com/urfave/cli/v2 v2.3.0
go.temporal.io/api v1.4.1-0.20210420220407-6f00f7f98373
go.temporal.io/server v1.9.2
go.temporal.io/api v1.4.1-0.20210429213054-a9a257b5cf16
go.temporal.io/sdk v1.8.0
go.temporal.io/server v1.10.5
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.37.0
)
178 changes: 116 additions & 62 deletions go.sum

Large diffs are not rendered by default.

69 changes: 53 additions & 16 deletions internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ const (
)

type Config struct {
Ephemeral bool
DatabaseFilePath string
FrontendPort int
Logger log.Logger
Ephemeral bool
DatabaseFilePath string
FrontendPort int
DynamicPorts bool
Namespaces []string
DefaultNamespaceRetentionPeriod time.Duration
Logger log.Logger
portProvider *portProvider
}

func NewDefaultConfig() (*Config, error) {
Expand All @@ -32,18 +36,29 @@ func NewDefaultConfig() (*Config, error) {
}

return &Config{
Ephemeral: false,
DatabaseFilePath: filepath.Join(userConfigDir, "temporalite/db/default.db"),
FrontendPort: 7233,
Ephemeral: false,
DatabaseFilePath: filepath.Join(userConfigDir, "temporalite/db/default.db"),
FrontendPort: 7233,
DynamicPorts: false,
Namespaces: nil,
DefaultNamespaceRetentionPeriod: 24 * time.Hour,
Logger: log.NewZapLogger(log.BuildZapLogger(log.Config{
Stdout: true,
Level: "debug",
OutputFile: "",
})),
portProvider: &portProvider{},
}, nil
}

func Convert(cfg *Config) *config.Config {
defer func() {
if err := cfg.portProvider.close(); err != nil {
panic(err)
}
// time.Sleep(5 * time.Second)
}()

sqliteConfig := config.SQL{
PluginName: sqlite.PluginName,
ConnectAttributes: make(map[string]string),
Expand All @@ -56,6 +71,18 @@ func Convert(cfg *Config) *config.Config {
sqliteConfig.DatabaseName = cfg.DatabaseFilePath
}

var (
metricsPort = cfg.FrontendPort + 200
pprofPort = cfg.FrontendPort + 201
)
if cfg.DynamicPorts {
if cfg.FrontendPort == 0 {
cfg.FrontendPort = cfg.portProvider.mustGetFreePort()
}
metricsPort = cfg.portProvider.mustGetFreePort()
pprofPort = cfg.portProvider.mustGetFreePort()
}

return &config.Config{
Global: config.Global{
Membership: config.Membership{
Expand All @@ -64,11 +91,11 @@ func Convert(cfg *Config) *config.Config {
},
Metrics: &metrics.Config{
Prometheus: &metrics.PrometheusConfig{
ListenAddress: fmt.Sprintf("%s:%d", broadcastAddress, cfg.FrontendPort+200),
ListenAddress: fmt.Sprintf("%s:%d", broadcastAddress, metricsPort),
HandlerPath: "/metrics",
},
},
PProf: config.PProf{Port: cfg.FrontendPort + 201},
PProf: config.PProf{Port: pprofPort},
},
Persistence: config.Persistence{
DefaultStore: persistenceStoreName,
Expand All @@ -95,10 +122,10 @@ func Convert(cfg *Config) *config.Config {
Policy: "noop",
},
Services: map[string]config.Service{
"frontend": cfg.getService(0),
"history": cfg.getService(1),
"matching": cfg.getService(2),
"worker": cfg.getService(3),
"frontend": cfg.mustGetService(0),
"history": cfg.mustGetService(1),
"matching": cfg.mustGetService(2),
"worker": cfg.mustGetService(3),
},
Archival: config.Archival{
History: config.HistoryArchival{
Expand Down Expand Up @@ -128,11 +155,21 @@ func Convert(cfg *Config) *config.Config {
}
}

func (o *Config) getService(frontendPortOffset int) config.Service {
func (o *Config) mustGetService(frontendPortOffset int) config.Service {
var (
grpcPort = o.FrontendPort + frontendPortOffset
membershipPort = o.FrontendPort + 100 + frontendPortOffset
)
if o.DynamicPorts {
if frontendPortOffset != 0 {
grpcPort = o.portProvider.mustGetFreePort()
}
membershipPort = o.portProvider.mustGetFreePort()
}
return config.Service{
RPC: config.RPC{
GRPCPort: o.FrontendPort + frontendPortOffset,
MembershipPort: o.FrontendPort + 100 + frontendPortOffset,
GRPCPort: grpcPort,
MembershipPort: membershipPort,
BindOnLocalHost: true,
BindOnIP: "",
},
Expand Down
48 changes: 48 additions & 0 deletions internal/liteconfig/freeport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package liteconfig

import (
"fmt"
"net"
)

// Modified from https://github.com/phayes/freeport/blob/95f893ade6f232a5f1511d61735d89b1ae2df543/freeport.go

type portProvider struct {
listeners []*net.TCPListener
}

// getFreePort asks the kernel for a free open port that is ready to use.
func (p *portProvider) getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
if addr, err = net.ResolveTCPAddr("tcp6", "[::1]:0"); err != nil {
panic(fmt.Sprintf("temporalite: failed to get free port: %v", err))
}
}

l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}

p.listeners = append(p.listeners, l)

return l.Addr().(*net.TCPAddr).Port, nil
}

func (p *portProvider) mustGetFreePort() int {
port, err := p.getFreePort()
if err != nil {
panic(err)
}
return port
}

func (p *portProvider) close() error {
for _, l := range p.listeners {
if err := l.Close(); err != nil {
return err
}
}
return nil
}
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,13 @@ func buildCLI() *cli.App {
if c.String(logFormatFlag) == "pretty" {
lcfg := zap.NewDevelopmentConfig()
lcfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
l, _ := lcfg.Build(
l, err := lcfg.Build(
zap.WithCaller(false),
zap.AddStacktrace(zapcore.ErrorLevel),
)
if err != nil {
return err
}
logger := tlog.NewZapLogger(l)
opts = append(opts, server.WithLogger(logger))
}
Expand Down
14 changes: 14 additions & 0 deletions server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ func WithFrontendPort(port int) Option {
})
}

// WithDynamicPorts starts Temporal on any available ports.
func WithDynamicPorts() Option {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
cfg.DynamicPorts = true
})
}

// WithNamespaces registers each namespace on Temporal start.
func WithNamespaces(namespaces ...string) Option {
return newApplyFuncContainer(func(cfg *liteconfig.Config) {
cfg.Namespaces = append(cfg.Namespaces, namespaces...)
})
}

type applyFuncContainer struct {
applyInternal func(*liteconfig.Config)
}
Expand Down
120 changes: 118 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
package server

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/DataDog/temporalite/internal/liteconfig"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/server/common/authorization"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/temporal"
"google.golang.org/grpc"
)

type Server struct {
internal *temporal.Server
frontendHostPort string
config *liteconfig.Config
setupWaitGroup sync.WaitGroup
}

type Option interface {
Expand All @@ -39,7 +50,7 @@ func New(opts ...Option) (*Server, error) {
return nil, fmt.Errorf("unable to instantiate claim mapper: %w", err)
}

return &Server{
s := &Server{
internal: temporal.NewServer(
temporal.WithConfig(cfg),
temporal.ForServices(temporal.Services),
Expand All @@ -52,13 +63,118 @@ func New(opts ...Option) (*Server, error) {
temporal.WithDynamicConfigClient(dynamicconfig.NewNoopClient()),
),
frontendHostPort: cfg.PublicClient.HostPort,
}, nil
config: c,
}
s.setupWaitGroup.Add(1)

return s, nil
}

func (s *Server) Start() error {
if len(s.config.Namespaces) > 0 {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
nsClient, err := s.newNamespaceClient(ctx)
if err != nil {
panic(err)
}
defer nsClient.Close()

// Create namespaces
var errNamespaceExists *serviceerror.NamespaceAlreadyExists
for _, ns := range s.config.Namespaces {
if err := nsClient.Register(ctx, &workflowservice.RegisterNamespaceRequest{
Namespace: ns,
WorkflowExecutionRetentionPeriod: &s.config.DefaultNamespaceRetentionPeriod,
}); err != nil && !errors.As(err, &errNamespaceExists) {
panic(err)
}
}

// Wait for each namespace to be ready
for _, ns := range s.config.Namespaces {
c, err := s.newClient(context.Background(), ns)
if err != nil {
panic(err)
}

// Wait up to 1 minute (20ms backoff x 3000 attempts)
var (
maxAttempts = 3000
backoff = 20 * time.Millisecond
)
for i := 0; i < maxAttempts; i++ {
_, err = c.ListOpenWorkflow(context.Background(), &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: ns,
})
if err == nil {
if _, err := c.DescribeTaskQueue(context.Background(), "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil {
fmt.Println(err)
break
}
}
time.Sleep(backoff)
}
if err != nil {
panic(fmt.Sprintf("could not connect to namespace %q: %s", ns, err))
}

c.Close()
}

s.setupWaitGroup.Done()
}()
} else {
s.setupWaitGroup.Done()
}

return s.internal.Start()
}

func (s *Server) Stop() {
s.internal.Stop()
}

func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client, error) {
s.setupWaitGroup.Wait()
return s.newClient(ctx, namespace)
}

func (s *Server) newClient(ctx context.Context, namespace string) (client.Client, error) {
return client.NewClient(client.Options{
Namespace: namespace,
HostPort: s.frontendHostPort,
ConnectionOptions: client.ConnectionOptions{
DisableHealthCheck: false,
HealthCheckTimeout: timeoutFromContext(ctx, time.Minute),
},
})
}

func (s *Server) newNamespaceClient(ctx context.Context) (client.NamespaceClient, error) {
if err := s.healthCheckFrontend(ctx); err != nil {
return nil, err
}
return client.NewNamespaceClient(client.Options{
HostPort: s.frontendHostPort,
ConnectionOptions: client.ConnectionOptions{
DisableHealthCheck: false,
HealthCheckTimeout: timeoutFromContext(ctx, time.Minute),
},
})
}

func timeoutFromContext(ctx context.Context, defaultTimeout time.Duration) time.Duration {
if deadline, ok := ctx.Deadline(); ok {
return deadline.Sub(time.Now())
}
return defaultTimeout
}

func (s *Server) healthCheckFrontend(ctx context.Context) error {
if _, err := grpc.DialContext(ctx, s.frontendHostPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
return nil
}
Loading

0 comments on commit f81b595

Please sign in to comment.