Skip to content

Commit

Permalink
Merge pull request #1728 from romank0/fetch-notifications-on-commit
Browse files Browse the repository at this point in the history
Adds notifies processing during commit
  • Loading branch information
dvarrazzo authored Oct 11, 2024
2 parents 5283a83 + f64dd39 commit 78561ac
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
1 change: 1 addition & 0 deletions NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ What's new in psycopg 2.9.10 (unreleased)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- Add support for Python 3.13.
- Receive notifications on commit (:ticket:`#1728`).
- Drop support for Python 3.7.
- `~psycopg2.errorcodes` map and `~psycopg2.errors` classes updated to
PostgreSQL 17.
Expand Down
10 changes: 10 additions & 0 deletions psycopg/connection_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,11 @@ conn_set_session(connectionObject *self, int autocommit,
}
}

Py_BLOCK_THREADS;
conn_notifies_process(self);
conn_notice_process(self);
Py_UNBLOCK_THREADS;

if (autocommit != SRV_STATE_UNCHANGED) {
self->autocommit = autocommit;
}
Expand Down Expand Up @@ -1408,6 +1413,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc)
goto endlock;
}

Py_BLOCK_THREADS;
conn_notifies_process(self);
conn_notice_process(self);
Py_UNBLOCK_THREADS;

endlock:
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
Expand Down
3 changes: 3 additions & 0 deletions psycopg/pqpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ pq_commit(connectionObject *conn)
}

Py_BLOCK_THREADS;
conn_notifies_process(conn);
conn_notice_process(conn);
Py_UNBLOCK_THREADS;

Expand Down Expand Up @@ -468,6 +469,7 @@ pq_abort(connectionObject *conn)
retvalue = pq_abort_locked(conn, &_save);

Py_BLOCK_THREADS;
conn_notifies_process(conn);
conn_notice_process(conn);
Py_UNBLOCK_THREADS;

Expand Down Expand Up @@ -538,6 +540,7 @@ pq_reset(connectionObject *conn)

Py_BLOCK_THREADS;
conn_notice_process(conn);
conn_notifies_process(conn);
Py_UNBLOCK_THREADS;

pthread_mutex_unlock(&conn->lock);
Expand Down
54 changes: 52 additions & 2 deletions tests/test_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

import os
import unittest
from collections import deque
from functools import partial

import psycopg2
from psycopg2 import extensions
from psycopg2.extensions import Notify
from .testutils import ConnectingTestCase, skip_if_crdb, slow
from .testutils import ConnectingTestCase, skip_if_crdb, skip_if_windows, slow
from .testconfig import dsn

import sys
Expand Down Expand Up @@ -74,7 +76,9 @@ def notify(self, name, sec=0, payload=None):
module=psycopg2.__name__,
dsn=dsn, sec=sec, name=name, payload=payload))

return Popen([sys.executable, '-c', script], stdout=PIPE)
env = os.environ.copy()
env.pop("PSYCOPG_DEBUG", None)
return Popen([sys.executable, '-c', script], stdout=PIPE, env=env)

@slow
def test_notifies_received_on_poll(self):
Expand Down Expand Up @@ -126,6 +130,52 @@ def test_notifies_received_on_execute(self):
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

def _test_notifies_received_on_operation(self, operation, execute_query=True):
self.listen('foo')
self.conn.commit()
if execute_query:
self.conn.cursor().execute('select 1;')
pid = int(self.notify('foo').communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
operation()
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])

@slow
@skip_if_windows
def test_notifies_received_on_commit(self):
self._test_notifies_received_on_operation(self.conn.commit)

@slow
@skip_if_windows
def test_notifies_received_on_rollback(self):
self._test_notifies_received_on_operation(self.conn.rollback)

@slow
@skip_if_windows
def test_notifies_received_on_reset(self):
self._test_notifies_received_on_operation(self.conn.reset, execute_query=False)

@slow
@skip_if_windows
def test_notifies_received_on_set_session(self):
self._test_notifies_received_on_operation(
partial(self.conn.set_session, autocommit=True, readonly=True),
execute_query=False,
)

@slow
@skip_if_windows
def test_notifies_received_on_set_client_encoding(self):
self._test_notifies_received_on_operation(
partial(
self.conn.set_client_encoding,
'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8'
),
execute_query=False,
)

@slow
def test_notify_object(self):
self.autocommit(self.conn)
Expand Down

0 comments on commit 78561ac

Please sign in to comment.