Skip to content

Commit

Permalink
#109 Implement TSLastN functionality for redis time series.
Browse files Browse the repository at this point in the history
Summary:
TSLastN was an important feature missing from redis time series since its a common
operation to retrieve the last N values of a time series. This diff adds support for TSLastN:

```
127.0.0.2:6379> tsadd ts_key 10 v1
OK
127.0.0.2:6379> tsadd ts_key 20 v2 30 v3 40 v4 50 v5 60 v6 70 v7 80 v8 90 v9 100 v10
OK
127.0.0.2:6379> tslastn ts_key 5
 1) "60"
 2) "v6"
 3) "70"
 4) "v7"
 5) "80"
 6) "v8"
 7) "90"
 8) "v9"
 9) "100"
10) "v10"
```

Test Plan: Unit tests

Reviewers: amitanand, hector, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D4469
  • Loading branch information
pritamdamania87 committed Apr 3, 2018
1 parent cfcc5c5 commit 29058a0
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 6 deletions.
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<commons-lang3.version>3.6</commons-lang3.version>
<guava.version>16.0.1</guava.version>
<hadoop.version>2.7.3</hadoop.version>
<jedis.version>2.9.0-yb-2</jedis.version>
<jedis.version>2.9.0-yb-4</jedis.version>
<jsr305.version>3.0.1</jsr305.version>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@

import redis.clients.jedis.*;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNotNull;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.*;

public class TestYBJedis extends BaseJedisTest {
private static final Logger LOG = LoggerFactory.getLogger(TestYBJedis.class);
Expand Down Expand Up @@ -275,7 +272,7 @@ public void testTSRangeByTimeInvalidString() throws Exception {
try {
List<String> values = jedis_client.tsrangeByTime("k0", "foo", "bar");
} catch (Exception e) {
assertEquals("ERR TSRANGEBYTIME: foo is not a valid number", e.getMessage());
assertTrue(e.getMessage().contains("ERR TSRANGEBYTIME: foo is not a valid number"));
return;
}
// We shouldn't reach here.
Expand Down Expand Up @@ -308,4 +305,40 @@ public void testNX() throws Exception {
assertEquals("OK", jedis_client.set("k1", "v2", "NX", "EX", 5));
assertEquals("v2", jedis_client.get("k1"));
}

@Test
public void TestTSLastN() throws Exception {
Map<Long, String> ts = new HashMap<>();
ts.put(-50L, "v1");
ts.put(-40L, "v2");
ts.put(-30L, "v3");
ts.put(-20L, "v4");
ts.put(-10L, "v5");
ts.put(10L, "v6");
ts.put(20L, "v7");
ts.put(30L, "v8");
ts.put(40L, "v9");
ts.put(50L, "v10");
assertEquals("OK", jedis_client.tsadd("ts_key", ts));
assertEquals(
Arrays.asList("10", "v6", "20", "v7", "30", "v8", "40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 5));
assertEquals(
Arrays.asList("20", "v7", "30", "v8", "40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 4));
assertEquals(
Arrays.asList("30", "v8", "40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 3));
assertEquals(
Arrays.asList("40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 2));
assertEquals(
Arrays.asList("-50", "v1", "-40", "v2", "-30", "v3", "-20", "v4", "-10", "v5", "10", "v6",
"20", "v7", "30", "v8", "40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 10));
assertEquals(
Arrays.asList("-50", "v1", "-40", "v2", "-30", "v3", "-20", "v4", "-10", "v5", "10", "v6",
"20", "v7", "30", "v8", "40", "v9", "50", "v10"),
jedis_client.tsLastN("ts_key", 20));
}
}
2 changes: 2 additions & 0 deletions src/yb/common/redis_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ message RedisReadRequestPB {
optional RedisKeyValuePB key_value = 6;
optional RedisSubKeyRangePB subkey_range = 7;
optional RedisIndexRangePB index_range = 8;
// The maximum number of entries to retrieve for a range request.
optional int32 range_request_limit = 10 [default = 0];
}

message RedisSubKeyRangePB {
Expand Down
1 change: 1 addition & 0 deletions src/yb/docdb/doc_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ Status RedisReadOperation::ExecuteCollectionGetRange() {
GetSubDocumentData data = { encoded_doc_key, &doc, &doc_found };
data.low_subkey = &low_subkey;
data.high_subkey = &high_subkey;
data.limit = request_.range_request_limit();
RETURN_NOT_OK(GetAndPopulateResponseValues(iterator_.get(), AddResponseValuesGeneric, data,
ValueType::kRedisTS, request_, &response_,
/* add_keys */ true, /* add_values */ true, /* reverse */ true));
Expand Down
6 changes: 6 additions & 0 deletions src/yb/docdb/docdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,12 @@ CHECKED_STATUS BuildSubDocument(
}

SubDocument* current = data.result;
size_t num_children;
RETURN_NOT_OK(current->NumChildren(&num_children));
if (data.limit != 0 && num_children >= data.limit) {
// We have processed enough records.
return Status::OK();
}

Slice temp = key;
temp.remove_prefix(data.subdocument_key.size());
Expand Down
3 changes: 3 additions & 0 deletions src/yb/docdb/docdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ struct GetSubDocumentData {
// Represent bounds on the first and last ranks to be considered.
const IndexBound* low_index = &IndexBound::Empty();
const IndexBound* high_index = &IndexBound::Empty();
// Maximum number of children to add for this subdocument (0 means no limit).
int32_t limit = 0;

GetSubDocumentData Adjusted(
const Slice& subdoc_key, SubDocument* result_, bool* doc_found_ = nullptr) const {
Expand All @@ -355,6 +357,7 @@ struct GetSubDocumentData {
result.high_subkey = high_subkey;
result.low_index = low_index;
result.high_index = high_index;
result.limit = limit;
return result;
}

Expand Down
8 changes: 8 additions & 0 deletions src/yb/docdb/subdocument.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ Status SubDocument::ConvertToRedisSortedSet() {
return Status::OK();
}

Status SubDocument::NumChildren(size_t *num_children) {
if (!has_valid_object_container()) {
return STATUS(IllegalState, "Not a valid object container");
}
*num_children = object_container().size();
return Status::OK();
}

SubDocument* SubDocument::GetChild(const PrimitiveValue& key) {
if (!has_valid_object_container()) {
return nullptr;
Expand Down
3 changes: 3 additions & 0 deletions src/yb/docdb/subdocument.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class SubDocument : public PrimitiveValue {
// exist or this subdocument is not an object.
SubDocument* GetChild(const PrimitiveValue& key);

// Returns the number of children for this subdocument.
CHECKED_STATUS NumChildren(size_t *num_children);

const SubDocument* GetChild(const PrimitiveValue& key) const;

// Returns the child of this object at the given subkey, or default-constructs one if it does not
Expand Down
1 change: 1 addition & 0 deletions src/yb/yql/redis/redisserver/redis_commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace redisserver {
((srem, SRem, -3, WRITE)) \
((tsadd, TsAdd, -4, WRITE)) \
((tsrangebytime, TsRangeByTime, 4, READ)) \
((tslastn, TsLastN, 3, READ)) \
((zrangebyscore, ZRangeByScore, -4, READ)) \
((zrevrange, ZRevRange, -4, READ)) \
((tsrem, TsRem, -3, WRITE)) \
Expand Down
21 changes: 21 additions & 0 deletions src/yb/yql/redis/redisserver/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,27 @@ CHECKED_STATUS ParseIndexBound(const Slice& slice, RedisIndexBoundPB* bound_pb)
return Status::OK();
}

CHECKED_STATUS ParseTsLastN(YBRedisReadOp* op, const RedisClientCommand& args) {
// TSLastN is basically TSRangeByTime -INF, INF with a limit on number of entries. Note that
// there is a subtle difference here since TSRangeByTime iterates on entries from highest to
// lowest and hence we end up returning the highest N entries. This operation is more like
// TSRevRangeByTime -INF, INF with a limit (Note that TSRevRangeByTime is not implemented).
op->mutable_request()->set_allocated_get_collection_range_request(
new RedisCollectionGetRangeRequestPB());
op->mutable_request()->mutable_get_collection_range_request()->set_request_type(
RedisCollectionGetRangeRequestPB_GetRangeRequestType_TSRANGEBYTIME);
const auto& key = args[1];
auto limit = ParseInt32(args[2], "limit");
RETURN_NOT_OK(limit);
op->mutable_request()->mutable_key_value()->set_key(key.ToBuffer());
op->mutable_request()->set_range_request_limit(*limit);
op->mutable_request()->mutable_subkey_range()->mutable_lower_bound()->set_infinity_type
(RedisSubKeyBoundPB_InfinityType_NEGATIVE);
op->mutable_request()->mutable_subkey_range()->mutable_upper_bound()->set_infinity_type
(RedisSubKeyBoundPB_InfinityType_POSITIVE);
return Status::OK();
}

CHECKED_STATUS ParseTsRangeByTime(YBRedisReadOp* op, const RedisClientCommand& args) {
op->mutable_request()->set_allocated_get_collection_range_request(
new RedisCollectionGetRangeRequestPB());
Expand Down
42 changes: 42 additions & 0 deletions src/yb/yql/redis/redisserver/redisserver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,48 @@ TEST_F(TestRedisService, TestTimeSeriesTTL) {
std::to_string(curr_time_sec + kRedisMaxTtlSeconds + 1)});
}

TEST_F(TestRedisService, TestTsLastN) {
DoRedisTestOk(__LINE__, {"TSADD", "ts_key",
"-50", "v1",
"-40", "v2",
"-30", "v3",
"-20", "v4",
"-10", "v5",
"10", "v6",
"20", "v7",
"30", "v8",
"40", "v9",
"50", "v10",
});

SyncClient();
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "5"},
{"10", "v6", "20", "v7", "30", "v8", "40", "v9", "50", "v10"});
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "4"},
{"20", "v7", "30", "v8", "40", "v9", "50", "v10"});
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "3"},
{"30", "v8", "40", "v9", "50", "v10"});
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "2"},
{"40", "v9", "50", "v10"});
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "10"},
{"-50", "v1", "-40", "v2", "-30", "v3", "-20", "v4", "-10", "v5", "10", "v6",
"20", "v7", "30", "v8", "40", "v9", "50", "v10"});
DoRedisTestArray(__LINE__, {"TSLASTN", "ts_key", "20"},
{"-50", "v1", "-40", "v2", "-30", "v3", "-20", "v4", "-10", "v5", "10", "v6",
"20", "v7", "30", "v8", "40", "v9", "50", "v10"});

DoRedisTestExpectError(__LINE__, {"TSLASTN" , "ts_key", "abc"});
DoRedisTestExpectError(__LINE__, {"TSLASTN" , "ts_key", "3.0"});
DoRedisTestExpectError(__LINE__, {"TSLASTN" , "ts_key", "999999999999"}); // out of bounds.
DoRedisTestExpectError(__LINE__, {"TSLASTN" , "ts_key", "-999999999999"}); // out of bounds.
DoRedisTestExpectError(__LINE__, {"TSLASTN" , "randomkey", "10"}); // invalid key.
DoRedisTestInt(__LINE__, {"ZADD", "z_multi", "0", "v0", "0", "v1", "0", "v2",
"1", "v3", "1", "v4", "1", "v5"}, 6);
DoRedisTestExpectError(__LINE__, {"TSLASTN" , "z_multi", "10"}); // incorrect type.
SyncClient();
VerifyCallbacks();
}

TEST_F(TestRedisService, TestTsRangeByTime) {
DoRedisTestOk(__LINE__, {"TSADD", "ts_key",
"-50", "v1",
Expand Down

0 comments on commit 29058a0

Please sign in to comment.