Fix and improve PG::Result#stream_each_tuple
This fixes a double free of pgresult, which happened when the block passed to stream_each_tuple raised an exception.
This is now prevented by setting pgresult to NULL before the rb_yield call.
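
For context, a minimal sketch of the pattern that used to trigger the double free (not part of the commit; the connection settings are hypothetical and assume a locally reachable server):

  require 'pg'

  conn = PG.connect( dbname: 'postgres' )  # hypothetical connection settings
  conn.send_query( "SELECT generate_series(1,3) AS a" )
  conn.set_single_row_mode
  res = conn.get_result

  # Before this fix, an exception raised inside the block could leave both
  # `res` and the per-tuple copy thinking they own the same PGresult, so it
  # was PQclear'ed twice. With the fix, `res` gives up ownership before yield.
  begin
    res.stream_each_tuple { |_tuple| raise "boom" }  # raises RuntimeError
  rescue RuntimeError
  end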

This commit also fixes type map use in combination with stream_each_tuple.
Previously, the type map of the connection was used in PG::Tuple instead of the type map of the base result.
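
To illustrate the corrected behavior, a sketch under the same assumptions (PG::BasicTypeMapForResults serves only as an example type map; it must be built while the connection is idle):

  require 'pg'

  conn = PG.connect( dbname: 'postgres' )  # hypothetical connection settings
  tm = PG::BasicTypeMapForResults.new( conn )

  conn.send_query( "SELECT generate_series(1,3) AS a" )
  conn.set_single_row_mode
  res = conn.get_result
  res.type_map = tm

  # With the fix, each yielded PG::Tuple decodes through the type map of
  # `res`, so the values should arrive as Integers rather than Strings.
  res.stream_each_tuple do |tuple|
    p tuple["a"]
  end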

While at it, I also improved the performance of stream_each_tuple.
The per-tuple PG::Result is now copied via memcpy instead of being initialized from the connection.
Field names are now also shared between all PG::Tuple objects, instead of repeatedly allocating the same values.
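
The field-name sharing is observable from Ruby; a sketch mirroring the spec added below (same hypothetical connection settings as above):

  require 'pg'

  conn = PG.connect( dbname: 'postgres' )  # hypothetical connection settings
  conn.send_query( "SELECT generate_series(1,2) AS a" )
  conn.set_single_row_mode
  tuple1, tuple2 = conn.get_result.stream_each_tuple.to_a

  # Both tuples hold the identical frozen String object as their key,
  # rather than two equal copies.
  p tuple1.keys[0].equal?( tuple2.keys[0] )  # => true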
larskanis committed Nov 16, 2019
1 parent 435990e commit 6a450fa
Showing 2 changed files with 90 additions and 17 deletions.
64 changes: 47 additions & 17 deletions ext/pg_result.c
@@ -129,13 +129,13 @@ pgresult_clear( t_pg_result *this )
 {
   if( this->pgresult && !this->autoclear ){
     PQclear(this->pgresult);
-    this->pgresult = NULL;
 #ifdef HAVE_RB_GC_ADJUST_MEMORY_USAGE
     rb_gc_adjust_memory_usage(-this->result_size);
 #endif
-    this->result_size = 0;
   }
+  this->result_size = 0;
+  this->nfields = -1;
+  this->pgresult = NULL;
 }
 
 static void
@@ -231,6 +231,20 @@ pg_new_result(PGresult *result, VALUE rb_pgconn)
   return self;
 }
 
+static VALUE
+pg_copy_result(t_pg_result *this)
+{
+  int nfields = this->nfields == -1 ? (this->pgresult ? PQnfields(this->pgresult) : 0) : this->nfields;
+  size_t len = sizeof(*this) + sizeof(*this->fnames) * nfields;
+  t_pg_result *copy;
+
+  copy = (t_pg_result *)xmalloc(len);
+  memcpy(copy, this, len);
+  this->result_size = 0;
+
+  return TypedData_Wrap_Struct(rb_cPGresult, &pgresult_type, copy);
+}
+
 VALUE
 pg_new_result_autoclear(PGresult *result, VALUE rb_pgconn)
 {
@@ -1145,6 +1159,25 @@ pgresult_tuple_values(VALUE self, VALUE index)
   }
 }
 
+static void ensure_init_for_tuple(VALUE self)
+{
+  t_pg_result *this = pgresult_get_this_safe(self);
+
+  if( this->field_map == Qnil ){
+    int i;
+    VALUE field_map = rb_hash_new();
+
+    if( this->nfields == -1 )
+      pgresult_init_fnames( self );
+
+    for( i = 0; i < this->nfields; i++ ){
+      rb_hash_aset(field_map, this->fnames[i], INT2FIX(i));
+    }
+    rb_obj_freeze(field_map);
+    this->field_map = field_map;
+  }
+}
+
 /*
  * call-seq:
  *    res.tuple( n ) -> PG::Tuple
@@ -1165,19 +1198,7 @@ pgresult_tuple(VALUE self, VALUE index)
   if ( tuple_num < 0 || tuple_num >= num_tuples )
     rb_raise( rb_eIndexError, "Index %d is out of range", tuple_num );
 
-  if( this->field_map == Qnil ){
-    int i;
-    VALUE field_map = rb_hash_new();
-
-    if( this->nfields == -1 )
-      pgresult_init_fnames( self );
-
-    for( i = 0; i < this->nfields; i++ ){
-      rb_hash_aset(field_map, this->fnames[i], INT2FIX(i));
-    }
-    rb_obj_freeze(field_map);
-    this->field_map = field_map;
-  }
+  ensure_init_for_tuple(self);
 
   return pg_tuple_new(self, tuple_num);
 }
@@ -1308,11 +1329,17 @@ yield_tuple(VALUE self, int ntuples, int nfields)
 {
   int tuple_num;
   t_pg_result *this = pgresult_get_this(self);
-  VALUE result = pg_new_result(this->pgresult, this->connection);
+  VALUE copy;
   UNUSED(nfields);
 
+  /* make a copy of the base result, which is bound to the PG::Tuple */
+  copy = pg_copy_result(this);
+  /* The copy now owns the PGresult and is responsible for PQclear'ing it.
+   * We clear the pgresult here, so that it's not double freed on error within yield. */
+  this->pgresult = NULL;
+
   for(tuple_num = 0; tuple_num < ntuples; tuple_num++) {
-    VALUE tuple = pgresult_tuple(result, INT2FIX(tuple_num));
+    VALUE tuple = pgresult_tuple(copy, INT2FIX(tuple_num));
     rb_yield( tuple );
   }
 }
@@ -1433,6 +1460,9 @@ pgresult_stream_each_row(VALUE self)
 static VALUE
 pgresult_stream_each_tuple(VALUE self)
 {
+  /* allocate VALUEs that are shared between all streamed tuples */
+  ensure_init_for_tuple(self);
+
   return pgresult_stream_any(self, yield_tuple);
 }

43 changes: 43 additions & 0 deletions spec/pg/result_spec.rb
@@ -55,6 +55,18 @@
     expect( @conn.get_result ).to be_nil
   end
 
+  it "keeps last result on error while iterating stream_each" do
+    @conn.send_query( "SELECT generate_series(2,4) AS a" )
+    @conn.set_single_row_mode
+    res = @conn.get_result
+    expect do
+      res.stream_each do
+        raise ZeroDivisionError
+      end
+    end.to raise_error(ZeroDivisionError)
+    expect( res.values ).to eq([["2"]])
+  end
+
   it "can iterate over all rows as Array" do
     @conn.send_query( "SELECT generate_series(2,4) AS a; SELECT 1 AS b, generate_series(5,6) AS c" )
     @conn.set_single_row_mode
@@ -71,6 +83,18 @@
     expect( @conn.get_result ).to be_nil
   end
 
+  it "keeps last result on error while iterating stream_each_row" do
+    @conn.send_query( "SELECT generate_series(2,4) AS a" )
+    @conn.set_single_row_mode
+    res = @conn.get_result
+    expect do
+      res.stream_each_row do
+        raise ZeroDivisionError
+      end
+    end.to raise_error(ZeroDivisionError)
+    expect( res.values ).to eq([["2"]])
+  end
+
   it "can iterate over all rows as PG::Tuple" do
     @conn.send_query( "SELECT generate_series(2,4) AS a; SELECT 1 AS b, generate_series(5,6) AS c" )
     @conn.set_single_row_mode
@@ -87,6 +111,25 @@
     expect( @conn.get_result ).to be_nil
   end
 
+  it "clears result on error while iterating stream_each_tuple" do
+    @conn.send_query( "SELECT generate_series(2,4) AS a" )
+    @conn.set_single_row_mode
+    res = @conn.get_result
+    expect do
+      res.stream_each_tuple do
+        raise ZeroDivisionError
+      end
+    end.to raise_error(ZeroDivisionError)
+    expect( res.cleared? ).to eq(true)
+  end
+
+  it "should reuse field names in stream_each_tuple" do
+    @conn.send_query( "SELECT generate_series(2,3) AS a" )
+    @conn.set_single_row_mode
+    tuple1, tuple2 = *@conn.get_result.stream_each_tuple.to_a
+    expect( tuple1.keys[0].object_id ).to eq(tuple2.keys[0].object_id)
+  end
+
   it "complains when not in single row mode" do
     @conn.send_query( "SELECT generate_series(2,4)" )
     expect{
