Skip to content

Commit

Permalink
Disable nonblocking mode during large object calls
Browse files Browse the repository at this point in the history
libpq's "lo_*" calls fail when a bigger amount of data is transferred and the connection is set to nonblocking mode.
According to some discussion here
  https://www.postgresql.org/message-id/20130606.082048.998330612674120762.t-ishii%40sraoss.co.jp
the large object interface isn't intended to be used with nonblocking mode.

It works to some extent with small data (< 8000 Bytes), but fails after lo_write of bigger data at the next exec call on the same connection like so:
```
message type 0x56 arrived from server while idle
message type 0x5a arrived from server while idle
message type 0x43 arrived from server while idle
message type 0x5a arrived from server while idle
PG::UnableToSend: unexpected response from server; first received character was "V"
```

I think the best solution is to disable nonblocking mode for the time of the lo_* call.
Calling PQisnonblocking and PQsetnonblocking is very low overhead, since these functions do a few memory operations only.

Fixes #460
  • Loading branch information
larskanis committed Feb 23, 2023
1 parent 65a8f6f commit dba5322
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 20 deletions.
79 changes: 67 additions & 12 deletions ext/pg_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -3643,6 +3643,14 @@ pgconn_send_flush_request(VALUE self)
* LARGE OBJECT SUPPORT
**************************************************************************/

#define BLOCKING_BEGIN(conn) do { \
int old_nonblocking = PQisnonblocking(conn); \
PQsetnonblocking(conn, 0);

#define BLOCKING_END(th) \
PQsetnonblocking(conn, old_nonblocking); \
} while(0);

/*
* call-seq:
* conn.lo_creat( [mode] ) -> Integer
Expand All @@ -3663,7 +3671,10 @@ pgconn_locreat(int argc, VALUE *argv, VALUE self)
else
mode = NUM2INT(nmode);

lo_oid = lo_creat(conn, mode);
BLOCKING_BEGIN(conn)
lo_oid = lo_creat(conn, mode);
BLOCKING_END(conn)

if (lo_oid == 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_creat failed");

Expand Down Expand Up @@ -3708,7 +3719,10 @@ pgconn_loimport(VALUE self, VALUE filename)

Check_Type(filename, T_STRING);

lo_oid = lo_import(conn, StringValueCStr(filename));
BLOCKING_BEGIN(conn)
lo_oid = lo_import(conn, StringValueCStr(filename));
BLOCKING_END(conn)

if (lo_oid == 0) {
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));
}
Expand All @@ -3726,11 +3740,16 @@ pgconn_loexport(VALUE self, VALUE lo_oid, VALUE filename)
{
PGconn *conn = pg_get_pgconn(self);
Oid oid;
int ret;
Check_Type(filename, T_STRING);

oid = NUM2UINT(lo_oid);

if (lo_export(conn, oid, StringValueCStr(filename)) < 0) {
BLOCKING_BEGIN(conn)
ret = lo_export(conn, oid, StringValueCStr(filename));
BLOCKING_END(conn)

if (ret < 0) {
pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn));
}
return Qnil;
Expand Down Expand Up @@ -3761,7 +3780,11 @@ pgconn_loopen(int argc, VALUE *argv, VALUE self)
else
mode = NUM2INT(nmode);

if((fd = lo_open(conn, lo_oid, mode)) < 0) {
BLOCKING_BEGIN(conn)
fd = lo_open(conn, lo_oid, mode);
BLOCKING_END(conn)

if(fd < 0) {
pg_raise_conn_error( rb_ePGerror, self, "can't open large object: %s", PQerrorMessage(conn));
}
return INT2FIX(fd);
Expand All @@ -3786,8 +3809,12 @@ pgconn_lowrite(VALUE self, VALUE in_lo_desc, VALUE buffer)
if( RSTRING_LEN(buffer) < 0) {
pg_raise_conn_error( rb_ePGerror, self, "write buffer zero string");
}
if((n = lo_write(conn, fd, StringValuePtr(buffer),
RSTRING_LEN(buffer))) < 0) {
BLOCKING_BEGIN(conn)
n = lo_write(conn, fd, StringValuePtr(buffer),
RSTRING_LEN(buffer));
BLOCKING_END(conn)

if(n < 0) {
pg_raise_conn_error( rb_ePGerror, self, "lo_write failed: %s", PQerrorMessage(conn));
}

Expand Down Expand Up @@ -3815,7 +3842,12 @@ pgconn_loread(VALUE self, VALUE in_lo_desc, VALUE in_len)
pg_raise_conn_error( rb_ePGerror, self, "negative length %d given", len);

buffer = ALLOC_N(char, len);
if((ret = lo_read(conn, lo_desc, buffer, len)) < 0)

BLOCKING_BEGIN(conn)
ret = lo_read(conn, lo_desc, buffer, len);
BLOCKING_END(conn)

if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_read failed");

if(ret == 0) {
Expand Down Expand Up @@ -3845,7 +3877,11 @@ pgconn_lolseek(VALUE self, VALUE in_lo_desc, VALUE offset, VALUE whence)
int lo_desc = NUM2INT(in_lo_desc);
int ret;

if((ret = lo_lseek(conn, lo_desc, NUM2INT(offset), NUM2INT(whence))) < 0) {
BLOCKING_BEGIN(conn)
ret = lo_lseek(conn, lo_desc, NUM2INT(offset), NUM2INT(whence));
BLOCKING_END(conn)

if(ret < 0) {
pg_raise_conn_error( rb_ePGerror, self, "lo_lseek failed");
}

Expand All @@ -3865,7 +3901,11 @@ pgconn_lotell(VALUE self, VALUE in_lo_desc)
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);

if((position = lo_tell(conn, lo_desc)) < 0)
BLOCKING_BEGIN(conn)
position = lo_tell(conn, lo_desc);
BLOCKING_END(conn)

if(position < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_tell failed");

return INT2FIX(position);
Expand All @@ -3883,8 +3923,13 @@ pgconn_lotruncate(VALUE self, VALUE in_lo_desc, VALUE in_len)
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);
size_t len = NUM2INT(in_len);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_truncate(conn,lo_desc,len);
BLOCKING_END(conn)

if(lo_truncate(conn,lo_desc,len) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_truncate failed");

return Qnil;
Expand All @@ -3901,8 +3946,13 @@ pgconn_loclose(VALUE self, VALUE in_lo_desc)
{
PGconn *conn = pg_get_pgconn(self);
int lo_desc = NUM2INT(in_lo_desc);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_close(conn,lo_desc);
BLOCKING_END(conn)

if(lo_close(conn,lo_desc) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_close failed");

return Qnil;
Expand All @@ -3919,8 +3969,13 @@ pgconn_lounlink(VALUE self, VALUE in_oid)
{
PGconn *conn = pg_get_pgconn(self);
Oid oid = NUM2UINT(in_oid);
int ret;

BLOCKING_BEGIN(conn)
ret = lo_unlink(conn,oid);
BLOCKING_END(conn)

if(lo_unlink(conn,oid) < 0)
if(ret < 0)
pg_raise_conn_error( rb_ePGerror, self, "lo_unlink failed");

return Qnil;
Expand Down
49 changes: 41 additions & 8 deletions spec/pg/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -915,14 +915,47 @@
end
end

it "not read past the end of a large object" do
@conn.transaction do
oid = @conn.lo_create( 0 )
fd = @conn.lo_open( oid, PG::INV_READ|PG::INV_WRITE )
@conn.lo_write( fd, "foobar" )
expect( @conn.lo_read( fd, 10 ) ).to be_nil()
@conn.lo_lseek( fd, 0, PG::SEEK_SET )
expect( @conn.lo_read( fd, 10 ) ).to eq( 'foobar' )
describe "large objects" do

it "not read past the end of a large object" do
@conn.transaction do
oid = @conn.lo_create( 0 )
fd = @conn.lo_open( oid, PG::INV_READ|PG::INV_WRITE )
expect( @conn.lo_write( fd, "foobar" ) ).to eq( 6 )
expect( @conn.lo_read( fd, 10 ) ).to be_nil()
expect( @conn.lo_lseek( fd, 0, PG::SEEK_SET ) ).to eq( 0 )
expect( @conn.lo_read( fd, 10 ) ).to eq( 'foobar' )
expect( @conn.lo_close( fd ) ).to be_nil
expect( @conn.lo_unlink( oid ) ).to be_nil
end
end

it "large object can handle big data", :unix_socket do
# Using lo_write with > 300000 bytes on a UnixSocket connection in nonblocking mode results in the following error:
# PG::UnableToSend: unexpected response from server; first received character was "V"
# This is because the lo_write call doesn't wait for the response of the server function, but sends the next command early, so that results overlap.
# Switching to blocking mode as part of lo_* calls fixes this issue and is tested here.

uri = "postgres://#{@unix_socket.gsub("/", "%2F")}:#{@port}/test"
conn = described_class.connect( uri )

bytes = Random.urandom(512000)
oid = conn.lo_creat
conn.transaction do
fd = conn.lo_open( oid, PG::INV_WRITE )
conn.lo_write( fd, bytes )
expect( conn.lo_close( fd ) ).to be_nil
end

conn.transaction do
fd = conn.lo_open( oid, PG::INV_READ )
bytes2 = conn.lo_read( fd, bytes.bytesize )
expect( bytes2 ).to eq( bytes )
expect( conn.lo_close( fd ) ).to be_nil
end
expect( conn.lo_unlink( oid ) ).to be_nil
ensure
conn&.finish
end
end

Expand Down

0 comments on commit dba5322

Please sign in to comment.