-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsvarog_ctl.py
268 lines (208 loc) · 10.7 KB
/
svarog_ctl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
This is the main runner script for svarog_ctl.
"""
import argparse
import logging
import sys
import time
from datetime import datetime, timedelta, timezone, tzinfo

from dateutil import parser as dateparser
from dateutil import tz
from orbit_predictor.locations import Location
from orbit_predictor.predictors.base import CartesianPredictor
from orbit_predictor.sources import get_predictor_from_tle_lines

from svarog_ctl import orbitdb, utils, passes, rotctld
from svarog_ctl.globalvars import APP_NAME, VERSION
def get_pass(pred: CartesianPredictor, loc: Location, aos: datetime, los: datetime):
    """Returns the position list for one pass of the satellite over the observer.

    This is a thin wrapper around passes.get_pass() that always selects the
    TIME_TICKS algorithm.

    Parameters
    ==========
    pred - orbit predictor that identifies the satellite
    loc - location of the observer
    aos - start time of the pass (acquisition of signal)
    los - end time of the pass (loss of signal)
    returns - the result of passes.get_pass(), passed through unchanged
    """
    algorithm = passes.PassAlgo.TIME_TICKS
    # NOTE(review): 5 is presumably the tick interval in seconds. The previous docstring
    # mentioned 30 seconds - confirm against passes.get_pass().
    tick = 5
    return passes.get_pass(pred, loc, aos, los, algorithm, tick)
def get_fake_pass(steps: int, start_az: int, end_az: int):
    """Returns fake positions list. Useful for testing. The timing is always now..now+2 minutes.

    The azimuth moves linearly from start_az to end_az in one second steps, at a constant
    elevation of 5 degrees. One extra entry (azimuth 0, elevation 0) is appended at
    now + 120 seconds.

    All timestamps are timezone-aware (UTC) and derived from a single "now", so they can be
    compared with datetime.now(timezone.utc) and the spacing between entries is exact.

    Parameters
    ==========
    steps - how many steps (with fewer than 2 steps the azimuth stays at start_az)
    start_az - starting azimuth
    end_az - ending azimuth
    returns - list of [timestamp, azimuth, elevation] entries
    """
    now = datetime.now(timezone.utc)

    # A single step has no span to interpolate over; avoid dividing by zero.
    delta = (end_az - start_az) / (steps - 1) if steps > 1 else 0.0

    pos = [[now + timedelta(seconds=x), start_az + delta * x, 5.0] for x in range(steps)]
    pos.append([now + timedelta(seconds=120), 0, 0])
    return pos
def get_timestamp_str(timestamp: datetime, tz_info: tzinfo) -> str:
    """Returns a string representation of a timestamp in the specified timezone.

    Parameters
    ==========
    timestamp - the moment in time to present
    tz_info - the time zone to convert the timestamp to (any tzinfo implementation,
              e.g. timezone.utc or dateutil's tz.tzlocal())
    returns - the converted timestamp followed by a space and the time zone name
    """
    # Convert once and reuse the result for both the value and the zone name.
    converted = timestamp.astimezone(tz_info)
    return f"{converted} {converted.tzname()}"
def log_details(loc: Location, args: argparse.Namespace, when: datetime, pass_,
                zone: tzinfo):
    """Logs the details of parameters used. Mostly for developer's convenience.

    Parameters
    ==========
    loc - location of the observer (latitude_deg and longitude_deg are logged)
    args - parsed command-line arguments (logged at debug level)
    when - the time after which the pass was searched for
    pass_ - the predicted pass; its aos, los, max_elevation_deg, max_elevation_date and
            duration_s attributes are read
    zone - time zone in which the timestamps are presented
    """
    observer = utils.coords(loc.latitude_deg, loc.longitude_deg)
    peak_time = pass_.max_elevation_date.astimezone(zone)
    duration = timedelta(seconds=pass_.duration_s)

    logging.info("Observer loc.: %s", observer)
    logging.info("After time : %s", get_timestamp_str(when, zone))
    logging.info("Next AOS : %s", get_timestamp_str(pass_.aos, zone))
    logging.info("Next LOS : %s", get_timestamp_str(pass_.los, zone))
    logging.info("Max elevation: %.1f deg at %s", pass_.max_elevation_deg, peak_time)
    logging.info("Duration : %s", duration)
    logging.debug(args)
def print_pos(positions: list):
    """Prints the positions list. Useful for debugging.

    A header line with the number of entries is printed first, followed by one line per
    entry with its timestamp as stored, the same timestamp converted to the local time zone,
    the azimuth and the elevation.

    Parameters
    ==========
    positions - list of positions (3 elements each: timestamp, azimuth, elevation)
    """
    count = len(positions)
    print(f"---positions has {count} entries")
    for entry in positions:
        stamp, azimuth, elevation = entry[0], entry[1], entry[2]
        local = stamp.astimezone()
        print(f"utc={stamp}, local={local}: az={azimuth:3.1f}, el={elevation:03.1f}")
def rewind_positions(positions: list) -> list:
    """This function rewinds (shifts positions) in time in a way that the pass will start now.

    This is useful for testing, because usually the sat pass is in the future and we want to run
    the experiments or tests now. Looking at it from a different perspective, this is a good
    way to test future pass in advance (or replay old pass).

    The input list is not modified. The timestamps must be timezone-aware, because they are
    compared against the current UTC time.

    Parameters
    ==========
    positions - list of positions (3 elements each: timestamp, azimuth, elevation)
    returns - new list of positions (3 element lists: timestamp, azimuth, elevation);
              an empty list if there are no positions to shift
    """
    if not positions:
        # Nothing to shift, and there is no first entry to take the offset from.
        return []

    # Offset between the start of the pass and the current moment.
    delta = positions[0][0] - datetime.now(timezone.utc)
    return [[entry[0] - delta, entry[1], entry[2]] for entry in positions]
def track_positions(positions: list, rotator: "rotctld.Rotctld", delta: int):
    """This function sends commands to the rotator and tracks its position.

    The loop runs until the timestamp of the last entry in positions is reached. In every
    iteration the current rotator position is read and recorded, and if the next command is
    due, it is sent to the rotator. Azimuths greater than 180 degrees are sent as
    azimuth - 360. The positions list itself is not modified.

    Parameters
    ==========
    positions - list of positions (3 elements each: timestamp, azimuth, elevation);
                timestamps must be timezone-aware, as they are compared with the current
                UTC time
    rotator - an instance of open connection to the rotator; get_pos() must return
              an (azimuth, elevation) pair and set_pos(az, el) a (status, response) pair
    delta - step time in seconds (the loop will get the rotator position every delta seconds)

    returns a list of actual rotator positions over time (list of 3 element lists:
    timestamp, azimuth, elevation); an empty list if positions is empty
    """
    actual = []  # actual rotator positions
    if not positions:
        return actual

    # The last entry specifies the last position and also when to stop movement.
    timeout = positions[-1][0]

    # Get the first command.
    index = 0
    pos = positions[index]

    while datetime.now(timezone.utc) < timeout:
        actual_az, actual_el = rotator.get_pos()

        # One aware timestamp per iteration: used for the record, the log and the due check,
        # so aware and naive datetimes are never mixed.
        now = datetime.now(timezone.utc)
        actual.append([now, actual_az, actual_el])
        logging.debug("%s: az=%s, el=%s, the next command @ %s (in %s)",
                      now, actual_az, actual_el, pos[0], pos[0] - now)

        if pos[0] <= now:
            # Normalize azimuth to -180;180, as this is what most rotctl rotators require.
            azimuth = pos[1] - 360.0 if pos[1] > 180.0 else pos[1]
            elevation = pos[2]

            # Ok, it's time to execute the next command.
            logging.info("%s: sending command to move to az=%.1f, el=%.1f",
                         now, azimuth, elevation)
            status, resp = rotator.set_pos(azimuth, elevation)
            if not status:
                logging.warning("set_pos command failed. response=%s", resp)

            index += 1
            # If we got to the end of the list of commands, we're done here.
            if index >= len(positions):
                return actual
            pos = positions[index]

        time.sleep(delta)

    return actual
def plot_charts(_intended: list, _actual: list):
    """To be implemented: generate charts based on two series of data:
    1. the intended antenna position over time (commands we're sending),
    2. the actual antenna position (as checked using get_pos command).

    This is currently a placeholder: both parameters are ignored and nothing is returned.

    Parameters
    ==========
    _intended - list of the commanded positions (unused for now)
    _actual - list of the measured rotator positions (unused for now)
    """
    # Charting not implemented yet.
def get_norad(tle: list) -> int:
    """Gets norad id from the TLE data.

    The id is read from the second TLE line, where it is the second whitespace-separated
    field (right after the line number).

    Parameters
    ==========
    tle - two element sequence: the first and the second line of the TLE
    returns - the NORAD catalog number as an integer
    raises - ValueError if the field is not a decimal number
    """
    _, line2 = tle
    # split() without an argument collapses runs of spaces, so catalog numbers padded with
    # spaces ("2     5 ...") parse the same way as zero-padded ones ("2 00005 ...").
    return int(line2.split()[1])
def _build_arg_parser() -> argparse.ArgumentParser:
    """Creates the command-line parser with all the options supported by svarog-ctl."""
    parser = argparse.ArgumentParser(
        description="svarog-ctl: tracks satellite pass with rotator"
    )
    parser.add_argument('--tle1', type=str, help="First line of the orbital data in TLE format")
    parser.add_argument('--tle2', type=str, help="Second line of the orbital data in TLE format")
    parser.add_argument('--sat', type=str,
                        help="Name of the satellite (if local catalog is available)")
    parser.add_argument('--satid', type=int,
                        help="Norad ID of the satellite (if local catalog is available)")
    parser.add_argument("--lat", type=float, required=True,
                        help="Specify the latitude of the observer in degrees, positive is "
                             "northern hemisphere (e.g. 53.3 represents 53.3N for Gdansk)")
    parser.add_argument("--lon", type=float, required=True,
                        help="Specify the longitude of the observer in degrees, positive is "
                             "eastern hemisphere (e.g. 18.5 represents 18.5E for Gdansk)")
    parser.add_argument("--alt", default=0, type=int, required=False,
                        help="Observer's altitude, in meters above sea level")
    parser.add_argument("--time", default=str(datetime.now(timezone.utc)), type=str,
                        help="Specify the timestamp before the pass in UTC.")
    parser.add_argument("--host", default="127.0.0.1", type=str,
                        help="Specify how to connect (which hostname to use) to a running "
                             "rotctld.")
    parser.add_argument("--port", default=4533, type=int,
                        help="Specify which port to connect to")
    parser.add_argument("--now", dest='now', action='store_true',
                        help="Don't wait for the actual pass, start now (useful for testing "
                             "only)")
    parser.add_argument("--local", dest='local_tz', action='store_true',
                        help="Use the local time zone, instead of the default UTC")
    parser.add_argument("--version", action="version", version=f"{APP_NAME} {VERSION}")
    return parser


def _check_satellite_args(args: argparse.Namespace):
    """Exits with status 1 if the options do not identify the satellite properly."""
    if bool(args.tle1) != bool(args.tle2):
        print("ERROR: You must either specify both TLE lines or none.")
        sys.exit(1)

    if not args.tle1 and not args.sat and not args.satid:
        print("ERROR: You need to identify the satellite somehow. 3 options are supported:")
        print("ERROR: - provide TLE parameters on your own (use --tle1 and --tle2 options)")
        print("ERROR: - specify the satellite name, e.g. --sat 'NOAA 18'")
        print("ERROR: - specify the NORAD ID of the satellite, e.g. --satid 28654")
        sys.exit(1)


def _get_predictor(args: argparse.Namespace):
    """Returns a (name, predictor) pair for the satellite selected on the command line."""
    if args.tle1:
        # If TLE is specified explicitly, we don't need to load any databases, just use
        # the TLE as is.
        return "CUSTOM", get_predictor_from_tle_lines((args.tle1, args.tle2))

    # If sat was referenced by name or Norad ID, we need to load the database and
    # see if we can find the sat.
    db = orbitdb.OrbitDatabase()
    db.refresh_urls()
    if args.satid is not None:
        name = db.get_norad(args.satid).get_name()
    else:
        name = args.sat
    logging.debug("Looking for satellite %s", name)
    return name, db.get_predictor(name)


def main():
    """Parses command-line options and executes the satellite tracking routine.

    The steps are: parse and validate the options, get the orbit predictor, find the next
    pass after the requested time, compute the antenna positions for it, connect to rotctld
    and track the pass. The process exits with status 1 on invalid options and with
    status -1 when the connection to rotctld cannot be established.
    """
    args = _build_arg_parser().parse_args()
    _check_satellite_args(args)

    # First step is to get the orbit predictor and the norad id of the satellite.
    name, pred = _get_predictor(args)
    satid = get_norad(pred.tle.lines)

    # Get the timezone used for presenting timestamps.
    target_tz = tz.tzlocal() if args.local_tz else timezone.utc

    when = dateparser.parse(args.time)
    if when.tzinfo is None:
        # --time is documented as UTC. A naive value would be interpreted as local time
        # by astimezone(), so make the zone explicit.
        when = when.replace(tzinfo=timezone.utc)

    logging.info("Calculating pass after time: utc=%s localtz=%s",
                 when.astimezone(timezone.utc), when.astimezone(tz.tzlocal()))
    logging.info("Tracking sat %s, norad id %d, all timestamps in %s", name, satid,
                 when.astimezone(target_tz).tzname())

    loc = Location('Observer', args.lat, args.lon, args.alt)
    pass_ = pred.get_next_pass(loc, when_utc=when)
    log_details(loc, args, when, pass_, target_tz)

    # For azimuth debugging the real pass can be replaced with a synthetic one,
    # e.g. positions = get_fake_pass(10, 30, -30)
    positions = get_pass(pred, loc, pass_.aos, pass_.los)

    # If specified, rewind in time so the positions start immediately.
    if args.now:
        positions = rewind_positions(positions)
    print_pos(positions)

    logging.info("Connecting to %s, port %d", args.host, args.port)
    ctl = rotctld.Rotctld(args.host, args.port, 1)
    try:
        ctl.connect()
    except OSError as e:
        # OSError covers ConnectionRefusedError as well as other socket-level failures
        # (timeouts, unreachable host) - presumably what connect() raises; verify against
        # rotctld.Rotctld.
        logging.critical("Failed to connect to rotctld: %s", str(e))
        sys.exit(-1)

    try:
        antenna_pos = track_positions(positions, ctl, 3)
        plot_charts(positions, antenna_pos)
    finally:
        # Release the connection even if tracking is interrupted or fails.
        ctl.close()

    logging.debug("Exiting main after completing tracking")
# Run the tracking routine only when this file is executed as a script.
if __name__ == "__main__":
    main()