Skip to content

Commit

Permalink
Add malfeasance publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Aug 29, 2024
1 parent 64bb6ed commit 6f31673
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 49 deletions.
7 changes: 1 addition & 6 deletions activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ type nipostValidatorV1 interface {
opts ...validatorOption,
) error

VRFNonce(
nodeId types.NodeID,
commitmentAtxId types.ATXID,
vrfNonce, labelsPerUnit uint64,
numUnits uint32,
) error
VRFNonce(nodeId types.NodeID, commitmentAtxId types.ATXID, vrfNonce, labelsPerUnit uint64, numUnits uint32) error
PositioningAtx(id types.ATXID, atxs atxProvider, goldenATXID types.ATXID, pubepoch types.EpochID) error
}

Expand Down
12 changes: 11 additions & 1 deletion activation/malfeasance2_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@ import (
)

// ATXMalfeasancePublisher is the publisher for ATX proofs.
type ATXMalfeasancePublisher struct{}
type ATXMalfeasancePublisher struct {
malPublisher malfeasancePublisher
}

func NewATXMalfeasancePublisher(
malPublisher malfeasancePublisher,
) *ATXMalfeasancePublisher {
return &ATXMalfeasancePublisher{
malPublisher: malPublisher,
}
}

func (p *ATXMalfeasancePublisher) Publish(ctx context.Context, id types.NodeID, proof wire.Proof) error {
// TODO(mafa): implement me
Expand Down
2 changes: 1 addition & 1 deletion hare3/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (h *Hare) Running() int {
return len(h.sessions)
}

func (h *Hare) Handler(ctx context.Context, peer p2p.Peer, buf []byte) error {
func (h *Hare) Handler(ctx context.Context, _ p2p.Peer, buf []byte) error {
msg := &Message{}
if err := codec.Decode(buf, msg); err != nil {
malformedError.Inc()
Expand Down
12 changes: 0 additions & 12 deletions malfeasance/wire/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ type Proof struct {

type ProofData interface {
scale.Type

isProof()
}

func (e *Proof) EncodeScale(enc *scale.Encoder) (int, error) {
Expand Down Expand Up @@ -199,8 +197,6 @@ type AtxProof struct {
Messages [2]AtxProofMsg
}

func (ap *AtxProof) isProof() {}

func (ap *AtxProof) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddObject("first", &ap.Messages[0].InnerMsg)
encoder.AddObject("second", &ap.Messages[1].InnerMsg)
Expand All @@ -211,8 +207,6 @@ type BallotProof struct {
Messages [2]BallotProofMsg
}

func (bp *BallotProof) isProof() {}

func (bp *BallotProof) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddObject("first", &bp.Messages[0].InnerMsg)
encoder.AddObject("second", &bp.Messages[1].InnerMsg)
Expand All @@ -223,8 +217,6 @@ type HareProof struct {
Messages [2]HareProofMsg
}

func (hp *HareProof) isProof() {}

func (hp *HareProof) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddObject("first", &hp.Messages[0].InnerMsg)
encoder.AddObject("second", &hp.Messages[1].InnerMsg)
Expand Down Expand Up @@ -260,8 +252,6 @@ type InvalidPostIndexProof struct {
InvalidIdx uint32
}

func (p *InvalidPostIndexProof) isProof() {}

type BallotProofMsg struct {
InnerMsg types.BallotMetadata

Expand Down Expand Up @@ -322,8 +312,6 @@ type InvalidPrevATXProof struct {
Atx2 wire.ActivationTxV1
}

func (p *InvalidPrevATXProof) isProof() {}

func MalfeasanceInfo(smesher types.NodeID, mp *MalfeasanceProof) string {
var b strings.Builder
b.WriteString(fmt.Sprintf("generate layer: %v\n", mp.Layer))
Expand Down
83 changes: 83 additions & 0 deletions malfeasance2/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package malfeasance2

import (
"context"
"fmt"
"time"

"go.uber.org/zap"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/sql/identities"
"github.com/spacemeshos/go-spacemesh/sql/malfeasance"
)

type Publisher struct {
logger *zap.Logger
cdb *datastore.CachedDB
tortoise tortoise
publisher pubsub.Publisher
}

func NewPublisher(
logger *zap.Logger,
cdb *datastore.CachedDB,
tortoise tortoise,
publisher pubsub.Publisher,
) *Publisher {
return &Publisher{
logger: logger,
cdb: cdb,
tortoise: tortoise,
publisher: publisher,
}
}

func (p *Publisher) PublishV2ATXProof(
ctx context.Context,
smesherIDs []types.NodeID,
domain ProofDomain,
proof []byte,
) error {
// Combine IDs from the present equivocation set for atx.SmesherID and IDs in atx.Marriages.
allMalicious := make(map[types.NodeID]struct{})

for _, id := range smesherIDs {
set, err := identities.EquivocationSet(p.cdb, id)
if err != nil {
return fmt.Errorf("getting equivocation set: %w", err)
}
for _, id := range set {
allMalicious[id] = struct{}{}
}
for _, id := range smesherIDs {
allMalicious[id] = struct{}{}
}
}

for id := range allMalicious {
if err := malfeasance.Add(p.cdb, id, byte(domain), proof, time.Now()); err != nil {
return fmt.Errorf("setting malfeasance proof: %w", err)
}
// TODO(mafa): cache proof
// p.cdb.CacheMalfeasanceProof(id, proof)
p.tortoise.OnMalfeasance(id)
}

// TODO(mafa): check if we are in sync before publishing, if not just return

malfeasanceProof := &MalfeasanceProof{
Version: 0,
Domain: domain,
Proof: proof,
}
if err := p.publisher.Publish(ctx, pubsub.MalfeasanceProof2, codec.MustEncode(malfeasanceProof)); err != nil {
p.logger.Error("failed to broadcast malfeasance proof", zap.Error(err))
return fmt.Errorf("broadcast atx malfeasance proof: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions malfeasance2/publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package malfeasance2

// TODO(mafa): implement me
75 changes: 46 additions & 29 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,41 @@ func (app *App) initServices(ctx context.Context) error {
return nil
})

fetcherWrapped := &layerFetcher{}
proposalsStore := store.New(
store.WithEvictedLayer(app.clock.CurrentLayer()),
store.WithLogger(app.addLogger(ProposalStoreLogger, lg).Zap()),
store.WithCapacity(app.Config.Tortoise.Zdist+1),
)

flog := app.addLogger(Fetcher, lg)
fetcher := fetch.NewFetch(app.cachedDB, proposalsStore, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(flog.Zap()),
)
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

malfeasanceLogger := app.addLogger(MalfeasanceLogger, lg).Zap()
// malfeasancePublisher := malfeasance.NewPublisher(
// malfeasanceLogger,
// app.cachedDB,
// trtl,
// app.host,
// )

// malfeasancePublisher2 := malfeasance2.NewPublisher(
// malfeasanceLogger,
// app.cachedDB,
// trtl,
// app.host,
// )

// atxMalPublisher := activation.NewATXMalfeasancePublisher(
// malfeasancePublisher,
// malfeasancePublisher2,
// )

atxHandler := activation.NewHandler(
app.host.ID(),
Expand All @@ -727,10 +761,11 @@ func (app *App) initServices(ctx context.Context) error {
app.edVerifier,
app.clock,
app.host,
fetcherWrapped,
fetcher,
goldenATXID,
validator,
beaconProtocol,
// atxMalPublisher,
trtl,
app.addLogger(ATXHandlerLogger, lg).Zap(),
activation.WithTickSize(app.Config.TickSize),
Expand All @@ -741,7 +776,6 @@ func (app *App) initServices(ctx context.Context) error {
}

// we can't have an epoch offset which is greater/equal than the number of layers in an epoch

if app.Config.HareEligibility.ConfidenceParam >= app.Config.BaseConfig.LayersPerEpoch {
return fmt.Errorf(
"confidence param should be smaller than layers per epoch. eligibility-confidence-param: %d. "+
Expand All @@ -751,8 +785,13 @@ func (app *App) initServices(ctx context.Context) error {
)
}

blockHandler := blocks.NewHandler(fetcherWrapped, app.db, trtl, msh,
blocks.WithLogger(app.addLogger(BlockHandlerLogger, lg).Zap()))
blockHandler := blocks.NewHandler(
fetcher,
app.db,
trtl,
msh,
blocks.WithLogger(app.addLogger(BlockHandlerLogger, lg).Zap()),
)

app.txHandler = txs.NewTxHandler(
app.conState,
Expand Down Expand Up @@ -802,23 +841,6 @@ func (app *App) initServices(ctx context.Context) error {
app.certifier.Register(sig)
}

proposalsStore := store.New(
store.WithEvictedLayer(app.clock.CurrentLayer()),
store.WithLogger(app.addLogger(ProposalStoreLogger, lg).Zap()),
store.WithCapacity(app.Config.Tortoise.Zdist+1),
)

flog := app.addLogger(Fetcher, lg)
fetcher := fetch.NewFetch(app.cachedDB, proposalsStore, app.host,
fetch.WithContext(ctx),
fetch.WithConfig(app.Config.FETCH),
fetch.WithLogger(flog.Zap()),
)
fetcherWrapped.Fetcher = fetcher
app.eg.Go(func() error {
return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher)
})

patrol := layerpatrol.New()
syncerConf := app.Config.Sync
syncerConf.HareDelayLayers = app.Config.Tortoise.Zdist
Expand Down Expand Up @@ -936,7 +958,7 @@ func (app *App) initServices(ctx context.Context) error {
propHare,
app.edVerifier,
app.host,
fetcherWrapped,
fetcher,
beaconProtocol,
msh,
trtl,
Expand All @@ -959,7 +981,7 @@ func (app *App) initServices(ctx context.Context) error {
proposalsStore,
executor,
msh,
fetcherWrapped,
fetcher,
app.certifier,
patrol,
blocks.WithConfig(blocks.Config{
Expand Down Expand Up @@ -1101,7 +1123,6 @@ func (app *App) initServices(ctx context.Context) error {
return fmt.Errorf("init post service: %w", err)
}

malfeasanceLogger := app.addLogger(MalfeasanceLogger, lg).Zap()
activationMH := activation.NewMalfeasanceHandler(
app.cachedDB,
malfeasanceLogger,
Expand Down Expand Up @@ -2255,10 +2276,6 @@ func (app *App) Host() *p2p.Host {
return app.host
}

type layerFetcher struct {
system.Fetcher
}

func decodeLoggerLevel(cfg *config.Config, name string) (zap.AtomicLevel, error) {
lvl := zap.NewAtomicLevel()
loggers := map[string]string{}
Expand Down

0 comments on commit 6f31673

Please sign in to comment.