Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve throughput, remove deadlocks #949

Merged
merged 9 commits into from
Oct 10, 2024
Merged
37 changes: 27 additions & 10 deletions internal/msgqueue/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type MessageQueueImpl struct {

ready bool

disableTenantExchangePubs bool

// lru cache for tenant ids
tenantIdCache *lru.Cache[string, bool]
}
Expand All @@ -60,16 +62,18 @@ func (t *MessageQueueImpl) IsReady() bool {
type MessageQueueImplOpt func(*MessageQueueImplOpts)

type MessageQueueImplOpts struct {
l *zerolog.Logger
url string
qos int
l *zerolog.Logger
url string
qos int
disableTenantExchangePubs bool
}

func defaultMessageQueueImplOpts() *MessageQueueImplOpts {
l := logger.NewDefaultLogger("rabbitmq")

return &MessageQueueImplOpts{
l: &l,
l: &l,
disableTenantExchangePubs: false,
}
}

Expand All @@ -91,6 +95,12 @@ func WithQos(qos int) MessageQueueImplOpt {
}
}

// WithDisableTenantExchangePubs returns an option that sets
// disableTenantExchangePubs on the MessageQueueImplOpts it is applied to.
// New copies that value onto the MessageQueueImpl, and when it is true the
// publish loop skips the branch that publishes tenant messages to the tenant
// exchange.
func WithDisableTenantExchangePubs(disable bool) MessageQueueImplOpt {
return func(opts *MessageQueueImplOpts) {
opts.disableTenantExchangePubs = disable
}
}

// New creates a new MessageQueueImpl.
func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -105,11 +115,12 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
opts.l = &newLogger

t := &MessageQueueImpl{
ctx: ctx,
identity: identity(),
l: opts.l,
qos: opts.qos,
configFs: fs,
ctx: ctx,
identity: identity(),
l: opts.l,
qos: opts.qos,
configFs: fs,
disableTenantExchangePubs: opts.disableTenantExchangePubs,
}

constructor := func(context.Context) (*amqp.Connection, error) {
Expand Down Expand Up @@ -196,6 +207,9 @@ func (t *MessageQueueImpl) SetQOS(prefetchCount int) {

// AddMessage adds a msg to the queue.
func (t *MessageQueueImpl) AddMessage(ctx context.Context, q msgqueue.Queue, msg *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "add-message")
defer span.End()

// inject otel carrier into the message
if msg.OtelCarrier == nil {
msg.OtelCarrier = telemetry.GetCarrier(ctx)
Expand Down Expand Up @@ -339,6 +353,9 @@ func (t *MessageQueueImpl) startPublishing() func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

ctx, span := telemetry.NewSpanWithCarrier(ctx, "publish-message", msg.OtelCarrier)
defer span.End()

t.l.Debug().Msgf("publishing msg %s to queue %s", msg.ID, msg.q.Name())

pubMsg := amqp.Publishing{
Expand All @@ -358,7 +375,7 @@ func (t *MessageQueueImpl) startPublishing() func() error {
}

// if this is a tenant msg, publish to the tenant exchange
if msg.TenantID() != "" {
if !t.disableTenantExchangePubs && msg.TenantID() != "" {
// determine if the tenant exchange exists
if _, ok := t.tenantIdCache.Get(msg.TenantID()); !ok {
// register the tenant exchange
Expand Down
20 changes: 5 additions & 15 deletions internal/queueutils/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ import (
)

type OperationPool struct {
ops sync.Map
timeout time.Duration
description string
method OpMethod
ql *zerolog.Logger
setTenantsMu sync.RWMutex
ops sync.Map
timeout time.Duration
description string
method OpMethod
ql *zerolog.Logger
}

func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod) *OperationPool {
Expand All @@ -28,9 +27,6 @@ func NewOperationPool(ql *zerolog.Logger, timeout time.Duration, description str
}

func (p *OperationPool) SetTenants(tenants []*dbsqlc.Tenant) {
p.setTenantsMu.Lock()
defer p.setTenantsMu.Unlock()

tenantMap := make(map[string]bool)

for _, t := range tenants {
Expand All @@ -48,16 +44,10 @@ func (p *OperationPool) SetTenants(tenants []*dbsqlc.Tenant) {
}

func (p *OperationPool) RunOrContinue(id string) {
p.setTenantsMu.RLock()
defer p.setTenantsMu.RUnlock()

p.GetOperation(id).RunOrContinue(p.ql)
}

func (p *OperationPool) GetOperation(id string) *SerialOperation {
p.setTenantsMu.RLock()
defer p.setTenantsMu.RUnlock()

op, ok := p.ops.Load(id)

if !ok {
Expand Down
8 changes: 4 additions & 4 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms
return fmt.Errorf("could not parse started at: %w", err)
}

err = ec.repo.StepRun().StepRunStarted(ctx, metadata.TenantId, payload.StepRunId, startedAt)
err = ec.repo.StepRun().StepRunStarted(ctx, metadata.TenantId, payload.WorkflowRunId, payload.StepRunId, startedAt)

if err != nil {
return fmt.Errorf("could not update step run: %w", err)
Expand Down Expand Up @@ -913,7 +913,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
stepOutput = []byte(payload.StepOutputData)
}

err = ec.repo.StepRun().StepRunSucceeded(ctx, metadata.TenantId, payload.StepRunId, finishedAt, stepOutput)
err = ec.repo.StepRun().StepRunSucceeded(ctx, metadata.TenantId, payload.WorkflowRunId, payload.StepRunId, finishedAt, stepOutput)

if err != nil {
return fmt.Errorf("could not update step run: %w", err)
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
}

// fail step run
err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, stepRunId, failedAt, errorReason, int(oldStepRun.SRRetryCount))
err = ec.repo.StepRun().StepRunFailed(ctx, tenantId, sqlchelpers.UUIDToStr(oldStepRun.WorkflowRunId), stepRunId, failedAt, errorReason, int(oldStepRun.SRRetryCount))

if err != nil {
return fmt.Errorf("could not fail step run: %w", err)
Expand Down Expand Up @@ -1129,7 +1129,7 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR
return fmt.Errorf("could not get step run: %w", err)
}

err = ec.repo.StepRun().StepRunCancelled(ctx, tenantId, stepRunId, now, reason)
err = ec.repo.StepRun().StepRunCancelled(ctx, tenantId, sqlchelpers.UUIDToStr(oldStepRun.WorkflowRunId), stepRunId, now, reason)

if err != nil {
return fmt.Errorf("could not cancel step run: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func (s *DispatcherImpl) handleStepRunStarted(inputCtx context.Context, request
return nil, err
}

err = s.repo.StepRun().StepRunStarted(ctx, tenantId, request.StepRunId, startedAt)
err = s.repo.StepRun().StepRunStarted(ctx, tenantId, sqlchelpers.UUIDToStr(sr.WorkflowRunId), request.StepRunId, startedAt)

if err != nil {
return nil, fmt.Errorf("could not mark step run started: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF
rabbitmq.WithURL(cf.MessageQueue.RabbitMQ.URL),
rabbitmq.WithLogger(&l),
rabbitmq.WithQos(cf.MessageQueue.RabbitMQ.Qos),
rabbitmq.WithDisableTenantExchangePubs(cf.Runtime.DisableTenantPubs),
)

ing, err = ingestor.NewIngestor(
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ type ConfigFileRuntime struct {
// FlushItemsThreshold is the number of items to hold in memory until flushing to the database
FlushItemsThreshold int `mapstructure:"flushItemsThreshold" json:"flushItemsThreshold,omitempty" default:"100"`

// How many buckets to hash into for parallelizing updates
UpdateHashFactor int `mapstructure:"updateHashFactor" json:"updateHashFactor,omitempty" default:"100"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

min 1?


// How many concurrent updates to allow
UpdateConcurrentFactor int `mapstructure:"updateConcurrentFactor" json:"updateConcurrentFactor,omitempty" default:"10"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

min 1?


// Allow new tenants to be created
AllowSignup bool `mapstructure:"allowSignup" json:"allowSignup,omitempty" default:"true"`

Expand All @@ -121,6 +127,9 @@ type ConfigFileRuntime struct {

// Allow passwords to be changed
AllowChangePassword bool `mapstructure:"allowChangePassword" json:"allowChangePassword,omitempty" default:"true"`

// DisableTenantPubs, when true, stops the RabbitMQ message queue from publishing tenant messages to the per-tenant exchange
DisableTenantPubs bool `mapstructure:"disableTenantPubs" json:"disableTenantPubs,omitempty"`
}

type SecurityCheckConfigFile struct {
Expand Down Expand Up @@ -409,6 +418,7 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("runtime.allowInvites", "SERVER_ALLOW_INVITES")
_ = v.BindEnv("runtime.allowCreateTenant", "SERVER_ALLOW_CREATE_TENANT")
_ = v.BindEnv("runtime.allowChangePassword", "SERVER_ALLOW_CHANGE_PASSWORD")
_ = v.BindEnv("runtime.disableTenantPubs", "SERVER_DISABLE_TENANT_PUBS")

// security check options
_ = v.BindEnv("securityCheck.enabled", "SERVER_SECURITY_CHECK_ENABLED")
Expand Down Expand Up @@ -493,6 +503,8 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("runtime.singleQueueLimit", "SERVER_SINGLE_QUEUE_LIMIT")
_ = v.BindEnv("runtime.flushPeriodMilliseconds", "SERVER_FLUSH_PERIOD_MILLISECONDS")
_ = v.BindEnv("runtime.flushItemsThreshold", "SERVER_FLUSH_ITEMS_THRESHOLD")
_ = v.BindEnv("runtime.updateHashFactor", "SERVER_UPDATE_HASH_FACTOR")
_ = v.BindEnv("runtime.updateConcurrentFactor", "SERVER_UPDATE_CONCURRENT_FACTOR")

// tls options
_ = v.BindEnv("tls.tlsStrategy", "SERVER_TLS_STRATEGY")
Expand Down
Loading
Loading