Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

previousFetchTime parameter for FETCH_SYNC_RECORDS #363

Merged
merged 3 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ jspm_packages
# Optional REPL history
.node_repl_history

bundles/*.js
# Generated bundles files
bundles/**/*.js

#Editors
.idea
2 changes: 1 addition & 1 deletion client/constants/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const messages = {
* with new records, do
* GET_EXISTING_OBJECTS -> RESOLVE_SYNC_RECORDS -> RESOLVED_SYNC_RECORDS
*/
FETCH_SYNC_RECORDS: _, /* @param Array.<string> categoryNames, @param {number} startAt (in seconds or milliseconds), @param {number=} maxRecords limit response to a given max number of records. set to 0 or falsey to not limit the response */
FETCH_SYNC_RECORDS: _, /* @param Array.<string> categoryNames, @param {number} startAt (in seconds or milliseconds), @param {number=} maxRecords limit response to a given max number of records. set to 0 or falsey to not limit the response, @param {number} previousFetchTime (in milliseconds) */
/**
* browser -> webview
* sent to fetch all sync devices. webview responds with RESOLVED_SYNC_RECORDS.
Expand Down
33 changes: 17 additions & 16 deletions client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ RequestUtil.prototype.parseAWSResponse = function (bytes) {
* }} opts
* @returns {Promise(Array.<Object>)}
*/
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, opts = {}) {
RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContinuationToken, previousFetchTime, opts = {}) {
const prefix = `${this.apiVersion}/${this.userId}/${category}`
let options = {
MaxKeys: maxRecords || 1000,
Expand All @@ -223,7 +223,7 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
}
if (startAt) { options.StartAfter = `${prefix}/${startAt}` }
return this.withRetry(() => {
if (this.shouldListObject(startAt, category) || opts.compaction) {
if (this.shouldListObject(previousFetchTime, category) || opts.compaction) {
const s3ObjectsPromise = s3Helper.listObjects(this.s3, options, !!maxRecords)
if (!opts.compaction) {
return s3ObjectsPromise
Expand All @@ -237,7 +237,7 @@ RequestUtil.prototype.list = function (category, startAt, maxRecords, nextContin
// wait for 15 seconds between batches
setTimeout(() => {
if (s3Objects.isTruncated) {
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, opts)
return this.list(category, startAt, maxRecords, s3Objects.nextContinuationToken, previousFetchTime, opts)
}
return new Promise((resolve, reject) => {
// compaction is done
Expand Down Expand Up @@ -305,16 +305,14 @@ const CATEGORIES_FOR_SQS = [proto.categories.BOOKMARKS, proto.categories.PREFERE

/**
* Checks do we need to use s3 list Object or SQS notifications
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds
* @param {number=} previousFetchTime - the previous fetch time. Could be seconds or milliseconds
* @param {string} category - the category ID
* @returns {boolean}
*/
RequestUtil.prototype.shouldListObject = function (startAt, category) {
RequestUtil.prototype.shouldListObject = function (previousFetchTime, category) {
let currentTime = new Date().getTime()
let startAtToCheck = this.normalizeTimestampToMs(startAt, currentTime)

return !startAtToCheck ||
(currentTime - startAtToCheck) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 ||
return !previousFetchTime ||
(currentTime - previousFetchTime) > parseInt(s3Helper.SQS_RETENTION, 10) * 1000 ||
!CATEGORIES_FOR_SQS.includes(category) ||
this.listInProgress === true
}
Expand All @@ -336,17 +334,20 @@ RequestUtil.prototype.shouldRetireOldSQSQueue = function (createdTimestamp) {

/**
* Checks do we need to use s3 list Object or SQS notifications
* @param {number=} startAt return objects with timestamp >= startAt (e.g. 1482435340). Could be seconds or milliseconds
* @param {number=} timeToNormalize could be seconds or milliseconds
* @param {number=} currentTime currentTime in milliseconds
* @returns {number=}
* @returns {number=} the time for sure in milliseconds
*/
RequestUtil.prototype.normalizeTimestampToMs = function (startAt, currentTime) {
let startAtToCheck = startAt
if (startAtToCheck && currentTime.toString().length - startAtToCheck.toString().length >= 3) {
startAtToCheck *= 1000
RequestUtil.prototype.normalizeTimestampToMs = function (timeToNormalize, currentTime) {
if (!timeToNormalize) {
return 0
}

if (timeToNormalize && currentTime.toString().length - timeToNormalize.toString().length >= 3) {
timeToNormalize *= 1000
}

return startAtToCheck
return timeToNormalize
}

/**
Expand Down
8 changes: 4 additions & 4 deletions client/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ const startSync = (requester) => {
return jsRecords
}

ipc.on(messages.FETCH_SYNC_RECORDS, (e, categoryNames, startAt, limitResponse) => {
logSync(`fetching ${categoryNames} records after ${startAt}`)
ipc.on(messages.FETCH_SYNC_RECORDS, (e, categoryNames, startAt, limitResponse, previousFetchTime) => {
logSync(`fetching ${categoryNames} records after ${startAt} previous fetch is ${previousFetchTime}`)
categoryNames.forEach((category) => {
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
Expand All @@ -119,7 +119,7 @@ const startSync = (requester) => {
if (nextContinuationTokens[category]) {
continuationToken = nextContinuationTokens[category]
}
requester.list(proto.categories[category], startAt, limitResponse, continuationToken).then((s3Objects) => {
requester.list(proto.categories[category], startAt, limitResponse, continuationToken, previousFetchTime).then((s3Objects) => {
const jsRecords = getJSRecords(s3Objects.contents)
logSync(`got ${jsRecords.length} decrypted records in ${category} after ${startAt}`)
let lastRecordTimestamp
Expand Down Expand Up @@ -220,7 +220,7 @@ const startSync = (requester) => {
ipc.send(messages.GET_EXISTING_OBJECTS, category, jsRecords, 0, false)
}
if (!isCompactionInProgress) {
requester.list(proto.categories[category], 0, 1000, '',
requester.list(proto.categories[category], 0, 1000, '', 0,
{compaction: true, compactionDoneCb: compactionDone, compactionUpdateCb: compactionUpdate}).then(() => {
logSync(`Compacting category: ${category}`)
isCompactionInProgress = true
Expand Down
4 changes: 2 additions & 2 deletions test/client/deviceIdV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ test('deviceId V2 migration', (t) => {
const testCanListFromOldQueue = (t) => {
t.test('can list notification from old SQS queue', (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 0, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 1)
Expand Down Expand Up @@ -127,7 +127,7 @@ test('deviceId V2 migration', (t) => {
const testCanListFromBothQueues = (t) => {
t.test('can list notifications from new and old SQS queues', (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 0, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 1)
Expand Down
26 changes: 22 additions & 4 deletions test/client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,25 @@ test('client RequestUtil', (t) => {
})
const serializer = requestUtil.serializer

t.plan(2)
t.plan(3)

t.test('#should list object', (t) => {
t.plan(3)

t.equals(true,
requestUtil.shouldListObject(0, proto.categories.BOOKMARKS),
`${t.name}: previous fetch time is empty - use S3`)

t.equals(false,
requestUtil.shouldListObject(new Date().getTime()-1000*60, proto.categories.BOOKMARKS),
`${t.name}: previous fetch is not older than 24h - use SQS`)

const delta25hours = 1000*60*60*25
t.equals(true,
requestUtil.shouldListObject(new Date().getTime()-delta25hours, proto.categories.BOOKMARKS),
`${t.name}: previous fetch is older than 24h - use S3`)
})

t.test('#put preference: device', (t) => {
t.plan(2)
const deviceId = new Uint8Array([0])
Expand Down Expand Up @@ -288,7 +306,7 @@ test('client RequestUtil', (t) => {
const testCanListNotifications = (t) => {
t.test(`${t.name} can list notification`, (t) => {
let currentTime = new Date().getTime()
requestUtil.list(proto.categories.BOOKMARKS, currentTime)
requestUtil.list(proto.categories.BOOKMARKS, currentTime, 1000, '', currentTime)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
const s3Record = response[1] ? response[1].record : response[1]
Expand Down Expand Up @@ -461,7 +479,7 @@ test('client RequestUtil', (t) => {
}
// limit batch size to 10 to test cross batch compaction for around 40
// objects
requestUtil.list(proto.categories.BOOKMARKS, 0, 10, '', {compaction: true,
requestUtil.list(proto.categories.BOOKMARKS, 0, 10, '', 0, {compaction: true,
compactionUpdateCb: compactionUpdate,
compactionDoneCb: () => {
console.log = consoleLogBak
Expand All @@ -474,7 +492,7 @@ test('client RequestUtil', (t) => {
compactedRecord.filter(record => record.objectId.toString() === record2_update.objectId.toString()).length, 5)
// we already have 15 second timeout for each batch so no need to
// do another wait after compaction is done
requestUtil.list(proto.categories.BOOKMARKS, 0, 0)
requestUtil.list(proto.categories.BOOKMARKS, 0, 0, '', 0)
.then(s3Objects => requestUtil.s3ObjectsToRecords(s3Objects.contents))
.then((response) => {
t.equals(response.length, 2, `${t.name} check records number`)
Expand Down