Commit 5e07980

ruff

eth2353 committed Sep 21, 2024
1 parent 49fbde9 commit 5e07980
Showing 41 changed files with 820 additions and 720 deletions.
10 changes: 10 additions & 0 deletions pyproject.toml
@@ -18,3 +18,13 @@ asyncio_mode = "auto"
 [tool.ruff]
 # Assume Python 3.12+
 target-version = "py312"
+
+[tool.ruff.lint]
+# Enable all rules and explicitly disable some that we don't comply with (yet?)
+select = ["ALL"]
+ignore = ["D", "EM", "FIX", "PL", "TD", "ANN204", "ANN401", "C408", "C901", "COM812", "ERA001", "E501", "FBT", "G004", "ISC001", "N805", "N812", "N818", "TRY003"]
+
+[tool.ruff.lint.per-file-ignores]
+"tests/mock_api/beacon_node.py" = ["C901"]
+"tests/*" = ["S", "SLF", "ARG", "INP", "FBT"]
+"tests/conftest.py" = ["F401", "F403", "F811", "I"]
26 changes: 16 additions & 10 deletions src/args.py
@@ -3,7 +3,10 @@
 from logging import getLevelNamesMapping
 from pathlib import Path
 
-from pydantic import BaseModel, HttpUrl, field_validator, ValidationError
+from pydantic import BaseModel, HttpUrl, ValidationError, field_validator
+
+_expected_fee_recipient_input_length = 42
+_graffiti_max_bytes = 32
 
 
 class CLIArgs(BaseModel):
@@ -26,7 +29,7 @@ def _validate_beacon_node_urls(input_string: str) -> list[str]:
     urls = [u.strip() for u in input_string.split(",") if len(u.strip()) > 0]
 
     if len(urls) == 0:
-        raise ValueError("no beacon node urls provided")
+        raise ValueError("No beacon node urls provided")
 
     if len(urls) != len(set(urls)):
         raise ValueError(f"Beacon node urls must be unique: {urls}")
@@ -44,18 +47,19 @@ def validate_beacon_node_urls_proposal(cls, v: str | None) -> list[str]:
     @field_validator("fee_recipient")
     def validate_fee_recipient(cls, v: str) -> str:
         _error_msg = "fee recipient must be a valid hex string starting with 0x"
-        if len(v) < 42 or not v.startswith("0x"):
+        if len(v) < _expected_fee_recipient_input_length or not v.startswith("0x"):
             raise ValueError(_error_msg)
         try:
             bytes.fromhex(v[2:])
-            return v
         except ValueError:
-            raise ValueError(_error_msg)
+            raise ValueError(_error_msg) from None
+        else:
+            return v
 
     @field_validator("graffiti", mode="before")
     def validate_graffiti(cls, v: str) -> bytes:
-        encoded = v.encode("utf-8").ljust(32, b"\x00")
-        if len(v) > 32:
+        encoded = v.encode("utf-8").ljust(_graffiti_max_bytes, b"\x00")
+        if len(v) > _graffiti_max_bytes:
             raise ValueError("Encoded graffiti exceeds the maximum length of 32 bytes")
         return encoded
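The reshuffle in validate_fee_recipient follows two ruff rules: B904 asks a re-raise inside an except block to state its cause ("from None" here suppresses chaining so only the clean message surfaces), and TRY300 moves the return into an else branch that runs only when the try body raised nothing. A minimal sketch of the same pattern, using a hypothetical parse_hex helper:

def parse_hex(v: str) -> bytes:
    try:
        parsed = bytes.fromhex(v[2:])
    except ValueError:
        # from None drops the implicit exception chain from the traceback
        raise ValueError("not a valid 0x-prefixed hex string") from None
    else:
        # reached only if the try block completed without raising
        return parsed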

@@ -64,7 +68,10 @@ def parse_cli_args() -> CLIArgs:
     parser = argparse.ArgumentParser(description="Vero validator client.")
 
     parser.add_argument(
-        "--remote-signer-url", type=str, required=True, help="URL of the remote signer."
+        "--remote-signer-url",
+        type=str,
+        required=True,
+        help="URL of the remote signer.",
     )
     parser.add_argument(
         "--beacon-node-urls",
@@ -148,7 +155,6 @@ def parse_cli_args() -> CLIArgs:
 
     try:
         # Convert parsed args to dictionary and validate using Pydantic model
-        validated_args = CLIArgs(**vars(args))
-        return validated_args
+        return CLIArgs(**vars(args))
     except ValidationError as e:
         parser.error(str(e))
43 changes: 20 additions & 23 deletions src/initialize.py
@@ -1,21 +1,20 @@
 import asyncio
 import datetime
 import logging
-import os
 from pathlib import Path
 
 import pytz
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 
 from args import CLIArgs
 from observability.event_loop import monitor_event_loop
-from providers import RemoteSigner, MultiBeaconNode, BeaconChain
+from providers import BeaconChain, MultiBeaconNode, RemoteSigner
 from schemas import SchemaBeaconAPI
 from services import (
     AttestationService,
     BlockProposalService,
-    SyncCommitteeService,
     EventConsumerService,
+    SyncCommitteeService,
     ValidatorDutyServiceOptions,
     ValidatorStatusTrackerService,
 )
@@ -71,36 +70,34 @@ def _register_event_handlers(
 
 
 def check_data_dir_permissions(cli_args: CLIArgs) -> None:
-    if not os.path.isdir(cli_args.data_dir):
+    if not Path.is_dir(cli_args.data_dir):
         _logger.info("Data directory does not exist, attempting to create it")
         try:
-            os.makedirs(cli_args.data_dir)
+            Path.mkdir(cli_args.data_dir, parents=True)
         except Exception as e:
             raise RuntimeError(
-                f"Failed to create data directory: {cli_args.data_dir} - {e}"
-            )
+                f"Failed to create data directory at {cli_args.data_dir}",
+            ) from e
 
     # Attempt to write a file and reading from it
-    try:
-        test_filename = ".vero_test_permissions"
-        test_file_path = Path(cli_args.data_dir) / test_filename
-        test_file_content = "test_permissions"
-        with open(test_file_path, "w") as f:
-            f.write(test_file_content)
-        with open(test_file_path, "r") as f:
-            content_read = f.read()
-        os.remove(test_file_path)
-        if content_read != test_file_content:
-            raise PermissionError(
-                f"Mismatch between data written {test_file_content} and read {content_read} into test file"
-            )
-    except Exception:
-        raise
+    test_filename = ".vero_test_permissions"
+    test_file_path = Path(cli_args.data_dir) / test_filename
+    test_file_content = "test_permissions"
+    with Path.open(test_file_path, "w") as f:
+        f.write(test_file_content)
+    with Path.open(test_file_path) as f:
+        content_read = f.read()
+    Path.unlink(test_file_path)
+    if content_read != test_file_content:
+        raise PermissionError(
+            f"Mismatch between data written {test_file_content} and read {content_read} into test file",
+        )
 
 
 async def run_services(cli_args: CLIArgs) -> None:
     scheduler = AsyncIOScheduler(
-        timezone=pytz.UTC, job_defaults=dict(misfire_grace_time=1)
+        timezone=pytz.UTC,
+        job_defaults=dict(misfire_grace_time=1),
     )
     scheduler.start()

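The os calls gave way to pathlib under ruff's flake8-use-pathlib (PTH) rules. The commit spells them in unbound form (Path.mkdir(cli_args.data_dir, ...)), which works because data_dir is already a Path; the equivalent and more common instance-method spelling, sketched with an illustrative directory:

from pathlib import Path

data_dir = Path("/var/lib/vero")  # illustrative path, not from the commit
if not data_dir.is_dir():
    data_dir.mkdir(parents=True)
(data_dir / ".vero_test_permissions").unlink()  # same effect as Path.unlink(...)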
18 changes: 9 additions & 9 deletions src/main.py
@@ -1,13 +1,13 @@
-import logging
 import asyncio
+import logging
 import signal
+import sys
 from pathlib import Path
 from types import FrameType
 
-
-from args import parse_cli_args, CLIArgs
+from args import CLIArgs, parse_cli_args
 from initialize import check_data_dir_permissions, run_services
-from observability import init_observability, get_service_version, get_service_commit
+from observability import get_service_commit, get_service_version, init_observability
 
 
 def prep_datadir(data_dir: Path) -> None:
@@ -17,22 +17,22 @@ def prep_datadir(data_dir: Path) -> None:
     # moment but will be used soon for
     # another laying of slashing protection
     # and caching.
-    with open(data_dir / "vero_placeholder.yml", "w") as f:
+    with Path.open(data_dir / "vero_placeholder.yml", "w") as f:
         f.write("placeholder")
 
 
 async def main(cli_args: CLIArgs) -> None:
     logging.getLogger("vero-init").info(
-        f"Starting vero {get_service_version()} (commit {get_service_commit()})"
+        f"Starting vero {get_service_version()} (commit {get_service_commit()})",
     )
     check_data_dir_permissions(cli_args=cli_args)
     prep_datadir(data_dir=cli_args.data_dir)
     await run_services(cli_args=cli_args)
 
 
-def sigterm_handler(signum: int, frame: FrameType | None) -> None:
-    print("Received SIGTERM. Exiting.")
-    exit(0)
+def sigterm_handler(_signum: int, _frame: FrameType | None) -> None:
+    logging.getLogger().info("Received SIGTERM. Exiting.")
+    sys.exit(0)
 
 
 if __name__ == "__main__":
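In sigterm_handler, the underscore-prefixed parameters satisfy ruff's unused-argument checks (ARG family), print is replaced with a logger call (flake8-print flags bare prints), and exit() becomes sys.exit(), which unlike the site-module exit() builtin is always available. For reference, a sketch of how such a handler is registered with the standard library:

import signal
import sys


def sigterm_handler(_signum: int, _frame: object) -> None:
    sys.exit(0)  # raises SystemExit, triggering a clean shutdown


signal.signal(signal.SIGTERM, sigterm_handler)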
4 changes: 2 additions & 2 deletions src/observability/__init__.py
@@ -1,6 +1,6 @@
 from ._logging import setup_logging
 from ._metrics import setup_metrics
-from ._metrics_shared import get_shared_metrics, ERROR_TYPE
+from ._metrics_shared import ErrorType, get_shared_metrics
 from ._profiling import setup_profiling
 from ._tracing import setup_tracing
 from ._vero_info import get_service_commit, get_service_name, get_service_version
@@ -28,5 +28,5 @@ def init_observability(
     "get_service_commit",
     "get_service_name",
     "get_service_version",
-    "ERROR_TYPE",
+    "ErrorType",
 ]
8 changes: 7 additions & 1 deletion src/observability/_logging.py
@@ -2,10 +2,16 @@
 
 
 def setup_logging(log_level: str) -> None:
+    logging.logProcesses = False
+    logging.logThreads = False
+    logging.logMultiprocessing = False
+    if hasattr(logging, "logAsyncioTasks"):
+        logging.logAsyncioTasks = False
+
     root_logger = logging.getLogger()
     root_logger.handlers.clear()
     formatter = logging.Formatter(
-        "%(asctime)s - %(name)-20s - %(levelname)-5s: %(message)s"
+        "%(asctime)s - %(name)-20s - %(levelname)-5s: %(message)s",
     )
     stream_handler = logging.StreamHandler()
     stream_handler.setFormatter(formatter)
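These flags are real module-level switches in the standard library's logging package: with them cleared, LogRecord creation skips collecting per-record process, thread, and asyncio-task information (logging.logAsyncioTasks only exists on Python 3.12+, hence the hasattr guard). A quick way to observe the effect, not part of the commit:

import logging

logging.logThreads = False
record = logging.LogRecord("demo", logging.INFO, __file__, 1, "msg", None, None)
print(record.thread)  # None - thread info was skipped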
6 changes: 3 additions & 3 deletions src/observability/_metrics.py
@@ -1,7 +1,7 @@
 import os
 from pathlib import Path
 
-from prometheus_client import multiprocess, start_http_server, REGISTRY
+from prometheus_client import REGISTRY, multiprocess, start_http_server
 
 
 def setup_metrics(addr: str, port: int, multiprocess_mode: bool = False) -> None:
@@ -15,11 +15,11 @@ def setup_metrics(addr: str, port: int, multiprocess_mode: bool = False) -> None
         _multiprocessing_data_path = Path(_multiprocessing_data_dir)
         if not _multiprocessing_data_path.is_dir():
             raise ValueError(
-                f"PROMETHEUS_MULTIPROC_DIR {_multiprocessing_data_path} does not exist"
+                f"PROMETHEUS_MULTIPROC_DIR {_multiprocessing_data_path} does not exist",
             )
 
         for file in _multiprocessing_data_path.iterdir():
-            os.remove(_multiprocessing_data_dir / file)
+            Path.unlink(_multiprocessing_data_dir / file)
 
         multiprocess.MultiProcessCollector(REGISTRY)
 
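In multiprocess mode, prometheus_client aggregates metric shards from the directory named by the PROMETHEUS_MULTIPROC_DIR environment variable, which is why the code validates the directory and clears stale shard files at startup. A usage sketch (the directory path is illustrative):

import os

os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prom-mp"  # illustrative path
setup_metrics("0.0.0.0", 9090, multiprocess_mode=True)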
4 changes: 2 additions & 2 deletions src/observability/_metrics_shared.py
@@ -6,7 +6,7 @@
 _METRICS_INITIATED = False
 
 
-class ERROR_TYPE(Enum):
+class ErrorType(Enum):
     ATTESTATION_CONSENSUS = "attestation-consensus"
     ATTESTATION_PUBLISH = "attestation-publish"
     AGGREGATE_ATTESTATION_PRODUCE = "aggregate-attestation-produce"
@@ -30,7 +30,7 @@ def get_shared_metrics() -> tuple[Counter]:
         "Number of errors",
         labelnames=["error_type"],
     )
-    for enum_type in ERROR_TYPE:
+    for enum_type in ErrorType:
         _ERRORS_METRIC.labels(enum_type.value).reset()
 
     _METRICS_INITIATED = True
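The ERROR_TYPE to ErrorType rename satisfies ruff's CapWords convention for class names (N801); the enum members stay upper-case. Call sites then pair the enum with the labelled counter; a sketch with a stand-in counter instance:

from prometheus_client import Counter

# ErrorType comes from src/observability/_metrics_shared.py above
errors = Counter("errors", "Number of errors", labelnames=["error_type"])
errors.labels(ErrorType.ATTESTATION_PUBLISH.value).inc()  # error_type is "attestation-publish"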
12 changes: 6 additions & 6 deletions src/observability/_tracing.py
@@ -1,30 +1,30 @@
 import os
 
 from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import (
     BatchSpanProcessor,
 )
-from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
 
-from observability._vero_info import get_service_version, get_service_name
+from observability._vero_info import get_service_name, get_service_version
 
 
 def setup_tracing() -> None:
     if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
-        return None
+        return
 
     provider = TracerProvider(
         resource=Resource.create(
             {
                 "service.name": get_service_name(),
                 "service.version": get_service_version(),
-            }
-        )
+            },
+        ),
     )
     processor = BatchSpanProcessor(
-        OTLPSpanExporter(endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"))
+        OTLPSpanExporter(endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT")),
     )
     provider.add_span_processor(processor)
 
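With the provider installed, spans are started through OpenTelemetry's global API; a minimal sketch, assuming setup_tracing has run and registered the provider globally:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("propose-block"):  # illustrative span name
    ...  # work to be traced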
14 changes: 6 additions & 8 deletions src/observability/api_client.py
@@ -1,6 +1,4 @@
-"""
-Helper that provides observability into API requests - request count and duration.
-"""
+"""Helper that provides observability into API requests - request count and duration."""
 
 import asyncio
 import logging
@@ -26,15 +24,15 @@
 
 
 async def _on_request_start(
-    session: aiohttp.ClientSession,
+    _session: aiohttp.ClientSession,
     trace_config_ctx: SimpleNamespace,
-    params: aiohttp.TraceRequestStartParams,
+    _params: aiohttp.TraceRequestStartParams,
 ) -> None:
     trace_config_ctx.start = asyncio.get_event_loop().time()
 
 
 async def _on_request_end(
-    session: aiohttp.ClientSession,
+    _session: aiohttp.ClientSession,
     trace_config_ctx: SimpleNamespace,
     params: aiohttp.TraceRequestEndParams,
 ) -> None:
@@ -83,8 +81,8 @@ def __init__(self, host: str, service_type: ServiceType):
                 trace_request_ctx=trace_request_ctx,
                 host=host,
                 service_type=service_type.value,
-            )
-        )
+            ),
+        ),
         )
 
         self.on_request_start.append(_on_request_start)
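These are aiohttp's standard tracing hooks: each receives the session, a per-request context namespace, and the event's parameter object, and the new underscore prefixes mark the arguments a hook ignores. The commit's wrapper appends the hooks to itself (suggesting a TraceConfig subclass); attaching the same hooks to a plain session looks roughly like this sketch:

import aiohttp

trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(_on_request_start)
trace_config.on_request_end.append(_on_request_end)
session = aiohttp.ClientSession(trace_configs=[trace_config])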
5 changes: 3 additions & 2 deletions src/observability/event_loop.py
@@ -1,7 +1,7 @@
 import asyncio
 import logging
 
-from prometheus_client import Histogram, Gauge
+from prometheus_client import Gauge, Histogram
 
 EVENT_LOOP_LAG = Histogram(
     "event_loop_lag_seconds",
@@ -18,11 +18,12 @@ async def monitor_event_loop() -> None:
     event_loop = asyncio.get_event_loop()
     _start = event_loop.time()
     _interval = 0.1  # Check every 100 milliseconds
+    _loop_lag_high_threshold = 0.5  # 500 milliseconds
 
     while True:
         await asyncio.sleep(_interval)
         lag = event_loop.time() - _start - _interval
-        if lag > 0.5:
+        if lag > _loop_lag_high_threshold:
             _logger.warning(f"Event loop lag high: {lag}")
         EVENT_LOOP_LAG.observe(lag)
         EVENT_LOOP_TASKS.set(len(asyncio.all_tasks(event_loop)))
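The monitor sleeps for a fixed interval and measures how much later than requested it actually woke up; that overshoot is time the event loop spent blocked. Naming the 0.5 threshold mirrors the magic-number cleanups elsewhere in this commit. The coroutine is intended to run for the life of the process; illustrative wiring, not from the commit:

import asyncio

async def main() -> None:
    monitor = asyncio.create_task(monitor_event_loop())  # runs alongside the real workload
    await asyncio.sleep(1)  # stand-in for application work
    monitor.cancel()

asyncio.run(main())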