Skip to content
This repository has been archived by the owner on Jan 20, 2020. It is now read-only.

Commit

Permalink
Fix heartbeat channel subscriptions (#150)
Browse files Browse the repository at this point in the history
* Fix heartbeat channel subscriptions

- Automatically adds the `heartbeat` channel if not already subscribed
- Removes redundant `keepalive` ping on the socket

Fixes #113

* Remove heartbeat from OrderbookSync signature
  • Loading branch information
rmm5t authored and fb55 committed Dec 26, 2017
1 parent 9802f7f commit 94c671c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 122 deletions.
3 changes: 1 addition & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ declare module 'gdax' {
}

interface WebsocketClientOptions {
heartbeat?: boolean;
channels?: string[];
}

Expand All @@ -252,7 +251,7 @@ declare module 'gdax' {
productIds: string[],
websocketURI?: string,
auth?: {key:string, secret:string, passphrase:string},
{ heartbeat, channels }?: WebsocketClientOptions );
{ channels }?: WebsocketClientOptions );

on(event: 'message', eventHandler: (data:object) => void);
on(event: 'error', eventHandler: (err) => void);
Expand Down
33 changes: 6 additions & 27 deletions lib/clients/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ class WebsocketClient extends EventEmitter {
productIDs,
websocketURI = 'wss://ws-feed.gdax.com',
auth = null,
{ heartbeat = false, channels = null } = {}
{ channels = null } = {}
) {
super();
this.productIDs = Utils.determineProductIDs(productIDs);
this.websocketURI = websocketURI;
this.channels = channels;
this.auth = Utils.checkAuth(auth);
this.heartbeat = heartbeat;
this.channels = channels || ['full'];
if (!this.channels.includes('heartbeat')) {
this.channels.push('heartbeat');

This comment has been minimized.

Copy link
@tomasAlabes

tomasAlabes Dec 28, 2017

Hi @rmm5t, why forcing the heartbeat channel?

This comment has been minimized.

Copy link
@rmm5t

rmm5t Dec 28, 2017

Author Contributor

Because it helps keeps the connection open. You need something to keep the connection open for less-active channels. Otherwise, the connection can die due to inactivity. The original implementation defaulted to sending keepalive pings to the server using a timer. If the heartbeat is on, this is no longer necessary, and there are other (eventual) benefits to forcing the heartbeat channel too.

  • It's easier and quicker to recognize when the connection is interrupted.
  • It's easier to recognize when a sequence is out of sync.
  • It's also the recommended approach when reading the GDAX API docs.

A longer-term goal is to make the WebsocketClient smarter by automatically reconnecting if it recognizes a disconnect. The OrderbookSync can also potentially gain some smarts if it recognizes a missing sequence number via the heartbeat (right now, OrderbookSync is duplicating this effort by inspecting sequences).
See #148 (comment)

This comment has been minimized.

Copy link
@tomasAlabes

tomasAlabes Dec 28, 2017

Ok, thank you for the quick and complete clarification. Looking forward to the improvements listed in the mentioned comment!

}
this.connect();
}

connect() {
if (this.socket) {
clearInterval(this.pinger);
this.socket.close();
}

Expand All @@ -40,8 +41,6 @@ class WebsocketClient extends EventEmitter {
}

disconnect() {
clearInterval(this.pinger);

if (!this.socket) {
throw new Error('Could not disconnect (not connected)');
}
Expand All @@ -55,12 +54,9 @@ class WebsocketClient extends EventEmitter {
const subscribeMessage = {
type: 'subscribe',
product_ids: this.productIDs,
channels: this.channels,
};

if (this.channels) {
subscribeMessage.channels = this.channels;
}

// Add Signature
if (this.auth.secret) {
let sig = signRequest(
Expand All @@ -72,26 +68,9 @@ class WebsocketClient extends EventEmitter {
}

this.socket.send(JSON.stringify(subscribeMessage));

if (this.heartbeat) {
// send heartbeat
const heartbeatMessage = {
type: 'heartbeat',
on: true,
};
this.socket.send(JSON.stringify(heartbeatMessage));
} else {
// Set a 30 second ping to keep connection alive
this.pinger = setInterval(() => {
if (this.socket) {
this.socket.ping('keepalive');
}
}, 30000);
}
}

onClose() {
clearInterval(this.pinger);
this.socket = null;
this.emit('close');
}
Expand Down
5 changes: 2 additions & 3 deletions lib/orderbook_sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ class OrderbookSync extends WebsocketClient {
productIDs,
apiURI = 'https://api.gdax.com',
websocketURI = 'wss://ws-feed.gdax.com',
auth = null,
{ heartbeat = false } = {}
auth = null
) {
super(productIDs, websocketURI, auth, { heartbeat });
super(productIDs, websocketURI, auth);
this.apiURI = apiURI;
this.auth = Utils.checkAuth(auth);

Expand Down
104 changes: 14 additions & 90 deletions tests/websocket.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ suite('WebsocketClient', () => {
});
});

test('subscribes to the default product (BTC-USD) if undefined', done => {
test('subscribes to the default product (BTC-USD) and default channel (full) if undefined', done => {
const server = testserver(++port, () => {
new Gdax.WebsocketClient(null, 'ws://localhost:' + port);
});
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
assert.deepEqual(msg, {
type: 'subscribe',
product_ids: ['BTC-USD'],
});
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.product_ids, ['BTC-USD']);
assert.deepEqual(msg.channels, ['full', 'heartbeat']);

server.close();
done();
Expand All @@ -43,10 +42,8 @@ suite('WebsocketClient', () => {
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
assert.deepEqual(msg, {
type: 'subscribe',
product_ids: ['BTC-USD'],
});
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.product_ids, ['BTC-USD']);

server.close();
done();
Expand All @@ -61,10 +58,8 @@ suite('WebsocketClient', () => {
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
assert.deepEqual(msg, {
type: 'subscribe',
product_ids: ['BTC-USD'],
});
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.product_ids, ['BTC-USD']);

server.close();
done();
Expand All @@ -79,10 +74,8 @@ suite('WebsocketClient', () => {
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
assert.deepEqual(msg, {
type: 'subscribe',
product_ids: ['BTC-EUR'],
});
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.product_ids, ['BTC-EUR']);

server.close();
done();
Expand All @@ -97,10 +90,8 @@ suite('WebsocketClient', () => {
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
assert.deepEqual(msg, {
type: 'subscribe',
product_ids: ['ETH-USD'],
});
assert.equal(msg.type, 'subscribe');
assert.deepEqual(msg.product_ids, ['ETH-USD']);

server.close();
done();
Expand Down Expand Up @@ -131,7 +122,7 @@ suite('WebsocketClient', () => {
});
});

test('passes channels through', done => {
test('passes channels through with heartbeat added', done => {
const server = testserver(++port, () => {
new Gdax.WebsocketClient(
'ETH-USD',
Expand All @@ -150,7 +141,7 @@ suite('WebsocketClient', () => {
assert.equal(msg.type, 'subscribe');
assert.equal(msg.key, 'suchkey');
assert.equal(msg.passphrase, 'muchpassphrase');
assert.deepEqual(msg.channels, ['user', 'ticker']);
assert.deepEqual(msg.channels, ['user', 'ticker', 'heartbeat']);
assert(msg.timestamp);
assert(msg.signature);
server.close();
Expand All @@ -159,70 +150,3 @@ suite('WebsocketClient', () => {
});
});
});

test('passes heartbeat details through', done => {
let calls = 0;
const server = testserver(++port, () => {
new Gdax.WebsocketClient(
'ETH-USD',
'ws://localhost:' + port,
{
key: 'suchkey',
secret: 'suchsecret',
passphrase: 'muchpassphrase',
},
{ heartbeat: true }
);
});
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
calls++;

if (msg.type === 'subscribe') {
assert.equal(msg.key, 'suchkey');
assert.equal(msg.passphrase, 'muchpassphrase');
assert(msg.timestamp);
assert(msg.signature);
} else {
assert.equal(msg.type, 'heartbeat');
assert.equal(msg.on, true);
}

if (calls > 1) {
server.close();
done();
}
});
});
});

test('passes heartbeat details through without authentication details', done => {
let calls = 0;
const server = testserver(++port, () => {
new Gdax.WebsocketClient(
['BTC-USD', 'ETH-USD'],
'ws://localhost:' + port,
null,
{ heartbeat: true }
);
});
server.on('connection', socket => {
socket.on('message', data => {
const msg = JSON.parse(data);
calls++;

if (msg.type === 'subscribe') {
assert.deepEqual(msg.product_ids, ['BTC-USD', 'ETH-USD']);
} else {
assert.equal(msg.type, 'heartbeat');
assert.equal(msg.on, true);
}

if (calls > 1) {
server.close();
done();
}
});
});
});

0 comments on commit 94c671c

Please sign in to comment.