-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/adapt for restart #2
base: master
Are you sure you want to change the base?
Changes from all commits
44d306c
40ef714
8ffbec9
f25028d
c4394da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
node_modules | ||
docker-compose.override.yml | ||
docker-compose.override.yml | ||
.DS_Store |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,5 @@ services: | |
- "4000:4000" | ||
volumes: | ||
- ./src:/usr/src/app/src | ||
environment: | ||
TRYNAPI_S3_BUCKET: "opentransit-pdx" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,20 +7,31 @@ const s3 = new AWS.S3(); | |
const s3Bucket = process.env.TRYNAPI_S3_BUCKET || "orion-vehicles"; | ||
console.log(`Reading state from s3://${s3Bucket}`); | ||
|
||
function convertTZ(date, tzString) { | ||
return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString})); | ||
} | ||
|
||
/* | ||
* Gets bucket prefix at the minute-level | ||
* Gets bucket prefix at the hour-level | ||
* Note to Jesse - I changed the bucket structure | ||
* when we switch to a new bucket (code for PDX owned) | ||
* we could switch it back. For now, it's not great | ||
* but I think it's not the source of the problem. | ||
* see getVehiclePaths - I think the function is | ||
* not as fast as it could be but it works | ||
* @param agencyId - String | ||
* @param currentTime - Number | ||
* @return prefix - String | ||
*/ | ||
function getBucketMinutePrefix(agencyId, currentTime) { | ||
function getBucketHourPrefix(agencyId, currentTime) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous S3 path format with the minute was used because the S3 API allows searching for keys by prefix, and including the minute in the prefix of the path makes it possible to fetch data with a granularity of one minute. opentransit-metrics fetches data from opentransit-state-api in chunks that are not necessarily multiples of 1 hour. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After reviewing the existing code in getVehiclePaths, I see that it makes separate s3.listObject requests to get the list of keys for every minute within the time range, which is a lot more API requests than necessary and might reduce performance. Now I think it probably would be faster to search for a keys with a granularity of 1 hour and then filter within those keys to find the requested minutes. |
||
const currentDateTime = new Date(Number(currentTime * 1000)); | ||
const year = currentDateTime.getUTCFullYear(); | ||
const month = currentDateTime.getUTCMonth()+1; | ||
const day = currentDateTime.getUTCDate(); | ||
const hour = currentDateTime.getUTCHours(); | ||
const minute = currentDateTime.getUTCMinutes(); | ||
return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`; | ||
const pacificDateTime = convertTZ(currentDateTime, 'America/Los_Angeles'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using America/Los_Angeles as the time zone could cause issues related to daylight savings since when switching from PDT to PST in the fall the there would be two hours with the same key prefix. Using America/Los_Angeles would also be confusing when deploying opentransit-state-api for transit agencies that are not in Pacific time. It is also possible that a single instance of opentransit-state-api could support transit agencies in multiple time zones. It would probably be simpler to keep the S3 keys in UTC. |
||
const year = pacificDateTime.getFullYear(); | ||
const month = String(pacificDateTime.getMonth()+1).padStart(2, '0'); | ||
const day = String(pacificDateTime.getUTCDate()).padStart(2, '0'); | ||
const hour = String(pacificDateTime.getUTCHours()).padStart(2, '0'); | ||
// console.log('looking at bucket year %i, month %i, day %i, hour %i', year, month, day, hour); | ||
return `${agencyId}/${year}/${month}/${day}/${hour}/`; | ||
} | ||
|
||
function getS3Paths(prefix) { | ||
|
@@ -49,11 +60,13 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { | |
} | ||
// Idea: there are 1440 minutes in a day, and the API return at most 1-2 days, | ||
// so we can iterate every minute (as we have to get each file individually anyways) | ||
let minutePrefixes = []; | ||
let hourPrefixes = []; | ||
for (let time = startEpoch; time < endEpoch; time += 60) { | ||
minutePrefixes.push(getBucketMinutePrefix(agencyId, time)); | ||
hourPrefixes.push(getBucketHourPrefix(agencyId, time)); | ||
} | ||
let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix)))); | ||
let uniquehourPrefixes = [...new Set(hourPrefixes)]; | ||
// console.log(uniquehourPrefixes) | ||
let files = _.flatten(await Promise.all(uniquehourPrefixes.map(prefix => getS3Paths(prefix)))); | ||
|
||
let timestampsMap = {}; | ||
let res = []; | ||
|
@@ -65,13 +78,27 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) { | |
res.push(key); | ||
} | ||
}); | ||
// console.log(res) | ||
return res; | ||
} | ||
|
||
|
||
// unzip the gzip data | ||
function decompressData(data) { | ||
return new Promise((resolve, _) => { | ||
return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString()))); | ||
return new Promise((resolve, reject) => { | ||
return zlib.unzip(data, (err, decoded) => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
var parsedData; | ||
try { | ||
parsedData = JSON.parse(decoded.toString()); | ||
} catch (e) { | ||
reject(e); | ||
} | ||
resolve(parsedData); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
|
@@ -88,36 +115,21 @@ async function getVehicles(agencyId, startEpoch, endEpoch) { | |
Key: key, | ||
}, (err, data) => { | ||
if (err) { | ||
reject(err); | ||
reject(err); | ||
} else { | ||
const timestamp = getTimestamp(key); | ||
decompressData(data.Body) | ||
.then(decodedData => | ||
resolve(insertTimestamp(timestamp, decodedData))); | ||
resolve(decompressData(data.Body)); | ||
} | ||
}); | ||
}).catch((err) => { | ||
return Promise.reject(`Error loading s3://${s3Bucket}/${key}: ${err}`); | ||
}); | ||
}))); | ||
} | ||
|
||
function getTimestamp(key) { | ||
const keyParts = key.split('-'); | ||
return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000); | ||
} | ||
|
||
/* | ||
* The API defines timestamp (epoch time in seconds) as a field for each vehicle, | ||
* which was also a column in Cassandra. | ||
* Since the timestamp is in the key in S3, that field does not exist, | ||
* thus we have to add it in the S3Helper to maintain compatibility | ||
*/ | ||
function insertTimestamp(timestamp, vehicles) { | ||
return vehicles.map(vehicle => { | ||
return { | ||
...vehicle, | ||
timestamp: timestamp, | ||
}; | ||
}); | ||
const raw_timestamp = Number(keyParts[keyParts.length - 1].split('.json')[0]) | ||
return raw_timestamp; | ||
} | ||
|
||
module.exports = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent, can we either change agencyId and routeId to agencyID and routeID, or change vehicleID and tripID to vehicleId and tripId?