Skip to content

Commit

Permalink
Add codec integration tests for temporal arrays.
Browse files Browse the repository at this point in the history
[#555]

Signed-off-by: Mark Paluch <mpaluch@vmware.com>
  • Loading branch information
mp911de committed Nov 8, 2022
1 parent b210fd1 commit e1ef07f
Showing 1 changed file with 77 additions and 122 deletions.
199 changes: 77 additions & 122 deletions src/test/java/io/r2dbc/postgresql/AbstractCodecIntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,15 @@ void charPrimitive() {

@Test
void blob() {
BiConsumer<Blob, Blob> equality = (actual, expected) -> Flux.zip(
Flux.from(actual.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes),
Flux.from(expected.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes)
)
.as(StepVerifier::create)
.assertNext(t -> {
try {
assertThat(t.getT1()).isEqualTo(t.getT2());
} finally {
t.getT1().release();
t.getT2().release();
}
})
.verifyComplete();
BiConsumer<Blob, Blob> equality = (actual, expected) -> Flux.zip(Flux.from(actual.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes),
Flux.from(expected.stream()).reduce(TEST.heapBuffer(), ByteBuf::writeBytes)).as(StepVerifier::create).assertNext(t -> {
try {
assertThat(t.getT1()).isEqualTo(t.getT2());
} finally {
t.getT1().release();
t.getT2().release();
}
}).verifyComplete();

Function<byte[], Blob> byteToBlob = (bytes) -> new Blob() {

Expand Down Expand Up @@ -209,33 +204,25 @@ void circle() {

@Test
void circleTwoDimensionalArray() {
testCodec(Circle[][].class, new Circle[][]{{Circle.of(Point.of(1.12, 2.12), 3.12), Circle.of(Point.of(Double.MIN_VALUE, Double.MIN_VALUE), Double.MAX_VALUE)},
{Circle.of(Point.of(-2.4, -456.2), 20), null}}, "CIRCLE[][]");
testCodec(Circle[][].class, new Circle[][]{{Circle.of(Point.of(1.12, 2.12), 3.12), Circle.of(Point.of(Double.MIN_VALUE, Double.MIN_VALUE), Double.MAX_VALUE)}, {Circle.of(Point.of(-2.4,
-456.2), 20), null}}, "CIRCLE[][]");
}

@Test
void clob() {
testCodec(Clob.class,
new Clob() {
testCodec(Clob.class, new Clob() {

@Override
public Publisher<Void> discard() {
return Mono.empty();
}
@Override
public Publisher<Void> discard() {
return Mono.empty();
}

@Override
public Publisher<CharSequence> stream() {
return Mono.just("test-value");
}
},
(actual, expected) -> Flux.zip(
Flux.from(actual.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString),
Flux.from(expected.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString)
)
.as(StepVerifier::create)
.assertNext(t -> assertThat(t.getT1()).isEqualToIgnoringWhitespace(t.getT2()))
.verifyComplete()
, "TEXT");
@Override
public Publisher<CharSequence> stream() {
return Mono.just("test-value");
}
}, (actual, expected) -> Flux.zip(Flux.from(actual.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString),
Flux.from(expected.stream()).reduce(new StringBuilder(), StringBuilder::append).map(StringBuilder::toString)).as(StepVerifier::create).assertNext(t -> assertThat(t.getT1()).isEqualToIgnoringWhitespace(t.getT2())).verifyComplete(), "TEXT");
}

@Test
Expand Down Expand Up @@ -468,11 +455,22 @@ void localDateTime() {
}, "TIMESTAMPTZ", "$1", null);
}

@Test
void localDateTimeArray() {
testCodec(LocalDateTime[].class, new LocalDateTime[]{LocalDateTime.now().truncatedTo(ChronoUnit.MICROS)}, "TIMESTAMP[]");
testCodec(LocalDateTime[].class, new LocalDateTime[]{LocalDateTime.now().truncatedTo(ChronoUnit.MICROS)}, "TIMESTAMPTZ[]");
}

@Test
void localTime() {
testCodec(LocalTime.class, LocalTime.now().truncatedTo(ChronoUnit.MICROS), "TIME");
}

@Test
void localTimeArray() {
testCodec(LocalTime[].class, new LocalTime[]{LocalTime.now().truncatedTo(ChronoUnit.MICROS)}, "TIME[]");
}

@Test
void longArray() {
testCodec(Long[].class, new Long[]{100L, 200L, 300L}, "INT8[]");
Expand Down Expand Up @@ -672,8 +670,8 @@ void pathTwoDimensionalArray() {

@Test
void polygonArray() {
testCodec(Polygon[].class, new Polygon[]{Polygon.of(Point.of(1.1, 2.2), Point.of(10.10, 10.10), Point.of(.42, 5.3)),
Polygon.of(Point.of(1.1, 2.2), Point.of(10.10, 10.10), Point.of(.42, 5.3), Point.of(-3.5, 0.))}, "POLYGON[]");
testCodec(Polygon[].class, new Polygon[]{Polygon.of(Point.of(1.1, 2.2), Point.of(10.10, 10.10), Point.of(.42, 5.3)), Polygon.of(Point.of(1.1, 2.2), Point.of(10.10, 10.10), Point.of(.42,
5.3), Point.of(-3.5, 0.))}, "POLYGON[]");
}

@Test
Expand All @@ -689,9 +687,7 @@ void polygonTwoDimensionalArray() {
}

private static <T> Mono<T> close(Connection connection) {
return Mono.from(connection
.close())
.then(Mono.empty());
return Mono.from(connection.close()).then(Mono.empty());
}

private <T> void testCodec(Class<T> javaType, T value, String sqlType) {
Expand Down Expand Up @@ -750,94 +746,64 @@ private <IN, OUT> void testCodec(Class<IN> javaType, IN value, Class<OUT> outTyp

if (parameterType == null) {

this.connectionFactory.create()
.flatMapMany(connection -> connection
this.connectionFactory.create().flatMapMany(connection -> connection

.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")")
.bindNull("$1", javaType)
.execute()
.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")").bindNull("$1", javaType).execute()

.flatMap(PostgresqlResult::getRowsUpdated)
.flatMap(PostgresqlResult::getRowsUpdated)

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1L)
.verifyComplete();
.concatWith(close(connection))).as(StepVerifier::create).expectNext(1L).verifyComplete();

SERVER.getJdbcOperations().execute("DELETE FROM test");

this.connectionFactory.create()
.flatMapMany(connection -> connection
this.connectionFactory.create().flatMapMany(connection -> connection

.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")")
.bind("$1", value)
.execute()
.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")").bind("$1", value).execute()

.flatMap(PostgresqlResult::getRowsUpdated)
.flatMap(PostgresqlResult::getRowsUpdated)

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1L)
.verifyComplete();
.concatWith(close(connection))).as(StepVerifier::create).expectNext(1L).verifyComplete();
} else {

this.connectionFactory.create()
.flatMapMany(connection -> connection
this.connectionFactory.create().flatMapMany(connection -> connection

.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")")
.bind("$1", Parameters.in(parameterType))
.execute()
.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")").bind("$1", Parameters.in(parameterType)).execute()

.flatMap(PostgresqlResult::getRowsUpdated)
.flatMap(PostgresqlResult::getRowsUpdated)

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1L)
.verifyComplete();
.concatWith(close(connection))).as(StepVerifier::create).expectNext(1L).verifyComplete();

SERVER.getJdbcOperations().execute("DELETE FROM test");

this.connectionFactory.create()
.flatMapMany(connection -> connection
this.connectionFactory.create().flatMapMany(connection -> connection

.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")")
.bind("$1", Parameters.in(parameterType, value))
.execute()
.createStatement("INSERT INTO test VALUES (" + insertPlaceholder + ")").bind("$1", Parameters.in(parameterType, value)).execute()

.flatMap(PostgresqlResult::getRowsUpdated)
.flatMap(PostgresqlResult::getRowsUpdated)

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1L)
.verifyComplete();
.concatWith(close(connection))).as(StepVerifier::create).expectNext(1L).verifyComplete();
}

if (value instanceof Buffer) {
((Buffer) value).rewind();
}

this.connectionFactory.create()
.flatMapMany(connection -> {

PostgresqlStatement statement;
if (insertPlaceholder.equals("$1")) {
statement = connection
// where clause added to force using extended query instead of simple query
.createStatement("SELECT value FROM test WHERE " + insertPlaceholder + " <> 1")
.bind("$1", 2);
} else {
statement = connection.createStatement("SELECT value FROM test");
}
return statement.execute()

.map(result -> result.map((row, metadata) -> row.get("value", outType)))
.flatMap(Function.identity())

.concatWith(close(connection));
})
.as(StepVerifier::create)
.assertNext(r2dbc -> equality.accept(r2dbc, value))
.verifyComplete();
this.connectionFactory.create().flatMapMany(connection -> {

PostgresqlStatement statement;
if (insertPlaceholder.equals("$1")) {
statement = connection
// where clause added to force using extended query instead of simple query
.createStatement("SELECT value FROM test WHERE " + insertPlaceholder + " <> 1").bind("$1", 2);
} else {
statement = connection.createStatement("SELECT value FROM test");
}
return statement.execute()

.map(result -> result.map((row, metadata) -> row.get("value", outType))).flatMap(Function.identity())

.concatWith(close(connection));
}).as(StepVerifier::create).assertNext(r2dbc -> equality.accept(r2dbc, value)).verifyComplete();
} finally {
SERVER.getJdbcOperations().execute("DROP TABLE test");
}
Expand Down Expand Up @@ -868,32 +834,21 @@ private <W, R> void testCodecReadAs(W toWrite, Class<R> javaTypeToRead, Consumer
SERVER.getJdbcOperations().execute(String.format("CREATE TABLE test (value %s)", sqlType));

try {
this.connectionFactory.create()
.flatMapMany(connection -> connection
this.connectionFactory.create().flatMapMany(connection -> connection

.createStatement("INSERT INTO test VALUES ($1)")
.bind("$1", toWrite)
.execute()
.createStatement("INSERT INTO test VALUES ($1)").bind("$1", toWrite).execute()

.flatMap(PostgresqlResult::getRowsUpdated)
.flatMap(PostgresqlResult::getRowsUpdated)

.concatWith(close(connection)))
.as(StepVerifier::create)
.expectNext(1L)
.verifyComplete();
.concatWith(close(connection))).as(StepVerifier::create).expectNext(1L).verifyComplete();

this.connectionFactory.create()
.flatMapMany(connection -> {
return connection.createStatement("SELECT value FROM test").execute()
this.connectionFactory.create().flatMapMany(connection -> {
return connection.createStatement("SELECT value FROM test").execute()

.map(result -> result.map((row, metadata) -> row.get("value", javaTypeToRead)))
.flatMap(Function.identity())
.map(result -> result.map((row, metadata) -> row.get("value", javaTypeToRead))).flatMap(Function.identity())

.concatWith(close(connection));
})
.as(StepVerifier::create)
.assertNext(equality)
.verifyComplete();
.concatWith(close(connection));
}).as(StepVerifier::create).assertNext(equality).verifyComplete();
} finally {
SERVER.getJdbcOperations().execute("DROP TABLE test");
}
Expand Down

0 comments on commit e1ef07f

Please sign in to comment.