Skip to content

Commit

Permalink
Jsonl batching & Schema update (#72)
Browse files Browse the repository at this point in the history
* update schema

* mqtt batching
  • Loading branch information
scribblrsam authored Sep 6, 2021
1 parent f5e610c commit 199663e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
41 changes: 26 additions & 15 deletions custom_components/openhasp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ def __init__(self, hass, config, entry):

self._subscriptions = []

with open(pathlib.Path(__file__).parent.joinpath("pages_schema.json"), 'r') as schema_file:
with open(
pathlib.Path(__file__).parent.joinpath("pages_schema.json"), "r"
) as schema_file:
self.json_schema = json.load(schema_file)

async def async_will_remove_from_hass(self):
Expand Down Expand Up @@ -568,7 +570,7 @@ async def async_change_page(self, page):
self.async_write_ha_state()

async def async_command_service(self, keyword, parameters):
"""Sends commands directly to the plate entity """
"""Sends commands directly to the plate entity"""
self.hass.components.mqtt.async_publish(
f"{self._topic}/command",
f"{keyword} {parameters}".strip(),
Expand Down Expand Up @@ -626,23 +628,32 @@ async def async_load_page(self, path):
_LOGGER.error("'%s' is not an allowed directory", path)
return

def send_lines(lines):
mqtt_payload_buffer = ""
for line in lines:
if len(mqtt_payload_buffer) + len(line) > 1000:
self.hass.components.mqtt.async_publish(
f"{cmd_topic}/jsonl", mqtt_payload_buffer, qos=0, retain=False
)
mqtt_payload_buffer = line
else:
mqtt_payload_buffer = mqtt_payload_buffer + line
self.hass.components.mqtt.async_publish(
f"{cmd_topic}/jsonl", mqtt_payload_buffer, qos=0, retain=False
)

try:
with open(path, 'r') as pages_file:
with open(path, "r") as pages_file:
if path.endswith(".json"):
json_data = json.load(pages_file)
jsonschema.validate(instance=json_data, schema=self.json_schema)
lines = []
for item in json_data:
if isinstance(item, dict):
self.hass.components.mqtt.async_publish(
f"{cmd_topic}/jsonl", json.dumps(item), qos=0, retain=False
)
else: # process as .jsonl file
# load line by line
for line in pages_file:
if line:
self.hass.components.mqtt.async_publish(
f"{cmd_topic}/jsonl", line, qos=0, retain=False
)
lines.append(json.dumps(item) + "\n")
send_lines(lines)
else:
send_lines(pages_file)
await self.refresh()

except (IndexError, FileNotFoundError, IsADirectoryError, UnboundLocalError):
Expand All @@ -651,7 +662,7 @@ async def async_load_page(self, path):
os.path.basename(path),
)

except (json.JSONDecodeError, TypeError):
except json.JSONDecodeError:
_LOGGER.error(
"Error decoding .json file: %s",
os.path.basename(path),
Expand All @@ -661,7 +672,7 @@ async def async_load_page(self, path):
_LOGGER.error(
"Schema check failed for %s. Validation Error: %s",
os.path.basename(path),
e.message
e.message,
)


Expand Down
5 changes: 2 additions & 3 deletions custom_components/openhasp/pages_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
},
"id": {
"type": "integer",
"minimum": 1,
"minimum": 0,
"maximum": 254
},
"obj": {
"type": "string"
}
},
"required": [
"id",
"obj"
"id"
]
}
]
Expand Down

0 comments on commit 199663e

Please sign in to comment.