forked from kellerza/pysma
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
107 lines (81 loc) · 2.86 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
"""Basic usage example and testing of pysma."""
import argparse
import asyncio
import logging
import signal
import sys
import aiohttp
import pysma
from configparser import ConfigParser
import myMQTT as mq
# This module will work with Python 3.5+
# Python 3.4+ "@asyncio.coroutine" decorator
# Python 3.5+ uses "async def f()" syntax
_LOGGER = logging.getLogger(__name__)
VAR = {}
def print_table(sensors):
for sen in sensors:
if sen.value is None:
print("{:>25}".format(sen.name))
else:
print("{:>25}{:>15} {}".format(sen.name, str(sen.value), sen.unit))
def send_table(sensors):
global client, config
for sen in sensors:
if sen.value is None:
client.publish('%s/%s' % (config.get('main', 'prefix'), sen.name),
'None')
else:
client.publish(
'%s/%s_%s' %
(config.get('main', 'prefix'), sen.name, sen.unit),
str(sen.value))
async def main_loop(loop, password, user, ip): # pylint: disable=invalid-name
"""Main loop."""
async with aiohttp.ClientSession(
loop=loop, connector=aiohttp.TCPConnector(ssl=False)) as session:
VAR["sma"] = pysma.SMA(session, ip, password=password, group=user)
await VAR["sma"].new_session()
if VAR["sma"].sma_sid is None:
_LOGGER.info("No session ID")
return
_LOGGER.info("NEW SID: %s", VAR["sma"].sma_sid)
VAR["running"] = True
cnt = 5
sensors = pysma.Sensors()
while VAR.get("running"):
await VAR["sma"].read(sensors)
print_table(sensors)
send_table(sensors)
cnt -= 1
if cnt == 0:
break
await asyncio.sleep(2)
await VAR["sma"].close_session()
def main():
"""Main example."""
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
parser = argparse.ArgumentParser(
description="Test the SMA webconnect library.")
parser.add_argument("ip",
type=str,
help="IP address of the Webconnect module")
parser.add_argument("user", help="installer/user")
parser.add_argument("password", help="Installer password")
args = parser.parse_args()
loop = asyncio.get_event_loop()
def _shutdown(*_):
VAR["running"] = False
# asyncio.ensure_future(sma.close_session(), loop=loop)
signal.signal(signal.SIGINT, _shutdown)
# loop.add_signal_handler(signal.SIGINT, shutdown)
# signal.signal(signal.SIGINT, signal.SIG_DFL)
loop.run_until_complete(
main_loop(loop, user=args.user, password=args.password, ip=args.ip))
config = ConfigParser()
config.read('mqtt.ini')
client = mq.myMQTT()
client.run(addr=config.get('main', 'mqtt_ip'))
if __name__ == "__main__":
main()