Skip to content

Commit

Permalink
Merge pull request #2 from Enlighten-Fund/revert-1-dong-buffer
Browse files Browse the repository at this point in the history
Revert "Implement buffered write"
  • Loading branch information
0xDong committed Dec 29, 2022
2 parents 5e636a6 + 88f80a5 commit c0a58a3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 46 deletions.
3 changes: 2 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
if err != nil {
return nil, nil, 0, fmt.Errorf("create parity logger failed: %w", err)
}
defer tracer.Close()
vm.GlobalParityLogger = tracer
cfg.Debug = true
cfg.Tracer = tracer
Expand All @@ -95,6 +96,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
if err != nil {
return nil, nil, 0, fmt.Errorf("create tx logger failed: %w", err)
}
defer txLogger.Close()

// Mutate the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand Down Expand Up @@ -165,7 +167,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}

vm.ReceiptDumpLogger(block.NumberU64(), 100000, 1000, receiptsToDump)
vm.TryClearBuffer(block.NumberU64())
return receipts, allLogs, *usedGas, nil
}

Expand Down
91 changes: 46 additions & 45 deletions core/vm/dump_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,54 +31,23 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

const cleanBufferPeriod uint64 = 1000

var buffer = make(map[string]*strings.Builder)
var lastBufferClearTime = time.Time{}

func TryClearBuffer(blockNumber uint64) error {
curTime := time.Now()
elapsedTime := curTime.Sub(lastBufferClearTime)
if elapsedTime.Seconds() > 5.0 || (blockNumber+1)%cleanBufferPeriod == 0 {
defer func(start time.Time) {
fmt.Printf("Write to disk. Block_number = %v ,cost time = %v\n", strconv.FormatUint(blockNumber, 10), time.Since(start))
}(time.Now())
for logPath, sb := range buffer {
if err := os.MkdirAll(path.Dir(logPath), 0755); err != nil {
return fmt.Errorf("mkdir for all parents [%v] failed: %w", path.Dir(logPath), err)
}
file, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
if err != nil {
return fmt.Errorf("create file %s failed: %w", logPath, err)
}
if _, err := file.WriteString(sb.String()); err != nil {
return err
}
if err := file.Close(); err != nil {
return err
}
}
buffer = make(map[string]*strings.Builder)
lastBufferClearTime = curTime
}
return nil
}

func getFile(taskName string, blockNumber uint64, perFolder, perFile uint64) (*strings.Builder, error) {
func getFile(taskName string, blockNumber uint64, perFolder, perFile uint64) (*os.File, error) {
cwd, err := os.Getwd()
if err != nil {
return nil, fmt.Errorf("get current work dir failed: %w", err)
}

logPath := path.Join(cwd, taskName, strconv.FormatUint(blockNumber/perFolder, 10), strconv.FormatUint(blockNumber/perFile, 10)+".log")
fmt.Printf("log path: %v, block: %v\n", logPath, blockNumber)
if err := os.MkdirAll(path.Dir(logPath), 0755); err != nil {
return nil, fmt.Errorf("mkdir for all parents [%v] failed: %w", path.Dir(logPath), err)
}

sb, ok := buffer[logPath]
if !ok {
sb = &strings.Builder{}
buffer[logPath] = sb
file, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
if err != nil {
return nil, fmt.Errorf("create file %s failed: %w", logPath, err)
}
return sb, nil
return file, nil
}

type ParityTraceItemAction struct {
Expand Down Expand Up @@ -122,6 +91,7 @@ type ParityLogger struct {
sb *strings.Builder
encoder *json.Encoder
activePrecompiles []common.Address
file *os.File
stack []*ParityTraceItem
items []*ParityTraceItem
}
Expand All @@ -132,17 +102,29 @@ var GlobalParityLogger *ParityLogger
// NewParityLogger creates a new EVM tracer that prints execution steps as parity trace format
// into the provided stream.
func NewParityLogger(ctx *ParityLogContext, blockNumber uint64, perFolder, perFile uint64) (*ParityLogger, error) {
sb, err := getFile("traces", blockNumber, perFolder, perFile)
file, err := getFile("traces", blockNumber, perFolder, perFile)
if err != nil {
return nil, err
}
l := &ParityLogger{context: ctx, sb: sb, encoder: json.NewEncoder(sb)}

sb := &strings.Builder{}
l := &ParityLogger{context: ctx, sb: sb, encoder: json.NewEncoder(sb), file: file}
if l.context == nil {
l.context = &ParityLogContext{}
}
return l, nil
}

func (l *ParityLogger) Close() error {
defer func(start time.Time) {
fmt.Printf("Write traces to file. Cost time = %v\n", time.Since(start))
}(time.Now())
if _, err := l.file.WriteString(l.sb.String()); err != nil {
return err
}
return l.file.Close()
}

func (l *ParityLogger) CaptureStart(env *EVM, from, to common.Address, create bool, input []byte, gas uint64, value *big.Int) {
//rules := env.ChainConfig().Rules(env.Context.BlockNumber)
rules := env.ChainConfig().Rules(env.Context.BlockNumber, env.Context.Random != nil)
Expand Down Expand Up @@ -246,10 +228,12 @@ func ReceiptDumpLogger(blockNumber uint64, perFolder, perFile uint64, receipts t
defer func(start time.Time) {
fmt.Printf("Dump receipt, block_number = %v ,cost time = %v\n", strconv.FormatUint(blockNumber, 10), time.Since(start))
}(time.Now())
sb, err := getFile("receipts", blockNumber, perFolder, perFile)
file, err := getFile("receipts", blockNumber, perFolder, perFile)
if err != nil {
return err
}

sb := &strings.Builder{}
encoder := json.NewEncoder(sb)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
Expand All @@ -260,27 +244,33 @@ func ReceiptDumpLogger(blockNumber uint64, perFolder, perFile uint64, receipts t
}
}
}
if _, err := file.WriteString(sb.String()); err != nil {
return err
}
return nil
}

type TxLogger struct {
blockNumber uint64
blockHash common.Hash
sb *strings.Builder
file *os.File
encoder *json.Encoder
signer types.Signer
isLondon bool
baseFee *big.Int
}

func NewTxLogger(signer types.Signer, isLondon bool, baseFee *big.Int, blockHash common.Hash, blockNumber uint64, perFolder, perFile uint64) (*TxLogger, error) {
sb, err := getFile("transactions", blockNumber, perFolder, perFile)
file, err := getFile("transactions", blockNumber, perFolder, perFile)
if err != nil {
return nil, err
}
sb := &strings.Builder{}
return &TxLogger{
blockNumber: blockNumber,
blockHash: blockHash,
file: file,
sb: sb,
encoder: json.NewEncoder(sb),
signer: signer,
Expand Down Expand Up @@ -350,14 +340,25 @@ func (t *TxLogger) DumpStateSyncTxn(index int, txHash common.Hash) error {
return nil
}

func (t *TxLogger) Close() error {
defer func(start time.Time) {
fmt.Printf("Write transactions to file. Cost time = %v\n", time.Since(start))
}(time.Now())
if _, err := t.file.WriteString(t.sb.String()); err != nil {
return err
}
return t.file.Close()
}

func BlockDumpLogger(block *types.Block, perFolder, perFile uint64) error {
defer func(start time.Time) {
fmt.Printf("Dump blocks, block_number = %v ,cost time = %v\n", strconv.FormatUint(block.NumberU64(), 10), time.Since(start))
}(time.Now())
sb, err := getFile("blocks", block.NumberU64(), perFolder, perFile)
file, err := getFile("blocks", block.NumberU64(), perFolder, perFile)
if err != nil {
return err
}
defer file.Close()

entry := map[string]interface{}{
"timestamp": block.Time(),
Expand All @@ -371,7 +372,7 @@ func BlockDumpLogger(block *types.Block, perFolder, perFile uint64) error {
"nonce": block.Nonce(),
"size": block.Size(),
}
encoder := json.NewEncoder(sb)
encoder := json.NewEncoder(file)
if err := encoder.Encode(entry); err != nil {
return fmt.Errorf("failed to encode transaction entry %w", err)
}
Expand Down

0 comments on commit c0a58a3

Please sign in to comment.