Skip to content

Commit

Permalink
Add query_blocker extension, for blocking queries inside a block
Browse files Browse the repository at this point in the history
This is useful for ensuring a given block of code does not execute
any queries.

For use in concurrent programs, the query_blocker takes a :scope option
for the scope of the block.
  • Loading branch information
jeremyevans committed Jan 20, 2025
1 parent 1fe32a5 commit f9b3a74
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
=== master

* Add query_blocker extension, for blocking queries inside a block (jeremyevans)

* Support alter_table add_primary_key/add_unique_constraint :using_index option on PostgreSQL 9.1+ (jeremyevans)

* Add temporary names for internally defined anonymous classes and modules on Ruby 3.3+ (jeremyevans)
Expand Down
126 changes: 126 additions & 0 deletions lib/sequel/extensions/query_blocker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# frozen-string-literal: true
#
# The query_blocker extension adds Database#block_queries.
# Inside the block passed to #block_queries, any attempts to
# execute a query/statement on the database will raise a
# Sequel::QueryBlocker::BlockedQuery exception.
#
# DB.extension :query_blocker
# DB.block_queries do
# ds = DB[:table] # No exception
# ds = ds.where(column: 1) # No exception
# ds.all # Attempts query, exception raised
# end
#
# To handle concurrency, you can pass a :scope option:
#
# # Current Thread
# DB.block_queries(scope: :thread){}
#
# # Current Fiber
# DB.block_queries(scope: :fiber){}
#
# # Specific Thread
# DB.block_queries(scope: Thread.current){}
#
# # Specific Fiber
# DB.block_queries(scope: Fiber.current){}
#
# Note that this should catch all queries executed through the
# Database instance. Whether it catches queries executed directly
# on a connection object depends on the adapter in use.
#
# Related module: Sequel::QueryBlocker

# :nocov:
require "fiber" if RUBY_VERSION <= "2.7"
# :nocov:

#
module Sequel
module QueryBlocker
# Exception class raised if there is an attempt to execute a
# query/statement on the database inside a block passed to
# block_queries.
class BlockedQuery < Sequel::Error
end

def self.extended(db)
db.instance_exec do
@blocked_query_scopes ||= {}
end
end

# Check whether queries are blocked before executing them.
def log_connection_yield(sql, conn, args=nil)
# All database adapters should be calling this method around
# query execution (otherwise the queries would not get logged),
# ensuring the blocking is checked. Any database adapter issuing
# a query without calling this method is considered buggy.
check_blocked_queries!
super
end

# Whether queries are currently blocked.
def block_queries?
b = @blocked_query_scopes
Sequel.synchronize{b[:global] || b[Thread.current] || b[Fiber.current]} || false
end

# Reject (raise an BlockedQuery exception) if there is an attempt to execute
# a query/statement inside the block.
#
# The :scope option indicates which queries are rejected inside the block:
#
# :global :: This is the default, and rejects all queries.
# :thread :: Reject all queries in the current thread.
# :fiber :: Reject all queries in the current fiber.
# Thread :: Reject all queries in the given thread.
# Fiber :: Reject all queries in the given fiber.
def block_queries(opts=OPTS)
case scope = opts[:scope]
when nil
scope = :global
when :global
# nothing
when :thread
scope = Thread.current
when :fiber
scope = Fiber.current
when Thread, Fiber
# nothing
else
raise Sequel::Error, "invalid scope given to block_queries: #{scope.inspect}"
end

prev_value = nil
scopes = @blocked_query_scopes

begin
Sequel.synchronize do
prev_value = scopes[scope]
scopes[scope] = true
end

yield
ensure
Sequel.synchronize do
if prev_value
scopes[scope] = prev_value
else
scopes.delete(scope)
end
end
end
end

private

# Raise a BlockQuery exception if queries are currently blocked.
def check_blocked_queries!
raise BlockedQuery, "cannot execute query inside a block_queries block" if block_queries?
end
end

Database.register_extension(:query_blocker, QueryBlocker)
end
2 changes: 1 addition & 1 deletion spec/adapters/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def DB.drop_table(*tables)

if ENV['SEQUEL_FREEZE_DATABASE']
raise "cannot freeze database when running specs for specific adapters" if adapter_test_type
DB.extension(:constraint_validations, :string_agg, :date_arithmetic)
DB.extension(:constraint_validations, :string_agg, :date_arithmetic, :query_blocker)
DB.extension(:pg_array) if DB.database_type == :postgres
DB.freeze
end
Expand Down
124 changes: 124 additions & 0 deletions spec/extensions/query_blocker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
require_relative "spec_helper"

describe "query_blocker extension" do
fiber_is_thread = RUBY_ENGINE == 'jruby' && Fiber.new{Thread.current}.resume != Thread.current

before do
@db = Sequel.mock(:extensions=>[:query_blocker])
@ds = @db[:items]
end

it "#block_queries should block queries globally inside the block when called without options" do
@ds.all.must_equal []
proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join}
@ds.all.must_equal []
end

it "#block_queries should block queries globally inside the block when called with scope: :global" do
@ds.all.must_equal []
proc{@db.block_queries(:scope=>:global){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:global){Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join}
@ds.all.must_equal []
end

it "#block_queries should block queries inside the current thread when called with scope: :thread" do
@ds.all.must_equal []
proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread
@ds.all.must_equal []
end

it "#block_queries should block queries inside the current fiber when called with scope: :fiber" do
@ds.all.must_equal []
proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal []
@ds.all.must_equal []
end

it "#block_queries should block queries inside the given thread when called with scope: Thread" do
@ds.all.must_equal []
proc{@db.block_queries(:scope=>Thread.current){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>Thread.current){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>Thread.current){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread
@ds.all.must_equal []
end

it "#block_queries should block queries inside the given fiber when called with scope: Fiber" do
@ds.all.must_equal []
proc{@db.block_queries(:scope=>Fiber.current){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>Fiber.current){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>Fiber.current){Fiber.new{@ds.all}.resume}.must_equal []
@ds.all.must_equal []
end

it "#block_queries should raise Error if called with unsupported :scope option" do
proc{@db.block_queries(:scope=>Object.new){}}.must_raise Sequel::Error
end

it "#block_queries should handle nested usage" do
@ds.all.must_equal []
Thread.new{@ds.all}.value.must_equal []
Fiber.new{@ds.all}.resume.must_equal []

@db.block_queries(scope: :fiber) do
proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal []

@db.block_queries(scope: :fiber) do
proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal []
end

@db.block_queries(scope: :thread) do
proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread

@db.block_queries(scope: :thread) do
proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread
end

@db.block_queries do
proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery
proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join}

@db.block_queries do
proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery
proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join}
end

proc{@ds.all}.must_raise Sequel::QueryBlocker::BlockedQuery
proc{@db.block_queries{@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries{Thread.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.join}
end

proc{@db.block_queries(:scope=>:thread){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:thread){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:thread){Fiber.new{assert_raises(Sequel::QueryBlocker::BlockedQuery){@ds.all}}.resume} unless fiber_is_thread
end

proc{@db.block_queries(:scope=>:fiber){@ds.all}}.must_raise Sequel::QueryBlocker::BlockedQuery
@db.block_queries(:scope=>:fiber){Thread.new{@ds.all}.value}.must_equal []
@db.block_queries(:scope=>:fiber){Fiber.new{@ds.all}.resume}.must_equal []
end

@ds.all.must_equal []
Thread.new{@ds.all}.value.must_equal []
Fiber.new{@ds.all}.resume.must_equal []
end

it "#block_queries? should check whether queries are currently blocked" do
@db.block_queries?.must_equal false
@db.block_queries{@db.block_queries?}.must_equal true
@db.block_queries?.must_equal false
end
end
34 changes: 34 additions & 0 deletions spec/integration/plugin_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3222,3 +3222,37 @@ def set(k, v, ttl) self[k] = v end
counts.must_equal [49]
end
end unless DB.database_type == :db2 && DB.offset_strategy == :emulate

describe "query_blocker extension" do
before(:all) do
@db = DB
@db.create_table!(:query_blocker_test) do
Integer :i
end
@db.extension :query_blocker
@ds = @db[:query_blocker_test]
end
before do
@ds.delete
end
after(:all) do
@db.drop_table?(:query_blocker_test)
end

types = {
"SELECT" => proc{@ds.all},
"INSERT" => proc{@ds.insert(1)},
"UPDATE" => proc{@ds.update(:i => 1)},
"DELETE" => proc{@ds.delete},
"TRUNCATE" => proc{@ds.truncate},
"bound variable" => proc{@ds.call(:select)},
"prepared statement" => proc{@ds.prepare(:select, :select_query_blocker_test).call},
"arbitrary SQL" => proc{@db.run(@ds.select(1).sql)},
}.each do |type, block|
it "should block #{type} queries" do
@db.block_queries do
proc{instance_exec(&block)}.must_raise Sequel::QueryBlocker::BlockedQuery
end
end
end
end
4 changes: 4 additions & 0 deletions www/pages/plugins.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,10 @@
<span class="ul__span">Uses timestamptz (timestamp with time zone) as the generic timestamp type used for Time and DateTime classes.</span>
</li>
<li class="ul__li ul__li--grid">
<a class="a" href="rdoc-plugins/files/lib/sequel/extensions/query_blocker_rb.html">query_blocker </a>
<span class="ul__span">Supports raising an exception if queries are executed inside a given block.</span>
</li>
<li class="ul__li ul__li--grid">
<a class="a" href="rdoc-plugins/files/lib/sequel/extensions/run_transaction_hooks_rb.html">run_transaction_hooks </a>
<span class="ul__span">Support running after_commit and after_rollback transaction hooks before transaction commit/rollback, designed for transactional testing.</span>
</li>
Expand Down

0 comments on commit f9b3a74

Please sign in to comment.