Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine): initial rate-limiting engine implementation #324

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion api-contracts/workflows/workflows.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service WorkflowService {
rpc PutWorkflow(PutWorkflowRequest) returns (WorkflowVersion);
rpc ScheduleWorkflow(ScheduleWorkflowRequest) returns (WorkflowVersion);
rpc TriggerWorkflow(TriggerWorkflowRequest) returns (TriggerWorkflowResponse);
rpc PutRateLimit(PutRateLimitRequest) returns (PutRateLimitResponse);
}

message PutWorkflowRequest {
Expand Down Expand Up @@ -58,6 +59,12 @@ message CreateWorkflowStepOpts {
repeated string parents = 5; // (optional) the step parents. if none are passed in, this is a root step
string user_data = 6; // (optional) the custom step user data, assuming string representation of JSON
int32 retries = 7; // (optional) the number of retries for the step, default 0
repeated CreateStepRateLimit rate_limits = 8; // (optional) the rate limits for the step
}

message CreateStepRateLimit {
string key = 1; // (required) the key for the rate limit
int32 units = 2; // (required) the number of units this step consumes
}

// ListWorkflowsRequest is the request for ListWorkflows.
Expand Down Expand Up @@ -133,4 +140,23 @@ message TriggerWorkflowRequest {

message TriggerWorkflowResponse {
string workflow_run_id = 1;
}
}

enum RateLimitDuration {
SECOND = 0;
MINUTE = 1;
HOUR = 2;
}

message PutRateLimitRequest {
// (required) the global key for the rate limit
string key = 1;

// (required) the max limit for the rate limit (per unit of time)
int32 limit = 2;

// (required) the duration of time for the rate limit (second|minute|hour)
RateLimitDuration duration = 3;
}

message PutRateLimitResponse {}
119 changes: 119 additions & 0 deletions examples/rate-limit/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package main

import (
"fmt"
"time"

"github.com/joho/godotenv"

"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

type rateLimitInput struct {
Index int `json:"index"`
}

type stepOneOutput struct {
Message string `json:"message"`
}

func StepOne(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &rateLimitInput{}

err = ctx.WorkflowInput(input)

if err != nil {
return nil, err
}

ctx.StreamEvent([]byte(fmt.Sprintf("This is a stream event %d", input.Index)))

return &stepOneOutput{
Message: fmt.Sprintf("This ran at %s", time.Now().String()),
}, nil
}

func main() {
err := godotenv.Load()

if err != nil {
panic(err)
}

c, err := client.New()

if err != nil {
panic(err)
}

err = c.Admin().PutRateLimit("api1", &types.RateLimitOpts{
Max: 3,
Duration: "second",
})

if err != nil {
panic(err)
}

w, err := worker.NewWorker(
worker.WithClient(
c,
),
)

if err != nil {
panic(err)
}

err = w.On(
worker.NoTrigger(),
&worker.WorkflowJob{
Name: "rate-limit-workflow",
Description: "This illustrates rate limiting.",
Steps: []*worker.WorkflowStep{
worker.Fn(StepOne).SetName("step-one").SetRateLimit(
worker.RateLimit{
Units: 1,
Key: "api1",
},
),
},
},
)

if err != nil {
panic(err)
}

_, err = w.Start()

if err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}

for i := 0; i < 12; i++ {
_, err = c.Admin().RunWorkflow("rate-limit-workflow", &rateLimitInput{
Index: i,
})

if err != nil {
panic(err)
}
}

interrupt := cmdutils.InterruptChan()

cleanup, err := w.Start()
if err != nil {
panic(err)
}

<-interrupt

if err := cleanup(); err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}
}
16 changes: 16 additions & 0 deletions internal/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions internal/repository/prisma/dbsqlc/rate_limits.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- name: UpsertRateLimit :one
INSERT INTO "RateLimit" (
"tenantId",
"key",
"limitValue",
"value",
"window"
) VALUES (
@tenantId::uuid,
@key::text,
sqlc.arg('limit')::int,
sqlc.arg('limit')::int,
COALESCE(sqlc.narg('window')::text, '1 minute')
) ON CONFLICT ("tenantId", "key") DO UPDATE SET
"limitValue" = sqlc.arg('limit')::int,
"window" = COALESCE(sqlc.narg('window')::text, '1 minute')
RETURNING *;
57 changes: 57 additions & 0 deletions internal/repository/prisma/dbsqlc/rate_limits.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions internal/repository/prisma/dbsqlc/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ CREATE TABLE "LogLine" (
CONSTRAINT "LogLine_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "RateLimit" (
"tenantId" UUID NOT NULL,
"key" TEXT NOT NULL,
"limitValue" INTEGER NOT NULL,
"value" INTEGER NOT NULL,
"window" TEXT NOT NULL,
"lastRefill" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- CreateTable
CREATE TABLE "SNSIntegration" (
"id" UUID NOT NULL,
Expand Down Expand Up @@ -285,6 +295,14 @@ CREATE TABLE "Step" (
CONSTRAINT "Step_pkey" PRIMARY KEY ("id")
);

-- CreateTable
CREATE TABLE "StepRateLimit" (
"units" INTEGER NOT NULL,
"stepId" UUID NOT NULL,
"rateLimitKey" TEXT NOT NULL,
"tenantId" UUID NOT NULL
);

-- CreateTable
CREATE TABLE "StepRun" (
"id" UUID NOT NULL,
Expand Down Expand Up @@ -744,6 +762,9 @@ CREATE UNIQUE INDEX "JobRunLookupData_jobRunId_key" ON "JobRunLookupData"("jobRu
-- CreateIndex
CREATE UNIQUE INDEX "JobRunLookupData_jobRunId_tenantId_key" ON "JobRunLookupData"("jobRunId" ASC, "tenantId" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "RateLimit_tenantId_key_key" ON "RateLimit"("tenantId" ASC, "key" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "SNSIntegration_id_key" ON "SNSIntegration"("id" ASC);

Expand All @@ -762,6 +783,9 @@ CREATE UNIQUE INDEX "Step_id_key" ON "Step"("id" ASC);
-- CreateIndex
CREATE UNIQUE INDEX "Step_jobId_readableId_key" ON "Step"("jobId" ASC, "readableId" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "StepRateLimit_stepId_rateLimitKey_key" ON "StepRateLimit"("stepId" ASC, "rateLimitKey" ASC);

-- CreateIndex
CREATE UNIQUE INDEX "StepRun_id_key" ON "StepRun"("id" ASC);

Expand Down Expand Up @@ -1002,6 +1026,9 @@ ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_stepRunId_fkey" FOREIGN KEY ("step
-- AddForeignKey
ALTER TABLE "LogLine" ADD CONSTRAINT "LogLine_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "RateLimit" ADD CONSTRAINT "RateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "SNSIntegration" ADD CONSTRAINT "SNSIntegration_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

Expand All @@ -1017,6 +1044,15 @@ ALTER TABLE "Step" ADD CONSTRAINT "Step_jobId_fkey" FOREIGN KEY ("jobId") REFERE
-- AddForeignKey
ALTER TABLE "Step" ADD CONSTRAINT "Step_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_stepId_fkey" FOREIGN KEY ("stepId") REFERENCES "Step"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "StepRateLimit" ADD CONSTRAINT "StepRateLimit_tenantId_rateLimitKey_fkey" FOREIGN KEY ("tenantId", "rateLimitKey") REFERENCES "RateLimit"("tenantId", "key") ON DELETE RESTRICT ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "StepRun" ADD CONSTRAINT "StepRun_jobRunId_fkey" FOREIGN KEY ("jobRunId") REFERENCES "JobRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;

Expand Down
1 change: 1 addition & 0 deletions internal/repository/prisma/dbsqlc/sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sql:
- stream_event.sql
- logs.sql
- tenants.sql
- rate_limits.sql
schema:
- schema.sql
strict_order_by: false
Expand Down
Loading
Loading