Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion redisbloom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ def __init__(self, args):
self.depth = response['depth']
self.decay = response['decay']

class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response['Compression']
self.capacity = response['Capacity']
self.mergedNodes = response['Merged nodes']
self.unmergedNodes = response['Unmerged nodes']
self.mergedWeight = response['Merged weight']
self.unmergedWeight = response['Unmerged weight']
self.totalCompressions = response['Total compressions']

def spaceHolder(response):
return response

Expand All @@ -87,7 +106,8 @@ class Client(Redis): #changed from StrictRedis
- BF for Bloom Filter
- CF for Cuckoo Filter
- CMS for Count-Min Sketch
- TopK for TopK Data Structure
- TOPK for TopK Data Structure
- TDIGEST for estimate rank statistics
"""

BF_RESERVE = 'BF.RESERVE'
Expand Down Expand Up @@ -126,6 +146,16 @@ class Client(Redis): #changed from StrictRedis
TOPK_LIST = 'TOPK.LIST'
TOPK_INFO = 'TOPK.INFO'

TDIGEST_CREATE = 'TDIGEST.CREATE'
TDIGEST_RESET = 'TDIGEST.RESET'
TDIGEST_ADD = 'TDIGEST.ADD'
TDIGEST_MERGE = 'TDIGEST.MERGE'
TDIGEST_CDF = 'TDIGEST.CDF'
TDIGEST_QUANTILE = 'TDIGEST.QUANTILE'
TDIGEST_MIN = 'TDIGEST.MIN'
TDIGEST_MAX = 'TDIGEST.MAX'
TDIGEST_INFO = 'TDIGEST.INFO'

def __init__(self, *args, **kwargs):
"""
Creates a new RedisBloom client.
Expand Down Expand Up @@ -170,6 +200,16 @@ def __init__(self, *args, **kwargs):
#self.TOPK_COUNT : spaceHolder,
self.TOPK_LIST : parseToList,
self.TOPK_INFO : TopKInfo,

self.TDIGEST_CREATE : bool_ok,
# self.TDIGEST_RESET : bool_ok,
# self.TDIGEST_ADD : spaceHolder,
# self.TDIGEST_MERGE : spaceHolder,
# self.TDIGEST_CDF : spaceHolder,
# self.TDIGEST_QUANTILE : spaceHolder,
# self.TDIGEST_MIN : spaceHolder,
# self.TDIGEST_MAX : spaceHolder,
self.TDIGEST_INFO : TDigestInfo,
}
for k, v in six.iteritems(MODULE_CALLBACKS):
self.set_response_callback(k, v)
Expand Down Expand Up @@ -216,6 +256,12 @@ def appendItemsAndIncrements(params, items, increments):
params.append(items[i])
params.append(increments[i])

@staticmethod
def appendValuesAndWeights(params, items, weights):
for i in range(len(items)):
params.append(items[i])
params.append(weights[i])

@staticmethod
def appendMaxIterations(params, max_iterations):
if max_iterations is not None:
Expand Down Expand Up @@ -550,6 +596,83 @@ def topkInfo(self, key):

return self.execute_command(self.TOPK_INFO, key)

################## T-Digest Functions ######################

def tdigestCreate(self, key, compression):
""""
Allocate the memory and initialize the t-digest.
"""
params = [key, compression]

return self.execute_command(self.TDIGEST_CREATE, *params)

def tdigestReset(self, key):
"""
Reset the sketch ``key`` to zero - empty out the sketch and re-initialize it.
"""

return self.execute_command(self.TDIGEST_RESET, key)

def tdigestAdd(self, key, values, weights):
"""
Adds one or more samples (value with weight) to a sketch ``key``.
Both ``values`` and ``weights`` are lists.
Example - tdigestAdd('A', [1500.0], [1.0])
"""
params = [key]
self.appendValuesAndWeights(params, values, weights)

return self.execute_command(self.TDIGEST_ADD, *params)

def tdigestMerge(self, toKey, fromKey):
"""
Merges all of the values from 'fromKey' to 'toKey' sketch.
"""
params = [toKey, fromKey]

return self.execute_command(self.TDIGEST_MERGE, *params)

def tdigestMin(self, key):
"""
Returns minimum value from the sketch ``key``.
Will return DBL_MAX if the sketch is empty.
"""

return self.execute_command(self.TDIGEST_MIN, key)

def tdigestMax(self, key):
"""
Returns maximum value from the sketch ``key``.
Will return DBL_MIN if the sketch is empty.
"""

return self.execute_command(self.TDIGEST_MAX, key)

def tdigestQuantile(self, key, quantile):
"""
Returns double value estimate of the cutoff such that a specified fraction of the data added
to this TDigest would be less than or equal to the cutoff.
"""
params = [key, quantile]

return self.execute_command(self.TDIGEST_QUANTILE, *params)

def tdigestCdf(self, key, value):
"""
Returns double fraction of all points added which are <= value.
"""
params = [key, value]

return self.execute_command(self.TDIGEST_CDF, *params)

def tdigestInfo(self, key):
"""
Returns Compression, Capacity, Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight
and Total Compressions.
"""

return self.execute_command(self.TDIGEST_INFO, key)

def pipeline(self, transaction=True, shard_hint=None):
"""
Return a new pipeline object that can queue multiple commands for
Expand Down
59 changes: 59 additions & 0 deletions test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def testCreate(self):
self.assertTrue(rb.cmsInitByDim('cmsDim', 100, 5))
self.assertTrue(rb.cmsInitByProb('cmsProb', 0.01, 0.01))
self.assertTrue(rb.topkReserve('topk', 5, 100, 5, 0.9))
self.assertTrue(rb.tdigestCreate('tDigest', 100))

################### Test Bloom Filter ###################
def testBFAdd(self):
Expand Down Expand Up @@ -204,6 +205,64 @@ def testTopK(self):
self.assertEqual(3, info.depth)
self.assertAlmostEqual(0.9, float(info.decay))

################### Test T-Digest ###################
def testTDigestReset(self):
self.assertTrue(rb.tdigestCreate('tDigest', 10))
# reset on empty histogram
self.assertTrue(rb.tdigestReset('tDigest'))
# insert data-points into sketch
self.assertTrue(rb.tdigestAdd('tDigest', list(range(10)), [1.0] * 10))

self.assertTrue(rb.tdigestReset('tDigest'))
# assert we have 0 unmerged nodes
self.assertEqual(0, rb.tdigestInfo('tDigest').unmergedNodes)

def testTDigestMerge(self):
self.assertTrue(rb.tdigestCreate('to-tDigest', 10))
self.assertTrue(rb.tdigestCreate('from-tDigest', 10))
# insert data-points into sketch
self.assertTrue(rb.tdigestAdd('from-tDigest', [1.0] * 10, [1.0] * 10))
self.assertTrue(rb.tdigestAdd('to-tDigest', [2.0] * 10, [10.0] * 10))
# merge from-tdigest into to-tdigest
self.assertTrue(rb.tdigestMerge('to-tDigest', 'from-tDigest'))
# we should now have 110 weight on to-histogram
info = rb.tdigestInfo('to-tDigest')
total_weight_to = float(info.mergedWeight) + float(info.unmergedWeight)
self.assertEqual(110, total_weight_to)

def testTDigestMinMax(self):
self.assertTrue(rb.tdigestCreate('tDigest', 100))
# insert data-points into sketch
self.assertTrue(rb.tdigestAdd('tDigest', [1, 2, 3], [1.0] * 3))
# min/max
self.assertEqual(3, float(rb.tdigestMax('tDigest')))
self.assertEqual(1, float(rb.tdigestMin('tDigest')))

def testTDigestQuantile(self):
self.assertTrue(rb.tdigestCreate('tDigest', 500))
# insert data-points into sketch
self.assertTrue(rb.tdigestAdd('tDigest', list([x * 0.01 for x in range(1, 10000)]), [1.0] * 10000))
# assert min min/max have same result as quantile 0 and 1
self.assertEqual(
float(rb.tdigestMax('tDigest')),
float(rb.tdigestQuantile('tDigest', 1.0)),
)
self.assertEqual(
float(rb.tdigestMin('tDigest')),
float(rb.tdigestQuantile('tDigest', 0.0)),
)

self.assertAlmostEqual(1.0, float(rb.tdigestQuantile('tDigest', 0.01)), 2)
self.assertAlmostEqual(99.0, float(rb.tdigestQuantile('tDigest', 0.99)), 2)

def testTDigestCdf(self):
self.assertTrue(rb.tdigestCreate('tDigest', 100))
# insert data-points into sketch
self.assertTrue(rb.tdigestAdd('tDigest', list(range(1, 10)), [1.0] * 10))

self.assertAlmostEqual(0.1, float(rb.tdigestCdf('tDigest', 1.0)), 1)
self.assertAlmostEqual(0.9, float(rb.tdigestCdf('tDigest', 9.0)), 1)

def test_pipeline(self):
pipeline = rb.pipeline()

Expand Down