Skip to content

Commit

Permalink
Add BODS service alerts #123
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronClaydon committed Nov 10, 2024
1 parent d0f7be5 commit 3bca2d3
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 31 deletions.
4 changes: 3 additions & 1 deletion deploy/charts/travigo-data-importer/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ cronjobs:
args: [ "data-linker", "run", "--type", "stops" ]

deployments:
- name: sirivm-all
- name: gb-dft-bods-sirivm-all
args: ["data-importer", "dataset", "--id", "gb-dft-bods-sirivm-all", "--repeat-every", "30s"]
- name: gb-dft-bods-sirisx-all
args: ["data-importer", "dataset", "--id", "gb-dft-bods-sirisx-all", "--repeat-every", "300s"]
- name: gb-bods-gtfs-rt
args: ["data-importer", "dataset", "--id", "gb-dft-bods-gtfs-realtime", "--repeat-every", "10s"]
- name: ie-gtfs-realtime
Expand Down
2 changes: 1 addition & 1 deletion pkg/dataimporter/formats/gtfs/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource)

updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash),
LocalID: fmt.Sprintf("%s-servicealert-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash),

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Expand Down
53 changes: 38 additions & 15 deletions pkg/dataimporter/formats/siri_sx/siri_sx.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package siri_sx

import (
"crypto/sha256"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"time"

"github.com/adjust/rmq/v5"
"github.com/kr/pretty"
"github.com/rs/zerolog/log"
"github.com/travigo/travigo/pkg/ctdf"
"github.com/travigo/travigo/pkg/dataimporter/datasets"
Expand Down Expand Up @@ -92,35 +93,57 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d
var alertType ctdf.ServiceAlertType
alertType = ctdf.ServiceAlertTypeInformation // TODO debug

var identifyingInformation []map[string]string
for _, consequence := range situationElement.Consequence {
for _, network := range consequence.AffectedNetworks {
for _, line := range network.AffectedLine {
identifyingInformation = append(identifyingInformation, map[string]string{
"LineRef": line.LineRef,
"PubishedLineRef": line.PublishedLineRef,
"OperatorRef": line.OperatorRef,
"LinkedDataset": "gb-dft-bods-gtfs-schedule", // not always going to be true in future
})
}
}

for _, stopPoint := range consequence.AffectedStopPoints {
identifyingInformation = append(identifyingInformation, map[string]string{
"StopPointRef": stopPoint.StopPointRef,
"LinkedDataset": "gb-dft-naptan", // not always going to be true in future
})
}
}

title := situationElement.Summary
description := situationElement.Description

hash := sha256.New()
hash.Write([]byte(alertType))
hash.Write([]byte(title))
hash.Write([]byte(description))
localIDhash := fmt.Sprintf("%x", hash.Sum(nil))

updateEvent := vehicletracker.VehicleUpdateEvent{
MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert,
LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validityPeriodStart.UnixMicro(), validityPeriodEnd.UnixMicro()),
LocalID: fmt.Sprintf("%s-servicealert-%d-%d-%s", dataset.Identifier, validityPeriodStart.UnixMicro(), validityPeriodEnd.UnixMicro(), localIDhash),

ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{
Type: alertType,
Title: situationElement.Summary,
Description: situationElement.Description,
Title: title,
Description: description,
ValidFrom: validityPeriodStart,
ValidUntil: validityPeriodEnd,

// IdentifyingInformation: map[string]string{
// // "TripID": tripID,
// // "RouteID": routeID,
// // "StopID": stopID,
// // "AgencyID": agencyID,
// "LinkedDataset": dataset.LinkedDataset,
// },
IdentifyingInformation: identifyingInformation,
},

SourceType: "siri-sx",
DataSource: datasource,
RecordedAt: versionedAtTime,
}

pretty.Println(updateEvent)

// updateEventJson, _ := json.Marshal(updateEvent)
// queue.PublishBytes(updateEventJson)
updateEventJson, _ := json.Marshal(updateEvent)
queue.PublishBytes(updateEventJson)

return true
}
34 changes: 30 additions & 4 deletions pkg/realtime/vehicletracker/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,21 @@ func (consumer *BatchConsumer) identifyStop(sourceType string, identifyingInform
}

return stop
}
} else if sourceType == "siri-sx" {
stopIdentifier := identifiers.SiriSX{
IdentifyingInformation: identifyingInformation,
}
stop, err := stopIdentifier.IdentifyStop()

if err != nil {
return ""
}

return ""
return stop
} else {
log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype")
return ""
}
}

func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInformation map[string]string) string {
Expand All @@ -193,9 +205,21 @@ func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInf
}

return service
}
} else if sourceType == "siri-sx" {
serviceIdentifier := identifiers.SiriSX{
IdentifyingInformation: identifyingInformation,
}
service, err := serviceIdentifier.IdentifyService()

if err != nil {
return ""
}

return ""
return service
} else {
log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype")
return ""
}
}

func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent, sourceType string, identifyingInformation map[string]string) string {
Expand Down Expand Up @@ -251,6 +275,8 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate
IdentifyingInformation: identifyingInformation,
}
journey, err = journeyIdentifier.IdentifyJourney()
} else if sourceType == "siri-sx" {
return "" // TODO not now
} else {
log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype")
return ""
Expand Down
90 changes: 90 additions & 0 deletions pkg/realtime/vehicletracker/identifiers/sirisx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package identifiers

import (
"context"
"errors"
"fmt"

"github.com/travigo/travigo/pkg/ctdf"
"github.com/travigo/travigo/pkg/database"
"go.mongodb.org/mongo-driver/bson"
)

type SiriSX struct {
IdentifyingInformation map[string]string
}

func (r *SiriSX) IdentifyStop() (string, error) {
stopsCollection := database.GetCollection("stops")

stopID := r.IdentifyingInformation["StopPointRef"]
if stopID == "" {
return "", errors.New("Missing field StopPointRef")
}

linkedDataset := r.IdentifyingInformation["LinkedDataset"]
if linkedDataset == "" {
return "", errors.New("Missing field linkedDataset")
}

var potentialStops []ctdf.Stop

formatedStopID := fmt.Sprintf("gb-atco-%s", stopID)

cursor, _ := stopsCollection.Find(context.Background(), bson.M{
"$or": bson.A{
bson.M{"primaryidentifier": formatedStopID},
bson.M{"otheridentifiers": formatedStopID},
},
"datasource.datasetid": linkedDataset,
})
cursor.All(context.Background(), &potentialStops)

if len(potentialStops) == 0 {
return "", errors.New("Could not find referenced stop")
} else if len(potentialStops) == 1 {
return potentialStops[0].PrimaryIdentifier, nil
} else {
return "", errors.New("Could not find referenced stop")
}
}

func (r *SiriSX) IdentifyService() (string, error) {
servicesCollection := database.GetCollection("services")

lineRef := r.IdentifyingInformation["LineRef"]
if lineRef == "" {
return "", errors.New("Missing field LineRef")
}

operatorRef := r.IdentifyingInformation["OperatorRef"]
if operatorRef == "" {
return "", errors.New("Missing field OperatorRef")
}

linkedDataset := r.IdentifyingInformation["LinkedDataset"]
if linkedDataset == "" {
return "", errors.New("Missing field linkedDataset")
}

var potentialServices []ctdf.Stop

cursor, _ := servicesCollection.Find(context.Background(), bson.M{
"servicename": lineRef,
"operatorref": fmt.Sprintf("gb-noc-%s", operatorRef),
"datasource.datasetid": linkedDataset,
})
cursor.All(context.Background(), &potentialServices)

if len(potentialServices) == 0 {
return "", errors.New("Could not find referenced service")
} else if len(potentialServices) == 1 {
return potentialServices[0].PrimaryIdentifier, nil
} else {
return "", errors.New("Could not find referenced service")
}
}

func (r *SiriSX) IdentifyJourney() (string, error) {
return "", errors.New("Not supported")
}
12 changes: 2 additions & 10 deletions pkg/realtime/vehicletracker/servicealert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package vehicletracker

import (
"errors"
"fmt"
"time"

"github.com/travigo/travigo/pkg/ctdf"
Expand All @@ -11,19 +10,12 @@ import (
)

func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) {
primaryIdentifier := fmt.Sprintf(
"%s-%s-%d-%d",
vehicleUpdateEvent.DataSource.DatasetID, vehicleUpdateEvent.ServiceAlertUpdate.Type,
vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom.UnixMilli(),
vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil.UnixMilli(),
)

if len(matchedIdentifiers) == 0 {
return nil, errors.New("No matching identifiers")
}

serviceAlert := ctdf.ServiceAlert{
PrimaryIdentifier: primaryIdentifier,
PrimaryIdentifier: vehicleUpdateEvent.LocalID,
OtherIdentifiers: map[string]string{},
CreationDateTime: time.Time{},
ModificationDateTime: vehicleUpdateEvent.RecordedAt,
Expand All @@ -38,7 +30,7 @@ func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, v

bsonRep, _ := bson.Marshal(bson.M{"$set": serviceAlert})
updateModel := mongo.NewUpdateOneModel()
updateModel.SetFilter(bson.M{"primaryidentifier": primaryIdentifier})
updateModel.SetFilter(bson.M{"primaryidentifier": vehicleUpdateEvent.LocalID})
updateModel.SetUpdate(bsonRep)
updateModel.SetUpsert(true)

Expand Down

0 comments on commit 3bca2d3

Please sign in to comment.