Skip to content

Commit

Permalink
Use top-level imports
Browse files Browse the repository at this point in the history
With this commit we re-enable the pylint warning `C0415`, which checks
whether modules are imported outside of the top level. While we move most
of the imports to the top level, we keep some imports more targeted, most
notably the `elasticsearch` module, so we ensure that the library picks
up Rally's logging configuration properly even in a (multiprocess) actor
system.

Relates elastic#838
  • Loading branch information
danielmitterdorfer committed Dec 3, 2020
1 parent 72fbb94 commit 5507fcc
Show file tree
Hide file tree
Showing 25 changed files with 71 additions and 59 deletions.
1 change: 0 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ disable=print-statement,
inconsistent-return-statements,
C0302,
C0330,
C0415,
C4001,
R0916,
W0201,
Expand Down
2 changes: 1 addition & 1 deletion esrally/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import logging
import socket

import thespian.actors
import thespian.system.messages.status
Expand Down Expand Up @@ -185,7 +186,6 @@ def actor_system_already_running(ip="127.0.0.1"):
Note: It may be possible that another system is running on the same port.
"""
import socket
s = socket.socket()
try:
s.connect((ip, 1900))
Expand Down
4 changes: 4 additions & 0 deletions esrally/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, hosts, client_options):

# we're using an SSL context now and it is not allowed to have use_ssl present in client options anymore
if self.client_options.pop("use_ssl", False):
# pylint: disable=import-outside-toplevel
import ssl
self.logger.info("SSL support: on")
self.client_options["scheme"] = "https"
Expand Down Expand Up @@ -128,10 +129,12 @@ def _is_set(self, client_opts, k):
return False

def create(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
return elasticsearch.Elasticsearch(hosts=self.hosts, ssl_context=self.ssl_context, **self.client_options)

def create_async(self):
# pylint: disable=import-outside-toplevel
import elasticsearch
import esrally.async_connection
import io
Expand Down Expand Up @@ -208,6 +211,7 @@ def wait_for_rest_layer(es, max_attempts=40):
logger = logging.getLogger(__name__)
for attempt in range(max_attempts):
logger.debug("REST API is available after %s attempts", attempt)
# pylint: disable=import-outside-toplevel
import elasticsearch
try:
# see also WaitForHttpResource in Elasticsearch tests. Contrary to the ES tests we consider the API also
Expand Down
14 changes: 5 additions & 9 deletions esrally/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@
from esrally.utils import io, git, console, convert


class ConfigError(exceptions.RallyError):
pass


class Scope(Enum):
# Valid for all benchmarks, typically read from the configuration file
application = 1
Expand Down Expand Up @@ -179,7 +175,7 @@ def opts(self, section, key, default_value=None, mandatory=True):
if not mandatory:
return default_value
else:
raise ConfigError("No value for mandatory configuration: section='%s', key='%s'" % (section, key))
raise exceptions.ConfigError(f"No value for mandatory configuration: section='{section}', key='{key}'")

def all_opts(self, section):
"""
Expand Down Expand Up @@ -525,14 +521,14 @@ def migrate(config_file, current_version, target_version, out=print, i=input):
logger.info("Config file is already at version [%s]. Skipping migration.", target_version)
return
if current_version < Config.EARLIEST_SUPPORTED_VERSION:
raise ConfigError("The config file in {} is too old. Please delete it and reconfigure Rally from scratch with {} configure."
.format(config_file.location, PROGRAM_NAME))
raise exceptions.ConfigError(f"The config file in {config_file.location} is too old. Please delete it "
f"and reconfigure Rally from scratch with {PROGRAM_NAME} configure.")

logger.info("Upgrading configuration from version [%s] to [%s].", current_version, target_version)
# Something is really fishy. We don't want to downgrade the configuration.
if current_version >= target_version:
raise ConfigError("The existing config file is available in a later version already. Expected version <= [%s] but found [%s]"
% (target_version, current_version))
raise exceptions.ConfigError(f"The existing config file is available in a later version already. "
f"Expected version <= [{target_version}] but found [{current_version}]")
# but first a backup...
config_file.backup()
config = config_file.load()
Expand Down
5 changes: 4 additions & 1 deletion esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import collections
import concurrent.futures
import datetime
import itertools
import logging
import math
import multiprocessing
Expand Down Expand Up @@ -1085,7 +1086,6 @@ def calculate(self, samples, bucket_interval_secs=1):
:return: A global view of throughput samples.
"""

import itertools
samples_per_task = {}
# first we group all samples by task (operation).
for sample in samples:
Expand Down Expand Up @@ -1269,6 +1269,8 @@ def __init__(self, target):
self.profile_logger = logging.getLogger("rally.profile")

async def __call__(self, *args, **kwargs):
# initialize lazily, we don't need it in the majority of cases
# pylint: disable=import-outside-toplevel
import yappi
import io as python_io
yappi.start()
Expand Down Expand Up @@ -1398,6 +1400,7 @@ async def execute_single(runner, es, params, on_error):
:return: a triple of: total number of operations, unit of operations, a dict of request meta data (may be None).
"""
# pylint: disable=import-outside-toplevel
import elasticsearch
fatal_error = False
try:
Expand Down
19 changes: 14 additions & 5 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import logging
import random
import sys
import time
import types
from collections import Counter, OrderedDict
from copy import deepcopy
from enum import Enum
from functools import total_ordering
from os.path import commonprefix

import ijson
Expand Down Expand Up @@ -669,6 +672,7 @@ class ForceMerge(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
max_num_segments = params.get("max-num-segments")
mode = params.get("mode")
Expand Down Expand Up @@ -991,9 +995,6 @@ class ClusterHealth(Runner):
"""

async def __call__(self, es, params):
from enum import Enum
from functools import total_ordering

@total_ordering
class ClusterHealthStatus(Enum):
UNKNOWN = 0
Expand Down Expand Up @@ -1189,6 +1190,7 @@ async def __call__(self, es, params):
request_params = mandatory(params, "request-params", self)

async def _exists(name):
# pylint: disable=import-outside-toplevel
from elasticsearch.client import _make_path
# currently not supported by client and hence custom request
return await es.transport.perform_request(
Expand Down Expand Up @@ -1397,6 +1399,7 @@ class CreateMlDatafeed(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
datafeed_id = mandatory(params, "datafeed-id", self)
body = mandatory(params, "body", self)
Expand All @@ -1423,6 +1426,7 @@ class DeleteMlDatafeed(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
datafeed_id = mandatory(params, "datafeed-id", self)
force = params.get("force", False)
Expand Down Expand Up @@ -1453,6 +1457,7 @@ class StartMlDatafeed(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
datafeed_id = mandatory(params, "datafeed-id", self)
body = params.get("body")
Expand Down Expand Up @@ -1482,6 +1487,7 @@ class StopMlDatafeed(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
datafeed_id = mandatory(params, "datafeed-id", self)
force = params.get("force", False)
Expand Down Expand Up @@ -1514,6 +1520,7 @@ class CreateMlJob(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
job_id = mandatory(params, "job-id", self)
body = mandatory(params, "body", self)
Expand All @@ -1540,6 +1547,7 @@ class DeleteMlJob(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
job_id = mandatory(params, "job-id", self)
force = params.get("force", False)
Expand Down Expand Up @@ -1570,6 +1578,7 @@ class OpenMlJob(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
job_id = mandatory(params, "job-id", self)
try:
Expand All @@ -1594,6 +1603,7 @@ class CloseMlJob(Runner):
"""

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
job_id = mandatory(params, "job-id", self)
force = params.get("force", False)
Expand Down Expand Up @@ -1908,8 +1918,6 @@ async def __call__(self, es, params):
* ``transform-timeout``: overall runtime timeout of the transform in seconds, default 3600 (1h)
* ``poll-interval``: how often transform stats are polled, used to set progress and check the state, default 0.5.
"""
import time

transform_id = mandatory(params, "transform-id", self)
force = params.get("force", False)
timeout = params.get("timeout")
Expand Down Expand Up @@ -2073,6 +2081,7 @@ async def __aenter__(self):
return self

async def __call__(self, es, params):
# pylint: disable=import-outside-toplevel
import elasticsearch
import socket

Expand Down
4 changes: 4 additions & 0 deletions esrally/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class RallyAssertionError(RallyError):
"""


class ConfigError(RallyError):
pass


class DataError(RallyError):
"""
Thrown when something is wrong with the benchmark data
Expand Down
2 changes: 1 addition & 1 deletion esrally/mechanic/mechanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import pickle
import sys
import traceback
from collections import defaultdict

import thespian.actors
Expand Down Expand Up @@ -568,7 +569,6 @@ def receiveMsg_StartNodes(self, msg, sender):
except Exception:
self.logger.exception("Cannot process message [%s]", msg)
# avoid "can't pickle traceback objects"
import traceback
_, ex_value, _ = sys.exc_info()
self.send(getattr(msg, "reply_to", sender), actor.BenchmarkFailure(ex_value, traceback.format_exc()))

Expand Down
2 changes: 1 addition & 1 deletion esrally/mechanic/provisioner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import os
import shutil
import uuid

import jinja2

Expand Down Expand Up @@ -395,7 +396,6 @@ def __init__(self, car, node_name, cluster_settings, ip, http_port, node_root_di
self.rally_root = rally_root
self.binary_path = os.path.join(node_root_dir, "install")
# use a random subdirectory to isolate multiple runs because an external (non-root) user cannot clean it up.
import uuid
self.data_paths = [os.path.join(node_root_dir, "data", str(uuid.uuid4()))]
self.logger = logging.getLogger(__name__)

Expand Down
3 changes: 1 addition & 2 deletions esrally/mechanic/supplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,9 @@ def _supply_requirements(sources, distribution, build, plugins, revisions, distr

def _src_dir(cfg, mandatory=True):
# Don't let this spread across the whole module
from esrally import config
try:
return cfg.opts("node", "src.root.dir", mandatory=mandatory)
except config.ConfigError:
except exceptions.ConfigError:
raise exceptions.SystemSetupError("You cannot benchmark Elasticsearch from sources. Did you install Gradle? Please install"
" all prerequisites and reconfigure Rally with %s configure" % PROGRAM_NAME)

Expand Down
11 changes: 5 additions & 6 deletions esrally/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.

import collections
import glob
import json
import logging
import math
import os
Expand All @@ -30,7 +32,7 @@

import tabulate

from esrally import time, exceptions, config, version, paths
from esrally import client, time, exceptions, config, version, paths
from esrally.utils import convert, console, io, versions


Expand Down Expand Up @@ -85,6 +87,7 @@ def refresh(self, index):

def bulk_index(self, index, doc_type, items):
# TODO #653: Remove version-specific support for metrics stores before 7.0.0.
# pylint: disable=import-outside-toplevel
import elasticsearch.helpers
if self._cluster_version[0] > 6:
self.guarded(elasticsearch.helpers.bulk, self._client, items, index=index, chunk_size=5000)
Expand All @@ -103,6 +106,7 @@ def search(self, index, body):
return self.guarded(self._client.search, index=index, body=body)

def guarded(self, target, *args, **kwargs):
# pylint: disable=import-outside-toplevel
import elasticsearch
max_execution_count = 11
execution_count = 0
Expand Down Expand Up @@ -183,8 +187,6 @@ def __init__(self, cfg):
ca_path = self._config.opts("reporting", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
self.probe_version = self._config.opts("reporting", "datastore.probe.cluster_version", default_value=True, mandatory=False)

from esrally import client

# Instead of duplicating code, we're just adapting the metrics store specific properties to match the regular client options.
client_options = {
"use_ssl": secure,
Expand Down Expand Up @@ -1333,7 +1335,6 @@ def __init__(self, cfg):
self.race_path = paths.race_root(self.cfg)

def _store(self, doc):
import json
io.ensure_dir(self.race_path)
with open(self._race_file(), mode="wt", encoding="utf-8") as f:
f.write(json.dumps(doc, indent=True, ensure_ascii=False))
Expand All @@ -1342,7 +1343,6 @@ def _race_file(self, race_id=None):
return os.path.join(paths.race_root(cfg=self.cfg, race_id=race_id), "race.json")

def list(self):
import glob
results = glob.glob(self._race_file(race_id="*"))
all_races = self._to_races(results)
return all_races[:self._max_results()]
Expand All @@ -1356,7 +1356,6 @@ def find_by_race_id(self, race_id):
raise exceptions.NotFound("No race with race id [{}]".format(race_id))

def _to_races(self, results):
import json
races = []
for result in results:
# noinspection PyBroadException
Expand Down
3 changes: 2 additions & 1 deletion esrally/rally.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import time
import uuid

import thespian.actors

from esrally import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry
from esrally import version, actor, config, paths, racecontrol, reporter, metrics, track, chart_generator, exceptions, \
log
Expand Down Expand Up @@ -681,7 +683,6 @@ def race(cfg):


def with_actor_system(runnable, cfg):
import thespian.actors
logger = logging.getLogger(__name__)
already_running = actor.actor_system_already_running()
logger.info("Actor system already running locally? [%s]", str(already_running))
Expand Down
Loading

0 comments on commit 5507fcc

Please sign in to comment.