From f730ebcb9b60d4fedec4e19d539c009bb9403c05 Mon Sep 17 00:00:00 2001 From: Andrei Kurilov <18027129+akurilov@users.noreply.github.com> Date: Fri, 8 Nov 2024 11:20:24 +0200 Subject: [PATCH] init --- .github/workflows/release.yaml | 36 ++ .github/workflows/staging.yaml | 74 ++++ .github/workflows/testing.yaml | 51 +++ .gitignore | 7 +- Dockerfile | 12 + Makefile | 41 +++ README.md | 18 +- api/grpc/client_test.go | 216 ++++++++++++ api/grpc/cloudevents/cloudevent.proto | 61 ++++ api/grpc/controller.go | 88 +++++ api/grpc/server.go | 24 ++ api/grpc/service.proto | 64 ++++ config/config.go | 69 ++++ config/config_test.go | 22 ++ go.mod | 37 ++ go.sum | 92 +++++ helm/source-websocket/.helmignore | 24 ++ helm/source-websocket/Chart.yaml | 24 ++ helm/source-websocket/templates/NOTES.txt | 22 ++ helm/source-websocket/templates/_helpers.tpl | 62 ++++ helm/source-websocket/templates/hpa.yaml | 32 ++ helm/source-websocket/templates/ingress.yaml | 61 ++++ helm/source-websocket/templates/service.yaml | 15 + .../templates/serviceaccount.yaml | 12 + helm/source-websocket/templates/sts.yaml | 125 +++++++ .../templates/tests/test-connection.yaml | 15 + helm/source-websocket/values.yaml | 114 ++++++ main.go | 144 ++++++++ model/filter.go | 7 + model/metadata.go | 15 + model/order.go | 15 + model/stream.go | 11 + scripts/cover.sh | 11 + scripts/release.sh | 12 + scripts/staging.sh | 6 + service/handler/handler.go | 83 +++++ service/handler/mock.go | 20 ++ service/interceptor/default.go | 21 ++ service/interceptor/interceptor.go | 9 + service/interceptor/logging.go | 33 ++ service/logging.go | 55 +++ service/mock.go | 62 ++++ service/service.go | 105 ++++++ service/service_test.go | 140 ++++++++ service/writer/logging.go | 42 +++ service/writer/mock.go | 25 ++ service/writer/service.go | 156 +++++++++ storage/mock.go | 66 ++++ storage/mongo/storage.go | 259 ++++++++++++++ storage/mongo/storage_test.go | 324 ++++++++++++++++++ storage/storage.go | 20 ++ 51 files changed, 3054 insertions(+), 5 deletions(-) create mode 100644 .github/workflows/release.yaml create mode 100644 .github/workflows/staging.yaml create mode 100644 .github/workflows/testing.yaml create mode 100644 Dockerfile create mode 100644 Makefile create mode 100644 api/grpc/client_test.go create mode 100644 api/grpc/cloudevents/cloudevent.proto create mode 100644 api/grpc/controller.go create mode 100644 api/grpc/server.go create mode 100644 api/grpc/service.proto create mode 100644 config/config.go create mode 100644 config/config_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 helm/source-websocket/.helmignore create mode 100644 helm/source-websocket/Chart.yaml create mode 100644 helm/source-websocket/templates/NOTES.txt create mode 100644 helm/source-websocket/templates/_helpers.tpl create mode 100644 helm/source-websocket/templates/hpa.yaml create mode 100644 helm/source-websocket/templates/ingress.yaml create mode 100644 helm/source-websocket/templates/service.yaml create mode 100644 helm/source-websocket/templates/serviceaccount.yaml create mode 100644 helm/source-websocket/templates/sts.yaml create mode 100644 helm/source-websocket/templates/tests/test-connection.yaml create mode 100644 helm/source-websocket/values.yaml create mode 100644 main.go create mode 100644 model/filter.go create mode 100644 model/metadata.go create mode 100644 model/order.go create mode 100644 model/stream.go create mode 100755 scripts/cover.sh create mode 100755 scripts/release.sh create mode 100755 scripts/staging.sh create mode 100644 service/handler/handler.go create mode 100644 service/handler/mock.go create mode 100644 service/interceptor/default.go create mode 100644 service/interceptor/interceptor.go create mode 100644 service/interceptor/logging.go create mode 100644 service/logging.go create mode 100644 service/mock.go create mode 100644 service/service.go create mode 100644 service/service_test.go create mode 100644 service/writer/logging.go create mode 100644 service/writer/mock.go create mode 100644 service/writer/service.go create mode 100644 storage/mock.go create mode 100644 storage/mongo/storage.go create mode 100644 storage/mongo/storage_test.go create mode 100644 storage/storage.go diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..542b72a --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,36 @@ +name: Release + +on: + push: + tags: + - v[0-9]+.[0-9]+.[0-9]+ + +jobs: + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: 1.23 + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + with: + version: '3.x' + + - name: Test + run: make test + + - name: Registry login + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ secrets.REGISTRY_USER }} + password: ${{ secrets.REGISTRY_ACCESS_TOKEN }} + + - name: Release + run: make release diff --git a/.github/workflows/staging.yaml b/.github/workflows/staging.yaml new file mode 100644 index 0000000..f9a8db9 --- /dev/null +++ b/.github/workflows/staging.yaml @@ -0,0 +1,74 @@ +name: Staging + +on: + push: + branches: + - "mistress" + +env: + COMPONENT: source-websocket + VERSION: latest + CHART_VERSION: 0.0.0 + +jobs: + + deploy: + runs-on: ubuntu-latest + steps: + + - uses: actions/checkout@v2 + + - name: Registry login + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ secrets.REGISTRY_USER }} + password: ${{ secrets.REGISTRY_ACCESS_TOKEN }} + + - name: Staging + run: make staging + + - name: Set up Helm + uses: azure/setup-helm@v1 + with: + version: v3.12.0 + + - name: Helm Lint + run: | + helm lint helm/${COMPONENT} + + - name: Helm Package + run: | + helm dependency update helm/${COMPONENT} + mkdir helm/package + helm package helm/${COMPONENT} --destination helm/package + cd helm/package + helm repo index . + + - name: Publish Helm Chart + uses: peaceiris/actions-gh-pages@v3 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: helm/package/ + + - name: Google Cloud Auth + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GKE_SA_KEY }}' + + - uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ secrets.GKE_RPOJECT_ID }} + + - name: Kubeconfig + run: | + gcloud components install gke-gcloud-auth-plugin + gcloud container clusters get-credentials ${{ secrets.GKE_CLUSTER_NAME_DEMO }} \ + --region ${{ secrets.GKE_CLUSTER_REGION }} \ + --project ${{ secrets.GKE_PROJECT_ID }} + + - name: Helm Upgrade + run: | + helm upgrade --install ${COMPONENT} helm/package/${COMPONENT}-0.0.0.tgz \ + --set replicaCount=2 \ + --set-string podAnnotations.commit=$(git rev-parse --short HEAD) diff --git a/.github/workflows/testing.yaml b/.github/workflows/testing.yaml new file mode 100644 index 0000000..d62e261 --- /dev/null +++ b/.github/workflows/testing.yaml @@ -0,0 +1,51 @@ +name: Testing + +on: + push: + branches: + - "*" + - "!mistress" + +jobs: + + build: + runs-on: ubuntu-latest + steps: + + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: 1.23 + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + with: + # repo-token is necessary to avoid the rate limit issue + repo-token: ${{ secrets.GITHUB_TOKEN }} + version: "3.x" + + - name: Build + run: make build + + test: + runs-on: ubuntu-latest + steps: + + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: 1.23 + + - name: Install Protoc + uses: arduino/setup-protoc@v1 + with: + version: "3.x" + + - name: Test + run: make test + env: + DB_URI_TEST_MONGO: ${{ secrets.DB_URI_TEST_MONGO }} diff --git a/.gitignore b/.gitignore index 6f72f89..b7652d1 100644 --- a/.gitignore +++ b/.gitignore @@ -15,11 +15,10 @@ *.out # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # Go workspace file go.work -go.work.sum -# env file -.env +**/*.pb.go +cover.tmp diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..302dbfe --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.23.2-alpine3.20 AS builder +WORKDIR /go/src/source-websocket +COPY . . +RUN \ + apk add protoc protobuf-dev make git && \ + make build + +FROM alpine:3.20 +RUN apk --no-cache add ca-certificates \ + && update-ca-certificates +COPY --from=builder /go/src/source-websocket/source-websocket /bin/source-websocket +ENTRYPOINT ["/bin/source-websocket"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f23dbe2 --- /dev/null +++ b/Makefile @@ -0,0 +1,41 @@ +.PHONY: test clean +default: build + +BINARY_FILE_NAME=source-websocket +COVERAGE_FILE_NAME=cover.out +COVERAGE_TMP_FILE_NAME=cover.tmp + +proto: + go install github.com/golang/protobuf/protoc-gen-go@latest + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.5.1 + PATH=${PATH}:~/go/bin protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative \ + api/grpc/*.proto + +vet: proto + go vet + +test: vet + go test -race -cover -coverprofile=${COVERAGE_FILE_NAME} ./... + cat ${COVERAGE_FILE_NAME} | grep -v _mock.go | grep -v logging.go | grep -v .pb.go > ${COVERAGE_FILE_NAME}.tmp + mv -f ${COVERAGE_FILE_NAME}.tmp ${COVERAGE_FILE_NAME} + go tool cover -func=${COVERAGE_FILE_NAME} | grep -Po '^total\:\h+\(statements\)\h+\K.+(?=\.\d+%)' > ${COVERAGE_TMP_FILE_NAME} + ./scripts/cover.sh + rm -f ${COVERAGE_TMP_FILE_NAME} + +build: proto + CGO_ENABLED=0 GOOS=linux GOARCH= GOARM= go build -o ${BINARY_FILE_NAME} main.go + chmod ugo+x ${BINARY_FILE_NAME} + +docker: + docker build -t awakari/source-websocket . + + +staging: docker + ./scripts/staging.sh + +release: docker + ./scripts/release.sh + +clean: + go clean + rm -f ${BINARY_FILE_NAME} ${COVERAGE_FILE_NAME} diff --git a/README.md b/README.md index 8cdea4b..e88f38c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,18 @@ # source-websocket -WebSocket source implementation for Awakari +Server-sent events source type + +```shell +grpcurl \ + -plaintext \ + -proto api/grpc/service.proto \ + -d @ \ + localhost:50051 \ + awakari.source.websocket.Service/Create +``` + +```json +{ + "url": "wss://www.seismicportal.eu/standing_order/websocket", + "groupId": "default" +} +``` diff --git a/api/grpc/client_test.go b/api/grpc/client_test.go new file mode 100644 index 0000000..28e34c9 --- /dev/null +++ b/api/grpc/client_test.go @@ -0,0 +1,216 @@ +package grpc + +import ( + "context" + "fmt" + "github.com/awakari/source-websocket/service" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + "log/slog" + "os" + "testing" + "time" +) + +var port uint16 = 50051 + +var log = slog.Default() + +func TestMain(m *testing.M) { + svc := service.NewServiceMock() + svc = service.NewServiceLogging(svc, log) + go func() { + err := Serve(port, svc) + if err != nil { + log.Error(err.Error()) + } + }() + code := m.Run() + os.Exit(code) +} + +func TestServiceClient_Create(t *testing.T) { + // + addr := fmt.Sprintf("localhost:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.Nil(t, err) + client := NewServiceClient(conn) + // + cases := map[string]struct { + req *CreateRequest + err error + }{ + "ok": { + req: &CreateRequest{ + Url: "url0", + }, + }, + "empty url": { + req: &CreateRequest{}, + err: status.Error(codes.InvalidArgument, "empty url"), + }, + "fail": { + req: &CreateRequest{ + Url: "fail", + }, + err: status.Error(codes.Internal, "unexpected"), + }, + "conflict": { + req: &CreateRequest{ + Url: "conflict", + }, + err: status.Error(codes.AlreadyExists, "conflict"), + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + _, err := client.Create(context.TODO(), c.req) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestServiceClient_Read(t *testing.T) { + // + addr := fmt.Sprintf("localhost:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.Nil(t, err) + client := NewServiceClient(conn) + // + cases := map[string]struct { + req *ReadRequest + groupId string + userId string + createdAt *timestamppb.Timestamp + err error + }{ + "ok": { + req: &ReadRequest{ + Url: "url0", + }, + groupId: "group0", + userId: "user1", + createdAt: timestamppb.New(time.Date(2024, 11, 4, 14, 52, 0, 0, time.UTC)), + }, + "fail": { + req: &ReadRequest{ + Url: "fail", + }, + err: status.Error(codes.Internal, "unexpected"), + }, + "missing": { + req: &ReadRequest{ + Url: "missing", + }, + err: status.Error(codes.NotFound, "not found"), + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + resp, err := client.Read(context.TODO(), c.req) + assert.ErrorIs(t, err, c.err) + if c.err == nil { + assert.Equal(t, c.groupId, resp.GroupId) + assert.Equal(t, c.userId, resp.UserId) + assert.Equal(t, c.createdAt, resp.CreatedAt) + } + }) + } +} + +func TestServiceClient_Delete(t *testing.T) { + // + addr := fmt.Sprintf("localhost:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.Nil(t, err) + client := NewServiceClient(conn) + // + cases := map[string]struct { + req *DeleteRequest + err error + }{ + "ok": { + req: &DeleteRequest{ + Url: "url0", + }, + }, + "fail": { + req: &DeleteRequest{ + Url: "fail", + }, + err: status.Error(codes.Internal, "unexpected"), + }, + "missing": { + req: &DeleteRequest{ + Url: "missing", + }, + err: status.Error(codes.NotFound, "not found"), + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + _, err := client.Delete(context.TODO(), c.req) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestServiceClient_List(t *testing.T) { + // + addr := fmt.Sprintf("localhost:%d", port) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.Nil(t, err) + client := NewServiceClient(conn) + // + cases := map[string]struct { + req *ListRequest + urls []string + err error + }{ + "ok1": { + req: &ListRequest{}, + urls: []string{ + "url0", + "url1", + }, + }, + "ok2": { + req: &ListRequest{ + Filter: &Filter{ + GroupId: "group1", + UserId: "user2", + Pattern: "foo", + }, + Order: Order_DESC, + }, + urls: []string{ + "url0", + "url1", + }, + }, + "fail": { + req: &ListRequest{ + Cursor: "fail", + }, + err: status.Error(codes.Internal, "unexpected"), + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + resp, err := client.List(context.TODO(), c.req) + assert.ErrorIs(t, err, c.err) + if c.err == nil { + assert.Equal(t, c.urls, resp.Urls) + } + }) + } +} diff --git a/api/grpc/cloudevents/cloudevent.proto b/api/grpc/cloudevents/cloudevent.proto new file mode 100644 index 0000000..0425a98 --- /dev/null +++ b/api/grpc/cloudevents/cloudevent.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +package pb; + +option go_package = "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"; + +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +// CloudEvent is copied from +// https://github.com/cloudevents/spec/blob/master/protobuf-format.md. +message CloudEvent { + // Unique event identifier. + string id = 1; + // URI of the event source. + string source = 2; + // Version of the spec in use. + string spec_version = 3; + // Event type identifier. + string type = 4; + + // Optional & Extension Attributes + map attributes = 5; + + // CloudEvent Data (Bytes, Text, or Proto) + oneof data { + // If the event is binary data then the datacontenttype attribute + // should be set to an appropriate media-type. + bytes binary_data = 6; + // If the event is string data then the datacontenttype attribute + // should be set to an appropriate media-type such as application/json. + string text_data = 7; + // If the event is a protobuf then it must be encoded using this Any + // type. The datacontenttype attribute should be set to + // application/protobuf and the dataschema attribute set to the message + // type. + google.protobuf.Any proto_data = 8; + } +} + +// CloudEventAttribute enables extensions to use any of the seven allowed +// data types as the value of an envelope key. +message CloudEventAttributeValue { + // The value can be any one of these types. + oneof attr { + // Boolean value. + bool ce_boolean = 1; + // Integer value. + int32 ce_integer = 2; + // String value. + string ce_string = 3; + // Byte string value. + bytes ce_bytes = 4; + // URI value. + string ce_uri = 5; + // URI reference value. + string ce_uri_ref = 6; + // Timestamp value. + google.protobuf.Timestamp ce_timestamp = 7; + } +} diff --git a/api/grpc/controller.go b/api/grpc/controller.go new file mode 100644 index 0000000..74914d6 --- /dev/null +++ b/api/grpc/controller.go @@ -0,0 +1,88 @@ +package grpc + +import ( + "context" + "errors" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/service" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + "time" +) + +type controller struct { + svc service.Service +} + +func NewController(svc service.Service) ServiceServer { + return controller{ + svc: svc, + } +} + +func (c controller) Create(ctx context.Context, req *CreateRequest) (resp *CreateResponse, err error) { + resp = &CreateResponse{} + switch req.Url { + case "": + err = status.Error(codes.InvalidArgument, "empty url") + default: + err = c.svc.Create(ctx, req.Url, req.Auth, req.GroupId, req.UserId, time.Now().UTC()) + err = translateError(err) + } + return +} + +func (c controller) Read(ctx context.Context, req *ReadRequest) (resp *ReadResponse, err error) { + resp = &ReadResponse{} + var str model.Stream + str, err = c.svc.Read(ctx, req.Url) + if err == nil { + resp.GroupId = str.GroupId + resp.UserId = str.UserId + resp.CreatedAt = timestamppb.New(str.CreatedAt.UTC()) + } + err = translateError(err) + return +} + +func (c controller) Delete(ctx context.Context, req *DeleteRequest) (resp *DeleteResponse, err error) { + resp = &DeleteResponse{} + err = c.svc.Delete(ctx, req.Url, req.GroupId, req.UserId) + err = translateError(err) + return +} + +func (c controller) List(ctx context.Context, req *ListRequest) (resp *ListResponse, err error) { + resp = &ListResponse{} + filter := model.Filter{} + if req.Filter != nil { + filter.GroupId = req.Filter.GroupId + filter.UserId = req.Filter.UserId + filter.Pattern = req.Filter.Pattern + } + var order model.Order + switch req.Order { + case Order_DESC: + order = model.OrderDesc + } + var urls []string + urls, err = c.svc.List(ctx, req.Limit, filter, order, req.Cursor) + if err == nil { + resp.Urls = urls + } + err = translateError(err) + return +} + +func translateError(src error) (dst error) { + switch { + case errors.Is(src, service.ErrNotFound): + dst = status.Error(codes.NotFound, src.Error()) + case errors.Is(src, service.ErrConflict): + dst = status.Error(codes.AlreadyExists, src.Error()) + case src != nil: + dst = status.Error(codes.Internal, src.Error()) + } + return +} diff --git a/api/grpc/server.go b/api/grpc/server.go new file mode 100644 index 0000000..fa18989 --- /dev/null +++ b/api/grpc/server.go @@ -0,0 +1,24 @@ +package grpc + +import ( + "fmt" + "github.com/awakari/source-websocket/service" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + "net" +) + +func Serve(port uint16, search service.Service) (err error) { + srv := grpc.NewServer() + c := NewController(search) + RegisterServiceServer(srv, c) + reflection.Register(srv) + grpc_health_v1.RegisterHealthServer(srv, health.NewServer()) + conn, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err == nil { + err = srv.Serve(conn) + } + return +} diff --git a/api/grpc/service.proto b/api/grpc/service.proto new file mode 100644 index 0000000..025be9b --- /dev/null +++ b/api/grpc/service.proto @@ -0,0 +1,64 @@ +syntax = "proto3"; + +package awakari.source.websocket; + +option go_package = "./api/grpc"; + +import "google/protobuf/timestamp.proto"; + +service Service { + rpc Create(CreateRequest) returns (CreateResponse); + rpc Read(ReadRequest) returns (ReadResponse); + rpc Delete(DeleteRequest) returns (DeleteResponse); + rpc List(ListRequest) returns (ListResponse); +} + +message CreateRequest { + string url = 1; + string auth = 2; + string groupId = 3; + string userId = 4; +} + +message CreateResponse {} + +message ReadRequest { + string url = 1; +} + +message ReadResponse { + string groupId = 1; + string userId = 2; + google.protobuf.Timestamp createdAt = 3; +} + +message DeleteRequest { + string url = 1; + string groupId = 2; + string userId = 3; +} + +message DeleteResponse { +} + +message ListRequest { + uint32 limit = 1; + string cursor = 2; + Filter filter = 3; + Order order = 4; +} + +message Filter { + string groupId = 1; + string userId = 2; + string pattern = 3; +} + +enum Order { + ASC = 0; + DESC = 1; +} + +message ListResponse { + repeated string urls = 1; +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..fc8e09f --- /dev/null +++ b/config/config.go @@ -0,0 +1,69 @@ +package config + +import ( + "github.com/kelseyhightower/envconfig" + "time" +) + +type Config struct { + Api ApiConfig + Db DbConfig + Event WebsocketConfig + Log struct { + Level int `envconfig:"LOG_LEVEL" default:"-4" required:"true"` + } + Replica ReplicaConfig +} + +type ApiConfig struct { + Port uint16 `envconfig:"API_PORT" default:"50051" required:"true"` + Writer struct { + Backoff time.Duration `envconfig:"API_WRITER_BACKOFF" default:"10s" required:"true"` + BatchSize uint32 `envconfig:"API_WRITER_BATCH_SIZE" default:"16" required:"true"` + Cache WriterCacheConfig + Uri string `envconfig:"API_WRITER_URI" default:"resolver:50051" required:"true"` + } + UserAgent string `envconfig:"API_USER_AGENT" default:"Awakari" required:"true"` + GroupId string `envconfig:"API_GROUP_ID" default:"default" required:"true"` + Events EventsConfig +} + +type EventsConfig struct { + Source string `envconfig:"API_EVENTS_SOURCE" default:"https://awakari.com/pub.html?srcType=ws" required:"true"` +} + +type DbConfig struct { + Uri string `envconfig:"DB_URI" default:"mongodb://localhost:27017/?retryWrites=true&w=majority" required:"true"` + Name string `envconfig:"DB_NAME" default:"sources" required:"true"` + UserName string `envconfig:"DB_USERNAME" default:""` + Password string `envconfig:"DB_PASSWORD" default:""` + Table struct { + Name string `envconfig:"DB_TABLE_NAME" default:"websocket" required:"true"` + Retention time.Duration `envconfig:"DB_TABLE_RETENTION" default:"2160h" required:"true"` + Shard bool `envconfig:"DB_TABLE_SHARD" default:"true"` + } + Tls struct { + Enabled bool `envconfig:"DB_TLS_ENABLED" default:"false" required:"true"` + Insecure bool `envconfig:"DB_TLS_INSECURE" default:"false" required:"true"` + } +} + +type WebsocketConfig struct { + StreamTimeout time.Duration `envconfig:"WEBSOCKET_STREAM_TIMEOUT" default:"1m" required:"true"` + Type string `envconfig:"WEBSOCKET_TYPE" required:"true" default:"com_awakari_websocket_v1"` +} + +type ReplicaConfig struct { + Count uint32 `envconfig:"REPLICA_COUNT" required:"true"` + Name string `envconfig:"REPLICA_NAME" required:"true"` +} + +type WriterCacheConfig struct { + Size uint32 `envconfig:"API_WRITER_CACHE_SIZE" default:"100" required:"true"` + Ttl time.Duration `envconfig:"API_WRITER_CACHE_TTL" default:"24h" required:"true"` +} + +func NewConfigFromEnv() (cfg Config, err error) { + err = envconfig.Process("", &cfg) + return +} diff --git a/config/config_test.go b/config/config_test.go new file mode 100644 index 0000000..b3f7c90 --- /dev/null +++ b/config/config_test.go @@ -0,0 +1,22 @@ +package config + +import ( + "github.com/stretchr/testify/assert" + "log/slog" + "os" + "testing" + "time" +) + +func TestConfig(t *testing.T) { + os.Setenv("API_WRITER_BACKOFF", "23h") + os.Setenv("API_WRITER_URI", "writer:56789") + os.Setenv("LOG_LEVEL", "4") + os.Setenv("REPLICA_COUNT", "2") + os.Setenv("REPLICA_NAME", "source-websocket-1") + cfg, err := NewConfigFromEnv() + assert.Nil(t, err) + assert.Equal(t, 23*time.Hour, cfg.Api.Writer.Backoff) + assert.Equal(t, "writer:56789", cfg.Api.Writer.Uri) + assert.Equal(t, slog.LevelWarn, slog.Level(cfg.Log.Level)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..08ce762 --- /dev/null +++ b/go.mod @@ -0,0 +1,37 @@ +module github.com/awakari/source-websocket + +go 1.23 + +require ( + github.com/awakari/client-sdk-go v1.2.2 + github.com/cenkalti/backoff/v4 v4.3.0 + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 + github.com/coder/websocket v1.8.12 + github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/stretchr/testify v1.9.0 + go.mongodb.org/mongo-driver v1.17.1 + google.golang.org/grpc v1.68.0 + google.golang.org/protobuf v1.35.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/processout/grpc-go-pool v1.2.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.30.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..00f7ba9 --- /dev/null +++ b/go.sum @@ -0,0 +1,92 @@ +github.com/awakari/client-sdk-go v1.2.2 h1:y7N4Q71xZG1mfFX0nErLjm1myWZJjX1K8BboAeSFU1Y= +github.com/awakari/client-sdk-go v1.2.2/go.mod h1:L4xph2ZeYoUmnvKE4XT/GmDLYv4qCVS8Bm4sx2ozRhM= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 h1:FIvfKlS2mcuP0qYY6yzdIU9xdrRd/YMP0bNwFjXd0u8= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2/go.mod h1:POsdVp/08Mki0WD9QvvgRRpg9CQ6zhjfRrBoEY8JFS8= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/processout/grpc-go-pool v1.2.1 h1:hbp1BOA02CIxEAoRLHGpUhhPFv77nwfBLBeO3Ya9P7I= +github.com/processout/grpc-go-pool v1.2.1/go.mod h1:F4hiNj96O6VQ87jv4rdz8R9tkHdelQQJ/J2B1a5VSt4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helm/source-websocket/.helmignore b/helm/source-websocket/.helmignore new file mode 100644 index 0000000..b7e49a2 --- /dev/null +++ b/helm/source-websocket/.helmignore @@ -0,0 +1,24 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ +values-dev.yaml diff --git a/helm/source-websocket/Chart.yaml b/helm/source-websocket/Chart.yaml new file mode 100644 index 0000000..0801d02 --- /dev/null +++ b/helm/source-websocket/Chart.yaml @@ -0,0 +1,24 @@ +apiVersion: v2 +name: source-websocket +description: A Helm chart for Awakari source-websocket + +# A chart can be either an 'application' or a 'library' chart. +# +# Application charts are a collection of templates that can be packaged into versioned archives +# to be deployed. +# +# Library charts provide useful utilities or functions for the chart developer. They're included as +# a dependency of application charts to inject those utilities and functions into the rendering +# pipeline. Library charts do not define any templates and therefore cannot be deployed. +type: application + +# This is the chart version. This version number should be incremented each time you make changes +# to the chart and its templates, including the app version. +# Versions are expected to follow Semantic Versioning (https://semver.org/) +version: 0.0.0 + +# This is the version number of the application being deployed. This version number should be +# incremented each time you make changes to the application. Versions are not expected to +# follow Semantic Versioning. They should reflect the version the application is using. +# It is recommended to use it with quotes. +appVersion: "latest" diff --git a/helm/source-websocket/templates/NOTES.txt b/helm/source-websocket/templates/NOTES.txt new file mode 100644 index 0000000..e30ccb4 --- /dev/null +++ b/helm/source-websocket/templates/NOTES.txt @@ -0,0 +1,22 @@ +1. Get the application URL by running these commands: +{{- if .Values.ingress.enabled }} +{{- range $host := .Values.ingress.hosts }} + {{- range .paths }} + http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }} + {{- end }} +{{- end }} +{{- else if contains "NodePort" .Values.service.type }} + export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "source-websocket.fullname" . }}) + export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") + echo http://$NODE_IP:$NODE_PORT +{{- else if contains "LoadBalancer" .Values.service.type }} + NOTE: It may take a few minutes for the LoadBalancer IP to be available. + You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "source-websocket.fullname" . }}' + export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "source-websocket.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}") + echo http://$SERVICE_IP:{{ .Values.service.port }} +{{- else if contains "ClusterIP" .Values.service.type }} + export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "source-websocket.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}") + export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}") + echo "Visit http://127.0.0.1:50051 to use your application" + kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 50051:$CONTAINER_PORT +{{- end }} diff --git a/helm/source-websocket/templates/_helpers.tpl b/helm/source-websocket/templates/_helpers.tpl new file mode 100644 index 0000000..623d6f0 --- /dev/null +++ b/helm/source-websocket/templates/_helpers.tpl @@ -0,0 +1,62 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "source-websocket.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "source-websocket.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "source-websocket.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "source-websocket.labels" -}} +helm.sh/chart: {{ include "source-websocket.chart" . }} +{{ include "source-websocket.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "source-websocket.selectorLabels" -}} +app.kubernetes.io/name: {{ include "source-websocket.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{/* +Create the name of the service account to use +*/}} +{{- define "source-websocket.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "source-websocket.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/helm/source-websocket/templates/hpa.yaml b/helm/source-websocket/templates/hpa.yaml new file mode 100644 index 0000000..c73f998 --- /dev/null +++ b/helm/source-websocket/templates/hpa.yaml @@ -0,0 +1,32 @@ +{{- if .Values.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "source-websocket.fullname" . }} + labels: + {{- include "source-websocket.labels" . | nindent 4 }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "source-websocket.fullname" . }} + minReplicas: {{ .Values.autoscaling.minReplicas }} + maxReplicas: {{ .Values.autoscaling.maxReplicas }} + metrics: + {{- if .Values.autoscaling.targetCPUUtilizationValue }} + - type: Resource + resource: + name: cpu + target: + type: AverageValue + averageValue: {{ .Values.autoscaling.targetCPUUtilizationValue }} + {{- end }} + {{- if .Values.autoscaling.targetMemoryUtilizationValue }} + - type: Resource + resource: + name: memory + target: + type: AverageValue + averageValue: {{ .Values.autoscaling.targetMemoryUtilizationValue }} + {{- end }} +{{- end }} diff --git a/helm/source-websocket/templates/ingress.yaml b/helm/source-websocket/templates/ingress.yaml new file mode 100644 index 0000000..3b80b33 --- /dev/null +++ b/helm/source-websocket/templates/ingress.yaml @@ -0,0 +1,61 @@ +{{- if .Values.ingress.enabled -}} +{{- $fullName := include "source-websocket.fullname" . -}} +{{- $svcPort := .Values.service.port -}} +{{- if and .Values.ingress.className (not (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion)) }} + {{- if not (hasKey .Values.ingress.annotations "kubernetes.io/ingress.class") }} + {{- $_ := set .Values.ingress.annotations "kubernetes.io/ingress.class" .Values.ingress.className}} + {{- end }} +{{- end }} +{{- if semverCompare ">=1.19-0" .Capabilities.KubeVersion.GitVersion -}} +apiVersion: networking.k8s.io/v1 +{{- else if semverCompare ">=1.14-0" .Capabilities.KubeVersion.GitVersion -}} +apiVersion: networking.k8s.io/v1beta1 +{{- else -}} +apiVersion: extensions/v1beta1 +{{- end }} +kind: Ingress +metadata: + name: {{ $fullName }} + labels: + {{- include "source-websocket.labels" . | nindent 4 }} + {{- with .Values.ingress.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + {{- if and .Values.ingress.className (semverCompare ">=1.18-0" .Capabilities.KubeVersion.GitVersion) }} + ingressClassName: {{ .Values.ingress.className }} + {{- end }} + {{- if .Values.ingress.tls }} + tls: + {{- range .Values.ingress.tls }} + - hosts: + {{- range .hosts }} + - {{ . | quote }} + {{- end }} + secretName: {{ .secretName }} + {{- end }} + {{- end }} + rules: + {{- range .Values.ingress.hosts }} + - host: {{ .host | quote }} + http: + paths: + {{- range .paths }} + - path: {{ .path }} + {{- if and .pathType (semverCompare ">=1.18-0" $.Capabilities.KubeVersion.GitVersion) }} + pathType: {{ .pathType }} + {{- end }} + backend: + {{- if semverCompare ">=1.19-0" $.Capabilities.KubeVersion.GitVersion }} + service: + name: {{ $fullName }} + port: + number: {{ $svcPort }} + {{- else }} + serviceName: {{ $fullName }} + servicePort: {{ $svcPort }} + {{- end }} + {{- end }} + {{- end }} +{{- end }} diff --git a/helm/source-websocket/templates/service.yaml b/helm/source-websocket/templates/service.yaml new file mode 100644 index 0000000..302386b --- /dev/null +++ b/helm/source-websocket/templates/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "source-websocket.fullname" . }} + labels: + {{- include "source-websocket.labels" . | nindent 4 }} +spec: + type: {{ .Values.service.type }} + ports: + - port: {{ .Values.service.port }} + targetPort: grpc + protocol: TCP + name: grpc + selector: + {{- include "source-websocket.selectorLabels" . | nindent 4 }} diff --git a/helm/source-websocket/templates/serviceaccount.yaml b/helm/source-websocket/templates/serviceaccount.yaml new file mode 100644 index 0000000..3f72454 --- /dev/null +++ b/helm/source-websocket/templates/serviceaccount.yaml @@ -0,0 +1,12 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "source-websocket.serviceAccountName" . }} + labels: + {{- include "source-websocket.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +{{- end }} diff --git a/helm/source-websocket/templates/sts.yaml b/helm/source-websocket/templates/sts.yaml new file mode 100644 index 0000000..d6661b9 --- /dev/null +++ b/helm/source-websocket/templates/sts.yaml @@ -0,0 +1,125 @@ +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ include "source-websocket.fullname" . }} + labels: + {{- include "source-websocket.labels" . | nindent 4 }} +spec: + {{- if not .Values.autoscaling.enabled }} + replicas: {{ .Values.replicaCount }} + {{- end }} + selector: + matchLabels: + {{- include "source-websocket.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "source-websocket.selectorLabels" . | nindent 8 }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "source-websocket.serviceAccountName" . }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + priorityClassName: "{{ .Values.priority.class }}" + containers: + - name: {{ .Chart.Name }} + env: + - name: API_PORT + value: "{{ .Values.service.port }}" + - name: API_WRITER_BACKOFF + value: "{{ .Values.api.writer.backoff }}" + - name: API_WRITER_BATCH_SIZE + value: "{{ .Values.api.writer.batchSize }}" + - name: API_WRITER_CACHE_SIZE + value: "{{ .Values.api.writer.cache.size }}" + - name: API_WRITER_CACHE_TTL + value: "{{ .Values.api.writer.cache.ttl }}" + - name: API_WRITER_URI + value: "{{ .Values.api.writer.uri }}" + - name: DB_NAME + value: {{ .Values.db.name }} + - name: DB_URI + valueFrom: + secretKeyRef: + name: "{{ .Values.db.secret.name }}" + key: "{{ .Values.db.secret.keys.url }}" + - name: DB_USERNAME + valueFrom: + secretKeyRef: + name: "{{ .Values.db.secret.name }}" + key: "{{ .Values.db.secret.keys.username }}" + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: "{{ .Values.db.secret.name }}" + key: "{{ .Values.db.secret.keys.password }}" + - name: DB_TABLE_NAME + value: {{ .Values.db.table.name }} + - name: DB_TABLE_SHARD + value: "{{ .Values.db.table.shard }}" + - name: DB_TLS_ENABLED + value: "{{ .Values.db.tls.enabled }}" + - name: DB_TLS_INSECURE + value: "{{ .Values.db.tls.insecure }}" + - name: DB_TABLE_RETENTION + value: "{{ .Values.db.table.retention }}" + - name: WEBSOCKET_TYPE + value: "{{ .Values.websocket.type }}" + - name: LOG_LEVEL + value: "{{ .Values.log.level }}" + - name: API_USER_AGENT + value: "{{ .Values.api.userAgent }}" + - name: API_GROUP_ID + value: "{{ .Values.api.groupId }}" + - name: WEBSOCKET_STREAM_TIMEOUT + value: "{{ .Values.websocket.stream.timeout }}" + - name: API_EVENTS_SOURCE + value: "{{ .Values.api.events.source }}" + - name: REPLICA_COUNT + value: "{{ .Values.replicaCount }}" + - name: REPLICA_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: grpc + containerPort: {{ .Values.service.port }} + protocol: TCP + livenessProbe: + grpc: + port: {{ .Values.service.port }} + readinessProbe: + grpc: + port: {{ .Values.service.port }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + topologySpreadConstraints: + - maxSkew: 1 + topologyKey: kubernetes.io/hostname + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + app.kubernetes.io/name: {{ include "source-websocket.name" . }} diff --git a/helm/source-websocket/templates/tests/test-connection.yaml b/helm/source-websocket/templates/tests/test-connection.yaml new file mode 100644 index 0000000..606a9bd --- /dev/null +++ b/helm/source-websocket/templates/tests/test-connection.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Pod +metadata: + name: "{{ include "source-websocket.fullname" . }}-test-connection" + labels: + {{- include "source-websocket.labels" . | nindent 4 }} + annotations: + "helm.sh/hook": test +spec: + containers: + - name: wget + image: busybox + command: ['wget'] + args: ['{{ include "source-websocket.fullname" . }}:{{ .Values.service.port }}'] + restartPolicy: Never diff --git a/helm/source-websocket/values.yaml b/helm/source-websocket/values.yaml new file mode 100644 index 0000000..16662b1 --- /dev/null +++ b/helm/source-websocket/values.yaml @@ -0,0 +1,114 @@ +# Default values for source-websocket. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 1 + +image: + repository: ghcr.io/awakari/source-websocket + pullPolicy: Always + tag: "latest" +imagePullSecrets: + - name: github-registry +nameOverride: "" +fullnameOverride: "" + +serviceAccount: + # Specifies whether a service account should be created + create: true + # Annotations to add to the service account + annotations: {} + # The name of the service account to use. + # If not set and create is true, a name is generated using the fullname template + name: "" + +podAnnotations: {} + +podSecurityContext: {} + # fsGroup: 2000 + +securityContext: {} + # capabilities: + # drop: + # - ALL + # readOnlyRootFilesystem: true + # runAsNonRoot: true + # runAsUser: 1000 + +service: + type: ClusterIP + port: 50051 + +ingress: + enabled: false + annotations: + kubernetes.io/ingress.class: nginx + nginx.ingress.kubernetes.io/backend-protocol: "GRPC" + hosts: + - host: source-websocket.local + paths: + - path: / + pathType: ImplementationSpecific + tls: [] + # - secretName: chart-example-tls + # hosts: + # - chart-example.local + +resources: + requests: + cpu: 1m + memory: 16Mi + limits: + cpu: 100m + memory: 64Mi + +autoscaling: + enabled: true + minReplicas: 1 + maxReplicas: 100 + targetCPUUtilizationValue: 100m + targetMemoryUtilizationValue: 64Mi + +priority: + class: "awk-major" + +nodeSelector: {} + +tolerations: [] + +api: + writer: + backoff: "10s" + batchSize: 16 + cache: + size: 100 + ttl: "24h" + uri: "api:50051" + userAgent: "Awakari" + groupId: "default" + events: + source: "https://awakari.com/pub.html?srcType=ws" +db: + # Database name to use. + name: source + secret: + name: "db-mongo" + keys: + url: "url" + username: "username" + password: "password" + table: + # Database table name to use. + name: websocket + retention: "2160h" # 90 days + shard: false + tls: + enabled: false + insecure: false +websocket: + stream: + timeout: "1m" + type: "com_awakari_websocket_v1" +log: + # https://pkg.go.dev/golang.org/x/exp/slog#Level + level: -4 diff --git a/main.go b/main.go new file mode 100644 index 0000000..eb898c8 --- /dev/null +++ b/main.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "fmt" + "github.com/awakari/client-sdk-go/api" + apiGrpc "github.com/awakari/source-websocket/api/grpc" + "github.com/awakari/source-websocket/config" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/service" + "github.com/awakari/source-websocket/service/handler" + "github.com/awakari/source-websocket/service/interceptor" + "github.com/awakari/source-websocket/service/writer" + "github.com/awakari/source-websocket/storage/mongo" + "log/slog" + "os" + "strconv" + "strings" + "sync" +) + +func main() { + + cfg, err := config.NewConfigFromEnv() + if err != nil { + panic(fmt.Sprintf("failed to load the config from env: %s", err)) + } + + opts := slog.HandlerOptions{ + Level: slog.Level(cfg.Log.Level), + } + log := slog.New(slog.NewTextHandler(os.Stdout, &opts)) + log.Info("starting the update for the feeds") + + // determine the replica index + replicaNameParts := strings.Split(cfg.Replica.Name, "-") + if len(replicaNameParts) < 2 { + panic("unable to parse the replica name: " + cfg.Replica.Name) + } + var replicaIndex int + replicaIndex, err = strconv.Atoi(replicaNameParts[len(replicaNameParts)-1]) + if err != nil { + panic(err) + } + if replicaIndex < 0 { + panic(fmt.Sprintf("Negative replica index: %d", replicaIndex)) + } + log.Info(fmt.Sprintf("Replica: %d", replicaIndex)) + + var clientAwk api.Client + clientAwk, err = api. + NewClientBuilder(). + WriterUri(cfg.Api.Writer.Uri). + Build() + if err != nil { + panic(fmt.Sprintf("failed to initialize the Awakari API client: %s", err)) + } + defer clientAwk.Close() + log.Info("initialized the Awakari API client") + + svcWriter := writer.NewService(clientAwk, cfg.Api.Writer.Backoff, cfg.Api.Writer.Cache, log) + svcWriter = writer.NewLogging(svcWriter, log) + + ctx := context.Background() + stor, err := mongo.NewStorage(ctx, cfg.Db) + if err != nil { + panic(err) + } + defer stor.Close() + + interceptors := []interceptor.Interceptor{ + interceptor.NewLogging(interceptor.NewDefault(svcWriter), log, "default"), + } + + handlersLock := &sync.Mutex{} + handlerByUrl := make(map[string]handler.Handler) + handlerFactory := handler.NewFactory(cfg.Api, cfg.Event, interceptors) + + svc := service.NewService(stor, uint32(replicaIndex), handlersLock, handlerByUrl, handlerFactory) + svc = service.NewServiceLogging(svc, log) + err = resumeHandlers(ctx, log, svc, uint32(replicaIndex), handlersLock, handlerByUrl, handlerFactory) + if err != nil { + panic(err) + } + + log.Info(fmt.Sprintf("starting to listen the gRPC API @ port #%d...", cfg.Api.Port)) + err = apiGrpc.Serve(cfg.Api.Port, svc) + if err != nil { + panic(err) + } +} + +func resumeHandlers( + ctx context.Context, + log *slog.Logger, + svc service.Service, + replicaIndex uint32, + handlersLock *sync.Mutex, + handlerByUrl map[string]handler.Handler, + handlerFactory handler.Factory, +) (err error) { + var cursor string + var urls []string + var str model.Stream + for { + urls, err = svc.List(ctx, 100, model.Filter{}, model.OrderAsc, cursor) + if err == nil { + if len(urls) == 0 { + break + } + cursor = urls[len(urls)-1] + for _, url := range urls { + str, err = svc.Read(ctx, url) + if err == nil && str.Replica == replicaIndex { + resumeHandler(ctx, log, url, str, handlersLock, handlerByUrl, handlerFactory) + } + if err != nil { + break + } + } + } + if err != nil { + break + } + } + return +} + +func resumeHandler( + ctx context.Context, + log *slog.Logger, + url string, + str model.Stream, + handlersLock *sync.Mutex, + handlerByUrl map[string]handler.Handler, + handlerFactory handler.Factory, +) { + handlersLock.Lock() + defer handlersLock.Unlock() + h := handlerFactory(url, str) + handlerByUrl[url] = h + go h.Handle(ctx) + log.Info(fmt.Sprintf("resumed handler for %s", url)) +} diff --git a/model/filter.go b/model/filter.go new file mode 100644 index 0000000..ea28956 --- /dev/null +++ b/model/filter.go @@ -0,0 +1,7 @@ +package model + +type Filter struct { + GroupId string + UserId string + Pattern string +} diff --git a/model/metadata.go b/model/metadata.go new file mode 100644 index 0000000..6672589 --- /dev/null +++ b/model/metadata.go @@ -0,0 +1,15 @@ +package model + +const CeSpecVersion = "1.0" +const CeKeyAction = "action" +const CeKeyLanguage = "language" +const CeKeyLength = "length" +const CeKeyObjectUrl = "objecturl" +const CeKeyRevision = "revision" +const CeKeySchema = "schema" +const CeKeySubject = "subject" +const CeKeyTime = "time" +const CeKeyTitle = "title" + +const CeKeyWikiNotifyUrl = "wikinotifyurl" +const CeKeyWikiServerUrl = "wikiserverurl" diff --git a/model/order.go b/model/order.go new file mode 100644 index 0000000..a1907b2 --- /dev/null +++ b/model/order.go @@ -0,0 +1,15 @@ +package model + +type Order int + +const ( + OrderAsc Order = iota + OrderDesc +) + +func (o Order) String() string { + return [...]string{ + "Asc", + "Desc", + }[o] +} diff --git a/model/stream.go b/model/stream.go new file mode 100644 index 0000000..a79cbfd --- /dev/null +++ b/model/stream.go @@ -0,0 +1,11 @@ +package model + +import "time" + +type Stream struct { + Auth string + GroupId string + UserId string + CreatedAt time.Time + Replica uint32 +} diff --git a/scripts/cover.sh b/scripts/cover.sh new file mode 100755 index 0000000..58f1f47 --- /dev/null +++ b/scripts/cover.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +COVERAGE=$(cat cover.tmp) +THRESHOLD=47 +if [[ ${COVERAGE} -lt ${THRESHOLD} ]]; \ + then \ + echo "FAILED: test coverage ${COVERAGE}% < ${THRESHOLD}%"; \ + exit 1; \ + else \ + echo "PASSED: test coverage ${COVERAGE} >= ${THRESHOLD}%"; \ +fi diff --git a/scripts/release.sh b/scripts/release.sh new file mode 100755 index 0000000..a9db182 --- /dev/null +++ b/scripts/release.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +export REGISTRY=ghcr.io +export ORG=awakari +export COMPONENT=source-websocket +export SLUG=${REGISTRY}/${ORG}/${COMPONENT} +export VERSION=$(git describe --tags --abbrev=0 | cut -c 2-) +echo "Releasing version: $VERSION" +docker tag ${ORG}/${COMPONENT} "${SLUG}":"${VERSION}" +docker tag ${ORG}/${COMPONENT} "${SLUG}":latest +docker push "${SLUG}":"${VERSION}" +docker push "${SLUG}":latest diff --git a/scripts/staging.sh b/scripts/staging.sh new file mode 100755 index 0000000..05c03ae --- /dev/null +++ b/scripts/staging.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +export SLUG=ghcr.io/awakari/source-websocket +export VERSION=latest +docker tag awakari/source-websocket "${SLUG}":"${VERSION}" +docker push "${SLUG}":"${VERSION}" diff --git a/service/handler/handler.go b/service/handler/handler.go new file mode 100644 index 0000000..c2799ac --- /dev/null +++ b/service/handler/handler.go @@ -0,0 +1,83 @@ +package handler + +import ( + "context" + "github.com/awakari/source-websocket/config" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/service/interceptor" + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "io" +) + +type Handler interface { + io.Closer + Handle(ctx context.Context) +} + +type handler struct { + url string + str model.Stream + cfgApi config.ApiConfig + cfgEvt config.WebsocketConfig + interceptors []interceptor.Interceptor + + conn *websocket.Conn +} + +type Factory func(url string, str model.Stream) Handler + +func NewFactory(cfgApi config.ApiConfig, cfgEvt config.WebsocketConfig, interceptors []interceptor.Interceptor) Factory { + return func(url string, str model.Stream) Handler { + return &handler{ + url: url, + str: str, + cfgApi: cfgApi, + cfgEvt: cfgEvt, + interceptors: interceptors, + } + } +} + +func (h *handler) Close() error { + return h.conn.Close(websocket.StatusNormalClosure, "") +} + +func (h *handler) Handle(ctx context.Context) { + for { + evtN, err := h.handleStream(ctx) + if evtN == 0 && err != nil { + panic(err) + } + } +} + +func (h *handler) handleStream(ctx context.Context) (evtN uint64, err error) { + h.conn, _, err = websocket.Dial(ctx, h.url, nil) + if err == nil { + defer h.conn.CloseNow() + for { + err = h.handleStreamEvent(ctx, h.url) + if err != nil { + break + } + } + } + return +} + +func (h *handler) handleStreamEvent(ctx context.Context, url string) (err error) { + ctxWithTimeout, cancel := context.WithTimeout(ctx, h.cfgEvt.StreamTimeout) + defer cancel() + var raw map[string]any + err = wsjson.Read(ctxWithTimeout, h.conn, &raw) + if err == nil { + var matched bool + for _, i := range h.interceptors { + if matched, err = i.Handle(ctx, url, raw); matched { + break + } + } + } + return +} diff --git a/service/handler/mock.go b/service/handler/mock.go new file mode 100644 index 0000000..6e55eed --- /dev/null +++ b/service/handler/mock.go @@ -0,0 +1,20 @@ +package handler + +import ( + "context" + "github.com/awakari/source-websocket/model" +) + +type mockHandler struct{} + +var NewMock Factory = func(url string, str model.Stream) Handler { + return mockHandler{} +} + +func (m mockHandler) Close() error { + return nil +} + +func (m mockHandler) Handle(ctx context.Context) { + return +} diff --git a/service/interceptor/default.go b/service/interceptor/default.go new file mode 100644 index 0000000..1cacdb7 --- /dev/null +++ b/service/interceptor/default.go @@ -0,0 +1,21 @@ +package interceptor + +import ( + "context" + "github.com/awakari/source-websocket/service/writer" +) + +type defaultInterceptor struct { + w writer.Service +} + +func NewDefault(w writer.Service) Interceptor { + return defaultInterceptor{ + w: w, + } +} + +func (d defaultInterceptor) Handle(ctx context.Context, url string, raw map[string]any) (matches bool, err error) { + matches = true + return +} diff --git a/service/interceptor/interceptor.go b/service/interceptor/interceptor.go new file mode 100644 index 0000000..c5dd39f --- /dev/null +++ b/service/interceptor/interceptor.go @@ -0,0 +1,9 @@ +package interceptor + +import ( + "context" +) + +type Interceptor interface { + Handle(ctx context.Context, src string, raw map[string]any) (matches bool, err error) +} diff --git a/service/interceptor/logging.go b/service/interceptor/logging.go new file mode 100644 index 0000000..1f7300c --- /dev/null +++ b/service/interceptor/logging.go @@ -0,0 +1,33 @@ +package interceptor + +import ( + "context" + "fmt" + "log/slog" +) + +type logging struct { + i Interceptor + log *slog.Logger + t string +} + +func NewLogging(i Interceptor, log *slog.Logger, t string) Interceptor { + return logging{ + i: i, + log: log, + t: t, + } +} + +func (l logging) Handle(ctx context.Context, src string, raw map[string]any) (matches bool, err error) { + if matches, err = l.i.Handle(ctx, src, raw); matches { + switch err { + case nil: + l.log.Debug(fmt.Sprintf("interceptor(%s).Handle(%s, %+v): ok", l.t, src, raw)) + default: + l.log.Error(fmt.Sprintf("interceptor(%s).Handle(%s, %+v): %s", l.t, src, raw, err)) + } + } + return +} diff --git a/service/logging.go b/service/logging.go new file mode 100644 index 0000000..9244f66 --- /dev/null +++ b/service/logging.go @@ -0,0 +1,55 @@ +package service + +import ( + "context" + "fmt" + "github.com/awakari/source-websocket/model" + "log/slog" + "time" +) + +type logging struct { + svc Service + log *slog.Logger +} + +func NewServiceLogging(svc Service, log *slog.Logger) Service { + return logging{ + svc: svc, + log: log, + } +} + +func (l logging) Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error) { + err = l.svc.Create(ctx, url, auth, groupId, userId, at) + l.log.Log(context.TODO(), logLevel(err), fmt.Sprintf("service.Create(%s, %s, %s, %s, %s): %s", url, auth, groupId, userId, at, err)) + return +} + +func (l logging) Read(ctx context.Context, url string) (str model.Stream, err error) { + str, err = l.svc.Read(ctx, url) + l.log.Log(context.TODO(), logLevel(err), fmt.Sprintf("service.Read(%s): %+v, %s", url, str, err)) + return +} + +func (l logging) Delete(ctx context.Context, url, groupId, userId string) (err error) { + err = l.svc.Delete(ctx, url, groupId, userId) + l.log.Log(context.TODO(), logLevel(err), fmt.Sprintf("service.Delete(%s, %s/%s): %s", url, groupId, userId, err)) + return +} + +func (l logging) List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) { + urls, err = l.svc.List(ctx, limit, filter, order, cursor) + l.log.Log(context.TODO(), logLevel(err), fmt.Sprintf("service.List(%d, %+v, %+v, %s): %d, %s", limit, filter, order, cursor, len(urls), err)) + return +} + +func logLevel(err error) (lvl slog.Level) { + switch err { + case nil: + lvl = slog.LevelDebug + default: + lvl = slog.LevelError + } + return +} diff --git a/service/mock.go b/service/mock.go new file mode 100644 index 0000000..93de485 --- /dev/null +++ b/service/mock.go @@ -0,0 +1,62 @@ +package service + +import ( + "context" + "github.com/awakari/source-websocket/model" + "time" +) + +type mock struct { +} + +func NewServiceMock() Service { + return mock{} +} + +func (m mock) Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error) { + switch url { + case "fail": + err = ErrUnexpected + case "conflict": + err = ErrConflict + } + return +} + +func (m mock) Read(ctx context.Context, url string) (str model.Stream, err error) { + switch url { + case "missing": + err = ErrNotFound + case "fail": + err = ErrUnexpected + default: + str.GroupId = "group0" + str.UserId = "user1" + str.CreatedAt = time.Date(2024, 11, 4, 14, 52, 0, 0, time.UTC) + str.Replica = 1 + } + return +} + +func (m mock) Delete(ctx context.Context, url, groupId, userId string) (err error) { + switch url { + case "missing": + err = ErrNotFound + case "fail": + err = ErrUnexpected + } + return +} + +func (m mock) List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) { + switch cursor { + case "fail": + err = ErrUnexpected + default: + urls = []string{ + "url0", + "url1", + } + } + return +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 0000000..9bc470b --- /dev/null +++ b/service/service.go @@ -0,0 +1,105 @@ +package service + +import ( + "context" + "errors" + "fmt" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/service/handler" + "github.com/awakari/source-websocket/storage" + "sync" + "time" +) + +type Service interface { + Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error) + Read(ctx context.Context, url string) (str model.Stream, err error) + Delete(ctx context.Context, url, groupId, userId string) (err error) + List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) +} + +type svc struct { + stor storage.Storage + replicaIndex uint32 + handlersLock *sync.Mutex + handlerByUrl map[string]handler.Handler + handlerFactory handler.Factory +} + +var ErrNotFound = errors.New("not found") +var ErrConflict = errors.New("conflict") +var ErrUnexpected = errors.New("unexpected") + +func NewService( + stor storage.Storage, + replicaIndex uint32, + handlersLock *sync.Mutex, + handlerByUrl map[string]handler.Handler, + handlerFactory handler.Factory, +) Service { + return svc{ + stor: stor, + replicaIndex: replicaIndex, + handlersLock: handlersLock, + handlerByUrl: handlerByUrl, + handlerFactory: handlerFactory, + } +} + +func (s svc) Create(ctx context.Context, url, auth, groupId, userId string, at time.Time) (err error) { + str := model.Stream{ + Auth: auth, + GroupId: groupId, + UserId: userId, + CreatedAt: at, + Replica: s.replicaIndex, + } + err = s.stor.Create(ctx, url, str) + if err == nil { + s.handlersLock.Lock() + defer s.handlersLock.Unlock() + h := s.handlerFactory(url, str) + s.handlerByUrl[url] = h + go h.Handle(context.Background()) + } + err = translateError(err) + return +} + +func (s svc) Read(ctx context.Context, url string) (str model.Stream, err error) { + str, err = s.stor.Read(ctx, url) + err = translateError(err) + return +} + +func (s svc) Delete(ctx context.Context, url, groupId, userId string) (err error) { + err = s.stor.Delete(ctx, url, groupId, userId) + if err == nil { + s.handlersLock.Lock() + defer s.handlersLock.Unlock() + h, hOk := s.handlerByUrl[url] + if hOk { + err = h.Close() + } + } + err = translateError(err) + return +} + +func (s svc) List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) { + urls, err = s.stor.List(ctx, limit, filter, order, cursor) + err = translateError(err) + return +} + +func translateError(src error) (dst error) { + switch { + case errors.Is(src, storage.ErrConflict): + dst = fmt.Errorf("%w: %s", ErrConflict, src) + case errors.Is(src, storage.ErrNotFound): + dst = fmt.Errorf("%w: %s", ErrNotFound, src) + case src != nil: + dst = fmt.Errorf("%w: %s", ErrUnexpected, src) + } + return +} diff --git a/service/service_test.go b/service/service_test.go new file mode 100644 index 0000000..0a77c70 --- /dev/null +++ b/service/service_test.go @@ -0,0 +1,140 @@ +package service + +import ( + "context" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/service/handler" + "github.com/awakari/source-websocket/storage" + "github.com/stretchr/testify/assert" + "log/slog" + "sync" + "testing" + "time" +) + +func TestService_Create(t *testing.T) { + handlerByUrl := make(map[string]handler.Handler) + s := NewService(storage.NewMockStorage(), 1, &sync.Mutex{}, handlerByUrl, handler.NewMock) + s = NewServiceLogging(s, slog.Default()) + cases := map[string]struct { + url string + auth string + groupId string + userId string + at time.Time + handlerCount int + err error + }{ + "ok": { + handlerCount: 1, + }, + "fail": { + url: "fail", + err: ErrUnexpected, + }, + "conflict": { + url: "conflict", + err: ErrConflict, + }, + } + for k, c := range cases { + t.Run(k, func(t *testing.T) { + err := s.Create(context.TODO(), c.url, c.auth, c.groupId, c.userId, c.at) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, c.handlerCount, len(handlerByUrl)) + clear(handlerByUrl) + }) + } +} + +func TestService_Read(t *testing.T) { + s := NewService(storage.NewMockStorage(), 1, &sync.Mutex{}, make(map[string]handler.Handler), handler.NewMock) + s = NewServiceLogging(s, slog.Default()) + cases := map[string]struct { + url string + str model.Stream + err error + }{ + "ok": { + str: model.Stream{ + GroupId: "group0", + UserId: "user1", + CreatedAt: time.Date(2024, 11, 4, 14, 52, 0, 0, time.UTC), + Replica: 1, + }, + }, + "fail": { + url: "fail", + err: ErrUnexpected, + }, + "missing": { + url: "missing", + err: ErrNotFound, + }, + } + for k, c := range cases { + t.Run(k, func(t *testing.T) { + str, err := s.Read(context.TODO(), c.url) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, c.str, str) + }) + } +} + +func TestService_Delete(t *testing.T) { + s := NewService(storage.NewMockStorage(), 1, &sync.Mutex{}, make(map[string]handler.Handler), handler.NewMock) + s = NewServiceLogging(s, slog.Default()) + cases := map[string]struct { + url string + groupId string + userId string + err error + }{ + "ok": {}, + "fail": { + url: "fail", + err: ErrUnexpected, + }, + "missing": { + url: "missing", + err: ErrNotFound, + }, + } + for k, c := range cases { + t.Run(k, func(t *testing.T) { + err := s.Delete(context.TODO(), c.url, c.groupId, c.userId) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestService_List(t *testing.T) { + s := NewService(storage.NewMockStorage(), 1, &sync.Mutex{}, make(map[string]handler.Handler), handler.NewMock) + s = NewServiceLogging(s, slog.Default()) + cases := map[string]struct { + limit uint32 + filter model.Filter + order model.Order + cursor string + urls []string + err error + }{ + "ok": { + urls: []string{ + "url0", + "url1", + }, + }, + "fail": { + cursor: "fail", + err: ErrUnexpected, + }, + } + for k, c := range cases { + t.Run(k, func(t *testing.T) { + urls, err := s.List(context.TODO(), c.limit, c.filter, c.order, c.cursor) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, c.urls, urls) + }) + } +} diff --git a/service/writer/logging.go b/service/writer/logging.go new file mode 100644 index 0000000..9e3706a --- /dev/null +++ b/service/writer/logging.go @@ -0,0 +1,42 @@ +package writer + +import ( + "context" + "fmt" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "log/slog" +) + +type logging struct { + svc Service + log *slog.Logger +} + +func NewLogging(svc Service, log *slog.Logger) Service { + return logging{ + svc: svc, + log: log, + } +} + +func (l logging) Close() (err error) { + err = l.svc.Close() + l.log.Log(context.TODO(), logLevel(err), fmt.Sprintf("writer.Close(): %s", err)) + return +} + +func (l logging) Write(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) { + err = l.svc.Write(ctx, evt, groupId, userId) + l.log.Log(ctx, logLevel(err), fmt.Sprintf("writer.Write(evt=%s, groupId=%s, userId=%s): %s", evt.Id, groupId, userId, err)) + return +} + +func logLevel(err error) (lvl slog.Level) { + switch err { + case nil: + lvl = slog.LevelDebug + default: + lvl = slog.LevelError + } + return +} diff --git a/service/writer/mock.go b/service/writer/mock.go new file mode 100644 index 0000000..107cb88 --- /dev/null +++ b/service/writer/mock.go @@ -0,0 +1,25 @@ +package writer + +import ( + "context" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" +) + +type mock struct { +} + +func NewMock() Service { + return mock{} +} + +func (m mock) Close() error { + return nil +} + +func (m mock) Write(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) { + switch userId { + case "fail": + err = ErrWrite + } + return +} diff --git a/service/writer/service.go b/service/writer/service.go new file mode 100644 index 0000000..c118665 --- /dev/null +++ b/service/writer/service.go @@ -0,0 +1,156 @@ +package writer + +import ( + "context" + "errors" + "fmt" + "github.com/awakari/client-sdk-go/api" + "github.com/awakari/client-sdk-go/api/grpc/limits" + "github.com/awakari/client-sdk-go/api/grpc/permits" + "github.com/awakari/client-sdk-go/api/grpc/resolver" + "github.com/awakari/client-sdk-go/model" + "github.com/awakari/source-websocket/config" + "github.com/cenkalti/backoff/v4" + "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + "github.com/hashicorp/golang-lru/v2/expirable" + "google.golang.org/grpc/metadata" + "io" + "log/slog" + "sync" + "time" +) + +type Service interface { + io.Closer + Write(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) +} + +type service struct { + cache *expirable.LRU[string, model.Writer[*pb.CloudEvent]] + cacheLock *sync.Mutex + clientAwk api.Client + backoffTimeLimit time.Duration + log *slog.Logger +} + +const accSep = ":" +const backoffInitDelay = 100 * time.Millisecond + +var ErrWrite = errors.New("failed to write event") +var errNoAck = errors.New("event is not accepted") + +func NewService(clientAwk api.Client, backoffTimeLimit time.Duration, cfgCache config.WriterCacheConfig, log *slog.Logger) Service { + funcEvict := func(_ string, w model.Writer[*pb.CloudEvent]) { + _ = w.Close() + } + return service{ + cache: expirable.NewLRU[string, model.Writer[*pb.CloudEvent]](int(cfgCache.Size), funcEvict, cfgCache.Ttl), + cacheLock: &sync.Mutex{}, + clientAwk: clientAwk, + backoffTimeLimit: backoffTimeLimit, + log: log, + } +} + +func (svc service) Close() (err error) { + svc.cacheLock.Lock() + defer svc.cacheLock.Unlock() + for _, k := range svc.cache.Keys() { + w, found := svc.cache.Get(k) + if found { + err = errors.Join(err, w.Close()) + } + } + svc.cache.Purge() + return +} + +func (svc service) Write(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) { + err = svc.getWriterAndPublish(ctx, evt, groupId, userId) + if err != nil { + err = svc.retryBackoff(func() error { + return svc.getWriterAndPublish(ctx, evt, groupId, userId) + }) + } + if err != nil { + err = fmt.Errorf("%w id: %s, cause: %s", ErrWrite, evt.Id, err) + } + return +} + +func (svc service) getWriterAndPublish(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) { + var w model.Writer[*pb.CloudEvent] + w, err = svc.getWriter(ctx, groupId, userId) + if err == nil { + err = svc.publish(w, evt) + switch { + case errors.Is(err, limits.ErrReached): + svc.log.Debug(fmt.Sprintf("Publish failure: evt.Id=%s, userId=%s, err=%s", evt.Id, userId, err)) + err = nil // don't retry this time + fallthrough // reopen the writer the next time + case errors.Is(err, limits.ErrUnavailable): + fallthrough + case errors.Is(err, permits.ErrUnavailable): + fallthrough + case errors.Is(err, resolver.ErrUnavailable): + fallthrough + case errors.Is(err, resolver.ErrInternal): + fallthrough + case errors.Is(err, io.EOF): + // close and remove the writer from the cache + k := writerKey(groupId, userId) + svc.cacheLock.Lock() + defer svc.cacheLock.Unlock() + svc.cache.Remove(k) + _ = w.Close() + } + } + return +} + +func (svc service) getWriter(ctx context.Context, groupId, userId string) (w model.Writer[*pb.CloudEvent], err error) { + k := writerKey(groupId, userId) + svc.cacheLock.Lock() + defer svc.cacheLock.Unlock() + w, found := svc.cache.Get(k) + if !found { + ctxGroupId := metadata.AppendToOutgoingContext(ctx, "x-awakari-group-id", groupId) + w, err = svc.clientAwk.OpenMessagesWriter(ctxGroupId, userId) + if err == nil { + svc.cache.Add(k, w) + } + } + return +} + +func (svc service) publish(w model.Writer[*pb.CloudEvent], evt *pb.CloudEvent) (err error) { + err = svc.tryPublish(w, evt) + if err == errNoAck { + err = svc.retryBackoff(func() error { + return svc.tryPublish(w, evt) + }) + } + return +} + +func (svc service) tryPublish(w model.Writer[*pb.CloudEvent], evt *pb.CloudEvent) (err error) { + var ackCount uint32 + ackCount, err = w.WriteBatch([]*pb.CloudEvent{evt}) + if err == nil && ackCount < 1 { + err = errNoAck // it's an error to retry + } + return +} + +func (svc service) retryBackoff(op func() error) (err error) { + b := backoff.NewExponentialBackOff() + b.InitialInterval = backoffInitDelay + b.MaxElapsedTime = svc.backoffTimeLimit + err = backoff.Retry(op, b) + return +} + +func writerKey(groupId, userId string) (k string) { + k = fmt.Sprintf("%s%s%s", groupId, accSep, userId) + return +} diff --git a/storage/mock.go b/storage/mock.go new file mode 100644 index 0000000..40d2b68 --- /dev/null +++ b/storage/mock.go @@ -0,0 +1,66 @@ +package storage + +import ( + "context" + "github.com/awakari/source-websocket/model" + "time" +) + +type mockStorage struct{} + +func NewMockStorage() Storage { + return mockStorage{} +} + +func (m mockStorage) Close() error { + //TODO implement me + panic("implement me") +} + +func (m mockStorage) Create(ctx context.Context, url string, str model.Stream) (err error) { + switch url { + case "fail": + err = ErrUnexpected + case "conflict": + err = ErrConflict + } + return +} + +func (m mockStorage) Read(ctx context.Context, url string) (str model.Stream, err error) { + switch url { + case "missing": + err = ErrNotFound + case "fail": + err = ErrUnexpected + default: + str.GroupId = "group0" + str.UserId = "user1" + str.CreatedAt = time.Date(2024, 11, 4, 14, 52, 0, 0, time.UTC) + str.Replica = 1 + } + return +} + +func (m mockStorage) Delete(ctx context.Context, url, groupId, userId string) (err error) { + switch url { + case "missing": + err = ErrNotFound + case "fail": + err = ErrUnexpected + } + return +} + +func (m mockStorage) List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) { + switch cursor { + case "fail": + err = ErrUnexpected + default: + urls = []string{ + "url0", + "url1", + } + } + return +} diff --git a/storage/mongo/storage.go b/storage/mongo/storage.go new file mode 100644 index 0000000..ed27ca8 --- /dev/null +++ b/storage/mongo/storage.go @@ -0,0 +1,259 @@ +package mongo + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "github.com/awakari/source-websocket/config" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/storage" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "time" +) + +type storageMongo struct { + conn *mongo.Client + db *mongo.Database + coll *mongo.Collection +} + +type record struct { + Url string `bson:"url"` + Auth string `bson:"auth"` + GroupId string `bson:"gid"` + UserId string `bson:"uid"` + ReplicaIndex uint32 `bson:"ridx"` + CreatedAt time.Time `bson:"createdAt"` +} + +const attrUrl = "url" +const attrAuth = "auth" +const attrGroupId = "gid" +const attrUserId = "uid" +const attrReplicaIndex = "ridx" +const attrCreatedAt = "createdAt" + +var optsSrvApi = options.ServerAPI(options.ServerAPIVersion1) +var optsGet = options. + FindOne(). + SetShowRecordID(false). + SetProjection(projRead) +var projRead = bson.D{ + { + Key: attrAuth, + Value: 1, + }, + { + Key: attrGroupId, + Value: 1, + }, + { + Key: attrUserId, + Value: 1, + }, + { + Key: attrReplicaIndex, + Value: 1, + }, + { + Key: attrCreatedAt, + Value: 1, + }, +} +var projList = bson.D{ + { + Key: attrUrl, + Value: 1, + }, +} +var sortListAsc = bson.D{ + { + Key: attrUrl, + Value: 1, + }, +} +var sortListDesc = bson.D{ + { + Key: attrUrl, + Value: -1, + }, +} + +func NewStorage(ctx context.Context, cfgDb config.DbConfig) (s storage.Storage, err error) { + clientOpts := options. + Client(). + ApplyURI(cfgDb.Uri). + SetServerAPIOptions(optsSrvApi) + if cfgDb.Tls.Enabled { + clientOpts = clientOpts.SetTLSConfig(&tls.Config{InsecureSkipVerify: cfgDb.Tls.Insecure}) + } + if len(cfgDb.UserName) > 0 { + auth := options.Credential{ + Username: cfgDb.UserName, + Password: cfgDb.Password, + PasswordSet: len(cfgDb.Password) > 0, + } + clientOpts = clientOpts.SetAuth(auth) + } + conn, err := mongo.Connect(ctx, clientOpts) + var sm storageMongo + if err == nil { + db := conn.Database(cfgDb.Name) + coll := db.Collection(cfgDb.Table.Name) + sm.conn = conn + sm.db = db + sm.coll = coll + _, err = sm.ensureIndices(ctx, cfgDb.Table.Retention) + } + if err == nil { + s = sm + } + return +} + +func (sm storageMongo) ensureIndices(ctx context.Context, retentionPeriod time.Duration) ([]string, error) { + return sm.coll.Indexes().CreateMany(ctx, []mongo.IndexModel{ + { + Keys: bson.D{ + { + Key: attrUrl, + Value: 1, + }, + }, + Options: options. + Index(). + SetUnique(true), + }, + }) +} + +func (sm storageMongo) Close() error { + return sm.conn.Disconnect(context.TODO()) +} + +func (sm storageMongo) Create(ctx context.Context, url string, str model.Stream) (err error) { + _, err = sm.coll.InsertOne(ctx, record{ + Url: url, + Auth: str.Auth, + GroupId: str.GroupId, + UserId: str.UserId, + ReplicaIndex: str.Replica, + CreatedAt: str.CreatedAt.UTC(), + }) + err = decodeError(err, url) + return +} + +func (sm storageMongo) Read(ctx context.Context, url string) (str model.Stream, err error) { + q := bson.M{ + attrUrl: url, + } + var result *mongo.SingleResult + result = sm.coll.FindOne(ctx, q, optsGet) + err = result.Err() + var rec record + if err == nil { + err = result.Decode(&rec) + } + if err == nil { + str.Auth = rec.Auth + str.CreatedAt = rec.CreatedAt.UTC() + str.GroupId = rec.GroupId + str.UserId = rec.UserId + str.Replica = rec.ReplicaIndex + } + err = decodeError(err, url) + return +} + +func (sm storageMongo) Delete(ctx context.Context, url, groupId, userId string) (err error) { + var result *mongo.DeleteResult + result, err = sm.coll.DeleteOne(ctx, bson.M{ + attrUrl: url, + attrGroupId: groupId, + attrUserId: userId, + }) + switch err { + case nil: + if result.DeletedCount < 1 { + err = fmt.Errorf("%w by url %s", storage.ErrNotFound, url) + } + default: + err = decodeError(err, url) + } + return +} + +func (sm storageMongo) List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) { + q := bson.M{} + if filter.UserId != "" { + q[attrGroupId] = filter.GroupId + q[attrUserId] = filter.UserId + } + optsList := options. + Find(). + SetLimit(int64(limit)). + SetShowRecordID(false). + SetProjection(projList) + var clauseCursor bson.M + switch order { + case model.OrderDesc: + clauseCursor = bson.M{ + "$lt": cursor, + } + optsList = optsList.SetSort(sortListDesc) + default: + clauseCursor = bson.M{ + "$gt": cursor, + } + optsList = optsList.SetSort(sortListAsc) + } + q["$and"] = []bson.M{ + { + attrUrl: clauseCursor, + }, + { + "$or": []bson.M{ + { + attrUrl: bson.M{ + "$regex": filter.Pattern, + }, + }, + { + attrUrl: bson.M{ + "$regex": filter.Pattern, + }, + }, + }, + }, + } + var cur *mongo.Cursor + cur, err = sm.coll.Find(ctx, q, optsList) + if err == nil { + for cur.Next(ctx) { + var rec record + err = errors.Join(err, cur.Decode(&rec)) + if err == nil { + urls = append(urls, rec.Url) + } + } + } + err = decodeError(err, cursor) + return +} + +func decodeError(src error, url string) (dst error) { + switch { + case src == nil: + case errors.Is(src, mongo.ErrNoDocuments): + dst = fmt.Errorf("%w: %s", storage.ErrNotFound, url) + case mongo.IsDuplicateKeyError(src): + dst = fmt.Errorf("%w: %s", storage.ErrConflict, url) + default: + dst = fmt.Errorf("%w: %s", storage.ErrUnexpected, src) + } + return +} diff --git a/storage/mongo/storage_test.go b/storage/mongo/storage_test.go new file mode 100644 index 0000000..ca59ca2 --- /dev/null +++ b/storage/mongo/storage_test.go @@ -0,0 +1,324 @@ +package mongo + +import ( + "context" + "fmt" + "github.com/awakari/source-websocket/config" + "github.com/awakari/source-websocket/model" + "github.com/awakari/source-websocket/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +var dbUri = os.Getenv("DB_URI_TEST_MONGO") + +func TestNewStorage(t *testing.T) { + // + collName := fmt.Sprintf("tgchans-test-%d", time.Now().UnixMicro()) + dbCfg := config.DbConfig{ + Uri: dbUri, + Name: "sources", + } + dbCfg.Table.Name = collName + dbCfg.Table.Shard = false + dbCfg.Tls.Enabled = true + dbCfg.Tls.Insecure = true + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + assert.Nil(t, err) + assert.NotNil(t, s) + // + clear(ctx, t, s.(storageMongo)) +} + +func clear(ctx context.Context, t *testing.T, s storageMongo) { + require.Nil(t, s.coll.Drop(ctx)) + require.Nil(t, s.Close()) +} + +func TestStorageMongo_Create(t *testing.T) { + // + collName := fmt.Sprintf("websocket-test-%d", time.Now().UnixMicro()) + dbCfg := config.DbConfig{ + Uri: dbUri, + Name: "sources", + } + dbCfg.Table.Name = collName + dbCfg.Tls.Enabled = true + dbCfg.Tls.Insecure = true + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + require.Nil(t, err) + assert.NotNil(t, s) + // + sm := s.(storageMongo) + defer clear(ctx, t, s.(storageMongo)) + // + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url0", + Auth: "token1", + GroupId: "group2", + UserId: "user3", + ReplicaIndex: 4, + CreatedAt: time.Now().UTC(), + }) + require.Nil(t, err) + // + cases := map[string]struct { + url string + auth string + groupId string + userId string + replicaIndex uint32 + at time.Time + err error + }{ + "ok empty": {}, + "ok": { + url: "url1", + auth: "token1", + groupId: "group2", + userId: "user3", + replicaIndex: 4, + at: time.Now().UTC(), + }, + "dup id": { + url: "url0", + err: storage.ErrConflict, + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + err = s.Create(ctx, c.url, model.Stream{ + Auth: c.auth, + GroupId: c.groupId, + UserId: c.userId, + CreatedAt: c.at, + Replica: c.replicaIndex, + }) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestStorageMongo_Read(t *testing.T) { + // + collName := fmt.Sprintf("websocket-test-%d", time.Now().UnixMicro()) + dbCfg := config.DbConfig{ + Uri: dbUri, + Name: "sources", + } + dbCfg.Table.Name = collName + dbCfg.Tls.Enabled = true + dbCfg.Tls.Insecure = true + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + require.Nil(t, err) + assert.NotNil(t, s) + // + sm := s.(storageMongo) + defer clear(ctx, t, s.(storageMongo)) + // + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url0", + Auth: "token1", + GroupId: "group2", + UserId: "user3", + ReplicaIndex: 4, + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + }) + require.Nil(t, err) + // + cases := map[string]struct { + url string + out model.Stream + err error + }{ + "ok": { + url: "url0", + out: model.Stream{ + Auth: "token1", + GroupId: "group2", + UserId: "user3", + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + Replica: 4, + }, + }, + "missing": { + url: "url1", + err: storage.ErrNotFound, + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + var str model.Stream + str, err = s.Read(ctx, c.url) + assert.Equal(t, c.out, str) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestStorageMongo_Delete(t *testing.T) { + // + collName := fmt.Sprintf("websocket-test-%d", time.Now().UnixMicro()) + dbCfg := config.DbConfig{ + Uri: dbUri, + Name: "sources", + } + dbCfg.Table.Name = collName + dbCfg.Tls.Enabled = true + dbCfg.Tls.Insecure = true + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + require.Nil(t, err) + assert.NotNil(t, s) + // + sm := s.(storageMongo) + defer clear(ctx, t, s.(storageMongo)) + // + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url0", + Auth: "token1", + GroupId: "group2", + UserId: "user3", + ReplicaIndex: 4, + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + }) + require.Nil(t, err) + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url1", + Auth: "token2", + GroupId: "group3", + UserId: "user4", + ReplicaIndex: 5, + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + }) + require.Nil(t, err) + // + cases := map[string]struct { + url string + groupId string + userId string + err error + }{ + "ok": { + url: "url0", + groupId: "group2", + userId: "user3", + }, + "invalid url": { + url: "url2", + groupId: "group3", + userId: "user4", + err: storage.ErrNotFound, + }, + "missing user id": { + url: "url1", + groupId: "group3", + err: storage.ErrNotFound, + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + err = s.Delete(ctx, c.url, c.groupId, c.userId) + assert.ErrorIs(t, err, c.err) + }) + } +} + +func TestStorageMongo_List(t *testing.T) { + // + collName := fmt.Sprintf("websocket-test-%d", time.Now().UnixMicro()) + dbCfg := config.DbConfig{ + Uri: dbUri, + Name: "sources", + } + dbCfg.Table.Name = collName + dbCfg.Tls.Enabled = true + dbCfg.Tls.Insecure = true + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour) + defer cancel() + s, err := NewStorage(ctx, dbCfg) + require.Nil(t, err) + assert.NotNil(t, s) + // + sm := s.(storageMongo) + defer clear(ctx, t, s.(storageMongo)) + // + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url0", + Auth: "token1", + GroupId: "group2", + UserId: "user3", + ReplicaIndex: 4, + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + }) + require.Nil(t, err) + _, err = sm.coll.InsertOne(ctx, record{ + Url: "url1", + Auth: "token2", + GroupId: "group3", + UserId: "user4", + ReplicaIndex: 5, + CreatedAt: time.Date(2024, 11, 4, 18, 49, 25, 0, time.UTC), + }) + require.Nil(t, err) + // + cases := map[string]struct { + limit uint32 + filter model.Filter + order model.Order + cursor string + urls []string + err error + }{ + "asc": { + urls: []string{ + "url0", + "url1", + }, + }, + "desc w/ limit": { + cursor: "zzzz", + limit: 1, + order: model.OrderDesc, + urls: []string{ + "url1", + }, + }, + "filter": { + filter: model.Filter{ + GroupId: "group2", + UserId: "user3", + }, + urls: []string{ + "url0", + }, + }, + "cursor": { + cursor: "url0", + urls: []string{ + "url1", + }, + }, + } + // + for k, c := range cases { + t.Run(k, func(t *testing.T) { + urls, err := s.List(ctx, c.limit, c.filter, c.order, c.cursor) + assert.ErrorIs(t, err, c.err) + assert.Equal(t, c.urls, urls) + }) + } +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..5141d0d --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,20 @@ +package storage + +import ( + "context" + "errors" + "github.com/awakari/source-websocket/model" + "io" +) + +type Storage interface { + io.Closer + Create(ctx context.Context, url string, str model.Stream) (err error) + Read(ctx context.Context, url string) (str model.Stream, err error) + Delete(ctx context.Context, url, groupId, userId string) (err error) + List(ctx context.Context, limit uint32, filter model.Filter, order model.Order, cursor string) (urls []string, err error) +} + +var ErrNotFound = errors.New("not found") +var ErrConflict = errors.New("conflict") +var ErrUnexpected = errors.New("unexpected error")