Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for compression protocol #219

Merged
merged 3 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Refer to the table below to determine the appropriate version of r2dbc-mysql for
This driver provides the following features:

- [x] Unix domain socket.
- [x] Compression protocols, including zstd and zlib.
- [x] Execution of simple or batch statements without bindings.
- [x] Execution of prepared statements with bindings.
- [x] Reactive LOB types (e.g. BLOB, CLOB)
Expand Down Expand Up @@ -143,6 +144,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(Option.valueOf("allowLoadLocalInfileInPath"), "/opt") // optional, default null, null means LOCAL INFILE not be allowed (since 1.1.0)
.option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
.option(Option.valueOf("tcpNoDelay"), true) // optional, default false
.option(Option.valueOf("compressionAlgorithms"), "zstd") // optional, default UNCOMPRESSED
.option(Option.valueOf("autodetectExtensions"), false) // optional, default false
.option(Option.valueOf("passwordPublisher"), Mono.just("password")) // optional, default null, null means has no passwordPublisher (since 1.0.5 / 0.9.6)
.build();
Expand Down Expand Up @@ -191,6 +193,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde
.allowLoadLocalInfileInPath("/opt") // optional, default null, null means LOCAL INFILE not be allowed
.tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
.tcpNoDelay(true) // optional, controls TCP No Delay, default is false
.compressionAlgorithms(CompressionAlgorithm.ZSTD, CompressionAlgotihm.ZLIB) // optional, default is UNCOMPRESSED
.autodetectExtensions(false) // optional, controls extension auto-detect, default is true
.extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
.passwordPublisher(Mono.just("password")) // optional, default null, null means has no password publisher (since 1.0.5 / 0.9.6)
Expand Down Expand Up @@ -242,6 +245,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
| autodetectExtensions | `true` or `false` | Optional, default is `true` | Controls auto-detect `Extension`s |
| useServerPrepareStatement | `true`, `false` or `Predicate<String>` | Optional, default is `false` | See following notice |
| allowLoadLocalInfileInPath | A path | Optional, default is `null` | The path that allows `LOAD DATA LOCAL INFILE` to load file data |
| compressionAlgorithms | A list of `CompressionAlgorithm` | Optional, default is `UNCOMPRESSED` | The compression algorithms for MySQL connection |
| passwordPublisher | A `Publisher<String>` | Optional, default is `null` | The password publisher, see following notice |

- `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available**
Expand Down Expand Up @@ -269,6 +273,11 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
- The `Extensions` will not remove duplicates, make sure it would be not extended twice or more
- The auto-detected `Extension`s will not affect manual extends and will not remove duplicates
- `passwordPublisher` Every time the client attempts to authenticate, it will use the password provided by the `passwordPublisher`.(Since `1.0.5` / `0.9.6`) e.g., You can employ this method for IAM-based authentication when connecting to an AWS Aurora RDS database.
- `compressionAlgorithms` Considers compression protocol for MySQL connection, it is **NOT** RECOMMENDED to use compression protocol in the general case, because it will increase the CPU usage and decrease the performance.
- `UNCOMPRESSED` (default) No compression
- `ZLIB` Use Zlib compression protocol, it is available on almost all MySQL versions (`5.x` and above)
- `ZSTD` Use Z-standard compression protocol, it is available since MySQL `8.0.18` or above, requires an extern dependency `com.github.luben:zstd-jni`
- For scenarios where the network environment is poor or the amount of data is always large, using a compression protocol may be useful

Should use `enum` in [Programmatic](#programmatic-configuration) configuration that not like discovery configurations, except `TlsVersions` (All elements of `TlsVersions` will be always `String` which is case-sensitive).

Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<jackson.version>2.16.0</jackson.version>
<mbr.version>0.3.0.RELEASE</mbr.version>
<jsr305.version>3.0.2</jsr305.version>
<zstd-jni.version>1.5.5-11</zstd-jni.version>
<java-annotations.version>24.1.0</java-annotations.version>
<bouncy-castle.version>1.77</bouncy-castle.version>
</properties>
Expand Down Expand Up @@ -153,6 +154,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/io/asyncer/r2dbc/mysql/Capability.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public final class Capability {
// Allow the server not to send column metadata in result set,
// should NEVER enable this option.
// private static final long OPTIONAL_RESULT_SET_METADATA = 1L << 25;
// private static final long Z_STD_COMPRESSION = 1L << 26;
private static final long ZSTD_COMPRESS = 1L << 26;

// A reserved flag, used to extend the 32-bits capability bitmap to 64-bits.
// There is no available MySql server version/edition to support it.
Expand All @@ -175,7 +175,7 @@ public final class Capability {
private static final long ALL_SUPPORTED = CLIENT_MYSQL | FOUND_ROWS | LONG_FLAG | CONNECT_WITH_DB |
NO_SCHEMA | COMPRESS | LOCAL_FILES | IGNORE_SPACE | PROTOCOL_41 | INTERACTIVE | SSL |
TRANSACTIONS | SECURE_SALT | MULTI_STATEMENTS | MULTI_RESULTS | PS_MULTI_RESULTS |
PLUGIN_AUTH | CONNECT_ATTRS | VAR_INT_SIZED_AUTH | SESSION_TRACK | DEPRECATE_EOF;
PLUGIN_AUTH | CONNECT_ATTRS | VAR_INT_SIZED_AUTH | SESSION_TRACK | DEPRECATE_EOF | ZSTD_COMPRESS;

private final long bitmap;

Expand Down Expand Up @@ -278,6 +278,33 @@ public boolean isTransactionAllowed() {
return (bitmap & TRANSACTIONS) != 0;
}

/**
* Checks if any compression enabled.
*
* @return if any compression enabled.
*/
public boolean isCompression() {
return (bitmap & (COMPRESS | ZSTD_COMPRESS)) != 0;
}

/**
* Checks if zlib compression enabled.
*
* @return if zlib compression enabled.
*/
public boolean isZlibCompression() {
return (bitmap & COMPRESS) != 0;
}

/**
* Checks if zstd compression enabled.
*
* @return if zstd compression enabled.
*/
public boolean isZstdCompression() {
return (bitmap & ZSTD_COMPRESS) != 0;
}

/**
* Extends MariaDB capabilities.
*
Expand Down Expand Up @@ -362,9 +389,17 @@ void disableDatabasePinned() {
}

void disableCompression() {
this.bitmap &= ~(COMPRESS | ZSTD_COMPRESS);
}

void disableZlibCompression() {
this.bitmap &= ~COMPRESS;
}

void disableZstdCompression() {
this.bitmap &= ~ZSTD_COMPRESS;
}

void disableLoadDataLocalInfile() {
this.bitmap &= ~LOCAL_FILES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
import io.asyncer.r2dbc.mysql.extension.Extension;
Expand All @@ -30,9 +31,12 @@
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -97,6 +101,10 @@ public final class MySqlConnectionConfiguration {

private final int prepareCacheSize;

private final Set<CompressionAlgorithm> compressionAlgorithms;

private final int zstdCompressionLevel;

private final Extensions extensions;

@Nullable
Expand All @@ -109,8 +117,9 @@ private MySqlConnectionConfiguration(
String user, @Nullable CharSequence password, @Nullable String database,
boolean createDatabaseIfNotExist, @Nullable Predicate<String> preferPrepareStatement,
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
int queryCacheSize, int prepareCacheSize, Extensions extensions,
@Nullable Publisher<String> passwordPublisher
int queryCacheSize, int prepareCacheSize,
Set<CompressionAlgorithm> compressionAlgorithms, int zstdCompressionLevel,
Extensions extensions, @Nullable Publisher<String> passwordPublisher
) {
this.isHost = isHost;
this.domain = domain;
Expand All @@ -130,6 +139,8 @@ private MySqlConnectionConfiguration(
this.localInfileBufferSize = localInfileBufferSize;
this.queryCacheSize = queryCacheSize;
this.prepareCacheSize = prepareCacheSize;
this.compressionAlgorithms = compressionAlgorithms;
this.zstdCompressionLevel = zstdCompressionLevel;
this.extensions = extensions;
this.passwordPublisher = passwordPublisher;
}
Expand Down Expand Up @@ -220,6 +231,14 @@ int getPrepareCacheSize() {
return prepareCacheSize;
}

Set<CompressionAlgorithm> getCompressionAlgorithms() {
return compressionAlgorithms;
}

int getZstdCompressionLevel() {
return zstdCompressionLevel;
}

Extensions getExtensions() {
return extensions;
}
Expand Down Expand Up @@ -256,6 +275,8 @@ public boolean equals(Object o) {
localInfileBufferSize == that.localInfileBufferSize &&
queryCacheSize == that.queryCacheSize &&
prepareCacheSize == that.prepareCacheSize &&
compressionAlgorithms.equals(that.compressionAlgorithms) &&
zstdCompressionLevel == that.zstdCompressionLevel &&
extensions.equals(that.extensions) &&
Objects.equals(passwordPublisher, that.passwordPublisher);
}
Expand All @@ -265,7 +286,7 @@ public int hashCode() {
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist,
preferPrepareStatement, loadLocalInfilePath, localInfileBufferSize, queryCacheSize,
prepareCacheSize, extensions, passwordPublisher);
prepareCacheSize, compressionAlgorithms, zstdCompressionLevel, extensions, passwordPublisher);
}

@Override
Expand All @@ -280,6 +301,8 @@ public String toString() {
", loadLocalInfilePath=" + loadLocalInfilePath +
", localInfileBufferSize=" + localInfileBufferSize +
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
", compressionAlgorithms=" + compressionAlgorithms +
", zstdCompressionLevel=" + zstdCompressionLevel +
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
}

Expand All @@ -291,8 +314,10 @@ public String toString() {
", loadLocalInfilePath=" + loadLocalInfilePath +
", localInfileBufferSize=" + localInfileBufferSize +
", queryCacheSize=" + queryCacheSize +
", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions +
", passwordPublisher=" + passwordPublisher + '}';
", prepareCacheSize=" + prepareCacheSize +
", compressionAlgorithms=" + compressionAlgorithms +
", zstdCompressionLevel=" + zstdCompressionLevel +
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
}

/**
Expand Down Expand Up @@ -363,6 +388,11 @@ public static final class Builder {

private int prepareCacheSize = 256;

private Set<CompressionAlgorithm> compressionAlgorithms =
Collections.singleton(CompressionAlgorithm.UNCOMPRESSED);

private int zstdCompressionLevel = 3;

private boolean autodetectExtensions = true;

private final List<Extension> extensions = new ArrayList<>();
Expand Down Expand Up @@ -395,6 +425,7 @@ public MySqlConnectionConfiguration build() {
connectTimeout, zeroDateOption, serverZoneId, user, password, database,
createDatabaseIfNotExist, preferPrepareStatement, loadLocalInfilePath,
localInfileBufferSize, queryCacheSize, prepareCacheSize,
compressionAlgorithms, zstdCompressionLevel,
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
}

Expand Down Expand Up @@ -822,6 +853,64 @@ public Builder prepareCacheSize(int prepareCacheSize) {
return this;
}

/**
* Configures the compression algorithms. Default to [{@link CompressionAlgorithm#UNCOMPRESSED}].
* <p>
* It will auto choose an algorithm that's contained in the list and supported by the server,
* preferring zstd, then zlib. If the list does not contain {@link CompressionAlgorithm#UNCOMPRESSED}
* and the server does not support any algorithm in the list, an exception will be thrown when
* connecting.
* <p>
* Note: zstd requires a dependency {@code com.github.luben:zstd-jni}.
*
* @param compressionAlgorithms the list of compression algorithms.
* @return {@link Builder this}.
* @throws IllegalArgumentException if {@code compressionAlgorithms} is {@code null} or empty.
* @since 1.1.2
*/
public Builder compressionAlgorithms(CompressionAlgorithm... compressionAlgorithms) {
requireNonNull(compressionAlgorithms, "compressionAlgorithms must not be null");
require(compressionAlgorithms.length != 0, "compressionAlgorithms must not be empty");

if (compressionAlgorithms.length == 1) {
requireNonNull(compressionAlgorithms[0], "compressionAlgorithms must not contain null");
this.compressionAlgorithms = Collections.singleton(compressionAlgorithms[0]);
} else {
Set<CompressionAlgorithm> algorithms = EnumSet.noneOf(CompressionAlgorithm.class);

for (CompressionAlgorithm algorithm : compressionAlgorithms) {
requireNonNull(algorithm, "compressionAlgorithms must not contain null");
algorithms.add(algorithm);
}

this.compressionAlgorithms = algorithms;
}

return this;
}

/**
* Configures the zstd compression level. Default to {@code 3}.
* <p>
* It is only used if zstd is chosen for the connection.
* <p>
* Note: MySQL protocol does not allow to set the zlib compression level of the server, only zstd is
* configurable.
*
* @param level the compression level.
* @return {@link Builder this}.
* @throws IllegalArgumentException if {@code level} is not between 1 and 22.
* @since 1.1.2
* @see <a href="https://dev.mysql.com/doc/refman/8.0/en/connection-options.html">
* MySQL Connection Options --zstd-compression-level</a>
*/
public Builder zstdCompressionLevel(int level) {
require(level >= 1 && level <= 22, "level must be between 1 and 22");

this.zstdCompressionLevel = level;
return this;
}

/**
* Configures whether to use {@link ServiceLoader} to discover and register extensions. Defaults to
* {@code true}.
Expand Down
Loading
Loading