Skip to content

Commit

Permalink
Add support for MariaDB version pattern (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth authored Jan 16, 2024
1 parent 96c11dc commit 8752f66
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 83 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/asyncer/r2dbc/mysql/Capability.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public final class Capability {

private final long bitmap;

/**
* Checks if the connection is using MariaDB capabilities.
*
* @return if using MariaDB capabilities.
*/
public boolean isMariaDb() {
return (bitmap & CLIENT_MYSQL) == 0;
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public ZoneId getServerZoneId() {
return serverZoneId;
}

@Override
public boolean isMariaDb() {
return capability.isMariaDb() || serverVersion.isMariaDb();
}

boolean shouldSetServerZoneId() {
return serverZoneId == null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Integer getScale() {
@Override
public CharCollation getCharCollation(CodecContext context) {
return collationId == CharCollation.BINARY_ID ? context.getClientCollation() :
CharCollation.fromId(collationId, context.getServerVersion());
CharCollation.fromId(collationId, context);
}

@Override
Expand Down
52 changes: 31 additions & 21 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,13 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private static final int PREFIX_LENGTH = 6;

/**
* If MySQL server version greater than or equal to {@literal 8.0.3}, or greater than or equal to
* {@literal 5.7.20} and less than {@literal 8.0.0}, the column name of current session isolation level
* will be {@literal @@transaction_isolation}, otherwise it is {@literal @@tx_isolation}.
*
* @see #init judge server version before get the isolation level.
*/
private static final ServerVersion TRAN_LEVEL_8X = ServerVersion.create(8, 0, 3);
private static final ServerVersion MARIA_11_1_1 = ServerVersion.create(11, 1, 1, true);

private static final ServerVersion TRAN_LEVEL_5X = ServerVersion.create(5, 7, 20);
private static final ServerVersion MYSQL_8_0_3 = ServerVersion.create(8, 0, 3);

private static final ServerVersion TX_LEVEL_8X = ServerVersion.create(8, 0, 0);
private static final ServerVersion MYSQL_5_7_20 = ServerVersion.create(5, 7, 20);

private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0);

private static final BiConsumer<ServerMessage, SynchronousSink<Boolean>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
Expand Down Expand Up @@ -427,17 +422,10 @@ static Mono<MySqlConnection> init(
QueryCache queryCache, PrepareCache prepareCache,
@Nullable Predicate<String> prepare
) {
ServerVersion version = context.getServerVersion();
StringBuilder query = new StringBuilder(128);

// Maybe create a InitFlow for data initialization after login?
if (version.isGreaterThanOrEqualTo(TRAN_LEVEL_8X) ||
(version.isGreaterThanOrEqualTo(TRAN_LEVEL_5X) && version.isLessThan(TX_LEVEL_8X))) {
query.append(
"SELECT @@transaction_isolation AS i,@@innodb_lock_wait_timeout AS l,@@version_comment AS v");
} else {
query.append("SELECT @@tx_isolation AS i,@@innodb_lock_wait_timeout AS l,@@version_comment AS v");
}
StringBuilder query = new StringBuilder(128)
.append("SELECT ")
.append(transactionIsolationColumn(context))
.append(",@@innodb_lock_wait_timeout AS l,@@version_comment AS v");

Function<MySqlResult, Publisher<InitData>> handler;

Expand Down Expand Up @@ -587,6 +575,28 @@ private static long convertLockWaitTimeout(@Nullable Long timeout) {
return timeout;
}

/**
* Resolves the column of session isolation level, the {@literal @@tx_isolation} has been marked as
* deprecated.
* <p>
* If server is MariaDB, {@literal @@transaction_isolation} is used starting from {@literal 11.1.1}.
* <p>
* If the server is MySQL, use {@literal @@transaction_isolation} starting from {@literal 8.0.3}, or
* between {@literal 5.7.20} and {@literal 8.0.0} (exclusive).
*/
private static String transactionIsolationColumn(ConnectionContext context) {
ServerVersion version = context.getServerVersion();

if (context.isMariaDb()) {
return version.isGreaterThanOrEqualTo(MARIA_11_1_1) ? "@@transaction_isolation AS i" :
"@@tx_isolation AS i";
}

return version.isGreaterThanOrEqualTo(MYSQL_8_0_3) ||
(version.isGreaterThanOrEqualTo(MYSQL_5_7_20) && version.isLessThan(MYSQL_8)) ?
"@@transaction_isolation AS i" : "@@tx_isolation AS i";
}

private static class InitData {

private final IsolationLevel level;
Expand Down
88 changes: 64 additions & 24 deletions src/main/java/io/asyncer/r2dbc/mysql/ServerVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
*/
public final class ServerVersion implements Comparable<ServerVersion> {

/**
* MariaDB's replication hack prefix.
* <p>
* Note: MySQL 5.5.5 is not a stable version, so it should be safe.
*/
private static final String MARIADB_RPL_HACK_PREFIX = "5.5.5-";

private static final String ENTERPRISE = "enterprise";

private static final String COMMERCIAL = "commercial";
Expand All @@ -42,11 +49,14 @@ public final class ServerVersion implements Comparable<ServerVersion> {

private final int patch;

private ServerVersion(String origin, int major, int minor, int patch) {
private final boolean isMariaDb;

private ServerVersion(String origin, int major, int minor, int patch, boolean isMariaDb) {
this.origin = origin;
this.major = major;
this.minor = minor;
this.patch = patch;
this.isMariaDb = isMariaDb;
}

/**
Expand Down Expand Up @@ -97,6 +107,15 @@ public int getPatch() {
return patch;
}

/**
* Checks {@link ServerVersion this} contains MariaDB prefix or postfix.
*
* @return if it contains.
*/
public boolean isMariaDb() {
return isMariaDb;
}

/**
* Checks if the version is enterprise edition.
* <p>
Expand Down Expand Up @@ -150,32 +169,39 @@ public static ServerVersion parse(String version) {

int length = version.length();
int[] index = new int[] { 0 };
int major = readInt(version, length, index);

if (index[0] >= length) {
// End-of-string.
return create0(version, major, 0, 0);
} else if (version.charAt(index[0]) != '.') {
// Is not '.', has only postfix after major.
return create0(version, major, 0, 0);
} else {
// Skip last '.' after major.
++index[0];
boolean isMariaDb = false;

if (version.startsWith(MARIADB_RPL_HACK_PREFIX)) {
isMariaDb = true;
index[0] = MARIADB_RPL_HACK_PREFIX.length();
}

int minor = readInt(version, length, index);
int[] parts = new int[] { 0, 0, 0 };
int i = 0;

while (true) {
parts[i] = readInt(version, length, index);

if (index[0] >= length) {
return create0(version, major, minor, 0);
} else if (version.charAt(index[0]) != '.') {
// Is not '.', has only postfix after minor.
return create0(version, major, minor, 0);
} else {
// Skip last '.' after minor.
if (index[0] >= length) {
// End of version.
break;
}

if (i == 2 || version.charAt(index[0]) != '.') {
// End of version number parts, check postfix if needed.
if (!isMariaDb) {
isMariaDb = version.indexOf("MariaDB", index[0]) >= 0;
}

break;
}

// Skip last '.' after current number part.
++index[0];
++i;
}

return create0(version, major, minor, readInt(version, length, index));
return create0(version, parts[0], parts[1], parts[2], isMariaDb);
}

/**
Expand All @@ -188,15 +214,29 @@ public static ServerVersion parse(String version) {
* @throws IllegalArgumentException if any version part is negative integer.
*/
public static ServerVersion create(int major, int minor, int patch) {
return create0("", major, minor, patch);
return create0("", major, minor, patch, false);
}

/**
* Create a {@link ServerVersion} that value is {@literal major.minor.patch} with MariaDB flag.
*
* @param major must not be a negative integer
* @param minor must not be a negative integer
* @param patch must not be a negative integer
* @param isMariaDb MariaDB flag
* @return A server version that value is {@literal major.minor.patch}
* @throws IllegalArgumentException if any version part is negative integer.
*/
public static ServerVersion create(int major, int minor, int patch, boolean isMariaDb) {
return create0("", major, minor, patch, isMariaDb);
}

private static ServerVersion create0(String origin, int major, int minor, int patch) {
private static ServerVersion create0(String origin, int major, int minor, int patch, boolean isMariaDb) {
require(major >= 0, "major version must not be a negative integer");
require(minor >= 0, "minor version must not be a negative integer");
require(patch >= 0, "patch version must not be a negative integer");

return new ServerVersion(origin, major, minor, patch);
return new ServerVersion(origin, major, minor, patch, isMariaDb);
}

/**
Expand Down
47 changes: 34 additions & 13 deletions src/main/java/io/asyncer/r2dbc/mysql/client/SslBridgeHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@ final class SslBridgeHandler extends ChannelDuplexHandler {

private static final String[] OLD_TLS_PROTOCOLS = new String[] { TlsVersions.TLS1_1, TlsVersions.TLS1 };

private static final ServerVersion VER_5_6_0 = ServerVersion.create(5, 6, 0);
private static final ServerVersion MARIA_10_2_16 = ServerVersion.create(10, 2, 16, true);

private static final ServerVersion VER_5_6_46 = ServerVersion.create(5, 6, 46);
private static final ServerVersion MARIA_10_3_0 = ServerVersion.create(10, 3, 0, true);

private static final ServerVersion VER_5_7_0 = ServerVersion.create(5, 7, 0);
private static final ServerVersion MARIA_10_3_8 = ServerVersion.create(10, 3, 8, true);

private static final ServerVersion VER_5_7_28 = ServerVersion.create(5, 7, 28);
private static final ServerVersion MYSQL_5_6_0 = ServerVersion.create(5, 6, 0);

private static final ServerVersion MYSQL_5_6_46 = ServerVersion.create(5, 6, 46);

private static final ServerVersion MYSQL_5_7_0 = ServerVersion.create(5, 7, 0);

private static final ServerVersion MYSQL_5_7_28 = ServerVersion.create(5, 7, 28);

private final ConnectionContext context;

Expand Down Expand Up @@ -144,7 +150,7 @@ private void handleSslState(ChannelHandlerContext ctx, SslState state) {
logger.debug("SSL event triggered, enable SSL handler to pipeline");

SslProvider sslProvider = SslProvider.builder()
.sslContext(MySqlSslContextSpec.forClient(ssl, context.getServerVersion()))
.sslContext(MySqlSslContextSpec.forClient(ssl, context))
.build();
SslHandler sslHandler = sslProvider.getSslContext().newHandler(ctx.alloc());

Expand All @@ -167,13 +173,23 @@ private HostnameVerifier hostnameVerifier() {
return verifier == null ? DefaultHostnameVerifier.INSTANCE : verifier;
}

private static boolean isCurrentTlsEnabled(ServerVersion version) {
// See also https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-using-ssl.html
private static boolean isTls13Enabled(ConnectionContext context) {
ServerVersion version = context.getServerVersion();

if (context.isMariaDb()) {
// See also https://mariadb.com/kb/en/secure-connections-overview/#tls-protocol-version-support
// Quoting fragment: MariaDB binaries built with the OpenSSL library (OpenSSL 1.1.1 or later)
// support TLSv1.3 since MariaDB 10.2.16 and MariaDB 10.3.8.
return (version.isGreaterThanOrEqualTo(MARIA_10_2_16) && version.isLessThan(MARIA_10_3_0))
|| version.isGreaterThanOrEqualTo(MARIA_10_3_8);
}

// See also https://dev.mysql.com/doc/relnotes/connector-j/en/news-8-0-19.html
// Quoting fragment: TLSv1,TLSv1.1,TLSv1.2,TLSv1.3 for MySQL Community Servers 8.0, 5.7.28 and
// later, and 5.6.46 and later, and for all commercial versions of MySQL Servers.
return version.isGreaterThanOrEqualTo(VER_5_7_28)
|| (version.isGreaterThanOrEqualTo(VER_5_6_46) && version.isLessThan(VER_5_7_0))
|| (version.isGreaterThanOrEqualTo(VER_5_6_0) && version.isEnterprise());
return version.isGreaterThanOrEqualTo(MYSQL_5_7_28)
|| (version.isGreaterThanOrEqualTo(MYSQL_5_6_46) && version.isLessThan(MYSQL_5_7_0))
|| (version.isGreaterThanOrEqualTo(MYSQL_5_6_0) && version.isEnterprise());
}

private static final class MySqlSslContextSpec implements SslProvider.ProtocolSslContextSpec {
Expand All @@ -196,7 +212,7 @@ public SslContext sslContext() throws SSLException {
return builder.build();
}

static MySqlSslContextSpec forClient(MySqlSslConfiguration ssl, ServerVersion version) {
static MySqlSslContextSpec forClient(MySqlSslConfiguration ssl, ConnectionContext context) {
// Same default configuration as TcpSslContextSpec.
SslContextBuilder builder = SslContextBuilder.forClient()
.sslProvider(OpenSsl.isAvailable() ? OPENSSL : JDK)
Expand All @@ -206,11 +222,16 @@ static MySqlSslContextSpec forClient(MySqlSslConfiguration ssl, ServerVersion ve

if (tlsProtocols.length > 0) {
builder.protocols(tlsProtocols);
} else if (isCurrentTlsEnabled(version)) {
} else if (isTls13Enabled(context)) {
builder.protocols(TLS_PROTOCOLS);
} else {
// Not sure if we need to check the JDK version, suggest not.
logger.warn("MySQL {} does not support TLS1.2, TLS1.1 is disabled in latest JDKs", version);
if (logger.isWarnEnabled()) {
logger.warn("{} {} does not support TLS1.2, TLS1.1 is disabled in latest JDKs",
context.isMariaDb() ? "MariaDB" : "MySQL",
context.getServerVersion());
}

builder.protocols(OLD_TLS_PROTOCOLS);
}

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/asyncer/r2dbc/mysql/codec/CodecContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ public interface CodecContext {
* @return the {@link CharCollation}.
*/
CharCollation getClientCollation();

/**
* Checks server is MariaDB or not.
*
* @return if is MariaDB.
*/
boolean isMariaDb();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.asyncer.r2dbc.mysql.collation;

import io.asyncer.r2dbc.mysql.ServerVersion;
import io.asyncer.r2dbc.mysql.codec.CodecContext;

import java.nio.charset.Charset;

Expand Down Expand Up @@ -62,17 +62,17 @@ public interface CharCollation {

/**
* Obtain an instance of {@link CharCollation} from the identifier and server version, if not found, it
* will fallback to UTF-8. (i.e. utf8mb4)
* will fall back to UTF-8. (i.e. utf8mb4)
*
* @param id character collation identifier.
* @param version the version of MySQL server.
* @param context the codec context of server.
* @return the {@link CharCollation}.
* @throws IllegalArgumentException if {@code version} is {@code null}.
*/
static CharCollation fromId(int id, ServerVersion version) {
requireNonNull(version, "version must not be null");
static CharCollation fromId(int id, CodecContext context) {
requireNonNull(context, "version must not be null");

return CharCollations.fromId(id, version);
return CharCollations.fromId(id, context);
}

/**
Expand Down
Loading

0 comments on commit 8752f66

Please sign in to comment.