From 7dda2d5278905df88c51bc2c92a5798217722f81 Mon Sep 17 00:00:00 2001 From: crubb Date: Fri, 28 Jul 2017 22:40:46 +0200 Subject: [PATCH 01/26] Really re-order on "post only" error Check for balance error also on checkOrder (e.g. bitfinex, using websockets, reports the error later than other exchanges) --- lib/engine.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/engine.js b/lib/engine.js index ec672bfb45..e9dae0d7b7 100644 --- a/lib/engine.js +++ b/lib/engine.js @@ -282,6 +282,7 @@ module.exports = function container (get, set, clear) { if (err) return cb(err) s.api_order = api_order order.status = api_order.status + if (api_order.reject_reason) order.reject_reason = api_order.reject_reason msg('order status: ' + order.status) if (api_order.status === 'done') { order.time = new Date(api_order.done_at).getTime() @@ -292,7 +293,11 @@ module.exports = function container (get, set, clear) { } if (order.status === 'rejected' && (order.reject_reason === 'post only' || api_order.reject_reason === 'post only')) { msg('post-only ' + type + ' failed, re-ordering') - return cancelOrder(true) + return cb(null, null) + } + if (order.status === 'rejected' && order.reject_reason === 'balance') { + msg('not enough balance for ' + type + ', aborting') + return cb(null, null) } if (new Date().getTime() - order.local_time >= so.order_adjust_time) { getQuote(function (err, quote) { From 3458b7c4b3cd52d45c7a7079b5582d4f29fc5b2f Mon Sep 17 00:00:00 2001 From: crubb Date: Fri, 28 Jul 2017 22:44:17 +0200 Subject: [PATCH 02/26] Initial commit to switch bitfinex to websockets --- extensions/exchanges/bitfinex/exchange.js | 527 ++++++++++++++-------- 1 file changed, 347 insertions(+), 180 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 0f824108eb..25e4fe52c9 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -1,39 +1,241 @@ const BFX = require('bitfinex-api-node') var _ = require('lodash') + , minimist = require('minimist') , path = require('path') , n = require('numbro') module.exports = function container (get, set, clear) { var c = get('conf') + var s = {options: minimist(process.argv)} + var so = s.options - var public_client, authed_client + var pair = 'ETHUSD' + var ws_timeout = 60000 + var public_client, public_client_ws, authed_client, authed_client_ws + + var ws_trades = [] + var ws_balance = [] + var ws_orders = [] + var ws_ticker = [] + var ws_hb = [] + function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest return public_client } + + function publicWsOpen () { + public_client_ws.subscribeTrades(pair) + public_client_ws.subscribeTicker(pair) + } + + function updateWsTrades (pair, trades) { + if (trades[0] === "tu") { + trades = [trades[1]] + } else if (trades[0] === "te") { + return + } + + trades.forEach(function (trade) { + newTrade = { + trade_id: Number(trade.ID), + time: Number(trade.MTS), + size: Math.abs(trade.AMOUNT), + price: Number(trade.PRICE), + side: trade.AMOUNT > 0 ? 'buy' : 'sell' + } + ws_trades.push(newTrade) + }) + + if (ws_trades.length > 1010) + ws_trades.shift() + } - function authedClient () { - if (!authed_client) { - if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { - throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) + function updateWsTicker (pair, ticker) { + ws_ticker = ticker + } + + function updateWsHb (message) { + if (message[0] != "undefined") + ws_hb[message[0]] = Date.now() + } + + function publicClientWs () { + if (!public_client_ws) { + public_client_ws = new BFX('', '', {version: 2, transform: true}).ws + + public_client_ws.on('error', function (e) { + console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { public_client_ws.open() }, ws_timeout) + }) + + public_client_ws + .on('open', publicWsOpen) + .on('trade', updateWsTrades) + .on('ticker', updateWsTicker) + .on('message', updateWsHb) + .on('subscribed', publicWsSubscribed) + } + + return public_client_ws + } + + function publicWsSubscribed (event) { + if (event.channel === "trades") { + ws_hb[event.chanId] = Date.now() + + var intervalId = setInterval(function() { + if (ws_hb[event.chanId]) { + var timeoutThreshold = (Number(Date.now()) - ws_timeout) + if (timeoutThreshold > ws_hb[event.chanId]) { + console.warn(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.log(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + clearInterval(intervalId) + public_client_ws.ws.close() + public_client_ws.open() + } + } + }, ws_timeout) } - authed_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 1}).rest } - return authed_client + + function authWsOpen () { + try { + authed_client_ws.auth() + } + catch (e) { + console.error(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { authWsOpen() }, ws_timeout) + return + } + + var chanId = 0 + ws_hb[chanId] = Date.now() + + var intervalId = setInterval(function() { + if (ws_hb[chanId]) { + var timeoutThreshold = (Number(Date.now()) - ws_timeout) + if (timeoutThreshold > ws_hb[chanId]) { + console.warn(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.log(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + clearInterval(intervalId) + authed_client_ws.ws.close() + authed_client_ws.open() + } + } + }, ws_timeout) + } + + function updateWsOrder (ws_order) { + cid = ws_order[2] + + // https://bitfinex.readme.io/v2/reference#ws-auth-orders + var order = ws_orders['~' + cid] + if (!order) { + console.error(("\nERROR: Order " + cid + ' not found in cache.').red) + return + } + + if (ws_order[13] === 'ACTIVE' || ws_order[13].match(/^PARTIALLY FILLED/)) { + order.status = 'open' + } else if (ws_order[13].match(/^EXECUTED/)) { + order.status = 'done' + } else if (ws_order[13] === 'CANCELED') { + order.status = 'rejected' + } else if (ws_order[13] === 'POSTONLY CANCELED') { + order.status = 'rejected' + order.reject_reason = 'post only' + } + + order.bitfinex_id = ws_order[0] + order.created_at = ws_order[4] + order.filled_size = n(ws_order[7]).subtract(ws_order[6]).format('0.00000000') + order.bitfinex_status = ws_order[13] + order.price = ws_order[16] + order.price_avg = ws_order[17] + + console.error("\nUpdated ORDER: " + JSON.stringify(order)) + ws_orders['~' + cid] = order + } + + function updateWsOrderCancel (ws_order) { + cid = ws_order[2] + + if (ws_orders['~' + cid]) + { + setTimeout(function () { + console.log("\nDeleted CACHED order: " + cid) + delete(ws_orders['~' + cid]) + }, 60000 * 5) + } + + updateWsOrder(ws_order) + } + + function updateWsReqOrder (error) { + if (error[6] === 'ERROR' && error[7].match(/^Invalid order: not enough .* balance for/)) { + cid = error[4][2] + ws_orders['~' + cid].status = 'rejected' + ws_orders['~' + cid].reject_reason = 'balance' + } + } + + function updateWallet (wallets) { + if (typeof(wallets[0]) !== "object") wallets = [wallets] + + wallets.forEach(function (wallet) { + if (wallet[0] === c.bitfinex.wallet) { + ws_balance[wallet[1].toUpperCase()] = {} + ws_balance[wallet[1].toUpperCase()].balance = wallet[2] + ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 + } + }) + } + + function authedClientWs () { + if (!authed_client_ws) { + if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { + throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) + } + authed_client_ws = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws + + authed_client_ws.on('error', function (e) { + console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + authed_client_ws.ws.close() + setTimeout(function() { + authed_client_ws.open() + }, ws_timeout) + }) + + authed_client_ws + .on('ws', updateWallet) + .on('wu', updateWallet) + .on('on', updateWsOrder) + .on('on-req', updateWsReqOrder) + .on('ou', updateWsOrder) + .on('oc', updateWsOrderCancel) + .on('open', authWsOpen) + .on('message', updateWsHb) + } + + return authed_client_ws } function joinProduct (product_id) { return product_id.split('-')[0] + '' + product_id.split('-')[1] } - function retry (method, args) { - if (method !== 'getTrades') { - console.error(('\nBitfinex API is down! unable to call ' + method + ', retrying in 10s').red) + function retry (method, args, cb) { + if (so.debug) { + console.log("\nWaiting " + ("1s").yellow + " for initial websockets snapshot.") } setTimeout(function () { - exchange[method].apply(exchange, args) - }, 10000) + exchange[method].call(exchange, args, cb) + }, 1000) } function encodeQueryData(data) { @@ -43,8 +245,6 @@ module.exports = function container (get, set, clear) { return ret.join('&') } - var orders = {} - var exchange = { name: 'bitfinex', historyScan: 'backward', @@ -56,201 +256,168 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { - var func_args = [].slice.call(arguments) - var client = publicClient() - var args = {} - args.sort = -1 //backward - args.limit = 1000 - if (opts.from) { - args.start = opts.from - } - else if (opts.to) { - args.end = opts.to - } - else if (args.start && !args.end) { - args.end = args.start + 500000 - } - else if (args.end && !args.start) { - args.start = args.end - 500000 - } - var query = encodeQueryData(args) - var pair = 't' + joinProduct(opts.product_id) - client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { - if (err) return retry('getTrades', func_args, err) - var trades = body.map(function(trade) { - return { - trade_id: trade.ID, - time: trade.MTS, - size: Math.abs(trade.AMOUNT), - price: trade.PRICE, - side: trade.AMOUNT > 0 ? 'buy' : 'sell' - } - }) + if (!public_client_ws) { publicClientWs() } + + // Backfilling using the REST API + if (opts.to || opts.to === null) { + var func_args = [].slice.call(arguments) + var client = publicClient() + var args = {} + args.sort = -1 //backward + args.limit = 1000 + if (opts.from) { + args.start = opts.from + } + else if (opts.to) { + args.end = opts.to + } + else if (args.start && !args.end) { + args.end = args.start + 500000 + } + else if (args.end && !args.start) { + args.start = args.end - 500000 + } + var query = encodeQueryData(args) + var pair = 't' + joinProduct(opts.product_id) + client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { + if (err) return retry('getTrades', opts, cb) + var trades = body.map(function(trade) { + return { + trade_id: trade.ID, + time: trade.MTS, + size: Math.abs(trade.AMOUNT), + price: trade.PRICE, + side: trade.AMOUNT > 0 ? 'buy' : 'sell' + } + }) + cb(null, trades) + }) + } else { + // We're live now (i.e. opts.from is set), use websockets + if (typeof(ws_trades) === "undefined") { return retry('getTrades', opts, cb) } + trades = ws_trades.filter(function (trade) { return trade.time >= opts.from }) cb(null, trades) - }) + } }, getBalance: function (opts, cb) { - var client = authedClient() - client.wallet_balances(function (err, body) { - if (err) return(err) - var balance = {asset: 0, currency: 0} - var accounts = _(body).filter(function (body) { return body.type === c.bitfinex.wallet }).forEach(function (account) { - if (account.currency.toUpperCase() === opts.currency) { - balance.currency = n(account.amount).format('0.00000000') - balance.currency_hold = n(account.amount).subtract(account.available).format('0.00000000') - } - else if (account.currency.toUpperCase() === opts.asset) { - balance.asset = n(account.amount).format('0.00000000') - balance.asset_hold = n(account.amount).subtract(account.available).format('0.00000000') - } - }) - cb(null, balance) - }) + if (!authed_client_ws) { authedClientWs() } + if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } + + balance = {} + balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') + balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') + + balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') + balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') + + cb(null, balance) }, getQuote: function (opts, cb) { - var func_args = [].slice.call(arguments) - var client = publicClient() - var pair = 't' + joinProduct(opts.product_id) - client.ticker(pair, function (err, body) { - if (err) return retry('getQuote', func_args, err) - cb(null, { bid : String(body.BID), ask : String(body.ASK) }) - }) + cb(null, { bid : String(ws_ticker.BID), ask : String(ws_ticker.ASK) }) }, cancelOrder: function (opts, cb) { - var client = authedClient() - client.cancel_order(opts.order_id, function (err, body) { - if (err) return(err) - cb() - }) + console.error("\nCANCEL: " + opts.order_id) + order = ws_orders['~' + opts.order_id] + ws_orders['~' + opts.order_id].reject_reason = "zenbot cancel" + + var ws_cancel_order = [ + 0, + 'oc', + null, + { + id: order.bitfinex_id + } + ] + + client.send(ws_cancel_order) + cb() }, - buy: function (opts, cb) { - var client = authedClient() + trade: function (action, opts, cb) { + console.error("\nORDER " + action + ': ' + JSON.stringify(opts)) + + client = authedClientWs(); + + var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) + var symbol = 't' + joinProduct(opts.product_id) + var amount = action === 'buy' ? opts.size : opts.size * -1 + var price = opts.price + if (opts.order_type === 'maker' && typeof opts.type === 'undefined') { - opts.type = 'exchange limit' + opts.type = 'EXCHANGE LIMIT' } else if (opts.order_type === 'taker' && typeof opts.type === 'undefined') { - opts.type = 'exchange market' + opts.type = 'EXCHANGE MARKET' } if (typeof opts.post_only === 'undefined') { opts.post_only = true } - var symbol = joinProduct(opts.product_id) - var amount = opts.size - var price = opts.price - var exchange = 'bitfinex' - var side = 'buy' var type = opts.type - var is_hidden = false var is_postonly = opts.post_only - var params = { - symbol, - amount, - price, - exchange, - side, - type, - is_hidden, - is_postonly + + var order = { + id: cid, + bitfinex_id: null, + status: 'open', + price: opts.price, + size: opts.size, + post_only: !!opts.post_only, + created_at: new Date().getTime() * 1000, + filled_size: 0, + ordertype: opts.order_type } - client.make_request('order/new', params, function (err, body) { - var order = { - id: body && body.is_live === true ? body.order_id : null, - status: 'open', - price: opts.price, - size: opts.size, - post_only: !!opts.post_only, - created_at: new Date().getTime(), - filled_size: '0', - ordertype: opts.order_type - } - if (err && err.toString('Error: Invalid order: not enough exchange balance')) { - status: 'rejected' - reject_reason: 'balance' - return cb(null, order) + + var ws_order = [ + 0, + 'on', + null, + { + cid: cid, + type: type, + symbol: symbol, + amount: String(amount), + price: price, + hidden: 0, + postonly: is_postonly ? 1 : 0 } - if (err) return(err) - orders['~' + body.id] = order - cb(null, order) - }) + ] + + console.error("\n" + action + " params: " + JSON.stringify(ws_order)) + + client.send(ws_order) + ws_orders['~' + cid] = order + + return cb(null, order) }, + buy: function (opts, cb) { + exchange.trade('buy', opts, cb) + }, + sell: function (opts, cb) { - var client = authedClient() - if (opts.order_type === 'maker' && typeof opts.type === 'undefined') { - opts.type = 'exchange limit' - } - else if (opts.order_type === 'taker' && typeof opts.type === 'undefined') { - opts.type = 'exchange market' - } - if (typeof opts.post_only === 'undefined') { - opts.post_only = true - } - var symbol = joinProduct(opts.product_id) - var amount = opts.size - var price = opts.price - var exchange = 'bitfinex' - var side = 'sell' - var type = opts.type - var is_hidden = false - var is_postonly = opts.post_only - var params = { - symbol, - amount, - price, - exchange, - side, - type, - is_hidden, - is_postonly - } - client.make_request('order/new', params, function (err, body) { - var order = { - id: body && body.is_live === true ? body.order_id : null, - status: 'open', - price: opts.price, - size: opts.size, - post_only: !!opts.post_only, - created_at: new Date().getTime(), - filled_size: '0', - ordertype: opts.order_type - } - if (err && err.toString('Error: Invalid order: not enough exchange balance')) { - status: 'rejected' - reject_reason: 'balance' - return cb(null, order) - } - if (err) return(err) - orders['~' + body.id] = order - cb(null, order) - }) + exchange.trade('sell', opts, cb) }, getOrder: function (opts, cb) { - var order = orders['~' + opts.order_id] - var client = authedClient() - client.order_status(opts.order_id, function (err, body) { - if (err) return(err) - if (!body.id) { - return cb('Order not found') - } - if (body.is_cancelled === true && body.is_live === false) { - order.status = 'rejected' - order.reject_reason = 'post only' - order.done_at = new Date().getTime() - return cb(null, order) - } - if (body.is_live === false) { - order.status = 'done' - order.done_at = new Date().getTime() - order.filled_size = body.original_amount - body.executed_amount - return cb(null, order) - } - cb(null, order) - }) + var order = ws_orders['~' + opts.order_id] + + if (order.status === 'rejected' && order.reject_reason === 'post only') { + return cb(null, order) + } else if (order.status === 'rejected' && order.reject_reason === 'zenbot canceled') { + return cb(null, order) + } + + if (order.status == "done") { + order.done_at = new Date().getTime() + return cb(null, order) + } + + console.error("\ngetOrder ORDER: " + JSON.stringify(order)) + + cb(null, order) }, // return the property used for range querying. From 9d0b78f97c463dfe3ab829a37e35491177037afe Mon Sep 17 00:00:00 2001 From: crubb Date: Tue, 8 Aug 2017 20:55:29 +0200 Subject: [PATCH 03/26] Select pair on first getTrade() --- extensions/exchanges/bitfinex/exchange.js | 37 ++++++++--------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 25e4fe52c9..1222cde73c 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,10 +9,9 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var pair = 'ETHUSD' var ws_timeout = 60000 - var public_client, public_client_ws, authed_client, authed_client_ws + var pair, public_client, public_client_ws, authed_client, authed_client_ws var ws_trades = [] var ws_balance = [] @@ -67,7 +66,6 @@ module.exports = function container (get, set, clear) { public_client_ws.on('error', function (e) { console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) setTimeout(function() { public_client_ws.open() }, ws_timeout) }) @@ -90,8 +88,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - console.log(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nPublic WebSockets: No message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) public_client_ws.ws.close() public_client_ws.open() @@ -106,8 +103,7 @@ module.exports = function container (get, set, clear) { authed_client_ws.auth() } catch (e) { - console.error(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.warn(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) setTimeout(function() { authWsOpen() }, ws_timeout) return } @@ -119,8 +115,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - console.log(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\Authed WebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) authed_client_ws.ws.close() authed_client_ws.open() @@ -135,7 +130,7 @@ module.exports = function container (get, set, clear) { // https://bitfinex.readme.io/v2/reference#ws-auth-orders var order = ws_orders['~' + cid] if (!order) { - console.error(("\nERROR: Order " + cid + ' not found in cache.').red) + console.error(("\nERROR: Order " + cid + ' not found in cache (manual order?).').red) return } @@ -156,8 +151,7 @@ module.exports = function container (get, set, clear) { order.bitfinex_status = ws_order[13] order.price = ws_order[16] order.price_avg = ws_order[17] - - console.error("\nUpdated ORDER: " + JSON.stringify(order)) + ws_orders['~' + cid] = order } @@ -167,9 +161,8 @@ module.exports = function container (get, set, clear) { if (ws_orders['~' + cid]) { setTimeout(function () { - console.log("\nDeleted CACHED order: " + cid) delete(ws_orders['~' + cid]) - }, 60000 * 5) + }, 60000 * 60) } updateWsOrder(ws_order) @@ -204,7 +197,6 @@ module.exports = function container (get, set, clear) { authed_client_ws.on('error', function (e) { console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) authed_client_ws.ws.close() setTimeout(function() { authed_client_ws.open() @@ -256,6 +248,8 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { + pair = joinProduct(opts.product_id) + if (!public_client_ws) { publicClientWs() } // Backfilling using the REST API @@ -278,8 +272,8 @@ module.exports = function container (get, set, clear) { args.start = args.end - 500000 } var query = encodeQueryData(args) - var pair = 't' + joinProduct(opts.product_id) - client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { + var tpair = 't' + joinProduct(opts.product_id) + client.makePublicRequest('trades/' + tpair + '/hist?' + query, function (err, body) { if (err) return retry('getTrades', opts, cb) var trades = body.map(function(trade) { return { @@ -319,7 +313,6 @@ module.exports = function container (get, set, clear) { }, cancelOrder: function (opts, cb) { - console.error("\nCANCEL: " + opts.order_id) order = ws_orders['~' + opts.order_id] ws_orders['~' + opts.order_id].reject_reason = "zenbot cancel" @@ -337,8 +330,6 @@ module.exports = function container (get, set, clear) { }, trade: function (action, opts, cb) { - console.error("\nORDER " + action + ': ' + JSON.stringify(opts)) - client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) @@ -365,7 +356,7 @@ module.exports = function container (get, set, clear) { price: opts.price, size: opts.size, post_only: !!opts.post_only, - created_at: new Date().getTime() * 1000, + created_at: new Date().getTime(), filled_size: 0, ordertype: opts.order_type } @@ -385,8 +376,6 @@ module.exports = function container (get, set, clear) { } ] - console.error("\n" + action + " params: " + JSON.stringify(ws_order)) - client.send(ws_order) ws_orders['~' + cid] = order @@ -415,8 +404,6 @@ module.exports = function container (get, set, clear) { return cb(null, order) } - console.error("\ngetOrder ORDER: " + JSON.stringify(order)) - cb(null, order) }, From 43e720ad4b8e2fddb674f2bfb6d964bef19771e2 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 19:13:09 +0200 Subject: [PATCH 04/26] Fixed bitfinex buy and sell from commandline --- extensions/exchanges/bitfinex/exchange.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 1222cde73c..1ea7eaea8e 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -189,6 +189,17 @@ module.exports = function container (get, set, clear) { } function authedClientWs () { + if (!public_client_ws) { + publicClientWs() + + console.warn(('Warning: Not yet connected to public websockets, waiting 1s for a connection').yellow) + setTimeout(function () { + if (!authedClientWs) { authed_client_ws = authedClientWs() } + }, 1000) + + return null + } + if (!authed_client_ws) { if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) @@ -248,7 +259,7 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { - pair = joinProduct(opts.product_id) + if (!pair) { pair = joinProduct(opts.product_id) } if (!public_client_ws) { publicClientWs() } @@ -295,6 +306,8 @@ module.exports = function container (get, set, clear) { }, getBalance: function (opts, cb) { + if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (!authed_client_ws) { authedClientWs() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } @@ -330,6 +343,9 @@ module.exports = function container (get, set, clear) { }, trade: function (action, opts, cb) { + if (!pair) { pair = joinProduct(opts.product_id) } + var symbol = 't' + pair + client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) From d62946ab337f9c3663d3e0414107ffff33bdd83d Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 20:54:03 +0200 Subject: [PATCH 05/26] Bitfinex: Trigger calc of wallet balances on getBalance over websockets and retun updated balances --- extensions/exchanges/bitfinex/exchange.js | 50 ++++++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 1ea7eaea8e..99efbb0e6d 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -18,6 +18,7 @@ module.exports = function container (get, set, clear) { var ws_orders = [] var ws_ticker = [] var ws_hb = [] + var ws_walletCalcDone function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest @@ -184,6 +185,7 @@ module.exports = function container (get, set, clear) { ws_balance[wallet[1].toUpperCase()] = {} ws_balance[wallet[1].toUpperCase()].balance = wallet[2] ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 + if (wallet[4]) { ws_walletCalcDone[wallet[1]] = true } } }) } @@ -240,6 +242,12 @@ module.exports = function container (get, set, clear) { exchange[method].call(exchange, args, cb) }, 1000) } + + function waitForCalc (method, args, cb) { + setTimeout(function () { + exchange[method].call(exchange, args, cb) + }, 100) + } function encodeQueryData(data) { let ret = [] @@ -307,18 +315,48 @@ module.exports = function container (get, set, clear) { getBalance: function (opts, cb) { if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (pair && !ws_walletCalcDone) { + ws_walletCalcDone = {} + ws_walletCalcDone[opts.asset] = false + ws_walletCalcDone[opts.currency] = false + } if (!authed_client_ws) { authedClientWs() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } - balance = {} - balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') - balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') + if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { + var ws_update_wallet = [ + 0, + 'calc', + null, + [ + ["wallet_exchange_" + opts.currency], + ["wallet_exchange_" + opts.asset] + ] + ] + + authed_client_ws.send(ws_update_wallet) + return waitForCalc('getBalance', opts, cb) + } + else if ( + (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === true) || + (ws_walletCalcDone[opts.asset] === true && ws_walletCalcDone[opts.currency] === false) + ) { + return waitForCalc('getBalance', opts, cb) + } + else { + balance = {} + balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') + balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') - balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') - balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') + balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') + balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') - cb(null, balance) + ws_walletCalcDone[opts.asset] = false + ws_walletCalcDone[opts.currency] = false + + cb(null, balance) + } }, getQuote: function (opts, cb) { From fb0fd3f6e3d46a5c3ca36b2addb309af0ccbd003 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 20:59:50 +0200 Subject: [PATCH 06/26] Bitfinex: Removed duplicate definition of symbol --- extensions/exchanges/bitfinex/exchange.js | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 99efbb0e6d..828b854e2c 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -387,7 +387,6 @@ module.exports = function container (get, set, clear) { client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) - var symbol = 't' + joinProduct(opts.product_id) var amount = action === 'buy' ? opts.size : opts.size * -1 var price = opts.price From 00d29b90d2c67581b7e4e7ef702e0d663c63f812 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 22:30:00 +0200 Subject: [PATCH 07/26] Bitfinex: Now using only one websockets connection --- extensions/exchanges/bitfinex/exchange.js | 122 ++++++++-------------- 1 file changed, 46 insertions(+), 76 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 828b854e2c..456c2007d9 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,9 +9,9 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var ws_timeout = 60000 + var ws_timeout = 10000 - var pair, public_client, public_client_ws, authed_client, authed_client_ws + var pair, public_client, ws_client var ws_trades = [] var ws_balance = [] @@ -25,12 +25,7 @@ module.exports = function container (get, set, clear) { return public_client } - function publicWsOpen () { - public_client_ws.subscribeTrades(pair) - public_client_ws.subscribeTicker(pair) - } - - function updateWsTrades (pair, trades) { + function wsUpdateTrades (pair, trades) { if (trades[0] === "tu") { trades = [trades[1]] } else if (trades[0] === "te") { @@ -52,36 +47,16 @@ module.exports = function container (get, set, clear) { ws_trades.shift() } - function updateWsTicker (pair, ticker) { + function wsUpdateTicker (pair, ticker) { ws_ticker = ticker } - function updateWsHb (message) { + function wsUpdateHb (message) { if (message[0] != "undefined") ws_hb[message[0]] = Date.now() } - function publicClientWs () { - if (!public_client_ws) { - public_client_ws = new BFX('', '', {version: 2, transform: true}).ws - - public_client_ws.on('error', function (e) { - console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { public_client_ws.open() }, ws_timeout) - }) - - public_client_ws - .on('open', publicWsOpen) - .on('trade', updateWsTrades) - .on('ticker', updateWsTicker) - .on('message', updateWsHb) - .on('subscribed', publicWsSubscribed) - } - - return public_client_ws - } - - function publicWsSubscribed (event) { + function wsSubscribed (event) { if (event.channel === "trades") { ws_hb[event.chanId] = Date.now() @@ -89,23 +64,23 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nPublic WebSockets: No message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel '" + ws_client.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) - public_client_ws.ws.close() - public_client_ws.open() + ws_client.ws.close() + ws_client.open() } } }, ws_timeout) } } - function authWsOpen () { + function wsOpen () { try { - authed_client_ws.auth() + ws_client.auth() } catch (e) { - console.warn(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { authWsOpen() }, ws_timeout) + console.warn(("\nWebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { wsOpen() }, ws_timeout) return } @@ -116,16 +91,19 @@ module.exports = function container (get, set, clear) { if (ws_hb[chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\Authed WebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) - authed_client_ws.ws.close() - authed_client_ws.open() + ws_client.ws.close() + ws_client.open() } } }, ws_timeout) + + ws_client.subscribeTrades(pair) + ws_client.subscribeTicker(pair) } - function updateWsOrder (ws_order) { + function wsUpdateOrder (ws_order) { cid = ws_order[2] // https://bitfinex.readme.io/v2/reference#ws-auth-orders @@ -156,7 +134,7 @@ module.exports = function container (get, set, clear) { ws_orders['~' + cid] = order } - function updateWsOrderCancel (ws_order) { + function wsUpdateOrderCancel (ws_order) { cid = ws_order[2] if (ws_orders['~' + cid]) @@ -166,10 +144,10 @@ module.exports = function container (get, set, clear) { }, 60000 * 60) } - updateWsOrder(ws_order) + wsUpdateOrder(ws_order) } - function updateWsReqOrder (error) { + function wsUpdateReqOrder (error) { if (error[6] === 'ERROR' && error[7].match(/^Invalid order: not enough .* balance for/)) { cid = error[4][2] ws_orders['~' + cid].status = 'rejected' @@ -190,44 +168,36 @@ module.exports = function container (get, set, clear) { }) } - function authedClientWs () { - if (!public_client_ws) { - publicClientWs() - - console.warn(('Warning: Not yet connected to public websockets, waiting 1s for a connection').yellow) - setTimeout(function () { - if (!authedClientWs) { authed_client_ws = authedClientWs() } - }, 1000) - - return null - } - - if (!authed_client_ws) { + function wsClient () { + if (!ws_client) { if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) } - authed_client_ws = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws + ws_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws - authed_client_ws.on('error', function (e) { - console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - authed_client_ws.ws.close() + ws_client.on('error', function (e) { + console.warn(("\nWebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + ws_client.ws.close() setTimeout(function() { - authed_client_ws.open() + ws_client.open() }, ws_timeout) }) - authed_client_ws + ws_client + .on('open', wsOpen) + .on('subscribed', wsSubscribed) + .on('message', wsUpdateHb) + .on('trade', wsUpdateTrades) + .on('ticker', wsUpdateTicker) .on('ws', updateWallet) .on('wu', updateWallet) - .on('on', updateWsOrder) - .on('on-req', updateWsReqOrder) - .on('ou', updateWsOrder) - .on('oc', updateWsOrderCancel) - .on('open', authWsOpen) - .on('message', updateWsHb) + .on('on', wsUpdateOrder) + .on('on-req', wsUpdateReqOrder) + .on('ou', wsUpdateOrder) + .on('oc', wsUpdateOrderCancel) } - return authed_client_ws + return ws_client } function joinProduct (product_id) { @@ -269,7 +239,7 @@ module.exports = function container (get, set, clear) { getTrades: function (opts, cb) { if (!pair) { pair = joinProduct(opts.product_id) } - if (!public_client_ws) { publicClientWs() } + if (!ws_client) { ws_client = wsClient() } // Backfilling using the REST API if (opts.to || opts.to === null) { @@ -321,7 +291,7 @@ module.exports = function container (get, set, clear) { ws_walletCalcDone[opts.currency] = false } - if (!authed_client_ws) { authedClientWs() } + if (!ws_client) { ws_client = wsClient() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { @@ -335,7 +305,7 @@ module.exports = function container (get, set, clear) { ] ] - authed_client_ws.send(ws_update_wallet) + ws_client.send(ws_update_wallet) return waitForCalc('getBalance', opts, cb) } else if ( @@ -384,7 +354,7 @@ module.exports = function container (get, set, clear) { if (!pair) { pair = joinProduct(opts.product_id) } var symbol = 't' + pair - client = authedClientWs(); + if (!ws_client) { ws_client = wsClient() } var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) var amount = action === 'buy' ? opts.size : opts.size * -1 @@ -429,7 +399,7 @@ module.exports = function container (get, set, clear) { } ] - client.send(ws_order) + ws_client.send(ws_order) ws_orders['~' + cid] = order return cb(null, order) From b9175095107d0cbc157113fa3eb2b5e0d224a107 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 23:31:24 +0200 Subject: [PATCH 08/26] Bitfinex WS: Better error handling on connect/auth/sending messages --- extensions/exchanges/bitfinex/exchange.js | 112 +++++++++++++--------- 1 file changed, 69 insertions(+), 43 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 456c2007d9..7450654ecf 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,7 +9,7 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var ws_timeout = 10000 + var ws_timeout = 60000 var pair, public_client, ws_client @@ -19,6 +19,7 @@ module.exports = function container (get, set, clear) { var ws_ticker = [] var ws_hb = [] var ws_walletCalcDone + var ws_client_isAuthed = false function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest @@ -51,12 +52,18 @@ module.exports = function container (get, set, clear) { ws_ticker = ticker } - function wsUpdateHb (message) { + function wsMessage (message) { + if (message.event == "auth" && message.status == "OK") { + if (so.debug) { console.log(('WebSockets: We are now authenticated.').green) } + ws_client_isAuthed = true + } + if (message[0] != "undefined") ws_hb[message[0]] = Date.now() } function wsSubscribed (event) { + // We only use the 'trades' channel for heartbeats. That one should be most frequently updated. if (event.channel === "trades") { ws_hb[event.chanId] = Date.now() @@ -64,7 +71,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nWebSockets: No message on channel '" + ws_client.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) ws_client.ws.close() ws_client.open() @@ -75,30 +82,7 @@ module.exports = function container (get, set, clear) { } function wsOpen () { - try { - ws_client.auth() - } - catch (e) { - console.warn(("\nWebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { wsOpen() }, ws_timeout) - return - } - - var chanId = 0 - ws_hb[chanId] = Date.now() - - var intervalId = setInterval(function() { - if (ws_hb[chanId]) { - var timeoutThreshold = (Number(Date.now()) - ws_timeout) - if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\nWebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - clearInterval(intervalId) - ws_client.ws.close() - ws_client.open() - } - } - }, ws_timeout) - + ws_client.auth() ws_client.subscribeTrades(pair) ws_client.subscribeTicker(pair) } @@ -174,19 +158,11 @@ module.exports = function container (get, set, clear) { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) } ws_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws - - ws_client.on('error', function (e) { - console.warn(("\nWebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - ws_client.ws.close() - setTimeout(function() { - ws_client.open() - }, ws_timeout) - }) ws_client .on('open', wsOpen) .on('subscribed', wsSubscribed) - .on('message', wsUpdateHb) + .on('message', wsMessage) .on('trade', wsUpdateTrades) .on('ticker', wsUpdateTicker) .on('ws', updateWallet) @@ -195,6 +171,27 @@ module.exports = function container (get, set, clear) { .on('on-req', wsUpdateReqOrder) .on('ou', wsUpdateOrder) .on('oc', wsUpdateOrderCancel) + + ws_client.on('error', function (e) { + if (e.event == "auth" && e.status == "FAILED") { + ws_client_isAuthed = false + errorMessage = ('WebSockets Error: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_timeout / 1000 + ' seconds').yellow + '.' + if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." + console.error(errorMessage) + + setTimeout(function () { + ws_client.auth() + }, ws_timeout) + } + else { + console.error(("\nWebSockets Error: An unhandled error occured.").red + " Consider reporting.\nSince those are mostly 'connect' related: Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.error(e) + ws_client.ws.close() + setTimeout(function() { + ws_client.open() + }, ws_timeout) + } + }) } return ws_client @@ -205,9 +202,6 @@ module.exports = function container (get, set, clear) { } function retry (method, args, cb) { - if (so.debug) { - console.log("\nWaiting " + ("1s").yellow + " for initial websockets snapshot.") - } setTimeout(function () { exchange[method].call(exchange, args, cb) }, 1000) @@ -292,7 +286,12 @@ module.exports = function container (get, set, clear) { } if (!ws_client) { ws_client = wsClient() } - if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } + if (Object.keys(ws_balance).length === 0) { + if (so.debug && ws_client_isAuthed === true) { + console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").yellow) + } + return retry('getBalance', opts, cb) + } if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { var ws_update_wallet = [ @@ -305,7 +304,16 @@ module.exports = function container (get, set, clear) { ] ] - ws_client.send(ws_update_wallet) + try { + ws_client.send(ws_update_wallet) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets Error: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + } + return retry('getBalance', opts, cb) + } + return waitForCalc('getBalance', opts, cb) } else if ( @@ -346,7 +354,15 @@ module.exports = function container (get, set, clear) { } ] - client.send(ws_cancel_order) + try { + ws_client.send(ws_cancel_order) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + } + return retry('cancelOrder', opts, cb) + } cb() }, @@ -399,7 +415,17 @@ module.exports = function container (get, set, clear) { } ] - ws_client.send(ws_order) + try { + ws_client.send(ws_order) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets: Cannot send trade (maybe connection not open?).").red) + } + + order.status = 'rejected' + order.reject_reason = 'could not send order over websockets' + } ws_orders['~' + cid] = order return cb(null, order) From a39ef829489d016d0adb3dcd8722933ae83f3a3e Mon Sep 17 00:00:00 2001 From: crubb Date: Thu, 10 Aug 2017 17:35:04 +0200 Subject: [PATCH 09/26] Bitfinex WS: Further improving retries and error handling --- extensions/exchanges/bitfinex/exchange.js | 35 +++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 7450654ecf..5bf6f40dcf 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -10,6 +10,8 @@ module.exports = function container (get, set, clear) { var so = s.options var ws_timeout = 60000 + var ws_retry = 10000 + var ws_wait_on_apikey_error = 60000 * 5 var pair, public_client, ws_client @@ -71,7 +73,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nWebSockets: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) clearInterval(intervalId) ws_client.ws.close() ws_client.open() @@ -93,7 +95,7 @@ module.exports = function container (get, set, clear) { // https://bitfinex.readme.io/v2/reference#ws-auth-orders var order = ws_orders['~' + cid] if (!order) { - console.error(("\nERROR: Order " + cid + ' not found in cache (manual order?).').red) + console.warn(("\nWarning: Order " + cid + ' not found in cache (manual order?).').red) return } @@ -125,7 +127,7 @@ module.exports = function container (get, set, clear) { { setTimeout(function () { delete(ws_orders['~' + cid]) - }, 60000 * 60) + }, 60000 * 60 * 12) } wsUpdateOrder(ws_order) @@ -175,21 +177,21 @@ module.exports = function container (get, set, clear) { ws_client.on('error', function (e) { if (e.event == "auth" && e.status == "FAILED") { ws_client_isAuthed = false - errorMessage = ('WebSockets Error: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_timeout / 1000 + ' seconds').yellow + '.' + errorMessage = ('WebSockets Warning: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_wait_on_apikey_error / 1000 + ' seconds').yellow + '.' if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." - console.error(errorMessage) + console.warn(errorMessage) setTimeout(function () { ws_client.auth() - }, ws_timeout) + }, ws_wait_on_apikey_error) } else { - console.error(("\nWebSockets Error: An unhandled error occured.").red + " Consider reporting.\nSince those are mostly 'connect' related: Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.error(("\nWebSockets Error: An unhandled error occured.").red + "\nSince those are mostly 'connect' related: Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '. Otherwise, consider reporting.') console.error(e) ws_client.ws.close() setTimeout(function() { ws_client.open() - }, ws_timeout) + }, ws_retry) } }) } @@ -204,13 +206,13 @@ module.exports = function container (get, set, clear) { function retry (method, args, cb) { setTimeout(function () { exchange[method].call(exchange, args, cb) - }, 1000) + }, ws_retry) } function waitForCalc (method, args, cb) { setTimeout(function () { exchange[method].call(exchange, args, cb) - }, 100) + }, 50) } function encodeQueryData(data) { @@ -288,7 +290,7 @@ module.exports = function container (get, set, clear) { if (!ws_client) { ws_client = wsClient() } if (Object.keys(ws_balance).length === 0) { if (so.debug && ws_client_isAuthed === true) { - console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").yellow) + console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('getBalance', opts, cb) } @@ -309,7 +311,8 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets Error: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('getBalance', opts, cb) } @@ -359,7 +362,8 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('cancelOrder', opts, cb) } @@ -420,14 +424,15 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets: Cannot send trade (maybe connection not open?).").red) + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not retry automatically.").yellow) } order.status = 'rejected' order.reject_reason = 'could not send order over websockets' } ws_orders['~' + cid] = order - + return cb(null, order) }, From 1d142dca2d98ae666a4f9d0d53b92687195accab Mon Sep 17 00:00:00 2001 From: crubb Date: Thu, 10 Aug 2017 17:54:48 +0200 Subject: [PATCH 10/26] Bitfinex WS: Not directly calling the websockets part of the bitfinex module for close() anymore --- extensions/exchanges/bitfinex/exchange.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 5bf6f40dcf..9542ce4cc7 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -75,7 +75,7 @@ module.exports = function container (get, set, clear) { if (timeoutThreshold > ws_hb[event.chanId]) { console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) clearInterval(intervalId) - ws_client.ws.close() + ws_client.close() ws_client.open() } } @@ -188,7 +188,7 @@ module.exports = function container (get, set, clear) { else { console.error(("\nWebSockets Error: An unhandled error occured.").red + "\nSince those are mostly 'connect' related: Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '. Otherwise, consider reporting.') console.error(e) - ws_client.ws.close() + ws_client.close() setTimeout(function() { ws_client.open() }, ws_retry) From af971ef39e7031ecb4ee98b710455374dfceca10 Mon Sep 17 00:00:00 2001 From: Clifford Roche Date: Fri, 11 Aug 2017 04:49:45 -0400 Subject: [PATCH 11/26] Fix bug with preroll when 0 volume (#464) * Fix bug with preroll when 0 volume Fix getCursor on Kraken exchange, getCursor must return milliseconds to be consisitent with API calls to getTrades in backfill and trade commands. It was returning seconds, this would cause entries to be returned outside of their respective time periods. Also removed opts.to and replace with opts.from. opts.to is only used for 'backward' history scans, not forward. This was always effectively undefined in this case. * Formatting kraken exchange.js --- commands/trade.js | 6 ++- extensions/exchanges/kraken/exchange.js | 67 ++++++++++++++----------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/commands/trade.js b/commands/trade.js index d4bb9069c6..352171ca1b 100644 --- a/commands/trade.js +++ b/commands/trade.js @@ -114,9 +114,11 @@ module.exports = function container (get, set, clear) { limit: 1000 } if (db_cursor) { + trade_cursor = db_cursor opts.query.time = {$gt: db_cursor} } else { + trade_cursor = query_start opts.query.time = {$gte: query_start} } get('db.trades').select(opts, function (err, trades) { @@ -257,7 +259,9 @@ module.exports = function container (get, set, clear) { console.error('\n' + moment().format('YYYY-MM-DD HH:mm:ss') + ' - error saving session') console.error(err) } - engine.writeReport(true) + if (s.period) { + engine.writeReport(true) + } }) }) } diff --git a/extensions/exchanges/kraken/exchange.js b/extensions/exchanges/kraken/exchange.js index 38c212b449..f79b1aff22 100644 --- a/extensions/exchanges/kraken/exchange.js +++ b/extensions/exchanges/kraken/exchange.js @@ -7,7 +7,9 @@ var KrakenClient = require('kraken-api'), module.exports = function container(get, set, clear) { var c = get('conf') - var s = {options: minimist(process.argv)} + var s = { + options: minimist(process.argv) + } var so = s.options var public_client, authed_client @@ -52,7 +54,7 @@ module.exports = function container(get, set, clear) { } console.warn(('\nKraken API warning - unable to call ' + method + ' (' + errorMsg + '), retrying in ' + timeout / 1000 + 's').yellow) } - setTimeout(function () { + setTimeout(function() { exchange[method].apply(exchange, args) }, timeout) } @@ -65,23 +67,20 @@ module.exports = function container(get, set, clear) { makerFee: 0.16, takerFee: 0.26, // The limit for the public API is not documented, 1750 ms between getTrades in backfilling seems to do the trick to omit warning messages. - backfillRateLimit: 2000, + backfillRateLimit: 3500, - getProducts: function () { + getProducts: function() { return require('./products.json') }, - getTrades: function (opts, cb) { + getTrades: function(opts, cb) { var func_args = [].slice.call(arguments) var client = publicClient() var args = { pair: joinProduct(opts.product_id) } - if (opts.from) { - args.since = parseFloat(opts.from) * 1000000000 - } - client.api('Trades', args, function (error, data) { + client.api('Trades', args, function(error, data) { if (error && error.message.match(recoverableErrors)) { return retry('getTrades', func_args, error) } @@ -93,11 +92,14 @@ module.exports = function container(get, set, clear) { if (data.error.length) { return cb(data.error.join(',')) } + if (opts.from) { + args.since = Number(opts.from) * 1000000000 + } var trades = [] - Object.keys(data.result[args.pair]).forEach(function (i) { + Object.keys(data.result[args.pair]).forEach(function(i) { var trade = data.result[args.pair][i] - if (!opts.to || (parseFloat(opts.to) >= parseFloat(trade[2]))) { + if (!opts.from || (Number(opts.from) < moment.unix((trade[2]).valueOf()))) { trades.push({ trade_id: trade[2] + trade[1] + trade[0], time: moment.unix(trade[2]).valueOf(), @@ -107,17 +109,20 @@ module.exports = function container(get, set, clear) { }) } }) + cb(null, trades) }) }, - getBalance: function (opts, cb) { + getBalance: function(opts, cb) { var args = [].slice.call(arguments) var client = authedClient() - client.api('Balance', null, function (error, data) { + client.api('Balance', null, function(error, data) { var balance = { - asset: 0, - currency: 0 + asset: '0', + asset_hold: '0', + currency: '0', + currency_hold: '0' } if (error) { @@ -128,28 +133,32 @@ module.exports = function container(get, set, clear) { console.error(error) return cb(error) } + if (data.error.length) { return cb(data.error.join(',')) } + if (data.result[opts.currency]) { balance.currency = n(data.result[opts.currency]).format('0.00000000') - balance.currency_hold = 0 + balance.currency_hold = '0' } + if (data.result[opts.asset]) { balance.asset = n(data.result[opts.asset]).format('0.00000000') - balance.asset_hold = 0 + balance.asset_hold = '0' } + cb(null, balance) }) }, - getQuote: function (opts, cb) { + getQuote: function(opts, cb) { var args = [].slice.call(arguments) var client = publicClient() var pair = joinProduct(opts.product_id) client.api('Ticker', { pair: pair - }, function (error, data) { + }, function(error, data) { if (error) { if (error.message.match(recoverableErrors)) { return retry('getQuote', args, error) @@ -168,12 +177,12 @@ module.exports = function container(get, set, clear) { }) }, - cancelOrder: function (opts, cb) { + cancelOrder: function(opts, cb) { var args = [].slice.call(arguments) var client = authedClient() client.api('CancelOrder', { txid: opts.order_id - }, function (error, data) { + }, function(error, data) { if (error) { if (error.message.match(recoverableErrors)) { return retry('cancelOrder', args, error) @@ -193,7 +202,7 @@ module.exports = function container(get, set, clear) { }) }, - trade: function (type, opts, cb) { + trade: function(type, opts, cb) { var args = [].slice.call(arguments) var client = authedClient() var params = { @@ -213,7 +222,7 @@ module.exports = function container(get, set, clear) { console.log("trade") console.log(params) } - client.api('AddOrder', params, function (error, data) { + client.api('AddOrder', params, function(error, data) { if (error && error.message.match(recoverableErrors)) { return retry('trade', args, error) } @@ -264,15 +273,15 @@ module.exports = function container(get, set, clear) { }) }, - buy: function (opts, cb) { + buy: function(opts, cb) { exchange.trade('buy', opts, cb) }, - sell: function (opts, cb) { + sell: function(opts, cb) { exchange.trade('sell', opts, cb) }, - getOrder: function (opts, cb) { + getOrder: function(opts, cb) { var args = [].slice.call(arguments) var order = orders['~' + opts.order_id] if (!order) return cb(new Error('order not found in cache')) @@ -280,7 +289,7 @@ module.exports = function container(get, set, clear) { var params = { txid: opts.order_id } - client.api('QueryOrders', params, function (error, data) { + client.api('QueryOrders', params, function(error, data) { if (error) { if (error.message.match(recoverableErrors)) { return retry('getOrder', args, error) @@ -322,8 +331,8 @@ module.exports = function container(get, set, clear) { }, // return the property used for range querying. - getCursor: function (trade) { - return Math.floor((trade.time || trade) / 1000) + getCursor: function(trade) { + return (trade.time || trade) } } return exchange From f68c6b6dceec0cea1426a0981fd88f1f3ee458e1 Mon Sep 17 00:00:00 2001 From: Christian Rubbert Date: Fri, 11 Aug 2017 17:15:26 +0200 Subject: [PATCH 12/26] Bitfinex WS: Fixed wallet recalc when asset or currency are 0 --- extensions/exchanges/bitfinex/exchange.js | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 9542ce4cc7..543208b599 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -149,7 +149,7 @@ module.exports = function container (get, set, clear) { ws_balance[wallet[1].toUpperCase()] = {} ws_balance[wallet[1].toUpperCase()].balance = wallet[2] ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 - if (wallet[4]) { ws_walletCalcDone[wallet[1]] = true } + if (wallet[4] !== null) { ws_walletCalcDone[wallet[1]] = true } } }) } @@ -235,8 +235,6 @@ module.exports = function container (get, set, clear) { getTrades: function (opts, cb) { if (!pair) { pair = joinProduct(opts.product_id) } - if (!ws_client) { ws_client = wsClient() } - // Backfilling using the REST API if (opts.to || opts.to === null) { var func_args = [].slice.call(arguments) @@ -273,6 +271,7 @@ module.exports = function container (get, set, clear) { }) } else { // We're live now (i.e. opts.from is set), use websockets + if (!ws_client) { wsClient() } if (typeof(ws_trades) === "undefined") { return retry('getTrades', opts, cb) } trades = ws_trades.filter(function (trade) { return trade.time >= opts.from }) cb(null, trades) @@ -281,13 +280,14 @@ module.exports = function container (get, set, clear) { getBalance: function (opts, cb) { if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (pair && !ws_walletCalcDone) { ws_walletCalcDone = {} ws_walletCalcDone[opts.asset] = false ws_walletCalcDone[opts.currency] = false } - if (!ws_client) { ws_client = wsClient() } + if (!ws_client) { wsClient() } if (Object.keys(ws_balance).length === 0) { if (so.debug && ws_client_isAuthed === true) { console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') @@ -307,14 +307,16 @@ module.exports = function container (get, set, clear) { ] try { + ws_walletCalcDone[opts.asset] = "inProgress" + ws_walletCalcDone[opts.currency] = "inProgress" + ws_client.send(ws_update_wallet) } catch (e) { if (so.debug) { console.warn(e) - console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') + console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + ' Waiting for reconnect.') } - return retry('getBalance', opts, cb) } return waitForCalc('getBalance', opts, cb) @@ -374,7 +376,7 @@ module.exports = function container (get, set, clear) { if (!pair) { pair = joinProduct(opts.product_id) } var symbol = 't' + pair - if (!ws_client) { ws_client = wsClient() } + if (!ws_client) { wsClient() } var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) var amount = action === 'buy' ? opts.size : opts.size * -1 @@ -425,7 +427,7 @@ module.exports = function container (get, set, clear) { catch (e) { if (so.debug) { console.warn(e) - console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not retry automatically.").yellow) + console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not just repeat the order automatically.").yellow) } order.status = 'rejected' From 199758e5943d088b9753f7538f721b04bdde4a41 Mon Sep 17 00:00:00 2001 From: crubb Date: Fri, 28 Jul 2017 22:40:46 +0200 Subject: [PATCH 13/26] Really re-order on "post only" error Check for balance error also on checkOrder (e.g. bitfinex, using websockets, reports the error later than other exchanges) --- lib/engine.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/engine.js b/lib/engine.js index ec672bfb45..e9dae0d7b7 100644 --- a/lib/engine.js +++ b/lib/engine.js @@ -282,6 +282,7 @@ module.exports = function container (get, set, clear) { if (err) return cb(err) s.api_order = api_order order.status = api_order.status + if (api_order.reject_reason) order.reject_reason = api_order.reject_reason msg('order status: ' + order.status) if (api_order.status === 'done') { order.time = new Date(api_order.done_at).getTime() @@ -292,7 +293,11 @@ module.exports = function container (get, set, clear) { } if (order.status === 'rejected' && (order.reject_reason === 'post only' || api_order.reject_reason === 'post only')) { msg('post-only ' + type + ' failed, re-ordering') - return cancelOrder(true) + return cb(null, null) + } + if (order.status === 'rejected' && order.reject_reason === 'balance') { + msg('not enough balance for ' + type + ', aborting') + return cb(null, null) } if (new Date().getTime() - order.local_time >= so.order_adjust_time) { getQuote(function (err, quote) { From 9c15ea8bae0847b320665fe2bd82ba87f39fdb9c Mon Sep 17 00:00:00 2001 From: crubb Date: Fri, 28 Jul 2017 22:44:17 +0200 Subject: [PATCH 14/26] Initial commit to switch bitfinex to websockets --- extensions/exchanges/bitfinex/exchange.js | 527 ++++++++++++++-------- 1 file changed, 347 insertions(+), 180 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 5a25f2b2fa..2783b1f913 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -1,39 +1,241 @@ const BFX = require('bitfinex-api-node') var _ = require('lodash') + , minimist = require('minimist') , path = require('path') , n = require('numbro') module.exports = function container (get, set, clear) { var c = get('conf') + var s = {options: minimist(process.argv)} + var so = s.options - var public_client, authed_client + var pair = 'ETHUSD' + var ws_timeout = 60000 + var public_client, public_client_ws, authed_client, authed_client_ws + + var ws_trades = [] + var ws_balance = [] + var ws_orders = [] + var ws_ticker = [] + var ws_hb = [] + function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest return public_client } + + function publicWsOpen () { + public_client_ws.subscribeTrades(pair) + public_client_ws.subscribeTicker(pair) + } + + function updateWsTrades (pair, trades) { + if (trades[0] === "tu") { + trades = [trades[1]] + } else if (trades[0] === "te") { + return + } + + trades.forEach(function (trade) { + newTrade = { + trade_id: Number(trade.ID), + time: Number(trade.MTS), + size: Math.abs(trade.AMOUNT), + price: Number(trade.PRICE), + side: trade.AMOUNT > 0 ? 'buy' : 'sell' + } + ws_trades.push(newTrade) + }) + + if (ws_trades.length > 1010) + ws_trades.shift() + } - function authedClient () { - if (!authed_client) { - if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { - throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) + function updateWsTicker (pair, ticker) { + ws_ticker = ticker + } + + function updateWsHb (message) { + if (message[0] != "undefined") + ws_hb[message[0]] = Date.now() + } + + function publicClientWs () { + if (!public_client_ws) { + public_client_ws = new BFX('', '', {version: 2, transform: true}).ws + + public_client_ws.on('error', function (e) { + console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { public_client_ws.open() }, ws_timeout) + }) + + public_client_ws + .on('open', publicWsOpen) + .on('trade', updateWsTrades) + .on('ticker', updateWsTicker) + .on('message', updateWsHb) + .on('subscribed', publicWsSubscribed) + } + + return public_client_ws + } + + function publicWsSubscribed (event) { + if (event.channel === "trades") { + ws_hb[event.chanId] = Date.now() + + var intervalId = setInterval(function() { + if (ws_hb[event.chanId]) { + var timeoutThreshold = (Number(Date.now()) - ws_timeout) + if (timeoutThreshold > ws_hb[event.chanId]) { + console.warn(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.log(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + clearInterval(intervalId) + public_client_ws.ws.close() + public_client_ws.open() + } + } + }, ws_timeout) } - authed_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 1}).rest } - return authed_client + + function authWsOpen () { + try { + authed_client_ws.auth() + } + catch (e) { + console.error(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { authWsOpen() }, ws_timeout) + return + } + + var chanId = 0 + ws_hb[chanId] = Date.now() + + var intervalId = setInterval(function() { + if (ws_hb[chanId]) { + var timeoutThreshold = (Number(Date.now()) - ws_timeout) + if (timeoutThreshold > ws_hb[chanId]) { + console.warn(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.log(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + clearInterval(intervalId) + authed_client_ws.ws.close() + authed_client_ws.open() + } + } + }, ws_timeout) + } + + function updateWsOrder (ws_order) { + cid = ws_order[2] + + // https://bitfinex.readme.io/v2/reference#ws-auth-orders + var order = ws_orders['~' + cid] + if (!order) { + console.error(("\nERROR: Order " + cid + ' not found in cache.').red) + return + } + + if (ws_order[13] === 'ACTIVE' || ws_order[13].match(/^PARTIALLY FILLED/)) { + order.status = 'open' + } else if (ws_order[13].match(/^EXECUTED/)) { + order.status = 'done' + } else if (ws_order[13] === 'CANCELED') { + order.status = 'rejected' + } else if (ws_order[13] === 'POSTONLY CANCELED') { + order.status = 'rejected' + order.reject_reason = 'post only' + } + + order.bitfinex_id = ws_order[0] + order.created_at = ws_order[4] + order.filled_size = n(ws_order[7]).subtract(ws_order[6]).format('0.00000000') + order.bitfinex_status = ws_order[13] + order.price = ws_order[16] + order.price_avg = ws_order[17] + + console.error("\nUpdated ORDER: " + JSON.stringify(order)) + ws_orders['~' + cid] = order + } + + function updateWsOrderCancel (ws_order) { + cid = ws_order[2] + + if (ws_orders['~' + cid]) + { + setTimeout(function () { + console.log("\nDeleted CACHED order: " + cid) + delete(ws_orders['~' + cid]) + }, 60000 * 5) + } + + updateWsOrder(ws_order) + } + + function updateWsReqOrder (error) { + if (error[6] === 'ERROR' && error[7].match(/^Invalid order: not enough .* balance for/)) { + cid = error[4][2] + ws_orders['~' + cid].status = 'rejected' + ws_orders['~' + cid].reject_reason = 'balance' + } + } + + function updateWallet (wallets) { + if (typeof(wallets[0]) !== "object") wallets = [wallets] + + wallets.forEach(function (wallet) { + if (wallet[0] === c.bitfinex.wallet) { + ws_balance[wallet[1].toUpperCase()] = {} + ws_balance[wallet[1].toUpperCase()].balance = wallet[2] + ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 + } + }) + } + + function authedClientWs () { + if (!authed_client_ws) { + if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { + throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) + } + authed_client_ws = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws + + authed_client_ws.on('error', function (e) { + console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.log(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + authed_client_ws.ws.close() + setTimeout(function() { + authed_client_ws.open() + }, ws_timeout) + }) + + authed_client_ws + .on('ws', updateWallet) + .on('wu', updateWallet) + .on('on', updateWsOrder) + .on('on-req', updateWsReqOrder) + .on('ou', updateWsOrder) + .on('oc', updateWsOrderCancel) + .on('open', authWsOpen) + .on('message', updateWsHb) + } + + return authed_client_ws } function joinProduct (product_id) { return product_id.split('-')[0] + '' + product_id.split('-')[1] } - function retry (method, args) { - if (method !== 'getTrades') { - console.error(('\nBitfinex API is down! unable to call ' + method + ', retrying in 10s').red) + function retry (method, args, cb) { + if (so.debug) { + console.log("\nWaiting " + ("1s").yellow + " for initial websockets snapshot.") } setTimeout(function () { - exchange[method].apply(exchange, args) - }, 10000) + exchange[method].call(exchange, args, cb) + }, 1000) } function encodeQueryData(data) { @@ -43,8 +245,6 @@ module.exports = function container (get, set, clear) { return ret.join('&') } - var orders = {} - var exchange = { name: 'bitfinex', historyScan: 'backward', @@ -56,201 +256,168 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { - var func_args = [].slice.call(arguments) - var client = publicClient() - var args = {} - args.sort = -1 //backward - args.limit = 1000 - if (opts.from) { - args.start = opts.from - } - else if (opts.to) { - args.end = opts.to - } - else if (args.start && !args.end) { - args.end = args.start + 500000 - } - else if (args.end && !args.start) { - args.start = args.end - 500000 - } - var query = encodeQueryData(args) - var pair = 't' + joinProduct(opts.product_id) - client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { - if (err) return retry('getTrades', func_args, err) - var trades = body.map(function(trade) { - return { - trade_id: trade.ID, - time: trade.MTS, - size: Math.abs(trade.AMOUNT), - price: trade.PRICE, - side: trade.AMOUNT > 0 ? 'buy' : 'sell' - } - }) + if (!public_client_ws) { publicClientWs() } + + // Backfilling using the REST API + if (opts.to || opts.to === null) { + var func_args = [].slice.call(arguments) + var client = publicClient() + var args = {} + args.sort = -1 //backward + args.limit = 1000 + if (opts.from) { + args.start = opts.from + } + else if (opts.to) { + args.end = opts.to + } + else if (args.start && !args.end) { + args.end = args.start + 500000 + } + else if (args.end && !args.start) { + args.start = args.end - 500000 + } + var query = encodeQueryData(args) + var pair = 't' + joinProduct(opts.product_id) + client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { + if (err) return retry('getTrades', opts, cb) + var trades = body.map(function(trade) { + return { + trade_id: trade.ID, + time: trade.MTS, + size: Math.abs(trade.AMOUNT), + price: trade.PRICE, + side: trade.AMOUNT > 0 ? 'buy' : 'sell' + } + }) + cb(null, trades) + }) + } else { + // We're live now (i.e. opts.from is set), use websockets + if (typeof(ws_trades) === "undefined") { return retry('getTrades', opts, cb) } + trades = ws_trades.filter(function (trade) { return trade.time >= opts.from }) cb(null, trades) - }) + } }, getBalance: function (opts, cb) { - var client = authedClient() - client.wallet_balances(function (err, body) { - if (err) return(err) - var balance = {asset: 0, currency: 0} - var accounts = _(body).filter(function (body) { return body.type === c.bitfinex.wallet }).forEach(function (account) { - if (account.currency.toUpperCase() === opts.currency) { - balance.currency = n(account.amount).format('0.00000000') - balance.currency_hold = n(account.amount).subtract(account.available).format('0.00000000') - } - else if (account.currency.toUpperCase() === opts.asset) { - balance.asset = n(account.amount).format('0.00000000') - balance.asset_hold = n(account.amount).subtract(account.available).format('0.00000000') - } - }) - cb(null, balance) - }) + if (!authed_client_ws) { authedClientWs() } + if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } + + balance = {} + balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') + balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') + + balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') + balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') + + cb(null, balance) }, getQuote: function (opts, cb) { - var func_args = [].slice.call(arguments) - var client = publicClient() - var pair = 't' + joinProduct(opts.product_id) - client.ticker(pair, function (err, body) { - if (err) return retry('getQuote', func_args, err) - cb(null, { bid : String(body.BID), ask : String(body.ASK) }) - }) + cb(null, { bid : String(ws_ticker.BID), ask : String(ws_ticker.ASK) }) }, cancelOrder: function (opts, cb) { - var client = authedClient() - client.cancel_order(opts.order_id, function (err, body) { - if (err) return(err) - cb() - }) + console.error("\nCANCEL: " + opts.order_id) + order = ws_orders['~' + opts.order_id] + ws_orders['~' + opts.order_id].reject_reason = "zenbot cancel" + + var ws_cancel_order = [ + 0, + 'oc', + null, + { + id: order.bitfinex_id + } + ] + + client.send(ws_cancel_order) + cb() }, - buy: function (opts, cb) { - var client = authedClient() + trade: function (action, opts, cb) { + console.error("\nORDER " + action + ': ' + JSON.stringify(opts)) + + client = authedClientWs(); + + var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) + var symbol = 't' + joinProduct(opts.product_id) + var amount = action === 'buy' ? opts.size : opts.size * -1 + var price = opts.price + if (opts.order_type === 'maker' && typeof opts.type === 'undefined') { - opts.type = 'exchange limit' + opts.type = 'EXCHANGE LIMIT' } else if (opts.order_type === 'taker' && typeof opts.type === 'undefined') { - opts.type = 'exchange market' + opts.type = 'EXCHANGE MARKET' } if (typeof opts.post_only === 'undefined') { opts.post_only = true } - var symbol = joinProduct(opts.product_id) - var amount = opts.size - var price = opts.price - var exchange = 'bitfinex' - var side = 'buy' var type = opts.type - var is_hidden = false var is_postonly = opts.post_only - var params = { - symbol, - amount, - price, - exchange, - side, - type, - is_hidden, - is_postonly + + var order = { + id: cid, + bitfinex_id: null, + status: 'open', + price: opts.price, + size: opts.size, + post_only: !!opts.post_only, + created_at: new Date().getTime() * 1000, + filled_size: 0, + ordertype: opts.order_type } - client.make_request('order/new', params, function (err, body) { - var order = { - id: body && body.is_live === true ? body.order_id : null, - status: 'open', - price: opts.price, - size: opts.size, - post_only: !!opts.post_only, - created_at: new Date().getTime(), - filled_size: '0', - ordertype: opts.order_type - } - if (err && err.toString('Error: Invalid order: not enough exchange balance')) { - status: 'rejected' - reject_reason: 'balance' - return cb(null, order) + + var ws_order = [ + 0, + 'on', + null, + { + cid: cid, + type: type, + symbol: symbol, + amount: String(amount), + price: price, + hidden: 0, + postonly: is_postonly ? 1 : 0 } - if (err) return(err) - orders['~' + body.id] = order - cb(null, order) - }) + ] + + console.error("\n" + action + " params: " + JSON.stringify(ws_order)) + + client.send(ws_order) + ws_orders['~' + cid] = order + + return cb(null, order) }, + buy: function (opts, cb) { + exchange.trade('buy', opts, cb) + }, + sell: function (opts, cb) { - var client = authedClient() - if (opts.order_type === 'maker' && typeof opts.type === 'undefined') { - opts.type = 'exchange limit' - } - else if (opts.order_type === 'taker' && typeof opts.type === 'undefined') { - opts.type = 'exchange market' - } - if (typeof opts.post_only === 'undefined') { - opts.post_only = true - } - var symbol = joinProduct(opts.product_id) - var amount = opts.size - var price = opts.price - var exchange = 'bitfinex' - var side = 'sell' - var type = opts.type - var is_hidden = false - var is_postonly = opts.post_only - var params = { - symbol, - amount, - price, - exchange, - side, - type, - is_hidden, - is_postonly - } - client.make_request('order/new', params, function (err, body) { - var order = { - id: body && body.is_live === true ? body.order_id : null, - status: 'open', - price: opts.price, - size: opts.size, - post_only: !!opts.post_only, - created_at: new Date().getTime(), - filled_size: '0', - ordertype: opts.order_type - } - if (err && err.toString('Error: Invalid order: not enough exchange balance')) { - status: 'rejected' - reject_reason: 'balance' - return cb(null, order) - } - if (err) return(err) - orders['~' + body.id] = order - cb(null, order) - }) + exchange.trade('sell', opts, cb) }, getOrder: function (opts, cb) { - var order = orders['~' + opts.order_id] - var client = authedClient() - client.order_status(opts.order_id, function (err, body) { - if (err) return(err) - if (!body.id) { - return cb('Order not found') - } - if (body.is_cancelled === true && body.is_live === false) { - order.status = 'rejected' - order.reject_reason = 'post only' - order.done_at = new Date().getTime() - return cb(null, order) - } - if (body.is_live === false) { - order.status = 'done' - order.done_at = new Date().getTime() - order.filled_size = body.original_amount - body.executed_amount - return cb(null, order) - } - cb(null, order) - }) + var order = ws_orders['~' + opts.order_id] + + if (order.status === 'rejected' && order.reject_reason === 'post only') { + return cb(null, order) + } else if (order.status === 'rejected' && order.reject_reason === 'zenbot canceled') { + return cb(null, order) + } + + if (order.status == "done") { + order.done_at = new Date().getTime() + return cb(null, order) + } + + console.error("\ngetOrder ORDER: " + JSON.stringify(order)) + + cb(null, order) }, // return the property used for range querying. From 016b60b34f421cc6a4f0e9f134d89c4784913261 Mon Sep 17 00:00:00 2001 From: crubb Date: Tue, 8 Aug 2017 20:55:29 +0200 Subject: [PATCH 15/26] Select pair on first getTrade() --- extensions/exchanges/bitfinex/exchange.js | 37 ++++++++--------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 2783b1f913..a5ee562b38 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,10 +9,9 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var pair = 'ETHUSD' var ws_timeout = 60000 - var public_client, public_client_ws, authed_client, authed_client_ws + var pair, public_client, public_client_ws, authed_client, authed_client_ws var ws_trades = [] var ws_balance = [] @@ -67,7 +66,6 @@ module.exports = function container (get, set, clear) { public_client_ws.on('error', function (e) { console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) setTimeout(function() { public_client_ws.open() }, ws_timeout) }) @@ -90,8 +88,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - console.log(("\nBitfinex Websockets Server did not send a message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nPublic WebSockets: No message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) public_client_ws.ws.close() public_client_ws.open() @@ -106,8 +103,7 @@ module.exports = function container (get, set, clear) { authed_client_ws.auth() } catch (e) { - console.error(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + console.warn(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) setTimeout(function() { authWsOpen() }, ws_timeout) return } @@ -119,8 +115,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - console.log(("\Authed WebSockets did not send a message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\Authed WebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) authed_client_ws.ws.close() authed_client_ws.open() @@ -135,7 +130,7 @@ module.exports = function container (get, set, clear) { // https://bitfinex.readme.io/v2/reference#ws-auth-orders var order = ws_orders['~' + cid] if (!order) { - console.error(("\nERROR: Order " + cid + ' not found in cache.').red) + console.error(("\nERROR: Order " + cid + ' not found in cache (manual order?).').red) return } @@ -156,8 +151,7 @@ module.exports = function container (get, set, clear) { order.bitfinex_status = ws_order[13] order.price = ws_order[16] order.price_avg = ws_order[17] - - console.error("\nUpdated ORDER: " + JSON.stringify(order)) + ws_orders['~' + cid] = order } @@ -167,9 +161,8 @@ module.exports = function container (get, set, clear) { if (ws_orders['~' + cid]) { setTimeout(function () { - console.log("\nDeleted CACHED order: " + cid) delete(ws_orders['~' + cid]) - }, 60000 * 5) + }, 60000 * 60) } updateWsOrder(ws_order) @@ -204,7 +197,6 @@ module.exports = function container (get, set, clear) { authed_client_ws.on('error', function (e) { console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - console.log(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) authed_client_ws.ws.close() setTimeout(function() { authed_client_ws.open() @@ -256,6 +248,8 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { + pair = joinProduct(opts.product_id) + if (!public_client_ws) { publicClientWs() } // Backfilling using the REST API @@ -278,8 +272,8 @@ module.exports = function container (get, set, clear) { args.start = args.end - 500000 } var query = encodeQueryData(args) - var pair = 't' + joinProduct(opts.product_id) - client.makePublicRequest('trades/' + pair + '/hist?' + query, function (err, body) { + var tpair = 't' + joinProduct(opts.product_id) + client.makePublicRequest('trades/' + tpair + '/hist?' + query, function (err, body) { if (err) return retry('getTrades', opts, cb) var trades = body.map(function(trade) { return { @@ -319,7 +313,6 @@ module.exports = function container (get, set, clear) { }, cancelOrder: function (opts, cb) { - console.error("\nCANCEL: " + opts.order_id) order = ws_orders['~' + opts.order_id] ws_orders['~' + opts.order_id].reject_reason = "zenbot cancel" @@ -337,8 +330,6 @@ module.exports = function container (get, set, clear) { }, trade: function (action, opts, cb) { - console.error("\nORDER " + action + ': ' + JSON.stringify(opts)) - client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) @@ -365,7 +356,7 @@ module.exports = function container (get, set, clear) { price: opts.price, size: opts.size, post_only: !!opts.post_only, - created_at: new Date().getTime() * 1000, + created_at: new Date().getTime(), filled_size: 0, ordertype: opts.order_type } @@ -385,8 +376,6 @@ module.exports = function container (get, set, clear) { } ] - console.error("\n" + action + " params: " + JSON.stringify(ws_order)) - client.send(ws_order) ws_orders['~' + cid] = order @@ -415,8 +404,6 @@ module.exports = function container (get, set, clear) { return cb(null, order) } - console.error("\ngetOrder ORDER: " + JSON.stringify(order)) - cb(null, order) }, From cfc7c7898a79dbb2453bb75ac9b911cc8e2559c3 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 19:13:09 +0200 Subject: [PATCH 16/26] Fixed bitfinex buy and sell from commandline --- extensions/exchanges/bitfinex/exchange.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index a5ee562b38..cf2460d8df 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -189,6 +189,17 @@ module.exports = function container (get, set, clear) { } function authedClientWs () { + if (!public_client_ws) { + publicClientWs() + + console.warn(('Warning: Not yet connected to public websockets, waiting 1s for a connection').yellow) + setTimeout(function () { + if (!authedClientWs) { authed_client_ws = authedClientWs() } + }, 1000) + + return null + } + if (!authed_client_ws) { if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) @@ -248,7 +259,7 @@ module.exports = function container (get, set, clear) { }, getTrades: function (opts, cb) { - pair = joinProduct(opts.product_id) + if (!pair) { pair = joinProduct(opts.product_id) } if (!public_client_ws) { publicClientWs() } @@ -295,6 +306,8 @@ module.exports = function container (get, set, clear) { }, getBalance: function (opts, cb) { + if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (!authed_client_ws) { authedClientWs() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } @@ -330,6 +343,9 @@ module.exports = function container (get, set, clear) { }, trade: function (action, opts, cb) { + if (!pair) { pair = joinProduct(opts.product_id) } + var symbol = 't' + pair + client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) From 9121fb7c89cfdda47e23e69dadd0db49441277f7 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 20:54:03 +0200 Subject: [PATCH 17/26] Bitfinex: Trigger calc of wallet balances on getBalance over websockets and retun updated balances --- extensions/exchanges/bitfinex/exchange.js | 50 ++++++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index cf2460d8df..cc5cf2af7e 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -18,6 +18,7 @@ module.exports = function container (get, set, clear) { var ws_orders = [] var ws_ticker = [] var ws_hb = [] + var ws_walletCalcDone function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest @@ -184,6 +185,7 @@ module.exports = function container (get, set, clear) { ws_balance[wallet[1].toUpperCase()] = {} ws_balance[wallet[1].toUpperCase()].balance = wallet[2] ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 + if (wallet[4]) { ws_walletCalcDone[wallet[1]] = true } } }) } @@ -240,6 +242,12 @@ module.exports = function container (get, set, clear) { exchange[method].call(exchange, args, cb) }, 1000) } + + function waitForCalc (method, args, cb) { + setTimeout(function () { + exchange[method].call(exchange, args, cb) + }, 100) + } function encodeQueryData(data) { let ret = [] @@ -307,18 +315,48 @@ module.exports = function container (get, set, clear) { getBalance: function (opts, cb) { if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (pair && !ws_walletCalcDone) { + ws_walletCalcDone = {} + ws_walletCalcDone[opts.asset] = false + ws_walletCalcDone[opts.currency] = false + } if (!authed_client_ws) { authedClientWs() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } - balance = {} - balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') - balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') + if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { + var ws_update_wallet = [ + 0, + 'calc', + null, + [ + ["wallet_exchange_" + opts.currency], + ["wallet_exchange_" + opts.asset] + ] + ] + + authed_client_ws.send(ws_update_wallet) + return waitForCalc('getBalance', opts, cb) + } + else if ( + (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === true) || + (ws_walletCalcDone[opts.asset] === true && ws_walletCalcDone[opts.currency] === false) + ) { + return waitForCalc('getBalance', opts, cb) + } + else { + balance = {} + balance.currency = n(ws_balance[opts.currency].balance).format('0.00000000') + balance.asset = n(ws_balance[opts.asset].balance).format('0.00000000') - balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') - balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') + balance.currency_hold = ws_balance[opts.currency].available ? n(ws_balance[opts.currency].balance).subtract(ws_balance[opts.currency].available).format('0.00000000') : n(0).format('0.00000000') + balance.asset_hold = ws_balance[opts.asset].available ? n(ws_balance[opts.asset].balance).subtract(ws_balance[opts.asset].available).format('0.00000000') : n(0).format('0.00000000') - cb(null, balance) + ws_walletCalcDone[opts.asset] = false + ws_walletCalcDone[opts.currency] = false + + cb(null, balance) + } }, getQuote: function (opts, cb) { From 9a0f48476e9b272128909e29f9b322ed2eaf4820 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 20:59:50 +0200 Subject: [PATCH 18/26] Bitfinex: Removed duplicate definition of symbol --- extensions/exchanges/bitfinex/exchange.js | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index cc5cf2af7e..a46f888c91 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -387,7 +387,6 @@ module.exports = function container (get, set, clear) { client = authedClientWs(); var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) - var symbol = 't' + joinProduct(opts.product_id) var amount = action === 'buy' ? opts.size : opts.size * -1 var price = opts.price From 68f84de3fd7dc2886c9d5d52de2de03123355158 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 22:30:00 +0200 Subject: [PATCH 19/26] Bitfinex: Now using only one websockets connection --- extensions/exchanges/bitfinex/exchange.js | 122 ++++++++-------------- 1 file changed, 46 insertions(+), 76 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index a46f888c91..1ebb77c405 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,9 +9,9 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var ws_timeout = 60000 + var ws_timeout = 10000 - var pair, public_client, public_client_ws, authed_client, authed_client_ws + var pair, public_client, ws_client var ws_trades = [] var ws_balance = [] @@ -25,12 +25,7 @@ module.exports = function container (get, set, clear) { return public_client } - function publicWsOpen () { - public_client_ws.subscribeTrades(pair) - public_client_ws.subscribeTicker(pair) - } - - function updateWsTrades (pair, trades) { + function wsUpdateTrades (pair, trades) { if (trades[0] === "tu") { trades = [trades[1]] } else if (trades[0] === "te") { @@ -52,36 +47,16 @@ module.exports = function container (get, set, clear) { ws_trades.shift() } - function updateWsTicker (pair, ticker) { + function wsUpdateTicker (pair, ticker) { ws_ticker = ticker } - function updateWsHb (message) { + function wsUpdateHb (message) { if (message[0] != "undefined") ws_hb[message[0]] = Date.now() } - function publicClientWs () { - if (!public_client_ws) { - public_client_ws = new BFX('', '', {version: 2, transform: true}).ws - - public_client_ws.on('error', function (e) { - console.warn(("\nPublic WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { public_client_ws.open() }, ws_timeout) - }) - - public_client_ws - .on('open', publicWsOpen) - .on('trade', updateWsTrades) - .on('ticker', updateWsTicker) - .on('message', updateWsHb) - .on('subscribed', publicWsSubscribed) - } - - return public_client_ws - } - - function publicWsSubscribed (event) { + function wsSubscribed (event) { if (event.channel === "trades") { ws_hb[event.chanId] = Date.now() @@ -89,23 +64,23 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nPublic WebSockets: No message on channel '" + public_client_ws.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel '" + ws_client.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) - public_client_ws.ws.close() - public_client_ws.open() + ws_client.ws.close() + ws_client.open() } } }, ws_timeout) } } - function authWsOpen () { + function wsOpen () { try { - authed_client_ws.auth() + ws_client.auth() } catch (e) { - console.warn(("\nAuthed WebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { authWsOpen() }, ws_timeout) + console.warn(("\nWebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + setTimeout(function() { wsOpen() }, ws_timeout) return } @@ -116,16 +91,19 @@ module.exports = function container (get, set, clear) { if (ws_hb[chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\Authed WebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) - authed_client_ws.ws.close() - authed_client_ws.open() + ws_client.ws.close() + ws_client.open() } } }, ws_timeout) + + ws_client.subscribeTrades(pair) + ws_client.subscribeTicker(pair) } - function updateWsOrder (ws_order) { + function wsUpdateOrder (ws_order) { cid = ws_order[2] // https://bitfinex.readme.io/v2/reference#ws-auth-orders @@ -156,7 +134,7 @@ module.exports = function container (get, set, clear) { ws_orders['~' + cid] = order } - function updateWsOrderCancel (ws_order) { + function wsUpdateOrderCancel (ws_order) { cid = ws_order[2] if (ws_orders['~' + cid]) @@ -166,10 +144,10 @@ module.exports = function container (get, set, clear) { }, 60000 * 60) } - updateWsOrder(ws_order) + wsUpdateOrder(ws_order) } - function updateWsReqOrder (error) { + function wsUpdateReqOrder (error) { if (error[6] === 'ERROR' && error[7].match(/^Invalid order: not enough .* balance for/)) { cid = error[4][2] ws_orders['~' + cid].status = 'rejected' @@ -190,44 +168,36 @@ module.exports = function container (get, set, clear) { }) } - function authedClientWs () { - if (!public_client_ws) { - publicClientWs() - - console.warn(('Warning: Not yet connected to public websockets, waiting 1s for a connection').yellow) - setTimeout(function () { - if (!authedClientWs) { authed_client_ws = authedClientWs() } - }, 1000) - - return null - } - - if (!authed_client_ws) { + function wsClient () { + if (!ws_client) { if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) } - authed_client_ws = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws + ws_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws - authed_client_ws.on('error', function (e) { - console.warn(("\nAuthed WebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - authed_client_ws.ws.close() + ws_client.on('error', function (e) { + console.warn(("\nWebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) + ws_client.ws.close() setTimeout(function() { - authed_client_ws.open() + ws_client.open() }, ws_timeout) }) - authed_client_ws + ws_client + .on('open', wsOpen) + .on('subscribed', wsSubscribed) + .on('message', wsUpdateHb) + .on('trade', wsUpdateTrades) + .on('ticker', wsUpdateTicker) .on('ws', updateWallet) .on('wu', updateWallet) - .on('on', updateWsOrder) - .on('on-req', updateWsReqOrder) - .on('ou', updateWsOrder) - .on('oc', updateWsOrderCancel) - .on('open', authWsOpen) - .on('message', updateWsHb) + .on('on', wsUpdateOrder) + .on('on-req', wsUpdateReqOrder) + .on('ou', wsUpdateOrder) + .on('oc', wsUpdateOrderCancel) } - return authed_client_ws + return ws_client } function joinProduct (product_id) { @@ -269,7 +239,7 @@ module.exports = function container (get, set, clear) { getTrades: function (opts, cb) { if (!pair) { pair = joinProduct(opts.product_id) } - if (!public_client_ws) { publicClientWs() } + if (!ws_client) { ws_client = wsClient() } // Backfilling using the REST API if (opts.to || opts.to === null) { @@ -321,7 +291,7 @@ module.exports = function container (get, set, clear) { ws_walletCalcDone[opts.currency] = false } - if (!authed_client_ws) { authedClientWs() } + if (!ws_client) { ws_client = wsClient() } if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { @@ -335,7 +305,7 @@ module.exports = function container (get, set, clear) { ] ] - authed_client_ws.send(ws_update_wallet) + ws_client.send(ws_update_wallet) return waitForCalc('getBalance', opts, cb) } else if ( @@ -384,7 +354,7 @@ module.exports = function container (get, set, clear) { if (!pair) { pair = joinProduct(opts.product_id) } var symbol = 't' + pair - client = authedClientWs(); + if (!ws_client) { ws_client = wsClient() } var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) var amount = action === 'buy' ? opts.size : opts.size * -1 @@ -429,7 +399,7 @@ module.exports = function container (get, set, clear) { } ] - client.send(ws_order) + ws_client.send(ws_order) ws_orders['~' + cid] = order return cb(null, order) From 88efa70bb7308ff463ed3c5278f98f875b332304 Mon Sep 17 00:00:00 2001 From: crubb Date: Wed, 9 Aug 2017 23:31:24 +0200 Subject: [PATCH 20/26] Bitfinex WS: Better error handling on connect/auth/sending messages --- extensions/exchanges/bitfinex/exchange.js | 112 +++++++++++++--------- 1 file changed, 69 insertions(+), 43 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 1ebb77c405..8a0539a931 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,7 +9,7 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options - var ws_timeout = 10000 + var ws_timeout = 60000 var pair, public_client, ws_client @@ -19,6 +19,7 @@ module.exports = function container (get, set, clear) { var ws_ticker = [] var ws_hb = [] var ws_walletCalcDone + var ws_client_isAuthed = false function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest @@ -51,12 +52,18 @@ module.exports = function container (get, set, clear) { ws_ticker = ticker } - function wsUpdateHb (message) { + function wsMessage (message) { + if (message.event == "auth" && message.status == "OK") { + if (so.debug) { console.log(('WebSockets: We are now authenticated.').green) } + ws_client_isAuthed = true + } + if (message[0] != "undefined") ws_hb[message[0]] = Date.now() } function wsSubscribed (event) { + // We only use the 'trades' channel for heartbeats. That one should be most frequently updated. if (event.channel === "trades") { ws_hb[event.chanId] = Date.now() @@ -64,7 +71,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nWebSockets: No message on channel '" + ws_client.channelMap[event.chanId].channel + "' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) clearInterval(intervalId) ws_client.ws.close() ws_client.open() @@ -75,30 +82,7 @@ module.exports = function container (get, set, clear) { } function wsOpen () { - try { - ws_client.auth() - } - catch (e) { - console.warn(("\nWebSockets: Error on auth, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - setTimeout(function() { wsOpen() }, ws_timeout) - return - } - - var chanId = 0 - ws_hb[chanId] = Date.now() - - var intervalId = setInterval(function() { - if (ws_hb[chanId]) { - var timeoutThreshold = (Number(Date.now()) - ws_timeout) - if (timeoutThreshold > ws_hb[chanId]) { - console.warn(("\nWebSockets: No message on channel 'auth' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) - clearInterval(intervalId) - ws_client.ws.close() - ws_client.open() - } - } - }, ws_timeout) - + ws_client.auth() ws_client.subscribeTrades(pair) ws_client.subscribeTicker(pair) } @@ -174,19 +158,11 @@ module.exports = function container (get, set, clear) { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) } ws_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws - - ws_client.on('error', function (e) { - console.warn(("\nWebSockets: Error on connect, retrying in " + ws_timeout / 1000 + ' seconds.').yellow) - ws_client.ws.close() - setTimeout(function() { - ws_client.open() - }, ws_timeout) - }) ws_client .on('open', wsOpen) .on('subscribed', wsSubscribed) - .on('message', wsUpdateHb) + .on('message', wsMessage) .on('trade', wsUpdateTrades) .on('ticker', wsUpdateTicker) .on('ws', updateWallet) @@ -195,6 +171,27 @@ module.exports = function container (get, set, clear) { .on('on-req', wsUpdateReqOrder) .on('ou', wsUpdateOrder) .on('oc', wsUpdateOrderCancel) + + ws_client.on('error', function (e) { + if (e.event == "auth" && e.status == "FAILED") { + ws_client_isAuthed = false + errorMessage = ('WebSockets Error: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_timeout / 1000 + ' seconds').yellow + '.' + if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." + console.error(errorMessage) + + setTimeout(function () { + ws_client.auth() + }, ws_timeout) + } + else { + console.error(("\nWebSockets Error: An unhandled error occured.").red + " Consider reporting.\nSince those are mostly 'connect' related: Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.error(e) + ws_client.ws.close() + setTimeout(function() { + ws_client.open() + }, ws_timeout) + } + }) } return ws_client @@ -205,9 +202,6 @@ module.exports = function container (get, set, clear) { } function retry (method, args, cb) { - if (so.debug) { - console.log("\nWaiting " + ("1s").yellow + " for initial websockets snapshot.") - } setTimeout(function () { exchange[method].call(exchange, args, cb) }, 1000) @@ -292,7 +286,12 @@ module.exports = function container (get, set, clear) { } if (!ws_client) { ws_client = wsClient() } - if (Object.keys(ws_balance).length === 0) { return retry('getBalance', opts, cb) } + if (Object.keys(ws_balance).length === 0) { + if (so.debug && ws_client_isAuthed === true) { + console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").yellow) + } + return retry('getBalance', opts, cb) + } if (ws_walletCalcDone[opts.asset] === false && ws_walletCalcDone[opts.currency] === false) { var ws_update_wallet = [ @@ -305,7 +304,16 @@ module.exports = function container (get, set, clear) { ] ] - ws_client.send(ws_update_wallet) + try { + ws_client.send(ws_update_wallet) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets Error: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + } + return retry('getBalance', opts, cb) + } + return waitForCalc('getBalance', opts, cb) } else if ( @@ -346,7 +354,15 @@ module.exports = function container (get, set, clear) { } ] - client.send(ws_cancel_order) + try { + ws_client.send(ws_cancel_order) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + } + return retry('cancelOrder', opts, cb) + } cb() }, @@ -399,7 +415,17 @@ module.exports = function container (get, set, clear) { } ] - ws_client.send(ws_order) + try { + ws_client.send(ws_order) + } + catch (e) { + if (so.debug) { + console.error(("\nWebSockets: Cannot send trade (maybe connection not open?).").red) + } + + order.status = 'rejected' + order.reject_reason = 'could not send order over websockets' + } ws_orders['~' + cid] = order return cb(null, order) From 2268101e674019efdb8f851717696ab07fcf6701 Mon Sep 17 00:00:00 2001 From: crubb Date: Thu, 10 Aug 2017 17:35:04 +0200 Subject: [PATCH 21/26] Bitfinex WS: Further improving retries and error handling --- extensions/exchanges/bitfinex/exchange.js | 35 +++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 8a0539a931..5dfc29be46 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -10,6 +10,8 @@ module.exports = function container (get, set, clear) { var so = s.options var ws_timeout = 60000 + var ws_retry = 10000 + var ws_wait_on_apikey_error = 60000 * 5 var pair, public_client, ws_client @@ -71,7 +73,7 @@ module.exports = function container (get, set, clear) { if (ws_hb[event.chanId]) { var timeoutThreshold = (Number(Date.now()) - ws_timeout) if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nWebSockets: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').yellow) + console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) clearInterval(intervalId) ws_client.ws.close() ws_client.open() @@ -93,7 +95,7 @@ module.exports = function container (get, set, clear) { // https://bitfinex.readme.io/v2/reference#ws-auth-orders var order = ws_orders['~' + cid] if (!order) { - console.error(("\nERROR: Order " + cid + ' not found in cache (manual order?).').red) + console.warn(("\nWarning: Order " + cid + ' not found in cache (manual order?).').red) return } @@ -125,7 +127,7 @@ module.exports = function container (get, set, clear) { { setTimeout(function () { delete(ws_orders['~' + cid]) - }, 60000 * 60) + }, 60000 * 60 * 12) } wsUpdateOrder(ws_order) @@ -175,21 +177,21 @@ module.exports = function container (get, set, clear) { ws_client.on('error', function (e) { if (e.event == "auth" && e.status == "FAILED") { ws_client_isAuthed = false - errorMessage = ('WebSockets Error: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_timeout / 1000 + ' seconds').yellow + '.' + errorMessage = ('WebSockets Warning: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_wait_on_apikey_error / 1000 + ' seconds').yellow + '.' if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." - console.error(errorMessage) + console.warn(errorMessage) setTimeout(function () { ws_client.auth() - }, ws_timeout) + }, ws_wait_on_apikey_error) } else { - console.error(("\nWebSockets Error: An unhandled error occured.").red + " Consider reporting.\nSince those are mostly 'connect' related: Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.error(("\nWebSockets Error: An unhandled error occured.").red + "\nSince those are mostly 'connect' related: Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '. Otherwise, consider reporting.') console.error(e) ws_client.ws.close() setTimeout(function() { ws_client.open() - }, ws_timeout) + }, ws_retry) } }) } @@ -204,13 +206,13 @@ module.exports = function container (get, set, clear) { function retry (method, args, cb) { setTimeout(function () { exchange[method].call(exchange, args, cb) - }, 1000) + }, ws_retry) } function waitForCalc (method, args, cb) { setTimeout(function () { exchange[method].call(exchange, args, cb) - }, 100) + }, 50) } function encodeQueryData(data) { @@ -288,7 +290,7 @@ module.exports = function container (get, set, clear) { if (!ws_client) { ws_client = wsClient() } if (Object.keys(ws_balance).length === 0) { if (so.debug && ws_client_isAuthed === true) { - console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").yellow) + console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('getBalance', opts, cb) } @@ -309,7 +311,8 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets Error: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('getBalance', opts, cb) } @@ -359,7 +362,8 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_timeout / 1000 + ' seconds').yellow + '.') + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send cancelOrder (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('cancelOrder', opts, cb) } @@ -420,14 +424,15 @@ module.exports = function container (get, set, clear) { } catch (e) { if (so.debug) { - console.error(("\nWebSockets: Cannot send trade (maybe connection not open?).").red) + console.warn(e) + console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not retry automatically.").yellow) } order.status = 'rejected' order.reject_reason = 'could not send order over websockets' } ws_orders['~' + cid] = order - + return cb(null, order) }, From a7b88c00dffe0f3ba3aa885b0d8eb9a825aab9d9 Mon Sep 17 00:00:00 2001 From: crubb Date: Thu, 10 Aug 2017 17:54:48 +0200 Subject: [PATCH 22/26] Bitfinex WS: Not directly calling the websockets part of the bitfinex module for close() anymore --- extensions/exchanges/bitfinex/exchange.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 5dfc29be46..9f069f2631 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -75,7 +75,7 @@ module.exports = function container (get, set, clear) { if (timeoutThreshold > ws_hb[event.chanId]) { console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) clearInterval(intervalId) - ws_client.ws.close() + ws_client.close() ws_client.open() } } @@ -188,7 +188,7 @@ module.exports = function container (get, set, clear) { else { console.error(("\nWebSockets Error: An unhandled error occured.").red + "\nSince those are mostly 'connect' related: Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '. Otherwise, consider reporting.') console.error(e) - ws_client.ws.close() + ws_client.close() setTimeout(function() { ws_client.open() }, ws_retry) From 36b9575588500bd0a13d2c4768a8772ed1c58531 Mon Sep 17 00:00:00 2001 From: Christian Rubbert Date: Fri, 11 Aug 2017 17:15:26 +0200 Subject: [PATCH 23/26] Bitfinex WS: Fixed wallet recalc when asset or currency are 0 --- extensions/exchanges/bitfinex/exchange.js | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 9f069f2631..39657b408d 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -149,7 +149,7 @@ module.exports = function container (get, set, clear) { ws_balance[wallet[1].toUpperCase()] = {} ws_balance[wallet[1].toUpperCase()].balance = wallet[2] ws_balance[wallet[1].toUpperCase()].available = wallet[4] ? wallet[4] : 0 - if (wallet[4]) { ws_walletCalcDone[wallet[1]] = true } + if (wallet[4] !== null) { ws_walletCalcDone[wallet[1]] = true } } }) } @@ -235,8 +235,6 @@ module.exports = function container (get, set, clear) { getTrades: function (opts, cb) { if (!pair) { pair = joinProduct(opts.product_id) } - if (!ws_client) { ws_client = wsClient() } - // Backfilling using the REST API if (opts.to || opts.to === null) { var func_args = [].slice.call(arguments) @@ -273,6 +271,7 @@ module.exports = function container (get, set, clear) { }) } else { // We're live now (i.e. opts.from is set), use websockets + if (!ws_client) { wsClient() } if (typeof(ws_trades) === "undefined") { return retry('getTrades', opts, cb) } trades = ws_trades.filter(function (trade) { return trade.time >= opts.from }) cb(null, trades) @@ -281,13 +280,14 @@ module.exports = function container (get, set, clear) { getBalance: function (opts, cb) { if (!pair) { pair = joinProduct(opts.asset + '-' + opts.currency) } + if (pair && !ws_walletCalcDone) { ws_walletCalcDone = {} ws_walletCalcDone[opts.asset] = false ws_walletCalcDone[opts.currency] = false } - if (!ws_client) { ws_client = wsClient() } + if (!ws_client) { wsClient() } if (Object.keys(ws_balance).length === 0) { if (so.debug && ws_client_isAuthed === true) { console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') @@ -307,14 +307,16 @@ module.exports = function container (get, set, clear) { ] try { + ws_walletCalcDone[opts.asset] = "inProgress" + ws_walletCalcDone[opts.currency] = "inProgress" + ws_client.send(ws_update_wallet) } catch (e) { if (so.debug) { console.warn(e) - console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') + console.warn(("\nWebSockets Warning: Cannot send 'calc' for getBalance update (maybe connection not open?).").red + ' Waiting for reconnect.') } - return retry('getBalance', opts, cb) } return waitForCalc('getBalance', opts, cb) @@ -374,7 +376,7 @@ module.exports = function container (get, set, clear) { if (!pair) { pair = joinProduct(opts.product_id) } var symbol = 't' + pair - if (!ws_client) { ws_client = wsClient() } + if (!ws_client) { wsClient() } var cid = Math.round(((new Date()).getTime()).toString() * Math.random()) var amount = action === 'buy' ? opts.size : opts.size * -1 @@ -425,7 +427,7 @@ module.exports = function container (get, set, clear) { catch (e) { if (so.debug) { console.warn(e) - console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not retry automatically.").yellow) + console.warn(("\nWebSockets Warning: Cannot send trade (maybe connection not open?).").red + (" Orders are sensitive, we're marking this one as rejected and will not just repeat the order automatically.").yellow) } order.status = 'rejected' From 667fed17fba26cb39b3650199b433603cb089686 Mon Sep 17 00:00:00 2001 From: crubb Date: Sun, 13 Aug 2017 17:50:55 +0200 Subject: [PATCH 24/26] Bitfinex WS: New reconnect approach, honor "INSUFFICIENT MARGIN" --- extensions/exchanges/bitfinex/exchange.js | 127 +++++++++++++--------- 1 file changed, 76 insertions(+), 51 deletions(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 39657b408d..d2f95a6105 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -9,6 +9,8 @@ module.exports = function container (get, set, clear) { var s = {options: minimist(process.argv)} var so = s.options + var ws_connecting = false + var ws_connected = false var ws_timeout = 60000 var ws_retry = 10000 var ws_wait_on_apikey_error = 60000 * 5 @@ -21,7 +23,6 @@ module.exports = function container (get, set, clear) { var ws_ticker = [] var ws_hb = [] var ws_walletCalcDone - var ws_client_isAuthed = false function publicClient () { if (!public_client) public_client = new BFX(null,null, {version: 2, transform: true}).rest @@ -56,38 +57,14 @@ module.exports = function container (get, set, clear) { function wsMessage (message) { if (message.event == "auth" && message.status == "OK") { - if (so.debug) { console.log(('WebSockets: We are now authenticated.').green) } - ws_client_isAuthed = true + if (so.debug) { console.log(('WebSockets: We are now fully connected and authenticated.').green) } + ws_connecting = false + ws_connected = true } if (message[0] != "undefined") ws_hb[message[0]] = Date.now() } - - function wsSubscribed (event) { - // We only use the 'trades' channel for heartbeats. That one should be most frequently updated. - if (event.channel === "trades") { - ws_hb[event.chanId] = Date.now() - - var intervalId = setInterval(function() { - if (ws_hb[event.chanId]) { - var timeoutThreshold = (Number(Date.now()) - ws_timeout) - if (timeoutThreshold > ws_hb[event.chanId]) { - console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) - clearInterval(intervalId) - ws_client.close() - ws_client.open() - } - } - }, ws_timeout) - } - } - - function wsOpen () { - ws_client.auth() - ws_client.subscribeTrades(pair) - ws_client.subscribeTicker(pair) - } function wsUpdateOrder (ws_order) { cid = ws_order[2] @@ -123,6 +100,11 @@ module.exports = function container (get, set, clear) { function wsUpdateOrderCancel (ws_order) { cid = ws_order[2] + if (ws_order[13].match(/^INSUFFICIENT MARGIN/)) { + ws_orders['~' + cid].status = 'rejected' + ws_orders['~' + cid].reject_reason = 'balance' + } + if (ws_orders['~' + cid]) { setTimeout(function () { @@ -154,15 +136,77 @@ module.exports = function container (get, set, clear) { }) } + function wsConnect () { + if (ws_connected || ws_connecting) return + ws_client.open() + } + + function wsOpen () { + ws_client.auth() + ws_client.subscribeTrades(pair) + ws_client.subscribeTicker(pair) + } + + function wsSubscribed (event) { + // We only use the 'trades' channel for heartbeats. That one should be most frequently updated. + if (event.channel === "trades") { + ws_hb[event.chanId] = Date.now() + + var intervalId = setInterval(function() { + if (ws_hb[event.chanId]) { + var timeoutThreshold = (Number(Date.now()) - ws_timeout) + if (timeoutThreshold > ws_hb[event.chanId]) { + console.warn(("\nWebSockets Warning: No message on channel 'trade' within " + ws_timeout / 1000 + ' seconds, reconnecting...').red) + clearInterval(intervalId) + ws_connecting = false + ws_connected = false + ws_client.close() + } + } + }, ws_timeout + } + } + + function wsClose () { + ws_connecting = false + ws_connected = false + + console.error(("\nWebSockets Error: Connection closed.").red + " Retrying every " + (ws_retry / 1000 + ' seconds').yellow + '.') + console.error(e) + } + + function wsError (e) { + if (e.event == "auth" && e.status == "FAILED") { + errorMessage = ('WebSockets Warning: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_wait_on_apikey_error / 1000 + ' seconds').yellow + '.' + if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." + console.warn(errorMessage) + + setTimeout(function () { + ws_client.auth() + }, ws_wait_on_apikey_error) + } + else { + ws_connecting = false + ws_connected = false + + ws_client.close() + } + } + function wsClient () { if (!ws_client) { if (!c.bitfinex || !c.bitfinex.key || c.bitfinex.key === 'YOUR-API-KEY') { throw new Error('please configure your Bitfinex credentials in ' + path.resolve(__dirname, 'conf.js')) } + ws_connecting = true + ws_connected = false + ws_client = new BFX(c.bitfinex.key, c.bitfinex.secret, {version: 2, transform: true}).ws ws_client .on('open', wsOpen) + .on('close', wsClose) + .on('error', wsError) .on('subscribed', wsSubscribed) .on('message', wsMessage) .on('trade', wsUpdateTrades) @@ -174,29 +218,10 @@ module.exports = function container (get, set, clear) { .on('ou', wsUpdateOrder) .on('oc', wsUpdateOrderCancel) - ws_client.on('error', function (e) { - if (e.event == "auth" && e.status == "FAILED") { - ws_client_isAuthed = false - errorMessage = ('WebSockets Warning: Authentication ' + e.status + ' (Reason: "' + e.msg + '").').red + ' Retrying in ' + (ws_wait_on_apikey_error / 1000 + ' seconds').yellow + '.' - if (e.msg == 'apikey: invalid') errorMessage = errorMessage + "\nEither your API key is invalid or you tried reconnecting to quickly. Wait and/or check your API keys." - console.warn(errorMessage) - - setTimeout(function () { - ws_client.auth() - }, ws_wait_on_apikey_error) - } - else { - console.error(("\nWebSockets Error: An unhandled error occured.").red + "\nSince those are mostly 'connect' related: Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '. Otherwise, consider reporting.') - console.error(e) - ws_client.close() - setTimeout(function() { - ws_client.open() - }, ws_retry) - } - }) + setTimeout(function() { + wsConnect() + }, ws_retry) } - - return ws_client } function joinProduct (product_id) { @@ -289,7 +314,7 @@ module.exports = function container (get, set, clear) { if (!ws_client) { wsClient() } if (Object.keys(ws_balance).length === 0) { - if (so.debug && ws_client_isAuthed === true) { + if (so.debug && ws_connected === true) { console.warn(("WebSockets Warning: Waiting for initial websockets snapshot.").red + " Retrying in " + (ws_retry / 1000 + ' seconds').yellow + '.') } return retry('getBalance', opts, cb) From 88b84d7a13e0a1a734ed7b8b7a5fed3e3f593c55 Mon Sep 17 00:00:00 2001 From: crubb Date: Sun, 13 Aug 2017 18:02:25 +0200 Subject: [PATCH 25/26] Bitfinex WS: Missing ) --- extensions/exchanges/bitfinex/exchange.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index d2f95a6105..3e130508be 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -163,7 +163,7 @@ module.exports = function container (get, set, clear) { ws_client.close() } } - }, ws_timeout + }, ws_timeout) } } From 7822028bbef41b3b763c3b29a36f9491aae52a60 Mon Sep 17 00:00:00 2001 From: crubb Date: Mon, 14 Aug 2017 09:36:14 +0200 Subject: [PATCH 26/26] Bitfinex WS: Remove debug logging on "close" --- extensions/exchanges/bitfinex/exchange.js | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/exchanges/bitfinex/exchange.js b/extensions/exchanges/bitfinex/exchange.js index 3e130508be..1620d4edfe 100644 --- a/extensions/exchanges/bitfinex/exchange.js +++ b/extensions/exchanges/bitfinex/exchange.js @@ -172,7 +172,6 @@ module.exports = function container (get, set, clear) { ws_connected = false console.error(("\nWebSockets Error: Connection closed.").red + " Retrying every " + (ws_retry / 1000 + ' seconds').yellow + '.') - console.error(e) } function wsError (e) {