From 3bca2d3713f897e636c8264511c29ac6c58e639d Mon Sep 17 00:00:00 2001 From: Aaron Claydon Date: Sun, 10 Nov 2024 23:44:55 +0000 Subject: [PATCH] Add BODS service alerts #123 --- .../charts/travigo-data-importer/values.yaml | 4 +- pkg/dataimporter/formats/gtfs/realtime.go | 2 +- pkg/dataimporter/formats/siri_sx/siri_sx.go | 53 +++++++---- pkg/realtime/vehicletracker/consumer.go | 34 ++++++- .../vehicletracker/identifiers/sirisx.go | 90 +++++++++++++++++++ pkg/realtime/vehicletracker/servicealert.go | 12 +-- 6 files changed, 164 insertions(+), 31 deletions(-) create mode 100644 pkg/realtime/vehicletracker/identifiers/sirisx.go diff --git a/deploy/charts/travigo-data-importer/values.yaml b/deploy/charts/travigo-data-importer/values.yaml index 9b3c2c4..90bfc0a 100644 --- a/deploy/charts/travigo-data-importer/values.yaml +++ b/deploy/charts/travigo-data-importer/values.yaml @@ -87,8 +87,10 @@ cronjobs: args: [ "data-linker", "run", "--type", "stops" ] deployments: - - name: sirivm-all + - name: gb-dft-bods-sirivm-all args: ["data-importer", "dataset", "--id", "gb-dft-bods-sirivm-all", "--repeat-every", "30s"] + - name: gb-dft-bods-sirisx-all + args: ["data-importer", "dataset", "--id", "gb-dft-bods-sirisx-all", "--repeat-every", "300s"] - name: gb-bods-gtfs-rt args: ["data-importer", "dataset", "--id", "gb-dft-bods-gtfs-realtime", "--repeat-every", "10s"] - name: ie-gtfs-realtime diff --git a/pkg/dataimporter/formats/gtfs/realtime.go b/pkg/dataimporter/formats/gtfs/realtime.go index 08ce4e2..f8f18ff 100644 --- a/pkg/dataimporter/formats/gtfs/realtime.go +++ b/pkg/dataimporter/formats/gtfs/realtime.go @@ -160,7 +160,7 @@ func (r *Realtime) Import(dataset datasets.DataSet, datasource *ctdf.DataSource) updateEvent := vehicletracker.VehicleUpdateEvent{ MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert, - LocalID: fmt.Sprintf("%s-realtime-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash), + LocalID: fmt.Sprintf("%s-servicealert-%d-%d-%s", dataset.Identifier, validFromTimestamp, validToTimestamp, localIDhash), ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{ Type: alertType, diff --git a/pkg/dataimporter/formats/siri_sx/siri_sx.go b/pkg/dataimporter/formats/siri_sx/siri_sx.go index 160c65b..bad1fe6 100644 --- a/pkg/dataimporter/formats/siri_sx/siri_sx.go +++ b/pkg/dataimporter/formats/siri_sx/siri_sx.go @@ -1,6 +1,8 @@ package siri_sx import ( + "crypto/sha256" + "encoding/json" "encoding/xml" "errors" "fmt" @@ -8,7 +10,6 @@ import ( "time" "github.com/adjust/rmq/v5" - "github.com/kr/pretty" "github.com/rs/zerolog/log" "github.com/travigo/travigo/pkg/ctdf" "github.com/travigo/travigo/pkg/dataimporter/datasets" @@ -92,24 +93,48 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d var alertType ctdf.ServiceAlertType alertType = ctdf.ServiceAlertTypeInformation // TODO debug + var identifyingInformation []map[string]string + for _, consequence := range situationElement.Consequence { + for _, network := range consequence.AffectedNetworks { + for _, line := range network.AffectedLine { + identifyingInformation = append(identifyingInformation, map[string]string{ + "LineRef": line.LineRef, + "PubishedLineRef": line.PublishedLineRef, + "OperatorRef": line.OperatorRef, + "LinkedDataset": "gb-dft-bods-gtfs-schedule", // not always going to be true in future + }) + } + } + + for _, stopPoint := range consequence.AffectedStopPoints { + identifyingInformation = append(identifyingInformation, map[string]string{ + "StopPointRef": stopPoint.StopPointRef, + "LinkedDataset": "gb-dft-naptan", // not always going to be true in future + }) + } + } + + title := situationElement.Summary + description := situationElement.Description + + hash := sha256.New() + hash.Write([]byte(alertType)) + hash.Write([]byte(title)) + hash.Write([]byte(description)) + localIDhash := fmt.Sprintf("%x", hash.Sum(nil)) + updateEvent := vehicletracker.VehicleUpdateEvent{ MessageType: vehicletracker.VehicleUpdateEventTypeServiceAlert, - LocalID: fmt.Sprintf("%s-realtime-%d-%d", dataset.Identifier, validityPeriodStart.UnixMicro(), validityPeriodEnd.UnixMicro()), + LocalID: fmt.Sprintf("%s-servicealert-%d-%d-%s", dataset.Identifier, validityPeriodStart.UnixMicro(), validityPeriodEnd.UnixMicro(), localIDhash), ServiceAlertUpdate: &vehicletracker.ServiceAlertUpdate{ Type: alertType, - Title: situationElement.Summary, - Description: situationElement.Description, + Title: title, + Description: description, ValidFrom: validityPeriodStart, ValidUntil: validityPeriodEnd, - // IdentifyingInformation: map[string]string{ - // // "TripID": tripID, - // // "RouteID": routeID, - // // "StopID": stopID, - // // "AgencyID": agencyID, - // "LinkedDataset": dataset.LinkedDataset, - // }, + IdentifyingInformation: identifyingInformation, }, SourceType: "siri-sx", @@ -117,10 +142,8 @@ func SubmitToProcessQueue(queue rmq.Queue, situationElement *SituationElement, d RecordedAt: versionedAtTime, } - pretty.Println(updateEvent) - - // updateEventJson, _ := json.Marshal(updateEvent) - // queue.PublishBytes(updateEventJson) + updateEventJson, _ := json.Marshal(updateEvent) + queue.PublishBytes(updateEventJson) return true } diff --git a/pkg/realtime/vehicletracker/consumer.go b/pkg/realtime/vehicletracker/consumer.go index 97e49e4..edbd475 100644 --- a/pkg/realtime/vehicletracker/consumer.go +++ b/pkg/realtime/vehicletracker/consumer.go @@ -176,9 +176,21 @@ func (consumer *BatchConsumer) identifyStop(sourceType string, identifyingInform } return stop - } + } else if sourceType == "siri-sx" { + stopIdentifier := identifiers.SiriSX{ + IdentifyingInformation: identifyingInformation, + } + stop, err := stopIdentifier.IdentifyStop() + + if err != nil { + return "" + } - return "" + return stop + } else { + log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype") + return "" + } } func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInformation map[string]string) string { @@ -193,9 +205,21 @@ func (consumer *BatchConsumer) identifyService(sourceType string, identifyingInf } return service - } + } else if sourceType == "siri-sx" { + serviceIdentifier := identifiers.SiriSX{ + IdentifyingInformation: identifyingInformation, + } + service, err := serviceIdentifier.IdentifyService() + + if err != nil { + return "" + } - return "" + return service + } else { + log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype") + return "" + } } func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdateEvent, sourceType string, identifyingInformation map[string]string) string { @@ -251,6 +275,8 @@ func (consumer *BatchConsumer) identifyVehicle(vehicleUpdateEvent *VehicleUpdate IdentifyingInformation: identifyingInformation, } journey, err = journeyIdentifier.IdentifyJourney() + } else if sourceType == "siri-sx" { + return "" // TODO not now } else { log.Error().Str("sourcetype", sourceType).Msg("Unknown sourcetype") return "" diff --git a/pkg/realtime/vehicletracker/identifiers/sirisx.go b/pkg/realtime/vehicletracker/identifiers/sirisx.go new file mode 100644 index 0000000..d034a5b --- /dev/null +++ b/pkg/realtime/vehicletracker/identifiers/sirisx.go @@ -0,0 +1,90 @@ +package identifiers + +import ( + "context" + "errors" + "fmt" + + "github.com/travigo/travigo/pkg/ctdf" + "github.com/travigo/travigo/pkg/database" + "go.mongodb.org/mongo-driver/bson" +) + +type SiriSX struct { + IdentifyingInformation map[string]string +} + +func (r *SiriSX) IdentifyStop() (string, error) { + stopsCollection := database.GetCollection("stops") + + stopID := r.IdentifyingInformation["StopPointRef"] + if stopID == "" { + return "", errors.New("Missing field StopPointRef") + } + + linkedDataset := r.IdentifyingInformation["LinkedDataset"] + if linkedDataset == "" { + return "", errors.New("Missing field linkedDataset") + } + + var potentialStops []ctdf.Stop + + formatedStopID := fmt.Sprintf("gb-atco-%s", stopID) + + cursor, _ := stopsCollection.Find(context.Background(), bson.M{ + "$or": bson.A{ + bson.M{"primaryidentifier": formatedStopID}, + bson.M{"otheridentifiers": formatedStopID}, + }, + "datasource.datasetid": linkedDataset, + }) + cursor.All(context.Background(), &potentialStops) + + if len(potentialStops) == 0 { + return "", errors.New("Could not find referenced stop") + } else if len(potentialStops) == 1 { + return potentialStops[0].PrimaryIdentifier, nil + } else { + return "", errors.New("Could not find referenced stop") + } +} + +func (r *SiriSX) IdentifyService() (string, error) { + servicesCollection := database.GetCollection("services") + + lineRef := r.IdentifyingInformation["LineRef"] + if lineRef == "" { + return "", errors.New("Missing field LineRef") + } + + operatorRef := r.IdentifyingInformation["OperatorRef"] + if operatorRef == "" { + return "", errors.New("Missing field OperatorRef") + } + + linkedDataset := r.IdentifyingInformation["LinkedDataset"] + if linkedDataset == "" { + return "", errors.New("Missing field linkedDataset") + } + + var potentialServices []ctdf.Stop + + cursor, _ := servicesCollection.Find(context.Background(), bson.M{ + "servicename": lineRef, + "operatorref": fmt.Sprintf("gb-noc-%s", operatorRef), + "datasource.datasetid": linkedDataset, + }) + cursor.All(context.Background(), &potentialServices) + + if len(potentialServices) == 0 { + return "", errors.New("Could not find referenced service") + } else if len(potentialServices) == 1 { + return potentialServices[0].PrimaryIdentifier, nil + } else { + return "", errors.New("Could not find referenced service") + } +} + +func (r *SiriSX) IdentifyJourney() (string, error) { + return "", errors.New("Not supported") +} diff --git a/pkg/realtime/vehicletracker/servicealert.go b/pkg/realtime/vehicletracker/servicealert.go index 619785b..bbea4b1 100644 --- a/pkg/realtime/vehicletracker/servicealert.go +++ b/pkg/realtime/vehicletracker/servicealert.go @@ -2,7 +2,6 @@ package vehicletracker import ( "errors" - "fmt" "time" "github.com/travigo/travigo/pkg/ctdf" @@ -11,19 +10,12 @@ import ( ) func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, vehicleUpdateEvent *VehicleUpdateEvent) (mongo.WriteModel, error) { - primaryIdentifier := fmt.Sprintf( - "%s-%s-%d-%d", - vehicleUpdateEvent.DataSource.DatasetID, vehicleUpdateEvent.ServiceAlertUpdate.Type, - vehicleUpdateEvent.ServiceAlertUpdate.ValidFrom.UnixMilli(), - vehicleUpdateEvent.ServiceAlertUpdate.ValidUntil.UnixMilli(), - ) - if len(matchedIdentifiers) == 0 { return nil, errors.New("No matching identifiers") } serviceAlert := ctdf.ServiceAlert{ - PrimaryIdentifier: primaryIdentifier, + PrimaryIdentifier: vehicleUpdateEvent.LocalID, OtherIdentifiers: map[string]string{}, CreationDateTime: time.Time{}, ModificationDateTime: vehicleUpdateEvent.RecordedAt, @@ -38,7 +30,7 @@ func (consumer *BatchConsumer) updateServiceAlert(matchedIdentifiers []string, v bsonRep, _ := bson.Marshal(bson.M{"$set": serviceAlert}) updateModel := mongo.NewUpdateOneModel() - updateModel.SetFilter(bson.M{"primaryidentifier": primaryIdentifier}) + updateModel.SetFilter(bson.M{"primaryidentifier": vehicleUpdateEvent.LocalID}) updateModel.SetUpdate(bsonRep) updateModel.SetUpsert(true)