Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement a persistent journal for lotus node operations #2101

Merged
merged 5 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -324,6 +325,14 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}

journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
})

// reverse the apply array
for i := len(apply)/2 - 1; i >= 0; i-- {
opp := len(apply) - 1 - i
Expand Down
146 changes: 146 additions & 0 deletions journal/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package journal

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
)

func InitializeSystemJournal(dir string) error {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
j, err := OpenFSJournal(dir)
if err != nil {
return err
}
currentJournal = j
return nil
}

func Add(sys string, val interface{}) {
if currentJournal == nil {
log.Warn("no journal configured")
return
}
currentJournal.AddEntry(sys, val)
}

var log = logging.Logger("journal")

var currentJournal Journal

type Journal interface {
AddEntry(system string, obj interface{})
Close() error
}

// fsJournal is a basic journal backed by files on a filesystem
type fsJournal struct {
fi *os.File
fSize int64

lk sync.Mutex

journalDir string

incoming chan *JournalEntry
journalSizeLimit int64

closing chan struct{}
}

func OpenFSJournal(dir string) (*fsJournal, error) {
fsj := &fsJournal{
journalDir: dir,
incoming: make(chan *JournalEntry, 32),
journalSizeLimit: 1 << 30,
closing: make(chan struct{}),
}

if err := fsj.rollJournalFile(); err != nil {
return nil, err
}

go fsj.runLoop()

return fsj, nil
}

type JournalEntry struct {
System string
Timestamp time.Time
Val interface{}
}

func (fsj *fsJournal) putEntry(je *JournalEntry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := fsj.fi.Write(append(b, '\n'))
if err != nil {
return err
}

fsj.fSize += int64(n)

if fsj.fSize >= fsj.journalSizeLimit {
fsj.rollJournalFile()
}

return nil
}

func (fsj *fsJournal) rollJournalFile() error {
if fsj.fi != nil {
fsj.fi.Close()
}

nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", time.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}

fsj.fi = nfi
fsj.fSize = 0
return nil
}

func (fsj *fsJournal) runLoop() {
for {
select {
case je := <-fsj.incoming:
if err := fsj.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-fsj.closing:
fsj.fi.Close()
return
}
}
}

func (fsj *fsJournal) AddEntry(system string, obj interface{}) {
je := &JournalEntry{
System: system,
Timestamp: time.Now(),
Val: obj,
}
select {
case fsj.incoming <- je:
case <-fsj.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}

func (fsj *fsJournal) Close() error {
close(fsj.closing)
return nil
}
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const (
ExtractApiKey
HeadMetricsKey
RunPeerTaggerKey
JournalKey

SetApiEndpointKey

Expand Down Expand Up @@ -150,6 +151,7 @@ func defaults() []Option {
Override(new(record.Validator), modules.RecordValidator),
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
Override(new(dtypes.ShutdownChan), make(chan struct{})),
Override(JournalKey, modules.SetupJournal),

// Filecoin modules

Expand Down
6 changes: 6 additions & 0 deletions node/modules/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"io"
"io/ioutil"
"path/filepath"

"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/addrutil"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
Expand Down Expand Up @@ -93,3 +95,7 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
func DrandBootstrap() (dtypes.DrandBootstrap, error) {
return build.DrandBootstrap()
}

func SetupJournal(lr repo.LockedRepo) error {
return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal"))
}