Skip to content

Commit

Permalink
bigquery: move out vanilla BQ destination to separated submodule
Browse files Browse the repository at this point in the history
To make it work, go.mod files are removed. It's enough to have separated packages/directories to restrict dependencies.

Change-Id: Idc07a084294b14daca8e481ba78c99d1d4e1e575
  • Loading branch information
elek committed Mar 6, 2024
1 parent a152f19 commit e871f0a
Showing 25 changed files with 455 additions and 1,351 deletions.
27 changes: 0 additions & 27 deletions Earthfile
Original file line number Diff line number Diff line change
@@ -18,38 +18,11 @@ lint:
RUN staticcheck ./...
RUN check-cross-compile ./...

WORKDIR /go/eventkit/tools
RUN golangci-lint run
RUN staticcheck ./...
RUN check-cross-compile ./...

WORKDIR /go/eventkit/eventkitd
RUN golangci-lint run
RUN staticcheck ./...
RUN check-cross-compile ./...

WORKDIR /go/eventkit/eventkitd-bigquery
RUN golangci-lint run
RUN staticcheck ./...
RUN check-cross-compile ./...

test:
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
go test ./...
WORKDIR /go/eventkit/tools
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
go test ./...
WORKDIR /go/eventkit/eventkitd
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
go test ./...
WORKDIR /go/eventkit/eventkitd-bigquery
RUN --mount=type=cache,target=/root/.cache/go-build \
--mount=type=cache,target=/go/pkg/mod \
go test ./...

check-format:
COPY . .
Original file line number Diff line number Diff line change
@@ -184,7 +184,7 @@ tagloop:
return nil
}

func tableName(event *pb.Event) string {
func TableName(event *pb.Event) string {
var res []string
for _, scope := range event.Scope {
res = append(res, nonSafeTableNameCharacters.ReplaceAllString(scope, "_"))
7 changes: 4 additions & 3 deletions eventkitd-bigquery/bigquery/config.go → bigquery/config.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/zeebo/errs/v2"

"storj.io/eventkit"
"storj.io/eventkit/destination"
)

// CreateDestination creates eventkit destination based on complex configuration.
@@ -74,7 +75,7 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination

ll := lastLayer
lastLayer = func() (eventkit.Destination, error) {
return NewParallel(ll, workers), nil
return destination.NewParallel(ll, workers), nil
}

case "batch":
@@ -106,12 +107,12 @@ func CreateDestination(ctx context.Context, config string) (eventkit.Destination
return nil, errs.Errorf("Unknown parameter for batch destination %s. Please use queueSize/batchSize/flushInterval", key)
}
}
destination, err := lastLayer()
ekDest, err := lastLayer()
if err != nil {
return nil, err
}
lastLayer = func() (eventkit.Destination, error) {
return NewBatchQueue(destination, queueSize, batchSize, flushInterval), nil
return destination.NewBatchQueue(ekDest, queueSize, batchSize, flushInterval), nil
}
}
}
File renamed without changes.
5 changes: 5 additions & 0 deletions bigquery/pkg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package bigquery

import "github.com/spacemonkeygo/monkit/v3"

var mon = monkit.Package()
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package bigquery
package destination

import (
"context"
"sync"
"time"

"github.com/spacemonkeygo/monkit/v3"
"golang.org/x/sync/errgroup"

"storj.io/eventkit"
"storj.io/eventkit/utils"
)

var mon = monkit.Package()

// BatchQueue collects events and send them in batches.
type BatchQueue struct {
batchThreshold int
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package bigquery
package destination

import (
"context"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package bigquery
package destination

import (
"context"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package bigquery
package destination

import (
"context"
15 changes: 8 additions & 7 deletions eventkitd-bigquery/bigquery/sink.go
Original file line number Diff line number Diff line change
@@ -4,17 +4,18 @@ import (
"context"
"time"

"storj.io/eventkit/bigquery"
"storj.io/eventkit/eventkitd/listener"
"storj.io/eventkit/pb"
)

// BigQuerySink provides an abstraction for processing events in a transport agnostic way.
type BigQuerySink struct {
client *BigQueryClient
client *bigquery.BigQueryClient
}

func NewBigQuerySink(ctx context.Context, project string, dataset string) (*BigQuerySink, error) {
c, err := NewBigQueryClient(ctx, project, dataset)
c, err := bigquery.NewBigQueryClient(ctx, project, dataset)
if err != nil {
return nil, err
}
@@ -26,21 +27,21 @@ func NewBigQuerySink(ctx context.Context, project string, dataset string) (*BigQ

// Receive is called when the server receive an event to process.
func (b *BigQuerySink) Receive(ctx context.Context, unparsed *listener.Packet, packet pb.Packet) error {
records := make(map[string][]*Record)
records := make(map[string][]*bigquery.Record)
correctedStart := unparsed.ReceivedAt.Add(time.Duration(-packet.SendOffsetNs) * time.Nanosecond)

for _, event := range packet.Events {
eventTime := correctedStart.Add(time.Duration(event.TimestampOffsetNs) * time.Nanosecond)
correction := correctedStart.Sub(packet.StartTimestamp.AsTime())

k := tableName(event)
k := bigquery.TableName(event)

records[k] = append(records[k], &Record{
Application: Application{
records[k] = append(records[k], &bigquery.Record{
Application: bigquery.Application{
Name: packet.Application,
Version: packet.ApplicationVersion,
},
Source: Source{
Source: bigquery.Source{
Instance: packet.Instance,
Address: unparsed.Source.IP.String(),
},
45 changes: 0 additions & 45 deletions eventkitd-bigquery/go.mod

This file was deleted.

Loading

0 comments on commit e871f0a

Please sign in to comment.