Skip to content

Commit

Permalink
Improve batch execute (#52)
Browse files Browse the repository at this point in the history
* Rename `#execute_multi` to `#batch_execute`

The method `#execute_multi` is still available as alias to `#batch_execute`. Eventually it will be deprecated.

* Add support for batch query execution with any enumerable

`#batch_execute` can now be called with any object that implements `#each` (i.e. Enumerable and Enumerator objects).

* Implement `#batch_execute` with block

* Update README

* Improve documentation
  • Loading branch information
noteflakes authored Dec 25, 2023
1 parent 230f785 commit f5a62a3
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 42 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ db.execute('insert into foo values (?)', Extralite::Blob.new('Hello, 世界!'))
db.execute('insert into foo values (?)', 'Hello, 世界!'.force_encoding(Encoding::ASCII_8BIT))

# insert multiple rows
db.execute_multi('insert into foo values (?)', ['bar', 'baz'])
db.execute_multi('insert into foo values (?, ?)', [[1, 2], [3, 4]])
db.batch_execute('insert into foo values (?)', ['bar', 'baz'])
db.batch_execute('insert into foo values (?, ?)', [[1, 2], [3, 4]])

# batch execute from enumerable
db.batch_execute('insert into foo values (?)', 1..10)

# batch execute from block
source = [[1, 2], [2, 3], [3, 4]]
db.batch_execute('insert into foo values (?, ?)') { source.shift }

# prepared queries
query = db.prepare('select ? as foo, ? as bar') #=> Extralite::Query
Expand Down
58 changes: 57 additions & 1 deletion ext/extralite/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ VALUE safe_query_single_value(query_ctx *ctx) {
return value;
}

VALUE safe_execute_multi(query_ctx *ctx) {
VALUE safe_batch_execute_array(query_ctx *ctx) {
int count = RARRAY_LEN(ctx->params);
int changes = 0;

Expand All @@ -450,6 +450,62 @@ VALUE safe_execute_multi(query_ctx *ctx) {
return INT2FIX(changes);
}

struct batch_execute_each_ctx {
query_ctx *ctx;
int changes;
};

static VALUE safe_batch_execute_each_iter(RB_BLOCK_CALL_FUNC_ARGLIST(yield_value, vctx)) {
struct batch_execute_each_ctx *each_ctx = (struct batch_execute_each_ctx*)vctx;

sqlite3_reset(each_ctx->ctx->stmt);
sqlite3_clear_bindings(each_ctx->ctx->stmt);
bind_all_parameters_from_object(each_ctx->ctx->stmt, yield_value);

while (stmt_iterate(each_ctx->ctx));
each_ctx->changes += sqlite3_changes(each_ctx->ctx->sqlite3_db);

return Qnil;
}

VALUE safe_batch_execute_each(query_ctx *ctx) {
struct batch_execute_each_ctx each_ctx = { ctx, 0 };
rb_block_call(ctx->params, ID_each, 0, 0, safe_batch_execute_each_iter, (VALUE)&each_ctx);
return INT2FIX(each_ctx.changes);
}

VALUE safe_batch_execute_proc(query_ctx *ctx) {
VALUE params = Qnil;
int changes = 0;
while (1) {
params = rb_funcall(ctx->params, ID_call, 0);
if (NIL_P(params)) break;

sqlite3_reset(ctx->stmt);
sqlite3_clear_bindings(ctx->stmt);
bind_all_parameters_from_object(ctx->stmt, params);

while (stmt_iterate(ctx));
changes += sqlite3_changes(ctx->sqlite3_db);
}

RB_GC_GUARD(params);
return INT2FIX(changes);
}

VALUE safe_batch_execute(query_ctx *ctx) {
if (TYPE(ctx->params) == T_ARRAY)
return safe_batch_execute_array(ctx);

if (rb_respond_to(ctx->params, ID_each))
return safe_batch_execute_each(ctx);

if (rb_respond_to(ctx->params, ID_call))
return safe_batch_execute_proc(ctx);

rb_raise(cParameterError, "Invalid parameter source supplied to #batch_execute");
}

VALUE safe_query_columns(query_ctx *ctx) {
return get_column_names(ctx->stmt, sqlite3_column_count(ctx->stmt));
}
Expand Down
68 changes: 44 additions & 24 deletions ext/extralite/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ VALUE eArgumentError;

ID ID_bind;
ID ID_call;
ID ID_each;
ID ID_keys;
ID ID_new;
ID ID_strip;
Expand Down Expand Up @@ -185,7 +186,6 @@ static inline VALUE Database_perform_query(int argc, VALUE *argv, VALUE self, VA
sql = rb_funcall(argv[0], ID_strip, 0);
if (RSTRING_LEN(sql) == 0) return Qnil;

// prepare query ctx
if (db->trace_block != Qnil) rb_funcall(db->trace_block, ID_call, 1, sql);
prepare_multi_stmt(db->sqlite3_db, &stmt, sql);
RB_GC_GUARD(sql);
Expand Down Expand Up @@ -346,46 +346,65 @@ VALUE Database_execute(int argc, VALUE *argv, VALUE self) {
return Database_perform_query(argc, argv, self, safe_query_changes);
}

/* call-seq: db.execute_multi(sql, params_array) -> changes
* db.execute_multi(sql) { ... } -> changes
*
* Executes the given query for each list of parameters in params_array. If a
* block is given, the block is called for each iteration, and its return value
* is used as parameters for the query. To stop iteration, the block should
* return nil.
/* call-seq:
* db.batch_execute(sql, params_array) -> changes
* db.batch_execute(sql, enumerable) -> changes
* db.batch_execute(sql, callable) -> changes
* db.batch_execute(sql) { ... } -> changes
*
* Executes the given query for each list of parameters in params_array. If an
* enumerable is given, it is iterated and each of its values is used as the
* parameters for running the query. If a callable is given, it is called
* repeatedly and each of its return values is used as the parameters, until nil
* is returned. If a block is given, the block is called for each iteration, and
* its return value is used as parameters for the query, until nil is returned.
*
* Returns the number of changes effected. This method is designed for inserting
* multiple records.
* multiple records or performing other mass operations.
*
* records = [
* [1, 2, 3],
* [4, 5, 6]
* ]
* db.execute_multi('insert into foo values (?, ?, ?)', records)
* db.batch_execute('insert into foo values (?, ?, ?)', records)
*
* records = [
* source = [
* [1, 2, 3],
* [4, 5, 6]
* ]
* db.execute_multi('insert into foo values (?, ?, ?)') do
* x = queue.pop
* y = queue.pop
* z = queue.pop
* [x, y, z]
* end
*
*/
VALUE Database_execute_multi(VALUE self, VALUE sql, VALUE params_array) {
* db.batch_execute('insert into foo values (?, ?, ?)') { records.shift }
*
* @param sql [String] query SQL
* @param parameters [Array<Array, Hash>, Enumerable, Enumerator, Callable] array of parameters to run query with
* @return [Integer] Total number of changes effected
*/
VALUE Database_batch_execute(int argc, VALUE *argv, VALUE self) {
Database_t *db = self_to_open_database(self);
sqlite3_stmt *stmt;
VALUE sql = Qnil;
VALUE parameters = Qnil;

if (argc == 0)
rb_raise(eArgumentError, "No parameter source given");

sql = argv[0];

if (argc == 1) {
if (!rb_block_given_p())
rb_raise(cParameterError, "No parameter source given");
parameters = rb_block_proc();
}
else if (argc == 2)
parameters = argv[1];
else
rb_raise(cParameterError, "Multiple parameter sources given");

if (RSTRING_LEN(sql) == 0) return Qnil;

// prepare query ctx
prepare_single_stmt(db->sqlite3_db, &stmt, sql);
query_ctx ctx = QUERY_CTX(self, db, stmt, params_array, QUERY_MULTI_ROW, ALL_ROWS);
query_ctx ctx = QUERY_CTX(self, db, stmt, parameters, QUERY_MULTI_ROW, ALL_ROWS);

return rb_ensure(SAFE(safe_execute_multi), (VALUE)&ctx, SAFE(cleanup_stmt), (VALUE)&ctx);
return rb_ensure(SAFE(safe_batch_execute), (VALUE)&ctx, SAFE(cleanup_stmt), (VALUE)&ctx);
}

/* call-seq:
Expand Down Expand Up @@ -833,7 +852,7 @@ void Init_ExtraliteDatabase(void) {
#endif

rb_define_method(cDatabase, "execute", Database_execute, -1);
rb_define_method(cDatabase, "execute_multi", Database_execute_multi, 2);
rb_define_method(cDatabase, "batch_execute", Database_batch_execute, -1);
rb_define_method(cDatabase, "filename", Database_filename, -1);
rb_define_method(cDatabase, "gvl_release_threshold", Database_gvl_release_threshold_get, 0);
rb_define_method(cDatabase, "gvl_release_threshold=", Database_gvl_release_threshold_set, 1);
Expand Down Expand Up @@ -871,6 +890,7 @@ void Init_ExtraliteDatabase(void) {

ID_bind = rb_intern("bind");
ID_call = rb_intern("call");
ID_each = rb_intern("each");
ID_keys = rb_intern("keys");
ID_new = rb_intern("new");
ID_strip = rb_intern("strip");
Expand Down
3 changes: 2 additions & 1 deletion ext/extralite/extralite.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern VALUE cInterruptError;
extern VALUE cParameterError;

extern ID ID_call;
extern ID ID_each;
extern ID ID_keys;
extern ID ID_new;
extern ID ID_strip;
Expand Down Expand Up @@ -100,7 +101,7 @@ enum gvl_mode {

extern rb_encoding *UTF8_ENCODING;

VALUE safe_execute_multi(query_ctx *ctx);
VALUE safe_batch_execute(query_ctx *ctx);
VALUE safe_query_ary(query_ctx *ctx);
VALUE safe_query_changes(query_ctx *ctx);
VALUE safe_query_columns(query_ctx *ctx);
Expand Down
35 changes: 29 additions & 6 deletions ext/extralite/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,13 @@ VALUE Query_execute_chevrons(VALUE self, VALUE params) {
return self;
}

/* Executes the query for each set of parameters in the given array. Parameters
/* call-seq:
* query.batch_execute(params_array) -> changes
* query.batch_execute(enumerable) -> changes
* query.batch_execute(callable) -> changes
* query.batch_execute { ... } -> changes
*
* Executes the query for each set of parameters in the given array. Parameters
* can be specified as either an array (for unnamed parameters) or a hash (for
* named parameters). Returns the number of changes effected. This method is
* designed for inserting multiple records.
Expand All @@ -412,18 +418,35 @@ VALUE Query_execute_chevrons(VALUE self, VALUE params) {
* [1, 2, 3],
* [4, 5, 6]
* ]
* query.execute_multi(records)
* query.batch_execute(records)
*
* source = [
* [1, 2, 3],
* [4, 5, 6]
* ]
* query.batch_execute { records.shift }
*
* @param parameters [Array<Array, Hash>] array of parameters to run query with
* @param parameters [Array<Array, Hash>, Enumerable, Enumerator, Callable] array of parameters to run query with
* @return [Integer] number of changes effected
*/
VALUE Query_execute_multi(VALUE self, VALUE parameters) {
VALUE Query_batch_execute(int argc, VALUE *argv, VALUE self) {
Query_t *query = self_to_query(self);
VALUE parameters = Qnil;
if (query->closed) rb_raise(cError, "Query is closed");

if (!query->stmt)
prepare_single_stmt(query->sqlite3_db, &query->stmt, query->sql);

if (argc == 0) {
if (!rb_block_given_p())
rb_raise(cParameterError, "No parameter source given");
parameters = rb_block_proc();
}
else if (argc == 1)
parameters = argv[0];
else
rb_raise(cParameterError, "Multiple parameter sources given");

query_ctx ctx = QUERY_CTX(
self,
query->db_struct,
Expand All @@ -432,7 +455,7 @@ VALUE Query_execute_multi(VALUE self, VALUE parameters) {
QUERY_MODE(QUERY_MULTI_ROW),
ALL_ROWS
);
return safe_execute_multi(&ctx);
return safe_batch_execute(&ctx);
}

/* Returns the database associated with the query.
Expand Down Expand Up @@ -570,7 +593,7 @@ void Init_ExtraliteQuery(void) {
rb_define_method(cQuery, "eof?", Query_eof_p, 0);
rb_define_method(cQuery, "execute", Query_execute, -1);
rb_define_method(cQuery, "<<", Query_execute_chevrons, 1);
rb_define_method(cQuery, "execute_multi", Query_execute_multi, 1);
rb_define_method(cQuery, "batch_execute", Query_batch_execute, -1);
rb_define_method(cQuery, "initialize", Query_initialize, 2);
rb_define_method(cQuery, "inspect", Query_inspect, 0);

Expand Down
6 changes: 6 additions & 0 deletions lib/extralite.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class Database
AND name NOT LIKE 'sqlite_%';
SQL

alias_method :execute_multi, :batch_execute

# Returns the list of currently defined tables.
#
# @return [Array] list of tables
Expand Down Expand Up @@ -90,4 +92,8 @@ def pragma_get(key)
query("pragma #{key}")
end
end

class Query
alias_method :execute_multi, :batch_execute
end
end
Loading

0 comments on commit f5a62a3

Please sign in to comment.