Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/adapt for restart #2

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
node_modules
docker-compose.override.yml
docker-compose.override.yml
.DS_Store
32 changes: 18 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,39 @@ State of a particular vehicle at a particular time. Some fields may be null if n

| Field Name | Type | Description |
| --- | --- | --- |
| `vid` | `String` | ID of this vehicle. |
| `did` | `String` | ID of the direction the vehicle reported it was going. |
| `lat` | `Float` | Reported latitude of vehicle. |
| `lon` | `Float` | Reported longitude of vehicle. |
| `heading` | `Float` | Reported heading of vehicle in degrees. |
| `vehicleID` | `String` | ID of this vehicle. |
| `direction` | `String` | ID of the direction the vehicle reported it was going. |
| `latitude` | `Float` | Reported latitude of vehicle. |
| `longitude` | `Float` | Reported longitude of vehicle. |
| `bearing` | `Float` | Reported heading of vehicle in degrees. |
| `secsSinceReport` | `Int` | Number of seconds old this observation was when it was retrieved (at `timestamp` of `RouteState`). |
| `numCars` | `Int` | Number of cars in this vehicle. |
| `stopIndex` | `Int` | The index of the current stop in the sequence (GTFS-realtime providers only) |
| `status` | `Int` | 0 if the vehicle is about to arrive at the stop referred to by `stopIndex`, 1 if the vehicle is stopped at this stop, 2 if the vehicle is in transit to this stop (GTFS-realtime providers only) |
| `tripId` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) |
| `tripID` | `String` | ID of the trip the vehicle reported it was running (GTFS-realtime providers only) |

## Sample Query

Once you run it, go to http://localhost:4000/graphql in your browser and run this query:

```
query {
state(agencyId: "muni", startTime: 1572105600, endTime: 1572112800, routes: ["14", "19", "49"]) {
state(agencyId: "trimet"
, startTime: 1642867201
, endTime: 1642867500
, routes: ["100"]) {
agencyId
startTime
routes {
routeId
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, can we either change agencyId and routeId to agencyID and routeID, or change vehicleID and tripID to vehicleId and tripId?

states {
timestamp
vehicles {
vid
lat
lon
heading
vehicleID
tripID
latitude
longitude
direction
bearing
time
secsSinceReport
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ services:
- "4000:4000"
volumes:
- ./src:/usr/src/app/src
environment:
TRYNAPI_S3_BUCKET: "opentransit-pdx"
19 changes: 13 additions & 6 deletions examples/marin-query.graphql
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
# replace "marin" with "muni" to use that as the agency
# replace "trimet" with your agency

query {
state(agencyId: "marin", startTime: 1572127400, endTime: 1572127900) {
state(agencyId: "trimet"
, startTime: 1642867201
, endTime: 1642867500
, routes: ["100"]) {
agencyId
startTime
routes {
routeId
states {
timestamp
vehicles {
vid
lat
lon
heading
vehicleID
tripID
latitude
longitude
direction
bearing
time
secsSinceReport
}
}
}
Expand Down
80 changes: 46 additions & 34 deletions src/helpers/s3Helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,31 @@ const s3 = new AWS.S3();
const s3Bucket = process.env.TRYNAPI_S3_BUCKET || "orion-vehicles";
console.log(`Reading state from s3://${s3Bucket}`);

function convertTZ(date, tzString) {
return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString}));
}

/*
* Gets bucket prefix at the minute-level
* Gets bucket prefix at the hour-level
* Note to Jesse - I changed the bucket structure
* when we switch to a new bucket (code for PDX owned)
* we could switch it back. For now, it's not great
* but I think it's not the source of the problem.
* see getVehiclePaths - I think the function is
* not as fast as it could be but it works
* @param agencyId - String
* @param currentTime - Number
* @return prefix - String
*/
function getBucketMinutePrefix(agencyId, currentTime) {
function getBucketHourPrefix(agencyId, currentTime) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous S3 path format with the minute was used because the S3 API allows searching for keys by prefix, and including the minute in the prefix of the path makes it possible to fetch data with a granularity of one minute. opentransit-metrics fetches data from opentransit-state-api in chunks that are not necessarily multiples of 1 hour.
If the minute was removed from the bucket prefix, it would require searching the S3 for keys with a granularity of 1 hour and then filtering within those keys to find keys with the requested minutes. I'm not sure that there is a drawback to keeping the minute in the prefix so it's probably easier that way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reviewing the existing code in getVehiclePaths, I see that it makes separate s3.listObject requests to get the list of keys for every minute within the time range, which is a lot more API requests than necessary and might reduce performance. Now I think it probably would be faster to search for a keys with a granularity of 1 hour and then filter within those keys to find the requested minutes.

const currentDateTime = new Date(Number(currentTime * 1000));
const year = currentDateTime.getUTCFullYear();
const month = currentDateTime.getUTCMonth()+1;
const day = currentDateTime.getUTCDate();
const hour = currentDateTime.getUTCHours();
const minute = currentDateTime.getUTCMinutes();
return `${agencyId}/${year}/${month}/${day}/${hour}/${minute}/`;
const pacificDateTime = convertTZ(currentDateTime, 'America/Los_Angeles');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using America/Los_Angeles as the time zone could cause issues related to daylight savings since when switching from PDT to PST in the fall the there would be two hours with the same key prefix.

Using America/Los_Angeles would also be confusing when deploying opentransit-state-api for transit agencies that are not in Pacific time. It is also possible that a single instance of opentransit-state-api could support transit agencies in multiple time zones. It would probably be simpler to keep the S3 keys in UTC.

const year = pacificDateTime.getFullYear();
const month = String(pacificDateTime.getMonth()+1).padStart(2, '0');
const day = String(pacificDateTime.getUTCDate()).padStart(2, '0');
const hour = String(pacificDateTime.getUTCHours()).padStart(2, '0');
// console.log('looking at bucket year %i, month %i, day %i, hour %i', year, month, day, hour);
return `${agencyId}/${year}/${month}/${day}/${hour}/`;
}

function getS3Paths(prefix) {
Expand Down Expand Up @@ -49,11 +60,13 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) {
}
// Idea: there are 1440 minutes in a day, and the API return at most 1-2 days,
// so we can iterate every minute (as we have to get each file individually anyways)
let minutePrefixes = [];
let hourPrefixes = [];
for (let time = startEpoch; time < endEpoch; time += 60) {
minutePrefixes.push(getBucketMinutePrefix(agencyId, time));
hourPrefixes.push(getBucketHourPrefix(agencyId, time));
}
let files = _.flatten(await Promise.all(minutePrefixes.map(prefix => getS3Paths(prefix))));
let uniquehourPrefixes = [...new Set(hourPrefixes)];
// console.log(uniquehourPrefixes)
let files = _.flatten(await Promise.all(uniquehourPrefixes.map(prefix => getS3Paths(prefix))));

let timestampsMap = {};
let res = [];
Expand All @@ -65,13 +78,27 @@ async function getVehiclePaths(agencyId, startEpoch, endEpoch) {
res.push(key);
}
});
// console.log(res)
return res;
}


// unzip the gzip data
function decompressData(data) {
return new Promise((resolve, _) => {
return zlib.unzip(data, (_, decoded) => resolve(JSON.parse(decoded.toString())));
return new Promise((resolve, reject) => {
return zlib.unzip(data, (err, decoded) => {
if (err) {
reject(err);
} else {
var parsedData;
try {
parsedData = JSON.parse(decoded.toString());
} catch (e) {
reject(e);
}
resolve(parsedData);
}
});
});
}

Expand All @@ -88,36 +115,21 @@ async function getVehicles(agencyId, startEpoch, endEpoch) {
Key: key,
}, (err, data) => {
if (err) {
reject(err);
reject(err);
} else {
const timestamp = getTimestamp(key);
decompressData(data.Body)
.then(decodedData =>
resolve(insertTimestamp(timestamp, decodedData)));
resolve(decompressData(data.Body));
}
});
}).catch((err) => {
return Promise.reject(`Error loading s3://${s3Bucket}/${key}: ${err}`);
});
})));
}

function getTimestamp(key) {
const keyParts = key.split('-');
return Math.floor(Number(keyParts[keyParts.length - 1].split('.json')[0])/1000);
}

/*
* The API defines timestamp (epoch time in seconds) as a field for each vehicle,
* which was also a column in Cassandra.
* Since the timestamp is in the key in S3, that field does not exist,
* thus we have to add it in the S3Helper to maintain compatibility
*/
function insertTimestamp(timestamp, vehicles) {
return vehicles.map(vehicle => {
return {
...vehicle,
timestamp: timestamp,
};
});
const raw_timestamp = Number(keyParts[keyParts.length - 1].split('.json')[0])
return raw_timestamp;
}

module.exports = {
Expand Down
121 changes: 68 additions & 53 deletions src/resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,73 +18,83 @@ const resolvers = {

let { startTime, endTime } = params;

const vehicles = await s3Helper.getVehicles(agencyId, startTime, endTime);
console.log(agencyId, routes)

const resultSets = await s3Helper.getVehicles(agencyId, startTime, endTime);


const vehiclesByTripByTime = {};
const vehiclesByRouteByTime = {};
const UniqueVehicleTimeKeys = {};

// group the vehicles by route, and then by time
// below are fields in the vehicle api response
// 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID',
// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay',
// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID',
// 'delay', 'lastStopSeq', 'vehicleID', 'time'

vehicles.forEach(vehicle => {
const routeId = vehicle.rid;
const vtime = vehicle.timestamp;


if (!vehiclesByRouteByTime[routeId]) {
vehiclesByRouteByTime[routeId] = {};
}
if (!vehiclesByRouteByTime[routeId][vtime]) {
vehiclesByRouteByTime[routeId][vtime] = [];
}
resultSets.forEach(results => {

const queryTime = results.resultSet.queryTime;

if (!(results.resultSet['vehicle']===undefined)) {

results.resultSet.vehicle.forEach(vehicle => {

// console.log(vehicle)
const routeId = vehicle.routeNumber;
const vtime = Math.floor(Number(queryTime)/1000);
const vehicleID = vehicle.vehicleID;
const tempVehicleTime = vehicleID+'_'+vtime;
const vehicleTIme = Math.floor(Number(vehicle.time)/1000);

const secsSinceReport = (vtime-vehicleTIme);

vehicle.time = vehicleTIme;

vehicle.secsSinceReport = secsSinceReport;

// check for multiple vehicles with the same tripId, assume to be multiple-car trains if close to each other
const tripId = vehicle.tripId;
if (tripId) {
if (!vehiclesByTripByTime[tripId]) {
vehiclesByTripByTime[tripId] = {};
}
const prevVehicle = vehiclesByTripByTime[tripId][vtime];
if (!prevVehicle) {
vehiclesByTripByTime[tripId][vtime] = vehicle;
} else if (Math.abs(prevVehicle.lat - vehicle.lat) < 0.001 && Math.abs(prevVehicle.lon - vehicle.lon) < 0.001) {
// 0.001 degrees latitude = 111m, 0.001 degrees longitude typically between between ~50m and 111m
prevVehicle.numCars = (prevVehicle.numCars || 1) + 1;
if (prevVehicle.vid > vehicle.vid) {
prevVehicle.vid = vehicle.vid;
if (!vehiclesByRouteByTime[routeId]) {
vehiclesByRouteByTime[routeId] = {};
}
if (!vehiclesByRouteByTime[routeId][vtime]) {
vehiclesByRouteByTime[routeId][vtime] = [];
}
return;
}
}

vehiclesByRouteByTime[routeId][vtime].push(vehicle);
});
if (!UniqueVehicleTimeKeys[tempVehicleTime]) {
vehiclesByRouteByTime[tempVehicleTime] = [];
vehiclesByRouteByTime[routeId][vtime].push(vehicle);
}

});



// remove duplicate Muni Metro vehicles
if (agencyId === 'muni') {
const affectedRouteIDs = ['KT', 'L', 'M', 'N', 'J'];
affectedRouteIDs.forEach(routeID => {
if (debug) {
console.log(routeID);
}
if (vehiclesByRouteByTime[routeID]) {
vehiclesByRouteByTime[routeID] = removeMuniMetroDuplicates(
vehiclesByRouteByTime[routeID],
);
}
});
}




}

});

// get all the routes
const routeIDs = routes ?
_.intersection(routes, Object.keys(vehiclesByRouteByTime)) :
Object.keys(vehiclesByRouteByTime);


return {
agencyId,
routeIDs,
startTime,
endTime,
vehiclesByRouteByTime
};

},
},

Expand All @@ -110,18 +120,23 @@ const resolvers = {
}
},

// list all available fields
// 'serviceDate', 'latitude', 'nextStopSeq', 'type', 'blockID',
// 'signMessageLong', 'lastLocID', 'nextLocID', 'locationInScheduleDay',
// 'longitude', 'direction', 'routeNumber', 'bearing', 'garage', 'tripID',
// 'delay', 'lastStopSeq', 'vehicleID', 'time'

VehicleState: {
vid: vehicle => vehicle.vid,
did: vehicle => vehicle.did,
lat: vehicle => vehicle.lat,
lon: vehicle => vehicle.lon,
heading: vehicle => vehicle.heading,
vehicleID: vehicle => vehicle.vehicleID,
direction: vehicle => vehicle.direction,
latitude: vehicle => vehicle.latitude,
longitude: vehicle => vehicle.longitude,
bearing: vehicle => vehicle.bearing,
tripID: vehicle => vehicle.tripID,
nextStopSeq: vehicle => vehicle.nextStopSeq,
lastStopSeq: vehicle => vehicle.lastStopSeq,
time: vehicle => vehicle.time,
secsSinceReport: vehicle => vehicle.secsSinceReport,
numCars: vehicle => vehicle.numCars,
tripId: vehicle => vehicle.tripId,
stopId: vehicle => vehicle.stopId,
stopIndex: vehicle => vehicle.stopIndex,
status: vehicle => vehicle.status,
}
};

Expand Down
Loading