Skip to content

Commit

Permalink
use mixin class for tensorflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Moors committed Dec 27, 2024
1 parent 82bd926 commit 745d5fa
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 45 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
__pycache__/
*.egg-info/
build/

# Vim
*.sw[op]
*~
Empty file.
69 changes: 24 additions & 45 deletions eessi/testsuite/tests/apps/tensorflow/tensorflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,34 @@
"""

import reframe as rfm
from reframe.core.builtins import deferrable, parameter, run_after, sanity_function, performance_function
import reframe.utility.sanity as sn

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import * # noqa
from eessi.testsuite.constants import COMPUTE_UNIT, CPU, CPU_SOCKET, DEVICE_TYPES, GPU
from eessi.testsuite.eessi_mixin import EESSI_Mixin


@rfm.simple_test
class EESSI_TensorFlow(rfm.RunOnlyRegressionTest):

# This test can run at any scale, so parameterize over all known SCALES
scale = parameter(SCALES.keys())
valid_prog_environs = ['default']
valid_systems = ['*']
class EESSI_TensorFlow(rfm.RunOnlyRegressionTest, EESSI_Mixin):

# Parameterize over all modules that start with TensorFlow
module_name = parameter(utils.find_modules('TensorFlow'))

# Make CPU and GPU versions of this test
device_type = parameter(['cpu', 'gpu'])
device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]])

executable = 'python tf_test.py'

time_limit = '30m'

# This test should be run as part of EESSI CI
tags = {TAGS['CI']}
bench_name = bench_name_ci = 'bench_ci'

readonly_files = ['mnist_setup.py', 'tf_test.py']

def required_mem_per_node(self):
return self.num_tasks_per_node * 2048

@deferrable
def assert_tf_config_ranks(self):
Expand All @@ -44,9 +46,7 @@ def assert_completion(self):
'''Assert that the test ran until completion'''
n_fit_completed = sn.count(sn.extractall('^Rank [0-9]+: Keras fit completed', self.stdout))

return sn.all([
sn.assert_eq(n_fit_completed, self.num_tasks),
])
return sn.assert_eq(n_fit_completed, self.num_tasks)

@deferrable
def assert_convergence(self):
Expand All @@ -68,16 +68,6 @@ def assert_sanity(self):
def perf(self):
return sn.extractsingle(r'^Performance:\s+(?P<perf>\S+)', self.stdout, 'perf', float)

@run_after('init')
def run_after_init(self):
"""hooks to run after the init phase"""
# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)

hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)
hooks.set_modules(self)
hooks.set_tag_scale(self)

@run_after('init')
def set_executable_opts(self):
"""Set executable opts based on device_type parameter"""
Expand All @@ -91,33 +81,22 @@ def set_executable_opts(self):
def set_test_descr(self):
self.descr = f'TensorFlow benchmark on {self.device_type}'

@run_after('setup')
def run_after_setup(self):
"""hooks to run after the setup phase"""
# TODO: implement
# It should bind to socket, but different MPIs may have different arguments to do that...
# We should at very least prevent that it binds to single core per process,
# as that results in many threads being scheduled to one core.
# binding may also differ per launcher used. It'll be hard to support a wide range and still get proper binding
if self.device_type == 'cpu':
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
elif self.device_type == 'gpu':
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
else:
raise NotImplementedError(f'Failed to set number of tasks and cpus per task for device {self.device_type}')
@run_after('init')
def set_compute_unit(self):
"""
Set the compute unit to which tasks will be assigned:
one task per CPU socket for CPU runs, and one task per GPU for GPU runs.
"""
device_to_compute_unit = {
DEVICE_TYPES[CPU]: COMPUTE_UNIT[CPU_SOCKET],
DEVICE_TYPES[GPU]: COMPUTE_UNIT[GPU],
}
self.compute_unit = device_to_compute_unit.get(self.device_type)

@run_after('setup')
def set_thread_count_args(self):
"""Set exectuable opts defining the thread count"""
"""Set executable opts defining the thread count"""
if not self.has_custom_executable_opts:
self.executable_opts += ['--intra-op-parallelism', '%s' % self.num_cpus_per_task]
self.executable_opts += ['--inter-op-parallelism', '1']
utils.log(f'executable_opts set to {self.executable_opts}')

@run_after('setup')
def set_binding_policy(self):
"""
Sets a binding policy for tasks. We don't bind threads because of
https://github.com/tensorflow/tensorflow/issues/60843
"""
hooks.set_compact_process_binding(self)

0 comments on commit 745d5fa

Please sign in to comment.