From 788d0cfbf1fb0ebc0fe1e406b36e3da8ab7ec64d Mon Sep 17 00:00:00 2001 From: Olivier Le Thanh Duong Date: Thu, 13 Feb 2025 09:55:42 +0100 Subject: [PATCH] Fix regression: Monitoring of payment stream Problem: in monitoring_payment_task the check of stream payment was not done Regression introduced in https://github.com/aleph-im/aleph-vm/pull/726 --- src/aleph/vm/orchestrator/tasks.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/aleph/vm/orchestrator/tasks.py b/src/aleph/vm/orchestrator/tasks.py index 84d9ca49..75fff236 100644 --- a/src/aleph/vm/orchestrator/tasks.py +++ b/src/aleph/vm/orchestrator/tasks.py @@ -4,7 +4,6 @@ import math import time from collections.abc import AsyncIterable -from decimal import Decimal from typing import TypeVar import aiohttp @@ -193,17 +192,17 @@ async def check_payment(pool: VmPool): await pool.stop_vm(last_execution.vm_hash) required_balance = await compute_required_balance(executions) - # Check if the balance held in the wallet is sufficient stream tier resources - for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): - for chain, executions in chains.items(): - try: - stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) - logger.debug( - f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" - ) - except ValueError as error: - logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") - continue + # Check if the balance held in the wallet is sufficient stream tier resources + for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items(): + for chain, executions in chains.items(): + try: + stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain) + logger.debug( + f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}" + ) + except ValueError as error: + logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}") + continue required_stream = await compute_required_flow(executions) logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")