Skip to content

Commit

Permalink
Add detectors api (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adebayo-Oshingbesan authored May 5, 2022
1 parent 455cc23 commit 9eae520
Show file tree
Hide file tree
Showing 25 changed files with 4,173 additions and 749 deletions.
2 changes: 2 additions & 0 deletions aif360/detectors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from aif360.detectors.mdss.MDSS import MDSS
from aif360.detectors.mdss_detector import bias_scan
161 changes: 111 additions & 50 deletions aif360/metrics/mdss/MDSS.py → aif360/detectors/mdss/MDSS.py

Large diffs are not rendered by default.

138 changes: 138 additions & 0 deletions aif360/detectors/mdss/ScoringFunctions/BerkJones.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from aif360.detectors.mdss.ScoringFunctions.ScoringFunction import ScoringFunction
from aif360.detectors.mdss.ScoringFunctions import optim

import numpy as np


class BerkJones(ScoringFunction):
    def __init__(self, **kwargs):
        """
        Berk-Jones score function is a non-parametric expectation-based
        scan statistic that also satisfies the ALTSS property; non-parametric scoring functions
        do not make parametric assumptions about the model or outcome [1].

        kwargs must contain
        'direction (str)' - direction of the severity; could be higher than expected outcomes ('positive') or lower than expected ('negative')
        'alpha (float)' - the alpha threshold that will be used to compute the score.
        In practice, it may be useful to search over a grid of alpha thresholds and select the one with the maximum score.

        [1] Neill, D. B., & Lingwall, J. (2007). A nonparametric scan statistic for multivariate disease surveillance. Advances in
        Disease Surveillance, 4(106), 570
        """
        super(BerkJones, self).__init__(**kwargs)
        self.alpha = self.kwargs.get('alpha')
        assert self.alpha is not None, "Warning: calling Berk Jones without alpha"

        # For a 'negative' scan we examine the complementary tail, so flip alpha.
        if self.direction == 'negative':
            self.alpha = 1 - self.alpha

    def score(self, observed_sum: float, expectations: np.array, penalty: float, q: float):
        """
        Computes berk jones score for given q

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty term. Should be positive
        :param q: current value of q
        :return: berk jones score for the current value of q
        """
        alpha = self.alpha

        # Clamp q to alpha *before* building the cache key so every q below
        # alpha (all of which yield the same score) shares one cache entry;
        # the original keyed on the raw q and stored duplicate values.
        if q < alpha:
            q = alpha

        key = (observed_sum, len(expectations), penalty, q, alpha)
        ans = self.score_cache.get(key)
        if ans is not None:
            self.cache_counter['score'] += 1
            return ans

        assert q > 0, (
            "Warning: calling compute_score_given_q with "
            "observed_sum=%.2f, expectations of length=%d, penalty=%.2f, q=%.2f, alpha=%.3f"
            % (observed_sum, len(expectations), penalty, q, alpha)
        )

        # At q == 1 the second term below is log(0)-degenerate; it vanishes in
        # the limit, leaving only the first term minus the penalty.
        if q == 1:
            ans = observed_sum * np.log(q / alpha) - penalty
            self.score_cache[key] = ans
            return ans

        a = observed_sum * np.log(q / alpha)
        b = (len(expectations) - observed_sum) * np.log((1 - q) / (1 - alpha))
        ans = a + b - penalty

        self.score_cache[key] = ans
        return ans

    def qmle(self, observed_sum: float, expectations: np.array):
        """
        Computes the q which maximizes score (q_mle).
        For Berk-Jones this is given to be N_a/N, floored at alpha.

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :return: q MLE
        """
        alpha = self.alpha

        key = (observed_sum, len(expectations), alpha)
        ans = self.qmle_cache.get(key)
        if ans is not None:
            self.cache_counter['qmle'] += 1
            return ans

        # Empty subset: define the MLE as 0 rather than dividing by zero.
        if len(expectations) == 0:
            self.qmle_cache[key] = 0
            return 0

        q = observed_sum / len(expectations)

        # The score is maximized at alpha whenever the raw rate falls below it.
        if q < alpha:
            self.qmle_cache[key] = alpha
            return alpha

        self.qmle_cache[key] = q
        return q

    def compute_qs(self, observed_sum: float, expectations: np.array, penalty: float):
        """
        Computes roots (qmin and qmax) of the score function for given q

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty coefficient
        :return: [exist, q_mle, q_min, q_max]; exist is 1 iff the maximum
            score is positive so that roots exist
        """
        alpha = self.alpha

        key = (observed_sum, len(expectations), penalty, alpha)
        ans = self.compute_qs_cache.get(key)
        if ans is not None:
            self.cache_counter['qs'] += 1
            return ans

        q_mle = self.qmle(observed_sum, expectations)

        if self.score(observed_sum, expectations, penalty, q_mle) > 0:
            exist = 1
            q_min = optim.bisection_q_min(
                self, observed_sum, expectations, penalty, q_mle, temp_min=alpha
            )
            q_max = optim.bisection_q_max(
                self, observed_sum, expectations, penalty, q_mle, temp_max=1
            )
        else:
            # there are no roots: the score is non-positive everywhere
            exist = 0
            q_min = 0
            q_max = 0

        ans = [exist, q_mle, q_min, q_max]
        self.compute_qs_cache[key] = ans
        return ans
121 changes: 121 additions & 0 deletions aif360/detectors/mdss/ScoringFunctions/Bernoulli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from aif360.detectors.mdss.ScoringFunctions.ScoringFunction import ScoringFunction
from aif360.detectors.mdss.ScoringFunctions import optim

import numpy as np


class Bernoulli(ScoringFunction):
    def __init__(self, **kwargs):
        """
        Bernoulli score function. May be appropriate to use when the outcome of
        interest is assumed to be Bernoulli distributed or Binary.

        kwargs must contain
        'direction (str)' - direction of the severity; could be higher than expected outcomes ('positive') or lower than expected ('negative')
        """
        super(Bernoulli, self).__init__(**kwargs)

    def score(self, observed_sum: float, expectations: np.array, penalty: float, q: float):
        """
        Computes bernoulli bias score for given q

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty term. Should be positive
        :param q: current value of q
        :return: bias score for the current value of q
        """
        assert q > 0, (
            "Warning: calling compute_score_given_q with "
            "observed_sum=%.2f, expectations of length=%d, penalty=%.2f, q=%.2f"
            % (observed_sum, len(expectations), penalty, q)
        )

        # ndarray.tostring() was deprecated (NumPy 1.19) and removed in
        # NumPy 2.0; tobytes() returns the identical raw bytes.
        key = (observed_sum, expectations.tobytes(), penalty, q)
        ans = self.score_cache.get(key)
        if ans is not None:
            self.cache_counter['score'] += 1
            return ans

        ans = observed_sum * np.log(q) - np.log(1 - expectations + q * expectations).sum() - penalty
        self.score_cache[key] = ans
        return ans

    def qmle(self, observed_sum: float, expectations: np.array):
        """
        Computes the q which maximizes score (q_mle) via bisection.

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :return: q MLE
        """
        direction = self.direction

        key = (observed_sum, expectations.tobytes())
        ans = self.qmle_cache.get(key)
        if ans is not None:
            self.cache_counter['qmle'] += 1
            return ans

        ans = optim.bisection_q_mle(self, observed_sum, expectations, direction=direction)
        self.qmle_cache[key] = ans
        return ans

    def compute_qs(self, observed_sum: float, expectations: np.array, penalty: float):
        """
        Computes roots (qmin and qmax) of the score function for given q

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty coefficient
        :return: [exist, q_mle, q_min, q_max]
        """
        direction = self.direction

        key = (observed_sum, expectations.tobytes(), penalty)
        ans = self.compute_qs_cache.get(key)
        if ans is not None:
            self.cache_counter['qs'] += 1
            return ans

        q_mle = self.qmle(observed_sum, expectations)

        if self.score(observed_sum, expectations, penalty, q_mle) > 0:
            exist = 1
            q_min = optim.bisection_q_min(self, observed_sum, expectations, penalty, q_mle)
            q_max = optim.bisection_q_max(self, observed_sum, expectations, penalty, q_mle)
        else:
            # there are no roots
            exist = 0
            q_min = 0
            q_max = 0

        # only consider the desired direction, positive or negative
        if exist:
            exist, q_min, q_max = optim.direction_assertions(direction, q_min, q_max)

        ans = [exist, q_mle, q_min, q_max]
        self.compute_qs_cache[key] = ans
        return ans

    def q_dscore(self, observed_sum: float, expectations: np.array, q: float):
        """
        This actually computes q times the slope, which has the same sign as the slope since q is positive.
        score = Y log q - \\sum_i log(1-p_i+qp_i)
        dscore/dq = Y/q - \\sum_i (p_i/(1-p_i+qp_i))
        q dscore/dq = Y - \\sum_i (qp_i/(1-p_i+qp_i))

        :param observed_sum: sum of observed binary outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param q: current value of q
        :return: q dscore/dq
        """
        key = (observed_sum, expectations.tobytes(), q)
        ans = self.qdscore_cache.get(key)
        if ans is not None:
            self.cache_counter['qdscore'] += 1
            return ans

        ans = observed_sum - (q * expectations / (1 - expectations + q * expectations)).sum()
        self.qdscore_cache[key] = ans
        return ans
122 changes: 122 additions & 0 deletions aif360/detectors/mdss/ScoringFunctions/Gaussian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from turtle import pen
from aif360.detectors.mdss.ScoringFunctions.ScoringFunction import ScoringFunction
from aif360.detectors.mdss.ScoringFunctions import optim

import numpy as np


class Gaussian(ScoringFunction):
    def __init__(self, **kwargs):
        """
        Gaussian score function. May be appropriate to use when the outcome of
        interest is assumed to be normally distributed.

        kwargs must contain
        'direction (str)' - direction of the severity; could be higher than expected outcomes ('positive') or lower than expected ('negative')
        """
        super(Gaussian, self).__init__(**kwargs)

    def score(
        self, observed_sum: float, expectations: np.array, penalty: float, q: float
    ):
        """
        Computes gaussian bias score for given q

        :param observed_sum: sum of observed outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty term. Should be positive
        :param q: current value of q
        :return: bias score for the current value of q
        """
        # NOTE(review): the cache key uses expectations.sum(), not the full
        # array — valid here because the score only depends on the sum.
        key = (observed_sum, expectations.sum(), penalty, q)
        ans = self.score_cache.get(key)
        if ans is not None:
            self.cache_counter["score"] += 1
            return ans

        assumed_var = self.var  # variance provided by the ScoringFunction base
        expected_sum = expectations.sum()
        # Scale the penalty into the same variance-normalized units as C and B.
        penalty /= self.var

        C = observed_sum * expected_sum / assumed_var * (q - 1)

        B = expected_sum**2 * (1 - q**2) / (2 * assumed_var)

        # Only score subsets whose deviation lies in the requested direction;
        # otherwise the (pre-penalty) score is 0.
        if C > B and self.direction == 'positive':
            ans = C + B
        elif B > C and self.direction == 'negative':
            ans = C + B
        else:
            ans = 0

        ans -= penalty
        self.score_cache[key] = ans

        return ans

    def qmle(self, observed_sum: float, expectations: np.array):
        """
        Computes the q which maximizes score (q_mle).

        :param observed_sum: sum of observed outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :return: q MLE
        """
        key = (observed_sum, expectations.sum())
        ans = self.qmle_cache.get(key)
        if ans is not None:
            self.cache_counter["qmle"] += 1
            return ans

        expected_sum = expectations.sum()

        # Deals with case where observed_sum = expected_sum = 0
        if observed_sum == expected_sum:
            ans = 1
        else:
            ans = observed_sum / expected_sum

        assert not np.isnan(ans), f'{expected_sum}, {observed_sum}, {ans}'
        self.qmle_cache[key] = ans
        return ans

    def compute_qs(self, observed_sum: float, expectations: np.array, penalty: float):
        """
        Computes roots (qmin and qmax) of the score function for given q

        :param observed_sum: sum of observed outcomes for all i
        :param expectations: predicted outcomes for each data element i
        :param penalty: penalty coefficient
        :return: [exist, q_mle, q_min, q_max]
        """
        direction = self.direction

        # Check the cache before computing q_mle — the original computed
        # q_mle first and threw the work away on every cache hit, unlike the
        # sibling scoring functions.
        key = (observed_sum, expectations.sum(), penalty)
        ans = self.compute_qs_cache.get(key)
        if ans is not None:
            self.cache_counter["qs"] += 1
            return ans

        q_mle = self.qmle(observed_sum, expectations)
        q_mle_score = self.score(observed_sum, expectations, penalty, q_mle)

        if q_mle_score > 0:
            exist = 1
            q_min = optim.bisection_q_min(self, observed_sum, expectations, penalty, q_mle, temp_min=-1e6)
            q_max = optim.bisection_q_max(self, observed_sum, expectations, penalty, q_mle, temp_max=1e6)
        else:
            # there are no roots
            exist = 0
            q_min = 0
            q_max = 0

        # only consider the desired direction, positive or negative
        if exist:
            exist, q_min, q_max = optim.direction_assertions(direction, q_min, q_max)

        ans = [exist, q_mle, q_min, q_max]
        self.compute_qs_cache[key] = ans
        return ans
Loading

0 comments on commit 9eae520

Please sign in to comment.