-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathconnector.py
73 lines (60 loc) · 2.16 KB
/
connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
import websockets
import json
import pathlib
import os
import functools
class Connection:
    """Find the host on the router's subnet that answers a websocket ping.

    Usage: ``Connection(router_ip, port).result -> '192.168.x.xxx'``

    The last octet of ``router_ip`` is dropped and replaced by every value in
    ``range(start, stop, skip)``. One worker task per candidate address tries
    to open ``ws://<address>:<port>``, sends a ``ping`` request and reports
    the address once the reply carries a truthy ``"result"`` field. The first
    address reported wins and all remaining workers are cancelled.
    """

    def __init__(self, router_ip, port, start=2, stop=255, skip=1):
        """Store the scan parameters; no network activity happens here.

        Args:
            router_ip: Dotted address whose last component is discarded to
                obtain the subnet prefix.
            port: Port the websocket server is expected to listen on.
            start, stop, skip: Arguments passed to ``range`` to generate the
                last address component of every candidate host.
        """
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            # Newer Python versions no longer create a loop implicitly when
            # none is set for the current thread.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
        self.result_queue = asyncio.Queue()
        self._result = None
        self.address = ".".join(router_ip.split(".")[:-1])
        self.tasks = list()
        self._args = start, stop, skip
        self.port = port

    async def get_result(self):
        """Run the scan from a coroutine and return the address found.

        Worker tasks are created on ``self.loop``, so this must be awaited on
        that same loop. Returns ``None`` when the address range is empty.
        """
        await self._get_result()
        return self._result

    @property
    def result(self):
        """Run the scan synchronously on ``self.loop``.

        Returns:
            The first address that answered, or ``False`` when the scan
            failed or there was no address to probe.
        """
        try:
            task = self.loop.create_task(self._get_result())
            found = self.loop.run_until_complete(task)
        except Exception:
            # Keep the historical "False on failure" contract, but let
            # KeyboardInterrupt / SystemExit reach the caller.
            return False
        return self._result if found else False

    async def _get_result(self):
        """Start one worker per candidate address and wait for the first hit.

        Returns ``True`` once an address was stored in ``self._result`` and
        ``False`` when the configured range yields no candidates.
        """
        launched = 0
        for i in range(*self._args):
            address = self.address + f".{i}"
            task = self.loop.create_task(self._worker(address))
            self.tasks.append(task)
            launched += 1
        if not launched:
            # Nothing will ever be queued; waiting would block forever.
            return False
        try:
            self._result = await self.result_queue.get()
            self.result_queue.task_done()
        finally:
            # Workers retry endlessly, so they must be stopped explicitly
            # once a result is available (or the scan itself is aborted).
            await self._cancel_workers()
        return True

    async def _cancel_workers(self):
        """Cancel every tracked worker task and wait until all have ended."""
        for pending in self.tasks:
            pending.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)

    def _done_cb(self, task):
        """Done-callback hook that cancels all worker tasks.

        Not registered anywhere in this class; ``task`` is the finished task
        handed over by ``add_done_callback`` and is not used.
        """
        print("done")
        for pending in self.tasks:
            pending.cancel()

    async def _worker(self, address):
        """Probe ``address`` until it answers the ping request.

        Returns ``(address, True)`` after queueing the address. Any failure
        (connection refused, malformed reply, falsy result) is retried after
        a short pause; the loop only ends on success or cancellation.
        """
        ip = f"ws://{address}:{self.port}"
        request = {"client_id": "Extern", "request": "ping", "data": None}
        while True:
            try:
                # NOTE(review): ping_timeout configures websockets' keepalive
                # pings; confirm whether open_timeout was intended here.
                async with websockets.connect(ip, ping_timeout=0.5) as ws:
                    await ws.send(json.dumps(request))
                    reply = json.loads(await ws.recv())
                    if reply["result"]:
                        await self.result_queue.put(address)
                        return address, True
            except asyncio.CancelledError:
                # Never swallow cancellation, otherwise the task cannot stop.
                raise
            except Exception:
                # Best effort: most candidate hosts are expected not to answer.
                pass
            await asyncio.sleep(1 / 30)
if __name__ == "__main__":
    # The settings file lives in the user's home directory.
    settings_file = pathlib.Path.home() / ".hwk/config.json"
    with open(settings_file, "r") as handle:
        settings = json.load(handle)

    # Scan the router's subnet and record whatever the scan reports
    # under the "ip" key before writing the settings back.
    scanner = Connection(settings["router"], settings["port"])
    settings["ip"] = scanner.result

    with open(settings_file, "w") as handle:
        json.dump(settings, handle, indent=4)