Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pilotnet integration tests #79

Merged
merged 4 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
117 changes: 117 additions & 0 deletions tests/lava/lib/dl/netx/integration/dataset_sdnn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Copyright (C) 2022 Intel Corporation
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
# SPDX-License-Identifier: BSD-3-Clause

import os
from typing import Tuple, Union
import numpy as np
from PIL import Image
import glob


class PilotNetDataset():
    """Generic PilotNet dataset class. Returns image and ground truth value
    when the object is indexed.

    Parameters
    ----------
    path : str, optional
        Path of the dataset folder. If the folder does not exist, the folder
        is created and the dataset is downloaded and extracted to the folder.
        Defaults to '../data'.
    size : tuple, optional
        Size of the image as (width, height). If it is not `200x66`, the
        image is resized to the given value. Defaults to (200, 66).
    transform : dict, optional
        Input transform parameters with keys ``'weight'`` and ``'bias'``.
        A normalized image ``x`` in [0, 1] is mapped to
        ``2 * weight * x - weight + bias``. Defaults to None.
    train : bool, optional
        Flag to indicate training or testing set. Defaults to True.
    visualize : bool, optional
        If true, the train/test split is ignored and the temporal sequence
        of the data is preserved. Defaults to False.
    sample_offset : int, optional
        Offset added to every requested index (modulo dataset length).
        Default is 0.

    Usage
    -----

    >>> dataset = PilotNetDataset()
    >>> image, gt = dataset[0]
    >>> num_samples = len(dataset)
    """
    def __init__(
        self,
        path: str = '../data',
        size: tuple = (200, 66),  # immutable default; lists still accepted
        transform: Union[dict, None] = None,
        train: Union[bool, None] = True,
        visualize: Union[bool, None] = False,
        sample_offset: int = 0,
    ) -> None:
        self.path = os.path.join(path, 'driving_dataset')

        # Check if the dataset is available in path. If not, download it.
        if len(glob.glob(self.path)) == 0:  # dataset does not exist
            os.makedirs(path, exist_ok=True)

            print('Dataset not available locally. Starting download ...')
            # Google Drive id of the dataset archive ('file_id' avoids
            # shadowing the builtin 'id').
            file_id = '1Ue4XohCOV5YXy57S_5tDfCVqzLr101M7'
            # NOTE(review): this shell command interpolates `path`; do not
            # pass untrusted input as `path`.
            download_cmd = 'wget --load-cookies /tmp/cookies.txt '\
                + '"https://docs.google.com/uc?export=download&confirm='\
                + '$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate '\
                + f"'https://docs.google.com/uc?export=download&id={file_id}' -O- | "\
                + f"sed -rn \'s/.*confirm=([0-9A-Za-z_]+).*/\\1\\n/p\')&id={file_id}"\
                + f'" -O {path}/driving_dataset.zip && rm -rf /tmp/cookies.txt'
            print(download_cmd)
            os.system(download_cmd + f' >> {path}/download.log')
            print('Download complete.')
            print('Extracting data (this may take a while) ...')
            os.system(
                f'unzip {path}/driving_dataset.zip -d {path} >> '
                f'{path}/unzip.log'
            )
            print('Extraction complete.')

        # data.txt holds one "<image filename> <steering angle>" per line.
        with open(os.path.join(self.path, 'data.txt'), 'r') as data:
            all_samples = [line.split() for line in data]

        # this is what seems to be done in https://github.com/lhzlhz/PilotNet
        if visualize is True:
            # Preserve the temporal order of the samples.
            inds = np.arange(len(all_samples))
            self.samples = [all_samples[i] for i in inds]
        else:
            # Deterministic shuffled 80/20 train/test split (fixed seed).
            inds = np.random.RandomState(seed=42).permutation(len(all_samples))
            if train is True:
                self.samples = [
                    all_samples[i] for i in inds[:int(len(all_samples) * 0.8)]
                ]
            else:
                self.samples = [
                    all_samples[i] for i in inds[-int(len(all_samples) * 0.2):]
                ]

        self.size = size
        self.transform = transform
        self.sample_offset = sample_offset

    def _ground_truth(self, index: int) -> float:
        """Return the steering angle (radians) of sample ``index``.

        A stored angle of exactly 0 is treated as a missing annotation and
        replaced by the mean of its two neighbors. Neighbor lookup wraps
        around at both ends of the dataset; the original code raised
        IndexError for a zero angle at the last sample.
        """
        num_samples = len(self.samples)
        ground_truth = float(self.samples[index][1])
        if ground_truth == 0:
            ground_truth = (
                float(self.samples[(index - 1) % num_samples][1])
                + float(self.samples[(index + 1) % num_samples][1])
            ) / 2
        return ground_truth * np.pi / 180

    def __getitem__(self, index: int) -> Tuple[np.ndarray, float]:
        """Return ``(image, ground_truth)`` for the sample at ``index``.

        The image is resized, normalized, optionally transformed, cast to
        int32, transposed to (width, height, channel) order and returned
        with a trailing unit axis. The ground truth is the steering angle
        in radians.
        """
        index = (index + self.sample_offset) % len(self.samples)
        image = Image.open(
            os.path.join(self.path, self.samples[index][0])
        ).resize(self.size, resample=Image.BILINEAR)
        image = np.array(image) / 255  # normalize pixels to [0, 1]
        if self.transform is not None:
            # Affine input transform: 2 * w * x - w + b.
            image = 2 * self.transform['weight'] * image \
                - self.transform['weight'] + self.transform['bias']
        image = image.astype(np.int32).transpose([1, 0, 2])
        gt_val = self._ground_truth(index)
        return image.reshape(image.shape + (1,)), gt_val

    def __len__(self) -> int:
        """Return the number of samples in the dataset."""
        return len(self.samples)
118 changes: 118 additions & 0 deletions tests/lava/lib/dl/netx/integration/dataset_snn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright (C) 2022 Intel Corporation
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
# SPDX-License-Identifier: BSD-3-Clause

import os
from typing import Tuple, Union
import numpy as np
from PIL import Image
import glob


class PilotNetDataset():
    """Generic PilotNet dataset class. Returns image and ground truth value
    when the object is indexed.

    Parameters
    ----------
    path : str, optional
        Path of the dataset folder. If the folder does not exist, the folder
        is created and the dataset is downloaded and extracted to the folder.
        Defaults to '../data'.
    size : tuple, optional
        Size of the image as (width, height). If it is not `200x66`, the
        image is resized to the given value. Defaults to (200, 66).
    transform : dict, optional
        Input transform parameters with keys ``'weight'`` and ``'bias'``.
        A normalized image ``x`` in [0, 1] is mapped to
        ``2 * weight * x - weight + bias``. Defaults to None.
    train : bool, optional
        Flag to indicate training or testing set. Defaults to True.
    visualize : bool, optional
        If true, the train/test split is ignored and the temporal sequence
        of the data is preserved. Defaults to False.
    sample_offset : int, optional
        Offset added to every requested index (modulo dataset length).
        Default is 0.

    Usage
    -----

    >>> dataset = PilotNetDataset()
    >>> image, gt = dataset[0]
    >>> num_samples = len(dataset)
    """
    def __init__(
        self,
        path: str = '../data',
        size: tuple = (200, 66),  # immutable default; lists still accepted
        transform: Union[dict, None] = None,
        train: Union[bool, None] = True,
        visualize: Union[bool, None] = False,
        sample_offset: int = 0,
    ) -> None:
        self.path = os.path.join(path, 'driving_dataset')

        # Check if the dataset is available in path. If not, download it.
        if len(glob.glob(self.path)) == 0:  # dataset does not exist
            os.makedirs(path, exist_ok=True)

            print('Dataset not available locally. Starting download ...')
            # Google Drive id of the dataset archive ('file_id' avoids
            # shadowing the builtin 'id').
            file_id = '1Ue4XohCOV5YXy57S_5tDfCVqzLr101M7'
            # NOTE(review): this shell command interpolates `path`; do not
            # pass untrusted input as `path`.
            download_cmd = 'wget --load-cookies /tmp/cookies.txt '\
                + '"https://docs.google.com/uc?export=download&confirm='\
                + '$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate '\
                + f"'https://docs.google.com/uc?export=download&id={file_id}' -O- | "\
                + f"sed -rn \'s/.*confirm=([0-9A-Za-z_]+).*/\\1\\n/p\')&id={file_id}"\
                + f'" -O {path}/driving_dataset.zip && rm -rf /tmp/cookies.txt'
            print(download_cmd)
            os.system(download_cmd + f' >> {path}/download.log')
            print('Download complete.')
            print('Extracting data (this may take a while) ...')
            os.system(
                f'unzip {path}/driving_dataset.zip -d {path} >> '
                f'{path}/unzip.log'
            )
            print('Extraction complete.')

        # data.txt holds one "<image filename> <steering angle>" per line.
        with open(os.path.join(self.path, 'data.txt'), 'r') as data:
            all_samples = [line.split() for line in data]

        # this is what seems to be done in https://github.com/lhzlhz/PilotNet
        if visualize is True:
            # Preserve the temporal order of the samples.
            inds = np.arange(len(all_samples))
            self.samples = [all_samples[i] for i in inds]
        else:
            # Deterministic shuffled 80/20 train/test split (fixed seed).
            inds = np.random.RandomState(seed=42).permutation(len(all_samples))
            if train is True:
                self.samples = [
                    all_samples[i] for i in inds[:int(len(all_samples) * 0.8)]
                ]
            else:
                self.samples = [
                    all_samples[i] for i in inds[-int(len(all_samples) * 0.2):]
                ]

        self.size = size
        self.transform = transform
        self.sample_offset = sample_offset

    def _ground_truth(self, index: int) -> float:
        """Return the steering angle (radians) of sample ``index``.

        A stored angle of exactly 0 is treated as a missing annotation and
        replaced by the mean of its two neighbors. Neighbor lookup wraps
        around at both ends of the dataset; the original code raised
        IndexError for a zero angle at the last sample.
        """
        num_samples = len(self.samples)
        ground_truth = float(self.samples[index][1])
        if ground_truth == 0:
            ground_truth = (
                float(self.samples[(index - 1) % num_samples][1])
                + float(self.samples[(index + 1) % num_samples][1])
            ) / 2
        return ground_truth * np.pi / 180

    def __getitem__(self, index: int) -> Tuple[np.ndarray, float]:
        """Return ``(image, ground_truth)`` for the sample at ``index``.

        The image is resized, normalized, optionally transformed, cast to
        int32 and transposed to (width, height, channel) order. The ground
        truth is the steering angle in radians. Unlike the SDNN variant,
        no trailing unit axis is appended.
        """
        index = (index + self.sample_offset) % len(self.samples)
        image = Image.open(
            os.path.join(self.path, self.samples[index][0])
        ).resize(self.size, resample=Image.BILINEAR)
        image = np.array(image) / 255  # normalize pixels to [0, 1]
        if self.transform is not None:
            # Affine input transform: 2 * w * x - w + b.
            image = 2 * self.transform['weight'] * image \
                - self.transform['weight'] + self.transform['bias']
        image = image.astype(np.int32).transpose([1, 0, 2])
        gt_val = self._ground_truth(index)
        # Progress indicator overwriting the same console line.
        print(f'\rSample: {index}', end='')
        return image, gt_val

    def __len__(self) -> int:
        """Return the number of samples in the dataset."""
        return len(self.samples)
107 changes: 107 additions & 0 deletions tests/lava/lib/dl/netx/integration/test_integration_pilotnet_sdnn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import subprocess
import os
import unittest
import typing as ty
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

from lava.magma.core.run_configs import Loihi2HwCfg
from lava.magma.core.run_conditions import RunSteps
from lava.proc import io
from lava.magma.core.model.model import AbstractProcessModel

from lava.lib.dl import netx
from .dataset_sdnn import PilotNetDataset


class CustomRunConfig(Loihi2HwCfg):
    """Loihi2 hardware run config that always logs output as floats.

    For ``io.sink.RingBuffer`` processes the floating-point receive model
    is selected regardless of tag; every other process is delegated to the
    base run config.
    """

    def select(self,
               proc,
               proc_models: ty.List[ty.Type[AbstractProcessModel]]):
        # RingBuffer sinks get the float model so logged values need no
        # fixed-point conversion downstream.
        if isinstance(proc, io.sink.RingBuffer):
            return io.sink.PyReceiveModelFloat
        # Guard clause: the base selection requires a list of models.
        if not isinstance(proc_models, list):
            raise AssertionError("Process models, not a list")
        return super().select(proc, proc_models)


class TestPilotNetSdnn(unittest.TestCase):
run_it_tests: int = int(os.environ.get("RUN_IT_TESTS",
0))

@unittest.skipUnless(run_it_tests == 1,
"")
def test_pilotnet_sdnn(self):
repo_dir = subprocess.Popen(
['git', 'rev-parse', '--show-toplevel'],
stdout=subprocess.PIPE).communicate()[0].rstrip().decode('utf-8')
pilotnet_sdnn_path = repo_dir + \
"/tutorials" \
"/lava/lib/dl/netx/pilotnet_sdnn"
dataset_path: str = os.environ.get("PILOTNET_DATASET_PATH",
"../data")
net = netx.hdf5.Network(net_config=(pilotnet_sdnn_path
+ "/network.net"))

print(net)

print(f'There are {len(net)} layers in network:')

for l in net.layers:
print(f'{l.__class__.__name__:5s} \
: {l.name:10s}, shape : {l.shape}')

num_samples = 200
num_steps = num_samples + len(net.layers)

full_set = PilotNetDataset(
path=dataset_path,
size=[100, 33],
transform=net.in_layer.transform, # input transform
visualize=True, # visualize ensures images are returned in sequence
sample_offset=10550,
)
# train_set = PilotNetDataset(
# path=dataset_path,
# size=[100, 33],
# transform=net.in_layer.transform, # input transform
# train=True,
# )
# test_set = PilotNetDataset(
# path=dataset_path,
# size=[100, 33],
# transform=net.in_layer.transform, # input transform
# train=False,
# )

dataloader = io.dataloader.SpikeDataloader(dataset=full_set)

gt_logger = io.sink.RingBuffer(shape=(1,), buffer=num_steps)
output_logger = io.sink.RingBuffer(shape=net.out_layer.shape,
buffer=num_steps)
dataloader.ground_truth.connect(gt_logger.a_in)
dataloader.s_out.connect(net.in_layer.neuron.a_in)
net.out_layer.out.connect(output_logger.a_in)

run_config = CustomRunConfig(select_tag='fixed_pt')
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
net.run(condition=RunSteps(num_steps=num_steps), run_cfg=run_config)
output = output_logger.data.get().flatten()
gts = gt_logger.data.get().flatten()
net.stop()

plt.figure(figsize=(15, 10))
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
plt.plot(np.array(gts[1:]), label='Ground Truth')
plt.plot(np.array(output[len(net.layers):]).flatten()/(1 << 18),
label='Lava output')
plt.xlabel('Sample frames (+10550)')
plt.ylabel('Steering angle (radians)')
plt.legend()
Loading