Skip to content

Commit

Permalink
feat: add async logs blocks (#667)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Feb 26, 2025
1 parent 2b5becf commit e953c93
Show file tree
Hide file tree
Showing 19 changed files with 455 additions and 25 deletions.
12 changes: 10 additions & 2 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/ledger/internal/api/common"
systemstore "github.com/formancehq/ledger/internal/storage/system"
"github.com/formancehq/ledger/internal/worker"
"net/http"
"net/http/pprof"
"time"
Expand Down Expand Up @@ -97,7 +98,9 @@ func NewServeCommand() *cobra.Command {
publish.FXModuleFromFlags(cmd, service.IsDebug(cmd)),
auth.FXModuleFromFlags(cmd),
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
storage.NewFXModule(serveConfiguration.autoUpgrade),
storage.NewFXModule(storage.ModuleConfig{
AutoUpgrade: serveConfiguration.autoUpgrade,
}),
systemcontroller.NewFXModule(systemcontroller.ModuleConfiguration{
NumscriptInterpreter: numscriptInterpreter,
NumscriptInterpreterFlags: numscriptInterpreterFlags,
Expand Down Expand Up @@ -152,7 +155,10 @@ func NewServeCommand() *cobra.Command {

workerEnabled, _ := cmd.Flags().GetBool(WorkerEnabledFlag)
if workerEnabled {
options = append(options, newWorkerModule())
options = append(options, worker.NewFXModule(worker.ModuleConfig{
Schedule: serveConfiguration.workerConfiguration.hashLogsBlockCRONSpec,
MaxBlockSize: serveConfiguration.workerConfiguration.hashLogsBlockMaxSize,
}))
}

return service.New(cmd.OutOrStdout(), options...).Run(cmd)
Expand Down Expand Up @@ -190,6 +196,7 @@ type serveConfiguration struct {
numscriptCacheMaxCount uint
autoUpgrade bool
bind string
workerConfiguration workerConfiguration
}

func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
Expand All @@ -198,6 +205,7 @@ func discoverServeConfiguration(cmd *cobra.Command) serveConfiguration {
ret.numscriptCacheMaxCount, _ = cmd.Flags().GetUint(NumscriptCacheMaxCountFlag)
ret.autoUpgrade, _ = cmd.Flags().GetBool(AutoUpgradeFlag)
ret.bind, _ = cmd.Flags().GetString(BindFlag)
ret.workerConfiguration = discoverWorkerConfiguration(cmd)

return ret
}
Expand Down
36 changes: 30 additions & 6 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,32 @@ import (
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/ledger/internal/storage"
"github.com/formancehq/ledger/internal/worker"
"github.com/spf13/cobra"
"go.uber.org/fx"
)

const (
	// WorkerAsyncBlockHasherMaxBlockSizeFlag sets the maximum number of logs
	// aggregated into a single hashed block (default 1000, see addWorkerFlags).
	WorkerAsyncBlockHasherMaxBlockSizeFlag = "worker-async-block-hasher-max-block-size"
	// WorkerAsyncBlockHasherScheduleFlag sets the CRON spec driving the async
	// block hasher (default "0 * * * * *", see addWorkerFlags).
	WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"
)

// workerConfiguration holds the async block hasher settings resolved from
// the worker CLI flags (see discoverWorkerConfiguration).
type workerConfiguration struct {
	// hashLogsBlockMaxSize is the maximum number of logs hashed into one block.
	hashLogsBlockMaxSize int
	// hashLogsBlockCRONSpec is the CRON expression scheduling block creation.
	hashLogsBlockCRONSpec string
}

// discoverWorkerConfiguration reads the async block hasher flags from cmd and
// returns the resulting configuration. Flag lookup errors are deliberately
// ignored: the flags are registered with defaults by addWorkerFlags.
func discoverWorkerConfiguration(cmd *cobra.Command) workerConfiguration {
	schedule, _ := cmd.Flags().GetString(WorkerAsyncBlockHasherScheduleFlag)
	maxBlockSize, _ := cmd.Flags().GetInt(WorkerAsyncBlockHasherMaxBlockSizeFlag)

	return workerConfiguration{
		hashLogsBlockMaxSize:  maxBlockSize,
		hashLogsBlockCRONSpec: schedule,
	}
}

// addWorkerFlags registers the async block hasher flags (max block size and
// CRON schedule) with their defaults on cmd.
func addWorkerFlags(cmd *cobra.Command) {
	flags := cmd.Flags()
	flags.Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
	flags.String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
}

func NewWorkerCommand() *cobra.Command {
Expand All @@ -24,14 +45,21 @@ func NewWorkerCommand() *cobra.Command {
return err
}

workerConfiguration := discoverWorkerConfiguration(cmd)

return service.New(cmd.OutOrStdout(),
fx.NopLogger,
otlp.FXModuleFromFlags(cmd),
otlptraces.FXModuleFromFlags(cmd),
otlpmetrics.FXModuleFromFlags(cmd),
bunconnect.Module(*connectionOptions, service.IsDebug(cmd)),
storage.NewFXModule(false),
newWorkerModule(),
storage.NewFXModule(storage.ModuleConfig{
// todo
}),
worker.NewFXModule(worker.ModuleConfig{
MaxBlockSize: workerConfiguration.hashLogsBlockMaxSize,
Schedule: workerConfiguration.hashLogsBlockCRONSpec,
}),
).Run(cmd)
},
}
Expand All @@ -44,7 +72,3 @@ func NewWorkerCommand() *cobra.Command {

return cmd
}

// newWorkerModule returns an empty fx option group: no worker components are
// wired here yet (placeholder module).
func newWorkerModule() fx.Option {
	return fx.Options()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/onsi/gomega v1.36.2
github.com/ory/dockertest/v3 v3.11.0
github.com/pborman/uuid v1.2.1
github.com/robfig/cron/v3 v3.0.1
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/riandyrn/otelchi v0.12.1 h1:FdRKK3/RgZ/T+d+qTH5Uw3MFx0KwRF38SkdfTMMq/m8=
github.com/riandyrn/otelchi v0.12.1/go.mod h1:weZZeUJURvtCcbWsdb7Y6F8KFZGedJlSrgUjq9VirV8=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ func newVmStoreAdapter(tx Store) *vmStoreAdapter {
}
}

type ListLedgersQuery bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[struct{}]]
// ListLedgerQueryPayload carries the optional filters applied when listing
// ledgers.
type ListLedgerQueryPayload struct {
	// Features filters ledgers by feature name/value pairs; each entry is
	// matched against the ledger's features (see DefaultStore.ListLedgers).
	Features map[string]string
}

// ListLedgersQuery is the offset-paginated query used to list ledgers.
type ListLedgersQuery bunpaginate.OffsetPaginatedQuery[PaginatedQueryOptions[ListLedgerQueryPayload]]

func NewListLedgersQuery(pageSize uint64) ListLedgersQuery {
return ListLedgersQuery{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: Create async hash log procedure
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
-- Adds asynchronous log-block hashing: logs are grouped into bounded-size
-- blocks, each chained to the previous one through a sha256 hash, forming a
-- verifiable hash chain per ledger.
-- '{{.Schema}}' is substituted by the migration runner (Go text/template).
set search_path = '{{.Schema}}';

-- One row per hashed block of logs.
create table logs_blocks (
    id serial,
    -- id of the block this one chains to. Being the primary key, a given
    -- block can have at most one successor, keeping the chain linear.
    previous bigint primary key,
    ledger varchar,
    -- Range of log ids covered: from_id is the previous block's last log id,
    -- to_id the last log id included in this block (see the insert below).
    from_id bigint,
    to_id bigint,
    -- sha256 of the previous block's hash concatenated with this block's logs.
    hash bytea,
    date timestamp without time zone
);

-- Composite value exchanged between create_blocks and create_block.
create type block as (
    max_log_id bigint, -- last log id covered (0 signals "no block created")
    block_id bigint,   -- id of the created logs_blocks row
    hash bytea         -- hash of the block (null in the sentinel value)
);

-- create_block hashes up to max_block_size logs of _ledger coming after
-- previous_block.max_log_id, persists the resulting block, and returns it.
-- Returns the sentinel (0, 0, null) when no logs remain to hash.
create or replace function create_block(_ledger varchar, max_block_size integer, previous_block block) returns block
language plpgsql
as $$
declare
    hash bytea;
    max_log_id bigint;
    new_block_id bigint;
begin
    -- In a single pass, pick the next batch of logs (ordered by id) and hash
    -- the previous block's hash followed by, for every log: type, memento,
    -- date (JSON text form), idempotency key (empty when null) and id.
    select max(id), public.digest(coalesce(previous_block.hash, '') || string_agg(
        type ||
        encode(memento, 'escape') ||
        (to_json(date::timestamp)#>>'{}') ||
        coalesce(idempotency_key, '') ||
        id,
        ''
    ), 'sha256'::text)
    from (
        select *
        from logs
        where id > previous_block.max_log_id and ledger = _ledger
        order by id
        limit max_block_size
    ) logs
    into max_log_id, hash
    ;
    -- No log past the previous block: report completion with the sentinel.
    if max_log_id is null then
        return (0, 0, null)::block;
    end if;

    insert into logs_blocks (ledger, previous, from_id, to_id, hash, date)
    values (_ledger, previous_block.block_id, previous_block.max_log_id, max_log_id, hash, now())
    returning id
    into new_block_id;

    return (max_log_id, new_block_id, hash)::block;
end;
$$ set search_path from current;

-- create_blocks hashes all pending logs of _ledger into successive blocks of
-- at most max_block_size logs, resuming from the ledger's current chain tip.
create or replace procedure create_blocks(_ledger varchar, max_block_size integer)
language plpgsql
as $$
declare
    previous_block block;
begin
    -- Load the chain tip for this ledger.
    -- NOTE(review): ordering by "previous" assumes block ids grow
    -- monotonically along the chain — confirm against the insert path.
    select to_id, id, hash
    from logs_blocks
    where ledger = _ledger
    order by previous desc
    limit 1
    into previous_block;

    -- First run for this ledger: start the chain from the sentinel value.
    if not found then
        previous_block = (0, 0, null)::block;
    end if;

    -- Keep creating blocks until create_block reports there is nothing left
    -- to hash (max_log_id = 0).
    loop
        select *
        from create_block(_ledger, max_block_size, coalesce(previous_block, (0,0, null)::block)) v
        into previous_block;
        if previous_block.max_log_id = 0 then
            return;
        end if;
    end loop;
end;
$$ set search_path from current;
1 change: 1 addition & 0 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (d *Driver) Initialize(ctx context.Context) error {
func (d *Driver) detectRollbacks(ctx context.Context) error {

logging.FromContext(ctx).Debugf("Checking for downgrades on system schema")

if err := detectDowngrades(d.systemStore.GetMigrator(), ctx); err != nil {
return fmt.Errorf("detecting rollbacks of system schema: %w", err)
}
Expand Down
5 changes: 4 additions & 1 deletion internal/storage/ledger/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
func TestLogsInsert(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

t.Run("check hash against core", func(t *testing.T) {
// Insert a first tx (we don't have any previous hash to use at this moment)
store := newLedgerStore(t)
log1 := ledger.NewLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
Expand Down Expand Up @@ -70,6 +70,7 @@ func TestLogsInsert(t *testing.T) {

t.Run("duplicate IK", func(t *testing.T) {
// Insert a first tx (we don't have any previous hash to use at this moment)
store := newLedgerStore(t)
logTx := ledger.NewLog(ledger.CreatedTransaction{
Transaction: ledger.NewTransaction(),
AccountMetadata: ledger.AccountMetadata{},
Expand All @@ -95,6 +96,7 @@ func TestLogsInsert(t *testing.T) {

t.Run("hash consistency over high concurrency", func(t *testing.T) {
errGroup, _ := errgroup.WithContext(ctx)
store := newLedgerStore(t)
const countLogs = 50
for range countLogs {
errGroup.Go(func() error {
Expand Down Expand Up @@ -151,6 +153,7 @@ func TestLogsInsert(t *testing.T) {
{name: "with utf-8 characters", metadata: map[string]string{"rate": "½"}},
}

store := newLedgerStore(t)
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
Expand Down
19 changes: 14 additions & 5 deletions internal/storage/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ var (
func TestMain(m *testing.M) {
WithTestMain(func(t *TestingTForMain) int {
srv.LoadAsync(func() *pgtesting.PostgresServer {
ret := pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto"))
ret := pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()),
pgtesting.WithExtension("pgcrypto"),
)

defaultBunDB.LoadAsync(func() *bun.DB {
db, err := sql.Open("pgx", ret.GetDSN())
Expand Down Expand Up @@ -71,7 +73,7 @@ type T interface {
Cleanup(func())
}

func newLedgerStore(t T) *ledgerstore.Store {
func newLedgerStore(t T, opts ...func(cfg *ledger.Configuration)) *ledgerstore.Store {
t.Helper()

<-defaultBunDB.Done()
Expand All @@ -80,11 +82,18 @@ func newLedgerStore(t T) *ledgerstore.Store {
ledgerName := uuid.NewString()[:8]
ctx := logging.TestingContext()

l := ledger.MustNewWithDefault(ledgerName)
err := bucket.GetMigrator(defaultBunDB.GetValue(), "_default").Up(ctx)
ledgerConfiguration := ledger.NewDefaultConfiguration()
for _, opt := range opts {
opt(&ledgerConfiguration)
}

l, err := ledger.New(ledgerName, ledgerConfiguration)
require.NoError(t, err)

err = bucket.GetMigrator(defaultBunDB.GetValue(), "_default").Up(ctx)
require.NoError(t, err)

store, err := defaultDriver.GetValue().CreateLedger(ctx, &l)
store, err := defaultDriver.GetValue().CreateLedger(ctx, l)
require.NoError(t, err)

return store
Expand Down
8 changes: 6 additions & 2 deletions internal/storage/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import (

const HealthCheckName = `storage-driver-up-to-date`

func NewFXModule(autoUpgrade bool) fx.Option {
// ModuleConfig configures the storage fx module built by NewFXModule.
type ModuleConfig struct {
	// AutoUpgrade, when true, makes NewFXModule register a lifecycle hook
	// that upgrades the storage at application start.
	AutoUpgrade bool
}

func NewFXModule(config ModuleConfig) fx.Option {
ret := []fx.Option{
driver.NewFXModule(),
health.ProvideHealthCheck(func(driver *driver.Driver) health.NamedCheck {
Expand All @@ -32,7 +36,7 @@ func NewFXModule(autoUpgrade bool) fx.Option {
}))
}),
}
if autoUpgrade {
if config.AutoUpgrade {
ret = append(ret,
fx.Invoke(func(lc fx.Lifecycle, driver *driver.Driver) {
var (
Expand Down
10 changes: 8 additions & 2 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,16 @@ func (d *DefaultStore) ListLedgers(ctx context.Context, q ledgercontroller.ListL
Column("*").
Order("added_at asc")

return bunpaginate.UsingOffset[ledgercontroller.PaginatedQueryOptions[struct{}], ledger.Ledger](
if len(q.Options.Options.Features) > 0 {
for key, value := range q.Options.Options.Features {
query = query.Where("features->>? = ?", key, value)
}
}

return bunpaginate.UsingOffset[ledgercontroller.PaginatedQueryOptions[ledgercontroller.ListLedgerQueryPayload], ledger.Ledger](
ctx,
query,
bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[struct{}]](q),
bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[ledgercontroller.ListLedgerQueryPayload]](q),
)
}

Expand Down
Loading

0 comments on commit e953c93

Please sign in to comment.