Merge pull request #198 from line/egon/merge_linemint
merge linemint for initial ebony
Woosang Son authored Mar 9, 2021
2 parents 277ad4a + 0bd87be commit ef06e93
Showing 9 changed files with 161 additions and 13 deletions.
38 changes: 37 additions & 1 deletion config/config.go
@@ -332,6 +332,30 @@ type RPCConfig struct {
// 1024 - 40 - 10 - 50 = 924 = ~900
MaxOpenConnections int `mapstructure:"max_open_connections"`

// mirrors http.Server#ReadTimeout
// ReadTimeout is the maximum duration for reading the entire
// request, including the body.
//
// Because ReadTimeout does not let Handlers make per-request
// decisions on each request body's acceptable deadline or
// upload rate, most users will prefer to use
// ReadHeaderTimeout. It is valid to use them both.
ReadTimeout time.Duration `mapstructure:"read_timeout"`

// mirrors http.Server#WriteTimeout
// WriteTimeout is the maximum duration before timing out
// writes of the response. It is reset whenever a new
// request's header is read. Like ReadTimeout, it does not
// let Handlers make decisions on a per-request basis.
WriteTimeout time.Duration `mapstructure:"write_timeout"`

// mirrors http.Server#IdleTimeout
// IdleTimeout is the maximum amount of time to wait for the
// next request when keep-alives are enabled. If IdleTimeout
// is zero, the value of ReadTimeout is used. If both are
// zero, there is no timeout.
IdleTimeout time.Duration `mapstructure:"idle_timeout"`

// Maximum number of unique clientIDs that can /subscribe
// If you're using /broadcast_tx_commit, set to the estimated maximum number
// of broadcast_tx_commit calls per block.
@@ -343,7 +367,7 @@ type RPCConfig struct {
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`

// How long to wait for a tx to be committed during /broadcast_tx_commit
- // WARNING: Using a value larger than 10s will result in increasing the
+ // WARNING: Using a value larger than 'WriteTimeout' will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
// See https://github.com/tendermint/tendermint/issues/3435
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
@@ -388,6 +412,9 @@ func DefaultRPCConfig() *RPCConfig {

Unsafe: false,
MaxOpenConnections: 900,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,

MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
@@ -419,6 +446,15 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxOpenConnections < 0 {
return errors.New("max_open_connections can't be negative")
}
if cfg.ReadTimeout < 0 {
return errors.New("read_timeout can't be negative")
}
if cfg.WriteTimeout < 0 {
return errors.New("write_timeout can't be negative")
}
if cfg.IdleTimeout < 0 {
return errors.New("idle_timeout can't be negative")
}
if cfg.MaxSubscriptionClients < 0 {
return errors.New("max_subscription_clients can't be negative")
}
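For reference, a minimal sketch of how the new RPC timeouts are meant to be used from Go, built only on the DefaultRPCConfig and ValidateBasic functions touched above; the override values and the import path (taken from the imports elsewhere in this diff) are illustrative, not recommendations:

```go
package main

import (
	"fmt"
	"time"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	// Defaults introduced by this change: read/write 10s, idle 60s.
	rpcCfg := cfg.DefaultRPCConfig()

	// Operators with slow clients can raise the limits; keep WriteTimeout
	// above TimeoutBroadcastTxCommit, per the warning in the struct docs.
	rpcCfg.ReadTimeout = 15 * time.Second
	rpcCfg.WriteTimeout = 15 * time.Second
	rpcCfg.IdleTimeout = 2 * time.Minute

	// Negative durations are now rejected by ValidateBasic.
	if err := rpcCfg.ValidateBasic(); err != nil {
		panic(err)
	}
	fmt.Println(rpcCfg.ReadTimeout, rpcCfg.WriteTimeout, rpcCfg.IdleTimeout)
}
```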
25 changes: 24 additions & 1 deletion config/toml.go
@@ -196,6 +196,29 @@ unsafe = {{ .RPC.Unsafe }}
# 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = {{ .RPC.MaxOpenConnections }}
# mirrors http.Server#ReadTimeout
# ReadTimeout is the maximum duration for reading the entire
# request, including the body.
# Because ReadTimeout does not let Handlers make per-request
# decisions on each request body's acceptable deadline or
# upload rate, most users will prefer to use
# ReadHeaderTimeout. It is valid to use them both.
read_timeout = "{{ .RPC.ReadTimeout }}"
# mirrors http.Server#WriteTimeout
# WriteTimeout is the maximum duration before timing out
# writes of the response. It is reset whenever a new
# request's header is read. Like ReadTimeout, it does not
# let Handlers make decisions on a per-request basis.
write_timeout = "{{ .RPC.WriteTimeout }}"
# mirrors http.Server#IdleTimeout
# IdleTimeout is the maximum amount of time to wait for the
# next request when keep-alives are enabled. If IdleTimeout
# is zero, the value of ReadTimeout is used. If both are
# zero, there is no timeout.
idle_timeout = "{{ .RPC.IdleTimeout }}"
# Maximum number of unique clientIDs that can /subscribe
# If you're using /broadcast_tx_commit, set to the estimated maximum number
# of broadcast_tx_commit calls per block.
@@ -207,7 +230,7 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
- # WARNING: Using a value larger than 10s will result in increasing the
+ # WARNING: Using a value larger than 'WriteTimeout' will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.
# See https://github.com/tendermint/tendermint/issues/3435
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
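A small sketch of what the template addition produces, assuming the WriteConfigFile helper that lives alongside this template in config/toml.go; the output path is illustrative:

```go
package main

import (
	"path/filepath"

	cfg "github.com/tendermint/tendermint/config"
)

func main() {
	c := cfg.DefaultConfig()

	// With the defaults above, the rendered [rpc] section of config.toml
	// now also contains (durations use Go's Duration string form):
	//
	//   read_timeout = "10s"
	//   write_timeout = "10s"
	//   idle_timeout = "1m0s"
	cfg.WriteConfigFile(filepath.Join("/tmp", "config.toml"), c)
}
```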
8 changes: 7 additions & 1 deletion mempool/clist_mempool.go
@@ -7,6 +7,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
@@ -305,7 +306,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
return
}

- mem.metrics.RecheckTimes.Add(1)
+ mem.metrics.RecheckCount.Add(1)
mem.resCbRecheck(req, res)

// update metrics
@@ -610,6 +611,7 @@ func (mem *CListMempool) Update(

// Either recheck non-committed txs to see if they became invalid
// or just notify there're some txs left.
recheckStartTime := time.Now().UnixNano()
if mem.Size() > 0 {
if mem.config.Recheck {
mem.logger.Debug("recheck txs", "numtxs", mem.Size(), "height", height)
@@ -621,6 +623,10 @@
mem.notifyTxsAvailable()
}
}
recheckEndTime := time.Now().UnixNano()

recheckTimeMs := float64(recheckEndTime-recheckStartTime) / 1000000
mem.metrics.RecheckTime.Set(recheckTimeMs)

// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))
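The new recheck instrumentation takes two UnixNano timestamps around the recheck/notify branch and divides the delta by 1,000,000 to get milliseconds for the RecheckTime gauge. A self-contained sketch of that conversion; measureMs is a hypothetical helper, not part of the diff:

```go
package main

import (
	"fmt"
	"time"
)

// measureMs times a step and returns its wall-clock duration in milliseconds
// as a float64, ready for a Prometheus gauge; this is the same arithmetic as
// the UnixNano delta divided by 1e6 used in Update.
func measureMs(step func()) float64 {
	start := time.Now()
	step()
	return float64(time.Since(start)) / float64(time.Millisecond)
}

func main() {
	ms := measureMs(func() { time.Sleep(3 * time.Millisecond) })
	fmt.Printf("recheck took %.2f ms\n", ms)
}
```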
17 changes: 13 additions & 4 deletions mempool/metrics.go
@@ -23,7 +23,9 @@ type Metrics struct {
// Number of failed transactions.
FailedTxs metrics.Counter
// Number of times transactions are rechecked in the mempool.
- RecheckTimes metrics.Counter
+ RecheckCount metrics.Counter
// Time spent rechecking transactions in the mempool, in ms.
RecheckTime metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
@@ -54,12 +56,18 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "failed_txs",
Help: "Number of failed transactions.",
}, labels).With(labelsAndValues...),
- RecheckTimes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
+ RecheckCount: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "recheck_times",
Name: "recheck_count",
Help: "Number of times transactions are rechecked in the mempool.",
}, labels).With(labelsAndValues...),
RecheckTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "recheck_time",
Help: "Time of recheck transactions in the mempool in ms.",
}, labels).With(labelsAndValues...),
}
}

@@ -69,6 +77,7 @@ func NopMetrics() *Metrics {
Size: discard.NewGauge(),
TxSizeBytes: discard.NewHistogram(),
FailedTxs: discard.NewCounter(),
- RecheckTimes: discard.NewCounter(),
+ RecheckCount: discard.NewCounter(),
RecheckTime: discard.NewGauge(),
}
}
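For context, a sketch of how the renamed counter and the new gauge are exercised. The "tendermint" namespace and the chain_id label mirror how the node normally wires metrics up, and the resulting series names (tendermint_mempool_recheck_count, tendermint_mempool_recheck_time) assume the mempool subsystem constant is "mempool":

```go
package main

import (
	"github.com/tendermint/tendermint/mempool"
)

func main() {
	// Registers the collectors with the default Prometheus registry.
	m := mempool.PrometheusMetrics("tendermint", "chain_id", "my-chain")

	m.RecheckCount.Add(1)   // was RecheckTimes before this change
	m.RecheckTime.Set(12.5) // duration of the latest recheck, in ms
}
```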
3 changes: 3 additions & 0 deletions node/node.go
@@ -1011,6 +1011,9 @@ func (n *Node) startRPC() ([]net.Listener, error) {
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
config.ReadTimeout = n.config.RPC.ReadTimeout
config.WriteTimeout = n.config.RPC.WriteTimeout
config.IdleTimeout = n.config.RPC.IdleTimeout
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
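The three assignments above simply copy the operator-facing RPC timeouts into the JSON-RPC server config; the comment that follows refers to a guard that keeps the server's WriteTimeout above timeout_broadcast_tx_commit. A hypothetical, self-contained version of that guard (adjustWriteTimeout is not a function in this codebase):

```go
package main

import (
	"fmt"
	"time"
)

// adjustWriteTimeout illustrates the invariant from tendermint issue #3435:
// if the HTTP write timeout is not strictly greater than
// timeout_broadcast_tx_commit, /broadcast_tx_commit responses can be cut off
// before the tx is committed, so the write timeout is bumped.
func adjustWriteTimeout(writeTimeout, timeoutBroadcastTxCommit time.Duration) time.Duration {
	if writeTimeout <= timeoutBroadcastTxCommit {
		return timeoutBroadcastTxCommit + time.Second
	}
	return writeTimeout
}

func main() {
	fmt.Println(adjustWriteTimeout(10*time.Second, 10*time.Second)) // 11s
	fmt.Println(adjustWriteTimeout(30*time.Second, 10*time.Second)) // 30s
}
```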
13 changes: 11 additions & 2 deletions rpc/jsonrpc/client/http_json_client.go
@@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

tmsync "github.com/tendermint/tendermint/libs/sync"
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
@@ -21,6 +22,10 @@ const (
protoWSS = "wss"
protoWS = "ws"
protoTCP = "tcp"

defaultMaxIdleConns = 10000
defaultIdleConnTimeout = 60 // sec
defaultExpectContinueTimeout = 1 // sec
)

//-------------------------------------------------------------
@@ -369,8 +374,12 @@ func DefaultHTTPClient(remoteAddr string) (*http.Client, error) {
client := &http.Client{
Transport: &http.Transport{
// Set to true to prevent GZIP-bomb DoS attacks
- DisableCompression: true,
- Dial: dialFn,
+ DisableCompression:    true,
+ Dial:                  dialFn,
+ MaxIdleConns:          defaultMaxIdleConns,
+ MaxIdleConnsPerHost:   defaultMaxIdleConns,
+ IdleConnTimeout:       defaultIdleConnTimeout * time.Second,
+ ExpectContinueTimeout: defaultExpectContinueTimeout * time.Second,
},
}

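The new transport settings matter because net/http's default MaxIdleConnsPerHost is 2, so a client hammering a single RPC endpoint keeps re-dialing TCP connections. A standalone sketch of the same tuning on a plain http.Transport; the constants are inlined and newTunedClient is an assumed name, while the real DefaultHTTPClient also installs the custom dial function for tcp/unix remotes:

```go
package main

import (
	"net/http"
	"time"
)

func newTunedClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			// As in DefaultHTTPClient: compression disabled to avoid
			// GZIP-bomb DoS, large idle pool so bursts of JSON-RPC calls
			// reuse connections for up to a minute.
			DisableCompression:    true,
			MaxIdleConns:          10000,
			MaxIdleConnsPerHost:   10000,
			IdleConnTimeout:       60 * time.Second,
			ExpectContinueTimeout: 1 * time.Second,
		},
	}
}

func main() {
	_ = newTunedClient()
}
```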
5 changes: 5 additions & 0 deletions rpc/jsonrpc/server/http_server.go
@@ -27,6 +27,8 @@ type Config struct {
ReadTimeout time.Duration
// mirrors http.Server#WriteTimeout
WriteTimeout time.Duration
// mirrors http.Server#IdleTimeout
IdleTimeout time.Duration
// MaxBodyBytes controls the maximum number of bytes the
// server will read parsing the request body.
MaxBodyBytes int64
@@ -40,6 +42,7 @@ func DefaultConfig() *Config {
MaxOpenConnections: 0, // unlimited
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
}
@@ -56,6 +59,7 @@ func Serve(listener net.Listener, handler http.Handler, logger log.Logger, confi
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
MaxHeaderBytes: config.MaxHeaderBytes,
}
err := s.Serve(listener)
@@ -81,6 +85,7 @@ func ServeTLS(
Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: config.MaxBodyBytes}, logger),
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
MaxHeaderBytes: config.MaxHeaderBytes,
}
err := s.ServeTLS(listener, certFile, keyFile)
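Net effect on the RPC listener: idle keep-alive connections are now closed after IdleTimeout (60s by default) instead of falling back to ReadTimeout. A minimal standalone equivalent of the server the Serve/ServeTLS helpers construct; the handler and address are placeholders:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{
		Addr: "127.0.0.1:26657", // placeholder; the real code serves on the provided listener
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "ok")
		}),
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		IdleTimeout:    60 * time.Second, // new in this change
		MaxHeaderBytes: 1 << 20,
	}
	_ = srv.ListenAndServe()
}
```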
27 changes: 24 additions & 3 deletions state/execution.go
@@ -136,12 +136,16 @@ func (blockExec *BlockExecutor) ApplyBlock(
return state, 0, ErrInvalidBlock(err)
}

- startTime := time.Now().UnixNano()
+ execStartTime := time.Now().UnixNano()
abciResponses, err := execBlockOnProxyApp(
blockExec.logger, blockExec.proxyApp, block, blockExec.store, state.InitialHeight,
)
- endTime := time.Now().UnixNano()
- blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
+ execEndTime := time.Now().UnixNano()

+ execTimeMs := float64(execEndTime-execStartTime) / 1000000
+ blockExec.metrics.BlockProcessingTime.Observe(execTimeMs)
+ blockExec.metrics.BlockExecutionTime.Set(execTimeMs)

if err != nil {
return state, 0, ErrProxyAppConn(err)
}
@@ -177,7 +181,13 @@ func (blockExec *BlockExecutor) ApplyBlock(
}

// Lock mempool, commit app state, update mempoool.
commitStartTime := time.Now().UnixNano()
appHash, retainHeight, err := blockExec.Commit(state, block, abciResponses.DeliverTxs)
commitEndTime := time.Now().UnixNano()

commitTimeMs := float64(commitEndTime-commitStartTime) / 1000000
blockExec.metrics.BlockCommitTime.Set(commitTimeMs)

if err != nil {
return state, 0, fmt.Errorf("commit failed for application: %v", err)
}
@@ -225,7 +235,13 @@ func (blockExec *BlockExecutor) Commit(
}

// Commit block, get hash back
appCommitStartTime := time.Now().UnixNano()
res, err := blockExec.proxyApp.CommitSync()
appCommitEndTime := time.Now().UnixNano()

appCommitTimeMs := float64(appCommitEndTime-appCommitStartTime) / 1000000
blockExec.metrics.BlockAppCommitTime.Set(appCommitTimeMs)

if err != nil {
blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return nil, 0, err
@@ -240,13 +256,18 @@
)

// Update mempool.
updateMempoolStartTime := time.Now().UnixNano()
err = blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
)
updateMempoolEndTime := time.Now().UnixNano()

updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000
blockExec.metrics.BlockUpdateMempoolTime.Set(updateMempoolTimeMs)

return res.Data, res.RetainHeight, err
}
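Taken together, the new gauges nest: BlockExecutionTime covers execBlockOnProxyApp, BlockCommitTime covers all of Commit, and inside Commit the BlockAppCommitTime (CommitSync) and BlockUpdateMempoolTime (mempool.Update) spans are measured separately, so block_commit_time should be at least the sum of those two. A schematic, self-contained sketch of that nesting using the same millisecond conversion; the sleeps merely stand in for real work:

```go
package main

import (
	"fmt"
	"time"
)

// ms converts an elapsed duration to float64 milliseconds, as the gauges expect.
func ms(start time.Time) float64 { return float64(time.Since(start)) / float64(time.Millisecond) }

func main() {
	commitStart := time.Now()

	appStart := time.Now()
	time.Sleep(2 * time.Millisecond) // stands in for proxyApp.CommitSync()
	appCommitMs := ms(appStart)

	mempoolStart := time.Now()
	time.Sleep(1 * time.Millisecond) // stands in for mempool.Update(...)
	updateMempoolMs := ms(mempoolStart)

	commitMs := ms(commitStart) // >= appCommitMs + updateMempoolMs
	fmt.Printf("commit=%.2fms app_commit=%.2fms update_mempool=%.2fms\n",
		commitMs, appCommitMs, updateMempoolMs)
}
```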
38 changes: 37 additions & 1 deletion state/metrics.go
@@ -17,6 +17,14 @@ const (
type Metrics struct {
// Time between BeginBlock and EndBlock.
BlockProcessingTime metrics.Histogram
// Time gauge between BeginBlock and EndBlock.
BlockExecutionTime metrics.Gauge
// Time of commit
BlockCommitTime metrics.Gauge
// Time of app commit
BlockAppCommitTime metrics.Gauge
// Time of update mempool
BlockUpdateMempoolTime metrics.Gauge
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
@@ -35,12 +43,40 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Help: "Time between BeginBlock and EndBlock in ms.",
Buckets: stdprometheus.LinearBuckets(1, 10, 10),
}, labels).With(labelsAndValues...),
BlockExecutionTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_execution_time",
Help: "Time between BeginBlock and EndBlock in ms.",
}, labels).With(labelsAndValues...),
BlockCommitTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_commit_time",
Help: "Time of commit in ms.",
}, labels).With(labelsAndValues...),
BlockAppCommitTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_app_commit_time",
Help: "Time of app commit in ms.",
}, labels).With(labelsAndValues...),
BlockUpdateMempoolTime: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "block_update_mempool_time",
Help: "Time of update mempool in ms.",
}, labels).With(labelsAndValues...),
}
}

// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
return &Metrics{
- BlockProcessingTime: discard.NewHistogram(),
+ BlockProcessingTime:    discard.NewHistogram(),
BlockExecutionTime: discard.NewGauge(),
BlockCommitTime: discard.NewGauge(),
BlockAppCommitTime: discard.NewGauge(),
BlockUpdateMempoolTime: discard.NewGauge(),
}
}
