Skip to content

Commit

Permalink
wip multi queue
Browse files Browse the repository at this point in the history
  • Loading branch information
AaronClaydon committed Dec 22, 2024
1 parent ac9ba8d commit f78b857
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 1 deletion.
3 changes: 3 additions & 0 deletions data/datasources/gb-dft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ datasets:
- identifier: bods-sirivm-all
format: eu-siri-vm
source: "https://data.bus-data.dft.gov.uk/avl/download/bulk_archive"
refreshinterval: 30s
sourceauthentication:
query:
api_key: TRAVIGO_BODS_API_KEY
Expand All @@ -62,6 +63,7 @@ datasets:
- identifier: bods-gtfs-realtime
format: gtfs-realtime
source: "https://data.bus-data.dft.gov.uk/avl/download/gtfsrt"
refreshinterval: 10s
sourceauthentication:
query:
api_key: TRAVIGO_BODS_API_KEY
Expand All @@ -73,6 +75,7 @@ datasets:
- identifier: bods-sirisx-all
format: eu-siri-sx
source: "https://data.bus-data.dft.gov.uk/disruptions/download/bulk_archive"
refreshinterval: 5m
sourceauthentication:
query:
api_key: TRAVIGO_BODS_API_KEY
Expand Down
18 changes: 18 additions & 0 deletions data/datasources/se-trafiklab.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ datasets:
- identifier: gtfs-realtime-sl-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/sl/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -29,6 +30,7 @@ datasets:
- identifier: gtfs-realtime-ul-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/ul/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -39,6 +41,7 @@ datasets:
- identifier: gtfs-realtime-otraf-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/otraf/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -49,6 +52,7 @@ datasets:
- identifier: gtfs-realtime-klt-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/klt/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -59,6 +63,7 @@ datasets:
- identifier: gtfs-realtime-skane-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/skane/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -69,6 +74,7 @@ datasets:
- identifier: gtfs-realtime-dt-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/dt/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -79,6 +85,7 @@ datasets:
- identifier: gtfs-realtime-varm-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/varm/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -89,6 +96,7 @@ datasets:
- identifier: gtfs-realtime-xt-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/xt/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -99,6 +107,7 @@ datasets:
- identifier: gtfs-realtime-vastmanland-trip
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/vastmanland/TripUpdatesSweden.pb"
refreshinterval: 590s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -109,6 +118,7 @@ datasets:
- identifier: gtfs-realtime-sl-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/sl/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -119,6 +129,7 @@ datasets:
- identifier: gtfs-realtime-ul-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/ul/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -129,6 +140,7 @@ datasets:
- identifier: gtfs-realtime-otraf-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/otraf/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -139,6 +151,7 @@ datasets:
- identifier: gtfs-realtime-klt-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/klt/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -149,6 +162,7 @@ datasets:
- identifier: gtfs-realtime-skane-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/skane/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -159,6 +173,7 @@ datasets:
- identifier: gtfs-realtime-dt-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/dt/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -169,6 +184,7 @@ datasets:
- identifier: gtfs-realtime-varm-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/varm/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -179,6 +195,7 @@ datasets:
- identifier: gtfs-realtime-xt-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/xt/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand All @@ -189,6 +206,7 @@ datasets:
- identifier: gtfs-realtime-vastmanland-alerts
format: gtfs-realtime
source: "https://opendata.samtrafiken.se/gtfs-rt-sweden/vastmanland/ServiceAlertsSweden.pb"
refreshinterval: 1800s
sourceauthentication:
query:
key: TRAVIGO_SE_TRAFIKLAB_REALTIME_API_KEY
Expand Down
72 changes: 72 additions & 0 deletions pkg/dataimporter/cli.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package dataimporter

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/travigo/travigo/pkg/dataimporter/datasets"
"github.com/travigo/travigo/pkg/dataimporter/manager"

"github.com/travigo/travigo/pkg/database"
Expand Down Expand Up @@ -88,6 +92,74 @@ func RegisterCLI() *cli.Command {
}
}

return nil
},
},
{
	// multi-realtime polls every registered dataset whose import destination
	// is the realtime queue. Each dataset gets its own goroutine and polling
	// interval; the command blocks until SIGINT is received.
	Name:  "multi-realtime",
	Usage: "Import multiple realtime datasets",
	Flags: []cli.Flag{},
	Action: func(c *cli.Context) error {
		if err := database.Connect(); err != nil {
			return err
		}
		if err := redis_client.Connect(); err != nil {
			log.Fatal().Err(err).Msg("Failed to connect to Redis")
		}

		allDatasets := manager.GetRegisteredDataSets()

		for _, dataset := range allDatasets {
			// Only datasets that feed the realtime queue are handled here.
			if dataset.ImportDestination != datasets.ImportDestinationRealtimeQueue {
				continue
			}

			// The dataset is passed by value so every goroutine works on its own copy.
			go func(dataset datasets.DataSet) {
				var repeatDuration time.Duration

				// An explicitly configured interval wins; otherwise fall back to
				// a default based on what the dataset provides.
				switch {
				case dataset.RefreshInterval > 0:
					repeatDuration = dataset.RefreshInterval
				case dataset.SupportedObjects.RealtimeJourneys:
					repeatDuration = 2 * time.Minute
				case dataset.SupportedObjects.ServiceAlerts:
					repeatDuration = 10 * time.Minute
				default:
					// Without this fallback the interval would be zero and the
					// loop below would re-import back-to-back with no pause.
					repeatDuration = 10 * time.Minute
				}

				log.Info().Str("interval", repeatDuration.String()).Str("id", dataset.Identifier).Msg("Loaded realtime dataset")

				for {
					startTime := time.Now()

					err := manager.ImportDataset(&dataset, false)

					// Measure before any back-off sleep so the logged duration
					// reflects the import itself.
					executionDuration := time.Since(startTime)

					if err != nil {
						// TODO report failure here
						log.Error().Err(err).Str("id", dataset.Identifier).Msg("Failed to import dataset")
						time.Sleep(1 * time.Minute)
					}

					log.Info().Str("id", dataset.Identifier).Msgf("Operation took %s", executionDuration.String())

					// Sleep for whatever is left of the interval, counting the
					// import time and any error back-off already spent.
					if waitTime := repeatDuration - time.Since(startTime); waitTime > 0 {
						time.Sleep(waitTime)
					}
				}
			}(dataset)
		}

		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGINT)
		defer signal.Stop(signals)

		<-signals // wait for signal
		go func() {
			<-signals // hard exit on second signal (in case shutdown gets stuck)
			os.Exit(1)
		}()

		return nil
	},
},
Expand Down
4 changes: 3 additions & 1 deletion pkg/dataimporter/datasets/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datasets

import (
"net/http"
"time"

"github.com/adjust/rmq/v5"
)
Expand All @@ -16,7 +17,8 @@ type DataSet struct {
Source string
SourceAuthentication SourceAuthentication `json:"-"`

DatasetSize string
DatasetSize string
RefreshInterval time.Duration

UnpackBundle BundleFormat `json:"-"`
SupportedObjects SupportedObjects
Expand Down

0 comments on commit f78b857

Please sign in to comment.