From 8da5b691f5b23263bd9edebe2902e98fe252a17a Mon Sep 17 00:00:00 2001 From: Aaron Claydon Date: Sat, 9 Nov 2024 20:58:45 +0000 Subject: [PATCH] GTFS RT Service Alerts #122 --- data/datasources/se-trafiklab.yaml | 90 ++++ .../charts/travigo-data-importer/values.yaml | 37 +- deploy/charts/travigo-stats/values.yaml | 8 +- pkg/api/routes/servicealerts.go | 32 +- pkg/dataimporter/formats/gtfs/realtime.go | 84 +++- pkg/realtime/vehicletracker/consumer.go | 430 +++--------------- .../vehicletracker/identifiers/gtfsrt.go | 71 +++ .../vehicletracker/realtimejourney.go | 360 +++++++++++++++ pkg/realtime/vehicletracker/servicealert.go | 62 +++ 9 files changed, 788 insertions(+), 386 deletions(-) create mode 100644 pkg/realtime/vehicletracker/realtimejourney.go create mode 100644 pkg/realtime/vehicletracker/servicealert.go diff --git a/data/datasources/se-trafiklab.yaml b/data/datasources/se-trafiklab.yaml index a8153ab2..43cd0643 100644 --- a/data/datasources/se-trafiklab.yaml +++ b/data/datasources/se-trafiklab.yaml @@ -105,3 +105,93 @@ datasets: realtimejourneys: true linkeddataset: se-trafiklab-gtfs-schedule importdestination: realtime-queue +- identifier: gtfs-realtime-sl-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/sl/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-ul-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/ul/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-otraf-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/otraf/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-klt-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/klt/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-skane-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/skane/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-dt-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/dt/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-varm-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/varm/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-xt-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/xt/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue +- identifier: gtfs-realtime-vastmanland-alerts + format: gtfs-realtime + source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/vastmanland/ServiceAlertsSweden.pb" + sourceauthentication: + query: + key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY + supportedobjects: + realtimejourneys: true + linkeddataset: se-trafiklab-gtfs-schedule + importdestination: realtime-queue diff --git a/deploy/charts/travigo-data-importer/values.yaml b/deploy/charts/travigo-data-importer/values.yaml index f978e675..5237e8ca 100644 --- a/deploy/charts/travigo-data-importer/values.yaml +++ b/deploy/charts/travigo-data-importer/values.yaml @@ -97,24 +97,43 @@ deployments: args: ["data-importer", "dataset", "--id", "fr-ilevia-lille-gtfs-realtime", "--repeat-every", "90s"] - name: de-gtfs-gtfs-realtime args: ["data-importer", "dataset", "--id", "de-gtfs-gtfs-realtime", "--repeat-every", "90s"] + # Sweden - name: se-trafiklab-gtfs-realtime-sl-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-sl-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-sl-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-ul-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-ul-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-ul-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-otraf-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-otraf-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-otraf-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-klt-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-klt-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-klt-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-skane-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-skane-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-skane-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-dt-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-dt-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-dt-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-varm-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-varm-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-varm-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-xt-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-xt-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-xt-trip", "--repeat-every", "300s"] - name: se-trafiklab-gtfs-realtime-vastm-trip - args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-vastmanland-trip", "--repeat-every", "180s"] + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-vastmanland-trip", "--repeat-every", "300s"] + - name: se-trafiklab-gtfs-realtime-sl-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-sl-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-ul-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-ul-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-otraf-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-otraf-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-klt-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-klt-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-skane-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-skane-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-dt-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-dt-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-varm-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-varm-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-xt-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-xt-alerts", "--repeat-every", "900s"] + - name: se-trafiklab-gtfs-realtime-vastm-alerts + args: ["data-importer", "dataset", "--id", "se-trafiklab-gtfs-realtime-vastmanland-alerts", "--repeat-every", "900s"] # - name: us-nyc-subway-realtime-numbers # args: ["data-importer", "dataset", "--id", "us-nyc-subway-relatime-1-2-3-4-5-6-7", "--repeat-every", "60s"] diff --git a/deploy/charts/travigo-stats/values.yaml b/deploy/charts/travigo-stats/values.yaml index 8ef02e79..3c0ffe08 100644 --- a/deploy/charts/travigo-stats/values.yaml +++ b/deploy/charts/travigo-stats/values.yaml @@ -40,11 +40,11 @@ image: cronjobs: - name: basic - schedule: "0 8 * * *" - args: ["stats", "calculate", "--object", "services,operators,stops,servicealerts"] - - name: realtimejourneys + schedule: "0 */4 * * *" + args: ["stats", "calculate", "--object", "services,operators,stops"] + - name: dynamic schedule: "*/20 * * * *" - args: ["stats", "calculate", "--object", "realtimejourneys"] + args: ["stats", "calculate", "--object", "realtimejourneys,servicealerts"] imagePullSecrets: [] nameOverride: "" diff --git a/pkg/api/routes/servicealerts.go b/pkg/api/routes/servicealerts.go index aa5b4bea..51347f58 100644 --- a/pkg/api/routes/servicealerts.go +++ b/pkg/api/routes/servicealerts.go @@ -1,6 +1,8 @@ package routes import ( + "crypto/sha256" + "fmt" "strings" "github.com/gofiber/fiber/v2" @@ -14,6 +16,28 @@ func ServiceAlertRouter(router fiber.Router) { router.Get("/stop/:identifier", getStopServiceAlerts) } +func filterIdenticalServiceAlerts(serviceAlerts []*ctdf.ServiceAlert) []*ctdf.ServiceAlert { + var serviceAlertsFiltered []*ctdf.ServiceAlert + uniqueMap := make(map[string]bool) + + for _, serviceAlert := range serviceAlerts { + hash := sha256.New() + + hash.Write([]byte(serviceAlert.AlertType)) + hash.Write([]byte(serviceAlert.Title)) + hash.Write([]byte(serviceAlert.Text)) + + key := fmt.Sprintf("%x", hash.Sum(nil)) + + if !uniqueMap[key] { + uniqueMap[key] = true + serviceAlertsFiltered = append(serviceAlertsFiltered, serviceAlert) + } + } + + return serviceAlertsFiltered +} + func getMatchingIdentifierServiceAlerts(c *fiber.Ctx) error { identifier := c.Params("identifier") @@ -22,13 +46,15 @@ func getMatchingIdentifierServiceAlerts(c *fiber.Ctx) error { MatchingIdentifiers: strings.Split(identifier, ","), }) + serviceAlertsFiltered := filterIdenticalServiceAlerts(serviceAlerts) + if err != nil { c.SendStatus(404) return c.JSON(fiber.Map{ "error": err.Error(), }) } else { - return c.JSON(serviceAlerts) + return c.JSON(serviceAlertsFiltered) } } @@ -68,12 +94,14 @@ func getStopServiceAlerts(c *fiber.Ctx) error { MatchingIdentifiers: matchingIdentifiers, }) + serviceAlertsFiltered := filterIdenticalServiceAlerts(serviceAlerts) + if err != nil { c.SendStatus(404) return c.JSON(fiber.Map{ "error": err.Error(), }) } else { - return c.JSON(serviceAlerts) + return c.JSON(serviceAlertsFiltered) } } diff --git a/pkg/dataimporter/formats/gtfs/realtime.go b/pkg/dataimporter/formats/gtfs/realtime.go index 0aeaf8e3..0e135b84 100644 --- a/pkg/dataimporter/formats/gtfs/realtime.go +++ b/pkg/dataimporter/formats/gtfs/realtime.go @@ -14,7 +14,6 @@ import ( "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" redisstore "github.com/eko/gocache/store/redis/v4" - "github.com/kr/pretty" "github.com/rs/zerolog/log" "github.com/travigo/travigo/pkg/ctdf" "github.com/travigo/travigo/pkg/database" @@ -65,6 +64,7 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource) withTripID := 0 withLocation := 0 withTripUpdate := 0 + serviceAlertCount := 0 for _, entity := range feed.Entity { vehiclePosition := entity.GetVehicle() @@ -93,15 +93,80 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource) tripID := trip.GetTripId() - // TODO gtfs-rt alerts if entity.Alert != nil { - pretty.Println(entity.GetAlert()) - collection := database.GetCollection("datadump") - collection.InsertOne(context.Background(), bson.M{ - "type": "gtfsrt-alert", - "creationdatetime": time.Now(), - "document": entity.GetAlert(), - }) + var alertType ctdf.ServiceAlertType + + switch entity.Alert.Effect { + case gtfs.Alert_NO_SERVICE.Enum(): + alertType = ctdf.ServiceAlertTypeServiceSuspended + case gtfs.Alert_REDUCED_SERVICE.Enum(): + alertType = ctdf.ServiceAlertTypeServicePartSuspended + case gtfs.Alert_SIGNIFICANT_DELAYS.Enum(): + alertType = ctdf.ServiceAlertTypeSevereDelays + case gtfs.Alert_DETOUR.Enum(): + alertType = ctdf.ServiceAlertTypeWarning // TODO new type? + case gtfs.Alert_ADDITIONAL_SERVICE.Enum(): + alertType = ctdf.ServiceAlertTypeInformation + case gtfs.Alert_MODIFIED_SERVICE.Enum(): + alertType = ctdf.ServiceAlertTypeWarning + case gtfs.Alert_OTHER_EFFECT.Enum(): + alertType = ctdf.ServiceAlertTypeInformation + case gtfs.Alert_UNKNOWN_EFFECT.Enum(): + alertType = ctdf.ServiceAlertTypeInformation + case gtfs.Alert_STOP_MOVED.Enum(): + alertType = ctdf.ServiceAlertTypeWarning + case gtfs.Alert_NO_EFFECT.Enum(): + alertType = ctdf.ServiceAlertTypeInformation + case gtfs.Alert_ACCESSIBILITY_ISSUE.Enum(): + alertType = ctdf.ServiceAlertTypeInformation + default: + alertType = ctdf.ServiceAlertTypeInformation + } + + // Create one per active period & informed entity + for _, informedEntity := range entity.Alert.InformedEntity { + for _, activePeriod := range entity.Alert.ActivePeriod { + validFromTimestamp := activePeriod.GetStart() + validToTimestamp := activePeriod.GetEnd() + + validFrom := time.Unix(int64(validFromTimestamp), 0) + validTo := time.Unix(int64(validToTimestamp), 0) + + tripID := informedEntity.GetTrip().GetTripId() + routeID := informedEntity.GetRouteId() + stopID := informedEntity.GetStopId() + agencyID := informedEntity.GetAgencyId() + + updateEvent := vehicletracker.VehicleUpdateEvent{ + MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert, + LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validFromTimestamp, validToTimestamp), + IdentifyingInformation: map[string]string{ + "TripID": tripID, + "RouteID": routeID, + "StopID": stopID, + "AgencyID": agencyID, + "LinkedDataset": dataset.LinkedDataset, + }, + + ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{ + Type: alertType, + Title: *entity.Alert.HeaderText.GetTranslation()[0].Text, // TODO assume we only use the 1 translation + Description: *entity.Alert.DescriptionText.GetTranslation()[0].Text, + ValidFrom: validFrom, + ValidUntil: validTo, + }, + + SourceType: "GTFS-RT", + DataSource: datasource, + RecordedAt: recordedAtTime, + } + + updateEventJson, _ := json.Marshal(updateEvent) + r.queue.PublishBytes(updateEventJson) + + serviceAlertCount += 1 + } + } } if tripID != "" { @@ -240,6 +305,7 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource) Int("withtrip", withTripID). Int("withlocation", withLocation). Int("withtripupdate", withTripUpdate). + Int("servicealert", serviceAlertCount). Int("total", len(feed.Entity)). Msg("Submitted vehicle updates") diff --git a/pkg/realtime/vehicletracker/consumer.go b/pkg/realtime/vehicletracker/consumer.go index 964373a1..897ec63a 100644 --- a/pkg/realtime/vehicletracker/consumer.go +++ b/pkg/realtime/vehicletracker/consumer.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "time" @@ -13,12 +12,10 @@ import ( "github.com/eko/gocache/lib/v4/store" redisstore "github.com/eko/gocache/store/redis/v4" "github.com/rs/zerolog/log" - "github.com/travigo/travigo/pkg/ctdf" "github.com/travigo/travigo/pkg/database" "github.com/travigo/travigo/pkg/elastic_client" "github.com/travigo/travigo/pkg/realtime/vehicletracker/identifiers" "github.com/travigo/travigo/pkg/redis_client" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -86,7 +83,8 @@ func NewBatchConsumer(id int) *BatchConsumer { func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { payloads := batch.Payloads() - var locationEventOperations []mongo.WriteModel + var realtimeJourneyOperations []mongo.WriteModel + var serviceAlertOperations []mongo.WriteModel for _, payload := range payloads { var vehicleUpdateEvent *VehicleUpdateEvent @@ -98,35 +96,54 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { } } - identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent) + if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeTrip { + identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent) - if identifiedJourneyID != "" { - var writeModel mongo.WriteModel + if identifiedJourneyID != "" { + writeModel, _ := consumer.updateRealtimeJourney(identifiedJourneyID, vehicleUpdateEvent) - if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeTrip { - writeModel, _ = consumer.updateRealtimeJourney(identifiedJourneyID, vehicleUpdateEvent) + if writeModel != nil { + realtimeJourneyOperations = append(realtimeJourneyOperations, writeModel) + } + } else { + log.Debug().Interface("event", vehicleUpdateEvent.IdentifyingInformation).Msg("Couldnt identify journey") } + } else if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeServiceAlert { + identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent) + identifiedStopID := consumer.identifyStop(vehicleUpdateEvent) + identifiedServiceID := consumer.identifyService(vehicleUpdateEvent) + writeModel, _ := consumer.updateServiceAlert(identifiedJourneyID, identifiedStopID, identifiedServiceID, vehicleUpdateEvent) if writeModel != nil { - locationEventOperations = append(locationEventOperations, writeModel) + serviceAlertOperations = append(serviceAlertOperations, writeModel) } - } else { - log.Debug().Interface("event", vehicleUpdateEvent.IdentifyingInformation).Msg("Couldnt identify journey") } } - if len(locationEventOperations) > 0 { + if len(realtimeJourneyOperations) > 0 { realtimeJourneysCollection := database.GetCollection("realtime_journeys") startTime := time.Now() - _, err := realtimeJourneysCollection.BulkWrite(context.Background(), locationEventOperations, &options.BulkWriteOptions{}) - log.Info().Int("Length", len(locationEventOperations)).Str("Time", time.Now().Sub(startTime).String()).Msg("Bulk write") + _, err := realtimeJourneysCollection.BulkWrite(context.Background(), realtimeJourneyOperations, &options.BulkWriteOptions{}) + log.Info().Int("Length", len(realtimeJourneyOperations)).Str("Time", time.Now().Sub(startTime).String()).Msg("Bulk write realtime_journeys") if err != nil { log.Fatal().Err(err).Msg("Failed to bulk write Realtime Journeys") } } + if len(serviceAlertOperations) > 0 { + serviceAlertsCollection := database.GetCollection("service_alerts") + + startTime := time.Now() + _, err := serviceAlertsCollection.BulkWrite(context.Background(), serviceAlertOperations, &options.BulkWriteOptions{}) + log.Info().Int("Length", len(serviceAlertOperations)).Str("Time", time.Now().Sub(startTime).String()).Msg("Bulk write service_alerts") + + if err != nil { + log.Fatal().Err(err).Msg("Failed to bulk write Service Alerts") + } + } + if ackErrors := batch.Ack(); len(ackErrors) > 0 { for _, err := range ackErrors { log.Fatal().Err(err).Msg("Failed to consume realtime event") @@ -134,6 +151,40 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { } } +func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEvent) string { + if vehicleUpdateEvent.SourceType == "GTFS-RT" { + stopIdentifier := identifiers.GTFSRT{ + IdentifyingInformation: vehicleUpdateEvent.IdentifyingInformation, + } + stop, err := stopIdentifier.IdentifyStop() + + if err != nil { + return "" + } + + return stop + } + + return "" +} + +func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdateEvent) string { + if vehicleUpdateEvent.SourceType == "GTFS-RT" { + serviceIdentifier := identifiers.GTFSRT{ + IdentifyingInformation: vehicleUpdateEvent.IdentifyingInformation, + } + service, err := serviceIdentifier.IdentifyService() + + if err != nil { + return "" + } + + return service + } + + return "" +} + func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpdateEvent) string { currentTime := time.Now() yearNumber, weekNumber := currentTime.ISOWeek() @@ -197,7 +248,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, "N/A") // Set cross dataset ID - if vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" { + if vehicleLocationEvent.VehicleLocationUpdate != nil && vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" { identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleLocationEvent.SourceType) } @@ -247,7 +298,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, string(journeyMapJson)) // Set cross dataset ID - if vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" { + if vehicleLocationEvent.VehicleLocationUpdate != nil && vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" { identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleLocationEvent.SourceType) } @@ -288,348 +339,3 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda return journeyID } - -func (consumer *BatchConsumer) updateRealtimeJourney(journeyID string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { - currentTime := vehicleUpdateEvent.RecordedAt - - realtimeJourneyIdentifier := fmt.Sprintf(ctdf.RealtimeJourneyIDFormat, vehicleUpdateEvent.VehicleLocationUpdate.Timeframe, journeyID) - searchQuery := bson.M{"primaryidentifier": realtimeJourneyIdentifier} - - var realtimeJourney *ctdf.RealtimeJourney - var realtimeJourneyReliability ctdf.RealtimeJourneyReliabilityType - - opts := options.FindOne().SetProjection(bson.D{ - {Key: "journey.path", Value: 1}, - {Key: "journey.departuretimezone", Value: 1}, - {Key: "nextstopref", Value: 1}, - {Key: "offset", Value: 1}, - }) - - realtimeJourneysCollection := database.GetCollection("realtime_journeys") - realtimeJourneysCollection.FindOne(context.Background(), searchQuery, opts).Decode(&realtimeJourney) - - newRealtimeJourney := false - if realtimeJourney == nil { - var journey *ctdf.Journey - journeysCollection := database.GetCollection("journeys") - err := journeysCollection.FindOne(context.Background(), bson.M{"primaryidentifier": journeyID}).Decode(&journey) - - if err != nil { - return nil, err - } - - for _, pathItem := range journey.Path { - pathItem.GetDestinationStop() - } - - journey.GetService() - - journeyDate, _ := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) - - realtimeJourney = &ctdf.RealtimeJourney{ - PrimaryIdentifier: realtimeJourneyIdentifier, - ActivelyTracked: true, - TimeoutDurationMinutes: 10, - Journey: journey, - JourneyRunDate: journeyDate, - Service: journey.Service, - - CreationDateTime: currentTime, - - VehicleRef: vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier, - Stops: map[string]*ctdf.RealtimeJourneyStops{}, - } - newRealtimeJourney = true - } - - if realtimeJourney.Journey == nil { - log.Error().Msg("RealtimeJourney without a Journey found, deleting") - realtimeJourneysCollection.DeleteOne(context.Background(), searchQuery) - return nil, errors.New("RealtimeJourney without a Journey found, deleting") - } - - var offset time.Duration - journeyStopUpdates := map[string]*ctdf.RealtimeJourneyStops{} - var closestDistanceJourneyPath *ctdf.JourneyPathItem // TODO maybe not here? - - // Calculate everything based on location if we aren't provided with updates - if len(vehicleUpdateEvent.VehicleLocationUpdate.StopUpdates) == 0 && vehicleUpdateEvent.VehicleLocationUpdate.Location.Type == "Point" { - closestDistance := 999999999999.0 - var closestDistanceJourneyPathIndex int - var closestDistanceJourneyPathPercentComplete float64 // TODO: this is a hack, replace with actual distance - - // Attempt to calculate using closest journey track - for i, journeyPathItem := range realtimeJourney.Journey.Path { - journeyPathClosestDistance := 99999999999999.0 // TODO do this better - - for i := 0; i < len(journeyPathItem.Track)-1; i++ { - a := journeyPathItem.Track[i] - b := journeyPathItem.Track[i+1] - - distance := vehicleUpdateEvent.VehicleLocationUpdate.Location.DistanceFromLine(a, b) - - if distance < journeyPathClosestDistance { - journeyPathClosestDistance = distance - } - } - - if journeyPathClosestDistance < closestDistance { - closestDistance = journeyPathClosestDistance - closestDistanceJourneyPath = journeyPathItem - closestDistanceJourneyPathIndex = i - - // TODO: this is a hack, replace with actual distance - // this is a rough estimation based on what part of path item track we are on - closestDistanceJourneyPathPercentComplete = float64(i) / float64(len(journeyPathItem.Track)) - } - } - - // If we fail to identify closest journey path item using track use fallback stop location method - if closestDistanceJourneyPath == nil { - closestDistance = 999999999999.0 - for i, journeyPathItem := range realtimeJourney.Journey.Path { - if journeyPathItem.DestinationStop == nil { - return nil, errors.New(fmt.Sprintf("Cannot get stop %s", journeyPathItem.DestinationStopRef)) - } - - distance := journeyPathItem.DestinationStop.Location.Distance(&vehicleUpdateEvent.VehicleLocationUpdate.Location) - - if distance < closestDistance { - closestDistance = distance - closestDistanceJourneyPath = journeyPathItem - closestDistanceJourneyPathIndex = i - } - } - - if closestDistanceJourneyPathIndex == 0 { - // TODO this seems a bit hacky but I dont think we care much if we're on the first item - closestDistanceJourneyPathPercentComplete = 0.5 - } else { - previousJourneyPath := realtimeJourney.Journey.Path[len(realtimeJourney.Journey.Path)-1] - - if previousJourneyPath.DestinationStop == nil { - return nil, errors.New(fmt.Sprintf("Cannot get stop %s", previousJourneyPath.DestinationStopRef)) - } - - previousJourneyPathDistance := previousJourneyPath.DestinationStop.Location.Distance(&vehicleUpdateEvent.VehicleLocationUpdate.Location) - - closestDistanceJourneyPathPercentComplete = (1 + ((previousJourneyPathDistance - closestDistance) / (previousJourneyPathDistance + closestDistance))) / 2 - } - - realtimeJourneyReliability = ctdf.RealtimeJourneyReliabilityLocationWithoutTrack - } else { - realtimeJourneyReliability = ctdf.RealtimeJourneyReliabilityLocationWithTrack - } - - // Calculate new stop arrival times - realtimeTimeframe, err := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) - if err != nil { - log.Error().Err(err).Msg("Failed to parse realtime time frame") - } - - if closestDistanceJourneyPath == nil { - return nil, errors.New("nil closestdistancejourneypath") - } - - journeyTimezone, _ := time.LoadLocation(realtimeJourney.Journey.DepartureTimezone) - - // Get the arrival & departure times with date of the journey - destinationArrivalTimeWithDate := time.Date( - realtimeTimeframe.Year(), - realtimeTimeframe.Month(), - realtimeTimeframe.Day(), - closestDistanceJourneyPath.DestinationArrivalTime.Hour(), - closestDistanceJourneyPath.DestinationArrivalTime.Minute(), - closestDistanceJourneyPath.DestinationArrivalTime.Second(), - closestDistanceJourneyPath.DestinationArrivalTime.Nanosecond(), - journeyTimezone, - ) - originDepartureTimeWithDate := time.Date( - realtimeTimeframe.Year(), - realtimeTimeframe.Month(), - realtimeTimeframe.Day(), - closestDistanceJourneyPath.OriginDepartureTime.Hour(), - closestDistanceJourneyPath.OriginDepartureTime.Minute(), - closestDistanceJourneyPath.OriginDepartureTime.Second(), - closestDistanceJourneyPath.OriginDepartureTime.Nanosecond(), - journeyTimezone, - ) - - // How long it take to travel between origin & destination - currentPathTraversalTime := destinationArrivalTimeWithDate.Sub(originDepartureTimeWithDate) - - // How far we are between origin & departure (% of journey path, NOT time or metres) - // TODO: this is a hack, replace with actual distance - currentPathPercentageComplete := closestDistanceJourneyPathPercentComplete - - // Calculate what the expected time of the current position of the vehicle should be - currentPathPositionExpectedTime := originDepartureTimeWithDate.Add( - time.Duration(int(currentPathPercentageComplete * float64(currentPathTraversalTime.Nanoseconds())))) - - // Offset is how far behind or ahead the vehicle is from its positions expected time - offset = currentTime.Sub(currentPathPositionExpectedTime).Round(10 * time.Second) - - // If the offset is too small then just turn it to zero so we can mark buses as on time - if offset.Seconds() <= 45 { - offset = time.Duration(0) - } - - // Calculate all the estimated stop arrival & departure times - for i := closestDistanceJourneyPathIndex; i < len(realtimeJourney.Journey.Path); i++ { - // Don't update the database if theres no actual change - if (offset.Seconds() == realtimeJourney.Offset.Seconds()) && !newRealtimeJourney { - break - } - - path := realtimeJourney.Journey.Path[i] - - arrivalTime := path.DestinationArrivalTime.Add(offset).Round(time.Minute) - var departureTime time.Time - - if i < len(realtimeJourney.Journey.Path)-1 { - nextPath := realtimeJourney.Journey.Path[i+1] - - if arrivalTime.Before(nextPath.OriginDepartureTime) { - departureTime = nextPath.OriginDepartureTime - } else { - departureTime = arrivalTime - } - } - - journeyStopUpdates[path.DestinationStopRef] = &ctdf.RealtimeJourneyStops{ - StopRef: path.DestinationStopRef, - TimeType: ctdf.RealtimeJourneyStopTimeEstimatedFuture, - - ArrivalTime: arrivalTime, - DepartureTime: departureTime, - } - } - } else { - for _, stopUpdate := range vehicleUpdateEvent.VehicleLocationUpdate.StopUpdates { - arrivalTime := stopUpdate.ArrivalTime - departureTime := stopUpdate.DepartureTime - - if arrivalTime.Year() == 1970 { - for _, path := range realtimeJourney.Journey.Path { - if path.OriginStopRef == stopUpdate.StopID { - arrivalTime = path.OriginArrivalTime.Add(time.Duration(stopUpdate.ArrivalOffset) * time.Second) - break - } - } - } - if departureTime.Year() == 1970 { - for _, path := range realtimeJourney.Journey.Path { - if path.OriginStopRef == stopUpdate.StopID { - departureTime = path.OriginDepartureTime.Add(time.Duration(stopUpdate.DepartureOffset) * time.Second) - break - } - } - } - - journeyStopUpdates[stopUpdate.StopID] = &ctdf.RealtimeJourneyStops{ - StopRef: stopUpdate.StopID, - TimeType: ctdf.RealtimeJourneyStopTimeEstimatedFuture, - - ArrivalTime: arrivalTime, - DepartureTime: departureTime, - } - } - - closestPathTime := 9999999 * time.Minute - now := time.Now() - realtimeTimeframe, err := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) - - journeyTimezone, _ := time.LoadLocation(realtimeJourney.Journey.DepartureTimezone) - - if err != nil { - log.Error().Err(err).Msg("Failed to parse realtime time frame") - } - for _, path := range realtimeJourney.Journey.Path { - refTime := time.Date( - realtimeTimeframe.Year(), - realtimeTimeframe.Month(), - realtimeTimeframe.Day(), - path.OriginArrivalTime.Hour(), - path.OriginArrivalTime.Minute(), - path.OriginArrivalTime.Second(), - path.OriginArrivalTime.Nanosecond(), - journeyTimezone, - ) - - if journeyStopUpdates[path.OriginStopRef] != nil { - refTime = journeyStopUpdates[path.OriginStopRef].ArrivalTime - } - - if refTime.Before(now) && now.Sub(refTime) < closestPathTime { - closestDistanceJourneyPath = path - - closestPathTime = now.Sub(refTime) - } - } - } - - if closestDistanceJourneyPath == nil { - return nil, errors.New("unable to find next journeypath") - } - - // Update database - updateMap := bson.M{ - "modificationdatetime": currentTime, - "vehiclebearing": vehicleUpdateEvent.VehicleLocationUpdate.Bearing, - "departedstopref": closestDistanceJourneyPath.OriginStopRef, - "nextstopref": closestDistanceJourneyPath.DestinationStopRef, - "occupancy": vehicleUpdateEvent.VehicleLocationUpdate.Occupancy, - // "vehiclelocationdescription": fmt.Sprintf("Passed %s", closestDistanceJourneyPath.OriginStop.PrimaryName), - } - if vehicleUpdateEvent.VehicleLocationUpdate.Location.Type != "" { - updateMap["vehiclelocation"] = vehicleUpdateEvent.VehicleLocationUpdate.Location - } - if newRealtimeJourney { - updateMap["primaryidentifier"] = realtimeJourney.PrimaryIdentifier - updateMap["activelytracked"] = realtimeJourney.ActivelyTracked - updateMap["timeoutdurationminutes"] = realtimeJourney.TimeoutDurationMinutes - - updateMap["journey"] = realtimeJourney.Journey - updateMap["journeyrundate"] = realtimeJourney.JourneyRunDate - - updateMap["service"] = realtimeJourney.Service - - updateMap["creationdatetime"] = realtimeJourney.CreationDateTime - - updateMap["vehicleref"] = vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier - updateMap["datasource"] = vehicleUpdateEvent.DataSource - - updateMap["reliability"] = realtimeJourneyReliability - } else { - updateMap["datasource.timestamp"] = vehicleUpdateEvent.DataSource.Timestamp - } - - if (offset.Seconds() != realtimeJourney.Offset.Seconds()) || newRealtimeJourney { - updateMap["offset"] = offset - } - - if realtimeJourney.NextStopRef != closestDistanceJourneyPath.DestinationStopRef { - journeyStopUpdates[realtimeJourney.NextStopRef] = &ctdf.RealtimeJourneyStops{ - StopRef: realtimeJourney.NextStopRef, - TimeType: ctdf.RealtimeJourneyStopTimeHistorical, - - // TODO this should obviously be a different time - ArrivalTime: currentTime, - DepartureTime: currentTime, - } - } - - for key, stopUpdate := range journeyStopUpdates { - if key != "" { - updateMap[fmt.Sprintf("stops.%s", key)] = stopUpdate - } - } - - bsonRep, _ := bson.Marshal(bson.M{"$set": updateMap}) - updateModel := mongo.NewUpdateOneModel() - updateModel.SetFilter(searchQuery) - updateModel.SetUpdate(bsonRep) - updateModel.SetUpsert(true) - - return updateModel, nil -} diff --git a/pkg/realtime/vehicletracker/identifiers/gtfsrt.go b/pkg/realtime/vehicletracker/identifiers/gtfsrt.go index 59c1c5d1..e568af61 100644 --- a/pkg/realtime/vehicletracker/identifiers/gtfsrt.go +++ b/pkg/realtime/vehicletracker/identifiers/gtfsrt.go @@ -3,6 +3,7 @@ package identifiers import ( "context" "errors" + "fmt" "github.com/travigo/travigo/pkg/ctdf" "github.com/travigo/travigo/pkg/database" @@ -13,6 +14,76 @@ type GTFSRT struct { IdentifyingInformation map[string]string } +func (r *GTFSRT) IdentifyStop() (string, error) { + stopsCollection := database.GetCollection("stops") + + stopID := r.IdentifyingInformation["StopID"] + if stopID == "" { + return "", errors.New("Missing field StopID") + } + + linkedDataset := r.IdentifyingInformation["LinkedDataset"] + if linkedDataset == "" { + return "", errors.New("Missing field linkedDataset") + } + + var potentialStops []ctdf.Stop + + formatedStopID := fmt.Sprintf("%s-stop-%s", linkedDataset, stopID) + + cursor, _ := stopsCollection.Find(context.Background(), bson.M{ + "$or": bson.A{ + bson.M{"primaryidentifier": formatedStopID}, + bson.M{"otheridentifiers": formatedStopID}, + }, + "datasource.datasetid": linkedDataset, + }) + cursor.All(context.Background(), &potentialStops) + + if len(potentialStops) == 0 { + return "", errors.New("Could not find referenced stop") + } else if len(potentialStops) == 1 { + return potentialStops[0].PrimaryIdentifier, nil + } else { + return "", errors.New("Could not find referenced stop") + } +} + +func (r *GTFSRT) IdentifyService() (string, error) { + servicesCollection := database.GetCollection("services") + + routeID := r.IdentifyingInformation["RouteID"] + if routeID == "" { + return "", errors.New("Missing field RouteID") + } + + linkedDataset := r.IdentifyingInformation["LinkedDataset"] + if linkedDataset == "" { + return "", errors.New("Missing field linkedDataset") + } + + var potentialServices []ctdf.Stop + + formatedServiceID := fmt.Sprintf("%s-service-%s", linkedDataset, routeID) + + cursor, _ := servicesCollection.Find(context.Background(), bson.M{ + "$or": bson.A{ + bson.M{"primaryidentifier": formatedServiceID}, + bson.M{"otheridentifiers": formatedServiceID}, + }, + "datasource.datasetid": linkedDataset, + }) + cursor.All(context.Background(), &potentialServices) + + if len(potentialServices) == 0 { + return "", errors.New("Could not find referenced service") + } else if len(potentialServices) == 1 { + return potentialServices[0].PrimaryIdentifier, nil + } else { + return "", errors.New("Could not find referenced service") + } +} + func (r *GTFSRT) IdentifyJourney() (string, error) { journeysCollection := database.GetCollection("journeys") diff --git a/pkg/realtime/vehicletracker/realtimejourney.go b/pkg/realtime/vehicletracker/realtimejourney.go new file mode 100644 index 00000000..7f3aacef --- /dev/null +++ b/pkg/realtime/vehicletracker/realtimejourney.go @@ -0,0 +1,360 @@ +package vehicletracker + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/rs/zerolog/log" + "github.com/travigo/travigo/pkg/ctdf" + "github.com/travigo/travigo/pkg/database" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (consumer *BatchConsumer) updateRealtimeJourney(journeyID string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { + currentTime := vehicleUpdateEvent.RecordedAt + + realtimeJourneyIdentifier := fmt.Sprintf(ctdf.RealtimeJourneyIDFormat, vehicleUpdateEvent.VehicleLocationUpdate.Timeframe, journeyID) + searchQuery := bson.M{"primaryidentifier": realtimeJourneyIdentifier} + + var realtimeJourney *ctdf.RealtimeJourney + var realtimeJourneyReliability ctdf.RealtimeJourneyReliabilityType + + opts := options.FindOne().SetProjection(bson.D{ + {Key: "journey.path", Value: 1}, + {Key: "journey.departuretimezone", Value: 1}, + {Key: "nextstopref", Value: 1}, + {Key: "offset", Value: 1}, + }) + + realtimeJourneysCollection := database.GetCollection("realtime_journeys") + realtimeJourneysCollection.FindOne(context.Background(), searchQuery, opts).Decode(&realtimeJourney) + + newRealtimeJourney := false + if realtimeJourney == nil { + var journey *ctdf.Journey + journeysCollection := database.GetCollection("journeys") + err := journeysCollection.FindOne(context.Background(), bson.M{"primaryidentifier": journeyID}).Decode(&journey) + + if err != nil { + return nil, err + } + + for _, pathItem := range journey.Path { + pathItem.GetDestinationStop() + } + + journey.GetService() + + journeyDate, _ := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) + + realtimeJourney = &ctdf.RealtimeJourney{ + PrimaryIdentifier: realtimeJourneyIdentifier, + ActivelyTracked: true, + TimeoutDurationMinutes: 10, + Journey: journey, + JourneyRunDate: journeyDate, + Service: journey.Service, + + CreationDateTime: currentTime, + + VehicleRef: vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier, + Stops: map[string]*ctdf.RealtimeJourneyStops{}, + } + newRealtimeJourney = true + } + + if realtimeJourney.Journey == nil { + log.Error().Msg("RealtimeJourney without a Journey found, deleting") + realtimeJourneysCollection.DeleteOne(context.Background(), searchQuery) + return nil, errors.New("RealtimeJourney without a Journey found, deleting") + } + + var offset time.Duration + journeyStopUpdates := map[string]*ctdf.RealtimeJourneyStops{} + var closestDistanceJourneyPath *ctdf.JourneyPathItem // TODO maybe not here? + + // Calculate everything based on location if we aren't provided with updates + if len(vehicleUpdateEvent.VehicleLocationUpdate.StopUpdates) == 0 && vehicleUpdateEvent.VehicleLocationUpdate.Location.Type == "Point" { + closestDistance := 999999999999.0 + var closestDistanceJourneyPathIndex int + var closestDistanceJourneyPathPercentComplete float64 // TODO: this is a hack, replace with actual distance + + // Attempt to calculate using closest journey track + for i, journeyPathItem := range realtimeJourney.Journey.Path { + journeyPathClosestDistance := 99999999999999.0 // TODO do this better + + for i := 0; i < len(journeyPathItem.Track)-1; i++ { + a := journeyPathItem.Track[i] + b := journeyPathItem.Track[i+1] + + distance := vehicleUpdateEvent.VehicleLocationUpdate.Location.DistanceFromLine(a, b) + + if distance < journeyPathClosestDistance { + journeyPathClosestDistance = distance + } + } + + if journeyPathClosestDistance < closestDistance { + closestDistance = journeyPathClosestDistance + closestDistanceJourneyPath = journeyPathItem + closestDistanceJourneyPathIndex = i + + // TODO: this is a hack, replace with actual distance + // this is a rough estimation based on what part of path item track we are on + closestDistanceJourneyPathPercentComplete = float64(i) / float64(len(journeyPathItem.Track)) + } + } + + // If we fail to identify closest journey path item using track use fallback stop location method + if closestDistanceJourneyPath == nil { + closestDistance = 999999999999.0 + for i, journeyPathItem := range realtimeJourney.Journey.Path { + if journeyPathItem.DestinationStop == nil { + return nil, errors.New(fmt.Sprintf("Cannot get stop %s", journeyPathItem.DestinationStopRef)) + } + + distance := journeyPathItem.DestinationStop.Location.Distance(&vehicleUpdateEvent.VehicleLocationUpdate.Location) + + if distance < closestDistance { + closestDistance = distance + closestDistanceJourneyPath = journeyPathItem + closestDistanceJourneyPathIndex = i + } + } + + if closestDistanceJourneyPathIndex == 0 { + // TODO this seems a bit hacky but I dont think we care much if we're on the first item + closestDistanceJourneyPathPercentComplete = 0.5 + } else { + previousJourneyPath := realtimeJourney.Journey.Path[len(realtimeJourney.Journey.Path)-1] + + if previousJourneyPath.DestinationStop == nil { + return nil, errors.New(fmt.Sprintf("Cannot get stop %s", previousJourneyPath.DestinationStopRef)) + } + + previousJourneyPathDistance := previousJourneyPath.DestinationStop.Location.Distance(&vehicleUpdateEvent.VehicleLocationUpdate.Location) + + closestDistanceJourneyPathPercentComplete = (1 + ((previousJourneyPathDistance - closestDistance) / (previousJourneyPathDistance + closestDistance))) / 2 + } + + realtimeJourneyReliability = ctdf.RealtimeJourneyReliabilityLocationWithoutTrack + } else { + realtimeJourneyReliability = ctdf.RealtimeJourneyReliabilityLocationWithTrack + } + + // Calculate new stop arrival times + realtimeTimeframe, err := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) + if err != nil { + log.Error().Err(err).Msg("Failed to parse realtime time frame") + } + + if closestDistanceJourneyPath == nil { + return nil, errors.New("nil closestdistancejourneypath") + } + + journeyTimezone, _ := time.LoadLocation(realtimeJourney.Journey.DepartureTimezone) + + // Get the arrival & departure times with date of the journey + destinationArrivalTimeWithDate := time.Date( + realtimeTimeframe.Year(), + realtimeTimeframe.Month(), + realtimeTimeframe.Day(), + closestDistanceJourneyPath.DestinationArrivalTime.Hour(), + closestDistanceJourneyPath.DestinationArrivalTime.Minute(), + closestDistanceJourneyPath.DestinationArrivalTime.Second(), + closestDistanceJourneyPath.DestinationArrivalTime.Nanosecond(), + journeyTimezone, + ) + originDepartureTimeWithDate := time.Date( + realtimeTimeframe.Year(), + realtimeTimeframe.Month(), + realtimeTimeframe.Day(), + closestDistanceJourneyPath.OriginDepartureTime.Hour(), + closestDistanceJourneyPath.OriginDepartureTime.Minute(), + closestDistanceJourneyPath.OriginDepartureTime.Second(), + closestDistanceJourneyPath.OriginDepartureTime.Nanosecond(), + journeyTimezone, + ) + + // How long it take to travel between origin & destination + currentPathTraversalTime := destinationArrivalTimeWithDate.Sub(originDepartureTimeWithDate) + + // How far we are between origin & departure (% of journey path, NOT time or metres) + // TODO: this is a hack, replace with actual distance + currentPathPercentageComplete := closestDistanceJourneyPathPercentComplete + + // Calculate what the expected time of the current position of the vehicle should be + currentPathPositionExpectedTime := originDepartureTimeWithDate.Add( + time.Duration(int(currentPathPercentageComplete * float64(currentPathTraversalTime.Nanoseconds())))) + + // Offset is how far behind or ahead the vehicle is from its positions expected time + offset = currentTime.Sub(currentPathPositionExpectedTime).Round(10 * time.Second) + + // If the offset is too small then just turn it to zero so we can mark buses as on time + if offset.Seconds() <= 45 { + offset = time.Duration(0) + } + + // Calculate all the estimated stop arrival & departure times + for i := closestDistanceJourneyPathIndex; i < len(realtimeJourney.Journey.Path); i++ { + // Don't update the database if theres no actual change + if (offset.Seconds() == realtimeJourney.Offset.Seconds()) && !newRealtimeJourney { + break + } + + path := realtimeJourney.Journey.Path[i] + + arrivalTime := path.DestinationArrivalTime.Add(offset).Round(time.Minute) + var departureTime time.Time + + if i < len(realtimeJourney.Journey.Path)-1 { + nextPath := realtimeJourney.Journey.Path[i+1] + + if arrivalTime.Before(nextPath.OriginDepartureTime) { + departureTime = nextPath.OriginDepartureTime + } else { + departureTime = arrivalTime + } + } + + journeyStopUpdates[path.DestinationStopRef] = &ctdf.RealtimeJourneyStops{ + StopRef: path.DestinationStopRef, + TimeType: ctdf.RealtimeJourneyStopTimeEstimatedFuture, + + ArrivalTime: arrivalTime, + DepartureTime: departureTime, + } + } + } else { + for _, stopUpdate := range vehicleUpdateEvent.VehicleLocationUpdate.StopUpdates { + arrivalTime := stopUpdate.ArrivalTime + departureTime := stopUpdate.DepartureTime + + if arrivalTime.Year() == 1970 { + for _, path := range realtimeJourney.Journey.Path { + if path.OriginStopRef == stopUpdate.StopID { + arrivalTime = path.OriginArrivalTime.Add(time.Duration(stopUpdate.ArrivalOffset) * time.Second) + break + } + } + } + if departureTime.Year() == 1970 { + for _, path := range realtimeJourney.Journey.Path { + if path.OriginStopRef == stopUpdate.StopID { + departureTime = path.OriginDepartureTime.Add(time.Duration(stopUpdate.DepartureOffset) * time.Second) + break + } + } + } + + journeyStopUpdates[stopUpdate.StopID] = &ctdf.RealtimeJourneyStops{ + StopRef: stopUpdate.StopID, + TimeType: ctdf.RealtimeJourneyStopTimeEstimatedFuture, + + ArrivalTime: arrivalTime, + DepartureTime: departureTime, + } + } + + closestPathTime := 9999999 * time.Minute + now := time.Now() + realtimeTimeframe, err := time.Parse("2006-01-02", vehicleUpdateEvent.VehicleLocationUpdate.Timeframe) + + journeyTimezone, _ := time.LoadLocation(realtimeJourney.Journey.DepartureTimezone) + + if err != nil { + log.Error().Err(err).Msg("Failed to parse realtime time frame") + } + for _, path := range realtimeJourney.Journey.Path { + refTime := time.Date( + realtimeTimeframe.Year(), + realtimeTimeframe.Month(), + realtimeTimeframe.Day(), + path.OriginArrivalTime.Hour(), + path.OriginArrivalTime.Minute(), + path.OriginArrivalTime.Second(), + path.OriginArrivalTime.Nanosecond(), + journeyTimezone, + ) + + if journeyStopUpdates[path.OriginStopRef] != nil { + refTime = journeyStopUpdates[path.OriginStopRef].ArrivalTime + } + + if refTime.Before(now) && now.Sub(refTime) < closestPathTime { + closestDistanceJourneyPath = path + + closestPathTime = now.Sub(refTime) + } + } + } + + if closestDistanceJourneyPath == nil { + return nil, errors.New("unable to find next journeypath") + } + + // Update database + updateMap := bson.M{ + "modificationdatetime": currentTime, + "vehiclebearing": vehicleUpdateEvent.VehicleLocationUpdate.Bearing, + "departedstopref": closestDistanceJourneyPath.OriginStopRef, + "nextstopref": closestDistanceJourneyPath.DestinationStopRef, + "occupancy": vehicleUpdateEvent.VehicleLocationUpdate.Occupancy, + // "vehiclelocationdescription": fmt.Sprintf("Passed %s", closestDistanceJourneyPath.OriginStop.PrimaryName), + } + if vehicleUpdateEvent.VehicleLocationUpdate.Location.Type != "" { + updateMap["vehiclelocation"] = vehicleUpdateEvent.VehicleLocationUpdate.Location + } + if newRealtimeJourney { + updateMap["primaryidentifier"] = realtimeJourney.PrimaryIdentifier + updateMap["activelytracked"] = realtimeJourney.ActivelyTracked + updateMap["timeoutdurationminutes"] = realtimeJourney.TimeoutDurationMinutes + + updateMap["journey"] = realtimeJourney.Journey + updateMap["journeyrundate"] = realtimeJourney.JourneyRunDate + + updateMap["service"] = realtimeJourney.Service + + updateMap["creationdatetime"] = realtimeJourney.CreationDateTime + + updateMap["vehicleref"] = vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier + updateMap["datasource"] = vehicleUpdateEvent.DataSource + + updateMap["reliability"] = realtimeJourneyReliability + } else { + updateMap["datasource.timestamp"] = vehicleUpdateEvent.DataSource.Timestamp + } + + if (offset.Seconds() != realtimeJourney.Offset.Seconds()) || newRealtimeJourney { + updateMap["offset"] = offset + } + + if realtimeJourney.NextStopRef != closestDistanceJourneyPath.DestinationStopRef { + journeyStopUpdates[realtimeJourney.NextStopRef] = &ctdf.RealtimeJourneyStops{ + StopRef: realtimeJourney.NextStopRef, + TimeType: ctdf.RealtimeJourneyStopTimeHistorical, + + // TODO this should obviously be a different time + ArrivalTime: currentTime, + DepartureTime: currentTime, + } + } + + for key, stopUpdate := range journeyStopUpdates { + if key != "" { + updateMap[fmt.Sprintf("stops.%s", key)] = stopUpdate + } + } + + bsonRep, _ := bson.Marshal(bson.M{"$set": updateMap}) + updateModel := mongo.NewUpdateOneModel() + updateModel.SetFilter(searchQuery) + updateModel.SetUpdate(bsonRep) + updateModel.SetUpsert(true) + + return updateModel, nil +} diff --git a/pkg/realtime/vehicletracker/servicealert.go b/pkg/realtime/vehicletracker/servicealert.go new file mode 100644 index 00000000..cb915774 --- /dev/null +++ b/pkg/realtime/vehicletracker/servicealert.go @@ -0,0 +1,62 @@ +package vehicletracker + +import ( + "errors" + "fmt" + "time" + + "github.com/travigo/travigo/pkg/ctdf" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +func (consumer *BatchConsumer) updateServiceAlert(journeyID string, stopID string, serviceID string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { + primaryIdentifier := fmt.Sprintf( + "%s-%s-%d-%d", + vehicleUpdateEvent.DataSource.DatasetID, vehicleUpdateEvent.ServiceAlertUpdate.Type, + vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom.UnixMilli(), + vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil.UnixMilli(), + ) + var matchedIdentifiers []string + + if journeyID != "" { + matchedIdentifiers = append(matchedIdentifiers, journeyID) + primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, journeyID) + } + + if stopID != "" { + matchedIdentifiers = append(matchedIdentifiers, stopID) + primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, stopID) + } + + if serviceID != "" { + matchedIdentifiers = append(matchedIdentifiers, serviceID) + primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, serviceID) + } + + if len(matchedIdentifiers) == 0 { + return nil, errors.New("No matching identifiers") + } + + serviceAlert := ctdf.ServiceAlert{ + PrimaryIdentifier: primaryIdentifier, + OtherIdentifiers: map[string]string{}, + CreationDateTime: time.Time{}, + ModificationDateTime: vehicleUpdateEvent.RecordedAt, + DataSource: vehicleUpdateEvent.DataSource, + AlertType: vehicleUpdateEvent.ServiceAlertUpdate.Type, + Title: vehicleUpdateEvent.ServiceAlertUpdate.Title, + Text: vehicleUpdateEvent.ServiceAlertUpdate.Description, + MatchedIdentifiers: matchedIdentifiers, + ValidFrom: vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom, + ValidUntil: vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil, + } + + bsonRep, _ := bson.Marshal(bson.M{"$set": serviceAlert}) + updateModel := mongo.NewUpdateOneModel() + updateModel.SetFilter(bson.M{"primaryidentifier": primaryIdentifier}) + updateModel.SetUpdate(bsonRep) + updateModel.SetUpsert(true) + + return updateModel, nil +}