diff --git a/bin/README.hermit.md b/bin/README.hermit.md index 9ad50b43..bcad5f32 100644 --- a/bin/README.hermit.md +++ b/bin/README.hermit.md @@ -2,5 +2,5 @@ This is a [Hermit](https://github.com/cashapp/hermit) bin directory. -The symlinks in this directory are managed by Hermit and will automatically download and install -Hermit itself as well as packages. These packages are local to this environment. +The symlinks in this directory are managed by Hermit and will automatically download and install Hermit itself as well +as packages. These packages are local to this environment. diff --git a/cluster/dragon/dragon.go b/cluster/dragon/dragon.go index ecaf7225..47d87615 100644 --- a/cluster/dragon/dragon.go +++ b/cluster/dragon/dragon.go @@ -4,7 +4,8 @@ import ( "context" "fmt" "github.com/lni/dragonboat/v3/client" - log "github.com/sirupsen/logrus" + "log" + "github.com/squareup/pranadb/conf" "path/filepath" "sync" @@ -73,7 +74,7 @@ type snapshot struct { func (s *snapshot) Close() { if err := s.pebbleSnapshot.Close(); err != nil { - log.Errorf("failed to close snapshot %v", err) + log.Printf("failed to close snapshot %v", err) } } @@ -278,7 +279,7 @@ func (d *Dragon) Start() error { // https://github.com/squareup/pranadb/issues/124 time.Sleep(10 * time.Second) - log.Infof("Dragon node %d started", d.cnf.NodeID) + log.Printf("Dragon node %d started", d.cnf.NodeID) return nil } @@ -712,7 +713,7 @@ func (d *Dragon) NodeUnloaded(info raftio.NodeInfo) { go func() { err := d.nodeRemovedFromCluster(int(info.NodeID-1), info.ClusterID) if err != nil { - log.Errorf("failed to remove node from cluster %v", err) + log.Printf("failed to remove node from cluster %v", err) } }() } diff --git a/cluster/dragon/integration/dragon_integration_test.go b/cluster/dragon/integration/dragon_integration_test.go index 1ee048b8..e948a255 100644 --- a/cluster/dragon/integration/dragon_integration_test.go +++ b/cluster/dragon/integration/dragon_integration_test.go @@ -5,12 +5,12 @@ import ( "flag" "fmt" "io/ioutil" + "log" "math/rand" "os" "testing" "time" - log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/conf" dragon "github.com/squareup/pranadb/cluster/dragon" @@ -31,7 +31,7 @@ var dataDir string func TestMain(m *testing.M) { flag.Parse() if testing.Short() { - log.Infof("-short: skipped") + log.Printf("-short: skipped") return } var err error diff --git a/cmd/pranadb/main.go b/cmd/pranadb/main.go index 9be4dd3e..bb10dce8 100644 --- a/cmd/pranadb/main.go +++ b/cmd/pranadb/main.go @@ -1,11 +1,11 @@ package main import ( - "os" - "github.com/alecthomas/kong" "github.com/alecthomas/kong-hcl/v2" - log "github.com/sirupsen/logrus" + "log" + "os" + "github.com/squareup/pranadb/conf" plog "github.com/squareup/pranadb/log" "github.com/squareup/pranadb/server" @@ -20,7 +20,7 @@ type cli struct { func main() { r := &runner{} if err := r.run(os.Args[1:], true); err != nil { - log.WithError(err).Fatal("startup failed") + log.Printf("startup failed %v", err) } select {} // prevent main exiting } diff --git a/cmd/pranadb/testdata/config.hcl b/cmd/pranadb/testdata/config.hcl index b2581ee0..eabc39d1 100644 --- a/cmd/pranadb/testdata/config.hcl +++ b/cmd/pranadb/testdata/config.hcl @@ -1,5 +1,5 @@ // This is the clusterid -cluster-id = 12345 +cluster-id = 12345 /* These are the raft addresses */ @@ -16,34 +16,34 @@ notif-listen-addresses = [ ] // Numshards -num-shards = 50 -replication-factor = 3 -data-dir = "foo/bar/baz" -test-server = false -data-snapshot-entries = 1001 -data-compaction-overhead = 501 -sequence-snapshot-entries = 2001 -sequence-compaction-overhead = 1001 -locks-snapshot-entries = 101 -locks-compaction-overhead = 51 -debug = true -notifier-heartbeat-interval = "76s" -enable-api-server = true -api-server-listen-addresses = [ +num-shards = 50 +replication-factor = 3 +data-dir = "foo/bar/baz" +test-server = false +data-snapshot-entries = 1001 +data-compaction-overhead = 501 +sequence-snapshot-entries = 2001 +sequence-compaction-overhead = 1001 +locks-snapshot-entries = 101 +locks-compaction-overhead = 51 +debug = true +notifier-heartbeat-interval = "76s" +enable-api-server = true +api-server-listen-addresses = [ "addr7", "addr8", "addr9" ] -api-server-session-timeout = "41s" +api-server-session-timeout = "41s" api-server-session-check-interval = "6s" -log-format = "json" -log-level = "info" -log-file = "-" +log-format = "json" +log-level = "info" +log-file = "-" kafka-brokers = { "testbroker" = { "client-type" = 1, - "properties" = { + "properties" = { "fakeKafkaID" = "1" } } diff --git a/common/util.go b/common/util.go index 18502d8f..92217f3c 100644 --- a/common/util.go +++ b/common/util.go @@ -2,8 +2,8 @@ package common import ( "fmt" - log "github.com/sirupsen/logrus" "io" + "log" "reflect" "runtime" "sync/atomic" @@ -58,7 +58,7 @@ func InvokeCloser(closer io.Closer) { if closer != nil { err := closer.Close() if err != nil { - log.Errorf("failed to close closer %v", err) + log.Printf("failed to close closer %v", err) } } } diff --git a/kafka/fake_kafka.go b/kafka/fake_kafka.go index 13bdcbee..9019eb4b 100644 --- a/kafka/fake_kafka.go +++ b/kafka/fake_kafka.go @@ -3,8 +3,7 @@ package kafka import ( "fmt" "github.com/pkg/errors" - - log "github.com/sirupsen/logrus" + "log" "github.com/squareup/pranadb/common" @@ -274,7 +273,7 @@ func (g *Group) checkInjectFailure() error { if g.failureEnd != nil { if time.Now().Sub(*g.failureEnd) >= 0 { - log.Infof("Failure injection has ended") + log.Printf("Failure injection has ended") g.failureEnd = nil return nil } diff --git a/kafka/ingestor.go b/kafka/ingestor.go index b19a9c3c..2ac94e8f 100644 --- a/kafka/ingestor.go +++ b/kafka/ingestor.go @@ -32,7 +32,7 @@ func IngestRows(f *FakeKafka, sourceInfo *common.SourceInfo, rows *common.Rows, ok, err := commontest.WaitUntilWithError(func() (bool, error) { ingested, committed := topic.TotalMessages(groupID) // All the messages have been ingested and committed - //log.Infof("start committed %d ingested %d committed %d ingested %d", c, ingestedStart, committed, ingested) + //log.Printf("start committed %d ingested %d committed %d ingested %d", c, ingestedStart, committed, ingested) if (ingested-ingestedStart == rows.RowCount()) && (ingested-committed) == 0 { return true, nil } diff --git a/log/log.go b/log/log.go index 03869e97..f43ce565 100644 --- a/log/log.go +++ b/log/log.go @@ -1,10 +1,9 @@ package log import ( - "os" - log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/perrors" + "os" ) // Config contains the configuration for the global logger. diff --git a/notifier/client.go b/notifier/client.go index 37f212f6..846a4f67 100644 --- a/notifier/client.go +++ b/notifier/client.go @@ -3,8 +3,8 @@ package notifier import ( "github.com/golang/protobuf/proto" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/common" + "log" "net" "sync" "sync/atomic" @@ -63,7 +63,7 @@ func (c *client) makeUnavailable(serverAddress string) { // Cannot write to server or make connection, it's unavailable - it may be down or there's a network issue // We remove the server from the set of live servers and add it to the set of unavailable ones // Unavailable ones will be retried after a delay - log.Errorf("Server became unavailable %s", serverAddress) + log.Printf("Server became unavailable %s", serverAddress) delete(c.connections, serverAddress) delete(c.availableServers, serverAddress) c.unavailableServers[serverAddress] = time.Now() @@ -107,7 +107,7 @@ func (c *client) broadcast(nf *NotificationMessage, ri *responseInfo) error { for serverAddress, failTime := range c.unavailableServers { if now.Sub(failTime) >= connectionRetryBackoff { // Put the server back in the available set - log.Warnf("Backoff time for unavailable server %s has expired - adding back to available set", serverAddress) + log.Printf("Backoff time for unavailable server %s has expired - adding back to available set", serverAddress) delete(c.unavailableServers, serverAddress) c.availableServers[serverAddress] = struct{}{} } @@ -321,7 +321,7 @@ func (cc *clientConnection) stop() { } if err := cc.conn.Close(); err != nil { // Do nothing - connection might already have been closed (e.g. from client) - log.Errorf("Failed to close connection %v", err) + log.Printf("Failed to close connection %v", err) } <-cc.loopCh cc.client.connectionClosed(cc) @@ -329,7 +329,7 @@ func (cc *clientConnection) stop() { func (cc *clientConnection) sendHeartbeat() { if err := writeMessage(heartbeatMessageType, nil, cc.conn); err != nil { - log.Errorf("failed to send heartbeat %v", err) + log.Printf("failed to send heartbeat %v", err) cc.heartbeatFailed() return } @@ -338,7 +338,7 @@ func (cc *clientConnection) sendHeartbeat() { if cc.hbReceived.Get() { cc.sendHeartbeat() } else { - log.Warnf("response heartbeat not received within %f seconds", cc.client.heartbeatInterval.Seconds()) + log.Printf("response heartbeat not received within %f seconds", cc.client.heartbeatInterval.Seconds()) cc.heartbeatFailed() } }) diff --git a/notifier/server.go b/notifier/server.go index bee3e4dd..3af19483 100644 --- a/notifier/server.go +++ b/notifier/server.go @@ -2,11 +2,12 @@ package notifier import ( "fmt" + "log" "net" "sync" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/squareup/pranadb/common" ) @@ -175,7 +176,7 @@ func (c *connection) handleMessage(msgType messageType, msg []byte) { if msgType == heartbeatMessageType { if !c.s.responsesDisabled.Get() { if err := writeMessage(heartbeatMessageType, nil, c.conn); err != nil { - log.Errorf("failed to write heartbeat %v", err) + log.Printf("failed to write heartbeat %v", err) } } return @@ -192,19 +193,19 @@ func (c *connection) handleMessageAsync(msg []byte) { func (c *connection) doHandleMessageAsync(msg []byte) { nf := &NotificationMessage{} if err := nf.deserialize(msg); err != nil { - log.Errorf("Failed to deserialize notification %v", err) + log.Printf("Failed to deserialize notification %v", err) return } listener := c.s.lookupNotificationListener(nf.notif) err := listener.HandleNotification(nf.notif) ok := true if err != nil { - log.Errorf("Failed to handle notification %v", err) + log.Printf("Failed to handle notification %v", err) ok = false } if nf.requiresResponse && !c.s.responsesDisabled.Get() { if err := c.sendResponse(nf, ok); err != nil { - log.Errorf("failed to send response %v", err) + log.Printf("failed to send response %v", err) } } } diff --git a/push/engine.go b/push/engine.go index 86cd34f8..db8fc32f 100644 --- a/push/engine.go +++ b/push/engine.go @@ -3,7 +3,8 @@ package push import ( "errors" "fmt" - log "github.com/sirupsen/logrus" + "log" + "github.com/squareup/pranadb/conf" "github.com/squareup/pranadb/push/mover" "github.com/squareup/pranadb/push/sched" @@ -225,10 +226,10 @@ func (s *shardListener) maybeHandleRemoteBatch() error { // It's possible an error can occur in handling received rows if the source or aggregate table is not // yet registered - this could be the case if rows are forwarded right after startup - in this case we can just // retry - log.Errorf("failed to handle received rows %v will retry after delay", err) + log.Printf("failed to handle received rows %v will retry after delay", err) time.AfterFunc(remoteBatchRetryDelay, func() { if err := s.maybeHandleRemoteBatch(); err != nil { - log.Errorf("failed to process remote batch %v", err) + log.Printf("failed to process remote batch %v", err) } }) return nil @@ -322,20 +323,20 @@ func (p *PushEngine) checkForRowsToForward() error { // WaitForProcessingToComplete is used in tests to wait for all rows have been processed when ingesting test data func (p *PushEngine) WaitForProcessingToComplete() error { - log.Infof("Waiting for schedulers to stop") + log.Printf("Waiting for schedulers to stop") err := p.waitForSchedulers() if err != nil { return err } - log.Infof("Waiting for no rows in forwarder table") + log.Printf("Waiting for no rows in forwarder table") // Wait for no rows in the forwarder table err = p.waitForNoRowsInTable(common.ForwarderTableID) if err != nil { return err } - log.Infof("Waiting for no rows in receiver table") + log.Printf("Waiting for no rows in receiver table") // Wait for no rows in the receiver table err = p.waitForNoRowsInTable(common.ReceiverTableID) if err != nil { diff --git a/push/sched/scheduler.go b/push/sched/scheduler.go index efadb653..808585fc 100644 --- a/push/sched/scheduler.go +++ b/push/sched/scheduler.go @@ -1,7 +1,7 @@ package sched import ( - log "github.com/sirupsen/logrus" + "log" "sync" ) @@ -97,7 +97,7 @@ func (s *ShardScheduler) runLoop() { if holder.errChan != nil { holder.errChan <- err } else if err != nil { - log.Errorf("Failed to execute action: %v", err) + log.Printf("Failed to execute action: %v", err) } } } diff --git a/push/source/consumer.go b/push/source/consumer.go index 34b9f76c..adf166eb 100644 --- a/push/source/consumer.go +++ b/push/source/consumer.go @@ -1,10 +1,10 @@ package source import ( - log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/common" "github.com/squareup/pranadb/kafka" "github.com/squareup/pranadb/push/sched" + "log" "time" ) @@ -147,7 +147,7 @@ func (m *MessageConsumer) getBatch(pollTimeout time.Duration, maxRecords int) ([ // We've seen the message before - this can be the case if a node crashed after offset was committed in // Prana but before offset was committed in Kafka. // In this case we log a warning, and ignore the message, the offset will be committed - log.Warnf("Duplicate message delivery attempted on node %d schema %s source %s topic %s partition %d offset %d"+ + log.Printf("Duplicate message delivery attempted on node %d schema %s source %s topic %s partition %d offset %d"+ " Message will be ignored", m.source.cluster.GetNodeID(), m.source.sourceInfo.SchemaName, m.source.sourceInfo.Name, m.source.sourceInfo.TopicInfo.TopicName, partID, msg.PartInfo.Offset) continue } diff --git a/push/source/message_parser_test.go b/push/source/message_parser_test.go index 8b7636cd..4a5e2453 100644 --- a/push/source/message_parser_test.go +++ b/push/source/message_parser_test.go @@ -218,7 +218,7 @@ func TestParseMessageTimestamp(t *testing.T) { testParseMessage(t, theColNames, theColTypes, common.EncodingJSON, common.EncodingJSON, common.EncodingJSON, nil, - []byte(fmt.Sprintf(`{"kf1":"%s"}`, sTS)), // Tests decoding mysql timestamp from string field in message + []byte(fmt.Sprintf(`{"kf1":"%s"}`, sTS)), // Tests decoding mysql timestamp from string field in message []byte(fmt.Sprintf(`{"vf1":%d}`, unixMillisPastEpoch)), // Tests decoding mysql timestamp from numeric field - assumed to be milliseconds past Unix epoch []string{"t", "k.kf1", "v.vf1"}, ts, vf) diff --git a/push/source/source.go b/push/source/source.go index 2cdb9a8f..7c4fa34e 100644 --- a/push/source/source.go +++ b/push/source/source.go @@ -2,7 +2,7 @@ package source import ( "fmt" - log "github.com/sirupsen/logrus" + "github.com/squareup/pranadb/conf" "github.com/squareup/pranadb/kafka" "github.com/squareup/pranadb/meta" @@ -13,13 +13,13 @@ import ( "strconv" - "sync" - "time" - "github.com/squareup/pranadb/cluster" "github.com/squareup/pranadb/common" "github.com/squareup/pranadb/push/exec" "github.com/squareup/pranadb/sharder" + "log" + "sync" + "time" ) // TODO make configurable @@ -232,7 +232,7 @@ func (s *Source) consumerError(err error, clientError bool) { return //panic("Got consumer error but souce is not started") } - log.Errorf("Failure in consumer %v source will be stopped", err) + log.Printf("Failure in consumer %v source will be stopped", err) if err2 := s.stop(); err2 != nil { return } @@ -246,11 +246,11 @@ func (s *Source) consumerError(err error, clientError bool) { } else { delay = initialRestartDelay } - log.Warnf("Will attempt restart of source after delay of %d ms", delay.Milliseconds()) + log.Printf("Will attempt restart of source after delay of %d ms", delay.Milliseconds()) time.AfterFunc(delay, func() { err := s.Start() if err != nil { - log.Errorf("Failed to start source %v", err) + log.Printf("Failed to start source %v", err) } }) } diff --git a/server/server.go b/server/server.go index 48220080..df882791 100644 --- a/server/server.go +++ b/server/server.go @@ -8,10 +8,10 @@ import ( //nolint:gosec _ "net/http/pprof" //nolint:stylecheck + "log" //nolint:stylecheck "sync" - log "github.com/sirupsen/logrus" "github.com/squareup/pranadb/api" "github.com/squareup/pranadb/cluster" @@ -126,7 +126,7 @@ func (s *Server) Start() error { s.debugServer = &http.Server{Addr: addr} go func(srv *http.Server) { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - log.Errorf("debug server failed to listen %v", err) + log.Printf("debug server failed to listen %v", err) } }(s.debugServer) } diff --git a/sqltest/sql_test.go b/sqltest/sql_test.go index b086ff25..23abd6d2 100644 --- a/sqltest/sql_test.go +++ b/sqltest/sql_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/squareup/pranadb/client" "io/ioutil" + "log" "math/rand" "os" "sort" @@ -14,7 +15,6 @@ import ( "testing" "time" - log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -94,12 +94,12 @@ func (w *sqlTestsuite) TestSQL() { } func (w *sqlTestsuite) restartCluster() { - log.Infof("Restarting cluster") + log.Printf("Restarting cluster") w.stopCluster() - log.Infof("Stopped cluster") + log.Printf("Stopped cluster") time.Sleep(5 * time.Second) w.startCluster() - log.Infof("Restarted it") + log.Printf("Restarted it") } func (w *sqlTestsuite) setupPranaCluster() { @@ -310,7 +310,7 @@ func (st *sqlTest) run() { st.testSuite.lock.Lock() defer st.testSuite.lock.Unlock() - log.Infof("Running sql test %s", st.testName) + log.Printf("Running sql test %s", st.testName) require := st.testSuite.suite.Require() @@ -341,7 +341,7 @@ func (st *sqlTest) run() { //nolint:gocyclo func (st *sqlTest) runTestIteration(require *require.Assertions, commands []string, iter int) int { - log.Infof("Running test iteration %d", iter) + log.Printf("Running test iteration %d", iter) start := time.Now() st.prana = st.choosePrana() st.output = &strings.Builder{} @@ -429,9 +429,9 @@ func (st *sqlTest) runTestIteration(require *require.Assertions, commands []stri topicNames := st.testSuite.fakeKafka.GetTopicNames() if len(topicNames) > 0 { - log.Infof("Topics left at end of test run - please make sure you delete them at the end of your script") + log.Printf("Topics left at end of test run - please make sure you delete them at the end of your script") for _, name := range topicNames { - log.Infof("Topic %s", name) + log.Printf("Topic %s", name) } } require.Equal(0, len(topicNames), "Topics left at end of test run") @@ -454,12 +454,12 @@ func (st *sqlTest) runTestIteration(require *require.Assertions, commands []stri require.Equal(trimBothEnds(expectedOutput), trimBothEnds(actualOutput)) dur := time.Now().Sub(start) - log.Infof("Finished running sql test %s time taken %d ms", st.testName, dur.Milliseconds()) + log.Printf("Finished running sql test %s time taken %d ms", st.testName, dur.Milliseconds()) return numIters } func (st *sqlTest) waitUntilRowsInTable(require *require.Assertions, tableName string, numRows int) { - log.Infof("Waiting for %d rows in table %s", numRows, tableName) + log.Printf("Waiting for %d rows in table %s", numRows, tableName) schema, ok := st.prana.GetMetaController().GetSchema(TestSchemaName) require.True(ok, "can't find test schema") tab, ok := schema.GetTable(tableName) @@ -515,7 +515,7 @@ func (st *sqlTest) tableDataLeft(require *require.Assertions, prana *server.Serv require.NoError(err) if displayRows && len(pairs) > 0 { for _, pair := range pairs { - log.Infof("%s v:%v", common.DumpDataKey(pair.Key), pair.Value) + log.Printf("%s v:%v", common.DumpDataKey(pair.Key), pair.Value) } require.Equal(0, len(pairs), fmt.Sprintf("Table data left at end of test for shard %d", shardID)) } @@ -633,7 +633,7 @@ func (st *sqlTest) doLoadData(require *require.Assertions, command string) { st.waitForProcessingToComplete(require) end := time.Now() dur := end.Sub(start) - log.Infof("Load data %s execute time ms %d", command, dur.Milliseconds()) + log.Printf("Load data %s execute time ms %d", command, dur.Milliseconds()) } func (st *sqlTest) executeCloseSession(require *require.Assertions) { @@ -662,7 +662,7 @@ func (st *sqlTest) executeCreateTopic(require *require.Assertions, command strin } _, err := st.testSuite.fakeKafka.CreateTopic(topicName, int(partitions)) require.NoError(err) - log.Infof("Created topic %s partitions %d", topicName, partitions) + log.Printf("Created topic %s partitions %d", topicName, partitions) } func (st *sqlTest) executeDeleteTopic(require *require.Assertions, command string) { @@ -672,7 +672,7 @@ func (st *sqlTest) executeDeleteTopic(require *require.Assertions, command strin topicName := parts[2] err := st.testSuite.fakeKafka.DeleteTopic(topicName) require.NoError(err) - log.Infof("Deleted topic %s ", topicName) + log.Printf("Deleted topic %s ", topicName) } func (st *sqlTest) executeResetOffets(require *require.Assertions, command string) { @@ -695,7 +695,7 @@ func (st *sqlTest) executeRestartCluster(require *require.Assertions) { } func (st *sqlTest) executeKafkaFail(require *require.Assertions, command string) { - log.Infof("Executing kafka fail") + log.Printf("Executing kafka fail") parts := strings.Split(command, " ") lp := len(parts) require.True(lp == 5, "Invalid kafka fail, should be --kafka fail topic_name source_name fail_time") @@ -741,7 +741,7 @@ func (st *sqlTest) executeSQLStatement(require *require.Assertions, statement st } end := time.Now() dur := end.Sub(start) - log.Infof("Statement execute time ms %d", dur.Milliseconds()) + log.Printf("Statement execute time ms %d", dur.Milliseconds()) } func (st *sqlTest) choosePrana() *server.Server {