-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathexample_threads.py
85 lines (71 loc) · 2.6 KB
/
example_threads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from farmbot import Farmbot, FarmbotToken
import threading
# PYTHON MULTITHREAD EXAMPLE.
# ==========================================================
# The main thread has a blocking loop that waits for user
# input. The W/A/S/D keys are used to move FarmBot. Commands
# are entered in a queue that are processed in a background
# thread so as to not be blocked when waiting for keyboard
# input.
# ==========================================================
class MyHandler:
def __init__(self, bot):
# Store "W", "A", "S", "D" in a queue
self.queue = []
# Maintain a flag that lets us know if the bot is
# ready for more commands.
self.busy = True
self.bot = bot
def add_job(self, direction):
d = direction.capitalize()
if d in ["W", "A", "S", "D"]:
self.queue.append(d)
self.bot.read_status()
def try_next_job(self):
if (len(self.queue) > 0) and (not self.busy):
command = self.queue.pop(0)
print("sending " + command)
self.busy = True
if command == "W":
return self.bot.move_relative(10, 0, 0)
if command == "A":
return self.bot.move_relative(0, -10, 0)
if command == "S":
return self.bot.move_relative(-10, 0, 0)
if command == "D":
return self.bot.move_relative(0, 10, 0)
def on_connect(self, bot, mqtt_client):
self.bot.read_status()
pass
def on_change(self, bot, state):
is_busy = state['informational_settings']['busy']
if is_busy != self.busy:
if is_busy:
print("Device is busy")
else:
print("Device is idle")
self.busy = is_busy
self.try_next_job()
def on_log(self, _bot, log):
print("LOG: " + log['message'])
def on_response(self, _bot, _response):
pass
def on_error(self, _bot, response):
print("ERROR: " + response.id)
print("Reason(s) for failure: " + str(response.errors))
if __name__ == '__main__':
raw_token = FarmbotToken.download_token(
"usr@name.com", "pass", "https://my.farm.bot")
fb = Farmbot(raw_token)
handler = MyHandler(fb)
threading.Thread(target=fb.connect, name="foo", args=[handler]).start()
print("ENTER A DIRECTION VIA WASD:")
print(" ^")
print(" W")
print(" < A S >")
print(" D")
print(" v")
while(True):
direction = input("> ")
handler.add_job(direction)
handler.try_next_job()