From c7310c1f23755552252f7ccf6307c148cc903aa0 Mon Sep 17 00:00:00 2001 From: Hector Cuellar Date: Tue, 19 Feb 2019 10:29:52 -0800 Subject: [PATCH] #665: [YSQL] Enable support for sequences MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: This diff adds support for: - CREATE SEQUENCE - DROP SEQUENCE - nextval - currval - lastval - Create a table with columns of serial types Our implementation of sequences uses one replicated user level table to store all sequences data. This table has four columns: db_oid, rel_oid, last_val, and is_called. The first two columns uniquely identify a sequence. last_val and is_called are the data necessary to determine the next value in the sequence. Our implementation differs from the Postgres implementation by using one row in a special table as opposed to using a one-row table to store the same data. Initially, when a sequence is created, an RPC to insert a new row is sent to the tserver that is the leader of the tablet which will store the new row. Initially last_val is set to the start value (default 1), and is_called is set to false. is_called is false when last_val hasn't been used. In other words, if nextval() reads the sequence data, it returns last_val if is_called is false. It returns last_val + increment otherwise. The Postgres implementation of nextval locks the sequence table, reads the data, checks whether incrementing (possibly by a negative value) would violate any constraints, writes the new values for last_val and is_called, and then it unlocks the table. In YugaByte's implementation we cannot lock the data table because this table is shared among all the sequences. Instead, a nextval() call sends a read RPC to read the current values `last_val` and `is_called`, constraints are then evaluated for these values, and if no errors occurred, we do a conditional update (update the row only if the values haven't changed since we last read them). 
If the conditional update fails, we retry the whole operation: read data, check constraints, update data. Because our implementation uses two RPCs each time we increment `last_val` or change `is_called`, the performance of a default sequence (with `CACHE` set to 1) will be much lower than that of a similar sequence in Postgres. To mitigate this, the user should use a cache sufficiently large to avoid issuing two RPCs for each sequence value requested through nextval(). The disadvantage of this approach is that once a block of cached numbers has been generated, any unused numbers from the cache will be lost forever. Pending: - Support for CYCLE option - ALTER SEQUENCE - setval Test Plan: Manual for now. Tests coming soon: ``` postgres=# create sequence s1 increment 3 start 100 cache 1000 ; CREATE SEQUENCE postgres=# select nextval('s1'); nextval --------- 100 (1 row) postgres=# select nextval('s1'); nextval --------- 103 (1 row) postgres=# ^D\q dog.local:~/code/yugabyte [postgres_sequence ↓·2↑·1|✚ 5⚑ 7] 14:22 $ ./bin/yb-ctl destroy; ./bin/yb-ctl create --enable_postgres; ./bin/yb-ctl status; ./bin/yb-ctl setup_pg_sequences_table; ./bin/psql -p 5433 -U postgres -h localhost^C dog.local:~/code/yugabyte [postgres_sequence ↓·2↑·1|✚ 5⚑ 7] 14:22 $ ./bin/psql -p 5433 -U postgres -h localhost psql (10.4) Type "help" for help. postgres=# select nextval('s1'); nextval --------- 3100 (1 row) postgres=# ^D\q dog.local:~/code/yugabyte [postgres_sequence ↓·2↑·1|✚ 5⚑ 7] 14:23 $ ./bin/psql -p 5433 -U postgres -h localhost psql (10.4) Type "help" for help. postgres=# select nextval('s1'); nextval --------- 6100 (1 row) ``` ``` psql (10.4) Type "help" for help. 
postgres=# create table t(k serial primary key, v int); insert CREATE TABLE postgres=# insert into t(v) values (100); INSERT 0 1 postgres=# insert into t(v) values (101); INSERT 0 1 postgres=# insert into t(v) values (102); INSERT 0 1 postgres=# select * from t; k | v ---+----- 1 | 100 2 | 101 3 | 102 (3 rows) postgres=# \d t; Table "public.t" Column | Type | Collation | Nullable | Default --------+---------+-----------+----------+------------------------------ k | integer | | not null | nextval('t_k_seq'::regclass) v | integer | | | postgres=# select nextval('t_k_seq'); nextval --------- 4 (1 row) postgres=# select currval('t_k_seq'); currval --------- 4 (1 row) postgres=# select lastval(); lastval --------- 4 (1 row) ``` ``` postgres=# SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'; relname --------- (0 rows) postgres=# create sequence s1; CREATE SEQUENCE postgres=# create table t4(k serial, v int); CREATE TABLE postgres=# SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'; relname ---------- s1 t4_k_seq (2 rows) postgres=# drop sequence s1; DROP SEQUENCE postgres=# drop table t4; DROP TABLE postgres=# SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'; relname ---------- t4_k_seq (1 row) postgres=# drop sequence t4_k_seq; DROP SEQUENCE postgres=# SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'; relname --------- (0 rows) ``` Reviewers: neil, karthik, mihnea, robert Reviewed By: robert Subscribers: kannan, bogdan, neha, yql Differential Revision: https://phabricator.dev.yugabyte.com/D6128 --- .../java/org/yb/pgsql/TestPgSequences.java | 615 ++++++++++++++++++ src/postgres/src/backend/catalog/heap.c | 6 +- src/postgres/src/backend/commands/sequence.c | 142 +++- src/postgres/src/backend/commands/ybccmds.c | 2 +- src/postgres/src/backend/parser/gram.y | 9 +- .../src/backend/utils/misc/pg_yb_utils.c | 3 +- .../regress/expected/yb_feature_types.out | 3 - src/yb/client/client.cc | 48 +- src/yb/client/client.h | 19 +- 
src/yb/common/pgsql_protocol.proto | 16 +- src/yb/common/ql_expr.cc | 195 +++++- src/yb/common/ql_expr.h | 8 + src/yb/docdb/pgsql_operation.cc | 81 ++- src/yb/docdb/pgsql_operation.h | 2 +- src/yb/tserver/tablet_service.cc | 3 +- src/yb/yql/pggate/pg_session.cc | 222 +++++++ src/yb/yql/pggate/pg_session.h | 24 + src/yb/yql/pggate/pggate.cc | 39 ++ src/yb/yql/pggate/pggate.h | 26 + src/yb/yql/pggate/ybc_pggate.cc | 32 + src/yb/yql/pggate/ybc_pggate.h | 23 + 21 files changed, 1434 insertions(+), 84 deletions(-) create mode 100644 java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java new file mode 100644 index 000000000000..14c8b41af531 --- /dev/null +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgSequences.java @@ -0,0 +1,615 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// + +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yb.pgsql.BasePgSQLTest; +import org.yb.util.YBTestRunnerNonTsanOnly; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.yb.AssertionWrappers.*; + +@RunWith(value=YBTestRunnerNonTsanOnly.class) +public class TestPgSequences extends BasePgSQLTest { + private static final Logger LOG = LoggerFactory.getLogger(TestPgSequences.class); + + @After + public void deleteSequences() throws Exception { + if (connection == null) { + LOG.warn("No connection created, skipping dropping sequences"); + return; + } + try (Statement statement = connection.createStatement()) { + statement.execute("DROP SEQUENCE s1 CASCADE"); + statement.execute("DROP SEQUENCE s2 CASCADE"); + } catch (Exception e) { + // Ignore it. + } + } + + @Test + public void testSequencesSimple() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt("nextval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt("nextval")); + } + } + + @Test + public void testCreateIfNotExistsSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt("nextval")); + + statement.execute("CREATE SEQUENCE IF NOT EXISTS s1"); + rs = statement.executeQuery("SELECT nextval('s1')"); + 
assertTrue(rs.next()); + assertEquals(2, rs.getInt("nextval")); + + statement.execute("CREATE SEQUENCE IF NOT EXISTS s2 START 100"); + rs = statement.executeQuery("SELECT nextval('s2')"); + assertTrue(rs.next()); + assertEquals(100, rs.getInt("nextval")); + } + } + + @Test + public void testSequencesWithCache() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 CACHE 100"); + // Use only half of the cached values. + for (int i = 1; i <= 50; i++) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(i, rs.getInt("nextval")); + } + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + // Because values are allocated in blocks of 100 numbers, the next value should be 101. + assertEquals(101, rs.getInt("nextval")); + } + } + + @Test + public void testSequencesWithCacheAndIncrement() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 CACHE 50 INCREMENT 3"); + for (int i = 1; i <= 21; i+=3) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(i, rs.getInt("nextval")); + } + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + // The previous session should have allocated 50 values: 1, 4, 7, 10 ... 145, 148. So the next + // value should be 151. 
+ assertEquals(151, rs.getInt("nextval")); + } + } + + @Test + public void testSequencesWithMaxValue() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 MAXVALUE 5"); + for (int i = 1; i <= 5; i++) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(i, rs.getInt("nextval")); + } + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (5)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testSequenceWithMinValue() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 MINVALUE 5"); + ResultSet rs = statement.executeQuery("SELECT NEXTVAL('s1')"); + assertTrue(rs.next()); + assertEquals(5, rs.getInt("nextval")); + } + } + + @Test + public void testCreateInvalidSequenceWithMinValueAndNegativeIncrement() throws Exception { + try (Statement statement = connection.createStatement()) { + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("MINVALUE (10) must be less than MAXVALUE (-1)"); + statement.execute("CREATE SEQUENCE s1 MINVALUE 10 INCREMENT -1"); + } + } + + @Test + public void testSequenceWithMinValueAndMaxValueAndNegativeIncrement() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 MINVALUE 100 MAXVALUE 105 INCREMENT -1"); + for (int i = 105; i >= 100; i--) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(i, rs.getInt("nextval")); + } + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached minimum value of sequence \"s1\" (100)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testSequenceWithMaxValueAndCache() throws Exception { + try (Statement 
statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 MAXVALUE 5 CACHE 10"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt("nextval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Since the previous client already got all the available sequence numbers in its cache, + // we should get an error when we request another sequence number from another client. + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (5)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + //------------------------------------------------------------------------------------------------ + // Drop tests. + //------------------------------------------------------------------------------------------------ + @Test + public void testDropSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + statement.execute("DROP SEQUENCE s1"); + + // Verify that the sequence was deleted. + ResultSet rs = statement.executeQuery( + "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'"); + assertFalse(rs.next()); + } + } + + @Test + public void testDropIfExistsSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + statement.execute("DROP SEQUENCE IF EXISTS s1"); + + // Verify that the sequence was deleted. 
+ ResultSet rs = statement.executeQuery( + "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'"); + assertFalse(rs.next()); + } + } + + @Test + public void testDropIfExistsSequenceForNonExistingSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("DROP SEQUENCE IF EXISTS s1"); + } + } + + @Test + public void testDropWithDependingObjects() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE table t(k SERIAL)"); + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("cannot drop sequence t_k_seq because other objects depend on it"); + statement.execute("DROP SEQUENCE t_k_seq"); + + // Verify that the sequence was not deleted. + ResultSet rs = statement.executeQuery( + "SELECT relname FROM pg_class WHERE relkind = 'S' AND relname = 't_k_seq"); + assertTrue(rs.next()); + } + } + + @Test + public void testDropRestrictedWithDependingObjects() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE table t(k SERIAL)"); + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("cannot drop sequence t_k_seq because other objects depend on it"); + statement.execute("DROP SEQUENCE t_k_seq RESTRICT"); + + // Verify that the sequence was not deleted. + ResultSet rs = statement.executeQuery( + "SELECT relname FROM pg_class WHERE relkind = 'S' AND relname = 't_k_seq"); + assertTrue(rs.next()); + } + } + + @Test + public void testDropCascadeWithDependingObjects() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE table t(k SERIAL)"); + statement.execute("DROP SEQUENCE t_k_seq CASCADE"); + + // Verify that the sequence was deleted. 
+ ResultSet rs = statement.executeQuery( + "SELECT relname FROM pg_class WHERE relkind = 'S'"); + assertFalse(rs.next()); + } + } + + @Test + public void testOwnedBy() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE t(k int)"); + statement.execute("CREATE SEQUENCE s OWNED BY t.k"); + statement.execute("DROP TABLE t"); + + // Verify that the sequence was deleted. + ResultSet rs = statement.executeQuery( + "SELECT relname FROM pg_class WHERE relkind = 'S'"); + assertFalse(rs.next()); + } + } + + @Test + public void testInt64LimitsInSequences() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute( + "CREATE SEQUENCE s1 START -9223372036854775808 MINVALUE -9223372036854775808"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(-9223372036854775808L, rs.getLong("nextval")); + + statement.execute("CREATE SEQUENCE s2 START 9223372036854775807"); + rs = statement.executeQuery("SELECT nextval('s2')"); + assertTrue(rs.next()); + assertEquals(9223372036854775807L, rs.getLong("nextval")); + } + } + + @Test + public void testMaxInt64FailureInSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START 9223372036854775807"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(9223372036854775807L, rs.getLong("nextval")); + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (9223372036854775807)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testMaxInt64FailureInSequenceInDifferentSession() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START 9223372036854775807"); + ResultSet rs = 
statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(9223372036854775807L, rs.getLong("nextval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Since the previous client already got all the available sequence numbers in its cache, + // we should get an error when we request another sequence number from another client. + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (9223372036854775807)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testMaxInt64OverflowFailureInSequenceInDifferentSession() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START 9223372036854775806 CACHE 100"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(9223372036854775806L, rs.getLong("nextval")); + + rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(9223372036854775807L, rs.getLong("nextval")); + + boolean exceptionOcurred = false; + try { + rs = statement.executeQuery("SELECT nextval('s1')"); + } catch (org.postgresql.util.PSQLException e) { + assertTrue(e.getMessage().contains( + "reached maximum value of sequence \"s1\" (9223372036854775807)")); + exceptionOcurred = true; + } + assertTrue(exceptionOcurred); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Since the previous client already got all the available sequence numbers in its cache, + // we should get an error when we request another sequence number from another client. 
+ thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (9223372036854775807)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testMinInt64FailureInSequence() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START -9223372036854775808 " + + "minvalue -9223372036854775808 INCREMENT -1"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(-9223372036854775808L, rs.getLong("nextval")); + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached minimum value of sequence \"s1\" (-9223372036854775808)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testMinInt64FailureInSequenceInDifferentSession() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START -9223372036854775808 " + + "minvalue -9223372036854775808 INCREMENT -1"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(-9223372036854775808L, rs.getLong("nextval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Since the previous client already got all the available sequence numbers in its cache, + // we should get an error when we request another sequence number from another client. 
+ thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached minimum value of sequence \"s1\" (-9223372036854775808)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testMinInt64OverflowFailureInSequenceInDifferentSession() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 START -9223372036854775807 " + + "minvalue -9223372036854775808 INCREMENT -1 CACHE 100"); + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(-9223372036854775807L, rs.getLong("nextval")); + + rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(-9223372036854775808L, rs.getLong("nextval")); + + boolean exceptionOcurred = false; + try { + rs = statement.executeQuery("SELECT nextval('s1')"); + } catch (org.postgresql.util.PSQLException e) { + assertTrue(e.getMessage().contains( + "reached minimum value of sequence \"s1\" (-9223372036854775808)")); + exceptionOcurred = true; + } + assertTrue(exceptionOcurred); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Since the previous client already got all the available sequence numbers in its cache, + // we should get an error when we request another sequence number from another client. 
+ thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached minimum value of sequence \"s1\" (-9223372036854775808)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + @Test + public void testCurrvalFails() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("currval of sequence \"s1\" is not yet defined in this session"); + statement.executeQuery("SELECT currval('s1')"); + } + } + + @Test + public void testCurrval() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + statement.execute("SELECT nextval('s1')"); + ResultSet rs = statement.executeQuery("SELECT currval('s1')"); + assertTrue(rs.next()); + assertEquals(1, rs.getLong("currval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + statement.execute("SELECT nextval('s1')"); + ResultSet rs = statement.executeQuery("SELECT currval('s1')"); + assertTrue(rs.next()); + assertEquals(2, rs.getLong("currval")); + } + } + + @Test + public void testLastvalFails() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("lastval is not yet defined in this session"); + statement.execute("SELECT lastval()"); + } + } + + @Test + public void testLastvalInAnotherSessionFails() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + statement.execute("SELECT nextval('s1')"); + ResultSet rs = statement.executeQuery("SELECT lastval()"); + assertTrue(rs.next()); + assertEquals(1, rs.getLong("lastval")); + } + + Connection connection2 = createConnection(); + try (Statement statement = 
connection2.createStatement()) { + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("lastval is not yet defined in this session"); + statement.execute("SELECT lastval()"); + } + } + + @Test + public void testLastval() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1"); + statement.execute("CREATE SEQUENCE s2"); + + statement.execute("SELECT nextval('s1')"); + ResultSet rs = statement.executeQuery("SELECT lastval()"); + assertTrue(rs.next()); + assertEquals(1, rs.getLong("lastval")); + + statement.execute("SELECT nextval('s2')"); + rs = statement.executeQuery("SELECT lastval()"); + assertTrue(rs.next()); + assertEquals(1, rs.getLong("lastval")); + + for (int i = 2; i <= 10; i++) { + statement.execute("SELECT nextval('s1')"); + } + rs = statement.executeQuery("SELECT lastval()"); + assertTrue(rs.next()); + assertEquals(10, rs.getLong("lastval")); + + statement.execute("SELECT nextval('s2')"); + rs = statement.executeQuery("SELECT lastval()"); + assertTrue(rs.next()); + assertEquals(2, rs.getLong("lastval")); + } + } + + @Test + public void testNoCycle() throws Exception { + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 MAXVALUE 2 NO CYCLE"); + for (int i = 1; i <= 2; i++) { + ResultSet rs = statement.executeQuery("SELECT nextval('s1')"); + assertTrue(rs.next()); + assertEquals(i, rs.getInt("nextval")); + } + + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("reached maximum value of sequence \"s1\" (2)"); + statement.executeQuery("SELECT nextval('s1')"); + } + } + + //------------------------------------------------------------------------------------------------ + // Unsupported features tests. 
+ //----------------------------------------------------------------------------------------------- + @Test + public void testCycle() throws Exception { + try (Statement statement = connection.createStatement()) { + thrown.expect(org.postgresql.util.PSQLException.class); + thrown.expectMessage("CYCLE not supported yet"); + statement.execute("CREATE SEQUENCE s1 CYCLE"); + } + } + + //------------------------------------------------------------------------------------------------ + // Serial type tests. + //------------------------------------------------------------------------------------------------ + @Test + public void testSerialTypes() throws Exception { + List serialTypes = Arrays.asList( + "smallserial", "serial2", "serial", "serial4", "bigserial", "serial8"); + + // Max values as defined in https://www.postgresql.org/docs/10/datatype-numeric.html + List serialTypesMaxValues = Arrays.asList( + 32767L, 32767L, 2147483647L, 2147483647L, 9223372036854775807L, 9223372036854775807L); + + for (int i = 0; i < serialTypes.size(); i++) { + String serialType = serialTypes.get(i); + LOG.info("Testing serial type " + serialType); + try (Statement statement = connection.createStatement()) { + statement.execute(String.format("CREATE TABLE t(k %s primary key, v int)", serialType)); + for (int k = 1; k <= 10; k++) { + statement.execute("INSERT INTO t(v) VALUES (3)"); + ResultSet rs = statement.executeQuery("SELECT * FROM t WHERE k = " + k); + assertTrue(rs.next()); + } + ResultSet rs = statement.executeQuery( + "SELECT max_value FROM pg_sequences WHERE sequencename = 't_k_seq'"); + assertTrue(rs.next()); + Long serialTypeMaxValue = serialTypesMaxValues.get(i); + LOG.info(String.format("Expected max_value: %d, received max_value: ", serialTypeMaxValue, + rs.getLong("max_value"))); + assertEquals(serialTypeMaxValue.longValue(), rs.getLong("max_value")); + statement.execute("DROP TABLE t"); + } + } + } + + @Test + public void testNextValAsDefaultValueInTable() throws Exception 
{ + try (Statement statement = connection.createStatement()) { + statement.execute("CREATE SEQUENCE s1 CACHE 20"); + statement.execute("CREATE TABLE t(k int NOT NULL DEFAULT nextval('s1'), v int)"); + for (int k = 1; k <= 10; k++) { + statement.execute("INSERT INTO t(v) VALUES (10)"); + ResultSet rs = statement.executeQuery("SELECT * FROM t WHERE k = " + k); + assertTrue(rs.next()); + } + } + + Connection connection2 = createConnection(); + try (Statement statement = connection2.createStatement()) { + // Because of our current implementation, the first value is 22 for now instead of 21. + for (int k = 21; k <= 30; k++) { + statement.execute("INSERT INTO t(v) VALUES (10)"); + ResultSet rs = statement.executeQuery("SELECT * FROM t WHERE k = " + k); + assertTrue(rs.next()); + } + } + } +} diff --git a/src/postgres/src/backend/catalog/heap.c b/src/postgres/src/backend/catalog/heap.c index b0e80dc2536c..1c329fe794a7 100644 --- a/src/postgres/src/backend/catalog/heap.c +++ b/src/postgres/src/backend/catalog/heap.c @@ -2147,9 +2147,9 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, */ adbin = nodeToString(expr); - /* - * Also deparse it to form the mostly-obsolete adsrc field. - */ + /* + * Also deparse it to form the mostly-obsolete adsrc field. + */ adsrc = deparse_expression(expr, deparse_context_for(RelationGetRelationName(rel), RelationGetRelid(rel)), diff --git a/src/postgres/src/backend/commands/sequence.c b/src/postgres/src/backend/commands/sequence.c index 0371f9e18f94..2347b54e7160 100644 --- a/src/postgres/src/backend/commands/sequence.c +++ b/src/postgres/src/backend/commands/sequence.c @@ -45,6 +45,8 @@ #include "utils/syscache.h" #include "utils/varlena.h" +/* YB includes. 
*/ +#include "pg_yb_utils.h" /* * We don't want to log each fetching of a value from a sequence, @@ -218,9 +220,20 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) rel = heap_open(seqoid, AccessExclusiveLock); tupDesc = RelationGetDescr(rel); - /* now initialize the sequence's data */ - tuple = heap_form_tuple(tupDesc, value, null); - fill_seq_with_data(rel, tuple); + if (IsYugaByteEnabled()) + { + HandleYBStatus(YBCInsertSequenceTuple(ybc_pg_session, + MyDatabaseId, + ObjectIdGetDatum(seqoid), + Int64GetDatumFast(seqdataform.last_value), + false /* is_called */)); + } + else + { + /* now initialize the sequence's data */ + tuple = heap_form_tuple(tupDesc, value, null); + fill_seq_with_data(rel, tuple); + } /* process OWNED BY if given */ if (owned_by) @@ -421,6 +434,7 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) HeapTupleData datatuple; Form_pg_sequence seqform; Form_pg_sequence_data newdataform; + FormData_pg_sequence_data seq_data; bool need_seq_rewrite; List *owned_by; ObjectAddress address; @@ -453,14 +467,27 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) seqform = (Form_pg_sequence) GETSTRUCT(seqtuple); - /* lock page's buffer and read tuple into new sequence structure */ - (void) read_seq_tuple(seqrel, &buf, &datatuple); + if (IsYugaByteEnabled()) + { + int64_t last_val = 0; + HandleYBStatus(YBCReadSequenceTuple(ybc_pg_session, MyDatabaseId, ObjectIdGetDatum(relid), + &last_val, &seq_data.is_called)); + + seq_data.last_value = last_val; + seq_data.log_cnt = 0; + newdataform = &seq_data; + } + else + { + /* lock page's buffer and read tuple into new sequence structure */ + (void) read_seq_tuple(seqrel, &buf, &datatuple); - /* copy the existing sequence data tuple, so it can be modified locally */ - newdatatuple = heap_copytuple(&datatuple); - newdataform = (Form_pg_sequence_data) GETSTRUCT(newdatatuple); + /* copy the existing sequence data tuple, so it can be modified locally */ + newdatatuple = heap_copytuple(&datatuple); + 
newdataform = (Form_pg_sequence_data) GETSTRUCT(newdatatuple); - UnlockReleaseBuffer(buf); + UnlockReleaseBuffer(buf); + } /* Check and set new values */ init_params(pstate, stmt->options, stmt->for_identity, false, @@ -522,6 +549,12 @@ DeleteSequenceTuple(Oid relid) if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for sequence %u", relid); + if (IsYugaByteEnabled()) + { + HandleYBStatus( + YBCDeleteSequenceTuple(ybc_pg_session, MyDatabaseId, ObjectIdGetDatum(relid))); + } + CatalogTupleDelete(rel, tuple); ReleaseSysCache(tuple); @@ -573,6 +606,7 @@ nextval_internal(Oid relid, bool check_permissions) HeapTuple pgstuple; Form_pg_sequence pgsform; HeapTupleData seqdatatuple; + FormData_pg_sequence_data seq_data; Form_pg_sequence_data seq; int64 incby, maxv, @@ -630,9 +664,27 @@ nextval_internal(Oid relid, bool check_permissions) cycle = pgsform->seqcycle; ReleaseSysCache(pgstuple); - /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); - page = BufferGetPage(buf); +retry: + if (IsYugaByteEnabled()) + { + int64_t last_val; + bool is_called; + HandleYBStatus(YBCReadSequenceTuple(ybc_pg_session, + MyDatabaseId, + ObjectIdGetDatum(relid), + &last_val, + &is_called)); + seq_data.last_value = last_val; + seq_data.is_called = is_called; + seq_data.log_cnt = 0; + seq = &seq_data; + } + else + { + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + page = BufferGetPage(buf); + } elm->increment = incby; last = next = result = seq->last_value; @@ -645,6 +697,13 @@ nextval_internal(Oid relid, bool check_permissions) fetch--; } + + /* + * We don't use the WAL log record. The value has already been updated and there is no way + * to rollback to another sequence number. + */ + if (IsYugaByteEnabled()) + goto check_bounds; /* * Decide whether we should emit a WAL log record. 
If so, force up the * fetch count to grab SEQ_LOG_VALS more values than we actually need to @@ -673,6 +732,7 @@ nextval_internal(Oid relid, bool check_permissions) } } +check_bounds: while (fetch) /* try to fetch cache [+ log ] numbers */ { /* @@ -737,7 +797,8 @@ nextval_internal(Oid relid, bool check_permissions) } log -= fetch; /* adjust for any unfetched numbers */ - Assert(log >= 0); + if (!IsYugaByteEnabled()) + Assert(log >= 0); /* save info in local cache */ elm->last = result; /* last returned number */ @@ -746,6 +807,34 @@ nextval_internal(Oid relid, bool check_permissions) last_used_seq = elm; + /* + * YugaByte doesn't use the WAL, and we don't need to free the buffer because we didn't allocate + * memory for it. So close the relation and return the result now. + */ + if (IsYugaByteEnabled()) + { + bool skipped = false; + /* + * We do a conditional update here to detect write conflicts with other sessions. If the + * update fails, we retry again by reading the last_val and is_called values and going + * through the whole process again. + */ + HandleYBStatus(YBCUpdateSequenceTuple(ybc_pg_session, + MyDatabaseId, + ObjectIdGetDatum(relid), + last /* last_val */, + true /* is_called */, + seq->last_value /* expected_last_val */, + seq->is_called /* expected_is_called */, + &skipped)); + if (skipped) + { + goto retry; + } + relation_close(seqrel, NoLock); + return result; + } + /* * If something needs to be WAL logged, acquire an xid, so this * transaction's commit will trigger a WAL flush and wait for syncrep. @@ -934,8 +1023,15 @@ do_setval(Oid relid, int64 next, bool iscalled) */ PreventCommandIfParallelMode("setval()"); - /* lock page' buffer and read tuple */ - seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + /* + * TODO(hector): Finish the implementation for setval(). For now, we only skip this part of the + * code to avoid errors. 
+ */ + if (!IsYugaByteEnabled()) + { + /* lock page' buffer and read tuple */ + seq = read_seq_tuple(seqrel, &buf, &seqdatatuple); + } if ((next < minv) || (next > maxv)) { @@ -963,6 +1059,16 @@ do_setval(Oid relid, int64 next, bool iscalled) /* In any case, forget any future cached numbers */ elm->cached = elm->last; + /* + * TODO(hector): Finish the implementation for setval(). YugaByte doesn't use the WAL, and we + * didn't allocate memory for buffer, so no need to free it. + */ + if (IsYugaByteEnabled()) + { + relation_close(seqrel, NoLock); + return; + } + /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) GetTopTransactionId(); @@ -1852,6 +1958,12 @@ pg_sequence_last_value(PG_FUNCTION_ARGS) errmsg("permission denied for sequence %s", RelationGetRelationName(seqrel)))); + if (IsYugaByteEnabled()) + { + /* TODO(hector): Read the sequence's data. For now return null. */ + relation_close(seqrel, NoLock); + PG_RETURN_NULL(); + } seq = read_seq_tuple(seqrel, &buf, &seqtuple); is_called = seq->is_called; diff --git a/src/postgres/src/backend/commands/ybccmds.c b/src/postgres/src/backend/commands/ybccmds.c index ea4a0ef71129..4e089ac5c47b 100644 --- a/src/postgres/src/backend/commands/ybccmds.c +++ b/src/postgres/src/backend/commands/ybccmds.c @@ -200,7 +200,7 @@ YBCCreateTable(CreateStmt *stmt, char relkind, TupleDesc desc, Oid relationId, O relationId, false, /* is_shared_table */ false, /* if_not_exists */ - primary_key == NULL /* add_primary_key */, + primary_key == NULL /* add_primary_key */, &handle)); /* diff --git a/src/postgres/src/backend/parser/gram.y b/src/postgres/src/backend/parser/gram.y index 54143e43eabf..119bf3e38788 100644 --- a/src/postgres/src/backend/parser/gram.y +++ b/src/postgres/src/backend/parser/gram.y @@ -838,6 +838,7 @@ stmt : | CreateAsStmt | CopyStmt | CreateSchemaStmt + | CreateSeqStmt | CreateStmt | CreateUserStmt | CreatedbStmt @@ -915,7 +916,6 @@ stmt : | AlterOpFamilyStmt { 
parser_ybc_not_support(@1, "This statement"); } | CreatePolicyStmt { parser_ybc_not_support(@1, "This statement"); } | CreatePLangStmt { parser_ybc_not_support(@1, "This statement"); } - | CreateSeqStmt { parser_ybc_not_support(@1, "This statement"); } | CreateSubscriptionStmt { parser_ybc_not_support(@1, "This statement"); } | CreateStatsStmt { parser_ybc_not_support(@1, "This statement"); } | CreateTableSpaceStmt { parser_ybc_not_support(@1, "This statement"); } @@ -2693,7 +2693,7 @@ alter_column_default: ; opt_drop_behavior: - CASCADE { $$ = DROP_CASCADE; parser_ybc_not_support(@1, "CASCADE"); } + CASCADE { $$ = DROP_CASCADE; } | RESTRICT { $$ = DROP_RESTRICT; } | /* EMPTY */ { $$ = DROP_RESTRICT; /* default */ } ; @@ -4376,7 +4376,6 @@ RefreshMatViewStmt: CreateSeqStmt: CREATE OptTemp SEQUENCE qualified_name OptSeqOptList { - parser_ybc_not_support(@1, "CREATE SEQUENCE"); CreateSeqStmt *n = makeNode(CreateSeqStmt); $4->relpersistence = $2; n->sequence = $4; @@ -4387,7 +4386,6 @@ CreateSeqStmt: } | CREATE OptTemp SEQUENCE IF_P NOT EXISTS qualified_name OptSeqOptList { - parser_ybc_not_support(@1, "CREATE SEQUENCE"); CreateSeqStmt *n = makeNode(CreateSeqStmt); $7->relpersistence = $2; n->sequence = $7; @@ -4442,6 +4440,7 @@ SeqOptElem: AS SimpleTypename } | CYCLE { + parser_ybc_not_support(@1, "CYCLE"); $$ = makeDefElem("cycle", (Node *)makeInteger(true), @1); } | NO CYCLE @@ -6662,7 +6661,7 @@ DropStmt: DROP drop_type_any_name IF_P EXISTS any_name_list opt_drop_behavior /* object types taking any_name_list */ drop_type_any_name: TABLE { $$ = OBJECT_TABLE; } - | SEQUENCE { parser_ybc_not_support(@1, "DROP SEQUENCE"); $$ = OBJECT_SEQUENCE; } + | SEQUENCE { $$ = OBJECT_SEQUENCE; } | VIEW { $$ = OBJECT_VIEW; } | MATERIALIZED VIEW { diff --git a/src/postgres/src/backend/utils/misc/pg_yb_utils.c b/src/postgres/src/backend/utils/misc/pg_yb_utils.c index c01aef9618aa..8cff89b15d58 100644 --- a/src/postgres/src/backend/utils/misc/pg_yb_utils.c +++ 
b/src/postgres/src/backend/utils/misc/pg_yb_utils.c @@ -82,7 +82,8 @@ IsYBRelationById(Oid relid) } bool -IsYBRelationByKind(char relKind){ +IsYBRelationByKind(char relKind) +{ return (relKind == RELKIND_RELATION || relKind == RELKIND_INDEX); } diff --git a/src/postgres/src/test/regress/expected/yb_feature_types.out b/src/postgres/src/test/regress/expected/yb_feature_types.out index fc143fd6ee47..19af8a91d990 100644 --- a/src/postgres/src/test/regress/expected/yb_feature_types.out +++ b/src/postgres/src/test/regress/expected/yb_feature_types.out @@ -12,11 +12,8 @@ CREATE TABLE feature_tab_double_precision (feature_col DOUBLE PRECISION); CREATE TABLE feature_tab_decimal (feature_col DECIMAL); CREATE TABLE feature_tab_numeric (feature_col NUMERIC); CREATE TABLE feature_tab_smallserial (feature_col SMALLSERIAL); -ERROR: could not open file "base/18373/0": No such file or directory CREATE TABLE feature_tab_serial (feature_col SERIAL); -ERROR: could not open file "base/18373/0": No such file or directory CREATE TABLE feature_tab_bigserial (feature_col BIGSERIAL); -ERROR: could not open file "base/18373/0": No such file or directory -- -- Monetary Types CREATE TABLE feature_tab_money (feature_col MONEY); diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 5279c637d303..1fb8be5ce381 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -589,11 +589,19 @@ Status YBClient::CreateNamespace(const std::string& namespace_name, } Status YBClient::CreateNamespaceIfNotExists(const std::string& namespace_name, - const boost::optional& database_type) { - Result namespace_exists = NamespaceExists(namespace_name); - RETURN_NOT_OK(namespace_exists); - return namespace_exists.get() ? 
Status::OK() - : CreateNamespace(namespace_name, database_type); + const boost::optional& database_type, + const std::string& creator_role_name, + const std::string& namespace_id, + const std::string& source_namespace_id, + const boost::optional& next_pg_oid) { + Result namespace_exists = (!namespace_id.empty() ? NamespaceIdExists(namespace_id) + : NamespaceExists(namespace_name)); + if (VERIFY_RESULT(namespace_exists)) { + return Status::OK(); + } + + return CreateNamespace(namespace_name, database_type, creator_role_name, namespace_id, + source_namespace_id, next_pg_oid); } Status YBClient::DeleteNamespace(const std::string& namespace_name, @@ -609,7 +617,8 @@ Status YBClient::DeleteNamespace(const std::string& namespace_name, } Status YBClient::ListNamespaces(const boost::optional& database_type, - std::vector* namespaces) { + std::vector* namespace_names, + std::vector* namespace_ids) { ListNamespacesRequestPB req; ListNamespacesResponsePB resp; if (database_type) { @@ -617,9 +626,13 @@ Status YBClient::ListNamespaces(const boost::optional& database_typ } CALL_SYNC_LEADER_MASTER_RPC(req, resp, ListNamespaces); - CHECK_NOTNULL(namespaces); for (auto ns : resp.namespaces()) { - namespaces->push_back(ns.name()); + if (namespace_names != nullptr) { + namespace_names->push_back(ns.name()); + } + if (namespace_ids != nullptr) { + namespace_ids->push_back(ns.id()); + } } return Status::OK(); } @@ -675,10 +688,10 @@ Status YBClient::GrantRevokePermission(GrantRevokeStatementType statement_type, Result YBClient::NamespaceExists(const std::string& namespace_name, const boost::optional& database_type) { - std::vector namespaces; - RETURN_NOT_OK(ListNamespaces(database_type, &namespaces)); + std::vector namespace_names; + RETURN_NOT_OK(ListNamespaces(database_type, &namespace_names)); - for (const string& name : namespaces) { + for (const string& name : namespace_names) { if (name == namespace_name) { return true; } @@ -686,6 +699,19 @@ Result 
YBClient::NamespaceExists(const std::string& namespace_name, return false; } +Result YBClient::NamespaceIdExists(const std::string& namespace_id, + const boost::optional& database_type) { + std::vector namespace_ids; + RETURN_NOT_OK(ListNamespaces(database_type, nullptr /* namespace_names */, &namespace_ids)); + + for (const string& id : namespace_ids) { + if (namespace_id == id) { + return true; + } + } + return false; +} + CHECKED_STATUS YBClient::GetUDType(const std::string& namespace_name, const std::string& type_name, std::shared_ptr* ql_type) { diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 5f135847ac61..fe9e573ed18d 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -330,6 +330,11 @@ class YBClient : public std::enable_shared_from_this { // Except for testing we should use proper database_types for all creations. CHECKED_STATUS CreateNamespaceIfNotExists(const std::string& namespace_name, const boost::optional& database_type = + boost::none, + const std::string& creator_role_name = "", + const std::string& namespace_id = "", + const std::string& source_namespace_id = "", + const boost::optional& next_pg_oid = boost::none); // Delete namespace with the given name. @@ -351,18 +356,22 @@ class YBClient : public std::enable_shared_from_this { const char* resource_name, const char* namespace_name, const std::string& role_name); - // List all namespace names. + // List all namespace names and optionally namespace ids. // 'namespaces' is appended to only on success. 
- CHECKED_STATUS ListNamespaces(std::vector* namespaces) { - return ListNamespaces(boost::none, namespaces); + CHECKED_STATUS ListNamespaces(std::vector* namespace_names, + std::vector* namespace_ids = nullptr) { + return ListNamespaces(boost::none, namespace_names, namespace_ids); } CHECKED_STATUS ListNamespaces(const boost::optional& database_type, - std::vector* namespaces); + std::vector* namespace_names, + std::vector* namespace_ids = nullptr); - // Check if the namespace given by 'namespace_name' exists. + // Check if the namespace given by 'namespace_name' or 'namespace_id' exists. // Result value is set only on success. Result NamespaceExists(const std::string& namespace_name, const boost::optional& database_type = boost::none); + Result NamespaceIdExists(const std::string& namespace_id, + const boost::optional& database_type = boost::none); // Authentication and Authorization // Create a new role. diff --git a/src/yb/common/pgsql_protocol.proto b/src/yb/common/pgsql_protocol.proto index e52e075081fa..85c26a3b7fd9 100644 --- a/src/yb/common/pgsql_protocol.proto +++ b/src/yb/common/pgsql_protocol.proto @@ -39,12 +39,18 @@ message PgsqlBCallPB { repeated PgsqlExpressionPB operands = 2; } +// A logical condition that evaluates to true/false. Used in the WHERE clause. +message PgsqlConditionPB { + optional QLOperator op = 1; + repeated PgsqlExpressionPB operands = 2; +} + // An expression in a WHERE condition. -// - Bind values would be given by client and grouped into a repeated field that can be accesed +// - Bind values would be given by client and grouped into a repeated field that can be accessed // by their indexes. -// - Alias values would be computed by server and grouped into repeated field that can be accesed +// - Alias values would be computed by server and grouped into repeated field that can be accessed // by their indexes. -// - Code generator write indexes as ref. Executtor deref indexes to get actual values. 
+// - Code generator write indexes as ref. Executor deref indexes to get actual values. message PgsqlExpressionPB { oneof expr { QLValuePB value = 1; @@ -54,6 +60,7 @@ message PgsqlExpressionPB { PgsqlBCallPB bfcall = 5; // Regular builtin calls. PgsqlBCallPB tscall = 6; // Tablet server builtin calls. PgsqlBCallPB bocall = 7; // Builtin operator calls. + PgsqlConditionPB condition = 8; // Logical condition that evaluates to true/false. } } @@ -126,6 +133,7 @@ message PgsqlWriteRequestPB { repeated PgsqlExpressionPB partition_column_values = 7; repeated PgsqlExpressionPB range_column_values = 8; optional PgsqlExpressionPB ybctid_column_value = 9; + // Not used with UPDATEs. Use column_new_values to UPDATE a value. repeated PgsqlColumnValuePB column_values = 10; //------------------------------------------------------------------------------------------------ @@ -140,7 +148,7 @@ message PgsqlWriteRequestPB { // a selected row. We call it rsrow to distinguish a selected row from a row of a table in the // database in our coding. optional PgsqlRSRowDescPB rsrow_desc = 12; - repeated PgsqlExpressionPB targets = 13; // required. + repeated PgsqlExpressionPB targets = 13; // required for a RETURNING clause. 
//------------------------------------------------------------------------------------------------ // Where clause condition diff --git a/src/yb/common/ql_expr.cc b/src/yb/common/ql_expr.cc index d5e156fa0cbc..54fa00bed09b 100644 --- a/src/yb/common/ql_expr.cc +++ b/src/yb/common/ql_expr.cc @@ -175,7 +175,7 @@ CHECKED_STATUS QLExprExecutor::EvalCondition(const QLConditionPB& condition, if (!temp.Comparable(lower) || !temp.Comparable(upper)) { \ return STATUS(RuntimeError, "values not comparable"); \ } \ - result->set_bool_value(temp.value() >= lower.value() rel_op temp.value() <= upper.value()); \ + result->set_bool_value(temp.value() op1 lower.value() rel_op temp.value() op2 upper.value());\ return Status::OK(); \ } while (false) @@ -358,6 +358,9 @@ CHECKED_STATUS QLExprExecutor::EvalExpr(const PgsqlExpressionPB& ql_expr, case PgsqlExpressionPB::ExprCase::kTscall: return EvalTSCall(ql_expr.tscall(), table_row, result); + case PgsqlExpressionPB::ExprCase::kCondition: + return EvalCondition(ql_expr.condition(), table_row, result); + case PgsqlExpressionPB::ExprCase::kBocall: FALLTHROUGH_INTENDED; case PgsqlExpressionPB::ExprCase::kBindId: FALLTHROUGH_INTENDED; case PgsqlExpressionPB::ExprCase::kAliasId: FALLTHROUGH_INTENDED; @@ -433,6 +436,196 @@ CHECKED_STATUS QLExprExecutor::ReadTSCallValue(const PgsqlBCallPB& ql_expr, //-------------------------------------------------------------------------------------------------- +CHECKED_STATUS QLExprExecutor::EvalCondition(const PgsqlConditionPB& condition, + const QLTableRow::SharedPtrConst& table_row, + bool* result) { + QLValue result_pb; + RETURN_NOT_OK(EvalCondition(condition, table_row, &result_pb)); + *result = result_pb.bool_value(); + return Status::OK(); +} + +CHECKED_STATUS QLExprExecutor::EvalCondition(const PgsqlConditionPB& condition, + const QLTableRow::SharedPtrConst& table_row, + QLValue *result) { +#define QL_EVALUATE_RELATIONAL_OP(op) \ + do { \ + CHECK_EQ(operands.size(), 2); \ + QLValue left, right; \ + 
RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &left)); \ + RETURN_NOT_OK(EvalExpr(operands.Get(1), table_row, &right)); \ + if (!left.Comparable(right)) \ + return STATUS(RuntimeError, "values not comparable"); \ + result->set_bool_value(left.value() op right.value()); \ + return Status::OK(); \ + } while (false) + +#define QL_EVALUATE_BETWEEN(op1, op2, rel_op) \ + do { \ + CHECK_EQ(operands.size(), 3); \ + QLValue lower, upper; \ + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &temp)); \ + RETURN_NOT_OK(EvalExpr(operands.Get(1), table_row, &lower)); \ + RETURN_NOT_OK(EvalExpr(operands.Get(2), table_row, &upper)); \ + if (!temp.Comparable(lower) || !temp.Comparable(upper)) { \ + return STATUS(RuntimeError, "values not comparable"); \ + } \ + result->set_bool_value(temp.value() op1 lower.value() rel_op temp.value() op2 upper.value());\ + return Status::OK(); \ + } while (false) + + QLValue temp; + const auto& operands = condition.operands(); + switch (condition.op()) { + case QL_OP_NOT: + CHECK_EQ(operands.size(), 1); + CHECK_EQ(operands.Get(0).expr_case(), PgsqlExpressionPB::ExprCase::kCondition); + RETURN_NOT_OK(EvalCondition(operands.Get(0).condition(), table_row, &temp)); + result->set_bool_value(!temp.bool_value()); + return Status::OK(); + + case QL_OP_IS_NULL: + CHECK_EQ(operands.size(), 1); + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &temp)); + result->set_bool_value(temp.IsNull()); + return Status::OK(); + + case QL_OP_IS_NOT_NULL: + CHECK_EQ(operands.size(), 1); + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &temp)); + result->set_bool_value(!temp.IsNull()); + return Status::OK(); + + case QL_OP_IS_TRUE: + CHECK_EQ(operands.size(), 1); + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &temp)); + if (temp.type() != QLValue::InternalType::kBoolValue) + return STATUS(RuntimeError, "not a bool value"); + result->set_bool_value(!temp.IsNull() && temp.bool_value()); + return Status::OK(); + + case QL_OP_IS_FALSE: { + 
CHECK_EQ(operands.size(), 1); + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &temp)); + if (temp.type() != QLValue::InternalType::kBoolValue) + return STATUS(RuntimeError, "not a bool value"); + result->set_bool_value(!temp.IsNull() && !temp.bool_value()); + return Status::OK(); + } + + case QL_OP_EQUAL: + QL_EVALUATE_RELATIONAL_OP(==); + + case QL_OP_LESS_THAN: + QL_EVALUATE_RELATIONAL_OP(<); // NOLINT + + case QL_OP_LESS_THAN_EQUAL: + QL_EVALUATE_RELATIONAL_OP(<=); + + case QL_OP_GREATER_THAN: + QL_EVALUATE_RELATIONAL_OP(>); // NOLINT + + case QL_OP_GREATER_THAN_EQUAL: + QL_EVALUATE_RELATIONAL_OP(>=); + + case QL_OP_NOT_EQUAL: + QL_EVALUATE_RELATIONAL_OP(!=); + + case QL_OP_AND: + CHECK_GT(operands.size(), 0); + for (const auto &operand : operands) { + CHECK_EQ(operand.expr_case(), PgsqlExpressionPB::ExprCase::kCondition); + RETURN_NOT_OK(EvalCondition(operand.condition(), table_row, result)); + if (!result->bool_value()) { + break; + } + } + return Status::OK(); + + case QL_OP_OR: + CHECK_GT(operands.size(), 0); + for (const auto &operand : operands) { + CHECK_EQ(operand.expr_case(), PgsqlExpressionPB::ExprCase::kCondition); + RETURN_NOT_OK(EvalCondition(operand.condition(), table_row, result)); + if (result->bool_value()) { + break; + } + } + return Status::OK(); + + case QL_OP_BETWEEN: + QL_EVALUATE_BETWEEN(>=, <=, &&); + + case QL_OP_NOT_BETWEEN: + QL_EVALUATE_BETWEEN(<, >, ||); + + // When a row exists, the primary key columns are always populated in the row (value-map) by + // DocRowwiseIterator and only when it exists. Therefore, the row exists if and only if + // the row (value-map) is not empty. 
+ case QL_OP_EXISTS: + result->set_bool_value(!table_row->IsEmpty()); + return Status::OK(); + + case QL_OP_NOT_EXISTS: + result->set_bool_value(table_row->IsEmpty()); + return Status::OK(); + + case QL_OP_IN: { + CHECK_EQ(operands.size(), 2); + QLValue left, right; + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &left)); + RETURN_NOT_OK(EvalExpr(operands.Get(1), table_row, &right)); + + result->set_bool_value(false); + for (const QLValuePB& elem : right.list_value().elems()) { + if (!Comparable(elem, left)) { + return STATUS(RuntimeError, "values not comparable"); + } + if (elem == left) { + result->set_bool_value(true); + break; + } + } + return Status::OK(); + } + + case QL_OP_NOT_IN: { + CHECK_EQ(operands.size(), 2); + QLValue left, right; + RETURN_NOT_OK(EvalExpr(operands.Get(0), table_row, &left)); + RETURN_NOT_OK(EvalExpr(operands.Get(1), table_row, &right)); + + result->set_bool_value(true); + for (const QLValuePB& elem : right.list_value().elems()) { + if (!Comparable(elem, left)) { + return STATUS(RuntimeError, "values not comparable"); + } + if (elem == left) { + result->set_bool_value(false); + break; + } + } + return Status::OK(); + } + + case QL_OP_LIKE: FALLTHROUGH_INTENDED; + case QL_OP_NOT_LIKE: + LOG(ERROR) << "Internal error: illegal or unknown operator " << condition.op(); + break; + + case QL_OP_NOOP: + break; + } + + result->SetNull(); + return STATUS(RuntimeError, "Internal error: illegal or unknown operator"); + +#undef QL_EVALUATE_RELATIONAL_OP +#undef QL_EVALUATE_BETWEEN +} + +//-------------------------------------------------------------------------------------------------- + CHECKED_STATUS QLTableRow::ReadColumn(ColumnIdRep col_id, QLValue *col_value) const { const auto& col_iter = col_map_.find(col_id); if (col_iter == col_map_.end()) { diff --git a/src/yb/common/ql_expr.h b/src/yb/common/ql_expr.h index 0d4f5e8fd501..577b392fce4d 100644 --- a/src/yb/common/ql_expr.h +++ b/src/yb/common/ql_expr.h @@ -243,6 +243,14 @@ class 
QLExprExecutor { virtual CHECKED_STATUS ReadTSCallValue(const PgsqlBCallPB& ql_expr, const QLTableRow::SharedPtrConst& table_row, QLValue *result); + + // Evaluate a boolean condition for the given row. + virtual CHECKED_STATUS EvalCondition(const PgsqlConditionPB& condition, + const QLTableRow::SharedPtrConst& table_row, + bool* result); + virtual CHECKED_STATUS EvalCondition(const PgsqlConditionPB& condition, + const QLTableRow::SharedPtrConst& table_row, + QLValue *result); }; } // namespace yb diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index 67e595f6cc7e..05c31491d02d 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -162,7 +162,7 @@ Status PgsqlWriteOperation::ApplyInsert(const DocOperationApplyData& data) { sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id(), ttl, user_timestamp)); } - RETURN_NOT_OK(PopulateResultSet()); + RETURN_NOT_OK(PopulateResultSet(table_row)); response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); return Status::OK(); @@ -210,35 +210,50 @@ Status PgsqlWriteOperation::ApplyUpdate(const DocOperationApplyData& data) { } else { // This UPDATE is calling PGGATE directly without going thru PosgreSQL layer. // Keep it here as we might need it. - for (const auto& column_value : request_.column_new_values()) { - // Get the column. - if (!column_value.has_column_id()) { - return STATUS_FORMAT(InvalidArgument, "column id missing: $0", - column_value.DebugString()); - } - const ColumnId column_id(column_value.column_id()); - auto column = schema_.column_by_id(column_id); - RETURN_NOT_OK(column); - // Check column-write operator. - CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) + + // Very limited support for where expressions. Only used for updates to the sequences data + // table. 
+ bool is_match = true; + if (request_.has_where_expr()) { + QLValue match; + RETURN_NOT_OK(EvalExpr(request_.where_expr(), table_row, &match)); + is_match = match.bool_value(); + } + + if (is_match) { + for (const auto &column_value : request_.column_new_values()) { + // Get the column. + if (!column_value.has_column_id()) { + return STATUS_FORMAT(InvalidArgument, "column id missing: $0", + column_value.DebugString()); + } + const ColumnId column_id(column_value.column_id()); + auto column = schema_.column_by_id(column_id); + RETURN_NOT_OK(column); + + // Check column-write operator. + CHECK(GetTSWriteInstruction(column_value.expr()) == bfpg::TSOpcode::kScalarInsert) << "Illegal write instruction"; - // Evaluate column value. - QLValue expr_result; - RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, &expr_result)); - const SubDocument sub_doc = - SubDocument::FromQLValuePB(expr_result.value(), column->sorting_type()); + // Evaluate column value. + QLValue expr_result; + RETURN_NOT_OK(EvalExpr(column_value.expr(), table_row, &expr_result)); - // Inserting into specified column. - DocPath sub_path(encoded_range_doc_key_.as_slice(), PrimitiveValue(column_id)); - RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( - sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id())); - skipped = false; + const SubDocument sub_doc = + SubDocument::FromQLValuePB(expr_result.value(), column->sorting_type()); + + // Inserting into specified column. + DocPath sub_path(encoded_range_doc_key_.as_slice(), PrimitiveValue(column_id)); + RETURN_NOT_OK(data.doc_write_batch->InsertSubDocument( + sub_path, sub_doc, data.read_time, data.deadline, request_.stmt_id())); + skipped = false; + } } } - RETURN_NOT_OK(PopulateResultSet()); + // Returning the values before the update. 
+ RETURN_NOT_OK(PopulateResultSet(table_row)); if (skipped) { response_->set_skipped(true); @@ -258,7 +273,7 @@ Status PgsqlWriteOperation::ApplyDelete(const DocOperationApplyData& data) { RETURN_NOT_OK(data.doc_write_batch->DeleteSubDoc(DocPath( encoded_range_doc_key_.as_slice()), data.read_time, data.deadline)); - RETURN_NOT_OK(PopulateResultSet()); + RETURN_NOT_OK(PopulateResultSet(table_row)); response_->set_status(PgsqlResponsePB::PGSQL_STATUS_OK); return Status::OK(); @@ -289,19 +304,19 @@ Status PgsqlWriteOperation::ReadColumns(const DocOperationApplyData& data, return Status::OK(); } -Status PgsqlWriteOperation::PopulateResultSet() { - if (!request_.targets().empty()) { - PgsqlRSRow *rsrow = resultset_.AllocateRSRow(request_.targets().size()); - int rscol_index = 0; - for (const PgsqlExpressionPB& expr : request_.targets()) { - if (expr.has_column_id() && - expr.column_id() == static_cast(PgSystemAttrNum::kYBTupleId)) { +Status PgsqlWriteOperation::PopulateResultSet(const QLTableRow::SharedPtr& table_row) { + PgsqlRSRow* rsrow = resultset_.AllocateRSRow(request_.targets().size()); + int rscol_index = 0; + for (const PgsqlExpressionPB& expr : request_.targets()) { + if (expr.has_column_id()) { + if (expr.column_id() == static_cast(PgSystemAttrNum::kYBTupleId)) { rsrow->rscol(rscol_index)->set_binary_value(encoded_range_doc_key_.data(), encoded_range_doc_key_.size()); - continue; + } else { + RETURN_NOT_OK(EvalExpr(expr, table_row, rsrow->rscol(rscol_index))); } - return STATUS(NotSupported, "unsupported returning expression"); } + rscol_index++; } return Status::OK(); } diff --git a/src/yb/docdb/pgsql_operation.h b/src/yb/docdb/pgsql_operation.h index 607e918e2ac0..3171543290ba 100644 --- a/src/yb/docdb/pgsql_operation.h +++ b/src/yb/docdb/pgsql_operation.h @@ -71,7 +71,7 @@ class PgsqlWriteOperation : CHECKED_STATUS ReadColumns(const DocOperationApplyData& data, const QLTableRow::SharedPtr& table_row); - CHECKED_STATUS PopulateResultSet(); + CHECKED_STATUS 
PopulateResultSet(const QLTableRow::SharedPtr& table_row); // Reading path to operate on. CHECKED_STATUS GetDocPaths( diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 11f9b36086d3..889b82e59d8e 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -767,7 +767,8 @@ void TabletServiceImpl::Write(const WriteRequestPB* req, // For postgres requests check that the syscatalog version matches. if (tablet.peer->tablet()->table_type() == TableType::PGSQL_TABLE_TYPE) { for (const auto& pg_req : req->pgsql_write_batch()) { - if (pg_req.ysql_catalog_version() < server_->ysql_catalog_version()) { + if (pg_req.has_ysql_catalog_version() && + pg_req.ysql_catalog_version() < server_->ysql_catalog_version()) { SetupErrorAndRespond(resp->mutable_error(), STATUS_SUBSTITUTE(QLError, "Catalog Version Mismatch: A DDL occurred while processing " "this query. Try Again."), diff --git a/src/yb/yql/pggate/pg_session.cc b/src/yb/yql/pggate/pg_session.cc index b0f5bf89ed2c..7f2d3d8912b5 100644 --- a/src/yb/yql/pggate/pg_session.cc +++ b/src/yb/yql/pggate/pg_session.cc @@ -13,6 +13,9 @@ // //-------------------------------------------------------------------------------------------------- +#include + +#include "yb/yql/pggate/pg_expr.h" #include "yb/yql/pggate/pg_session.h" #include "yb/yql/pggate/pggate_if_cxx_decl.h" @@ -20,7 +23,10 @@ #include "yb/client/transaction.h" #include "yb/client/batcher.h" +#include "yb/common/ql_protocol_util.h" + #include "yb/util/string_util.h" +#include "yb/util/random_util.h" namespace yb { namespace pggate { @@ -42,6 +48,29 @@ using client::YBTableType; // TODO(neil) This should be derived from a GFLAGS. static MonoDelta kSessionTimeout = 60s; + +//-------------------------------------------------------------------------------------------------- +// Constants used for the sequences data table. 
+//-------------------------------------------------------------------------------------------------- +static constexpr const char* const kPgSequencesNamespaceName = "system_postgres"; +static constexpr const char* const kPgSequencesDataTableName = "sequences_data"; + +// Used to build this table's uuid. +static constexpr const YBCPgOid kPgSequencesDataTableOid = 0xFFFF; +static constexpr const YBCPgOid kPgSequencesDataDatabaseOid = 0xFFFF; +static const string kPgSequencesDataNamespaceId = GetPgsqlNamespaceId(kPgSequencesDataDatabaseOid); + +// Columns names and ids. +static constexpr const char* const kPgSequenceDbOidColName = "db_oid"; + +static constexpr const char* const kPgSequenceSeqOidColName = "seq_oid"; + +static constexpr const char* const kPgSequenceLastValueColName = "last_value"; +static constexpr const size_t kPgSequenceLastValueColIdx = 2; + +static constexpr const char* const kPgSequenceIsCalledColName = "is_called"; +static constexpr const size_t kPgSequenceIsCalledColIdx = 3; + //-------------------------------------------------------------------------------------------------- // Class PgSession //-------------------------------------------------------------------------------------------------- @@ -106,6 +135,199 @@ Status PgSession::GetCatalogMasterVersion(uint64_t *version) { return client_->GetYsqlCatalogMasterVersion(version); } +Status PgSession::CreateSequencesDataTable() { + const YBTableName table_name(kPgSequencesDataNamespaceId, + kPgSequencesNamespaceName, + kPgSequencesDataTableName); + RETURN_NOT_OK(client_->CreateNamespaceIfNotExists(kPgSequencesNamespaceName, + YQLDatabase::YQL_DATABASE_PGSQL, + "" /* creator_role_name */, + kPgSequencesDataNamespaceId)); + + // Set up the schema. + TableProperties table_properties; + table_properties.SetTransactional(true); + client::YBSchemaBuilder schemaBuilder; + schemaBuilder.SetTableProperties(table_properties); + schemaBuilder. 
+ AddColumn(kPgSequenceDbOidColName)->HashPrimaryKey()->Type(yb::INT64)->NotNull(); + schemaBuilder. + AddColumn(kPgSequenceSeqOidColName)->HashPrimaryKey()->Type(yb::INT64)->NotNull(); + schemaBuilder.AddColumn(kPgSequenceLastValueColName)->Type(yb::INT64)->NotNull(); + schemaBuilder.AddColumn(kPgSequenceIsCalledColName)->Type(yb::BOOL)->NotNull(); + client::YBSchema schema; + RETURN_NOT_OK(schemaBuilder.Build(&schema)); // A bad schema is reported, not fatal. + + // Generate the table id. + pggate::PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); + + // Try to create the table. + gscoped_ptr table_creator(client_->NewTableCreator()); + + Status s = table_creator->table_name(table_name) + .schema(&schema) + .table_type(yb::client::YBTableType::PGSQL_TABLE_TYPE) + .table_id(oid.GetYBTableId()) + .hash_schema(YBHashSchema::kPgsqlHash) + .Create(); + // If we could create it, then all good! + if (s.ok()) { + LOG(INFO) << "Table '" << table_name.ToString() << "' created."; + // If the table was already there, also not an error... + } else if (s.IsAlreadyPresent()) { + LOG(INFO) << "Table '" << table_name.ToString() << "' already exists"; + } else { + // Any other error is logged and returned to the caller. + LOG(ERROR) << "Error creating table '" << table_name.ToString() << "': " << s; + RETURN_NOT_OK(s); + } + return Status::OK(); +} + +Status PgSession::InsertSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called) { + pggate::PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); + auto result = LoadTable(oid); + if (!result.ok()) { + RETURN_NOT_OK(CreateSequencesDataTable()); + // Creation was attempted above, so try loading the table one more time.
+ result = LoadTable(oid); + } + PgTableDesc::ScopedRefPtr t = VERIFY_RESULT(result); + + std::shared_ptr psql_write; + psql_write.reset(t->NewPgsqlInsert()); + + auto write_request = psql_write->mutable_request(); + // Key of the new row: db_oid, then seq_oid (the table's two hash columns). + write_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid); + write_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid); + // Initial values for the non-key columns last_value and is_called. + PgsqlColumnValuePB* column_value = write_request->add_column_values(); + column_value->set_column_id(t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + column_value->mutable_expr()->mutable_value()->set_int64_value(last_val); + + column_value = write_request->add_column_values(); + column_value->set_column_id(t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); + column_value->mutable_expr()->mutable_value()->set_bool_value(is_called); + + return session_->ApplyAndFlush(psql_write); +} + +Status PgSession::UpdateSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped) { + pggate::PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); + PgTableDesc::ScopedRefPtr t = VERIFY_RESULT(LoadTable(oid)); + + std::shared_ptr psql_write; + psql_write.reset(t->NewPgsqlUpdate()); + + auto write_request = psql_write->mutable_request(); + // Key of the row to update: db_oid, then seq_oid. + write_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid); + write_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid); + // New values for last_value and is_called. + PgsqlColumnValuePB* column_value = write_request->add_column_new_values(); + column_value->set_column_id(t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + column_value->mutable_expr()->mutable_value()->set_int64_value(last_val); + + column_value = write_request->add_column_new_values(); + column_value->set_column_id(t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); +
column_value->mutable_expr()->mutable_value()->set_bool_value(is_called); + + // WHERE clause => WHERE last_val == expected_last_val AND is_called == expected_is_called. + auto where_pb = write_request->mutable_where_expr()->mutable_condition(); + where_pb->set_op(QL_OP_AND); + auto cond = where_pb->add_operands()->mutable_condition(); + cond->set_op(QL_OP_EQUAL); + cond->add_operands()->set_column_id(t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + cond->add_operands()->mutable_value()->set_int64_value(expected_last_val); + // Second operand of the AND: is_called == expected_is_called. + cond = where_pb->add_operands()->mutable_condition(); + cond->set_op(QL_OP_EQUAL); + cond->add_operands()->set_column_id(t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); + cond->add_operands()->mutable_value()->set_bool_value(expected_is_called); + // Register both compared columns in the request's column_refs. + write_request->mutable_column_refs()->add_ids( + t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + write_request->mutable_column_refs()->add_ids( + t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); + + RETURN_NOT_OK(session_->ApplyAndFlush(psql_write)); + if (skipped) { // Optional out-param: receives the response's skipped flag. + *skipped = psql_write->response().skipped(); + } + return Status::OK(); +} + +Status PgSession::ReadSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t *last_val, + bool *is_called) { + pggate::PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); + PgTableDesc::ScopedRefPtr t = VERIFY_RESULT(LoadTable(oid)); + + std::shared_ptr psql_read(t->NewPgsqlSelect()); + + auto read_request = psql_read->mutable_request(); + // Key of the row to read: db_oid, then seq_oid. + read_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid); + read_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid); + // Select last_value and is_called, in that order. + read_request->add_targets()->set_column_id( + t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + read_request->add_targets()->set_column_id( + t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); + + read_request->mutable_column_refs()->add_ids( +
t->table()->schema().ColumnId(kPgSequenceLastValueColIdx)); + read_request->mutable_column_refs()->add_ids( + t->table()->schema().ColumnId(kPgSequenceIsCalledColIdx)); + + RETURN_NOT_OK(session_->ReadSync(psql_read)); + // Decode the reply; an empty result means the sequence has no row. + Slice cursor; + int64_t row_count = 0; + RETURN_NOT_OK(PgDocData::LoadCache(psql_read->rows_data(), &row_count, &cursor)); + if (row_count == 0) { + return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid); + } + // First field: last_value. A null header is reported as NotFound. + PgWireDataHeader header = PgDocData::ReadDataHeader(&cursor); + if (header.is_null()) { + return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid); + } + size_t read_size = PgDocData::ReadNumber(&cursor, last_val); + cursor.remove_prefix(read_size); + // Second field: is_called. Nothing is read after it, so the cursor is not advanced. + header = PgDocData::ReadDataHeader(&cursor); + if (header.is_null()) { + return STATUS_SUBSTITUTE(NotFound, "Unable to find relation for sequence $0", seq_oid); + } + read_size = PgDocData::ReadNumber(&cursor, is_called); + return Status::OK(); +} + +Status PgSession::DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid) { + pggate::PgObjectId oid(kPgSequencesDataDatabaseOid, kPgSequencesDataTableOid); + PgTableDesc::ScopedRefPtr t = VERIFY_RESULT(LoadTable(oid)); + + std::shared_ptr psql_delete(t->NewPgsqlDelete()); + auto delete_request = psql_delete->mutable_request(); + // Key of the row to delete: db_oid, then seq_oid. + delete_request->add_partition_column_values()->mutable_value()->set_int64_value(db_oid); + delete_request->add_partition_column_values()->mutable_value()->set_int64_value(seq_oid); + + return session_->ApplyAndFlush(psql_delete); +} + //-------------------------------------------------------------------------------------------------- client::YBTableCreator *PgSession::NewTableCreator() { diff --git a/src/yb/yql/pggate/pg_session.h b/src/yb/yql/pggate/pg_session.h index 5fdd33646938..a1162b3fc533 100644 --- a/src/yb/yql/pggate/pg_session.h +++ b/src/yb/yql/pggate/pg_session.h @@ -76,6 +76,30 @@ class PgSession : public RefCountedThreadSafe { CHECKED_STATUS
GetCatalogMasterVersion(uint64_t *version); + // API for sequences data operations. + CHECKED_STATUS CreateSequencesDataTable(); + + CHECKED_STATUS InsertSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called); + + // Conditional update; a non-null `skipped` receives the write response's skipped flag. + CHECKED_STATUS UpdateSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped); + + CHECKED_STATUS ReadSequenceTuple(int64_t db_oid, + int64_t seq_oid, + int64_t *last_val, + bool *is_called); + + CHECKED_STATUS DeleteSequenceTuple(int64_t db_oid, int64_t seq_oid); + // API for schema operations. // TODO(neil) Schema should be a sub-database that have some specialized property. CHECKED_STATUS CreateSchema(const std::string& schema_name, bool if_not_exist); diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index ea7a6f70f6cd..9a2184c97b82 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -149,6 +149,45 @@ Status PgApiImpl::InvalidateCache(PgSession *pg_session) { return Status::OK(); } +//-------------------------------------------------------------------------------------------------- +// Sequence data operations: each call forwards to the PgSession method of the same name. +Status PgApiImpl::CreateSequencesDataTable(PgSession *pg_session) { + return pg_session->CreateSequencesDataTable(); +} + +Status PgApiImpl::InsertSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called) { + return pg_session->InsertSequenceTuple(db_oid, seq_oid, last_val, is_called); +} + +Status PgApiImpl::UpdateSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped) { + return pg_session->UpdateSequenceTuple(db_oid, seq_oid, last_val, is_called, expected_last_val, + expected_is_called, skipped); +} + +Status PgApiImpl::ReadSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t
*last_val, + bool *is_called) { + return pg_session->ReadSequenceTuple(db_oid, seq_oid, last_val, is_called); +} + +Status PgApiImpl::DeleteSequenceTuple(PgSession *pg_session, int64_t db_oid, int64_t seq_oid) { + return pg_session->DeleteSequenceTuple(db_oid, seq_oid); +} + + //-------------------------------------------------------------------------------------------------- Status PgApiImpl::DeleteStatement(PgStatement *handle) { diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index e98fa6d645ee..cde08f163275 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -86,6 +86,32 @@ class PgApiImpl { // Read statement. PgStatement::ScopedRefPtr GetStatement(PgStatement *handle); + // Set up the table that stores sequences data. + CHECKED_STATUS CreateSequencesDataTable(PgSession *pg_session); + // Insert the row for sequence (db_oid, seq_oid) with its initial last_val and is_called. + CHECKED_STATUS InsertSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called); + // Update the row only if it still holds expected_last_val and expected_is_called. + CHECKED_STATUS UpdateSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped); + // Read the row's last value and is_called flag into the out-params. + CHECKED_STATUS ReadSequenceTuple(PgSession *pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t *last_val, + bool *is_called); + // Delete the row for sequence (db_oid, seq_oid). + CHECKED_STATUS DeleteSequenceTuple(PgSession *pg_session, int64_t db_oid, int64_t seq_oid); + // Delete statement.
CHECKED_STATUS DeleteStatement(PgStatement *handle); diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index 73b284536528..92fb506189ab 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -199,6 +199,38 @@ YBCStatus YBCPgExecDropSchema(YBCPgStatement handle) { #endif } +YBCStatus YBCInsertSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called) { + return ToYBCStatus(pgapi->InsertSequenceTuple(pg_session, db_oid, seq_oid, last_val, is_called)); +} +// As with the sibling wrappers: forward to pgapi, then convert the Status with ToYBCStatus. +YBCStatus YBCUpdateSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped) { + return ToYBCStatus(pgapi->UpdateSequenceTuple(pg_session, db_oid, seq_oid, last_val, is_called, + expected_last_val, expected_is_called, skipped)); +} + +YBCStatus YBCReadSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t *last_val, + bool *is_called) { + return ToYBCStatus(pgapi->ReadSequenceTuple(pg_session, db_oid, seq_oid, last_val, is_called)); +} + +YBCStatus YBCDeleteSequenceTuple(YBCPgSession pg_session, int64_t db_oid, int64_t seq_oid) { + return ToYBCStatus(pgapi->DeleteSequenceTuple(pg_session, db_oid, seq_oid)); +} + // Table Operations ------------------------------------------------------------------------------- YBCStatus YBCPgNewCreateTable(YBCPgSession pg_session, diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 6db2444d6d48..3debbd469da4 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -60,6 +60,29 @@ YBCStatus YBCPgClearBinds(YBCPgStatement handle); // Connect database. Switch the connected database to the given "database_name".
YBCStatus YBCPgConnectDatabase(YBCPgSession pg_session, const char *database_name); +YBCStatus YBCInsertSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called); +// Update a sequence tuple only if it still holds the expected values. +YBCStatus YBCUpdateSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t last_val, + bool is_called, + int64_t expected_last_val, + bool expected_is_called, + bool* skipped); +// Read a sequence tuple's last value and is_called flag into the out-params. +YBCStatus YBCReadSequenceTuple(YBCPgSession pg_session, + int64_t db_oid, + int64_t seq_oid, + int64_t *last_val, + bool *is_called); +// Delete the tuple of sequence (db_oid, seq_oid). +YBCStatus YBCDeleteSequenceTuple(YBCPgSession pg_session, int64_t db_oid, int64_t seq_oid); + // Create database. YBCStatus YBCPgNewCreateDatabase(YBCPgSession pg_session, const char *database_name,