Skip to content

Commit

Permalink
move things around
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronClaydon committed Nov 10, 2024
1 parent 78306cd commit e2805b7
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 75 deletions.
28 changes: 15 additions & 13 deletions pkg/dataimporter/formats/gtfs/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,21 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource)
updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validFromTimestamp, validToTimestamp),
IdentifyingInformation: map[string]string{
"TripID": tripID,
"RouteID": routeID,
"StopID": stopID,
"AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Title: *entity.Alert.HeaderText.GetTranslation()[0].Text, // TODO assume we only use the 1 translation
Description: *entity.Alert.DescriptionText.GetTranslation()[0].Text,
ValidFrom: validFrom,
ValidUntil: validTo,

IdentifyingInformation: map[string]string{
"TripID": tripID,
"RouteID": routeID,
"StopID": stopID,
"AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},
},

SourceType: "GTFS-RT",
Expand Down Expand Up @@ -188,14 +189,15 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource)
locationEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeTrip,
LocalID: fmt.Sprintf("%s-realtime-%s-%s", dataset.Identifier, timeframe, tripID),
IdentifyingInformation: map[string]string{
"TripID": tripID,
"RouteID": trip.GetRouteId(),
"LinkedDataset": dataset.LinkedDataset,
},
SourceType: "GTFS-RT",
SourceType: "GTFS-RT",
VehicleLocationUpdate: &vehicletracker.VehicleLocationUpdate{
Timeframe: timeframe,

IdentifyingInformation: map[string]string{
"TripID": tripID,
"RouteID": trip.GetRouteId(),
"LinkedDataset": dataset.LinkedDataset,
},
},
DataSource: datasource,
RecordedAt: recordedAtTime,
Expand Down
15 changes: 8 additions & 7 deletions pkg/dataimporter/formats/siri_sx/siri_sx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,21 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d
updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validityPeriodStart.UnixMicro(), validityPeriodEnd.UnixMicro()),
IdentifyingInformation: map[string]string{
// "TripID": tripID,
// "RouteID": routeID,
// "StopID": stopID,
// "AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Title: situationElement.Summary,
Description: situationElement.Description,
ValidFrom: validityPeriodStart,
ValidUntil: validityPeriodEnd,

IdentifyingInformation: map[string]string{
// "TripID": tripID,
// "RouteID": routeID,
// "StopID": stopID,
// "AgencyID": agencyID,
"LinkedDataset": dataset.LinkedDataset,
},
},

SourceType: "siri-sx",
Expand Down
29 changes: 15 additions & 14 deletions pkg/dataimporter/formats/siri_vm/siri_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,7 @@ func SubmitToProcessQueue(queue rmq.Queue, vehicle *VehicleActivity, dataset dat
locationEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeTrip,
LocalID: localJourneyID,
IdentifyingInformation: map[string]string{
"ServiceNameRef": vehicle.MonitoredVehicleJourney.LineRef,
"DirectionRef": vehicle.MonitoredVehicleJourney.DirectionRef,
"PublishedLineName": vehicle.MonitoredVehicleJourney.PublishedLineName,
"OperatorRef": fmt.Sprintf(ctdf.OperatorNOCFormat, operatorRef),
"VehicleJourneyRef": vehicleJourneyRef,
"BlockRef": vehicle.MonitoredVehicleJourney.BlockRef,
"OriginRef": originRef,
"DestinationRef": fmt.Sprintf(ctdf.GBStopIDFormat, vehicle.MonitoredVehicleJourney.DestinationRef),
"OriginAimedDepartureTime": vehicle.MonitoredVehicleJourney.OriginAimedDepartureTime,
"FramedVehicleJourneyDate": vehicle.MonitoredVehicleJourney.FramedVehicleJourneyRef.DataFrameRef,
"LinkedDataset": dataset.LinkedDataset,
},
SourceType: "siri-vm",
SourceType: "siri-vm",
VehicleLocationUpdate: &vehicletracker.VehicleLocationUpdate{
Location: ctdf.Location{
Type: "Point",
Expand All @@ -103,6 +90,20 @@ func SubmitToProcessQueue(queue rmq.Queue, vehicle *VehicleActivity, dataset dat
Bearing: vehicle.MonitoredVehicleJourney.Bearing,
VehicleIdentifier: vehicleRef,
Timeframe: timeframe,

IdentifyingInformation: map[string]string{
"ServiceNameRef": vehicle.MonitoredVehicleJourney.LineRef,
"DirectionRef": vehicle.MonitoredVehicleJourney.DirectionRef,
"PublishedLineName": vehicle.MonitoredVehicleJourney.PublishedLineName,
"OperatorRef": fmt.Sprintf(ctdf.OperatorNOCFormat, operatorRef),
"VehicleJourneyRef": vehicleJourneyRef,
"BlockRef": vehicle.MonitoredVehicleJourney.BlockRef,
"OriginRef": originRef,
"DestinationRef": fmt.Sprintf(ctdf.GBStopIDFormat, vehicle.MonitoredVehicleJourney.DestinationRef),
"OriginAimedDepartureTime": vehicle.MonitoredVehicleJourney.OriginAimedDepartureTime,
"FramedVehicleJourneyDate": vehicle.MonitoredVehicleJourney.FramedVehicleJourneyRef.DataFrameRef,
"LinkedDataset": dataset.LinkedDataset,
},
},
DataSource: datasource,
RecordedAt: recordedAtTime,
Expand Down
78 changes: 39 additions & 39 deletions pkg/realtime/vehicletracker/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) {
realtimeJourneyOperations = append(realtimeJourneyOperations, writeModel)
}
} else {
log.Debug().Interface("event", vehicleUpdateEvent.IdentifyingInformation).Msg("Couldnt identify journey")
log.Debug().Interface("event", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation).Msg("Couldnt identify journey")
}
} else if vehicleUpdateEvent.MessageType == VehicleUpdateEventTypeServiceAlert {
identifiedJourneyID := consumer.identifyVehicle(vehicleUpdateEvent)
Expand Down Expand Up @@ -154,7 +154,7 @@ func (consumer *BatchConsumer) Consume(batch rmq.Deliveries) {
func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEvent) string {
if vehicleUpdateEvent.SourceType == "GTFS-RT" {
stopIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleUpdateEvent.IdentifyingInformation,
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
}
stop, err := stopIdentifier.IdentifyStop()

Expand All @@ -171,7 +171,7 @@ func (consumer *BatchConsumer) identifyStop(vehicleUpdateEvent *VehicleUpdateEve
func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdateEvent) string {
if vehicleUpdateEvent.SourceType == "GTFS-RT" {
serviceIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleUpdateEvent.IdentifyingInformation,
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
}
service, err := serviceIdentifier.IdentifyService()

Expand All @@ -185,71 +185,71 @@ func (consumer *BatchConsumer) identifyService(vehicleUpdateEvent *VehicleUpdate
return ""
}

func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpdateEvent) string {
func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent) string {
currentTime := time.Now()
yearNumber, weekNumber := currentTime.ISOWeek()
identifyEventsIndexName := fmt.Sprintf("realtime-identify-events-%d-%d", yearNumber, weekNumber)

operatorRef := vehicleLocationEvent.IdentifyingInformation["OperatorRef"]
operatorRef := vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"]

var journeyID string

cachedJourneyMapping, _ := identificationCache.Get(context.Background(), vehicleLocationEvent.LocalID)
cachedJourneyMapping, _ := identificationCache.Get(context.Background(), vehicleUpdateEvent.LocalID)

if cachedJourneyMapping == "" {
var journey string
var err error

// TODO use an interface here to reduce duplication
if vehicleLocationEvent.SourceType == "siri-vm" {
if vehicleUpdateEvent.SourceType == "siri-vm" {
// Save a cache value of N/A to stop us from constantly rechecking for journeys handled somewhere else
successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" && successVehicleID != "" {
identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, "N/A")
successVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && successVehicleID != "" {
identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, "N/A")
return ""
}

// TODO only exists here if siri-vm only comes from the 1 source
failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" && failedVehicleID == "" {
failedVehicleID, _ := identificationCache.Get(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier))
if vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" && failedVehicleID == "" {
return ""
}

// perform the actual sirivm
journeyIdentifier := identifiers.SiriVM{
IdentifyingInformation: vehicleLocationEvent.IdentifyingInformation,
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
}
journey, err = journeyIdentifier.IdentifyJourney()

// TODO yet another special TfL only thing that shouldn't be here
if err != nil && vehicleLocationEvent.IdentifyingInformation["OperatorRef"] == "gb-noc-TFLO" {
if err != nil && vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OperatorRef"] == "gb-noc-TFLO" {
tflEventBytes, _ := json.Marshal(map[string]string{
"Line": vehicleLocationEvent.IdentifyingInformation["PublishedLineName"],
"DirectionRef": vehicleLocationEvent.IdentifyingInformation["DirectionRef"],
"NumberPlate": vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier,
"OriginRef": vehicleLocationEvent.IdentifyingInformation["OriginRef"],
"DestinationRef": vehicleLocationEvent.IdentifyingInformation["DestinationRef"],
"OriginAimedDepartureTime": vehicleLocationEvent.IdentifyingInformation["OriginAimedDepartureTime"],
"Line": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
"DirectionRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DirectionRef"],
"NumberPlate": vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier,
"OriginRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginRef"],
"DestinationRef": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["DestinationRef"],
"OriginAimedDepartureTime": vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["OriginAimedDepartureTime"],
})
consumer.TfLBusQueue.PublishBytes(tflEventBytes)
}
} else if vehicleLocationEvent.SourceType == "GTFS-RT" {
} else if vehicleUpdateEvent.SourceType == "GTFS-RT" {
journeyIdentifier := identifiers.GTFSRT{
IdentifyingInformation: vehicleLocationEvent.IdentifyingInformation,
IdentifyingInformation: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation,
}
journey, err = journeyIdentifier.IdentifyJourney()
} else {
log.Error().Str("sourcetype", vehicleLocationEvent.SourceType).Msg("Unknown sourcetype")
log.Error().Str("sourcetype", vehicleUpdateEvent.SourceType).Msg("Unknown sourcetype")
return ""
}

if err != nil {
// Save a cache value of N/A to stop us from constantly rechecking for journeys we cant identify
identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, "N/A")
identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, "N/A")

// Set cross dataset ID
if vehicleLocationEvent.VehicleLocationUpdate != nil && vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleLocationEvent.SourceType)
if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("failedvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType)
}

// Temporary https://github.com/travigo/travigo/issues/43
Expand Down Expand Up @@ -278,10 +278,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda
FailReason: errorCode,

Operator: operatorRef,
Service: vehicleLocationEvent.IdentifyingInformation["PublishedLineName"],
Trip: vehicleLocationEvent.IdentifyingInformation["TripID"],
Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"],

SourceType: vehicleLocationEvent.SourceType,
SourceType: vehicleUpdateEvent.SourceType,
})

elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent))
Expand All @@ -292,14 +292,14 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda

journeyMapJson, _ := json.Marshal(localJourneyIDMap{
JourneyID: journeyID,
LastUpdated: vehicleLocationEvent.RecordedAt,
LastUpdated: vehicleUpdateEvent.RecordedAt,
})

identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, string(journeyMapJson))
identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, string(journeyMapJson))

// Set cross dataset ID
if vehicleLocationEvent.VehicleLocationUpdate != nil && vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleLocationEvent.IdentifyingInformation["LinkedDataset"], vehicleLocationEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleLocationEvent.SourceType)
if vehicleUpdateEvent.VehicleLocationUpdate != nil && vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier != "" {
identificationCache.Set(context.Background(), fmt.Sprintf("successvehicleid/%s/%s", vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["LinkedDataset"], vehicleUpdateEvent.VehicleLocationUpdate.VehicleIdentifier), vehicleUpdateEvent.SourceType)
}

// Record the successful identification event
Expand All @@ -309,10 +309,10 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda
Success: true,

Operator: operatorRef,
Service: vehicleLocationEvent.IdentifyingInformation["PublishedLineName"],
Trip: vehicleLocationEvent.IdentifyingInformation["TripID"],
Service: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["PublishedLineName"],
Trip: vehicleUpdateEvent.VehicleLocationUpdate.IdentifyingInformation["TripID"],

SourceType: vehicleLocationEvent.SourceType,
SourceType: vehicleUpdateEvent.SourceType,
})

elastic_client.IndexRequest(identifyEventsIndexName, bytes.NewReader(elasticEvent))
Expand All @@ -323,13 +323,13 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleLocationEvent *VehicleUpda
json.Unmarshal([]byte(cachedJourneyMapping), &journeyMap)

// skip this journey if hasnt changed
if vehicleLocationEvent.RecordedAt.After(journeyMap.LastUpdated) {
if vehicleUpdateEvent.RecordedAt.After(journeyMap.LastUpdated) {
// Update the last updated time
journeyMap.LastUpdated = vehicleLocationEvent.RecordedAt
journeyMap.LastUpdated = vehicleUpdateEvent.RecordedAt

journeyMapJson, _ := json.Marshal(journeyMap)

identificationCache.Set(context.Background(), vehicleLocationEvent.LocalID, string(journeyMapJson))
identificationCache.Set(context.Background(), vehicleUpdateEvent.LocalID, string(journeyMapJson))
} else {
return ""
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/realtime/vehicletracker/vehicleupdateevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ type VehicleUpdateEvent struct {

MessageType VehicleUpdateEventType

IdentifyingInformation map[string]string
SourceType string
SourceType string

VehicleLocationUpdate *VehicleLocationUpdate
ServiceAlertUpdate *ServiceAlertUpdate
Expand All @@ -33,6 +32,8 @@ type VehicleLocationUpdate struct {
Bearing float64
Timeframe string

IdentifyingInformation map[string]string

StopUpdates []VehicleLocationEventStopUpdate

Occupancy ctdf.RealtimeJourneyOccupancy
Expand All @@ -58,4 +59,6 @@ type ServiceAlertUpdate struct {

ValidFrom time.Time
ValidUntil time.Time

IdentifyingInformation map[string]string
}

0 comments on commit e2805b7

Please sign in to comment.