diff --git a/myDevices/__init__.py b/myDevices/__init__.py index e3844a4..ba61fe3 100644 --- a/myDevices/__init__.py +++ b/myDevices/__init__.py @@ -1,4 +1,4 @@ """ This package contains the Cayenne agent, which is a full featured client for the Cayenne IoT project builder: https://cayenne.mydevices.com. It sends system information as well as sensor and actuator data and responds to actuator messages initiated from the Cayenne dashboard and mobile apps. """ -__version__ = '2.0.1' +__version__ = '2.0.2' diff --git a/myDevices/cloud/cayennemqtt.py b/myDevices/cloud/cayennemqtt.py index 19dcaaa..b6bd5b8 100644 --- a/myDevices/cloud/cayennemqtt.py +++ b/myDevices/cloud/cayennemqtt.py @@ -65,7 +65,28 @@ def add(data_list, prefix, channel=None, suffix=None, value=None, type=None, uni if name is not None: data['name'] = name data_list.append(data) - + + @staticmethod + def add_unique(data_list, prefix, channel=None, suffix=None, value=None, type=None, unit=None, name=None): + """Create data channel dict and append it to a list if the channel doesn't already exist in the list""" + data_channel = prefix + if channel is not None: + data_channel += ':' + str(channel) + if suffix is not None: + data_channel += ';' + str(suffix) + item = next((item for item in data_list if item['channel'] == data_channel), None) + if not item: + data = {} + data['channel'] = data_channel + data['value'] = value + if type is not None: + data['type'] = type + if unit is not None: + data['unit'] = unit + if name is not None: + data['name'] = name + data_list.append(data) + class CayenneMQTTClient: """Cayenne MQTT Client class. diff --git a/myDevices/cloud/client.py b/myDevices/cloud/client.py index 2630c59..4bfd72d 100644 --- a/myDevices/cloud/client.py +++ b/myDevices/cloud/client.py @@ -9,12 +9,11 @@ from time import strftime, localtime, tzset, time, sleep from queue import Queue, Empty from myDevices import __version__ -from myDevices.utils.config import Config +from myDevices.utils.config import Config, APP_SETTINGS, NETWORK_SETTINGS from myDevices.utils.logger import exception, info, warn, error, debug, logJson from myDevices.sensors import sensors from myDevices.system.hardware import Hardware from myDevices.cloud.scheduler import SchedulerEngine -from myDevices.cloud.download_speed import DownloadSpeed from myDevices.cloud.updater import Updater from myDevices.system.systemconfig import SystemConfig from myDevices.utils.daemon import Daemon @@ -25,10 +24,9 @@ from myDevices.cloud.apiclient import CayenneApiClient import myDevices.cloud.cayennemqtt as cayennemqtt -NETWORK_SETTINGS = '/etc/myDevices/Network.ini' -APP_SETTINGS = '/etc/myDevices/AppSettings.ini' GENERAL_SLEEP_THREAD = 0.20 + def GetTime(): """Return string with the current time""" tzset() @@ -203,15 +201,14 @@ def Start(self): self.oSInfo = OSInfo() self.count = 10000 self.buff = bytearray(self.count) - self.downloadSpeed = DownloadSpeed(self.config) - self.downloadSpeed.getDownloadSpeed() self.sensorsClient.SetDataChanged(self.OnDataChanged) self.writerThread = WriterThread('writer', self) self.writerThread.start() self.processorThread = ProcessorThread('processor', self) self.processorThread.start() + self.systemInfo = [] TimerThread(self.SendSystemInfo, 300) - TimerThread(self.SendSystemState, 30, 5) + # TimerThread(self.SendSystemState, 30, 5) self.updater = Updater(self.config) self.updater.start() events = self.schedulerEngine.get_scheduled_events() @@ -244,50 +241,63 @@ def Destroy(self): def OnDataChanged(self, data): """Enqueue a packet containing changed system data to send to the server""" - info('Send changed data: {}'.format([{item['channel']:item['value']} for item in data])) + try: + if len(data) > 15: + items = [{item['channel']:item['value']} for item in data if not item['channel'].startswith(cayennemqtt.SYS_GPIO)] + info('Send changed data: {} + {}'.format(items, cayennemqtt.SYS_GPIO)) + else: + info('Send changed data: {}'.format([{item['channel']:item['value']} for item in data])) + # items = {} + # gpio_items = {} + # for item in data: + # if not item['channel'].startswith(cayennemqtt.SYS_GPIO): + # items[item['channel']] = item['value'] + # else: + # channel = item['channel'].replace(cayennemqtt.SYS_GPIO + ':', '').split(';') + # if not channel[0] in gpio_items: + # gpio_items[channel[0]] = str(item['value']) + # else: + # gpio_items[channel[0]] += ',' + str(item['value']) + # info('Send changed data: {}, {}: {}'.format(items, cayennemqtt.SYS_GPIO, gpio_items)) + except: + info('Send changed data') + pass self.EnqueuePacket(data) def SendSystemInfo(self): """Enqueue a packet containing system info to send to the server""" try: - data = [] - cayennemqtt.DataChannel.add(data, cayennemqtt.SYS_OS_NAME, value=self.oSInfo.ID) - cayennemqtt.DataChannel.add(data, cayennemqtt.SYS_OS_VERSION, value=self.oSInfo.VERSION_ID) - cayennemqtt.DataChannel.add(data, cayennemqtt.AGENT_VERSION, value=self.config.get('Agent', 'Version', __version__)) - cayennemqtt.DataChannel.add(data, cayennemqtt.SYS_POWER_RESET, value=0) - cayennemqtt.DataChannel.add(data, cayennemqtt.SYS_POWER_HALT, value=0) + currentSystemInfo = [] + cayennemqtt.DataChannel.add(currentSystemInfo, cayennemqtt.SYS_OS_NAME, value=self.oSInfo.ID) + cayennemqtt.DataChannel.add(currentSystemInfo, cayennemqtt.SYS_OS_VERSION, value=self.oSInfo.VERSION_ID) + cayennemqtt.DataChannel.add(currentSystemInfo, cayennemqtt.AGENT_VERSION, value=self.config.get('Agent', 'Version', __version__)) + cayennemqtt.DataChannel.add(currentSystemInfo, cayennemqtt.SYS_POWER_RESET, value=0) + cayennemqtt.DataChannel.add(currentSystemInfo, cayennemqtt.SYS_POWER_HALT, value=0) config = SystemConfig.getConfig() if config: channel_map = {'I2C': cayennemqtt.SYS_I2C, 'SPI': cayennemqtt.SYS_SPI, 'Serial': cayennemqtt.SYS_UART, 'OneWire': cayennemqtt.SYS_ONEWIRE, 'DeviceTree': cayennemqtt.SYS_DEVICETREE} for key, channel in channel_map.items(): try: - cayennemqtt.DataChannel.add(data, channel, value=config[key]) + cayennemqtt.DataChannel.add(currentSystemInfo, channel, value=config[key]) except: pass - info('Send system info: {}'.format([{item['channel']:item['value']} for item in data])) - self.EnqueuePacket(data) + if currentSystemInfo != self.systemInfo: + data = currentSystemInfo + if self.systemInfo: + data = [x for x in data if x not in self.systemInfo] + if data: + self.systemInfo = currentSystemInfo + info('Send system info: {}'.format([{item['channel']:item['value']} for item in data])) + self.EnqueuePacket(data) except Exception: exception('SendSystemInfo unexpected error') - def SendSystemState(self): - """Enqueue a packet containing system information to send to the server""" - try: - data = [] - download_speed = self.downloadSpeed.getDownloadSpeed() - if download_speed: - cayennemqtt.DataChannel.add(data, cayennemqtt.SYS_NET, suffix=cayennemqtt.SPEEDTEST, value=download_speed, type='bw', unit='mbps') - data += self.sensorsClient.systemData - info('Send system state: {} items'.format(len(data))) - self.EnqueuePacket(data) - except Exception as e: - exception('ThreadSystemInfo unexpected error: ' + str(e)) - def CheckSubscription(self): """Check that an invite code is valid""" inviteCode = self.config.get('Agent', 'InviteCode', fallback=None) if not inviteCode: - error('No invite code found in {}'.format(APP_SETTINGS)) + error('No invite code found in {}'.format(self.config.path)) print('Please input an invite code. This can be retrieved from the Cayenne dashboard by adding a new Raspberry Pi device.\n' 'The invite code will be part of the script name shown there: rpi_[invitecode].sh.') inviteCode = input('Invite code: ') diff --git a/myDevices/cloud/doupdatecheck.py b/myDevices/cloud/doupdatecheck.py index aaf8197..b3e0591 100644 --- a/myDevices/cloud/doupdatecheck.py +++ b/myDevices/cloud/doupdatecheck.py @@ -1,6 +1,5 @@ from myDevices.cloud.updater import Updater -from myDevices.utils.config import Config -from myDevices.cloud.client import APP_SETTINGS +from myDevices.utils.config import Config, APP_SETTINGS from myDevices.utils.logger import setInfo if __name__ == '__main__': diff --git a/myDevices/devices/digital/gpio.py b/myDevices/devices/digital/gpio.py index 3bd52c6..952ccad 100644 --- a/myDevices/devices/digital/gpio.py +++ b/myDevices/devices/digital/gpio.py @@ -12,8 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import mmap +import select +from threading import Thread from time import sleep from myDevices.utils.types import M_JSON from myDevices.utils.logger import debug, info, error, exception @@ -37,6 +40,8 @@ class NativeGPIO(Singleton, GPIOPort): IN = 0 OUT = 1 + OUT_LOW = 2 + OUT_HIGH = 3 ASUS_GPIO = 44 @@ -63,6 +68,10 @@ def __init__(self): self.pinFunctionSet = set() self.valueFile = {pin:None for pin in self.pins} self.functionFile = {pin:None for pin in self.pins} + self.callbacks = {} + self.edge_poll = select.epoll() + thread = Thread(target=self.pollEdges, daemon=True) + thread.start() for pin in self.pins: # Export the pins here to prevent a delay when accessing the values for the # first time while waiting for the file group to be set @@ -151,6 +160,7 @@ def close(self): self.setFunction(gpio, g["func"]) if g["value"] >= 0 and self.getFunction(gpio) == self.OUT: self.__digitalWrite__(gpio, g["value"]) + self.edge_poll.close() def checkDigitalChannelExported(self, channel): if not channel in self.pins: @@ -170,6 +180,9 @@ def __getFunctionFilePath__(self, channel): def __getValueFilePath__(self, channel): return "/sys/class/gpio/gpio%s/value" % channel + def __getEdgeFilePath__(self, channel): + return "/sys/class/gpio/gpio%s/edge" % channel + def __checkFilesystemExport__(self, channel): #debug("checkExport for channel %d" % channel) if not os.path.isdir("/sys/class/gpio/gpio%s" % channel): @@ -295,11 +308,9 @@ def __setFunction__(self, channel, value): self.checkDigitalChannelExported(channel) self.checkPostingFunctionAllowed() try: - if value == self.IN: - value = 'in' - else: - value = 'out' - try: + value_dict = {self.IN: 'in', self.OUT: 'out', self.OUT_LOW: 'low', self.OUT_HIGH: 'high'} + value = value_dict[value] + try: self.functionFile[channel].write(value) self.functionFile[channel].seek(0) except: @@ -324,6 +335,49 @@ def __portWrite__(self, value): else: raise Exception("Please limit exported GPIO to write integers") + def setCallback(self, channel, callback, data=None): + debug('Set callback for GPIO pin {}'.format(channel)) + self.__checkFilesystemValue__(channel) + with open(self.__getEdgeFilePath__(channel), 'w') as f: + f.write('both') + self.callbacks[channel] = {'function':callback, 'data':data} + try: + self.edge_poll.register(self.valueFile[channel], (select.EPOLLPRI | select.EPOLLET)) + except FileExistsError as e: + # Ignore file exists error since it means we already registered the file. + pass + + def removeCallback(self, channel): + debug('removeCallback: {}'.format(channel)) + self.__checkFilesystemValue__(channel) + with open(self.__getEdgeFilePath__(channel), 'w') as f: + f.write('none') + del self.callbacks[channel] + self.edge_poll.unregister(self.valueFile[channel]) + + def pollEdges(self): + while True: + try: + events = self.edge_poll.poll(1) + except IOError as e: + if e.errno != errno.EINTR: + error(e) + if len(events) > 0: + self.onEdgeEvent(events) + + def onEdgeEvent(self, events): + debug('onEdgeEvent: {}'.format(events)) + for fd, event in events: + if not (event & (select.EPOLLPRI | select.EPOLLET)): + continue + for channel, valueFile in self.valueFile.items(): + if valueFile and valueFile.fileno() == fd: + value = valueFile.read() + valueFile.seek(0) + callback = self.callbacks[channel] + debug('onEdgeEvent: channel {}, value {}'.format(channel, value)) + callback['function'](callback['data'], int(value)) + #@request("GET", "*") @response(contentType=M_JSON) def wildcard(self, compact=False): diff --git a/myDevices/devices/digital/helper.py b/myDevices/devices/digital/helper.py index 4c5460a..0f08af0 100644 --- a/myDevices/devices/digital/helper.py +++ b/myDevices/devices/digital/helper.py @@ -46,6 +46,14 @@ def setGPIOInstance(self): if self.gpio: self.gpio.setFunction(self.channel, GPIO.IN) + def setCallback(self, callback, data=None): + if self.gpioname == "GPIO" and self.__family__() == "DigitalSensor": + self.gpio.setCallback(self.channel, callback, data) + + def removeCallback(self): + if self.gpioname == "GPIO" and self.__family__() == "DigitalSensor": + self.gpio.removeCallback(self.channel) + #@request("GET", "value") @response("%d") def read(self): @@ -63,9 +71,17 @@ def __str__(self): return "MotionSensor" class DigitalActuator(DigitalSensor): - def __init__(self, gpio, channel, invert=False): + def __init__(self, gpio, channel, invert=False, last_state=None): DigitalSensor.__init__(self, gpio, channel, invert) - self.gpio.setFunction(self.channel, GPIO.OUT) + function = GPIO.OUT + if gpio == 'GPIO' and last_state is not None: + if self.invert: + last_state = int(not last_state) + if last_state == 1: + function = GPIO.OUT_HIGH + elif last_state == 0: + function = GPIO.OUT_LOW + self.gpio.setFunction(self.channel, function) def __str__(self): return "DigitalActuator" @@ -82,30 +98,29 @@ def write(self, value): return self.read() class LightSwitch(DigitalActuator): - def __init__(self, gpio, channel, invert=False): - DigitalActuator.__init__(self, gpio, channel, invert) + def __init__(self, gpio, channel, invert=False, last_state=None): + DigitalActuator.__init__(self, gpio, channel, invert, last_state) def __str__(self): return "LightSwitch" class MotorSwitch(DigitalActuator): - def __init__(self, gpio, channel, invert=False): - DigitalActuator.__init__(self, gpio, channel, invert) + def __init__(self, gpio, channel, invert=False, last_state=None): + DigitalActuator.__init__(self, gpio, channel, invert, last_state) def __str__(self): return "MotorSwitch" class RelaySwitch(DigitalActuator): - def __init__(self, gpio, channel, invert=False): - DigitalActuator.__init__(self, gpio, channel, invert) + def __init__(self, gpio, channel, invert=False, last_state=None): + DigitalActuator.__init__(self, gpio, channel, invert, last_state) def __str__(self): return "RelaySwitch" class ValveSwitch(DigitalActuator): - def __init__(self, gpio, channel, invert=False): - DigitalActuator.__init__(self, gpio, channel, invert) + def __init__(self, gpio, channel, invert=False, last_state=None): + DigitalActuator.__init__(self, gpio, channel, invert, last_state) def __str__(self): return "ValveSwitch" - diff --git a/myDevices/devices/manager.py b/myDevices/devices/manager.py index 19155f3..f1942a9 100644 --- a/myDevices/devices/manager.py +++ b/myDevices/devices/manager.py @@ -8,7 +8,7 @@ from myDevices.utils.config import Config from myDevices.devices import serial, digital, analog, sensor, shield from myDevices.devices.instance import DEVICES -from myDevices.devices.onewire import detectOneWireDevices +from myDevices.devices.onewire import detectOneWireDevices, deviceExists, FAMILIES PACKAGES = [serial, digital, analog, sensor, shield] DYNAMIC_DEVICES = {} @@ -16,6 +16,12 @@ mutex = RLock() +def missingOneWireDevice(device): + if device['class'] in FAMILIES.values() and ('slave' not in device['args'] or not deviceExists(device['args']['slave'])): + logger.info('1-wire device does not exist: {}, {}'.format(device['class'], device['args']['slave'])) + return True + return False + def deviceDetector(): logger.debug('deviceDetector') try: @@ -30,6 +36,9 @@ def deviceDetector(): if not found: if addDevice(dev['name'], dev['device'], dev['description'], dev['args'], "auto") > 0: saveDevice(dev['name'], int(time())) + missing = [key for key, value in DEVICES.items() if missingOneWireDevice(value)] + for dev in missing: + removeDevice(dev) except Exception as e: logger.error("Device detector: %s" % e) @@ -49,7 +58,7 @@ def findDeviceClass(name): return getattr(module, name) return None -def saveDevice(name, install_date): +def saveDevice(name, install_date=None): with mutex: logger.debug('saveDevice: ' + str(name)) if name not in DEVICES: @@ -58,7 +67,8 @@ def saveDevice(name, install_date): if DEVICES[name]['origin'] == 'manual': return DYNAMIC_DEVICES[name] = DEVICES[name] - DEVICES[name]['install_date'] = install_date + if install_date: + DEVICES[name]['install_date'] = install_date json_devices = getJSON(DYNAMIC_DEVICES) with open(DEVICES_JSON_FILE, 'w') as outfile: outfile.write(json_devices) @@ -68,7 +78,7 @@ def removeDevice(name): if name in DEVICES: if name in DYNAMIC_DEVICES: if hasattr(DEVICES[name]["device"], 'close'): - DEVICES[name]["device"].close() + DEVICES[name]["device"].close() del DEVICES[name] del DYNAMIC_DEVICES[name] json_devices = getJSON(DYNAMIC_DEVICES) @@ -129,12 +139,27 @@ def updateDevice(name, json): return (c, d, t) +def updateDeviceState(name, value): + with mutex: + try: + if not name in DEVICES: + return + device = DEVICES[name] + if 'last_state' not in device['args'] or device['args']['last_state'] != value: + logger.info('Saving state {} for device {}'.format(value, name)) + device['args'].update({'last_state': value}) + saveDevice(name) + except: + pass + def addDevice(name, device, description, args, origin): with mutex: if name in DEVICES: logger.error("Device <%s> already exists" % name) return -1 logger.debug('addDevice: ' + str(name) + ' ' + str(device)) + if missingOneWireDevice({'class': device, 'args': args}): + return -1 # if '/' in device: # deviceClass = device.split('/')[0] # else: diff --git a/myDevices/devices/onewire.py b/myDevices/devices/onewire.py index 40f1246..8345011 100644 --- a/myDevices/devices/onewire.py +++ b/myDevices/devices/onewire.py @@ -105,3 +105,5 @@ def detectOneWireDevices(): debug('Error detecting 1-wire devices: {}'.format(err)) return devices +def deviceExists(slave): + return os.path.exists("/sys/bus/w1/devices/%s" % slave) diff --git a/myDevices/sensors/sensors.py b/myDevices/sensors/sensors.py index 2dd63d0..deb675c 100644 --- a/myDevices/sensors/sensors.py +++ b/myDevices/sensors/sensors.py @@ -2,26 +2,27 @@ This module provides a class for interfacing with sensors and actuators. It can add, edit and remove sensors and actuators as well as monitor their states and execute commands. """ -from myDevices.utils.logger import exception, info, warn, error, debug, logJson -from time import sleep, time -from json import loads, dumps -from threading import RLock, Event -from myDevices.system import services from datetime import datetime, timedelta -from os import path, getpid -from myDevices.utils.daemon import Daemon +from json import dumps, loads +from os import getpid, path +from threading import Event, RLock + +from myDevices.cloud import cayennemqtt from myDevices.cloud.dbmanager import DbManager -from myDevices.utils.threadpool import ThreadPool -from myDevices.devices.bus import checkAllBus, BUSLIST +from myDevices.cloud.download_speed import DownloadSpeed +from myDevices.devices import instance, manager +from myDevices.devices.bus import BUSLIST, checkAllBus from myDevices.devices.digital.gpio import NativeGPIO as GPIO -from myDevices.devices import manager -from myDevices.devices import instance -from myDevices.utils.types import M_JSON +from myDevices.system import services from myDevices.system.systeminfo import SystemInfo -from myDevices.cloud import cayennemqtt +from myDevices.utils.config import Config, APP_SETTINGS +from myDevices.utils.daemon import Daemon +from myDevices.utils.logger import debug, error, exception, info, logJson, warn +from myDevices.utils.threadpool import ThreadPool +from myDevices.utils.types import M_JSON -REFRESH_FREQUENCY = 5 #seconds -# SENSOR_INFO_SLEEP = 0.05 +REFRESH_FREQUENCY = 15 #seconds +DIGITAL_FREQUENCY = 60/55 #Seconds/messages, this is done to keep messages under the rate limit class SensorsClient(): """Class for interfacing with sensors and actuators""" @@ -29,32 +30,69 @@ class SensorsClient(): def __init__(self): """Initialize the bus and sensor info and start monitoring sensor states""" self.sensorMutex = RLock() + self.digitalMutex = RLock() self.exiting = Event() self.onDataChanged = None - self.onSystemInfo = None self.systemData = [] self.currentSystemState = [] + self.currentDigitalData = {} + self.queuedDigitalData = {} self.disabledSensors = {} self.disabledSensorTable = "disabled_sensors" checkAllBus() self.gpio = GPIO() + self.downloadSpeed = DownloadSpeed(Config(APP_SETTINGS)) + self.downloadSpeed.getDownloadSpeed() manager.addDeviceInstance("GPIO", "GPIO", "GPIO", self.gpio, [], "system") manager.loadJsonDevices("rest") results = DbManager.Select(self.disabledSensorTable) if results: for row in results: self.disabledSensors[row[0]] = 1 + self.digitalMonitorRunning = False + self.InitCallbacks() self.StartMonitoring() - def SetDataChanged(self, onDataChanged=None, onSystemInfo=None): - """Set callbacks to call when data has changed + def SetDataChanged(self, onDataChanged=None): + """Set callback to call when data has changed Args: onDataChanged: Function to call when sensor data changes - onSystemInfo: Function to call when system info changes """ self.onDataChanged = onDataChanged - self.onSystemInfo = onSystemInfo + + def OnSensorChange(self, device, value): + """Callback that is called when digital sensor data has changed + + Args: + device: The device that has changed data + value: The new data value + """ + debug('OnSensorChange: {}, {}'.format(device, value)) + with self.digitalMutex: + if device['name'] not in self.currentDigitalData: + self.currentDigitalData[device['name']] = {'device': device, 'value': value} + else: + self.queuedDigitalData[device['name']] = {'device': device, 'value': value} + + def InitCallbacks(self): + """Set callback function for any digital devices that support them""" + devices = manager.getDeviceList() + for device in devices: + sensor = instance.deviceInstance(device['name']) + if 'DigitalSensor' in device['type'] and hasattr(sensor, 'setCallback'): + debug('Set callback for {}'.format(sensor)) + sensor.setCallback(self.OnSensorChange, device) + if not self.digitalMonitorRunning: + ThreadPool.Submit(self.DigitalMonitor) + + def RemoveCallbacks(self): + """Remove callback function for all digital devices""" + devices = manager.getDeviceList() + for device in devices: + sensor = instance.deviceInstance(device['name']) + if 'DigitalSensor' in device['type'] and hasattr(sensor, 'removeCallback'): + sensor.removeCallback() def StartMonitoring(self): """Start thread monitoring sensor data""" @@ -62,29 +100,69 @@ def StartMonitoring(self): def StopMonitoring(self): """Stop thread monitoring sensor data""" + self.RemoveCallbacks() self.exiting.set() def Monitor(self): """Monitor bus/sensor states and system info and report changed data via callbacks""" debug('Monitoring sensors and os resources started') + sendAllDataCount = 0 + nextTime = datetime.now() while not self.exiting.is_set(): try: - if not self.exiting.wait(REFRESH_FREQUENCY): + difference = nextTime - datetime.now() + delay = min(REFRESH_FREQUENCY, difference.total_seconds()) + delay = max(0, delay) + if not self.exiting.wait(delay): + nextTime = datetime.now() + timedelta(seconds=REFRESH_FREQUENCY) self.currentSystemState = [] self.MonitorSystemInformation() self.MonitorSensors() self.MonitorBus() if self.currentSystemState != self.systemData: - changedSystemData = self.currentSystemState - if self.systemData: - changedSystemData = [x for x in self.currentSystemState if x not in self.systemData] - if self.onDataChanged and changedSystemData: - self.onDataChanged(changedSystemData) + data = self.currentSystemState + if self.systemData and not sendAllDataCount == 0: + data = [x for x in self.currentSystemState if x not in self.systemData] + if self.onDataChanged and data: + self.onDataChanged(data) + sendAllDataCount += 1 + if sendAllDataCount >= 4: + sendAllDataCount = 0 self.systemData = self.currentSystemState except: exception('Monitoring sensors and os resources failed') debug('Monitoring sensors and os resources finished') + def DigitalMonitor(self): + """Monitor digital state changes and report changed data via callbacks""" + self.digitalMonitorRunning = True + info('Monitoring digital sensor changes') + nextTime = datetime.now() + while not self.exiting.is_set(): + try: + if not self.exiting.wait(0.5): + if datetime.now() > nextTime: + nextTime = datetime.now() + timedelta(seconds=DIGITAL_FREQUENCY) + data = [] + with self.digitalMutex: + if self.currentDigitalData: + for name, item in self.currentDigitalData.items(): + cayennemqtt.DataChannel.add_unique(data, cayennemqtt.DEV_SENSOR, name, value=item['value'], name=item['device']['description'], type='digital_sensor', unit='d') + try: + cayennemqtt.DataChannel.add_unique(data, cayennemqtt.SYS_GPIO, item['device']['args']['channel'], cayennemqtt.VALUE, item['value']) + except: + pass + if name in self.queuedDigitalData and self.queuedDigitalData[name]['value'] == item['value']: + del self.queuedDigitalData[name] + self.currentDigitalData = self.queuedDigitalData + self.queuedDigitalData = {} + if data: + self.onDataChanged(data) + except: + exception('Monitoring digital sensor changes failed') + debug('Monitoring digital sensor changes finished') + self.digitalMonitorRunning = False + def MonitorSensors(self): """Check sensor states for changes""" if self.exiting.is_set(): @@ -109,6 +187,9 @@ def SystemInformation(self): try: systemInfo = SystemInfo() newSystemInfo = systemInfo.getSystemInformation() + download_speed = self.downloadSpeed.getDownloadSpeed() + if download_speed: + cayennemqtt.DataChannel.add(newSystemInfo, cayennemqtt.SYS_NET, suffix=cayennemqtt.SPEEDTEST, value=download_speed, type='bw', unit='mbps') except Exception: exception('SystemInformation failed') return newSystemInfo @@ -164,10 +245,10 @@ def SensorsInfo(self): 'DigitalActuator': {'function': 'read', 'data_args': {'type': 'digital_actuator', 'unit': 'd'}}, 'AnalogSensor': {'function': 'readFloat', 'data_args': {'type': 'analog_sensor'}}, 'AnalogActuator': {'function': 'readFloat', 'data_args': {'type': 'analog_actuator'}}} - extension_types = {'ADC': {'function': 'analogReadAllFloat'}, - 'DAC': {'function': 'analogReadAllFloat'}, - 'PWM': {'function': 'pwmWildcard'}, - 'GPIOPort': {'function': 'wildcard'}} + # extension_types = {'ADC': {'function': 'analogReadAllFloat'}, + # 'DAC': {'function': 'analogReadAllFloat'}, + # 'PWM': {'function': 'pwmWildcard'}, + # 'GPIOPort': {'function': 'wildcard'}} for device_type in device['type']: try: display_name = device['description'] @@ -181,19 +262,22 @@ def SensorsInfo(self): channel = '{}:{}'.format(device['name'], device_type.lower()) else: channel = device['name'] - cayennemqtt.DataChannel.add(sensors_info, cayennemqtt.DEV_SENSOR, channel, value=self.CallDeviceFunction(func), name=display_name, **sensor_type['data_args']) + value = self.CallDeviceFunction(func) + cayennemqtt.DataChannel.add(sensors_info, cayennemqtt.DEV_SENSOR, channel, value=value, name=display_name, **sensor_type['data_args']) + if 'DigitalActuator' == device_type and value in (0, 1): + manager.updateDeviceState(device['name'], value) except: exception('Failed to get sensor data: {} {}'.format(device_type, device['name'])) - else: - try: - extension_type = extension_types[device_type] - func = getattr(sensor, extension_type['function']) - values = self.CallDeviceFunction(func) - for pin, value in values.items(): - cayennemqtt.DataChannel.add(sensors_info, cayennemqtt.DEV_SENSOR, device['name'] + ':' + str(pin), cayennemqtt.VALUE, value, name=display_name) - except: - exception('Failed to get extension data: {} {}'.format(device_type, device['name'])) - logJson('Sensors info: {}'.format(sensors_info)) + # else: + # try: + # extension_type = extension_types[device_type] + # func = getattr(sensor, extension_type['function']) + # values = self.CallDeviceFunction(func) + # for pin, value in values.items(): + # cayennemqtt.DataChannel.add(sensors_info, cayennemqtt.DEV_SENSOR, device['name'] + ':' + str(pin), cayennemqtt.VALUE, value, name=display_name) + # except: + # exception('Failed to get extension data: {} {}'.format(device_type, device['name'])) + info('Sensors info: {}'.format(sensors_info)) return sensors_info def AddSensor(self, name, description, device, args): @@ -222,10 +306,12 @@ def AddSensor(self, name, description, device, args): sensorAdd['description'] = description with self.sensorMutex: retValue = manager.addDeviceJSON(sensorAdd) + self.InitCallbacks() info('Add device returned: {}'.format(retValue)) if retValue[0] == 200: bVal = True - except: + except Exception: + exception('Error adding sensor') bVal = False return bVal @@ -252,6 +338,7 @@ def EditSensor(self, name, description, device, args): sensorEdit['args'] = args with self.sensorMutex: retValue = manager.updateDevice(name, sensorEdit) + self.InitCallbacks() info('Edit device returned: {}'.format(retValue)) if retValue[0] == 200: bVal = True @@ -272,6 +359,12 @@ def RemoveSensor(self, name): bVal = False try: sensorRemove = name + try: + sensor = instance.deviceInstance(sensorRemove) + if hasattr(sensor, 'removeCallback'): + sensor.removeCallback() + except: + pass with self.sensorMutex: retValue = manager.removeDevice(sensorRemove) info('Remove device returned: {}'.format(retValue)) @@ -302,7 +395,7 @@ def EnableSensor(self, sensor, enable): if enable == 0: #add item to the list if sensor not in self.disabledSensors: - rowId = DbManager.Insert(self.disabledSensorTable, sensor) + DbManager.Insert(self.disabledSensorTable, sensor) self.disabledSensors[sensor] = 1 else: #remove item from the list @@ -313,7 +406,6 @@ def EnableSensor(self, sensor, enable): except Exception as ex: error('EnableSensor Failed with exception: ' + str(ex)) return False - self.AddRefresh() return True def GpioCommand(self, command, channel, value): @@ -368,17 +460,19 @@ def SensorCommand(self, command, sensorId, channel, value): info('Sensor not found') return result if command in commands: - info('Sensor found: {}'.format(instance.DEVICES[sensorId])) + device = instance.DEVICES[sensorId] + info('Sensor found: {}'.format(device)) func = getattr(sensor, commands[command]['function']) value = commands[command]['value_type'](value) if channel: result = self.CallDeviceFunction(func, int(channel), value) else: result = self.CallDeviceFunction(func, value) + if 'DigitalActuator' in device['type']: + manager.updateDeviceState(sensorId, value) return result warn('Command not implemented: {}'.format(command)) return result except Exception: exception('SensorCommand failed') return result - diff --git a/myDevices/test/gpio_test.py b/myDevices/test/gpio_test.py index 6e69df1..7084bfe 100644 --- a/myDevices/test/gpio_test.py +++ b/myDevices/test/gpio_test.py @@ -1,7 +1,9 @@ +import time import unittest from myDevices.utils.logger import exception, setDebug, info, debug, error, logToFile, setInfo from myDevices.devices.digital.gpio import NativeGPIO + class GpioTest(unittest.TestCase): def setUp(self): self.gpio = NativeGPIO() @@ -32,6 +34,21 @@ def testPinStatus(self): self.assertGreaterEqual(value['value'], 0) self.assertLessEqual(value['value'], 1) + def edgeCallback(self, data, value): + info('edgeCallback data {}, value {}'.format(data, value)) + self.callback_data = data + + def testEdgeCallback(self): + self.callback_data = 0 + pin = 27 + self.gpio.setFunctionString(pin, 'IN') + self.gpio.setCallback(pin, self.edgeCallback, pin) + for x in range(15): + if self.callback_data != 0: + break + time.sleep(1) + self.assertEqual(pin, self.callback_data) + if __name__ == '__main__': setInfo() diff --git a/myDevices/test/sensors_test.py b/myDevices/test/sensors_test.py index 50ab93c..79107d3 100644 --- a/myDevices/test/sensors_test.py +++ b/myDevices/test/sensors_test.py @@ -24,17 +24,22 @@ def tearDownClass(cls): del cls.client def OnDataChanged(self, sensor_data): + # if len(sensor_data) < 5: + # info('OnDataChanged: {}'.format(sensor_data)) + # else: + # info('OnDataChanged: {}'.format(len(sensor_data))) self.previousSystemData = self.currentSystemData self.currentSystemData = sensor_data if self.previousSystemData: self.done = True def testMonitor(self): + debug('testMonitor') self.previousSystemData = None self.currentSystemData = None self.done = False SensorsClientTest.client.SetDataChanged(self.OnDataChanged) - for i in range(25): + for i in range(35): sleep(1) if self.done: break @@ -42,6 +47,7 @@ def testMonitor(self): self.assertNotEqual(self.previousSystemData, self.currentSystemData) def testBusInfo(self): + debug('testBusInfo') bus = {item['channel']:item['value'] for item in SensorsClientTest.client.BusInfo()} info('Bus info: {}'.format(bus)) for pin in GPIO().pins: @@ -49,6 +55,7 @@ def testBusInfo(self): self.assertIn('sys:gpio:{};value'.format(pin), bus) def testSensorsInfo(self): + debug('testSensorsInfo') sensors = SensorsClientTest.client.SensorsInfo() info('Sensors info: {}'.format(sensors)) for sensor in sensors: @@ -56,15 +63,18 @@ def testSensorsInfo(self): self.assertIn('value', sensor) def testSetFunction(self): + debug('testSetFunciton') self.setChannelFunction(GPIO().pins[7], 'IN') self.setChannelFunction(GPIO().pins[7], 'OUT') def testSetValue(self): + debug('testSetValue') self.setChannelFunction(GPIO().pins[7], 'OUT') self.setChannelValue(GPIO().pins[7], 1) self.setChannelValue(GPIO().pins[7], 0) def testSensors(self): + debug('testSensors') #Test adding a sensor channel = GPIO().pins[8] testSensor = {'description': 'Digital Input', 'device': 'DigitalSensor', 'args': {'gpio': 'GPIO', 'invert': False, 'channel': channel}, 'name': 'testdevice'} @@ -90,8 +100,9 @@ def testSensors(self): self.assertNotIn(testSensor['name'], deviceNames) def testSensorInfo(self): - actuator_channel = GPIO().pins[9] - light_switch_channel = GPIO().pins[9] + debug('testSensorInfo') + actuator_channel = GPIO().pins[10] + light_switch_channel = GPIO().pins[11] sensors = {'actuator' : {'description': 'Digital Output', 'device': 'DigitalActuator', 'args': {'gpio': 'GPIO', 'invert': False, 'channel': actuator_channel}, 'name': 'test_actuator'}, 'light_switch' : {'description': 'Light Switch', 'device': 'LightSwitch', 'args': {'gpio': 'GPIO', 'invert': True, 'channel': light_switch_channel}, 'name': 'test_light_switch'}, 'MCP3004' : {'description': 'MCP3004', 'device': 'MCP3004', 'args': {'chip': '0'}, 'name': 'test_MCP3004'}, @@ -113,6 +124,27 @@ def testSensorInfo(self): for sensor in sensors.values(): self.assertTrue(SensorsClientTest.client.RemoveSensor(sensor['name'])) + def testSensorCallback(self): + debug('testSensorCallback') + self.previousSystemData = None + self.currentSystemData = None + self.done = False + SensorsClientTest.client.SetDataChanged(self.OnDataChanged) + actuator_channel = GPIO().pins[10] + sensor_channel = GPIO().pins[11] + sensors = {'actuator' : {'description': 'Digital Output', 'device': 'DigitalActuator', 'args': {'gpio': 'GPIO', 'invert': False, 'channel': actuator_channel}, 'name': 'test_actuator'}, + 'sensor': {'description': 'Digital Input', 'device': 'DigitalSensor', 'args': {'gpio': 'GPIO', 'invert': False, 'channel': sensor_channel}, 'name': 'testdevice'}} + for sensor in sensors.values(): + SensorsClientTest.client.AddSensor(sensor['name'], sensor['description'], sensor['device'], sensor['args']) + for i in range(35): + sleep(1) + if self.done: + break + info('Changed items: {}'.format([x for x in self.currentSystemData if x not in self.previousSystemData])) + self.assertNotEqual(self.previousSystemData, self.currentSystemData) + for sensor in sensors.values(): + self.assertTrue(SensorsClientTest.client.RemoveSensor(sensor['name'])) + def setSensorValue(self, sensor, value): SensorsClientTest.client.SensorCommand('integer', sensor['name'], None, value) channel = 'dev:{}'.format(sensor['name']) diff --git a/myDevices/utils/config.py b/myDevices/utils/config.py index 9e55b23..96f1051 100644 --- a/myDevices/utils/config.py +++ b/myDevices/utils/config.py @@ -1,8 +1,10 @@ from configparser import RawConfigParser, _UNSET, NoSectionError from threading import RLock +NETWORK_SETTINGS = '/etc/myDevices/Network.ini' APP_SETTINGS = '/etc/myDevices/AppSettings.ini' + class Config: def __init__(self, path): self.mutex = RLock()