Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into feat/integrate-act…
Browse files Browse the repository at this point in the history
…ivitypub-mastodon-worker
  • Loading branch information
FrankLi123 committed Jul 24, 2024
2 parents 6f97bc6 + 73cd9b9 commit e496004
Show file tree
Hide file tree
Showing 25 changed files with 4,359 additions and 226 deletions.
98 changes: 91 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"

"github.com/grafana/pyroscope-go"
"github.com/redis/rueidis"
"github.com/rss3-network/node/config"
"github.com/rss3-network/node/config/flag"
"github.com/rss3-network/node/config/parameter"
"github.com/rss3-network/node/internal/constant"
"github.com/rss3-network/node/internal/database"
"github.com/rss3-network/node/internal/database/dialer"
Expand All @@ -19,6 +22,7 @@ import (
"github.com/rss3-network/node/internal/node/monitor"
"github.com/rss3-network/node/internal/stream"
"github.com/rss3-network/node/internal/stream/provider"
"github.com/rss3-network/node/provider/ethereum/contract/vsl"
"github.com/rss3-network/node/provider/redis"
"github.com/rss3-network/node/provider/telemetry"
"github.com/samber/lo"
Expand Down Expand Up @@ -74,6 +78,11 @@ var command = cobra.Command{

module := lo.Must(flags.GetString(flag.KeyModule))

var networkParamsCaller *vsl.NetworkParamsCaller

var settlementCaller *vsl.SettlementCaller

// Apply database migrations for all modules except the broadcaster.
if module != BroadcasterArg && len(config.Component.Decentralized) > 0 {
databaseClient, err = dialer.Dial(cmd.Context(), config.Database)
if err != nil {
Expand All @@ -89,27 +98,102 @@ var command = cobra.Command{
if err != nil {
return fmt.Errorf("new redis client: %w", err)
}

vslClient, err := parameter.InitVSLClient()
if err != nil {
return fmt.Errorf("init vsl client: %w", err)
}

networkParamsCaller, err = vsl.NewNetworkParamsCaller(vsl.AddressNetworkParams, vslClient)
if err != nil {
return fmt.Errorf("new network params caller: %w", err)
}

settlementCaller, err = vsl.NewSettlementCaller(vsl.AddressSettlement, vslClient)
if err != nil {
return fmt.Errorf("new settlement caller: %w", err)
}

epoch, err := parameter.GetCurrentEpochFromVSL(settlementCaller)
if err != nil {
return fmt.Errorf("get current epoch: %w", err)
}

// save epoch to redis cache
err = parameter.UpdateCurrentEpoch(cmd.Context(), redisClient, epoch)
if err != nil {
return fmt.Errorf("update current epoch: %w", err)
}

// when start or restart the core, worker or monitor module, it will pull network parameters from VSL and record current epoch
err = parameter.PullNetworkParamsFromVSL(networkParamsCaller, uint64(epoch))
if err != nil {
zap.L().Error("pull network parameters from VSL", zap.Error(err))
}

for network, blockStart := range parameter.CurrentNetworkStartBlock {
if blockStart == nil {
continue // Skip if the start block is not defined.
}

// Convert big.Int to int64; safe as long as the value fits in int64.
blockStartInt64 := blockStart.Int64()

// Update the current block start for the network in Redis.
err := parameter.UpdateBlockStart(cmd.Context(), redisClient, network.String(), blockStartInt64)
if err != nil {
return fmt.Errorf("update current block start: %w", err)
}
}
}

switch module {
case CoreServiceArg:
return runCoreService(cmd.Context(), config, databaseClient, redisClient)
return runCoreService(cmd.Context(), config, databaseClient, redisClient, networkParamsCaller, settlementCaller)
case WorkerArg:
return runWorker(cmd.Context(), config, databaseClient, streamClient, redisClient)
case BroadcasterArg:
return runBroadcaster(cmd.Context(), config)
case MonitorArg:
return runMonitor(cmd.Context(), config, databaseClient, redisClient)
return runMonitor(cmd.Context(), config, databaseClient, redisClient, networkParamsCaller, settlementCaller)
}

return fmt.Errorf("unsupported module %s", lo.Must(flags.GetString(flag.KeyModule)))
},
}

func runCoreService(ctx context.Context, config *config.File, databaseClient database.Client, redisClient rueidis.Client) error {
server := node.NewCoreService(ctx, config, databaseClient, redisClient)
func runCoreService(ctx context.Context, config *config.File, databaseClient database.Client, redisClient rueidis.Client, networkParamsCaller *vsl.NetworkParamsCaller, settlementCaller *vsl.SettlementCaller) error {
server := node.NewCoreService(ctx, config, databaseClient, redisClient, networkParamsCaller, settlementCaller)

return server.Run(ctx)
checkCtx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
if err := node.CheckParams(checkCtx, redisClient, networkParamsCaller, settlementCaller); err != nil {
fmt.Printf("Error checking parameters: %v\n", err)
}
}()

apiErrChan := make(chan error, 1)
go func() {
apiErrChan <- server.Run(ctx)
}()

// Set up signal handling
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)

select {
case sig := <-stopChan:
fmt.Printf("Shutdown signal received: %v.\n", sig)
case err := <-apiErrChan:
cancel() // signal all goroutines to stop on error
return err
}

cancel() // ensure cancellation if exiting normally

return nil
}

// findModuleByID find and returns the specified worker ID in all components.
Expand Down Expand Up @@ -171,8 +255,8 @@ func runBroadcaster(ctx context.Context, config *config.File) error {
return server.Run(ctx)
}

func runMonitor(ctx context.Context, config *config.File, databaseClient database.Client, redisClient rueidis.Client) error {
server, err := monitor.NewMonitor(ctx, config, databaseClient, redisClient)
func runMonitor(ctx context.Context, config *config.File, databaseClient database.Client, redisClient rueidis.Client, networkParamsCaller *vsl.NetworkParamsCaller, settlementCaller *vsl.SettlementCaller) error {
server, err := monitor.NewMonitor(ctx, config, databaseClient, redisClient, networkParamsCaller, settlementCaller)
if err != nil {
return fmt.Errorf("new monitor: %w", err)
}
Expand Down
25 changes: 25 additions & 0 deletions config/parameter/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package parameter

import (
"math/big"

"github.com/rss3-network/protocol-go/schema/network"
)

// NumberOfMonthsToCover the number of months that a Node should cover data for
const NumberOfMonthsToCover = 3

type NetworkTolerance map[network.Network]uint64
type NetworkStartBlock map[network.Network]*big.Int
type NetworkCoreWorkerDiskSpacePerMonth map[network.Network]uint

// CurrentNetworkTolerance should be updated each epoch from vsl
var CurrentNetworkTolerance = NetworkTolerance{}

// CurrentNetworkStartBlock should be updated each epoch from vsl
var CurrentNetworkStartBlock = NetworkStartBlock{}

// CurrentNetworkCoreWorkerDiskSpacePerMonth the disk space required for the network's core worker to store a month worth of data
// The data is calculated based on the average disk space usage during 2024 Q1.
// Actually usage may vary depending on the network's activity.
var CurrentNetworkCoreWorkerDiskSpacePerMonth = NetworkCoreWorkerDiskSpacePerMonth{}
Loading

0 comments on commit e496004

Please sign in to comment.