From b6c31c41fe56c65c5e4af3500091603df57bd60b Mon Sep 17 00:00:00 2001 From: "Jonathan H. Wage" Date: Tue, 6 Feb 2024 20:02:53 -0600 Subject: [PATCH] [Doctrine Messenger] Fix support for pgsql + pgbouncer. --- Tests/Fixtures/pgbouncer/pgbouncer.ini | 13 +++ Tests/Fixtures/pgbouncer/userlist.txt | 1 + Tests/Transport/ConnectionTest.php | 98 +++++++++++++++++++ ...rinePostgreSqlPgbouncerIntegrationTest.php | 89 +++++++++++++++++ ...ctrinePostgreSqlRegularIntegrationTest.php | 89 +++++++++++++++++ Transport/Connection.php | 65 +++++++++--- 6 files changed, 342 insertions(+), 13 deletions(-) create mode 100644 Tests/Fixtures/pgbouncer/pgbouncer.ini create mode 100644 Tests/Fixtures/pgbouncer/userlist.txt create mode 100644 Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php create mode 100644 Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php diff --git a/Tests/Fixtures/pgbouncer/pgbouncer.ini b/Tests/Fixtures/pgbouncer/pgbouncer.ini new file mode 100644 index 0000000..214fd21 --- /dev/null +++ b/Tests/Fixtures/pgbouncer/pgbouncer.ini @@ -0,0 +1,13 @@ +[databases] +postgres = host=localhost port=5432 user=postgres dbname=postgres pool_mode=transaction + +[pgbouncer] +logfile = /var/log/postgresql/pgbouncer.log +pidfile = /var/run/postgresql/pgbouncer.pid +listen_addr = localhost +listen_port = 6432 +unix_socket_dir = /var/run/postgresql +auth_type = md5 +auth_file = /etc/pgbouncer/userlist.txt +max_client_conn = 20 +default_pool_size = 20 diff --git a/Tests/Fixtures/pgbouncer/userlist.txt b/Tests/Fixtures/pgbouncer/userlist.txt new file mode 100644 index 0000000..c410978 --- /dev/null +++ b/Tests/Fixtures/pgbouncer/userlist.txt @@ -0,0 +1 @@ +"postgres" "md532e12f215ba27cb750c9e093ce4b5127" diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index a3262c1..02203af 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -119,6 +119,104 @@ public function testItThrowsATransportExceptionIfItCannotRejectMessage() $connection->reject('dummy_id'); } + public function testSend() + { + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + + $driverConnection->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('insert') + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('values') + ->with([ + 'body' => '?', + 'headers' => '?', + 'queue_name' => '?', + 'created_at' => '?', + 'available_at' => '?', + ]) + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('getSQL') + ->willReturn('INSERT'); + + $driverConnection->expects($this->once()) + ->method('beginTransaction'); + + $driverConnection->expects($this->once()) + ->method('executeStatement') + ->with('INSERT') + ->willReturn(1); + + $driverConnection->expects($this->once()) + ->method('lastInsertId') + ->willReturn('1'); + + $driverConnection->expects($this->once()) + ->method('commit'); + + $connection = new Connection([], $driverConnection); + $id = $connection->send('test', []); + + self::assertSame('1', $id); + } + + public function testSendLastInsertIdReturnsInteger() + { + $queryBuilder = $this->getQueryBuilderMock(); + $driverConnection = $this->getDBALConnectionMock(); + + $driverConnection->expects($this->once()) + ->method('createQueryBuilder') + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('insert') + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('values') + ->with([ + 'body' => '?', + 'headers' => '?', + 'queue_name' => '?', + 'created_at' => '?', + 'available_at' => '?', + ]) + ->willReturn($queryBuilder); + + $queryBuilder->expects($this->once()) + ->method('getSQL') + ->willReturn('INSERT'); + + $driverConnection->expects($this->once()) + ->method('beginTransaction'); + + $driverConnection->expects($this->once()) + ->method('executeStatement') + ->with('INSERT') + ->willReturn(1); + + $driverConnection->expects($this->once()) + ->method('lastInsertId') + ->willReturn(1); + + $driverConnection->expects($this->once()) + ->method('commit'); + + $connection = new Connection([], $driverConnection); + $id = $connection->send('test', []); + + self::assertSame('1', $id); + } + private function getDBALConnectionMock() { $driverConnection = $this->createMock(DBALConnection::class); diff --git a/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php b/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php new file mode 100644 index 0000000..9d1b8a9 --- /dev/null +++ b/Tests/Transport/DoctrinePostgreSqlPgbouncerIntegrationTest.php @@ -0,0 +1,89 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Configuration; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\AbstractSchemaManager; +use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory; +use Doctrine\DBAL\Tools\DsnParser; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; + +/** + * This tests using PostgreSqlConnection with PgBouncer between pgsql and the application. + * + * @requires extension pdo_pgsql + * + * @group integration + */ +class DoctrinePostgreSqlPgbouncerIntegrationTest extends TestCase +{ + private Connection $driverConnection; + private PostgreSqlConnection $connection; + + public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready() + { + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $this->connection->get(); + $this->assertSame('{"message": "Hi"}', $encoded['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($this->connection->get()); + } + + public function testSendAndGetWithAutoSetupEnabledAndSetupAlready() + { + $this->connection->setup(); + + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $this->connection->get(); + $this->assertSame('{"message": "Hi"}', $encoded['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($this->connection->get()); + } + + protected function setUp(): void + { + if (!$host = getenv('PGBOUNCER_HOST')) { + $this->markTestSkipped('Missing PGBOUNCER_HOST env variable'); + } + + $url = "pdo-pgsql://postgres:password@$host"; + $params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url]; + $config = new Configuration(); + if (class_exists(DefaultSchemaManagerFactory::class)) { + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); + } + + $this->driverConnection = DriverManager::getConnection($params, $config); + $this->connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $this->driverConnection); + } + + protected function tearDown(): void + { + $this->createSchemaManager()->dropTable('queue_table'); + $this->driverConnection->close(); + } + + private function createSchemaManager(): AbstractSchemaManager + { + return method_exists($this->driverConnection, 'createSchemaManager') + ? $this->driverConnection->createSchemaManager() + : $this->driverConnection->getSchemaManager(); + } +} diff --git a/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php b/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php new file mode 100644 index 0000000..c8abede --- /dev/null +++ b/Tests/Transport/DoctrinePostgreSqlRegularIntegrationTest.php @@ -0,0 +1,89 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; + +use Doctrine\DBAL\Configuration; +use Doctrine\DBAL\DriverManager; +use Doctrine\DBAL\Schema\AbstractSchemaManager; +use Doctrine\DBAL\Schema\DefaultSchemaManagerFactory; +use Doctrine\DBAL\Tools\DsnParser; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; + +/** + * This tests a using Doctrine PostgreSql connection without using PostgreSqlConnection + * that gets used when use_notify is enabled. + * + * @requires extension pdo_pgsql + * + * @group integration + */ +class DoctrinePostgreSqlRegularIntegrationTest extends TestCase +{ + private \Doctrine\DBAL\Connection $driverConnection; + private Connection $connection; + + public function testSendAndGetWithAutoSetupEnabledAndNotSetupAlready() + { + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $this->connection->get(); + $this->assertSame('{"message": "Hi"}', $encoded['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($this->connection->get()); + } + + public function testSendAndGetWithAutoSetupEnabledAndSetupAlready() + { + $this->connection->setup(); + + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + + $encoded = $this->connection->get(); + $this->assertSame('{"message": "Hi"}', $encoded['body']); + $this->assertSame(['type' => DummyMessage::class], $encoded['headers']); + + $this->assertNull($this->connection->get()); + } + + protected function setUp(): void + { + if (!$host = getenv('POSTGRES_HOST')) { + $this->markTestSkipped('Missing POSTGRES_HOST env variable'); + } + + $url = "pdo-pgsql://postgres:password@$host"; + $params = class_exists(DsnParser::class) ? (new DsnParser())->parse($url) : ['url' => $url]; + $config = new Configuration(); + if (class_exists(DefaultSchemaManagerFactory::class)) { + $config->setSchemaManagerFactory(new DefaultSchemaManagerFactory()); + } + + $this->driverConnection = DriverManager::getConnection($params, $config); + $this->connection = new Connection(['table_name' => 'queue_table'], $this->driverConnection); + } + + protected function tearDown(): void + { + $this->createSchemaManager()->dropTable('queue_table'); + $this->driverConnection->close(); + } + + private function createSchemaManager(): AbstractSchemaManager + { + return method_exists($this->driverConnection, 'createSchemaManager') + ? $this->driverConnection->createSchemaManager() + : $this->driverConnection->getSchemaManager(); + } +} diff --git a/Transport/Connection.php b/Transport/Connection.php index b23cea1..e3030d3 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -20,6 +20,7 @@ use Doctrine\DBAL\LockMode; use Doctrine\DBAL\Platforms\MySQLPlatform; use Doctrine\DBAL\Platforms\OraclePlatform; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\AbstractSchemaManager; @@ -139,7 +140,7 @@ public function send(string $body, array $headers, int $delay = 0): string 'available_at' => '?', ]); - $this->executeStatement($queryBuilder->getSQL(), [ + return $this->executeInsert($queryBuilder->getSQL(), [ $body, json_encode($headers), $this->configuration['queue_name'], @@ -152,8 +153,6 @@ public function send(string $body, array $headers, int $delay = 0): string Types::DATETIME_IMMUTABLE, Types::DATETIME_IMMUTABLE, ]); - - return $this->driverConnection->lastInsertId(); } public function get(): ?array @@ -399,14 +398,12 @@ private function executeQuery(string $sql, array $parameters = [], array $types try { $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types); } catch (TableNotFoundException $e) { - if ($this->driverConnection->isTransactionActive()) { + if (!$this->autoSetup || $this->driverConnection->isTransactionActive()) { throw $e; } - // create table - if ($this->autoSetup) { - $this->setup(); - } + $this->setup(); + $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types); } @@ -418,20 +415,62 @@ protected function executeStatement(string $sql, array $parameters = [], array $ try { $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types); } catch (TableNotFoundException $e) { - if ($this->driverConnection->isTransactionActive()) { + if (!$this->autoSetup || $this->driverConnection->isTransactionActive()) { throw $e; } - // create table - if ($this->autoSetup) { - $this->setup(); - } + $this->setup(); + $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types); } return $stmt; } + private function executeInsert(string $sql, array $parameters = [], array $types = []): string + { + // Use PostgreSQL RETURNING clause instead of lastInsertId() to get the + // inserted id in one operation instead of two. + if ($this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) { + $sql .= ' RETURNING id'; + } + + insert: + $this->driverConnection->beginTransaction(); + + try { + if ($this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) { + $first = $this->driverConnection->fetchFirstColumn($sql, $parameters, $types); + + $id = $first[0] ?? null; + + if (!$id) { + throw new TransportException('no id was returned by PostgreSQL from RETURNING clause.'); + } + } else { + $this->driverConnection->executeStatement($sql, $parameters, $types); + + if (!$id = $this->driverConnection->lastInsertId()) { + throw new TransportException('lastInsertId() returned false, no id was returned.'); + } + } + + $this->driverConnection->commit(); + } catch (\Throwable $e) { + $this->driverConnection->rollBack(); + + // handle setup after transaction is no longer open + if ($this->autoSetup && $e instanceof TableNotFoundException) { + $this->setup(); + goto insert; + } + + throw $e; + } + + return $id; + } + private function getSchema(): Schema { $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());