Skip to content

Commit

Permalink
Merge pull request #6056 from nightscout/cache_invalidation
Browse files Browse the repository at this point in the history
Refactor in-memory caching
  • Loading branch information
sulkaharo authored Sep 22, 2020
2 parents a0043ec + 54008a7 commit cde9aa3
Show file tree
Hide file tree
Showing 16 changed files with 540 additions and 200 deletions.
2 changes: 1 addition & 1 deletion lib/api/devicestatus/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function configure (app, wares, ctx, env) {
q.count = 10;
}

const inMemoryData = ctx.ddata.shadow.devicestatus ? ctx.ddata.shadow.devicestatus : [];
const inMemoryData = ctx.cache.devicestatus ? ctx.cache.devicestatus : [];
const canServeFromMemory = inMemoryData.length >= q.count && Object.keys(q).length == 1 ? true : false;

if (canServeFromMemory) {
Expand Down
19 changes: 11 additions & 8 deletions lib/api/entries/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ function configure (app, wares, ctx, env) {
function prepReqModel (req, model) {
var type = model || 'sgv';
if (!req.query.find) {

req.query.find = {
type: type
};
Expand Down Expand Up @@ -447,6 +448,7 @@ function configure (app, wares, ctx, env) {
Object.keys(query.find).forEach(function(key) {
if (key == 'type') {
typeQuery = query.find[key]["$eq"];
if (!typeQuery) typeQuery = query.find.type;
} else {
inMemoryPossible = false;
}
Expand All @@ -456,14 +458,16 @@ function configure (app, wares, ctx, env) {
let inMemoryCollection;

if (typeQuery) {
if (typeQuery == 'sgv') inMemoryCollection = ctx.ddata.shadow.sgvs;
if (typeQuery == 'mbg') inMemoryCollection = ctx.ddata.shadow.mbgs;
if (typeQuery == 'cal') inMemoryCollection = ctx.ddata.shadow.cals;
} else {
const merged = _.unionWith(ctx.ddata.shadow.sgvs, ctx.ddata.shadow.mbgs, ctx.ddata.shadow.cals, function(a, b) {
return a._id == b._id;
inMemoryCollection= _.filter(ctx.cache.entries, function checkType (object) {
if (typeQuery == 'sgv') return 'sgv' in object;
if (typeQuery == 'mbg') return 'mbg' in object;
if (typeQuery == 'cal') return object.type === 'cal';
return false;
});
inMemoryCollection = _.sortBy(merged, function(item) {
} else {
inMemoryCollection = ctx.cache.getData('entries');

inMemoryCollection = _.sortBy(inMemoryCollection, function(item) {
return item.mills;
}).reverse();
}
Expand All @@ -475,7 +479,6 @@ function configure (app, wares, ctx, env) {
}

// If we get this far, query the database

// bias to entries, but allow expressing a preference
var storage = req.storage || ctx.entries;
// perform the query
Expand Down
35 changes: 27 additions & 8 deletions lib/api/treatments/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ function configure (app, wares, ctx, env) {
query.count = query.find ? 1000 : 100;
}

const inMemoryData = ctx.ddata.shadow.treatments;
const inMemoryData = ctx.cache.treatments;
const canServeFromMemory = inMemoryData && inMemoryData.length >= query.count && Object.keys(query).length == 1 ? true : false;

if (canServeFromMemory) {
Expand All @@ -112,12 +112,35 @@ function configure (app, wares, ctx, env) {
treatments = [treatments];
}

for (let i = 0; i < treatments.length; i++) {
const t = treatments[i];

if (!t.created_at) {
t.created_at = new Date().toISOString();
}

/*
if (!t.created_at) {
console.log('Trying to create treatment without created_at field', t);
res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments must contain created_at');
return;
}
const d = moment(t.created_at);
if (!d.isValid()) {
console.log('Trying to insert date with invalid created_at', t);
res.sendJSONStatus(res, constants.HTTP_VALIDATION_ERROR, 'Treatments created_at must be an ISO-8601 date');
return;
}
*/

}

ctx.treatments.create(treatments, function(err, created) {
if (err) {
console.log('Error adding treatment', err);
res.sendJSONStatus(res, constants.HTTP_INTERNAL_ERROR, 'Mongo Error', err);
} else {
console.log('Treatment created');
console.log('REST API treatment created', created);
res.json(created);
}
});
Expand All @@ -138,19 +161,16 @@ function configure (app, wares, ctx, env) {
query.count = 10
}

console.log('Delete records with query: ', query);

// remove using the query
ctx.treatments.remove(query, function(err, stat) {
if (err) {
console.log('treatments delete error: ', err);
return next(err);
}

// yield some information about success of operation
res.json(stat);

console.log('treatments records deleted');

return next();
});
}
Expand Down Expand Up @@ -180,8 +200,7 @@ function configure (app, wares, ctx, env) {
ctx.treatments.save(data, function(err, created) {
if (err) {
res.sendJSONStatus(res, constants.HTTP_INTERNAL_ERROR, 'Mongo Error', err);
console.log('Error saving treatment');
console.log(err);
console.log('Error saving treatment', err);
} else {
res.json(created);
console.log('Treatment saved', data);
Expand Down
2 changes: 2 additions & 0 deletions lib/client/careportal.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ function init (client, $) {
data.eventTime = mergeDateAndTime().toDate();
}

data.created_at = data.eventTime ? data.eventTime.toISOString() : new Date().toISOString();

if (!inputMatrix[data.eventType].profile) {
delete data.profile;
}
Expand Down
1 change: 1 addition & 0 deletions lib/constants.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"HTTP_UNAUTHORIZED" : 401,
"HTTP_VALIDATION_ERROR" : 422,
"HTTP_INTERNAL_ERROR" : 500,
"HTTP_BAD_REQUEST": 400,
"ENTRIES_DEFAULT_COUNT" : 10,
"PROFILES_DEFAULT_COUNT" : 10,
"MMOL_TO_MGDL": 18,
Expand Down
83 changes: 38 additions & 45 deletions lib/data/dataloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ function init(env, ctx) {
}

// clear treatments to the base set, we're going to merge from multiple queries
ddata.treatments = ddata.shadow.treatments ? _.cloneDeep(ddata.shadow.treatments) : [];
ddata.treatments = []; // ctx.cache.treatments ? _.cloneDeep(ctx.cache.treatments) : [];

ddata.dbstats = {};

Expand All @@ -170,11 +170,11 @@ function init(env, ctx) {
function loadEntries(ddata, ctx, callback) {

const withFrame = ddata.page && ddata.page.frame;
const loadPeriod = ddata.sgvs && ddata.sgvs.length > 0 && !withFrame ? constants.ONE_HOUR : constants.TWO_DAYS;
const latestEntry = ddata.sgvs && ddata.sgvs.length > 0 ? findLatestMills(ddata.sgvs) : ddata.lastUpdated;
const longLoad = Math.round(constants.TWO_DAYS);
const loadTime = ctx.cache.isEmpty('entries') || withFrame ? longLoad : constants.FIFTEEN_MINUTES;

var dateRange = {
$gte: Math.min(ddata.lastUpdated - loadPeriod, latestEntry)
$gte: ddata.lastUpdated - loadTime
};
if (withFrame) {
dateRange['$lte'] = ddata.lastUpdated;
Expand All @@ -195,14 +195,18 @@ function loadEntries(ddata, ctx, callback) {
}

if (!err && results) {

const ageFilter = ddata.lastUpdated - constants.TWO_DAYS;
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('entries', r, ageFilter);

const currentData = ctx.cache.getData('entries').reverse();

const mbgs = [];
const sgvs = [];
const cals = [];
const shadowMbgs = [];
const shadowSgvs = [];
const shadowCals = [];

results.forEach(function(element) {
currentData.forEach(function(element) {
if (element) {
if (!element.mills) element.mills = element.date;
if (element.mbg) {
Expand All @@ -213,7 +217,6 @@ function loadEntries(ddata, ctx, callback) {
device: element.device,
type: 'mbg'
});
shadowMbgs.push(element);
} else if (element.sgv) {
sgvs.push({
_id: element._id,
Expand All @@ -227,7 +230,6 @@ function loadEntries(ddata, ctx, callback) {
rssi: element.rssi,
type: 'sgv'
});
shadowSgvs.push(element);
} else if (element.type === 'cal') {
cals.push({
_id: element._id,
Expand All @@ -237,20 +239,14 @@ function loadEntries(ddata, ctx, callback) {
slope: element.slope,
type: 'cal'
});
shadowCals.push(element);
}
}
});

const ageLimit = ddata.lastUpdated - constants.TWO_DAYS;

//stop using uniq for SGVs since we use buckets, also enables more detailed monitoring
ddata.sgvs = mergeProcessSort(ddata.sgvs, sgvs, ageLimit);
ddata.mbgs = mergeProcessSort(ddata.mbgs, uniqBasedOnMills(mbgs), ageLimit);
ddata.cals = mergeProcessSort(ddata.cals, uniqBasedOnMills(cals), ageLimit);
ddata.shadow.sgvs = mergeProcessSort(ddata.shadow.sgvs, shadowSgvs, ageLimit).reverse();
ddata.shadow.mbgs = mergeProcessSort(ddata.shadow.mbgs, uniqBasedOnMills(shadowMbgs), ageLimit).reverse();
ddata.shadow.cals = mergeProcessSort(ddata.shadow.cals, uniqBasedOnMills(shadowCals), ageLimit).reverse();
ddata.sgvs = sgvs;
ddata.mbgs = mbgs;
ddata.cals = cals;
}
callback();
});
Expand Down Expand Up @@ -307,14 +303,12 @@ function loadTreatments(ddata, ctx, callback) {
const longLoad = Math.round(constants.ONE_DAY * 2.5); //ONE_DAY * 2.5;

// Load 2.5 days to cover last 48 hours including overlapping temp boluses or temp targets for first load
// Subsequently load at least 15 minutes of data, but if latest entry is older than 15 minutes, load until that entry
// Subsequently load at least 15 minutes of data

const loadTime = ctx.cache.isEmpty('treatments') || withFrame ? longLoad : constants.FIFTEEN_MINUTES;

const loadPeriod = ddata.treatments && ddata.treatments.length > 0 && !withFrame ? constants.SIX_HOURS : longLoad;
const latestEntry = ddata.treatments && ddata.treatments.length > 0 ? findLatestMills(ddata.treatments) : ddata.lastUpdated;
const loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry);

var dateRange = {
$gte: new Date(loadTime).toISOString()
$gte: new Date(ddata.lastUpdated - loadTime).toISOString()
};
if (withFrame) {
dateRange['$lte'] = new Date(ddata.lastUpdated).toISOString();
Expand All @@ -331,9 +325,11 @@ function loadTreatments(ddata, ctx, callback) {
ctx.treatments.list(tq, function(err, results) {
if (!err && results) {
const ageFilter = ddata.lastUpdated - longLoad;
ddata.treatments = mergeProcessSort(ddata.treatments, results, ageFilter);
ddata.shadow.treatments = mergeProcessSort(ddata.shadow.treatments, results, ageFilter).reverse(); //.reverse();
//mergeToTreatments(ddata, results);
const r = ctx.ddata.processRawDataForRuntime(results);

// update cache
ctx.cache.insertData('treatments', r, ageFilter);
ddata.treatments = ctx.ddata.idMergePreferNew(ddata.treatments, ctx.cache.getData('treatments'));
}

callback();
Expand Down Expand Up @@ -454,21 +450,12 @@ function loadFood(ddata, ctx, callback) {

function loadDeviceStatus(ddata, env, ctx, callback) {

let loadPeriod = constants.ONE_DAY;
if(env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2) loadPeriod = constants.TWO_DAYS;

const withFrame = ddata.page && ddata.page.frame ? true : false;

if (!withFrame && ddata.devicestatus && ddata.devicestatus.length > 0) {
loadPeriod = constants.FIFTEEN_MINUTES;
}

let latestEntry = ddata.devicestatus && ddata.devicestatus.length > 0 ? findLatestMills(ddata.devicestatus) : ddata.lastUpdated;
if (!latestEntry) latestEntry = ddata.lastUpdated; // TODO find out why report test fails without this
const loadTime = Math.min(ddata.lastUpdated - loadPeriod, latestEntry);
const withFrame = ddata.page && ddata.page.frame;
const longLoad = env.extendedSettings.devicestatus && env.extendedSettings.devicestatus.days && env.extendedSettings.devicestatus.days == 2 ? constants.TWO_DAYS : constants.ONE_DAY;
const loadTime = ctx.cache.isEmpty('devicestatus') || withFrame ? longLoad : constants.FIFTEEN_MINUTES;

var dateRange = {
$gte: new Date( loadTime ).toISOString()
$gte: new Date( ddata.lastUpdated - loadTime ).toISOString()
};

if (withFrame) {
Expand All @@ -486,10 +473,15 @@ function loadDeviceStatus(ddata, env, ctx, callback) {

ctx.devicestatus.list(opts, function(err, results) {
if (!err && results) {
const ageFilter = ddata.lastUpdated - constants.TWO_DAYS;
ddata.shadow.devicestatus = mergeProcessSort(ddata.shadow.devicestatus, results, ageFilter);
// ctx.cache.devicestatus = mergeProcessSort(ctx.cache.devicestatus, results, ageFilter);

const r = _.map(results, function eachStatus(result) {
const ageFilter = ddata.lastUpdated - longLoad;
const r = ctx.ddata.processRawDataForRuntime(results);
ctx.cache.insertData('devicestatus', r, ageFilter);

const res = ctx.cache.getData('devicestatus');

const res2 = _.map(res, function eachStatus(result) {
//result.mills = new Date(result.created_at).getTime();
if ('uploaderBattery' in result) {
result.uploader = {
Expand All @@ -499,7 +491,8 @@ function loadDeviceStatus(ddata, env, ctx, callback) {
}
return result;
});
ddata.devicestatus = mergeProcessSort(ddata.devicestatus, r, ageFilter);

ddata.devicestatus = mergeProcessSort(ddata.devicestatus, res2, ageFilter);
} else {
ddata.devicestatus = [];
}
Expand Down
Loading

0 comments on commit cde9aa3

Please sign in to comment.