-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfirehose.go
411 lines (381 loc) · 15.2 KB
/
firehose.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
package firehose
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"strings"
)
// DefaultAddress is the default server address, in host:port form, to use for Firehose connections.
//
// Connect dials this address over TLS.
const DefaultAddress = "firehose.flightaware.com:1501"
// Event represents an event type which can be provided through a Firehose stream.
//
// Event values are written verbatim into the "events" option when an InitCommand is serialized with
// InitCommand.String.
type Event string

const (
	// PositionEvent indicates a position report from the airborne feed.
	PositionEvent Event = "position"
)
// A Rectangle indicates a lat/lon bounding box.
//
// InitCommand.String serializes each Rectangle as a "latlong" option whose value lists the four fields in the order
// LowLat, LowLon, HiLat, HiLon.
type Rectangle struct {
	// LowLat is the minimum latitude included in the bounding box.
	LowLat float64
	// LowLon is the minimum longitude included in the bounding box.
	LowLon float64
	// HiLat is the maximum latitude included in the bounding box.
	HiLat float64
	// HiLon is the maximum longitude included in the bounding box.
	HiLon float64
}
// InitCommand helps build and serialize an initiation command string which can be provided as the argument to
// Stream.Init.
//
// String emits the options in a fixed order (live, pitr, range, username, password, airport_filter, events, latlong).
// Live, PITR, and Range are each emitted independently when set; String does not check that only one of them is used.
//
// Note: not all init command options supported by Firehose are available for use through this builder. See
// https://www.flightaware.com/commercial/firehose/documentation/commands for full details.
type InitCommand struct {
	// Live requests data from the present time forward.
	Live bool
	// PITR requests data starting from the specified time, in POSIX epoch format, in the past until the current time,
	// and continue with live behavior.
	//
	// It is emitted only when non-empty.
	PITR string
	// Range requests data between the two specified times, in POSIX epoch format. FlightAware will disconnect the
	// connection when the last message has been sent.
	//
	// It is emitted only when non-nil.
	Range *PITRRange
	// Password supplies the credentials for authentication. In most cases, this should actually be the Firehose API Key
	// and not the password of the account.
	//
	// The password option is always emitted, even when this field is empty.
	Password string
	// Username supplies the credentials for authentication. This should be the username of the FlightAware account that
	// has been granted access.
	//
	// The username option is always emitted, even when this field is empty.
	Username string
	// AirportFilter requests information only for flights originating from or destined for airports matching the
	// provided glob patterns. Each element is one pattern; String joins the elements with single spaces and wraps the
	// result in double quotes.
	//
	// For example: []string{"CYUL"} or []string{"K???", "P*", "TJSJ"}
	AirportFilter []string
	// Events specifies a list of downlink messages which should be sent.
	//
	// If not specified default behavior is to deliver all Airborne Feed messages enabled in the Firehose Subscription.
	// Which event codes are available will depend on which Subscription Layers are enabled.
	Events []Event
	// LatLong specifies that only positions within the specified rectangle should be sent and any others will be
	// ignored, unless the flight has already been matched by other criteria. Once a flight has been matched by a
	// latlong rectangle, it becomes remembered and all subsequent messages until landing for that flight ID will
	// continue to be sent even if the flight no longer matches a specified rectangle.
	//
	// One latlong option is emitted per Rectangle.
	LatLong []Rectangle
}
// String serializes the InitCommand into the space-separated option string expected by Stream.Init.
//
// Options appear in a fixed order: "live", "pitr", "range", "username", "password", "airport_filter", "events", and
// then one "latlong" per rectangle. The username and password options are always written; every other option is
// written only when the corresponding field is set. List-valued options are joined with single spaces and wrapped in
// double quotes.
func (i *InitCommand) String() string {
	tokens := make([]string, 0, 16)

	if i.Live {
		tokens = append(tokens, "live")
	}
	if i.PITR != "" {
		tokens = append(tokens, "pitr", i.PITR)
	}
	if r := i.Range; r != nil {
		tokens = append(tokens, "range", r.Start, r.End)
	}

	// Credentials are unconditional, so the result is never empty.
	tokens = append(tokens, "username", i.Username, "password", i.Password)

	if len(i.AirportFilter) > 0 {
		quoted := fmt.Sprintf("\"%s\"", strings.Join(i.AirportFilter, " "))
		tokens = append(tokens, "airport_filter", quoted)
	}

	if n := len(i.Events); n > 0 {
		names := make([]string, n)
		for idx := range i.Events {
			names[idx] = string(i.Events[idx])
		}
		quoted := fmt.Sprintf("\"%s\"", strings.Join(names, " "))
		tokens = append(tokens, "events", quoted)
	}

	for idx := range i.LatLong {
		box := i.LatLong[idx]
		quoted := fmt.Sprintf("\"%f %f %f %f\"", box.LowLat, box.LowLon, box.HiLat, box.HiLon)
		tokens = append(tokens, "latlong", quoted)
	}

	return strings.Join(tokens, " ")
}
// A PITRRange denotes a specific time range to fetch.
//
// InitCommand.String writes Start and End verbatim, in that order, after the "range" keyword.
type PITRRange struct {
	// Start is the starting PITR.
	Start string
	// End is the ending PITR.
	//
	// After this PITR is reached, the Firehose connection will be closed by the server.
	End string
}
// Connect opens a Firehose stream using the default configuration: it dials DefaultAddress over TLS with a nil
// *tls.Config and wraps the resulting connection with NewStream.
//
// If the dial fails, the error from tls.Dial is returned unchanged and the Stream is nil.
//
// To customize your connection, use NewStream instead.
func Connect() (*Stream, error) {
	tlsConn, dialErr := tls.Dial("tcp", DefaultAddress, nil)
	if dialErr != nil {
		return nil, dialErr
	}

	stream := NewStream(tlsConn)
	return stream, nil
}
// NewStream wraps an already-established network connection in a Firehose Stream.
//
// Use it when the connection needs customization, for example to reach a different Firehose server or to supply a
// non-default TLS configuration. The returned Stream reads JSON messages from conn through a json.Decoder created
// here, and Stream.Close closes conn.
//
// If you don't need any customization, Connect opens a Stream with the default configuration options.
func NewStream(conn net.Conn) *Stream {
	s := new(Stream)
	s.conn = conn
	s.decoder = json.NewDecoder(conn)
	return s
}
// A Stream implements the Firehose protocol over a net.Conn.
//
// Create one with Connect or NewStream, send the init command with Init, then read messages with NextMessage.
type Stream struct {
	// conn is the underlying connection: Init writes the command to it and Close closes it.
	conn net.Conn
	// decoder decodes JSON messages read from conn.
	decoder *json.Decoder
}
// Init writes the given init command, followed by a newline, to the underlying connection.
//
// Call Init once after the Stream has been created and before reading messages. The command can be produced with
// InitCommand.String or written by hand.
//
// Any error from the write is returned as-is.
//
// For details about the init command, see https://www.flightaware.com/commercial/firehose/documentation/commands.
func (c *Stream) Init(command string) error {
	if _, writeErr := fmt.Fprintln(c.conn, command); writeErr != nil {
		return writeErr
	}
	return nil
}
// Message encapsulates a message received from the Firehose Stream.
//
// Message implements json.Unmarshaler; its UnmarshalJSON method reads the "type" key and decodes the rest of the
// object into the matching payload struct.
type Message struct {
	// Type indicates the message type.
	//
	// It should always be one of the event types that was requested with the `events` init command option, or "error".
	Type string
	// Payload holds the message body represented as one of the message type structs.
	//
	// UnmarshalJSON stores the payload by value: an ErrorMessage for type "error" and a PositionMessage for type
	// "position".
	//
	// Generally, you will want to use a type switch to handle messages of various types. See the README for an example.
	Payload any
}
// UnmarshalJSON implements json.Unmarshaler for Message.
//
// Decoding happens in two passes over data: the first reads only the "type" key to pick the payload struct, the
// second decodes the whole object into that struct. Type is set as soon as the first pass succeeds. For the "error"
// and "position" types, Payload is assigned (by value) even when the second pass fails, and the decoding error is
// returned unchanged. Any other type yields an "unrecognized message type" error and leaves Payload untouched.
func (m *Message) UnmarshalJSON(data []byte) error {
	var header struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(data, &header); err != nil {
		return fmt.Errorf("could not determine message type: %w", err)
	}
	m.Type = header.Type

	var decodeErr error
	switch header.Type {
	case "error":
		var body ErrorMessage
		decodeErr = json.Unmarshal(data, &body)
		m.Payload = body
	case "position":
		var body PositionMessage
		decodeErr = json.Unmarshal(data, &body)
		m.Payload = body
	default:
		decodeErr = fmt.Errorf("unrecognized message type: %s", m.Type)
	}
	return decodeErr
}
// ErrorMessage indicates an error condition.
//
// Message.UnmarshalJSON produces it as the Payload of messages whose type is "error".
type ErrorMessage struct {
	// Type is always "error".
	Type string `json:"type"`
	// ErrorMessage contains details of the error encountered.
	ErrorMessage string `json:"error_msg"`
}
// Waypoint contains position data for one point in PositionMessage.Waypoints.
//
// Lat and Lon are decoded as JSON numbers; Clock, Name, Alt, and GS are decoded as JSON strings.
type Waypoint struct {
	// Latitude in decimal degrees.
	Lat float64 `json:"lat"`
	// Longitude in decimal degrees.
	Lon float64 `json:"lon"`
	// Clock is the time in POSIX epoch format.
	Clock string `json:"clock"`
	// Name is the airport, navaid, waypoint, intersection, or other identifier.
	Name string `json:"name"`
	// Alt is the altitude in feet (MSL).
	Alt string `json:"alt"`
	// GS is the ground speed in knots.
	GS string `json:"gs"`
}
// PositionMessage includes a position report.
//
// Message.UnmarshalJSON produces it as the Payload of messages whose type is "position". Most values are decoded as
// JSON strings; the ADS-B accuracy/integrity categories (NACp, NACv, NIC, NICBaro, SIL) are decoded as ints and PosRC
// as a float64.
type PositionMessage struct {
	// Type is always "position".
	Type string `json:"type"`
	// Ident is the callsign identifying the flight. Typically, ICAO airline code plus IATA/ticketing flight number, or
	// the aircraft registration.
	Ident string `json:"ident"`
	// Latitude in decimal degrees, rounded to 5 decimal points.
	Lat string `json:"lat"`
	// Longitude in decimal degrees, rounded to 5 decimal points.
	Lon string `json:"lon"`
	// Clock is the report time in POSIX epoch format. Time should be the time generated by flight hardware if possible.
	Clock string `json:"clock"`
	// ID is the FlightAware Flight ID, a unique identifier associated with each flight.
	ID string `json:"id"`
	// UpdateType specifies the source of the message.
	//
	//   - A for ADS-B
	//   - Z for radar
	//   - O for transoceanic
	//   - P for estimated
	//   - D for datalink
	//   - M for multilateration (MLAT)
	//   - X for ASDE-X
	//   - S for space-based ADS-B
	UpdateType string `json:"updateType"`
	// AirGround indicates whether the aircraft is on the ground.
	//
	//   - A for Air
	//   - G for Ground
	//   - WOW for Weight-on-Wheels
	AirGround string `json:"air_ground"`
	// FacilityHash is a consistent and unique obfuscated identifier string for each source reporting positions to
	// FlightAware.
	FacilityHash string `json:"facility_hash"`
	// FacilityName is a description of the reporting facility intended for end-user consumption. May be a blank string
	// if undefined.
	FacilityName string `json:"facility_name"`
	// PITR is the point-in-time-recovery timestamp value that should be supplied to the "pitr" connection initiation
	// command when reconnecting and you wish to resume firehose playback at that approximate position.
	PITR string `json:"pitr"`
	// Alt is altitude in feet (MSL).
	Alt string `json:"alt"`
	// AltChange describes change in altitude.
	//
	//   - C for climbing
	//   - D for descending
	//   - " " when undetermined
	AltChange string `json:"alt_change"`
	// GS is ground speed in knots.
	GS string `json:"gs"`
	// Heading indicates the course in degrees.
	Heading string `json:"heading"`
	// Squawk is the transponder squawk code, a 4 digit octal transponder beacon code assigned by ATC.
	Squawk string `json:"squawk"`
	// Hexid is the transponder Mode S code, a 24-bit transponder code assigned by aircraft registrar. Formatted in
	// upper case hexadecimal.
	Hexid string `json:"hexid"`
	// ATCIdent is an identifier used for ATC, if that differs from the flight identifier.
	ATCIdent string `json:"atcident"`
	// AircraftType is the ICAO aircraft type code.
	AircraftType string `json:"aircrafttype"`
	// Orig is the origin ICAO airport code, waypoint, or latitude/longitude pair.
	Orig string `json:"orig"`
	// Dest is the destination ICAO airport code, waypoint, or latitude/longitude pair. May be missing if not known.
	Dest string `json:"dest"`
	// Reg is the tail number or registration of the aircraft, if known and it differs from the ident.
	Reg string `json:"reg"`
	// ETA is the estimated time of arrival in POSIX epoch format.
	ETA string `json:"eta"`
	// EDT is the revised timestamp of when the flight is expected to depart in POSIX epoch format.
	EDT string `json:"edt"`
	// ETE is the en route time in seconds. May be missing if not known.
	ETE string `json:"ete"`
	// Speed is the filed cruising speed in knots.
	Speed string `json:"speed"`
	// Waypoints is an array of 2D, 3D, or 4D objects of locations, times, and altitudes.
	Waypoints []Waypoint `json:"waypoints"`
	// Route is a textual route string.
	Route string `json:"route"`
	// ADSBVersion is the ADS-B version used by the transmitter responsible for position, when known/applicable.
	ADSBVersion string `json:"adsb_version"`
	// NACp is the ADS-B Navigational Accuracy Category for Position.
	NACp int `json:"nac_p"`
	// NACv is the ADS-B Navigational Accuracy Category for Velocity.
	NACv int `json:"nac_v"`
	// NIC is the ADS-B Navigational Integrity Category.
	NIC int `json:"nic"`
	// NICBaro is the ADS-B Navigational Integrity Category for Barometer.
	NICBaro int `json:"nic_baro"`
	// SIL is the ADS-B Source Integrity Level.
	SIL int `json:"sil"`
	// SILType is the ADS-B Source Integrity Level type (applies per-hour or per-sample).
	//
	// Possible values are "perhour", "persample", and "unknown".
	SILType string `json:"sil_type"`
	// PosRC is the ADS-B Radius of Containment, in meters.
	PosRC float64 `json:"pos_rc"`
	// HeadingMagnetic is the aircraft's heading, in degrees, relative to magnetic North.
	HeadingMagnetic string `json:"heading_magnetic"`
	// HeadingTrue is the aircraft's heading, in degrees, relative to true North.
	HeadingTrue string `json:"heading_true"`
	// Mach is the mach number of the aircraft.
	Mach string `json:"mach"`
	// SpeedTAS is the true airspeed of the aircraft in knots.
	SpeedTAS string `json:"speed_tas"`
	// SpeedIAS is the indicated airspeed of the aircraft in knots.
	SpeedIAS string `json:"speed_ias"`
	// Pressure is the computed static air pressure in hPa.
	Pressure string `json:"pressure"`
	// WindQuality is set to 1 if the aircraft is stable (not maneuvering) and 0 if the aircraft is maneuvering.
	//
	// Derived wind data is less reliable if the aircraft is maneuvering.
	WindQuality string `json:"wind_quality"`
	// WindDir is the computed wind direction, in degrees, relative to true North.
	//
	// The value uses the normal convention where the direction is opposite the wind vector (i.e. wind_dir = 0 means
	// wind from the North).
	WindDir string `json:"wind_dir"`
	// WindSpeed is the computed wind speed in knots.
	WindSpeed string `json:"wind_speed"`
	// TemperatureQuality is set to 0 if the derived temperature is likely to be inaccurate due to quantization errors,
	// 1 otherwise.
	TemperatureQuality string `json:"temperature_quality"`
	// Temperature is the computed outside air temperature in degrees Celsius.
	Temperature string `json:"temperature"`
	// NavHeading is the heading in degrees from the navigation equipment.
	NavHeading string `json:"nav_heading"`
	// NavAltitude is the altitude setting in feet from the navigation equipment.
	NavAltitude string `json:"nav_altitude"`
	// NavQNH is the altimeter setting in hPa that has been set.
	NavQNH string `json:"nav_qnh"`
	// NavModes is the list of active modes from the navigation equipment.
	//
	// Possible values are autopilot, vnav, althold, approach, lnav, tcas.
	NavModes string `json:"nav_modes"`
	// AltGNSS is the reported GNSS altitude (feet above WGS84 ellipsoid).
	AltGNSS string `json:"alt_gnss"`
	// VertRate is the aircraft's vertical rate of climb/descent derived from pressure altitude, reported in feet per
	// minute.
	VertRate string `json:"vertRate"`
	// VertRateGeom is the aircraft's vertical rate of climb/descent derived from GNSS altitude, reported in feet per
	// minute.
	VertRateGeom string `json:"vertRate_geom"`
	// FuelOnBoard is the amount of fuel in the tank.
	//
	// The units are reported in the FuelOnBoardUnit field. This data is available for specifically authorized customers
	// only.
	FuelOnBoard string `json:"fuel_on_board"`
	// FuelOnBoardUnit is the unit for FuelOnBoard.
	//
	// Possible values are LITERS, GALLONS, POUNDS, KILOGRAMS, or UNKNOWN. This data is available for specifically
	// authorized customers only.
	FuelOnBoardUnit string `json:"fuel_on_board_unit"`
}
// NextMessage reads a Message from the Stream.
//
// If ctx carries a deadline, it is applied to the underlying connection as a read deadline before decoding starts.
// The decode runs in its own goroutine so that ctx can be observed while the read is blocked. When ctx is done
// before a message has been decoded, the Stream is closed (closing the connection unblocks the pending read) and
// ctx.Err() is returned with a nil Message; the Stream must not be used after that.
//
// If a message cannot be read or decoded, the decoder's error is returned. The returned *Message is non-nil in that
// case, but its contents should not be relied upon.
//
// NOTE(review): a read deadline installed here is not cleared afterwards, so it stays in effect for later calls made
// with a context that has no deadline.
func (c *Stream) NextMessage(ctx context.Context) (*Message, error) {
	// If our context has a deadline, set the read deadline on our underlying connection accordingly.
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		if err := c.conn.SetReadDeadline(deadline); err != nil {
			return nil, fmt.Errorf("could not set read deadline: %w", err)
		}
	}

	var msg Message

	// The channel is buffered so the decoding goroutine can always deliver its result and exit, even when this
	// function has already returned because ctx was done. With an unbuffered channel that goroutine would block on the
	// send forever and leak.
	errc := make(chan error, 1)
	go func() {
		errc <- c.decoder.Decode(&msg)
	}()

	select {
	case <-ctx.Done():
		// Closing the connection unblocks the pending read so the goroutine above can finish. The close error is
		// deliberately discarded: the context error is the one the caller needs to see.
		_ = c.Close()
		return nil, ctx.Err()
	case err := <-errc:
		return &msg, err
	}
}
// Close closes the Firehose Stream by closing the underlying net.Conn, and returns the error from that call.
//
// NextMessage also calls Close when its context is done before a message arrives.
func (c *Stream) Close() error {
	return c.conn.Close()
}