diff --git a/cmd/trade.go b/cmd/trade.go index 115ebc0..e54684d 100644 --- a/cmd/trade.go +++ b/cmd/trade.go @@ -20,7 +20,6 @@ import ( "github.com/stellar/kelp/database" "github.com/stellar/kelp/model" "github.com/stellar/kelp/plugins" - "github.com/stellar/kelp/query" "github.com/stellar/kelp/support/logger" "github.com/stellar/kelp/support/monitoring" "github.com/stellar/kelp/support/networking" @@ -71,7 +70,6 @@ type inputs struct { stratConfigPath *string operationalBuffer *float64 operationalBufferNonNativePct *float64 - withIPC *bool simMode *bool logPrefix *string fixedIterations *uint64 @@ -128,7 +126,6 @@ func init() { // long-only flags options.operationalBuffer = tradeCmd.Flags().Float64("operationalBuffer", 20, "buffer of native XLM to maintain beyond minimum account balance requirement") options.operationalBufferNonNativePct = tradeCmd.Flags().Float64("operationalBufferNonNativePct", 0.001, "buffer of non-native assets to maintain as a percentage (0.001 = 0.1%)") - options.withIPC = tradeCmd.Flags().Bool("with-ipc", false, "enable IPC communication when spawned as a child process from the GUI") options.simMode = tradeCmd.Flags().Bool("sim", false, "simulate the bot's actions without placing any trades") options.logPrefix = tradeCmd.Flags().StringP("log", "l", "", "log to a file (and stdout) with this prefix for the filename") options.fixedIterations = tradeCmd.Flags().Uint64("iter", 0, "only run the bot for the first N iterations (defaults value 0 runs unboundedly)") @@ -566,18 +563,6 @@ func runTradeCmd(options inputs) { db, threadTracker, ) - startQueryServer( - l, - *options.strategy, - strategy, - botConfig, - client, - sdex, - exchangeShim, - tradingPair, - threadTracker, - &options, - ) // --- end initialization of services --- l.Info("Starting the trader bot...") @@ -683,47 +668,6 @@ func startFillTracking( } } -func startQueryServer( - l logger.Logger, - strategyName string, - strategy api.Strategy, - botConfig trader.BotConfig, - client *horizonclient.Client, - sdex *plugins.SDEX, - exchangeShim api.ExchangeShim, - tradingPair *model.TradingPair, - threadTracker *multithreading.ThreadTracker, - options *inputs, -) { - // only start query server (with IPC) if specifically instructed to so so from the command line. - // File descriptors in the IPC receiver will be invalid and will crash the bot if the other end of the pipe does not exist. - if !*options.withIPC { - return - } - - qs := query.MakeServer( - l, - strategyName, - strategy, - botConfig, - client, - sdex, - exchangeShim, - tradingPair, - ) - - go func() { - defer logPanic(l, true) - e := qs.StartIPC() - if e != nil { - l.Info("") - l.Errorf("problem encountered while running the query server: %s", e) - // we want to delete all the offers and exit here because we don't want the bot to run if the query server isn't working - deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker) - } - }() -} - func validateTrustlines(l logger.Logger, client *horizonclient.Client, botConfig *trader.BotConfig) { if !botConfig.IsTradingSdex() { l.Info("no need to validate trustlines because we're not using SDEX as the trading exchange") diff --git a/gui/backend/get_bot_info.go b/gui/backend/get_bot_info.go index f229777..5f07e31 100644 --- a/gui/backend/get_bot_info.go +++ b/gui/backend/get_bot_info.go @@ -1,8 +1,6 @@ package backend import ( - "bufio" - "bytes" "encoding/json" "fmt" "log" @@ -16,7 +14,6 @@ import ( "github.com/stellar/go/support/config" "github.com/stellar/kelp/gui/model2" "github.com/stellar/kelp/model" - "github.com/stellar/kelp/query" "github.com/stellar/kelp/support/kelpos" "github.com/stellar/kelp/support/utils" "github.com/stellar/kelp/trader" @@ -24,6 +21,23 @@ import ( const buysell = "buysell" +// botInfo is the response from the getBotInfo request +type botInfo struct { + LastUpdated string `json:"last_updated"` + TradingAccount string `json:"trading_account"` + Strategy string `json:"strategy"` + IsTestnet bool `json:"is_testnet"` + TradingPair *model.TradingPair `json:"trading_pair"` + AssetBase hProtocol.Asset `json:"asset_base"` + AssetQuote hProtocol.Asset `json:"asset_quote"` + BalanceBase float64 `json:"balance_base"` + BalanceQuote float64 `json:"balance_quote"` + NumBids int `json:"num_bids"` + NumAsks int `json:"num_asks"` + SpreadValue float64 `json:"spread_value"` + SpreadPercent float64 `json:"spread_pct"` +} + func (s *APIServer) getBotInfo(w http.ResponseWriter, r *http.Request) { botName, e := s.parseBotName(r) if e != nil { @@ -31,44 +45,9 @@ func (s *APIServer) getBotInfo(w http.ResponseWriter, r *http.Request) { return } - // s.runGetBotInfoViaIPC(w, botName) s.runGetBotInfoDirect(w, botName) } -func (s *APIServer) runGetBotInfoViaIPC(w http.ResponseWriter, botName string) { - p, exists := s.kos.GetProcess(botName) - if !exists { - log.Printf("kelp bot process with name '%s' does not exist; processes available: %v\n", botName, s.kos.RegisteredProcesses()) - w.WriteHeader(http.StatusNotFound) - w.Write([]byte("{}")) - return - } - - log.Printf("getBotInfo is making IPC request for botName: %s\n", botName) - p.PipeIn.Write([]byte("getBotInfo\n")) - scanner := bufio.NewScanner(p.PipeOut) - output := "" - for scanner.Scan() { - text := scanner.Text() - if strings.Contains(text, utils.IPCBoundary) { - break - } - output += text - } - var buf bytes.Buffer - e := json.Indent(&buf, []byte(output), "", " ") - if e != nil { - log.Printf("cannot indent json response (error=%s), json_response: %s\n", e, output) - w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte("{}")) - return - } - log.Printf("getBotInfo returned IPC response for botName '%s': %s\n", botName, buf.String()) - - w.WriteHeader(http.StatusOK) - w.Write(buf.Bytes()) -} - func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) { log.Printf("getBotInfo is invoking logic directly for botName: %s\n", botName) @@ -171,7 +150,7 @@ func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) { spreadPct = spread / midPrice } - bi := query.BotInfo{ + bi := botInfo{ LastUpdated: time.Now().UTC().Format("1/_2/2006 15:04:05 MST"), TradingAccount: account.AccountID, Strategy: buysell, @@ -189,7 +168,7 @@ func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) { marshalledJson, e := json.MarshalIndent(bi, "", " ") if e != nil { - log.Printf("cannot marshall to json response (error=%s), BotInfo: %+v\n", e, bi) + log.Printf("cannot marshall to json response (error=%s), botInfo: %+v\n", e, bi) w.WriteHeader(http.StatusInternalServerError) w.Write([]byte("{}")) return diff --git a/query/botInfo.go b/query/botInfo.go deleted file mode 100644 index abff2ad..0000000 --- a/query/botInfo.go +++ /dev/null @@ -1,84 +0,0 @@ -package query - -import ( - "fmt" - "strings" - "time" - - hProtocol "github.com/stellar/go/protocols/horizon" - "github.com/stellar/kelp/model" - "github.com/stellar/kelp/support/utils" -) - -// BotInfo is the response from the getBotInfo IPC request -type BotInfo struct { - LastUpdated string `json:"last_updated"` - TradingAccount string `json:"trading_account"` - Strategy string `json:"strategy"` - IsTestnet bool `json:"is_testnet"` - TradingPair *model.TradingPair `json:"trading_pair"` - AssetBase hProtocol.Asset `json:"asset_base"` - AssetQuote hProtocol.Asset `json:"asset_quote"` - BalanceBase float64 `json:"balance_base"` - BalanceQuote float64 `json:"balance_quote"` - NumBids int `json:"num_bids"` - NumAsks int `json:"num_asks"` - SpreadValue float64 `json:"spread_value"` - SpreadPercent float64 `json:"spread_pct"` -} - -func (s *Server) getBotInfo() (*BotInfo, error) { - assetBase, assetQuote, e := s.sdex.Assets() - if e != nil { - return nil, fmt.Errorf("error getting assets from sdex: %s", e) - } - - balanceBase, e := s.exchangeShim.GetBalanceHack(assetBase) - if e != nil { - return nil, fmt.Errorf("error getting base asset balance: %s", e) - } - - balanceQuote, e := s.exchangeShim.GetBalanceHack(assetQuote) - if e != nil { - return nil, fmt.Errorf("error getting quote asset balance: %s", e) - } - - offers, e := s.exchangeShim.LoadOffersHack() - if e != nil { - return nil, fmt.Errorf("error loading offers: %s", e) - } - sellingAOffers, buyingAOffers := utils.FilterOffers(offers, assetBase, assetQuote) - numBids := len(buyingAOffers) - numAsks := len(sellingAOffers) - - ob, e := s.exchangeShim.GetOrderBook(s.tradingPair, 20) - if e != nil { - return nil, fmt.Errorf("error loading orderbook (maxCount=20): %s", e) - } - topAsk := ob.TopAsk() - topBid := ob.TopBid() - spreadValue := model.NumberFromFloat(-1.0, 16) - midPrice := model.NumberFromFloat(-1.0, 16) - spreadPct := model.NumberFromFloat(-1.0, 16) - if topBid != nil && topAsk != nil { - spreadValue = topAsk.Price.Subtract(*topBid.Price) - midPrice = topAsk.Price.Add(*topBid.Price).Scale(0.5) - spreadPct = spreadValue.Divide(*midPrice) - } - - return &BotInfo{ - LastUpdated: time.Now().UTC().Format("1/_2/2006 15:04:05 MST"), - TradingAccount: s.sdex.TradingAccount, - Strategy: s.strategyName, - IsTestnet: strings.Contains(s.sdex.API.HorizonURL, "test"), - TradingPair: s.tradingPair, - AssetBase: assetBase, - AssetQuote: assetQuote, - BalanceBase: balanceBase.Balance, - BalanceQuote: balanceQuote.Balance, - NumBids: numBids, - NumAsks: numAsks, - SpreadValue: spreadValue.AsFloat(), - SpreadPercent: spreadPct.AsFloat(), - }, nil -} diff --git a/query/server.go b/query/server.go deleted file mode 100644 index 4fb7606..0000000 --- a/query/server.go +++ /dev/null @@ -1,109 +0,0 @@ -package query - -import ( - "bufio" - "encoding/json" - "fmt" - "os" - "strings" - - "github.com/stellar/kelp/support/utils" - - "github.com/stellar/go/clients/horizonclient" - "github.com/stellar/kelp/api" - "github.com/stellar/kelp/model" - "github.com/stellar/kelp/plugins" - "github.com/stellar/kelp/support/logger" - "github.com/stellar/kelp/trader" -) - -// Server is a query server with which the trade command will serve information about an actively running bot -type Server struct { - l logger.Logger - strategyName string - strategy api.Strategy - botConfig trader.BotConfig - client *horizonclient.Client - sdex *plugins.SDEX - exchangeShim api.ExchangeShim - tradingPair *model.TradingPair -} - -// MakeServer is a factory method -func MakeServer( - l logger.Logger, - strategyName string, - strategy api.Strategy, - botConfig trader.BotConfig, - client *horizonclient.Client, - sdex *plugins.SDEX, - exchangeShim api.ExchangeShim, - tradingPair *model.TradingPair, -) *Server { - return &Server{ - l: l, - strategyName: strategyName, - strategy: strategy, - botConfig: botConfig, - client: client, - sdex: sdex, - exchangeShim: exchangeShim, - tradingPair: tradingPair, - } -} - -// StartIPC kicks off the Server which reads from Stdin and writes to Stdout, this should be run in a new goroutine -func (s *Server) StartIPC() error { - pipeRead := os.NewFile(uintptr(3), "pipe_read") - pipeWrite := os.NewFile(uintptr(4), "pipe_write") - - scanner := bufio.NewScanner(pipeRead) - s.l.Infof("waiting for IPC command...\n") - for scanner.Scan() { - command := scanner.Text() - s.l.Infof("...received IPC command: %s\n", command) - output, e := s.executeCommandIPC(command) - if e != nil { - return fmt.Errorf("error while executing IPC Command ('%s'): %s", command, e) - } - if !strings.HasSuffix(output, "\n") { - output += "\n" - } - - output += utils.IPCBoundary + "\n" - s.l.Infof("responding to IPC command ('%s') with output: %s", command, output) - _, e = pipeWrite.WriteString(output) - if e != nil { - return fmt.Errorf("error while writing output to pipeWrite (name=%s; fd=%v): %s", pipeWrite.Name(), pipeWrite.Fd(), e) - } - s.l.Infof("waiting for next IPC command...\n") - } - - if e := scanner.Err(); e != nil { - return fmt.Errorf("error while reading commands in query server: %s", e) - } - return nil -} - -func (s *Server) executeCommandIPC(cmd string) (string, error) { - cmd = strings.TrimSpace(cmd) - - switch cmd { - case "": - return "", nil - case "getBotInfo": - output, e := s.getBotInfo() - if e != nil { - return "", fmt.Errorf("unable to get bot info: %s", e) - } - - outputBytes, e := json.MarshalIndent(output, "", " ") - if e != nil { - return "", fmt.Errorf("unable to marshall output to JSON: %s", e) - } - return string(outputBytes), nil - default: - // don't do anything if the input is an incorrect command because we take input from standard in - return "", nil - } -} diff --git a/support/kelpos/kelpos.go b/support/kelpos/kelpos.go index 46b3b1d..dd9bb3e 100644 --- a/support/kelpos/kelpos.go +++ b/support/kelpos/kelpos.go @@ -2,7 +2,6 @@ package kelpos import ( "io" - "os" "os/exec" "sync" @@ -25,11 +24,9 @@ func (kos *KelpOS) SetSilentRegistrations() { // Process contains all the pieces that can be used to control a given process type Process struct { - Cmd *exec.Cmd - Stdin io.WriteCloser - Stdout io.ReadCloser - PipeIn *os.File - PipeOut *os.File + Cmd *exec.Cmd + Stdin io.WriteCloser + Stdout io.ReadCloser } // singleton is the singleton instance of KelpOS diff --git a/support/kelpos/process.go b/support/kelpos/process.go index f8a4aa9..a9b1ed1 100644 --- a/support/kelpos/process.go +++ b/support/kelpos/process.go @@ -5,8 +5,9 @@ import ( "fmt" "io/ioutil" "log" - "os" "os/exec" + + "github.com/nikhilsaraf/go-tools/multithreading" ) // StreamOutput runs the provided command in a streaming fashion @@ -57,24 +58,39 @@ func (kos *KelpOS) Blocking(namespace string, cmd string) ([]byte, error) { return nil, fmt.Errorf("could not run bash command in background '%s': %s", cmd, e) } - var outputBytes []byte - var err error - go func() { - outputBytes, err = ioutil.ReadAll(p.Stdout) - }() - + // defer unregistration of process because regardless of whether it succeeds or fails it will not be active on the system anymore defer func() { eInner := kos.Unregister(namespace) if eInner != nil { log.Fatalf("error unregistering bash command '%s': %s", cmd, eInner) } }() - e = p.Cmd.Wait() + + var outputBytes []byte + var eRead error + var eWait error + threadTracker := multithreading.MakeThreadTracker() + e = threadTracker.TriggerGoroutine(func(inputs []interface{}) { + outputBytes, eRead = ioutil.ReadAll(p.Stdout) + + // wait for process to finish + eWait = p.Cmd.Wait() + }, nil) if e != nil { - return nil, fmt.Errorf("error waiting for bash command '%s': %s (outputBytes=%s, err=%v)", cmd, e, string(outputBytes), err) + return nil, fmt.Errorf("error while triggering goroutine to read from process: %s", e) } + // wait for threadTracker to finish -- we need to do this double-wait setup because p.Cmd.Wait() needs to be called after + // we read but we still want to wait for this to happen because we are blocking on this command to finish. + // see: https://github.com/hashicorp/go-plugin/issues/116#issuecomment-494153638 + threadTracker.Wait() - return outputBytes, err + // now check for errors + if eWait != nil || eRead != nil { + return nil, fmt.Errorf("error in bash command '%s' for namespace '%s': (eWait=%s, outputBytes=%s, eRead=%v)", + cmd, namespace, eWait, string(outputBytes), eRead) + } + + return outputBytes, nil } // Background runs the provided bash command in the background and registers the command @@ -90,28 +106,15 @@ func (kos *KelpOS) Background(namespace string, cmd string) (*Process, error) { return nil, fmt.Errorf("could not get Stdout pipe for bash command '%s': %s", cmd, e) } - // create two pipes (unidirectional), pass one end of both pipes to child process, save the other on Process - childInputReader, childInputWriter, e := os.Pipe() - if e != nil { - return nil, fmt.Errorf("could not create input pipe for child bash command '%s': %s", cmd, e) - } - childOutputReader, childOutputWriter, e := os.Pipe() - if e != nil { - return nil, fmt.Errorf("could not create output pipe for child bash command '%s': %s", cmd, e) - } - c.ExtraFiles = []*os.File{childInputReader, childOutputWriter} - e = c.Start() if e != nil { return nil, fmt.Errorf("could not start bash command '%s': %s", cmd, e) } p := &Process{ - Cmd: c, - Stdin: stdinWriter, - Stdout: stdoutReader, - PipeIn: childInputWriter, - PipeOut: childOutputReader, + Cmd: c, + Stdin: stdinWriter, + Stdout: stdoutReader, } e = kos.register(namespace, p) if e != nil { diff --git a/support/utils/os.go b/support/utils/os.go deleted file mode 100644 index 66e48f4..0000000 --- a/support/utils/os.go +++ /dev/null @@ -1,3 +0,0 @@ -package utils - -const IPCBoundary = "~~~~~~~~~~EOR~~~~~~~~~~"