Skip to content

Commit

Permalink
Kelp GUI: fix too many open files error (closes #292 and closes #385)…
Browse files Browse the repository at this point in the history
… (#386)

* 1 - attempt to fix stream closing issues in kelpos.Blocking()

* 2 - only close manually created pipes, not Std* pipes

* 3 - the actual fix, Wait() after Read()

see: hashicorp/go-plugin#116 (comment)

* 4 - remove IPC server and components since that is causing an explosion of pipes/files and is also unused

closes #385
  • Loading branch information
seniordev-1 committed Mar 2, 2020
1 parent a064cae commit 47a79dd
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 324 deletions.
56 changes: 0 additions & 56 deletions cmd/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/stellar/kelp/database"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/plugins"
"github.com/stellar/kelp/query"
"github.com/stellar/kelp/support/logger"
"github.com/stellar/kelp/support/monitoring"
"github.com/stellar/kelp/support/networking"
Expand Down Expand Up @@ -71,7 +70,6 @@ type inputs struct {
stratConfigPath *string
operationalBuffer *float64
operationalBufferNonNativePct *float64
withIPC *bool
simMode *bool
logPrefix *string
fixedIterations *uint64
Expand Down Expand Up @@ -128,7 +126,6 @@ func init() {
// long-only flags
options.operationalBuffer = tradeCmd.Flags().Float64("operationalBuffer", 20, "buffer of native XLM to maintain beyond minimum account balance requirement")
options.operationalBufferNonNativePct = tradeCmd.Flags().Float64("operationalBufferNonNativePct", 0.001, "buffer of non-native assets to maintain as a percentage (0.001 = 0.1%)")
options.withIPC = tradeCmd.Flags().Bool("with-ipc", false, "enable IPC communication when spawned as a child process from the GUI")
options.simMode = tradeCmd.Flags().Bool("sim", false, "simulate the bot's actions without placing any trades")
options.logPrefix = tradeCmd.Flags().StringP("log", "l", "", "log to a file (and stdout) with this prefix for the filename")
options.fixedIterations = tradeCmd.Flags().Uint64("iter", 0, "only run the bot for the first N iterations (defaults value 0 runs unboundedly)")
Expand Down Expand Up @@ -566,18 +563,6 @@ func runTradeCmd(options inputs) {
db,
threadTracker,
)
startQueryServer(
l,
*options.strategy,
strategy,
botConfig,
client,
sdex,
exchangeShim,
tradingPair,
threadTracker,
&options,
)
// --- end initialization of services ---

l.Info("Starting the trader bot...")
Expand Down Expand Up @@ -683,47 +668,6 @@ func startFillTracking(
}
}

func startQueryServer(
l logger.Logger,
strategyName string,
strategy api.Strategy,
botConfig trader.BotConfig,
client *horizonclient.Client,
sdex *plugins.SDEX,
exchangeShim api.ExchangeShim,
tradingPair *model.TradingPair,
threadTracker *multithreading.ThreadTracker,
options *inputs,
) {
// only start query server (with IPC) if specifically instructed to so so from the command line.
// File descriptors in the IPC receiver will be invalid and will crash the bot if the other end of the pipe does not exist.
if !*options.withIPC {
return
}

qs := query.MakeServer(
l,
strategyName,
strategy,
botConfig,
client,
sdex,
exchangeShim,
tradingPair,
)

go func() {
defer logPanic(l, true)
e := qs.StartIPC()
if e != nil {
l.Info("")
l.Errorf("problem encountered while running the query server: %s", e)
// we want to delete all the offers and exit here because we don't want the bot to run if the query server isn't working
deleteAllOffersAndExit(l, botConfig, client, sdex, exchangeShim, threadTracker)
}
}()
}

func validateTrustlines(l logger.Logger, client *horizonclient.Client, botConfig *trader.BotConfig) {
if !botConfig.IsTradingSdex() {
l.Info("no need to validate trustlines because we're not using SDEX as the trading exchange")
Expand Down
59 changes: 19 additions & 40 deletions gui/backend/get_bot_info.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package backend

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"log"
Expand All @@ -16,59 +14,40 @@ import (
"github.com/stellar/go/support/config"
"github.com/stellar/kelp/gui/model2"
"github.com/stellar/kelp/model"
"github.com/stellar/kelp/query"
"github.com/stellar/kelp/support/kelpos"
"github.com/stellar/kelp/support/utils"
"github.com/stellar/kelp/trader"
)

const buysell = "buysell"

// botInfo is the response from the getBotInfo request
type botInfo struct {
LastUpdated string `json:"last_updated"`
TradingAccount string `json:"trading_account"`
Strategy string `json:"strategy"`
IsTestnet bool `json:"is_testnet"`
TradingPair *model.TradingPair `json:"trading_pair"`
AssetBase hProtocol.Asset `json:"asset_base"`
AssetQuote hProtocol.Asset `json:"asset_quote"`
BalanceBase float64 `json:"balance_base"`
BalanceQuote float64 `json:"balance_quote"`
NumBids int `json:"num_bids"`
NumAsks int `json:"num_asks"`
SpreadValue float64 `json:"spread_value"`
SpreadPercent float64 `json:"spread_pct"`
}

func (s *APIServer) getBotInfo(w http.ResponseWriter, r *http.Request) {
botName, e := s.parseBotName(r)
if e != nil {
s.writeError(w, fmt.Sprintf("error parsing bot name in getBotInfo: %s\n", e))
return
}

// s.runGetBotInfoViaIPC(w, botName)
s.runGetBotInfoDirect(w, botName)
}

func (s *APIServer) runGetBotInfoViaIPC(w http.ResponseWriter, botName string) {
p, exists := s.kos.GetProcess(botName)
if !exists {
log.Printf("kelp bot process with name '%s' does not exist; processes available: %v\n", botName, s.kos.RegisteredProcesses())
w.WriteHeader(http.StatusNotFound)
w.Write([]byte("{}"))
return
}

log.Printf("getBotInfo is making IPC request for botName: %s\n", botName)
p.PipeIn.Write([]byte("getBotInfo\n"))
scanner := bufio.NewScanner(p.PipeOut)
output := ""
for scanner.Scan() {
text := scanner.Text()
if strings.Contains(text, utils.IPCBoundary) {
break
}
output += text
}
var buf bytes.Buffer
e := json.Indent(&buf, []byte(output), "", " ")
if e != nil {
log.Printf("cannot indent json response (error=%s), json_response: %s\n", e, output)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("{}"))
return
}
log.Printf("getBotInfo returned IPC response for botName '%s': %s\n", botName, buf.String())

w.WriteHeader(http.StatusOK)
w.Write(buf.Bytes())
}

func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) {
log.Printf("getBotInfo is invoking logic directly for botName: %s\n", botName)

Expand Down Expand Up @@ -171,7 +150,7 @@ func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) {
spreadPct = spread / midPrice
}

bi := query.BotInfo{
bi := botInfo{
LastUpdated: time.Now().UTC().Format("1/_2/2006 15:04:05 MST"),
TradingAccount: account.AccountID,
Strategy: buysell,
Expand All @@ -189,7 +168,7 @@ func (s *APIServer) runGetBotInfoDirect(w http.ResponseWriter, botName string) {

marshalledJson, e := json.MarshalIndent(bi, "", " ")
if e != nil {
log.Printf("cannot marshall to json response (error=%s), BotInfo: %+v\n", e, bi)
log.Printf("cannot marshall to json response (error=%s), botInfo: %+v\n", e, bi)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("{}"))
return
Expand Down
84 changes: 0 additions & 84 deletions query/botInfo.go

This file was deleted.

109 changes: 0 additions & 109 deletions query/server.go

This file was deleted.

Loading

0 comments on commit 47a79dd

Please sign in to comment.