diff --git a/index.js b/index.js index 0fe4323..c3434ca 100755 --- a/index.js +++ b/index.js @@ -33,6 +33,7 @@ const ONE_WEEK_IN_SECONDS = 604800; const PRESENCE_UPDATE_INTERVAL = ONE_SECOND; const HEALTH_UPDATE_INTERVAL = 5000; const KEY_EXPIRATION_TTL = parseInt(PRESENCE_UPDATE_INTERVAL / ONE_SECOND) * 3; +const KEYS_PER_SCAN = '100'; const UMF_INVALID_MESSAGE = 'UMF message requires "to", "from" and "body" fields'; /** @@ -348,7 +349,7 @@ class Hydra extends EventEmitter { const promises = []; if (!this.testMode) { this._logMessage('error', 'Service is shutting down.'); - this.redisdb.multi() + this.redisdb.batch() .expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL) .expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health:log`, ONE_WEEK_IN_SECONDS) .exec(); @@ -417,13 +418,36 @@ class Hydra extends EventEmitter { */ _getKeys(pattern) { return new Promise((resolve, _reject) => { - this.redisdb.keys(pattern, (err, result) => { - if (err) { - resolve([]); - } else { - resolve(result); - } - }); + if (this.testMode) { + this.redisdb.keys(pattern, (err, result) => { + if (err) { + resolve([]); + } else { + resolve(result); + } + }); + } else { + let doScan = (cursor, pattern, retSet) => { + this.redisdb.scan(cursor, 'MATCH', pattern, 'COUNT', KEYS_PER_SCAN, (err, result) => { + if (!err) { + cursor = result[0]; + let keys = result[1]; + keys.forEach((key, _i) => { + retSet.add(key); + }); + if (cursor === '0') { + resolve(Array.from(retSet)); + } else { + doScan(cursor, pattern, retSet); + } + } else { + resolve([]); + } + }); + }; + let results = new Set(); + doScan('0', pattern, results); + } }); } @@ -716,11 +740,10 @@ class Hydra extends EventEmitter { hostName: this.hostName }); if (entry && !this.redisdb.closing) { - this.redisdb.setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:presence`, KEY_EXPIRATION_TTL, this.instanceID); - this.redisdb.hset(`${redisPreKey}:nodes`, this.instanceID, entry); - this.redisdb.multi() - .expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL) - .expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health:log`, ONE_WEEK_IN_SECONDS) + let cmd = (this.testMode) ? 'multi' : 'batch'; + this.redisdb[cmd]() + .setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:presence`, KEY_EXPIRATION_TTL, this.instanceID) + .hset(`${redisPreKey}:nodes`, this.instanceID, entry) .exec(); } } @@ -735,7 +758,11 @@ class Hydra extends EventEmitter { let entry = Object.assign({ updatedOn: this._getTimeStamp() }, this._getHealth()); - this.redisdb.setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL, Utils.safeJSONStringify(entry)); + let cmd = (this.testMode) ? 'multi' : 'batch'; + this.redisdb[cmd]() + .setex(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health`, KEY_EXPIRATION_TTL, Utils.safeJSONStringify(entry)) + .expire(`${redisPreKey}:${this.serviceName}:${this.instanceID}:health:log`, ONE_WEEK_IN_SECONDS) + .exec(); } /** @@ -955,6 +982,7 @@ class Hydra extends EventEmitter { instanceList.push(instanceObj); } }); + Utils.shuffeArray(instanceList); this.internalCache.put(cacheKey, instanceList, KEY_EXPIRATION_TTL); resolve(instanceList); } @@ -982,9 +1010,6 @@ class Hydra extends EventEmitter { this._logMessage('error', msg); reject(new Error(msg)); } else { - if (result.length > 1) { - Utils.shuffeArray(result); - } resolve(result); } }) diff --git a/package-lock.json b/package-lock.json index df216d4..59726dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "hydra", - "version": "1.5.0", + "version": "1.5.5", "lockfileVersion": 1, "dependencies": { "acorn": { diff --git a/package.json b/package.json index b21710d..ec056b3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hydra", - "version": "1.5.0", + "version": "1.5.5", "license": "MIT", "author": "Carlos Justiniano", "contributors": "https://github.com/flywheelsports/hydra/graphs/contributors",