diff --git a/redis/commands/timeseries/commands.py b/redis/commands/timeseries/commands.py index 3a30c246d5..13e3cdf498 100644 --- a/redis/commands/timeseries/commands.py +++ b/redis/commands/timeseries/commands.py @@ -1,4 +1,7 @@ +from typing import Dict, List, Optional, Tuple, Union + from redis.exceptions import DataError +from redis.typing import KeyT, Number ADD_CMD = "TS.ADD" ALTER_CMD = "TS.ALTER" @@ -22,7 +25,15 @@ class TimeSeriesCommands: """RedisTimeSeries Commands.""" - def create(self, key, **kwargs): + def create( + self, + key: KeyT, + retention_msecs: Optional[int] = None, + uncompressed: Optional[bool] = False, + labels: Optional[Dict[str, str]] = None, + chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ): """ Create a new time-series. @@ -31,40 +42,26 @@ def create(self, key, **kwargs): key: time-series key retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). + Maximum age for samples compared to highest reported timestamp (in milliseconds). If None or 0 is passed then the series is not trimmed at all. uncompressed: - Since RedisTimeSeries v1.2, both timestamps and values are - compressed by default. - Adding this flag will keep data in an uncompressed form. - Compression not only saves - memory but usually improve performance due to lower number - of memory accesses. + Changes data storage from compressed (by default) to uncompressed labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: - Each time-serie uses chunks of memory of fixed size for - time series samples. - You can alter the default TSDB chunk size by passing the - chunk_size argument (in Bytes). + Memory size, in bytes, allocated for each data chunk. + Must be a multiple of 8 in the range [128 .. 1048576]. duplicate_policy: - Since RedisTimeSeries v1.4 you can specify the duplicate sample policy - ( Configure what to do on duplicate sample. 
) + Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': an error will occur for any out of order sample. - 'first': ignore the new value. - 'last': override with latest value. - 'min': only override if the value is lower than the existing value. - 'max': only override if the value is higher than the existing value. - When this is not set, the server-wide default will be used. - For more information: https://oss.redis.com/redistimeseries/commands/#tscreate + For more information: https://redis.io/commands/ts.create/ """ # noqa - retention_msecs = kwargs.get("retention_msecs", None) - uncompressed = kwargs.get("uncompressed", False) - labels = kwargs.get("labels", {}) - chunk_size = kwargs.get("chunk_size", None) - duplicate_policy = kwargs.get("duplicate_policy", None) params = [key] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) @@ -74,29 +71,62 @@ def create(self, key, **kwargs): return self.execute_command(CREATE_CMD, *params) - def alter(self, key, **kwargs): + def alter( + self, + key: KeyT, + retention_msecs: Optional[int] = None, + labels: Optional[Dict[str, str]] = None, + chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ): """ - Update the retention, labels of an existing key. - For more information see + Update the retention, chunk size, duplicate policy, and labels of an existing + time series. + + Args: - The parameters are the same as TS.CREATE. + key: + time-series key + retention_msecs: + Maximum retention period, compared to maximal existing timestamp (in milliseconds). + If None or 0 is passed then the series is not trimmed at all. + labels: + Set of label-value pairs that represent metadata labels of the key. + chunk_size: + Memory size, in bytes, allocated for each data chunk. + Must be a multiple of 8 in the range [128 .. 1048576]. + duplicate_policy: + Policy for handling multiple samples with identical timestamps. 
+ Can be one of: + - 'block': an error will occur for any out of order sample. + - 'first': ignore the new value. + - 'last': override with latest value. + - 'min': only override if the value is lower than the existing value. + - 'max': only override if the value is higher than the existing value. - For more information: https://oss.redis.com/redistimeseries/commands/#tsalter + For more information: https://redis.io/commands/ts.alter/ """ # noqa - retention_msecs = kwargs.get("retention_msecs", None) - labels = kwargs.get("labels", {}) - duplicate_policy = kwargs.get("duplicate_policy", None) params = [key] self._append_retention(params, retention_msecs) + self._append_chunk_size(params, chunk_size) self._append_duplicate_policy(params, ALTER_CMD, duplicate_policy) self._append_labels(params, labels) return self.execute_command(ALTER_CMD, *params) - def add(self, key, timestamp, value, **kwargs): + def add( + self, + key: KeyT, + timestamp: Union[int, str], + value: Number, + retention_msecs: Optional[int] = None, + uncompressed: Optional[bool] = False, + labels: Optional[Dict[str, str]] = None, + chunk_size: Optional[int] = None, + duplicate_policy: Optional[str] = None, + ): """ - Append (or create and append) a new sample to the series. - For more information see + Append (or create and append) a new sample to a time series. Args: @@ -107,35 +137,26 @@ def add(self, key, timestamp, value, **kwargs): value: Numeric data value of the sample retention_msecs: - Maximum age for samples compared to last event time (in milliseconds). + Maximum retention period, compared to maximal existing timestamp (in milliseconds). If None or 0 is passed then the series is not trimmed at all. uncompressed: - Since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses. 
+ Changes data storage from compressed (by default) to uncompressed labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: - Each time-serie uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + Memory size, in bytes, allocated for each data chunk. + Must be a multiple of 8 in the range [128 .. 1048576]. duplicate_policy: - Since RedisTimeSeries v1.4 you can specify the duplicate sample policy - (Configure what to do on duplicate sample). + Policy for handling multiple samples with identical timestamps. Can be one of: - 'block': an error will occur for any out of order sample. - 'first': ignore the new value. - 'last': override with latest value. - 'min': only override if the value is lower than the existing value. - 'max': only override if the value is higher than the existing value. - When this is not set, the server-wide default will be used. - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsadd + For more information: https://redis.io/commands/ts.add/ """ # noqa - retention_msecs = kwargs.get("retention_msecs", None) - uncompressed = kwargs.get("uncompressed", False) - labels = kwargs.get("labels", {}) - chunk_size = kwargs.get("chunk_size", None) - duplicate_policy = kwargs.get("duplicate_policy", None) params = [key, timestamp, value] self._append_retention(params, retention_msecs) self._append_uncompressed(params, uncompressed) @@ -145,28 +166,34 @@ def add(self, key, timestamp, value, **kwargs): return self.execute_command(ADD_CMD, *params) - def madd(self, ktv_tuples): + def madd(self, ktv_tuples: List[Tuple[KeyT, Union[int, str], Number]]): """ Append (or create and append) a new `value` to series `key` with `timestamp`. Expects a list of `tuples` as (`key`,`timestamp`, `value`). Return value is an array with timestamps of insertions. 
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmadd + For more information: https://redis.io/commands/ts.madd/ """ # noqa params = [] for ktv in ktv_tuples: - for item in ktv: - params.append(item) + params.extend(ktv) return self.execute_command(MADD_CMD, *params) - def incrby(self, key, value, **kwargs): + def incrby( + self, + key: KeyT, + value: Number, + timestamp: Optional[Union[int, str]] = None, + retention_msecs: Optional[int] = None, + uncompressed: Optional[bool] = False, + labels: Optional[Dict[str, str]] = None, + chunk_size: Optional[int] = None, + ): """ - Increment (or create an time-series and increment) the latest - sample's of a series. - This command can be used as a counter or gauge that automatically gets - history as a time series. + Increment (or create an time-series and increment) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. Args: @@ -175,27 +202,19 @@ def incrby(self, key, value, **kwargs): value: Numeric data value of the sample timestamp: - Timestamp of the sample. None can be used for automatic timestamp (using the system clock). + Timestamp of the sample. * can be used for automatic timestamp (using the system clock). retention_msecs: Maximum age for samples compared to last event time (in milliseconds). If None or 0 is passed then the series is not trimmed at all. uncompressed: - Since RedisTimeSeries v1.2, both timestamps and values are compressed by default. - Adding this flag will keep data in an uncompressed form. Compression not only saves - memory but usually improve performance due to lower number of memory accesses. + Changes data storage from compressed (by default) to uncompressed labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: - Each time-series uses chunks of memory of fixed size for time series samples. 
- You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + Memory size, in bytes, allocated for each data chunk. - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby + For more information: https://redis.io/commands/ts.incrby/ """ # noqa - timestamp = kwargs.get("timestamp", None) - retention_msecs = kwargs.get("retention_msecs", None) - uncompressed = kwargs.get("uncompressed", False) - labels = kwargs.get("labels", {}) - chunk_size = kwargs.get("chunk_size", None) params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) @@ -205,12 +224,19 @@ def incrby(self, key, value, **kwargs): return self.execute_command(INCRBY_CMD, *params) - def decrby(self, key, value, **kwargs): + def decrby( + self, + key: KeyT, + value: Number, + timestamp: Optional[Union[int, str]] = None, + retention_msecs: Optional[int] = None, + uncompressed: Optional[bool] = False, + labels: Optional[Dict[str, str]] = None, + chunk_size: Optional[int] = None, + ): """ - Decrement (or create an time-series and decrement) the - latest sample's of a series. - This command can be used as a counter or gauge that - automatically gets history as a time series. + Decrement (or create an time-series and decrement) the latest sample's of a series. + This command can be used as a counter or gauge that automatically gets history as a time series. Args: @@ -219,31 +245,19 @@ def decrby(self, key, value, **kwargs): value: Numeric data value of the sample timestamp: - Timestamp of the sample. None can be used for automatic - timestamp (using the system clock). + Timestamp of the sample. * can be used for automatic timestamp (using the system clock). retention_msecs: Maximum age for samples compared to last event time (in milliseconds). If None or 0 is passed then the series is not trimmed at all. 
uncompressed: - Since RedisTimeSeries v1.2, both timestamps and values are - compressed by default. - Adding this flag will keep data in an uncompressed form. - Compression not only saves - memory but usually improve performance due to lower number - of memory accesses. + Changes data storage from compressed (by default) to uncompressed labels: Set of label-value pairs that represent metadata labels of the key. chunk_size: - Each time-series uses chunks of memory of fixed size for time series samples. - You can alter the default TSDB chunk size by passing the chunk_size argument (in Bytes). + Memory size, in bytes, allocated for each data chunk. - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsincrbytsdecrby + For more information: https://redis.io/commands/ts.decrby/ """ # noqa - timestamp = kwargs.get("timestamp", None) - retention_msecs = kwargs.get("retention_msecs", None) - uncompressed = kwargs.get("uncompressed", False) - labels = kwargs.get("labels", {}) - chunk_size = kwargs.get("chunk_size", None) params = [key, value] self._append_timestamp(params, timestamp) self._append_retention(params, retention_msecs) @@ -253,14 +267,9 @@ def decrby(self, key, value, **kwargs): return self.execute_command(DECRBY_CMD, *params) - def delete(self, key, from_time, to_time): + def delete(self, key: KeyT, from_time: int, to_time: int): """ - Delete data points for a given timeseries and interval range - in the form of start and end delete timestamps. - The given timestamp interval is closed (inclusive), meaning start - and end data points will also be deleted. - Return the count for deleted items. - For more information see + Delete all samples between two timestamps for a given time series. Args: @@ -271,68 +280,98 @@ def delete(self, key, from_time, to_time): to_time: End timestamp for the range deletion. 
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsdel + For more information: https://redis.io/commands/ts.del/ """ # noqa return self.execute_command(DEL_CMD, key, from_time, to_time) - def createrule(self, source_key, dest_key, aggregation_type, bucket_size_msec): + def createrule( + self, + source_key: KeyT, + dest_key: KeyT, + aggregation_type: str, + bucket_size_msec: int, + align_timestamp: Optional[int] = None, + ): """ Create a compaction rule from values added to `source_key` into `dest_key`. - Aggregating for `bucket_size_msec` where an `aggregation_type` can be - [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, - `std.p`, `std.s`, `var.p`, `var.s`] - For more information: https://oss.redis.com/redistimeseries/master/commands/#tscreaterule + Args: + + source_key: + Key name for source time series + dest_key: + Key name for destination (compacted) time series + aggregation_type: + Aggregation type: One of the following: + [`avg`, `sum`, `min`, `max`, `range`, `count`, `first`, `last`, `std.p`, + `std.s`, `var.p`, `var.s`, `twa`] + bucket_size_msec: + Duration of each bucket, in milliseconds + align_timestamp: + Assure that there is a bucket that starts at exactly align_timestamp and + align all other buckets accordingly. + + For more information: https://redis.io/commands/ts.createrule/ """ # noqa params = [source_key, dest_key] self._append_aggregation(params, aggregation_type, bucket_size_msec) + if align_timestamp is not None: + params.append(align_timestamp) return self.execute_command(CREATERULE_CMD, *params) - def deleterule(self, source_key, dest_key): + def deleterule(self, source_key: KeyT, dest_key: KeyT): """ - Delete a compaction rule. - For more information see + Delete a compaction rule from `source_key` to `dest_key`.. 
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsdeleterule + For more information: https://redis.io/commands/ts.deleterule/ """ # noqa return self.execute_command(DELETERULE_CMD, source_key, dest_key) def __range_params( self, - key, - from_time, - to_time, - count, - aggregation_type, - bucket_size_msec, - filter_by_ts, - filter_by_min_value, - filter_by_max_value, - align, + key: KeyT, + from_time: Union[int, str], + to_time: Union[int, str], + count: Optional[int], + aggregation_type: Optional[str], + bucket_size_msec: Optional[int], + filter_by_ts: Optional[List[int]], + filter_by_min_value: Optional[int], + filter_by_max_value: Optional[int], + align: Optional[Union[int, str]], + latest: Optional[bool], + bucket_timestamp: Optional[str], + empty: Optional[bool], ): """Create TS.RANGE and TS.REVRANGE arguments.""" params = [key, from_time, to_time] + self._append_latest(params, latest) self._append_filer_by_ts(params, filter_by_ts) self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value) self._append_count(params, count) self._append_align(params, align) self._append_aggregation(params, aggregation_type, bucket_size_msec) + self._append_bucket_timestamp(params, bucket_timestamp) + self._append_empty(params, empty) return params def range( self, - key, - from_time, - to_time, - count=None, - aggregation_type=None, - bucket_size_msec=0, - filter_by_ts=None, - filter_by_min_value=None, - filter_by_max_value=None, - align=None, + key: KeyT, + from_time: Union[int, str], + to_time: Union[int, str], + count: Optional[int] = None, + aggregation_type: Optional[str] = None, + bucket_size_msec: Optional[int] = 0, + filter_by_ts: Optional[List[int]] = None, + filter_by_min_value: Optional[int] = None, + filter_by_max_value: Optional[int] = None, + align: Optional[Union[int, str]] = None, + latest: Optional[bool] = False, + bucket_timestamp: Optional[str] = None, + empty: Optional[bool] = False, ): """ Query a range in 
forward direction for a specific time-serie. @@ -342,31 +381,34 @@ def range( key: Key name for timeseries. from_time: - Start timestamp for the range query. - can be used to express - the minimum possible timestamp (0). + Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). to_time: - End timestamp for range query, + can be used to express the - maximum possible timestamp. + End timestamp for range query, + can be used to express the maximum possible timestamp. count: - Optional maximum number of returned results. + Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of - [`avg`, `sum`, `min`, `max`, `range`, `count`, - `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: - Filter result by minimum value (must mention also filter - by_max_value). + Filter result by minimum value (must mention also filter by_max_value). filter_by_max_value: - Filter result by maximum value (must mention also filter - by_min_value). + Filter result by maximum value (must mention also filter by_min_value). align: Timestamp for alignment control for aggregation. - - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange + latest: + Used when a time series is a compaction, reports the compacted value of the + latest possibly partial bucket + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, + `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. 
+ + For more information: https://redis.io/commands/ts.range/ """ # noqa params = self.__range_params( key, @@ -379,21 +421,27 @@ def range( filter_by_min_value, filter_by_max_value, align, + latest, + bucket_timestamp, + empty, ) return self.execute_command(RANGE_CMD, *params) def revrange( self, - key, - from_time, - to_time, - count=None, - aggregation_type=None, - bucket_size_msec=0, - filter_by_ts=None, - filter_by_min_value=None, - filter_by_max_value=None, - align=None, + key: KeyT, + from_time: Union[int, str], + to_time: Union[int, str], + count: Optional[int] = None, + aggregation_type: Optional[str] = None, + bucket_size_msec: Optional[int] = 0, + filter_by_ts: Optional[List[int]] = None, + filter_by_min_value: Optional[int] = None, + filter_by_max_value: Optional[int] = None, + align: Optional[Union[int, str]] = None, + latest: Optional[bool] = False, + bucket_timestamp: Optional[str] = None, + empty: Optional[bool] = False, ): """ Query a range in reverse direction for a specific time-series. @@ -409,10 +457,10 @@ def revrange( to_time: End timestamp for range query, + can be used to express the maximum possible timestamp. count: - Optional maximum number of returned results. + Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, `range`, `count`, - `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. filter_by_ts: @@ -423,8 +471,16 @@ def revrange( Filter result by maximum value (must mention also filter_by_min_value). align: Timestamp for alignment control for aggregation. 
- - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsrangetsrevrange + latest: + Used when a time series is a compaction, reports the compacted value of the + latest possibly partial bucket + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, + `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. + + For more information: https://redis.io/commands/ts.revrange/ """ # noqa params = self.__range_params( key, @@ -437,34 +493,43 @@ def revrange( filter_by_min_value, filter_by_max_value, align, + latest, + bucket_timestamp, + empty, ) return self.execute_command(REVRANGE_CMD, *params) def __mrange_params( self, - aggregation_type, - bucket_size_msec, - count, - filters, - from_time, - to_time, - with_labels, - filter_by_ts, - filter_by_min_value, - filter_by_max_value, - groupby, - reduce, - select_labels, - align, + aggregation_type: Optional[str], + bucket_size_msec: Optional[int], + count: Optional[int], + filters: List[str], + from_time: Union[int, str], + to_time: Union[int, str], + with_labels: Optional[bool], + filter_by_ts: Optional[List[int]], + filter_by_min_value: Optional[int], + filter_by_max_value: Optional[int], + groupby: Optional[str], + reduce: Optional[str], + select_labels: Optional[List[str]], + align: Optional[Union[int, str]], + latest: Optional[bool], + bucket_timestamp: Optional[str], + empty: Optional[bool], ): """Create TS.MRANGE and TS.MREVRANGE arguments.""" params = [from_time, to_time] + self._append_latest(params, latest) self._append_filer_by_ts(params, filter_by_ts) self._append_filer_by_value(params, filter_by_min_value, filter_by_max_value) + self._append_with_labels(params, with_labels, select_labels) self._append_count(params, count) self._append_align(params, align) self._append_aggregation(params, aggregation_type, bucket_size_msec) - self._append_with_labels(params, with_labels, select_labels) + self._append_bucket_timestamp(params, 
bucket_timestamp) + self._append_empty(params, empty) params.extend(["FILTER"]) params += filters self._append_groupby_reduce(params, groupby, reduce) @@ -472,20 +537,23 @@ def __mrange_params( def mrange( self, - from_time, - to_time, - filters, - count=None, - aggregation_type=None, - bucket_size_msec=0, - with_labels=False, - filter_by_ts=None, - filter_by_min_value=None, - filter_by_max_value=None, - groupby=None, - reduce=None, - select_labels=None, - align=None, + from_time: Union[int, str], + to_time: Union[int, str], + filters: List[str], + count: Optional[int] = None, + aggregation_type: Optional[str] = None, + bucket_size_msec: Optional[int] = 0, + with_labels: Optional[bool] = False, + filter_by_ts: Optional[List[int]] = None, + filter_by_min_value: Optional[int] = None, + filter_by_max_value: Optional[int] = None, + groupby: Optional[str] = None, + reduce: Optional[str] = None, + select_labels: Optional[List[str]] = None, + align: Optional[Union[int, str]] = None, + latest: Optional[bool] = False, + bucket_timestamp: Optional[str] = None, + empty: Optional[bool] = False, ): """ Query a range across multiple time-series by filters in forward direction. @@ -493,46 +561,45 @@ def mrange( Args: from_time: - Start timestamp for the range query. `-` can be used to - express the minimum possible timestamp (0). + Start timestamp for the range query. `-` can be used to express the minimum possible timestamp (0). to_time: - End timestamp for range query, `+` can be used to express - the maximum possible timestamp. + End timestamp for range query, `+` can be used to express the maximum possible timestamp. filters: filter to match the time-series labels. count: - Optional maximum number of returned results. + Limits the number of returned samples. aggregation_type: - Optional aggregation type. Can be one of - [`avg`, `sum`, `min`, `max`, `range`, `count`, - `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + Optional aggregation type. 
Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: - Include in the reply the label-value pairs that represent metadata - labels of the time-series. - If this argument is not set, by default, an empty Array will be - replied on the labels array position. + Include in the reply all label-value pairs representing metadata labels of the time series. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: - Filter result by minimum value (must mention also - filter_by_max_value). + Filter result by minimum value (must mention also filter_by_max_value). filter_by_max_value: - Filter result by maximum value (must mention also - filter_by_min_value). + Filter result by maximum value (must mention also filter_by_min_value). groupby: Grouping by fields the results (must mention also reduce). reduce: - Applying reducer functions on each group. Can be one - of [`sum`, `min`, `max`]. + Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, + `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. select_labels: - Include in the reply only a subset of the key-value - pair labels of a series. + Include in the reply only a subset of the key-value pair labels of a series. align: Timestamp for alignment control for aggregation. - - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange + latest: + Used when a time series is a compaction, reports the compacted + value of the latest possibly partial bucket + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, + `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. 
+ + For more information: https://redis.io/commands/ts.mrange/ """ # noqa params = self.__mrange_params( aggregation_type, @@ -549,26 +616,32 @@ def mrange( reduce, select_labels, align, + latest, + bucket_timestamp, + empty, ) return self.execute_command(MRANGE_CMD, *params) def mrevrange( self, - from_time, - to_time, - filters, - count=None, - aggregation_type=None, - bucket_size_msec=0, - with_labels=False, - filter_by_ts=None, - filter_by_min_value=None, - filter_by_max_value=None, - groupby=None, - reduce=None, - select_labels=None, - align=None, + from_time: Union[int, str], + to_time: Union[int, str], + filters: List[str], + count: Optional[int] = None, + aggregation_type: Optional[str] = None, + bucket_size_msec: Optional[int] = 0, + with_labels: Optional[bool] = False, + filter_by_ts: Optional[List[int]] = None, + filter_by_min_value: Optional[int] = None, + filter_by_max_value: Optional[int] = None, + groupby: Optional[str] = None, + reduce: Optional[str] = None, + select_labels: Optional[List[str]] = None, + align: Optional[Union[int, str]] = None, + latest: Optional[bool] = False, + bucket_timestamp: Optional[str] = None, + empty: Optional[bool] = False, ): """ Query a range across multiple time-series by filters in reverse direction. @@ -576,48 +649,45 @@ def mrevrange( Args: from_time: - Start timestamp for the range query. - can be used to express - the minimum possible timestamp (0). + Start timestamp for the range query. - can be used to express the minimum possible timestamp (0). to_time: - End timestamp for range query, + can be used to express - the maximum possible timestamp. + End timestamp for range query, + can be used to express the maximum possible timestamp. filters: Filter to match the time-series labels. count: - Optional maximum number of returned results. + Limits the number of returned samples. aggregation_type: - Optional aggregation type. 
Can be one of - [`avg`, `sum`, `min`, `max`, `range`, `count`, - `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`] + Optional aggregation type. Can be one of [`avg`, `sum`, `min`, `max`, + `range`, `count`, `first`, `last`, `std.p`, `std.s`, `var.p`, `var.s`, `twa`] bucket_size_msec: Time bucket for aggregation in milliseconds. with_labels: - Include in the reply the label-value pairs that represent - metadata labels - of the time-series. - If this argument is not set, by default, an empty Array - will be replied - on the labels array position. + Include in the reply all label-value pairs representing metadata labels of the time series. filter_by_ts: List of timestamps to filter the result by specific timestamps. filter_by_min_value: - Filter result by minimum value (must mention also filter - by_max_value). + Filter result by minimum value (must mention also filter_by_max_value). filter_by_max_value: - Filter result by maximum value (must mention also filter - by_min_value). + Filter result by maximum value (must mention also filter_by_min_value). groupby: Grouping by fields the results (must mention also reduce). reduce: - Applying reducer functions on each group. Can be one - of [`sum`, `min`, `max`]. + Applying reducer functions on each group. Can be one of [`avg` `sum`, `min`, + `max`, `range`, `count`, `std.p`, `std.s`, `var.p`, `var.s`]. select_labels: - Include in the reply only a subset of the key-value pair - labels of a series. + Include in the reply only a subset of the key-value pair labels of a series. align: Timestamp for alignment control for aggregation. - - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmrangetsmrevrange + latest: + Used when a time series is a compaction, reports the compacted + value of the latest possibly partial bucket + bucket_timestamp: + Controls how bucket timestamps are reported. Can be one of [`-`, `low`, `+`, + `high`, `~`, `mid`]. + empty: + Reports aggregations for empty buckets. 
+ + For more information: https://redis.io/commands/ts.mrevrange/ """ # noqa params = self.__mrange_params( aggregation_type, @@ -634,54 +704,85 @@ def mrevrange( reduce, select_labels, align, + latest, + bucket_timestamp, + empty, ) return self.execute_command(MREVRANGE_CMD, *params) - def get(self, key): + def get(self, key: KeyT, latest: Optional[bool] = False): """# noqa Get the last sample of `key`. + `latest` used when a time series is a compaction, reports the compacted + value of the latest (possibly partial) bucket - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsget + For more information: https://redis.io/commands/ts.get/ """ # noqa - return self.execute_command(GET_CMD, key) + params = [key] + self._append_latest(params, latest) + return self.execute_command(GET_CMD, *params) - def mget(self, filters, with_labels=False): + def mget( + self, + filters: List[str], + with_labels: Optional[bool] = False, + select_labels: Optional[List[str]] = None, + latest: Optional[bool] = False, + ): """# noqa Get the last samples matching the specific `filter`. - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsmget + Args: + + filters: + Filter to match the time-series labels. + with_labels: + Include in the reply all label-value pairs representing metadata + labels of the time series. + select_labels: + Include in the reply only a subset of the key-value pair labels of a series. + latest: + Used when a time series is a compaction, reports the compacted + value of the latest possibly partial bucket + + For more information: https://redis.io/commands/ts.mget/ """ # noqa params = [] - self._append_with_labels(params, with_labels) + self._append_latest(params, latest) + self._append_with_labels(params, with_labels, select_labels) params.extend(["FILTER"]) params += filters return self.execute_command(MGET_CMD, *params) - def info(self, key): + def info(self, key: KeyT): """# noqa Get information of `key`. 
- For more information: https://oss.redis.com/redistimeseries/master/commands/#tsinfo + For more information: https://redis.io/commands/ts.info/ """ # noqa return self.execute_command(INFO_CMD, key) - def queryindex(self, filters): + def queryindex(self, filters: List[str]): """# noqa - Get all the keys matching the `filter` list. + Get all time series keys matching the `filter` list. - For more information: https://oss.redis.com/redistimeseries/master/commands/#tsqueryindex + For more information: https://redis.io/commands/ts.queryindex/ """ # noq return self.execute_command(QUERYINDEX_CMD, *filters) @staticmethod - def _append_uncompressed(params, uncompressed): + def _append_uncompressed(params: List[str], uncompressed: Optional[bool]): """Append UNCOMPRESSED tag to params.""" if uncompressed: params.extend(["UNCOMPRESSED"]) @staticmethod - def _append_with_labels(params, with_labels, select_labels=None): + def _append_with_labels( + params: List[str], + with_labels: Optional[bool], + select_labels: Optional[List[str]], + ): """Append labels behavior to params.""" if with_labels and select_labels: raise DataError( @@ -694,19 +795,21 @@ def _append_with_labels(params, with_labels, select_labels=None): params.extend(["SELECTED_LABELS", *select_labels]) @staticmethod - def _append_groupby_reduce(params, groupby, reduce): + def _append_groupby_reduce( + params: List[str], groupby: Optional[str], reduce: Optional[str] + ): """Append GROUPBY REDUCE property to params.""" if groupby is not None and reduce is not None: params.extend(["GROUPBY", groupby, "REDUCE", reduce.upper()]) @staticmethod - def _append_retention(params, retention): + def _append_retention(params: List[str], retention: Optional[int]): """Append RETENTION property to params.""" if retention is not None: params.extend(["RETENTION", retention]) @staticmethod - def _append_labels(params, labels): + def _append_labels(params: List[str], labels: Optional[List[str]]): """Append LABELS property to 
params.""" if labels: params.append("LABELS") @@ -714,38 +817,43 @@ def _append_labels(params, labels): params.extend([k, v]) @staticmethod - def _append_count(params, count): + def _append_count(params: List[str], count: Optional[int]): """Append COUNT property to params.""" if count is not None: params.extend(["COUNT", count]) @staticmethod - def _append_timestamp(params, timestamp): + def _append_timestamp(params: List[str], timestamp: Optional[int]): """Append TIMESTAMP property to params.""" if timestamp is not None: params.extend(["TIMESTAMP", timestamp]) @staticmethod - def _append_align(params, align): + def _append_align(params: List[str], align: Optional[Union[int, str]]): """Append ALIGN property to params.""" if align is not None: params.extend(["ALIGN", align]) @staticmethod - def _append_aggregation(params, aggregation_type, bucket_size_msec): + def _append_aggregation( + params: List[str], + aggregation_type: Optional[str], + bucket_size_msec: Optional[int], + ): """Append AGGREGATION property to params.""" if aggregation_type is not None: - params.append("AGGREGATION") - params.extend([aggregation_type, bucket_size_msec]) + params.extend(["AGGREGATION", aggregation_type, bucket_size_msec]) @staticmethod - def _append_chunk_size(params, chunk_size): + def _append_chunk_size(params: List[str], chunk_size: Optional[int]): """Append CHUNK_SIZE property to params.""" if chunk_size is not None: params.extend(["CHUNK_SIZE", chunk_size]) @staticmethod - def _append_duplicate_policy(params, command, duplicate_policy): + def _append_duplicate_policy( + params: List[str], command: Optional[str], duplicate_policy: Optional[str] + ): """Append DUPLICATE_POLICY property to params on CREATE and ON_DUPLICATE on ADD. 
""" @@ -756,13 +864,33 @@ def _append_duplicate_policy(params, command, duplicate_policy): params.extend(["DUPLICATE_POLICY", duplicate_policy]) @staticmethod - def _append_filer_by_ts(params, ts_list): + def _append_filer_by_ts(params: List[str], ts_list: Optional[List[int]]): """Append FILTER_BY_TS property to params.""" if ts_list is not None: params.extend(["FILTER_BY_TS", *ts_list]) @staticmethod - def _append_filer_by_value(params, min_value, max_value): + def _append_filer_by_value( + params: List[str], min_value: Optional[int], max_value: Optional[int] + ): """Append FILTER_BY_VALUE property to params.""" if min_value is not None and max_value is not None: params.extend(["FILTER_BY_VALUE", min_value, max_value]) + + @staticmethod + def _append_latest(params: List[str], latest: Optional[bool]): + """Append LATEST property to params.""" + if latest: + params.append("LATEST") + + @staticmethod + def _append_bucket_timestamp(params: List[str], bucket_timestamp: Optional[str]): + """Append BUCKET_TIMESTAMP property to params.""" + if bucket_timestamp is not None: + params.extend(["BUCKETTIMESTAMP", bucket_timestamp]) + + @staticmethod + def _append_empty(params: List[str], empty: Optional[bool]): + """Append EMPTY property to params.""" + if empty: + params.append("EMPTY") diff --git a/redis/commands/timeseries/info.py b/redis/commands/timeseries/info.py index fba7f093b1..65f3baacd0 100644 --- a/redis/commands/timeseries/info.py +++ b/redis/commands/timeseries/info.py @@ -60,15 +60,15 @@ def __init__(self, args): https://oss.redis.com/redistimeseries/configuration/#duplicate_policy """ response = dict(zip(map(nativestr, args[::2]), args[1::2])) - self.rules = response["rules"] - self.source_key = response["sourceKey"] - self.chunk_count = response["chunkCount"] - self.memory_usage = response["memoryUsage"] - self.total_samples = response["totalSamples"] - self.labels = list_to_dict(response["labels"]) - self.retention_msecs = response["retentionTime"] - 
@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_range_latest(client: redis.Redis):
    """TS.RANGE with LATEST should include the still-open compaction bucket."""
    ts = client.ts()
    ts.create("t1")
    ts.create("t2")
    ts.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10)
    for timestamp, value in ((1, 1), (2, 3), (11, 7), (13, 1)):
        ts.add("t1", timestamp, value)
    assert ts.range("t1", 0, 20) == [(1, 1.0), (2, 3.0), (11, 7.0), (13, 1.0)]
    # Without LATEST only the closed bucket is reported.
    assert ts.range("t2", 0, 10) == [(0, 4.0)]
    # With LATEST the partial bucket starting at 10 is reported as well.
    assert ts.range("t2", 0, 10, latest=True) == [(0, 4.0), (10, 8.0)]
    # The partial bucket is excluded when it falls outside the range.
    assert ts.range("t2", 0, 9, latest=True) == [(0, 4.0)]
@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_range_bucket_timestamp(client: redis.Redis):
    """TS.RANGE BUCKETTIMESTAMP controls which edge of each bucket is reported."""
    timeseries = client.ts()
    timeseries.create("t1")
    for timestamp, value in ((15, 1), (17, 4), (51, 3), (73, 5), (75, 3)):
        timeseries.add("t1", timestamp, value)
    # Default: each bucket is reported by its start (low) timestamp.
    assert [(10, 4.0), (50, 3.0), (70, 5.0)] == timeseries.range(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10
    )
    # "+" reports the end (high) timestamp of each bucket instead.
    assert [(20, 4.0), (60, 3.0), (80, 5.0)] == timeseries.range(
        "t1",
        0,
        100,
        align=0,
        aggregation_type="max",
        bucket_size_msec=10,
        bucket_timestamp="+",
    )


@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_range_empty(client: redis.Redis):
    """TS.RANGE EMPTY reports NaN-valued rows for buckets without samples."""
    timeseries = client.ts()
    timeseries.create("t1")
    for timestamp, value in ((15, 1), (17, 4), (51, 3), (73, 5), (75, 3)):
        timeseries.add("t1", timestamp, value)
    assert [(10, 4.0), (50, 3.0), (70, 5.0)] == timeseries.range(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10
    )
    res = timeseries.range(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10, empty=True
    )
    # Normalize NaN placeholders to None so the list compares directly
    # (NaN != NaN, so the raw reply can never equal the expected list).
    res = [(ts, None) if math.isnan(value) else (ts, value) for ts, value in res]
    assert [
        (10, 4.0),
        (20, None),
        (30, None),
        (40, None),
        (50, 3.0),
        (60, None),
        (70, 5.0),
    ] == res
@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_revrange_latest(client: redis.Redis):
    """TS.REVRANGE with LATEST should include the still-open compaction bucket."""
    timeseries = client.ts()
    timeseries.create("t1")
    timeseries.create("t2")
    timeseries.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10)
    for timestamp, value in ((1, 1), (2, 3), (11, 7), (13, 1)):
        timeseries.add("t1", timestamp, value)
    assert timeseries.revrange("t2", 0, 10) == [(0, 4.0)]
    # LATEST adds the partial bucket starting at 10, reported first
    # because the reply is in reverse chronological order.
    assert timeseries.revrange("t2", 0, 10, latest=True) == [(10, 8.0), (0, 4.0)]
    # The partial bucket is excluded when outside the requested range.
    assert timeseries.revrange("t2", 0, 9, latest=True) == [(0, 4.0)]


@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_revrange_bucket_timestamp(client: redis.Redis):
    """TS.REVRANGE BUCKETTIMESTAMP controls which edge of each bucket is reported."""
    timeseries = client.ts()
    timeseries.create("t1")
    for timestamp, value in ((15, 1), (17, 4), (51, 3), (73, 5), (75, 3)):
        timeseries.add("t1", timestamp, value)
    assert [(70, 5.0), (50, 3.0), (10, 4.0)] == timeseries.revrange(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10
    )
    # BUG FIX: the original second assertion called `timeseries.range(...)`
    # (copy-paste from the forward test), so REVRANGE + BUCKETTIMESTAMP was
    # never exercised. Use revrange and expect descending order.
    assert [(80, 5.0), (60, 3.0), (20, 4.0)] == timeseries.revrange(
        "t1",
        0,
        100,
        align=0,
        aggregation_type="max",
        bucket_size_msec=10,
        bucket_timestamp="+",
    )


@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_revrange_empty(client: redis.Redis):
    """TS.REVRANGE EMPTY reports NaN-valued rows for buckets without samples."""
    timeseries = client.ts()
    timeseries.create("t1")
    for timestamp, value in ((15, 1), (17, 4), (51, 3), (73, 5), (75, 3)):
        timeseries.add("t1", timestamp, value)
    assert [(70, 5.0), (50, 3.0), (10, 4.0)] == timeseries.revrange(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10
    )
    res = timeseries.revrange(
        "t1", 0, 100, align=0, aggregation_type="max", bucket_size_msec=10, empty=True
    )
    # Normalize NaN placeholders to None so the list compares directly.
    res = [(ts, None) if math.isnan(value) else (ts, value) for ts, value in res]
    assert [
        (70, 5.0),
        (60, None),
        (50, 3.0),
        (40, None),
        (30, None),
        (20, None),
        (10, 4.0),
    ] == res
labels={"Test": "This", "Taste": "That", "team": "sf"}) for i in range(100): @@ -351,6 +508,31 @@ def test_multi_range_advanced(client): assert [(0, 5.0), (5, 6.0)] == res[0]["1"][1] +@pytest.mark.redismod +@pytest.mark.onlynoncluster +@skip_ifmodversion_lt("1.8.0", "timeseries") +def test_mrange_latest(client: redis.Redis): + timeseries = client.ts() + timeseries.create("t1") + timeseries.create("t2", labels={"is_compaction": "true"}) + timeseries.create("t3") + timeseries.create("t4", labels={"is_compaction": "true"}) + timeseries.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10) + timeseries.createrule("t3", "t4", aggregation_type="sum", bucket_size_msec=10) + timeseries.add("t1", 1, 1) + timeseries.add("t1", 2, 3) + timeseries.add("t1", 11, 7) + timeseries.add("t1", 13, 1) + timeseries.add("t3", 1, 1) + timeseries.add("t3", 2, 3) + timeseries.add("t3", 11, 7) + timeseries.add("t3", 13, 1) + assert client.ts().mrange(0, 10, filters=["is_compaction=true"], latest=True) == [ + {"t2": [{}, [(0, 4.0), (10, 8.0)]]}, + {"t4": [{}, [(0, 4.0), (10, 8.0)]]}, + ] + + @pytest.mark.redismod @pytest.mark.onlynoncluster @skip_ifmodversion_lt("99.99.99", "timeseries") @@ -434,6 +616,30 @@ def test_multi_reverse_range(client): assert [(1, 10.0), (0, 1.0)] == res[0]["1"][1] +@pytest.mark.redismod +@pytest.mark.onlynoncluster +@skip_ifmodversion_lt("1.8.0", "timeseries") +def test_mrevrange_latest(client: redis.Redis): + timeseries = client.ts() + timeseries.create("t1") + timeseries.create("t2", labels={"is_compaction": "true"}) + timeseries.create("t3") + timeseries.create("t4", labels={"is_compaction": "true"}) + timeseries.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10) + timeseries.createrule("t3", "t4", aggregation_type="sum", bucket_size_msec=10) + timeseries.add("t1", 1, 1) + timeseries.add("t1", 2, 3) + timeseries.add("t1", 11, 7) + timeseries.add("t1", 13, 1) + timeseries.add("t3", 1, 1) + timeseries.add("t3", 2, 3) + 
@pytest.mark.redismod
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_get_latest(client: redis.Redis):
    """TS.GET with LATEST returns the still-open compaction bucket."""
    ts = client.ts()
    ts.create("t1")
    ts.create("t2")
    ts.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10)
    for timestamp, value in ((1, 1), (2, 3), (11, 7), (13, 1)):
        ts.add("t1", timestamp, value)
    # Without LATEST only the last closed bucket is visible.
    assert (0, 4.0) == ts.get("t2")
    # With LATEST the partial bucket starting at 10 is returned.
    assert (10, 8.0) == ts.get("t2", latest=True)


@pytest.mark.redismod
@pytest.mark.onlynoncluster
@skip_ifmodversion_lt("1.8.0", "timeseries")
def test_mget_latest(client: redis.Redis):
    """TS.MGET with LATEST returns the open bucket of matched compactions."""
    ts = client.ts()
    ts.create("t1")
    ts.create("t2", labels={"is_compaction": "true"})
    ts.createrule("t1", "t2", aggregation_type="sum", bucket_size_msec=10)
    for timestamp, value in ((1, 1), (2, 3), (11, 7), (13, 1)):
        ts.add("t1", timestamp, value)
    assert ts.mget(filters=["is_compaction=true"]) == [{"t2": [{}, 0, 4.0]}]
    assert [{"t2": [{}, 10, 8.0]}] == ts.mget(
        filters=["is_compaction=true"], latest=True
    )
info.total_samples == 100 assert client.ts().get("with_pipeline")[1] == 99 * 1.1