From e1425dc62418c516af718436463f0c5bebc62942 Mon Sep 17 00:00:00 2001 From: Lewis Buckley Date: Tue, 1 Mar 2022 21:56:16 +0000 Subject: [PATCH 1/6] Resolve pipelining deprecation warnings --- lib/kredis/migration.rb | 15 ++++++++++++--- lib/kredis/types/proxy.rb | 12 ++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lib/kredis/migration.rb b/lib/kredis/migration.rb index 3396068..f114562 100644 --- a/lib/kredis/migration.rb +++ b/lib/kredis/migration.rb @@ -5,6 +5,7 @@ class Kredis::Migration def initialize(config = :shared) @redis = Kredis.configured_for config + @pipeline = nil # TODO: Replace script loading with `copy` command once Redis 6.2+ is the minimum supported version. @copy_sha = @redis.script "load", "redis.call('SETNX', KEYS[2], redis.call('GET', KEYS[1])); return 1;" end @@ -23,7 +24,7 @@ def migrate(from:, to:) if to.present? && from != namespaced_to log_migration "Migrating key #{from} to #{namespaced_to}" do - @redis.evalsha @copy_sha, keys: [ from, namespaced_to ] + connection.evalsha @copy_sha, keys: [ from, namespaced_to ] end else log_migration "Skipping blank/unaltered migration key #{from} → #{to}" @@ -32,18 +33,26 @@ def migrate(from:, to:) def delete_all(key_pattern) each_key_batch_matching(key_pattern) do |keys| - @redis.del *keys + connection.del *keys end end private SCAN_BATCH_SIZE = 1_000 + def connection + @pipeline || @redis + end + def each_key_batch_matching(key_pattern, &block) cursor = "0" begin cursor, keys = @redis.scan(cursor, match: key_pattern, count: SCAN_BATCH_SIZE) - @redis.pipelined { yield keys } + @redis.pipelined do |pipeline| + @pipeline = pipeline + yield keys + @pipeline = nil + end end until cursor == "0" end diff --git a/lib/kredis/types/proxy.rb b/lib/kredis/types/proxy.rb index 99033f2..e5eac3f 100644 --- a/lib/kredis/types/proxy.rb +++ b/lib/kredis/types/proxy.rb @@ -2,21 +2,25 @@ class Kredis::Types::Proxy require_relative "proxy/failsafe" include Failsafe - attr_accessor :redis, :key + attr_accessor :redis, :key, :pipeline def initialize(redis, key, **options) @redis, @key = redis, key options.each { |key, value| send("#{key}=", value) } end - def multi(...) - redis.multi(...) + def multi(*args, **kwargs, &block) + redis.multi(*args, **kwargs) do |pipeline| + self.pipeline = pipeline + block.call + self.pipeline = nil + end end def method_missing(method, *args, **kwargs) Kredis.instrument :proxy, **log_message(method, *args, **kwargs) do failsafe do - redis.public_send method, key, *args, **kwargs + (pipeline || redis).public_send method, key, *args, **kwargs end end end From 6c2a8a2d714e366dbddd85c75919bedcd5782f1e Mon Sep 17 00:00:00 2001 From: Lewis Buckley Date: Mon, 2 May 2022 08:51:28 +0100 Subject: [PATCH 2/6] Revert "Use block parameter to pipeline in Redis#multi (#68)" This reverts commit 88b8c29623fb68bef925963017429d7987ac7687. --- lib/kredis/types/counter.rb | 12 ++++++------ lib/kredis/types/list.rb | 12 ++++++------ lib/kredis/types/set.rb | 14 +++++++------- lib/kredis/types/unique_list.rb | 16 ++++++++-------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/lib/kredis/types/counter.rb b/lib/kredis/types/counter.rb index bcfcf62..cacebb3 100644 --- a/lib/kredis/types/counter.rb +++ b/lib/kredis/types/counter.rb @@ -4,16 +4,16 @@ class Kredis::Types::Counter < Kredis::Types::Proxying attr_accessor :expires_in def increment(by: 1) - multi do |pipeline| - pipeline.set 0, ex: expires_in, nx: true - pipeline.incrby by + multi do + set 0, ex: expires_in, nx: true + incrby by end[-1] end def decrement(by: 1) - multi do |pipeline| - pipeline.set 0, ex: expires_in, nx: true - pipeline.decrby by + multi do + set 0, ex: expires_in, nx: true + decrby by end[-1] end diff --git a/lib/kredis/types/list.rb b/lib/kredis/types/list.rb index cc1867d..cbda3bd 100644 --- a/lib/kredis/types/list.rb +++ b/lib/kredis/types/list.rb @@ -8,16 +8,16 @@ def elements end alias to_a elements - def remove(*elements, pipeline: nil) - types_to_strings(elements, typed).each { |element| (pipeline || proxy).lrem 0, element } + def remove(*elements) + types_to_strings(elements, typed).each { |element| lrem 0, element } end - def prepend(*elements, pipeline: nil) - (pipeline || proxy).lpush types_to_strings(elements, typed) if elements.flatten.any? + def prepend(*elements) + lpush types_to_strings(elements, typed) if elements.flatten.any? end - def append(*elements, pipeline: nil) - (pipeline || proxy).rpush types_to_strings(elements, typed) if elements.flatten.any? + def append(*elements) + rpush types_to_strings(elements, typed) if elements.flatten.any? end alias << append diff --git a/lib/kredis/types/set.rb b/lib/kredis/types/set.rb index dcb79e1..5f7a1e0 100644 --- a/lib/kredis/types/set.rb +++ b/lib/kredis/types/set.rb @@ -8,19 +8,19 @@ def members end alias to_a members - def add(*members, pipeline: nil) - (pipeline || proxy).sadd types_to_strings(members, typed) if members.flatten.any? + def add(*members) + sadd types_to_strings(members, typed) if members.flatten.any? end alias << add - def remove(*members, pipeline: nil) - (pipeline || proxy).srem types_to_strings(members, typed) if members.flatten.any? + def remove(*members) + srem types_to_strings(members, typed) if members.flatten.any? end def replace(*members) - multi do |pipeline| - pipeline.del - add members, pipeline: pipeline + multi do + del + add members end end diff --git a/lib/kredis/types/unique_list.rb b/lib/kredis/types/unique_list.rb index 2a85afc..eb4bbcc 100644 --- a/lib/kredis/types/unique_list.rb +++ b/lib/kredis/types/unique_list.rb @@ -8,10 +8,10 @@ def prepend(elements) elements = Array(elements).uniq return if elements.empty? - multi do |pipeline| - remove elements, pipeline: pipeline - super(elements, pipeline: pipeline) - pipeline.ltrim 0, (limit - 1) if limit + multi do + remove elements + super + ltrim 0, (limit - 1) if limit end end @@ -19,10 +19,10 @@ def append(elements) elements = Array(elements).uniq return if elements.empty? - multi do |pipeline| - remove elements, pipeline: pipeline - super(elements, pipeline: pipeline) - pipeline.ltrim -limit, -1 if limit + multi do + remove elements + super + ltrim -limit, -1 if limit end end alias << append From 7bc291427145a0886a0d094be548829e0b756b95 Mon Sep 17 00:00:00 2001 From: Lewis Buckley Date: Mon, 2 May 2022 08:57:57 +0100 Subject: [PATCH 3/6] Use a thread-local variable for pipeline --- lib/kredis/migration.rb | 5 ----- lib/kredis/types/proxy.rb | 9 +++++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/kredis/migration.rb b/lib/kredis/migration.rb index f107c9f..fa12e68 100644 --- a/lib/kredis/migration.rb +++ b/lib/kredis/migration.rb @@ -5,7 +5,6 @@ class Kredis::Migration def initialize(config = :shared) @redis = Kredis.configured_for config - @pipeline = nil # TODO: Replace script loading with `copy` command once Redis 6.2+ is the minimum supported version. @copy_sha = @redis.script "load", "redis.call('SETNX', KEYS[2], redis.call('GET', KEYS[1])); return 1;" end @@ -40,10 +39,6 @@ def delete_all(key_pattern) private SCAN_BATCH_SIZE = 1_000 - def connection - @pipeline || @redis - end - def each_key_batch_matching(key_pattern, &block) cursor = "0" begin diff --git a/lib/kredis/types/proxy.rb b/lib/kredis/types/proxy.rb index e5eac3f..9a092b2 100644 --- a/lib/kredis/types/proxy.rb +++ b/lib/kredis/types/proxy.rb @@ -2,7 +2,7 @@ class Kredis::Types::Proxy require_relative "proxy/failsafe" include Failsafe - attr_accessor :redis, :key, :pipeline + attr_accessor :redis, :key def initialize(redis, key, **options) @redis, @key = redis, key @@ -11,16 +11,17 @@ def initialize(redis, key, **options) def multi(*args, **kwargs, &block) redis.multi(*args, **kwargs) do |pipeline| - self.pipeline = pipeline + Thread.current[:pipeline] = pipeline block.call - self.pipeline = nil + ensure + Thread.current[:pipeline] = nil end end def method_missing(method, *args, **kwargs) Kredis.instrument :proxy, **log_message(method, *args, **kwargs) do failsafe do - (pipeline || redis).public_send method, key, *args, **kwargs + (Thread.current[:pipeline] || redis).public_send method, key, *args, **kwargs end end end From 476069f57de455f9850c720f3bb93207142ceef7 Mon Sep 17 00:00:00 2001 From: tleish Date: Sat, 18 Jun 2022 06:56:56 -0600 Subject: [PATCH 4/6] Delete list of keys in batch (#90) * Kredis::Migration#delete_all array of keys * Kredis::Migration#delete_all array of keys * Kredis::Migration#delete_all array of keys --- lib/kredis/migration.rb | 12 +++++++++--- test/migration_test.rb | 10 +++++++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/kredis/migration.rb b/lib/kredis/migration.rb index fa12e68..9b24a30 100644 --- a/lib/kredis/migration.rb +++ b/lib/kredis/migration.rb @@ -30,9 +30,15 @@ def migrate(from:, to:, pipeline: nil) end end - def delete_all(key_pattern) - each_key_batch_matching(key_pattern) do |keys, pipeline| - pipeline.del *keys + def delete_all(*key_patterns) + log_migration "DELETE ALL #{key_patterns.inspect}" do + if key_patterns.length > 1 + @redis.del *key_patterns + else + each_key_batch_matching(key_patterns.first) do |keys, pipeline| + pipeline.del *keys + end + end end end diff --git a/test/migration_test.rb b/test/migration_test.rb index 032385a..40b9c5a 100644 --- a/test/migration_test.rb +++ b/test/migration_test.rb @@ -51,11 +51,19 @@ class MigrationTest < ActiveSupport::TestCase end end - test "delete_all" do + test "delete_all with pattern" do 3.times { |index| Kredis.proxy("mykey:#{index}").set "hello there #{index}" } Kredis::Migration.delete_all "mykey:*" 3.times { |index| assert_nil Kredis.proxy("mykey:#{index}").get } end + + test "delete_all with keys" do + 3.times { |index| Kredis.proxy("mykey:#{index}").set "hello there #{index}" } + + Kredis::Migration.delete_all *3.times.map { |index| "mykey:#{index}" } + + 3.times { |index| assert_nil Kredis.proxy("mykey:#{index}").get } + end end From 4291815a62fd84cfc4cf34445202390733213bdb Mon Sep 17 00:00:00 2001 From: Lewis Buckley Date: Tue, 21 Jun 2022 15:57:59 +0100 Subject: [PATCH 5/6] Pefer a thread_mattr_accessor over a thread local variable --- lib/kredis.rb | 1 + lib/kredis/types/proxy.rb | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/kredis.rb b/lib/kredis.rb index b65545b..03e8b0b 100644 --- a/lib/kredis.rb +++ b/lib/kredis.rb @@ -1,5 +1,6 @@ require "active_support" require "active_support/core_ext/module/attribute_accessors" +require "active_support/core_ext/module/attribute_accessors_per_thread" require "kredis/version" diff --git a/lib/kredis/types/proxy.rb b/lib/kredis/types/proxy.rb index 9a092b2..dbef792 100644 --- a/lib/kredis/types/proxy.rb +++ b/lib/kredis/types/proxy.rb @@ -4,6 +4,8 @@ class Kredis::Types::Proxy attr_accessor :redis, :key + thread_mattr_accessor :pipeline + def initialize(redis, key, **options) @redis, @key = redis, key options.each { |key, value| send("#{key}=", value) } @@ -11,17 +13,17 @@ def initialize(redis, key, **options) def multi(*args, **kwargs, &block) redis.multi(*args, **kwargs) do |pipeline| - Thread.current[:pipeline] = pipeline + self.pipeline = pipeline block.call ensure - Thread.current[:pipeline] = nil + self.pipeline = nil end end def method_missing(method, *args, **kwargs) Kredis.instrument :proxy, **log_message(method, *args, **kwargs) do failsafe do - (Thread.current[:pipeline] || redis).public_send method, key, *args, **kwargs + (pipeline || redis).public_send method, key, *args, **kwargs end end end From 516c14a486cb00f2da40c835d92716adda195e96 Mon Sep 17 00:00:00 2001 From: Lewis Buckley Date: Fri, 24 Jun 2022 12:43:14 +0100 Subject: [PATCH 6/6] Coalesce "current pipeline or redis" into the redis method itself Noting that Proxying subclasses that (mistakenly, perhaps) talk to redis directly will need to take care to check for pipeline presence as well. Also remove the redis attr reader from Proxying to prevent mistaken calling of redis without knowledge of the current MULTI state. --- lib/kredis/types/proxy.rb | 8 ++++++-- lib/kredis/types/proxying.rb | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/kredis/types/proxy.rb b/lib/kredis/types/proxy.rb index dbef792..b609a6f 100644 --- a/lib/kredis/types/proxy.rb +++ b/lib/kredis/types/proxy.rb @@ -2,7 +2,7 @@ class Kredis::Types::Proxy require_relative "proxy/failsafe" include Failsafe - attr_accessor :redis, :key + attr_accessor :key thread_mattr_accessor :pipeline @@ -23,12 +23,16 @@ def multi(*args, **kwargs, &block) def method_missing(method, *args, **kwargs) Kredis.instrument :proxy, **log_message(method, *args, **kwargs) do failsafe do - (pipeline || redis).public_send method, key, *args, **kwargs + redis.public_send method, key, *args, **kwargs end end end private + def redis + pipeline || @redis + end + def log_message(method, *args, **kwargs) args = args.flatten.reject(&:blank?).presence kwargs = kwargs.reject { |_k, v| v.blank? }.presence diff --git a/lib/kredis/types/proxying.rb b/lib/kredis/types/proxying.rb index a7c0e55..3607f0f 100644 --- a/lib/kredis/types/proxying.rb +++ b/lib/kredis/types/proxying.rb @@ -1,14 +1,14 @@ require "active_support/core_ext/module/delegation" class Kredis::Types::Proxying - attr_accessor :proxy, :redis, :key + attr_accessor :proxy, :key def self.proxying(*commands) delegate *commands, to: :proxy end def initialize(redis, key, **options) - @redis, @key = redis, key + @key = key @proxy = Kredis::Types::Proxy.new(redis, key) options.each { |key, value| send("#{key}=", value) } end