Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable nonblocking mode while large object calls #498

Merged
merged 1 commit into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions ext/pg_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -3643,6 +3643,14 @@ pgconn_send_flush_request(VALUE self)
* LARGE OBJECT SUPPORT
**************************************************************************/

#define BLOCKING_BEGIN(conn) do { \
int old_nonblocking = PQisnonblocking(conn); \
PQsetnonblocking(conn, 0);

#define BLOCKING_END(th) \
PQsetnonblocking(conn, old_nonblocking); \
} while(0);

/*
* call-seq:
* conn.lo_creat( [mode] ) -> Integer
Expand All @@ -3663,7 +3671,10 @@ pgconn_locreat(int argc, VALUE *argv, VALUE self)
else
mode = NUM2INT(nmode);

lo_oid = lo_creat(conn, mode);
BLOCKING_BEGIN(conn)
lo_oid = lo_creat(conn, mode);
BLOCKING_END(conn)

if (lo_oid == 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_creat failed");

Expand Down Expand Up @@ -3708,7 +3719,10 @@ pgconn_loimport(VALUE self, VALUE filename)

Check_Type(filename, T_STRING);

lo_oid = lo_import(conn, StringValueCStr(filename));
BLOCKING_BEGIN(conn)
lo_oid = lo_import(conn, StringValueCStr(filename));
BLOCKING_END(conn)

if (lo_oid == 0) {
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));
}
Expand All @@ -3726,11 +3740,16 @@ pgconn_loexport(VALUE self, VALUE lo_oid, VALUE filename)
{
PGconn *conn = pg_get_pgconn(self);
Oid oid;
int ret;
Check_Type(filename, T_STRING);

oid = NUM2UINT(lo_oid);

if (lo_export(conn, oid, StringValueCStr(filename)) < 0) {
BLOCKING_BEGIN(conn)
ret = lo_export(conn, oid, StringValueCStr(filename));
BLOCKING_END(conn)

if (ret < 0) {
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));
}
return Qnil;
Expand Down Expand Up @@ -3761,7 +3780,11 @@ pgconn_loopen(int argc, VALUE *argv, VALUE self)
else
mode = NUM2INT(nmode);

if((fd = lo_open(conn, lo_oid, mode)) < 0) {
BLOCKING_BEGIN(conn)
fd = lo_open(conn, lo_oid, mode);
BLOCKING_END(conn)

if(fd < 0) {
pg_raise_conn_error( rb_ePGerror, self, "can't open large object: %s", PQerrorMessage(conn));
}
return INT2FIX(fd);
Expand All @@ -3786,8 +3809,12 @@ pgconn_lowrite(VALUE self, VALUE in_lo_desc, VALUE buffer)
if( RSTRING_LEN(buffer) < 0) {
pg_raise_conn_error( rb_ePGerror, self, "write buffer zero string");
}
if((n = lo_write(conn, fd, StringValuePtr(buffer),
RSTRING_LEN(buffer))) < 0) {
BLOCKING_BEGIN(conn)
n = lo_write(conn, fd, StringValuePtr(buffer),
RSTRING_LEN(buffer));
BLOCKING_END(conn)

if(n < 0) {
pg_raise_conn_error( rb_ePGerror, self, "lo_write failed: %s", PQerrorMessage(conn));
}

Expand Down Expand Up @@ -3815,7 +3842,12 @@ pgconn_loread(VALUE self, VALUE in_lo_desc, VALUE in_len)
pg_raise_conn_error( rb_ePGerror, self, "negative length %d given", len);

buffer = ALLOC_N(char, len);
if((ret = lo_read(conn, lo_desc, buffer, len)) < 0)

BLOCKING_BEGIN(conn)
ret = lo_read(conn, lo_desc, buffer, len);
BLOCKING_END(conn)

if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_read failed");

if(ret == 0) {
Expand Down Expand Up @@ -3845,7 +3877,11 @@ pgconn_lolseek(VALUE self, VALUE in_lo_desc, VALUE offset, VALUE whence)
int lo_desc = NUM2INT(in_lo_desc);
int ret;

if((ret = lo_lseek(conn, lo_desc, NUM2INT(offset), NUM2INT(whence))) < 0) {
BLOCKING_BEGIN(conn)
ret = lo_lseek(conn, lo_desc, NUM2INT(offset), NUM2INT(whence));
BLOCKING_END(conn)

if(ret < 0) {
pg_raise_conn_error( rb_ePGerror, self, "lo_lseek failed");
}

Expand All @@ -3865,7 +3901,11 @@ pgconn_lotell(VALUE self, VALUE in_lo_desc)
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);

if((position = lo_tell(conn, lo_desc)) < 0)
BLOCKING_BEGIN(conn)
position = lo_tell(conn, lo_desc);
BLOCKING_END(conn)

if(position < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_tell failed");

return INT2FIX(position);
Expand All @@ -3883,8 +3923,13 @@ pgconn_lotruncate(VALUE self, VALUE in_lo_desc, VALUE in_len)
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);
size_t len = NUM2INT(in_len);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_truncate(conn,lo_desc,len);
BLOCKING_END(conn)

if(lo_truncate(conn,lo_desc,len) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_truncate failed");

return Qnil;
Expand All @@ -3901,8 +3946,13 @@ pgconn_loclose(VALUE self, VALUE in_lo_desc)
{
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_close(conn,lo_desc);
BLOCKING_END(conn)

if(lo_close(conn,lo_desc) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_close failed");

return Qnil;
Expand All @@ -3919,8 +3969,13 @@ pgconn_lounlink(VALUE self, VALUE in_oid)
{
PGconn *conn = pg_get_pgconn(self);
Oid oid = NUM2UINT(in_oid);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_unlink(conn,oid);
BLOCKING_END(conn)

if(lo_unlink(conn,oid) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_unlink failed");

return Qnil;
Expand Down
49 changes: 41 additions & 8 deletions spec/pg/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -915,14 +915,47 @@
end
end

it "not read past the end of a large object" do
@conn.transaction do
oid = @conn.lo_create( 0 )
fd = @conn.lo_open( oid, PG::INV_READ|PG::INV_WRITE )
@conn.lo_write( fd, "foobar" )
expect( @conn.lo_read( fd, 10 ) ).to be_nil()
@conn.lo_lseek( fd, 0, PG::SEEK_SET )
expect( @conn.lo_read( fd, 10 ) ).to eq( 'foobar' )
describe "large objects" do

it "not read past the end of a large object" do
@conn.transaction do
oid = @conn.lo_create( 0 )
fd = @conn.lo_open( oid, PG::INV_READ|PG::INV_WRITE )
expect( @conn.lo_write( fd, "foobar" ) ).to eq( 6 )
expect( @conn.lo_read( fd, 10 ) ).to be_nil()
expect( @conn.lo_lseek( fd, 0, PG::SEEK_SET ) ).to eq( 0 )
expect( @conn.lo_read( fd, 10 ) ).to eq( 'foobar' )
expect( @conn.lo_close( fd ) ).to be_nil
expect( @conn.lo_unlink( oid ) ).to be_nil
end
end

it "large object can handle big data", :unix_socket do
# Using lo_write with > 300000 bytes on a UnixSocket connection in nonblocking mode results in the following error:
# PG::UnableToSend: unexpected response from server; first received character was "V"
# This is because the lo_write call doesn't wait for the response of the server function, but sends the next command early, so that results overlap.
# Switching to blocking mode as part of lo_* calls fixes this issue and is tested here.

uri = "postgres://#{@unix_socket.gsub("/", "%2F")}:#{@port}/test"
conn = described_class.connect( uri )

bytes = Random.urandom(512000)
oid = conn.lo_creat
conn.transaction do
fd = conn.lo_open( oid, PG::INV_WRITE )
conn.lo_write( fd, bytes )
expect( conn.lo_close( fd ) ).to be_nil
end

conn.transaction do
fd = conn.lo_open( oid, PG::INV_READ )
bytes2 = conn.lo_read( fd, bytes.bytesize )
expect( bytes2 ).to eq( bytes )
expect( conn.lo_close( fd ) ).to be_nil
end
expect( conn.lo_unlink( oid ) ).to be_nil
ensure
conn&.finish
end
end

Expand Down