-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnetwolf_agent.py
executable file
·133 lines (98 loc) · 3.75 KB
/
netwolf_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python3
import asyncio
import netdev
import re
import time
import multiprocessing
from socket import gaierror
from asyncssh.misc import PermissionDenied, ConnectionLost
from netdev.exceptions import DisconnectError
from asyncio.exceptions import TimeoutError
from os import system
import multiprocessing
import random
import netjson
def find_regex_ml(text, regex, hint=None, optional=True):
""" Find single or multiple values per each of the lines of text. Uses regex grouping mechanism to mark interesting values. """
if hint:
if optional and hint not in text:
return []
if not (text_lines := [_ for _ in text.split("\n") if hint in _]):
return []
else:
text_lines = text.split("\n")
cregex = re.compile(regex)
return [_.groups() if len(_.groups()) > 1 else _.group(1) for __ in text_lines if (_ := cregex.search(__.rstrip("\r")))]
workers = {}
results = {}
async def worker(job):
""" Worker coroutine """
try:
async with netdev.create(**job["login"]) as cli:
while workers[job["login"]["host"]] > time.time():
results[job["login"]["host"]] = {__["id"]: _ for __ in job["tasks"] if (_ := find_regex_ml(await cli.send_command(__["command"]), __["regex"])[0])}
await asyncio.sleep(1)
except (
ConnectionLost,
gaierror,
ValueError,
PermissionDenied,
DisconnectError,
TimeoutError,
ConnectionRefusedError,
netdev.exceptions.TimeoutError,
IndexError,
OSError,
):
pass
workers.pop(job["login"]["host"])
results.pop(job["login"]["host"], None)
async def start_workers(manager_address, manager_port):
""" Process function """
# asyncio.create_task(print_results())
while True:
print(f"Attempting connction to {manager_address}, port {manager_port}")
try:
nj = netjson.NetJson(*await asyncio.open_connection(manager_address, manager_port))
except ConnectionRefusedError:
await asyncio.sleep(1)
jobs = []
while not nj.is_socket_closed():
if _ := await nj.read(blocking=False):
jobs = _
# print(f"Received {len(_)} jobs from manager")
jobs_hosts = set(_["login"]["host"] for _ in jobs)
active_hosts = set(workers)
batch_hosts = set()
if jobs_hosts:
for _ in range(10):
if candidate_hosts := list(jobs_hosts - active_hosts - batch_hosts):
batch_hosts.add(random.choice(candidate_hosts))
print("Jobs received:", len(jobs_hosts))
print("Jobs running:", len(active_hosts))
print()
for job in jobs:
if job["login"]["host"] in batch_hosts:
asyncio.create_task(worker(job))
workers[job["login"]["host"]] = 30
elif workers.get(job["login"]["host"], None):
workers[job["login"]["host"]] = time.time() + 15
await asyncio.sleep(1)
def start_asyncio(manager_address, manager_port):
""" Start Asyncio in separate process """
asyncio.run(start_workers(manager_address, manager_port))
async def print_results():
""" """
while True:
system("clear")
n = 0
for host in results:
print(f"{int(results[host]['cpu']):02}", "", end="" if n % 10 else "\n")
n += 1
await asyncio.sleep(1)
def main():
""" Main program function """
for _ in range(multiprocessing.cpu_count()):
multiprocessing.Process(target=start_asyncio, args=("127.0.0.1", 5555)).start()
if __name__ == "__main__":
main()