Fix/instantaneous rate #453

Merged
Changes from 3 commits
42 changes: 27 additions & 15 deletions elephant/statistics.py
@@ -73,6 +73,7 @@
import numpy as np
import quantities as pq
import scipy.stats
import scipy.signal

import elephant.conversion as conv
import elephant.kernels as kernels
@@ -647,8 +648,8 @@ def instantaneous_rate(spiketrains, sampling_period, kernel='auto',
trim : bool, optional
Accounts for the asymmetry of a kernel.
If False, the output of the Fast Fourier Transformation being a longer
vector than the input vector by the size of the kernel is reduced back
to the original size of the considered time interval of the
vector than the input vector (output = input + kernel - 1) is reduced
back to the original size of the considered time interval of the
`spiketrain` using the median of the kernel. False (no trimming) is
equivalent to 'same' convolution mode for symmetrical kernels.
If True, only the region of the convolved signal is returned, where
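As a minimal NumPy sketch (not part of the PR) of the two output lengths the docstring describes: 'full' convolution yields output = input + kernel - 1 samples, while 'same' keeps the input length.

```python
import numpy as np

binned_spikes = np.ones(10)      # stand-in for the binned spike train (length 10)
kern = np.ones(4) / 4            # toy kernel of length 4

full = np.convolve(binned_spikes, kern, mode='full')
same = np.convolve(binned_spikes, kern, mode='same')

print(len(full))  # 13 == 10 + 4 - 1 -> the "output = input + kernel - 1" case
print(len(same))  # 10 -> matches the input, i.e. 'same' mode
```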
@@ -822,10 +823,17 @@ def optimal_kernel(st):
t_start = t_start.rescale(spiketrains[0].units)
t_stop = t_stop.rescale(spiketrains[0].units)

n_bins = int(((t_stop - t_start) / sampling_period).simplified) + 1
time_vectors = np.zeros((len(spiketrains), n_bins), dtype=np.float64)
n_bins = int(((t_stop - t_start) / sampling_period).simplified)
# if (t_stop - t_start) is not an integer multiple of the sampling period,
# add one bin
if n_bins * sampling_period != t_stop:
n_bins += 1

hist_range_end = t_stop + sampling_period.rescale(spiketrains[0].units)
hist_range = (t_start.item(), hist_range_end.item())

# preallocation
time_vectors = np.zeros((len(spiketrains), n_bins), dtype=np.float64)
for i, st in enumerate(spiketrains):
time_vectors[i], _ = np.histogram(st.magnitude, bins=n_bins,
range=hist_range)
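A small sketch of the bin-count logic added above, with made-up numbers instead of the PR's quantities objects: an extra bin is appended only when the sampling period does not evenly divide the interval.

```python
# toy values standing in for (t_stop - t_start) and sampling_period, in seconds
duration = 10.0
sampling_period = 3.0

n_bins = int(duration / sampling_period)      # 3 full bins cover only 9 s
if n_bins * sampling_period != duration:      # 9 s != 10 s
    n_bins += 1                               # add a 4th bin for the remaining 1 s
print(n_bins)                                 # 4

# with an exact divisor no bin is added
duration, sampling_period = 10.0, 2.0
n_bins = int(duration / sampling_period)      # 5
print(n_bins * sampling_period == duration)   # True -> n_bins stays at 5
```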
@@ -849,7 +857,9 @@ def optimal_kernel(st):
median = kernel.icdf(0.5).rescale(units).item()
else:
median = 0
t_arr = np.linspace(-cutoff_sigma + median, stop=cutoff_sigma + median,
# shift kernel using the calculated median
t_arr = np.linspace(start=-cutoff_sigma + median,
stop=cutoff_sigma + median,
num=2 * n_half + 1, endpoint=True) * units

if center_kernel:
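A rough illustration of the shift above (a sketch assuming the kernel classes in this Elephant version expose `icdf`, as the diff itself uses): the sample grid is centred on the kernel's median rather than on zero.

```python
import numpy as np
import quantities as pq
import elephant.kernels as kernels

kernel = kernels.AlphaKernel(sigma=100 * pq.ms)   # an asymmetric kernel
cutoff_sigma = 5 * 0.1                            # cutoff * sigma, in seconds
median = kernel.icdf(0.5).rescale(pq.s).item()    # median of the kernel PDF

n_half = 50
t_arr = np.linspace(start=-cutoff_sigma + median,
                    stop=cutoff_sigma + median,
                    num=2 * n_half + 1, endpoint=True) * pq.s

# the middle sample of the grid is the kernel median, not 0 s
print(t_arr[n_half])
```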
@@ -871,23 +881,25 @@
# the convolution of non-negative vectors is non-negative
rate = np.clip(rate, a_min=0, a_max=None, out=rate)

if center_kernel: # account for the kernel asymmetry
# cut off the wings from the result of "full" convolution
if center_kernel:
median_id = kernel.median_index(t_arr)
# the size of kernel() output matches the input size, len(t_arr)
kernel_array_size = len(t_arr)
if not trim:
rate = rate[median_id: -kernel_array_size + median_id]
if -kernel_array_size + median_id + 1 == 0:
rate = rate[median_id::]
else:
rate = rate[median_id: -kernel_array_size + median_id + 1]
else:
rate = rate[2 * median_id: -2 * (kernel_array_size - median_id)]
if -2 * (kernel_array_size - median_id - 1) == 0:
rate = rate[2 * median_id::]
else:
rate = rate[2 * median_id:
-2 * (kernel_array_size - median_id - 1)]

t_start = t_start + median_id * units
t_stop = t_stop - (kernel_array_size - median_id) * units
else:
# FIXME: don't shrink the output array
# (to be consistent with center_kernel=True)
# n points have n-1 intervals;
# instantaneous rate is a list of intervals;
# hence, the last element is excluded
rate = rate[:-1]
Member Author: removed, since the output size is now corrected.


kernel_annotation = dict(type=type(kernel).__name__,
sigma=str(kernel.sigma),
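A sketch with toy numbers (not the real kernel) of why the guarded slicing in the `trim=False` branch above is needed: when `-kernel_array_size + median_id + 1` evaluates to 0, a plain slice would return an empty array, because `arr[a:0]` is empty in Python. The same idea applies to the `trim=True` branch.

```python
import numpy as np

rate = np.arange(20)          # stand-in for the "full"-convolution output
kernel_array_size = 7
median_id = 6                 # median index falls on the last kernel sample

stop = -kernel_array_size + median_id + 1   # == 0
print(rate[median_id:stop].size)            # 0 -- the naive slice is empty

# guarded slice, as in the patched code: slice to the end instead
if stop == 0:
    trimmed = rate[median_id:]
else:
    trimmed = rate[median_id:stop]
print(trimmed.size)                         # 14
```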
2 changes: 1 addition & 1 deletion elephant/test/test_spike_train_generation.py
@@ -864,7 +864,7 @@ def test_recovered_firing_rate_profile(self):
rate_recovered = rate_recovered.flatten().magnitude
trim = (rate_profile.shape[0] - rate_recovered.shape[0]) // 2
rate_profile_valid = rate_profile.magnitude.squeeze()
rate_profile_valid = rate_profile_valid[trim: -trim - 1]
rate_profile_valid = rate_profile_valid[trim: -trim]
assert_allclose(rate_recovered, rate_profile_valid,
rtol=0, atol=rtol * rate.item())
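A toy sketch of the adjusted slice: with the corrected output size of `instantaneous_rate`, the recovered rate is exactly `2 * trim` samples shorter than the profile, so the symmetric cut `[trim: -trim]` lines the two arrays up (the previous `[trim: -trim - 1]` was one sample short).

```python
import numpy as np

# toy stand-ins for the arrays in the test above
rate_profile = np.arange(100.0)         # full rate profile
rate_recovered = np.arange(10.0, 90.0)  # estimate, 2 * 10 samples shorter

trim = (rate_profile.shape[0] - rate_recovered.shape[0]) // 2   # 10
rate_profile_valid = rate_profile[trim: -trim]                  # symmetric cut

print(rate_profile_valid.shape == rate_recovered.shape)  # True
```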

43 changes: 23 additions & 20 deletions elephant/test/test_statistics.py
@@ -20,10 +20,10 @@

import elephant.kernels as kernels
from elephant import statistics
from elephant.spike_train_generation import homogeneous_poisson_process
from elephant.spike_train_generation import StationaryPoissonProcess as StatPP
Member Author: fixed deprecation.
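For reference, a minimal sketch of the replacement API used throughout this PR (both call forms are taken from the diff itself):

```python
import quantities as pq
from elephant.spike_train_generation import StationaryPoissonProcess

# deprecated form replaced in these tests:
# st = homogeneous_poisson_process(rate=100 * pq.Hz, t_stop=100 * pq.s)

# replacement used in the PR:
st = StationaryPoissonProcess(rate=100 * pq.Hz,
                              t_stop=100 * pq.s).generate_spiketrain()
print(st.t_stop)  # 100.0 s
```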



class isi_TestCase(unittest.TestCase):
class IsiTestCase(unittest.TestCase):
Member Author: CamelCase convention for classes.

def setUp(self):
self.test_array_2d = np.array([[0.3, 0.56, 0.87, 1.23],
[0.02, 0.71, 1.82, 8.46],
@@ -83,10 +83,10 @@ def test_unsorted_array(self):
np.random.seed(0)
array = np.random.rand(100)
with self.assertWarns(UserWarning):
isi = statistics.isi(array)
statistics.isi(array)


class isi_cv_TestCase(unittest.TestCase):
class IsiCvTestCase(unittest.TestCase):
def setUp(self):
self.test_array_regular = np.arange(1, 6)

@@ -103,7 +103,7 @@ def test_cv_isi_regular_array_is_zero(self):
self.assertEqual(res, targ)


class mean_firing_rate_TestCase(unittest.TestCase):
class MeanFiringRateTestCase(unittest.TestCase):
def setUp(self):
self.test_array_3d = np.ones([5, 7, 13])
self.test_array_2d = np.array([[0.3, 0.56, 0.87, 1.23],
@@ -139,7 +139,7 @@ def test_mean_firing_rate_with_spiketrain(self):

def test_mean_firing_rate_typical_use_case(self):
np.random.seed(92)
st = homogeneous_poisson_process(rate=100 * pq.Hz, t_stop=100 * pq.s)
st = StatPP(rate=100 * pq.Hz, t_stop=100 * pq.s).generate_spiketrain()
rate1 = statistics.mean_firing_rate(st)
rate2 = statistics.mean_firing_rate(st, t_start=st.t_start,
t_stop=st.t_stop)
@@ -517,7 +517,7 @@ def test_instantaneous_rate_and_warnings(self):
self.assertEqual(
inst_rate.sampling_period.simplified, sampling_period.simplified)
self.assertEqual(inst_rate.simplified.units, pq.Hz)
self.assertEqual(inst_rate.t_stop.simplified, st.t_stop.simplified)
self.assertEqual(st.t_stop.simplified, inst_rate.t_stop.simplified)
self.assertEqual(inst_rate.t_start.simplified, st.t_start.simplified)

def test_error_instantaneous_rate(self):
@@ -616,9 +616,9 @@ def test_not_center_kernel(self):
def test_regression_288(self):
np.random.seed(9)
sampling_period = 200 * pq.ms
spiketrain = homogeneous_poisson_process(10 * pq.Hz,
t_start=0 * pq.s,
t_stop=10 * pq.s)
spiketrain = StatPP(10 * pq.Hz,
t_start=0 * pq.s,
t_stop=10 * pq.s).generate_spiketrain()
kernel = kernels.AlphaKernel(sigma=5 * pq.ms, invert=True)
# check that instantaneous_rate "works" for kernels with small sigma
# without triggering an incomprehensible error
@@ -636,9 +636,9 @@ def test_small_kernel_sigma(self):
sampling_period = 200 * pq.ms
sigma = 5 * pq.ms
rate_expected = 10 * pq.Hz
spiketrain = homogeneous_poisson_process(rate_expected,
t_start=0 * pq.s,
t_stop=10 * pq.s)
spiketrain = StatPP(rate_expected,
t_start=0 * pq.s,
t_stop=10 * pq.s).generate_spiketrain()
kernel_types = tuple(
kern_cls for kern_cls in kernels.__dict__.values()
if isinstance(kern_cls, type) and
@@ -661,6 +661,7 @@ def test_spikes_on_edges(self):
# spiketrain (see test_rate_estimation_consistency)
cutoff = 5
sampling_period = 0.01 * pq.s
# with t_spikes = [-5, 5] s the ISI is 10 s, so 1/ISI = 0.1 Hz
t_spikes = np.array([-cutoff, cutoff]) * pq.s
spiketrain = neo.SpikeTrain(t_spikes, t_start=t_spikes[0],
t_stop=t_spikes[-1])
@@ -680,7 +681,7 @@ def test_spikes_on_edges(self):
kernel=kernel,
cutoff=cutoff, trim=True,
center_kernel=center_kernel)
assert_array_almost_equal(rate.magnitude, 0, decimal=3)
assert_array_almost_equal(rate.magnitude, 0, decimal=2)
Member Author: is lowering the precision appropriate in this case? See the comment in line 664.
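Regarding the question above: a short reminder (my sketch, with toy residuals) of what `decimal` means for `assert_array_almost_equal` — NumPy checks `abs(desired - actual) < 1.5 * 10**(-decimal)`.

```python
import numpy as np
from numpy.testing import assert_array_almost_equal

rate_residual = np.array([0.012, -0.009])   # hypothetical near-zero rate values

assert_array_almost_equal(rate_residual, 0, decimal=2)      # passes: |x| < 0.015
try:
    assert_array_almost_equal(rate_residual, 0, decimal=3)  # fails: |x| > 0.0015
except AssertionError:
    print("decimal=3 is too strict for these residuals")
```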


def test_trim_as_convolve_mode(self):
cutoff = 5
@@ -701,7 +702,8 @@ def test_trim_as_convolve_mode(self):
for trim in (False, True):
rate_centered = statistics.instantaneous_rate(
spiketrain, sampling_period=sampling_period,
kernel=kernel, cutoff=cutoff, trim=trim)
kernel=kernel, cutoff=cutoff, trim=trim,
center_kernel=True)
Member Author: for better readability of the unit test; furthermore, the default center_kernel=True might be changed in the future.


rate_convolve = statistics.instantaneous_rate(
spiketrain, sampling_period=sampling_period,
@@ -750,7 +752,7 @@ def test_instantaneous_rate_regression_245(self):
# This test makes sure that the correct kernel width is chosen when
# selecting 'auto' as kernel
spiketrain = neo.SpikeTrain(
range(1, 30) * pq.ms, t_start=0 * pq.ms, t_stop=30 * pq.ms)
pq.ms * range(1, 30), t_start=0 * pq.ms, t_stop=30 * pq.ms)
Member Author: quantities works with range() in this order, not the other way around, but this could be changed back for better readability.


# This is the correct procedure to attain the kernel: first, the result
# of sskernel retrieves the kernel bandwidth of an optimal Gaussian
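A quick sketch of the ordering the comment above refers to; whether the reversed order works may depend on the installed quantities/NumPy versions, so only the form used in the patched test is executed here.

```python
import quantities as pq

# order used in the patched test: the Quantity comes first
times = pq.ms * range(1, 30)
print(times[:3])   # [1. 2. 3.] ms

# original order, reported as problematic in this setup; kept as a reference only
# times = range(1, 30) * pq.ms
```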
@@ -777,8 +779,8 @@ def test_instantaneous_rate_regression_245(self):
def test_instantaneous_rate_grows_with_sampling_period(self):
np.random.seed(0)
rate_expected = 10 * pq.Hz
spiketrain = homogeneous_poisson_process(rate=rate_expected,
t_stop=10 * pq.s)
spiketrain = StatPP(rate=rate_expected,
t_stop=10 * pq.s).generate_spiketrain()
kernel = kernels.GaussianKernel(sigma=100 * pq.ms)
rates_mean = []
for sampling_period in np.linspace(1, 1000, num=10) * pq.ms:
@@ -909,8 +911,9 @@ def test_time_histogram_output(self):

def test_annotations(self):
np.random.seed(1)
spiketrains = [homogeneous_poisson_process(
rate=10 * pq.Hz, t_stop=10 * pq.s) for _ in range(10)]
spiketrains = [StatPP(rate=10 * pq.Hz,
t_stop=10 * pq.s).generate_spiketrain()
for _ in range(10)]
for output in ("counts", "mean", "rate"):
histogram = statistics.time_histogram(spiketrains,
bin_size=3 * pq.ms,