diff --git a/redisbloom/client.py b/redisbloom/client.py index 6fb72c0..3adf14f 100644 --- a/redisbloom/client.py +++ b/redisbloom/client.py @@ -65,6 +65,25 @@ def __init__(self, args): self.depth = response['depth'] self.decay = response['decay'] +class TDigestInfo(object): + compression = None + capacity = None + mergedNodes = None + unmergedNodes = None + mergedWeight = None + unmergedWeight = None + totalCompressions = None + + def __init__(self, args): + response = dict(zip(map(nativestr, args[::2]), args[1::2])) + self.compression = response['Compression'] + self.capacity = response['Capacity'] + self.mergedNodes = response['Merged nodes'] + self.unmergedNodes = response['Unmerged nodes'] + self.mergedWeight = response['Merged weight'] + self.unmergedWeight = response['Unmerged weight'] + self.totalCompressions = response['Total compressions'] + def spaceHolder(response): return response @@ -87,7 +106,8 @@ class Client(Redis): #changed from StrictRedis - BF for Bloom Filter - CF for Cuckoo Filter - CMS for Count-Min Sketch - - TopK for TopK Data Structure + - TOPK for TopK Data Structure + - TDIGEST for estimate rank statistics """ BF_RESERVE = 'BF.RESERVE' @@ -126,6 +146,16 @@ class Client(Redis): #changed from StrictRedis TOPK_LIST = 'TOPK.LIST' TOPK_INFO = 'TOPK.INFO' + TDIGEST_CREATE = 'TDIGEST.CREATE' + TDIGEST_RESET = 'TDIGEST.RESET' + TDIGEST_ADD = 'TDIGEST.ADD' + TDIGEST_MERGE = 'TDIGEST.MERGE' + TDIGEST_CDF = 'TDIGEST.CDF' + TDIGEST_QUANTILE = 'TDIGEST.QUANTILE' + TDIGEST_MIN = 'TDIGEST.MIN' + TDIGEST_MAX = 'TDIGEST.MAX' + TDIGEST_INFO = 'TDIGEST.INFO' + def __init__(self, *args, **kwargs): """ Creates a new RedisBloom client. @@ -170,6 +200,16 @@ def __init__(self, *args, **kwargs): #self.TOPK_COUNT : spaceHolder, self.TOPK_LIST : parseToList, self.TOPK_INFO : TopKInfo, + + self.TDIGEST_CREATE : bool_ok, + # self.TDIGEST_RESET : bool_ok, + # self.TDIGEST_ADD : spaceHolder, + # self.TDIGEST_MERGE : spaceHolder, + # self.TDIGEST_CDF : spaceHolder, + # self.TDIGEST_QUANTILE : spaceHolder, + # self.TDIGEST_MIN : spaceHolder, + # self.TDIGEST_MAX : spaceHolder, + self.TDIGEST_INFO : TDigestInfo, } for k, v in six.iteritems(MODULE_CALLBACKS): self.set_response_callback(k, v) @@ -216,6 +256,12 @@ def appendItemsAndIncrements(params, items, increments): params.append(items[i]) params.append(increments[i]) + @staticmethod + def appendValuesAndWeights(params, items, weights): + for i in range(len(items)): + params.append(items[i]) + params.append(weights[i]) + @staticmethod def appendMaxIterations(params, max_iterations): if max_iterations is not None: @@ -550,6 +596,83 @@ def topkInfo(self, key): return self.execute_command(self.TOPK_INFO, key) +################## T-Digest Functions ###################### + + def tdigestCreate(self, key, compression): + """" + Allocate the memory and initialize the t-digest. + """ + params = [key, compression] + + return self.execute_command(self.TDIGEST_CREATE, *params) + + def tdigestReset(self, key): + """ + Reset the sketch ``key`` to zero - empty out the sketch and re-initialize it. + """ + + return self.execute_command(self.TDIGEST_RESET, key) + + def tdigestAdd(self, key, values, weights): + """ + Adds one or more samples (value with weight) to a sketch ``key``. + Both ``values`` and ``weights`` are lists. + Example - tdigestAdd('A', [1500.0], [1.0]) + """ + params = [key] + self.appendValuesAndWeights(params, values, weights) + + return self.execute_command(self.TDIGEST_ADD, *params) + + def tdigestMerge(self, toKey, fromKey): + """ + Merges all of the values from 'fromKey' to 'toKey' sketch. + """ + params = [toKey, fromKey] + + return self.execute_command(self.TDIGEST_MERGE, *params) + + def tdigestMin(self, key): + """ + Returns minimum value from the sketch ``key``. + Will return DBL_MAX if the sketch is empty. + """ + + return self.execute_command(self.TDIGEST_MIN, key) + + def tdigestMax(self, key): + """ + Returns maximum value from the sketch ``key``. + Will return DBL_MIN if the sketch is empty. + """ + + return self.execute_command(self.TDIGEST_MAX, key) + + def tdigestQuantile(self, key, quantile): + """ + Returns double value estimate of the cutoff such that a specified fraction of the data added + to this TDigest would be less than or equal to the cutoff. + """ + params = [key, quantile] + + return self.execute_command(self.TDIGEST_QUANTILE, *params) + + def tdigestCdf(self, key, value): + """ + Returns double fraction of all points added which are <= value. + """ + params = [key, value] + + return self.execute_command(self.TDIGEST_CDF, *params) + + def tdigestInfo(self, key): + """ + Returns Compression, Capacity, Merged Nodes, Unmerged Nodes, Merged Weight, Unmerged Weight + and Total Compressions. + """ + + return self.execute_command(self.TDIGEST_INFO, key) + def pipeline(self, transaction=True, shard_hint=None): """ Return a new pipeline object that can queue multiple commands for diff --git a/test_commands.py b/test_commands.py index 8f7f7f6..ff989b9 100644 --- a/test_commands.py +++ b/test_commands.py @@ -32,6 +32,7 @@ def testCreate(self): self.assertTrue(rb.cmsInitByDim('cmsDim', 100, 5)) self.assertTrue(rb.cmsInitByProb('cmsProb', 0.01, 0.01)) self.assertTrue(rb.topkReserve('topk', 5, 100, 5, 0.9)) + self.assertTrue(rb.tdigestCreate('tDigest', 100)) ################### Test Bloom Filter ################### def testBFAdd(self): @@ -204,6 +205,64 @@ def testTopK(self): self.assertEqual(3, info.depth) self.assertAlmostEqual(0.9, float(info.decay)) + ################### Test T-Digest ################### + def testTDigestReset(self): + self.assertTrue(rb.tdigestCreate('tDigest', 10)) + # reset on empty histogram + self.assertTrue(rb.tdigestReset('tDigest')) + # insert data-points into sketch + self.assertTrue(rb.tdigestAdd('tDigest', list(range(10)), [1.0] * 10)) + + self.assertTrue(rb.tdigestReset('tDigest')) + # assert we have 0 unmerged nodes + self.assertEqual(0, rb.tdigestInfo('tDigest').unmergedNodes) + + def testTDigestMerge(self): + self.assertTrue(rb.tdigestCreate('to-tDigest', 10)) + self.assertTrue(rb.tdigestCreate('from-tDigest', 10)) + # insert data-points into sketch + self.assertTrue(rb.tdigestAdd('from-tDigest', [1.0] * 10, [1.0] * 10)) + self.assertTrue(rb.tdigestAdd('to-tDigest', [2.0] * 10, [10.0] * 10)) + # merge from-tdigest into to-tdigest + self.assertTrue(rb.tdigestMerge('to-tDigest', 'from-tDigest')) + # we should now have 110 weight on to-histogram + info = rb.tdigestInfo('to-tDigest') + total_weight_to = float(info.mergedWeight) + float(info.unmergedWeight) + self.assertEqual(110, total_weight_to) + + def testTDigestMinMax(self): + self.assertTrue(rb.tdigestCreate('tDigest', 100)) + # insert data-points into sketch + self.assertTrue(rb.tdigestAdd('tDigest', [1, 2, 3], [1.0] * 3)) + # min/max + self.assertEqual(3, float(rb.tdigestMax('tDigest'))) + self.assertEqual(1, float(rb.tdigestMin('tDigest'))) + + def testTDigestQuantile(self): + self.assertTrue(rb.tdigestCreate('tDigest', 500)) + # insert data-points into sketch + self.assertTrue(rb.tdigestAdd('tDigest', list([x * 0.01 for x in range(1, 10000)]), [1.0] * 10000)) + # assert min min/max have same result as quantile 0 and 1 + self.assertEqual( + float(rb.tdigestMax('tDigest')), + float(rb.tdigestQuantile('tDigest', 1.0)), + ) + self.assertEqual( + float(rb.tdigestMin('tDigest')), + float(rb.tdigestQuantile('tDigest', 0.0)), + ) + + self.assertAlmostEqual(1.0, float(rb.tdigestQuantile('tDigest', 0.01)), 2) + self.assertAlmostEqual(99.0, float(rb.tdigestQuantile('tDigest', 0.99)), 2) + + def testTDigestCdf(self): + self.assertTrue(rb.tdigestCreate('tDigest', 100)) + # insert data-points into sketch + self.assertTrue(rb.tdigestAdd('tDigest', list(range(1, 10)), [1.0] * 10)) + + self.assertAlmostEqual(0.1, float(rb.tdigestCdf('tDigest', 1.0)), 1) + self.assertAlmostEqual(0.9, float(rb.tdigestCdf('tDigest', 9.0)), 1) + def test_pipeline(self): pipeline = rb.pipeline()