Python Dataset Reader (#2414)
* Add skeleton for new python data reader

* Implement basic functionality

* Fix initialization for distconv

* Add support for labels

* Add python library supporting classes

* clang-format

* Raise exception if rank/io parts not set

* Rename to python dataset

* Add optional module dir argument to add to path

* Add unit tests

* Simplify naming

* Add cosmoflow example and reader helper

* Update release notes

* Save dataset pickle in work dir

* Overhaul new data reader to support prefetching multiple samples/batches

* Fix worker index calculation

* clang-format

* Clarify proto comments

* Throw error if file fails to open

* Add docstrings and type hints

* Update CosmoFlow example and enable parallel IO

* Add basic sample size checking, remove label reconstruction, general clean up

* Switch to multiprocessing pool

* Implement response shuffling for distconv

* Fix typo

Co-authored-by: Tal Ben-Nun <tbennun@users.noreply.github.com>

fiedorowicz1 and tbennun authored Apr 4, 2024
1 parent 811af60 commit 1db91a2
Showing 14 changed files with 1,306 additions and 10 deletions.
2 changes: 2 additions & 0 deletions ReleaseNotes.txt
@@ -19,6 +19,8 @@ Experiments & Applications:
Internal features:

I/O & data ingestion:
- Added a new Python dataset reader that enables simple, flexible, and
  distconv-compatible data readers written in Python.

Build system:

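As a rough illustration of the note above, here is a minimal sketch of the new API, assuming the Dataset, Sample, and SampleDims interfaces and the construct_python_dataset_reader helper shown in the diffs below (the toy dataset class and its sizes are hypothetical):

import numpy as np
from lbann.util.data import Dataset, Sample, SampleDims, construct_python_dataset_reader

class ToyDataset(Dataset):
    """Hypothetical dataset serving fixed-size random vectors."""
    def __init__(self, num_samples=16, sample_size=8):
        self.num_samples = num_samples
        self.sample_size = sample_size

    def __len__(self):
        return self.num_samples

    def __getitem__(self, index) -> Sample:
        # Samples are float32 arrays, matching the examples below.
        return Sample(sample=np.random.normal(size=self.sample_size).astype(np.float32))

    def get_sample_dims(self):
        return SampleDims(sample=[self.sample_size])

# Wrap the dataset in a data reader protobuf message for the 'train' role.
reader = construct_python_dataset_reader(ToyDataset(), role='train')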
31 changes: 31 additions & 0 deletions applications/physics/cosmology/cosmoflow/cosmoflow_dataset.py
@@ -0,0 +1,31 @@
import numpy as np
from glob import glob
from lbann.util.data import Sample, SampleDims, Dataset, DistConvDataset
import h5py as h5
import os


class CosmoFlowDataset(DistConvDataset):
    """CosmoFlow HDF5 dataset with distconv-aware spatial sharding."""

    def __init__(self, data_dir, input_width, num_secrets):
        self.data_dir = data_dir
        self.input_width = input_width
        self.num_secrets = num_secrets
        # One sample per HDF5 file; sort for a deterministic ordering.
        self.samples = glob(os.path.join(data_dir, '*.hdf5'))
        self.samples.sort()

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index) -> Sample:
        # Each rank loads only its slab of the first spatial dimension.
        slice_width = self.input_width // self.num_io_partitions
        slice_ind = self.rank % self.num_io_partitions
        # Use a context manager so the HDF5 file handle is always closed.
        with h5.File(self.samples[index], 'r') as data:
            full = data['full'][:,
                                slice_ind*slice_width:(slice_ind+1)*slice_width,
                                :self.input_width,
                                :self.input_width].astype(np.float32)
            par = data['unitPar'][:].astype(np.float32)
        return Sample(sample=np.ascontiguousarray(full), response=par)

    def get_sample_dims(self):
        return SampleDims(sample=[4, self.input_width, self.input_width, self.input_width],
                          response=self.num_secrets)
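For intuition, a small sketch of how the slab arithmetic in __getitem__ above shards the volume; the sizes are hypothetical, and rank and num_io_partitions are supplied by the DistConvDataset base class:

# Hypothetical sizes: a 128^3 volume split across 4 I/O partitions.
input_width, num_io_partitions = 128, 4
slice_width = input_width // num_io_partitions  # 32
for rank in range(num_io_partitions):
    slice_ind = rank % num_io_partitions
    lo, hi = slice_ind * slice_width, (slice_ind + 1) * slice_width
    # rank 0 reads full[:, 0:32, :128, :128], rank 1 reads full[:, 32:64, ...], etc.
    print(f'rank {rank} loads rows {lo}:{hi} of the first spatial dimension')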
23 changes: 22 additions & 1 deletion applications/physics/cosmology/cosmoflow/train_cosmoflow.py
@@ -10,7 +10,21 @@
import lbann.contrib.args
import lbann.contrib.launcher
from lbann.core.util import get_parallel_strategy_args
import lbann.util.data
import os
from cosmoflow_dataset import CosmoFlowDataset

def create_python_dataset_reader(args):
"""Create a python dataset reader for CosmoFlow."""

readers = []
for role in ['train', 'val', 'test']:
role_dir = getattr(args, f'{role}_dir')
dataset = CosmoFlowDataset(role_dir, args.input_width, args.num_secrets)
reader = lbann.util.data.construct_python_dataset_reader(dataset, role=role)
readers.append(reader)

return lbann.reader_pb2.DataReader(reader=readers)

def create_cosmoflow_data_reader(
train_path, val_path, test_path, num_responses):
@@ -133,6 +147,9 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
parser.add_argument(
'--synthetic', action='store_true',
help='Use synthetic data')
parser.add_argument(
'--python-dataset', action='store_true',
help='Use python dataset reader')
parser.add_argument(
'--no-datastore', action='store_true',
help='Disable the data store')
@@ -220,22 +237,26 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
# optimizer.learn_rate *= 1e-2

# Setup data reader
serialize_io = False
if args.synthetic:
data_reader = create_synthetic_data_reader(
args.input_width, args.num_secrets)
elif args.python_dataset:
data_reader = create_python_dataset_reader(args)
else:
data_reader = create_cosmoflow_data_reader(
args.train_dir,
args.val_dir,
args.test_dir,
num_responses=args.num_secrets)
serialize_io = True

# Setup trainer
random_seed_arg = {'random_seed': args.random_seed} \
if args.random_seed is not None else {}
trainer = lbann.Trainer(
mini_batch_size=args.mini_batch_size,
serialize_io=True,
serialize_io=serialize_io,
**random_seed_arg)

# Runtime parameters/arguments
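As a usage note, construct_python_dataset_reader also accepts an explicit pickle path, a positional role, and a shuffle flag, as exercised by the unit test below; the path here is hypothetical:

import lbann.util.data

# `dataset` is assumed to be any lbann.util.data.Dataset instance.
reader = lbann.util.data.construct_python_dataset_reader(
    dataset,
    '/tmp/dataset.pkl',  # hypothetical location for the pickled dataset
    'train',             # reader role
    shuffle=False)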
137 changes: 137 additions & 0 deletions ci_test/unit_tests/test_unit_datareader_python_dataset.py
@@ -0,0 +1,137 @@
import os
import os.path
import sys
import numpy as np
from lbann.util.data import Dataset, Sample, SampleDims, construct_python_dataset_reader

# Bamboo utilities
current_file = os.path.realpath(__file__)
current_dir = os.path.dirname(current_file)
sys.path.insert(0, os.path.join(os.path.dirname(current_dir), 'common_python'))
import tools

# ==============================================
# Objects for Python dataset data reader
# ==============================================
# Note: The Python dataset data reader loads the dataset constructed below.

# Data
class TestDataset(Dataset):
def __init__(self):
np.random.seed(20240109)
self.num_samples = 29
self.sample_size = 7
self.samples = np.random.normal(size=(self.num_samples,self.sample_size)).astype(np.float32)

def __len__(self):
return self.num_samples

def __getitem__(self, index):
return Sample(sample=self.samples[index,:])

def get_sample_dims(self):
return SampleDims(sample=[self.sample_size])

test_dataset = TestDataset()

# ==============================================
# Setup LBANN experiment
# ==============================================

def setup_experiment(lbann, weekly):
"""Construct LBANN experiment.
Args:
lbann (module): Module for LBANN Python frontend
"""
mini_batch_size = len(test_dataset) // 4
trainer = lbann.Trainer(mini_batch_size)
model = construct_model(lbann)
data_reader = construct_data_reader(lbann)
optimizer = lbann.NoOptimizer()
return trainer, model, data_reader, optimizer, None # Don't request any specific number of nodes

def construct_model(lbann):
"""Construct LBANN model.
Args:
lbann (module): Module for LBANN Python frontend
"""

# Layer graph
x = lbann.Input(data_field='samples')
y = lbann.L2Norm2(x)
layers = list(lbann.traverse_layer_graph(x))
metric = lbann.Metric(y, name='obj')
callbacks = []

# Compute expected value with NumPy
vals = []
for i in range(len(test_dataset)):
x = test_dataset[i].sample.astype(np.float64)
y = tools.numpy_l2norm2(x)
vals.append(y)
val = np.mean(vals)
tol = 8 * val * np.finfo(np.float32).eps
callbacks.append(lbann.CallbackCheckMetric(
metric=metric.name,
lower_bound=val-tol,
upper_bound=val+tol,
error_on_failure=True,
execution_modes='test'))

# Construct model
num_epochs = 0
return lbann.Model(num_epochs,
layers=layers,
metrics=[metric],
callbacks=callbacks)

def construct_data_reader(lbann):
"""Construct Protobuf message for Python dataset data reader.
The Python data reader will import the current Python file to
access the sample access functions.
Args:
lbann (module): Module for LBANN Python frontend
"""

dataset_path = os.path.join(work_dir, 'dataset.pkl')

# Note: The training data reader should be removed when
# https://github.com/LLNL/lbann/issues/1098 is resolved.
message = lbann.reader_pb2.DataReader()
message.reader.extend([
construct_python_dataset_reader(
test_dataset,
dataset_path,
'train',
shuffle=False
)
])
message.reader.extend([
construct_python_dataset_reader(
test_dataset,
dataset_path,
'test',
shuffle=False
)
])
return message

# ==============================================
# Setup PyTest
# ==============================================

work_dir = os.path.join(os.path.dirname(__file__),
'experiments',
os.path.basename(__file__).split('.py')[0])
os.makedirs(work_dir, exist_ok=True)

# Create test functions that can interact with PyTest
for _test_func in tools.create_tests(setup_experiment, __file__, work_dir=work_dir):
globals()[_test_func.__name__] = _test_func
