PegelOnline is a tool designed to interact with the German WSV PegelOnline API to fetch water level data for rivers in Germany. The tool can retrieve water level data from individual stations, list available stations, or continuously poll the API to send water level updates to a Kafka topic.
- Water Level Fetching: Retrieve current water level data for specific stations from the PegelOnline API.
- Station Listing: List all available monitoring stations.
- Kafka Integration: Send water level updates as CloudEvents to a Kafka topic, supporting Microsoft Event Hubs and Microsoft Fabric Event Streams.
The tool is written in Python and requires Python 3.10 or later. You can download Python from python.org or, if you are on Windows, install it from the Microsoft Store.
Once Python is installed, you can install the tool from the command line as follows:
pip install git+https://github.com/clemensv/real-time-sources#subdirectory=pegelonline
If you clone the repository, you can install the tool as follows:
git clone https://github.com/clemensv/real-time-sources.git
cd real-time-sources/pegelonline
pip install .
For a packaged install, see the instructions in CONTAINER.md.
After installation, the tool can be run using the `pegelonline` command. It supports multiple subcommands:
- List Stations (`list`): Fetch and display all available monitoring stations.
- Get Water Level (`level`): Retrieve the current water level for a specific station.
- Feed Stations (`feed`): Continuously poll the PegelOnline API for water levels and send updates to a Kafka topic.
Fetches and displays all available monitoring stations from the PegelOnline API.
pegelonline list
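A minimal sketch of what `pegelonline list` does, assuming the public PegelOnline REST API v2 endpoint `stations.json` and its `shortname` and `water.longname` fields. The URL, the field names, and the helper names `parse_stations` and `fetch_stations` are assumptions for illustration, not taken from the tool's source.

```python
import json
import urllib.request

# Assumed endpoint of the public PegelOnline REST API (v2); the tool may use a different URL.
STATIONS_URL = "https://www.pegelonline.wsv.de/webservices/rest-api/v2/stations.json"


def parse_stations(payload: str) -> list[tuple[str, str]]:
    """Return (station short name, water body name) pairs from a stations.json payload."""
    stations = json.loads(payload)
    result = []
    for station in stations:
        # Fall back to an empty name if a station carries no 'water' object.
        water = station.get("water", {}).get("longname", "")
        result.append((station["shortname"], water))
    return result


def fetch_stations(url: str = STATIONS_URL) -> list[tuple[str, str]]:
    """Download the station list and parse it (performs a network request)."""
    with urllib.request.urlopen(url, timeout=30) as response:
        return parse_stations(response.read().decode("utf-8"))
```

Keeping the parsing separate from the HTTP call makes it testable without network access; `for name, water in fetch_stations(): print(name, water)` would print one line per station.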
Retrieves the current water level for the specified station.
- `station_shortname`: The short name of the station to query.
pegelonline level <station_shortname>
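The lookup behind `pegelonline level` can be sketched as a single HTTP request. This assumes the PegelOnline REST API v2 path `stations/<shortname>/W/currentmeasurement.json` (the `W` time series being the water level) and a JSON response with `timestamp` and `value` keys; the URL layout and the function names are assumptions, not the tool's confirmed implementation.

```python
import json
import urllib.parse
import urllib.request

# Assumed base URL of the PegelOnline REST API (v2).
BASE_URL = "https://www.pegelonline.wsv.de/webservices/rest-api/v2"


def current_measurement_url(shortname: str) -> str:
    """Build the URL for a station's current water level ('W' time series)."""
    # Percent-encode the short name so names with umlauts (e.g. KÖLN) remain valid URLs.
    station = urllib.parse.quote(shortname, safe="")
    return f"{BASE_URL}/stations/{station}/W/currentmeasurement.json"


def get_water_level(shortname: str) -> dict:
    """Fetch the latest measurement for one station (performs a network request)."""
    with urllib.request.urlopen(current_measurement_url(shortname), timeout=30) as response:
        return json.load(response)
```

Station short names can contain non-ASCII characters, which is why the name is percent-encoded before it is placed in the path.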
Polls the PegelOnline API for water level measurements and sends them as CloudEvents to a Kafka topic. The events use the CloudEvents structured JSON format and are described in EVENTS.md.
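In CloudEvents structured mode, the event attributes and the payload travel together in one JSON document. The sketch below wraps a measurement in such an envelope. The `source` and `type` values and the use of the station identifier as `subject` are hypothetical placeholders; EVENTS.md defines the values the tool actually emits.

```python
import json
import uuid
from datetime import datetime, timezone


def make_cloudevent(station_id: str, measurement: dict) -> str:
    """Wrap one measurement in a CloudEvents 1.0 structured-mode JSON envelope."""
    event = {
        "specversion": "1.0",                              # required CloudEvents attribute
        "id": str(uuid.uuid4()),                           # unique identifier per event
        "source": "https://www.pegelonline.wsv.de",        # hypothetical source URI
        "type": "de.wsv.pegelonline.CurrentMeasurement",   # hypothetical; see EVENTS.md
        "subject": station_id,                             # hypothetical: station the reading belongs to
        "time": datetime.now(timezone.utc).isoformat(),    # time the event was created
        "datacontenttype": "application/json",
        "data": measurement,                               # the measurement itself
    }
    return json.dumps(event)
```

When such a document is sent over Kafka in structured mode, the message's content type is `application/cloudevents+json`.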
- `--kafka-bootstrap-servers`: Comma-separated list of Kafka bootstrap servers.
- `--kafka-topic`: Kafka topic to send messages to.
- `--sasl-username`: Username for SASL PLAIN authentication.
- `--sasl-password`: Password for SASL PLAIN authentication.
- `--connection-string`: Microsoft Event Hubs or Microsoft Fabric Event Stream connection string (overrides the other Kafka parameters).
- `--polling-interval`: Interval in seconds between API polling requests (default: 60). For most stations, the data is updated only once every 6 minutes, but the update interval varies by station.
pegelonline feed --kafka-bootstrap-servers "<bootstrap_servers>" --kafka-topic "<topic_name>" --sasl-username "<username>" --sasl-password "<password>" --polling-interval 60
Alternatively, using a connection string for Microsoft Event Hubs or Microsoft Fabric Event Streams:
pegelonline feed --connection-string "<your_connection_string>" --polling-interval 60
The connection string format is as follows:
Endpoint=sb://<your-event-hubs-namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<access-key>;EntityPath=<event-hub-name>
When provided, the connection string is parsed to extract the Kafka configuration parameters:
- Bootstrap Servers: Derived from the `Endpoint` value.
- Kafka Topic: Derived from the `EntityPath` value.
- SASL Username and Password: The username is set to `$ConnectionString`, and the password is the entire connection string.
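The parsing rules listed above can be sketched as follows. The port `9093` is the Kafka endpoint port of Azure Event Hubs and is an assumption about how the tool turns `Endpoint` into a bootstrap server; `parse_connection_string` and its output keys are hypothetical names.

```python
def parse_connection_string(connection_string: str) -> dict:
    """Derive Kafka settings from an Event Hubs / Fabric Event Stream connection string."""
    parts = {}
    for segment in connection_string.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: base64 access keys may end with '='.
        key, _, value = segment.partition("=")
        parts[key.strip()] = value.strip()

    # 'sb://<namespace>.servicebus.windows.net/' -> '<namespace>.servicebus.windows.net'
    host = parts["Endpoint"].removeprefix("sb://").rstrip("/")
    return {
        "bootstrap_servers": f"{host}:9093",   # assumed Event Hubs Kafka port
        "kafka_topic": parts.get("EntityPath"),
        "sasl_username": "$ConnectionString",  # literal username, as described above
        "sasl_password": connection_string,    # the whole connection string
    }
```

Splitting each segment on the first `=` only matters because `SharedAccessKey` values are base64-encoded and often end in `=` padding.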
As an alternative to passing settings on the command line, the tool reads the following environment variables:
- `KAFKA_BOOTSTRAP_SERVERS`: Kafka bootstrap servers (comma-separated list).
- `KAFKA_TOPIC`: Kafka topic for publishing.
- `SASL_USERNAME`: SASL username for Kafka authentication.
- `SASL_PASSWORD`: SASL password for Kafka authentication.
- `CONNECTION_STRING`: Microsoft Event Hubs or Microsoft Fabric Event Stream connection string.
- `POLLING_INTERVAL`: Polling interval in seconds.
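One common way to implement this fallback is to use each environment variable as the default of the matching command-line option, so an explicit flag still wins. The sketch below assumes `argparse`; whether the tool is built this way is an assumption, and `build_parser` is a hypothetical name.

```python
import argparse
import os


def build_parser() -> argparse.ArgumentParser:
    """Parser for 'feed' whose defaults come from the environment (read when called)."""
    parser = argparse.ArgumentParser(prog="pegelonline feed")
    # Each option falls back to its environment variable when the flag is omitted.
    parser.add_argument("--kafka-bootstrap-servers", default=os.environ.get("KAFKA_BOOTSTRAP_SERVERS"))
    parser.add_argument("--kafka-topic", default=os.environ.get("KAFKA_TOPIC"))
    parser.add_argument("--sasl-username", default=os.environ.get("SASL_USERNAME"))
    parser.add_argument("--sasl-password", default=os.environ.get("SASL_PASSWORD"))
    parser.add_argument("--connection-string", default=os.environ.get("CONNECTION_STRING"))
    parser.add_argument("--polling-interval", type=int,
                        default=int(os.environ.get("POLLING_INTERVAL", "60")))
    return parser
```

Because the defaults are read when `build_parser()` runs, a value given on the command line always overrides the environment.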
The tool handles state internally for efficient API polling and sending updates.
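The README does not say what state is kept. Polling every 60 seconds while most stations update only every 6 minutes means most polls return a reading that was already sent, so one plausible kind of state is the last timestamp seen per station. The following sketch illustrates that idea only; `MeasurementTracker` is a hypothetical class, not the tool's code.

```python
class MeasurementTracker:
    """Remembers the last timestamp seen per station so repeated readings are skipped."""

    def __init__(self) -> None:
        self._last_seen: dict[str, str] = {}

    def is_new(self, station_id: str, timestamp: str) -> bool:
        """Return True (and record it) if the timestamp differs from the last one seen."""
        if self._last_seen.get(station_id) == timestamp:
            return False
        self._last_seen[station_id] = timestamp
        return True
```

In a polling loop, an event would only be sent when `is_new(...)` returns `True`, which avoids publishing duplicate measurements between station updates.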