Skip to content

Commit

Permalink
Battery status with channel communication (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
ela-kotulska-frequenz authored Jan 31, 2023
2 parents 0c2501d + 6a865d4 commit bf36ba6
Show file tree
Hide file tree
Showing 10 changed files with 1,415 additions and 1,445 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ classifiers = [
requires-python = ">= 3.8, < 4"
dependencies = [
"frequenz-api-microgrid >= 0.11.0, < 0.12.0",
"frequenz-channels >= 0.12.0, < 0.13.0",
"frequenz-channels >= 0.13.0, < 0.14.0",
"google-api-python-client >= 2.71, < 3",
"grpcio >= 1.51.1, < 2",
"grpcio-tools >= 1.51.1, < 2",
Expand Down
223 changes: 145 additions & 78 deletions src/frequenz/sdk/actor/power_distributing/_battery_pool_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,75 @@

import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, Set

from ..._internal.asyncio import AsyncConstructible
from ...microgrid._battery import BatteryStatus, StatusTracker
from .result import PartialFailure, Result, Success
from frequenz.channels import Broadcast, Receiver
from frequenz.channels.util import MergeNamed

from ..._internal.asyncio import cancel_and_await
from ._battery_status import BatteryStatusTracker, SetPowerResult, Status

_logger = logging.getLogger(__name__)


class BatteryPoolStatus(AsyncConstructible):
"""Return status of batteries in the pool.
@dataclass
class BatteryStatus:
"""Status of the batteries."""

working: Set[int]
"""Set of working battery ids."""

uncertain: Set[int]
"""Set of batteries that should be used only if there are no working batteries."""

def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
"""From the given set of batteries return working batteries.
Args:
batteries: Set of batteries
Returns:
Subset with working batteries.
"""
working = self.working.intersection(batteries)
if len(working) > 0:
return working
return self.uncertain.intersection(batteries)


@dataclass
class _BatteryStatusChannelHelper:
"""Helper class to create battery status channel.
To create an instance of this class you should use `async_new` class method.
Standard constructor (__init__) is not supported and using it will raise
`NotSyncConstructible` error.
Channel has only one receiver.
Receiver has size 1, because we need only latest status.
"""

# This is instance attribute.
# Don't assign default value, because then it becomes class attribute.
_batteries: Dict[int, StatusTracker]
battery_id: int
"""Id of the battery for which we should create channel."""

def __post_init__(self):
self.name: str = f"battery-{self.battery_id}-status"
channel = Broadcast[Status](self.name)

@classmethod
async def async_new(
cls,
receiver_name = f"{self.name}-receiver"
self.receiver = channel.new_receiver(name=receiver_name, maxsize=1)
self.sender = channel.new_sender()


class BatteryPoolStatus:
"""Track status of the batteries.
Send set of working and uncertain batteries, when the any battery change status.
"""

def __init__(
self,
battery_ids: Set[int],
max_data_age_sec: float,
max_blocking_duration_sec: float,
) -> BatteryPoolStatus:
) -> None:
"""Create BatteryPoolStatus instance.
Args:
Expand All @@ -47,81 +88,107 @@ async def async_new(
Raises:
RuntimeError: If any battery has no adjacent inverter.
Returns:
New instance of this class.
"""
self: BatteryPoolStatus = BatteryPoolStatus.__new__(cls)

tasks = [
StatusTracker.async_new(id, max_data_age_sec, max_blocking_duration_sec)
for id in battery_ids
]
# At first no battery is working, we will get notification when they start
# working.
self._current_status = BatteryStatus(working=set(), uncertain=set())

# Channel for sending results of requests to the batteries
request_result_channel = Broadcast[SetPowerResult]("battery_request_status")
self._request_result_sender = request_result_channel.new_sender()

self._batteries: Dict[str, BatteryStatusTracker] = {}

# Receivers for individual battery statuses are needed to create a `MergeNamed`
# object.
receivers: Dict[str, Receiver[Status]] = {}

for battery_id in battery_ids:
channel = _BatteryStatusChannelHelper(battery_id)
receivers[channel.name] = channel.receiver

self._batteries[channel.name] = BatteryStatusTracker(
battery_id=battery_id,
max_data_age_sec=max_data_age_sec,
max_blocking_duration_sec=max_blocking_duration_sec,
status_sender=channel.sender,
request_result_receiver=request_result_channel.new_receiver(
f"battery_{battery_id}_request_status"
),
)

self._battery_status_channel = MergeNamed[Status](
**receivers,
)

trackers = await asyncio.gather(*tasks)
self._batteries = {tracker.battery_id: tracker for tracker in trackers}
self._task = asyncio.create_task(self._run())

return self
async def stop(self) -> None:
"""Stop tracking batteries status."""
await cancel_and_await(self._task)

def get_working_batteries(self, battery_ids: Set[int]) -> Set[int]:
"""Get subset of battery_ids with working batteries.
await asyncio.gather(
*[
tracker.stop() # pylint: disable=protected-access
for tracker in self._batteries.values()
],
)
await self._battery_status_channel.stop()

Args:
battery_ids: batteries ids
async def _run(self) -> None:
"""Start tracking batteries status."""
while True:
try:
await self._update_status(self._battery_status_channel)
except Exception as err: # pylint: disable=broad-except
_logger.error(
"BatteryPoolStatus failed with error: %s. Restarting.", err
)

Raises:
RuntimeError: If `async_init` method was not called at the beginning to
initialized object.
KeyError: If any battery in the given batteries is not in the pool.
async def _update_status(self, status_channel: MergeNamed[Status]) -> None:
"""Wait for any battery to change status and update status.
Returns:
Subset of given batteries with working batteries.
Args:
status_channel: Receivers packed in Select object.
"""
working: Set[int] = set()
uncertain: Set[int] = set()
for bat_id in battery_ids:
if bat_id not in battery_ids:
ids = str(self._batteries.keys())
raise KeyError(f"No battery {bat_id} in pool. All batteries: {ids}")
battery_status = self._batteries[bat_id].get_status()
if battery_status == BatteryStatus.WORKING:
working.add(bat_id)
elif battery_status == BatteryStatus.UNCERTAIN:
uncertain.add(bat_id)

if len(working) > 0:
return working
async for channel_name, status in status_channel:
battery_id = self._batteries[channel_name].battery_id
if status == Status.WORKING:
self._current_status.working.add(battery_id)
self._current_status.uncertain.discard(battery_id)
elif status == Status.UNCERTAIN:
self._current_status.working.discard(battery_id)
self._current_status.uncertain.add(battery_id)
elif status == Status.NOT_WORKING:
self._current_status.working.discard(battery_id)
self._current_status.uncertain.discard(battery_id)

# In the future here we should send status to the subscribed actors

async def update_status(
self, succeed_batteries: Set[int], failed_batteries: Set[int]
) -> None:
"""Notify which batteries succeed and failed in the request.
Batteries that failed will be considered as broken and will be blocked for
some time.
Batteries that succeed will be unblocked.
_logger.warning(
"There are no working batteries in %s. Falling back to using uncertain batteries %s.",
str(battery_ids),
str(uncertain),
Args:
succeed_batteries: Batteries that succeed request
failed_batteries: Batteries that failed request
"""
await self._request_result_sender.send(
SetPowerResult(succeed_batteries, failed_batteries)
)
return uncertain

def update_last_request_status(self, result: Result):
"""Update batteries in pool based on the last result from the request.
def get_working_batteries(self, batteries: Set[int]) -> Set[int]:
"""From the given set of batteries get working.
Args:
result: Summary of what batteries failed and succeed in last request.
batteries: Set of batteries
Raises:
RuntimeError: If `async_init` method was not called at the beginning to
initialize object.
Returns:
Subset with working batteries.
"""
if isinstance(result, Success):
for bat_id in result.used_batteries:
self._batteries[bat_id].unblock()

elif isinstance(result, PartialFailure):
for bat_id in result.failed_batteries:
duration = self._batteries[bat_id].block()
if duration > 0:
_logger.warning(
"Battery %d failed last response. Block it for %f sec",
bat_id,
duration,
)

for bat_id in result.succeed_batteries:
self._batteries[bat_id].unblock()
return self._current_status.get_working_batteries(batteries)
Loading

0 comments on commit bf36ba6

Please sign in to comment.