diff --git a/pkg/dataimporter/formats/gtfs/realtime.go b/pkg/dataimporter/formats/gtfs/realtime.go index b69c115..08ce4e2 100644 --- a/pkg/dataimporter/formats/gtfs/realtime.go +++ b/pkg/dataimporter/formats/gtfs/realtime.go @@ -2,6 +2,7 @@ package gtfs import ( "context" + "crypto/sha256" "encoding/json" "errors" "fmt" @@ -124,49 +125,62 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource) } // Create one per active period & informed entity + var identifyingInformation []map[string]string + for _, informedEntity := range entity.Alert.InformedEntity { - for _, activePeriod := range entity.Alert.ActivePeriod { - validFromTimestamp := activePeriod.GetStart() - validToTimestamp := activePeriod.GetEnd() - - validFrom := time.Unix(int64(validFromTimestamp), 0) - validTo := time.Unix(int64(validToTimestamp), 0) - - tripID := informedEntity.GetTrip().GetTripId() - routeID := informedEntity.GetRouteId() - stopID := informedEntity.GetStopId() - agencyID := informedEntity.GetAgencyId() - - updateEvent := vehicletracker.VehicleUpdateEvent{ - MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert, - LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validFromTimestamp, validToTimestamp), - - ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{ - Type: alertType, - Title: *entity.Alert.HeaderText.GetTranslation()[0].Text, // TODO assume we only use the 1 translation - Description: *entity.Alert.DescriptionText.GetTranslation()[0].Text, - ValidFrom: validFrom, - ValidUntil: validTo, - - IdentifyingInformation: map[string]string{ - "TripID": tripID, - "RouteID": routeID, - "StopID": stopID, - "AgencyID": agencyID, - "LinkedDataset": dataset.LinkedDataset, - }, - }, - - SourceType: "GTFS-RT", - DataSource: datasource, - RecordedAt: recordedAtTime, - } + tripID := informedEntity.GetTrip().GetTripId() + routeID := informedEntity.GetRouteId() + stopID := informedEntity.GetStopId() + agencyID := informedEntity.GetAgencyId() + + identifyingInformation = append(identifyingInformation, map[string]string{ + "TripID": tripID, + "RouteID": routeID, + "StopID": stopID, + "AgencyID": agencyID, + "LinkedDataset": dataset.LinkedDataset, + }) + } + + for _, activePeriod := range entity.Alert.ActivePeriod { + validFromTimestamp := activePeriod.GetStart() + validToTimestamp := activePeriod.GetEnd() + + validFrom := time.Unix(int64(validFromTimestamp), 0) + validTo := time.Unix(int64(validToTimestamp), 0) + + title := *entity.Alert.HeaderText.GetTranslation()[0].Text // TODO assume we only use the 1 translation + description := *entity.Alert.DescriptionText.GetTranslation()[0].Text - updateEventJson, _ := json.Marshal(updateEvent) - r.queue.PublishBytes(updateEventJson) + hash := sha256.New() + hash.Write([]byte(alertType)) + hash.Write([]byte(title)) + hash.Write([]byte(description)) + localIDhash := fmt.Sprintf("%x", hash.Sum(nil)) - serviceAlertCount += 1 + updateEvent := vehicletracker.VehicleUpdateEvent{ + MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert, + LocalID: fmt.Sprintf("%s-realtime-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash), + + ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{ + Type: alertType, + Title: title, + Description: description, + ValidFrom: validFrom, + ValidUntil: validTo, + + IdentifyingInformation: identifyingInformation, + }, + + SourceType: "GTFS-RT", + DataSource: datasource, + RecordedAt: recordedAtTime, } + + updateEventJson, _ := json.Marshal(updateEvent) + r.queue.PublishBytes(updateEventJson) + + serviceAlertCount += 1 } } diff --git a/pkg/dataimporter/formats/siri_sx/siri_sx.go b/pkg/dataimporter/formats/siri_sx/siri_sx.go index 3032bfe..160c65b 100644 --- a/pkg/dataimporter/formats/siri_sx/siri_sx.go +++ b/pkg/dataimporter/formats/siri_sx/siri_sx.go @@ -103,13 +103,13 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d ValidFrom: validityPeriodStart, ValidUntil: validityPeriodEnd, - IdentifyingInformation: map[string]string{ - // "TripID": tripID, - // "RouteID": routeID, - // "StopID": stopID, - // "AgencyID": agencyID, - "LinkedDataset": dataset.LinkedDataset, - }, + // IdentifyingInformation: map[string]string{ + // // "TripID": tripID, + // // "RouteID": routeID, + // // "StopID": stopID, + // // "AgencyID": agencyID, + // "LinkedDataset": dataset.LinkedDataset, + // }, }, SourceType: "siri-sx", diff --git a/pkg/realtime/vehicletracker/consumer.go b/pkg/realtime/vehicletracker/consumer.go index 1e91eb4..97e49e4 100644 --- a/pkg/realtime/vehicletracker/consumer.go +++ b/pkg/realtime/vehicletracker/consumer.go @@ -97,7 +97,7 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { } if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeTrip { - identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent) + identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent, vehicleUpdateEvent.SourceType, vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation) if identifiedJourneyID != "" { writeModel, _ := consumer.updateRealtimeJourney(identifiedJourneyID, vehicleUpdateEvent) @@ -109,11 +109,24 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { log.Debug().Interface("event", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation).Msg("Couldnt identify journey") } } else if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeServiceAlert { - identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent) - identifiedStopID := consumer.identifyStop(vehicleUpdateEvent) - identifiedServiceID := consumer.identifyService(vehicleUpdateEvent) + var matchedIdentifiers []string + for _, identifyingInformation := range vehicleUpdateEvent.ServiceAlertUpdate.IdentifyingInformation { + identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent, vehicleUpdateEvent.SourceType, identifyingInformation) + identifiedStopID := consumer.identifyStop(vehicleUpdateEvent.SourceType, identifyingInformation) + identifiedServiceID := consumer.identifyService(vehicleUpdateEvent.SourceType, identifyingInformation) + + if identifiedJourneyID != "" { + matchedIdentifiers = append(matchedIdentifiers, identifiedJourneyID) + } + if identifiedStopID != "" { + matchedIdentifiers = append(matchedIdentifiers, identifiedStopID) + } + if identifiedServiceID != "" { + matchedIdentifiers = append(matchedIdentifiers, identifiedServiceID) + } + } - writeModel, _ := consumer.updateServiceAlert(identifiedJourneyID, identifiedStopID, identifiedServiceID, vehicleUpdateEvent) + writeModel, _ := consumer.updateServiceAlert(matchedIdentifiers, vehicleUpdateEvent) if writeModel != nil { serviceAlertOperations = append(serviceAlertOperations, writeModel) } @@ -151,10 +164,10 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) { } } -func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEvent) string { - if vehicleUpdateEvent.SourceType == "GTFS-RT" { +func (consumer *BatchConsumer) identifyStop(sourceType string, identifyingInformation map[string]string) string { + if sourceType == "GTFS-RT" { stopIdentifier := identifiers.GTFSRT{ - IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation, + IdentifyingInformation: identifyingInformation, } stop, err := stopIdentifier.IdentifyStop() @@ -168,10 +181,10 @@ func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEve return "" } -func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdateEvent) string { - if vehicleUpdateEvent.SourceType == "GTFS-RT" { +func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInformation map[string]string) string { + if sourceType == "GTFS-RT" { serviceIdentifier := identifiers.GTFSRT{ - IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation, + IdentifyingInformation: identifyingInformation, } service, err := serviceIdentifier.IdentifyService() @@ -185,12 +198,12 @@ func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdate return "" } -func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent) string { +func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent, sourceType string, identifyingInformation map[string]string) string { currentTime := time.Now() yearNumber, weekNumber := currentTime.ISOWeek() identifyEventsIndexName := fmt.Sprintf("realtime-identify-events-%d-%d", yearNumber, weekNumber) - operatorRef := vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"] + operatorRef := identifyingInformation["OperatorRef"] var journeyID string @@ -201,45 +214,45 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate var err error // TODO use an interface here to reduce duplication - if vehicleUpdateEvent.SourceType == "siri-vm" { + if sourceType == "siri-vm" { // Save a cache value of N/A to stop us from constantly rechecking for journeys handled somewhere else - successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier)) + successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier)) if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && successVehicleID != "" { identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, "N/A") return "" } // TODO only exists here if siri-vm only comes from the 1 source - failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier)) + failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier)) if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && failedVehicleID == "" { return "" } // perform the actual sirivm journeyIdentifier := identifiers.SiriVM{ - IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation, + IdentifyingInformation: identifyingInformation, } journey, err = journeyIdentifier.IdentifyJourney() // TODO yet another special TfL only thing that shouldn't be here - if err != nil && vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"] == "gb-noc-TFLO" { + if err != nil && identifyingInformation["OperatorRef"] == "gb-noc-TFLO" { tflEventBytes, _ := json.Marshal(map[string]string{ - "Line": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"], - "DirectionRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DirectionRef"], + "Line": identifyingInformation["PublishedLineName"], + "DirectionRef": identifyingInformation["DirectionRef"], "NumberPlate": vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier, - "OriginRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginRef"], - "DestinationRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DestinationRef"], - "OriginAimedDepartureTime": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginAimedDepartureTime"], + "OriginRef": identifyingInformation["OriginRef"], + "DestinationRef": identifyingInformation["DestinationRef"], + "OriginAimedDepartureTime": identifyingInformation["OriginAimedDepartureTime"], }) consumer.TfLBusQueue.PublishBytes(tflEventBytes) } - } else if vehicleUpdateEvent.SourceType == "GTFS-RT" { + } else if sourceType == "GTFS-RT" { journeyIdentifier := identifiers.GTFSRT{ - IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation, + IdentifyingInformation: identifyingInformation, } journey, err = journeyIdentifier.IdentifyJourney() } else { - log.Error().Str("sourcetype", vehicleUpdateEvent.SourceType).Msg("Unknown sourcetype") + log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype") return "" } @@ -249,7 +262,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate // Set cross dataset ID if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" { - identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType) + identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), sourceType) } // Temporary https://github.com/travigo/travigo/issues/43 @@ -278,10 +291,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate FailReason: errorCode, Operator: operatorRef, - Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"], - Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"], + Service: identifyingInformation["PublishedLineName"], + Trip: identifyingInformation["TripID"], - SourceType: vehicleUpdateEvent.SourceType, + SourceType: sourceType, }) elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent)) @@ -299,7 +312,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate // Set cross dataset ID if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" { - identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType) + identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), sourceType) } // Record the successful identification event @@ -309,10 +322,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate Success: true, Operator: operatorRef, - Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"], - Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"], + Service: identifyingInformation["PublishedLineName"], + Trip: identifyingInformation["TripID"], - SourceType: vehicleUpdateEvent.SourceType, + SourceType: sourceType, }) elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent)) diff --git a/pkg/realtime/vehicletracker/servicealert.go b/pkg/realtime/vehicletracker/servicealert.go index cb91577..619785b 100644 --- a/pkg/realtime/vehicletracker/servicealert.go +++ b/pkg/realtime/vehicletracker/servicealert.go @@ -10,29 +10,13 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func (consumer *BatchConsumer) updateServiceAlert(journeyID string, stopID string, serviceID string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { +func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { primaryIdentifier := fmt.Sprintf( "%s-%s-%d-%d", vehicleUpdateEvent.DataSource.DatasetID, vehicleUpdateEvent.ServiceAlertUpdate.Type, vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom.UnixMilli(), vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil.UnixMilli(), ) - var matchedIdentifiers []string - - if journeyID != "" { - matchedIdentifiers = append(matchedIdentifiers, journeyID) - primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, journeyID) - } - - if stopID != "" { - matchedIdentifiers = append(matchedIdentifiers, stopID) - primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, stopID) - } - - if serviceID != "" { - matchedIdentifiers = append(matchedIdentifiers, serviceID) - primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, serviceID) - } if len(matchedIdentifiers) == 0 { return nil, errors.New("No matching identifiers") diff --git a/pkg/realtime/vehicletracker/vehicleupdateevent.go b/pkg/realtime/vehicletracker/vehicleupdateevent.go index 6874bb9..da33b97 100644 --- a/pkg/realtime/vehicletracker/vehicleupdateevent.go +++ b/pkg/realtime/vehicletracker/vehicleupdateevent.go @@ -60,5 +60,5 @@ type ServiceAlertUpdate struct { ValidFrom time.Time ValidUntil time.Time - IdentifyingInformation map[string]string + IdentifyingInformation []map[string]string }