Skip to content

Commit

Permalink
move around
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronClaydon committed Nov 10, 2024
1 parent e2805b7 commit d0f7be5
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 98 deletions.
92 changes: 53 additions & 39 deletions pkg/dataimporter/formats/gtfs/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gtfs

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -124,49 +125,62 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource)
}

// Create one per active period & informed entity
var identifyingInformation []map[string]string

for _, informedEntity := range entity.Alert.InformedEntity {
for _, activePeriod := range entity.Alert.ActivePeriod {
validFromTimestamp := activePeriod.GetStart()
validToTimestamp := activePeriod.GetEnd()

validFrom := time.Unix(int64(validFromTimestamp), 0)
validTo := time.Unix(int64(validToTimestamp), 0)

tripID := informedEntity.GetTrip().GetTripId()
routeID := informedEntity.GetRouteId()
stopID := informedEntity.GetStopId()
agencyID := informedEntity.GetAgencyId()

updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validFromTimestamp, validToTimestamp),

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Title: *entity.Alert.HeaderText.GetTranslation()[0].Text, // TODO assume we only use the 1 translation
Description: *entity.Alert.DescriptionText.GetTranslation()[0].Text,
ValidFrom: validFrom,
ValidUntil: validTo,

IdentifyingInformation: map[string]string{
"TripID": tripID,
"RouteID": routeID,
"StopID": stopID,
"AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},
},

SourceType: "GTFS-RT",
DataSource: datasource,
RecordedAt: recordedAtTime,
}
tripID := informedEntity.GetTrip().GetTripId()
routeID := informedEntity.GetRouteId()
stopID := informedEntity.GetStopId()
agencyID := informedEntity.GetAgencyId()

identifyingInformation = append(identifyingInformation, map[string]string{
"TripID": tripID,
"RouteID": routeID,
"StopID": stopID,
"AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
})
}

for _, activePeriod := range entity.Alert.ActivePeriod {
validFromTimestamp := activePeriod.GetStart()
validToTimestamp := activePeriod.GetEnd()

validFrom := time.Unix(int64(validFromTimestamp), 0)
validTo := time.Unix(int64(validToTimestamp), 0)

title := *entity.Alert.HeaderText.GetTranslation()[0].Text // TODO assume we only use the 1 translation
description := *entity.Alert.DescriptionText.GetTranslation()[0].Text

updateEventJson, _ := json.Marshal(updateEvent)
r.queue.PublishBytes(updateEventJson)
hash := sha256.New()
hash.Write([]byte(alertType))
hash.Write([]byte(title))
hash.Write([]byte(description))
localIDhash := fmt.Sprintf("%x", hash.Sum(nil))

serviceAlertCount += 1
updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash),

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Title: title,
Description: description,
ValidFrom: validFrom,
ValidUntil: validTo,

IdentifyingInformation: identifyingInformation,
},

SourceType: "GTFS-RT",
DataSource: datasource,
RecordedAt: recordedAtTime,
}

updateEventJson, _ := json.Marshal(updateEvent)
r.queue.PublishBytes(updateEventJson)

serviceAlertCount += 1
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/dataimporter/formats/siri_sx/siri_sx.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d
ValidFrom: validityPeriodStart,
ValidUntil: validityPeriodEnd,

IdentifyingInformation: map[string]string{
// "TripID": tripID,
// "RouteID": routeID,
// "StopID": stopID,
// "AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},
// IdentifyingInformation: map[string]string{
// // "TripID": tripID,
// // "RouteID": routeID,
// // "StopID": stopID,
// // "AgencyID": agencyID,
// "LinkedDataset": dataset.LinkedDataset,
// },
},

SourceType: "siri-sx",
Expand Down
81 changes: 47 additions & 34 deletions pkg/realtime/vehicletracker/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) {
}

if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeTrip {
identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent)
identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent, vehicleUpdateEvent.SourceType, vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation)

if identifiedJourneyID != "" {
writeModel, _ := consumer.updateRealtimeJourney(identifiedJourneyID, vehicleUpdateEvent)
Expand All @@ -109,11 +109,24 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) {
log.Debug().Interface("event", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation).Msg("Couldnt identify journey")
}
} else if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeServiceAlert {
identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent)
identifiedStopID := consumer.identifyStop(vehicleUpdateEvent)
identifiedServiceID := consumer.identifyService(vehicleUpdateEvent)
var matchedIdentifiers []string
for _, identifyingInformation := range vehicleUpdateEvent.ServiceAlertUpdate.IdentifyingInformation {
identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent, vehicleUpdateEvent.SourceType, identifyingInformation)
identifiedStopID := consumer.identifyStop(vehicleUpdateEvent.SourceType, identifyingInformation)
identifiedServiceID := consumer.identifyService(vehicleUpdateEvent.SourceType, identifyingInformation)

if identifiedJourneyID != "" {
matchedIdentifiers = append(matchedIdentifiers, identifiedJourneyID)
}
if identifiedStopID != "" {
matchedIdentifiers = append(matchedIdentifiers, identifiedStopID)
}
if identifiedServiceID != "" {
matchedIdentifiers = append(matchedIdentifiers, identifiedServiceID)
}
}

writeModel, _ := consumer.updateServiceAlert(identifiedJourneyID, identifiedStopID, identifiedServiceID, vehicleUpdateEvent)
writeModel, _ := consumer.updateServiceAlert(matchedIdentifiers, vehicleUpdateEvent)
if writeModel != nil {
serviceAlertOperations = append(serviceAlertOperations, writeModel)
}
Expand Down Expand Up @@ -151,10 +164,10 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) {
}
}

func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEvent) string {
if vehicleUpdateEvent.SourceType == "GTFS-RT" {
func (consumer *BatchConsumer) identifyStop(sourceType string, identifyingInformation map[string]string) string {
if sourceType == "GTFS-RT" {
stopIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
IdentifyingInformation: identifyingInformation,
}
stop, err := stopIdentifier.IdentifyStop()

Expand All @@ -168,10 +181,10 @@ func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEve
return ""
}

func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdateEvent) string {
if vehicleUpdateEvent.SourceType == "GTFS-RT" {
func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInformation map[string]string) string {
if sourceType == "GTFS-RT" {
serviceIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
IdentifyingInformation: identifyingInformation,
}
service, err := serviceIdentifier.IdentifyService()

Expand All @@ -185,12 +198,12 @@ func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdate
return ""
}

func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent) string {
func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent, sourceType string, identifyingInformation map[string]string) string {
currentTime := time.Now()
yearNumber, weekNumber := currentTime.ISOWeek()
identifyEventsIndexName := fmt.Sprintf("realtime-identify-events-%d-%d", yearNumber, weekNumber)

operatorRef := vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"]
operatorRef := identifyingInformation["OperatorRef"]

var journeyID string

Expand All @@ -201,45 +214,45 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate
var err error

// TODO use an interface here to reduce duplication
if vehicleUpdateEvent.SourceType == "siri-vm" {
if sourceType == "siri-vm" {
// Save a cache value of N/A to stop us from constantly rechecking for journeys handled somewhere else
successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && successVehicleID != "" {
identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, "N/A")
return ""
}

// TODO only exists here if siri-vm only comes from the 1 source
failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && failedVehicleID == "" {
return ""
}

// perform the actual sirivm
journeyIdentifier := identifiers.SiriVM{
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
IdentifyingInformation: identifyingInformation,
}
journey, err = journeyIdentifier.IdentifyJourney()

// TODO yet another special TfL only thing that shouldn't be here
if err != nil && vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"] == "gb-noc-TFLO" {
if err != nil && identifyingInformation["OperatorRef"] == "gb-noc-TFLO" {
tflEventBytes, _ := json.Marshal(map[string]string{
"Line": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
"DirectionRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DirectionRef"],
"Line": identifyingInformation["PublishedLineName"],
"DirectionRef": identifyingInformation["DirectionRef"],
"NumberPlate": vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier,
"OriginRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginRef"],
"DestinationRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DestinationRef"],
"OriginAimedDepartureTime": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginAimedDepartureTime"],
"OriginRef": identifyingInformation["OriginRef"],
"DestinationRef": identifyingInformation["DestinationRef"],
"OriginAimedDepartureTime": identifyingInformation["OriginAimedDepartureTime"],
})
consumer.TfLBusQueue.PublishBytes(tflEventBytes)
}
} else if vehicleUpdateEvent.SourceType == "GTFS-RT" {
} else if sourceType == "GTFS-RT" {
journeyIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
IdentifyingInformation: identifyingInformation,
}
journey, err = journeyIdentifier.IdentifyJourney()
} else {
log.Error().Str("sourcetype", vehicleUpdateEvent.SourceType).Msg("Unknown sourcetype")
log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype")
return ""
}

Expand All @@ -249,7 +262,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate

// Set cross dataset ID
if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType)
identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), sourceType)
}

// Temporary https://github.com/travigo/travigo/issues/43
Expand Down Expand Up @@ -278,10 +291,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate
FailReason: errorCode,

Operator: operatorRef,
Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"],
Service: identifyingInformation["PublishedLineName"],
Trip: identifyingInformation["TripID"],

SourceType: vehicleUpdateEvent.SourceType,
SourceType: sourceType,
})

elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent))
Expand All @@ -299,7 +312,7 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate

// Set cross dataset ID
if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType)
identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", identifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), sourceType)
}

// Record the successful identification event
Expand All @@ -309,10 +322,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate
Success: true,

Operator: operatorRef,
Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"],
Service: identifyingInformation["PublishedLineName"],
Trip: identifyingInformation["TripID"],

SourceType: vehicleUpdateEvent.SourceType,
SourceType: sourceType,
})

elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent))
Expand Down
18 changes: 1 addition & 17 deletions pkg/realtime/vehicletracker/servicealert.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,13 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)

func (consumer *BatchConsumer) updateServiceAlert(journeyID string, stopID string, serviceID string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) {
func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) {
primaryIdentifier := fmt.Sprintf(
"%s-%s-%d-%d",
vehicleUpdateEvent.DataSource.DatasetID, vehicleUpdateEvent.ServiceAlertUpdate.Type,
vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom.UnixMilli(),
vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil.UnixMilli(),
)
var matchedIdentifiers []string

if journeyID != "" {
matchedIdentifiers = append(matchedIdentifiers, journeyID)
primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, journeyID)
}

if stopID != "" {
matchedIdentifiers = append(matchedIdentifiers, stopID)
primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, stopID)
}

if serviceID != "" {
matchedIdentifiers = append(matchedIdentifiers, serviceID)
primaryIdentifier = fmt.Sprintf("%s-%s", primaryIdentifier, serviceID)
}

if len(matchedIdentifiers) == 0 {
return nil, errors.New("No matching identifiers")
Expand Down
2 changes: 1 addition & 1 deletion pkg/realtime/vehicletracker/vehicleupdateevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ type ServiceAlertUpdate struct {
ValidFrom time.Time
ValidUntil time.Time

IdentifyingInformation map[string]string
IdentifyingInformation []map[string]string
}

0 comments on commit d0f7be5

Please sign in to comment.