Skip to content

Commit

Permalink
Pivotal ID #186675047: FTP connection pool (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
Juan-EBI authored Dec 15, 2023
1 parent 55d754d commit 2724872
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 116 deletions.
3 changes: 3 additions & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Versions.CommonsFileUploadVersion
import Versions.CommonsIOVersion
import Versions.CommonsLang3Version
import Versions.CommonsNetVersion
import Versions.CommonsPoolVersion
import Versions.GuavaVersion
import Versions.H2Version
import Versions.JSONOrgVersion
Expand Down Expand Up @@ -88,6 +89,7 @@ object Versions {
const val CommonsLang3Version = "3.8.1"
const val CommonsIOVersion = "2.6"
const val CommonsNetVersion = "3.6"
const val CommonsPoolVersion = "2.12.0"
const val CommonsCsvVersion = "1.8"
const val MySqlVersion = "8.0.25"
const val XmlBuilderVersion = "1.7.4"
Expand Down Expand Up @@ -216,6 +218,7 @@ object Dependencies {
const val CommonsLang3 = "org.apache.commons:commons-lang3:$CommonsLang3Version"
const val CommonsIO = "commons-io:commons-io:$CommonsIOVersion"
const val CommonsNet = "commons-net:commons-net:$CommonsNetVersion"
const val CommonsPool = "org.apache.commons:commons-pool2:$CommonsPoolVersion"
const val CommonsCsv = "org.apache.commons:commons-csv:$CommonsCsvVersion"
const val PoiOxml = "org.apache.poi:poi-ooxml:$PoiVersion"
}
Expand Down
2 changes: 1 addition & 1 deletion client/cluster-client/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
coverage=0.31
coverage=0.30
6 changes: 6 additions & 0 deletions client/ftp-webclient/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import Dependencies.CommonsNet
import Dependencies.CommonsPool
import Dependencies.KotlinLogging
import Dependencies.KotlinStdLib
import Projects.CommonsUtil
import TestDependencies.BaseTestCompileDependencies
import TestDependencies.BaseTestRuntimeDependencies
import TestDependencies.FtpServer
import io.spring.gradle.dependencymanagement.dsl.DependencyManagementExtension
import org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES

plugins {
id("io.spring.dependency-management") version "1.0.12.RELEASE"
id("org.springframework.boot") version "2.7.1" apply false
`java-test-fixtures`
}

the<DependencyManagementExtension>().apply {
Expand All @@ -23,6 +26,9 @@ dependencies {
implementation(KotlinStdLib)
implementation(KotlinLogging)
implementation(CommonsNet)
implementation(CommonsPool)

testFixturesImplementation(FtpServer)

BaseTestCompileDependencies.forEach { testImplementation(it) }
BaseTestRuntimeDependencies.forEach { testImplementation(it) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package ebi.ac.uk.ftp

import mu.KotlinLogging
import org.apache.commons.net.ftp.FTPClient
import org.apache.commons.pool2.BasePooledObjectFactory
import org.apache.commons.pool2.PooledObject
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.impl.GenericObjectPool
import kotlin.time.Duration.Companion.milliseconds

private val logger = KotlinLogging.logger {}

/**
* Pooled FTP client. Allows to re use FTPClient instances so socket connections and ftp logging does not need to be
* performed on each FTP operation.
*/
internal class FTPClientPool(
private val ftpUser: String,
private val ftpPassword: String,
private val ftpUrl: String,
private val ftpPort: Int,
private val ftpClientPool: GenericObjectPool<FTPClient> = createFtpPool(ftpUser, ftpPassword, ftpUrl, ftpPort),
) {
fun <T> execute(action: (FTPClient) -> T): T {
val ftpClient = ftpClientPool.borrowObject()
return try {
action(ftpClient)
} finally {
ftpClientPool.returnObject(ftpClient)
}
}

private class FTPClientFactory(
private val ftpUser: String,
private val ftpPassword: String,
private val ftpUrl: String,
private val ftpPort: Int,
) : BasePooledObjectFactory<FTPClient>() {
override fun create(): FTPClient {
val ftp = ftpClient(3000.milliseconds, 3000.milliseconds)
logger.info { "Connecting to $ftpUrl, $ftpPort" }
ftp.connect(ftpUrl, ftpPort)
ftp.login(ftpUser, ftpPassword)
ftp.enterLocalPassiveMode()
return ftp
}

override fun wrap(ftpClient: FTPClient): PooledObject<FTPClient> {
return DefaultPooledObject(ftpClient)
}

override fun destroyObject(p: PooledObject<FTPClient>) {
val ftpClient = p.`object`
if (ftpClient.isConnected) {
ftpClient.logout()
ftpClient.disconnect()
}
}

@Suppress("TooGenericExceptionCaught")
override fun validateObject(p: PooledObject<FTPClient>): Boolean {
val ftpClient = p.`object`
return try {
ftpClient.sendNoOp()
} catch (e: Exception) {
logger.error(e) { "Error checking ftp connection" }
false
}
}
}

private companion object {
private const val MIN_CONNECTION = 2

fun createFtpPool(
ftpUser: String,
ftpPassword: String,
ftpUrl: String,
ftpPort: Int,
): GenericObjectPool<FTPClient> {
val factory = FTPClientFactory(ftpUser, ftpPassword, ftpUrl, ftpPort)
var connections = GenericObjectPool(factory)
connections.minIdle = MIN_CONNECTION
return connections
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import org.apache.commons.net.ftp.FTPFile
import java.io.InputStream
import java.io.OutputStream
import java.nio.file.Path
import kotlin.time.Duration.Companion.milliseconds

private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -42,69 +41,52 @@ interface FtpClient {
*/
fun deleteFile(path: Path)

/**
* Executes operations creating a new Ftp Client class every time as
* @see [documented](https://cwiki.apache.org/confluence/display/COMMONS/Net+FrequentlyAskedQuestions)
* class is not thread safe.
*/
fun <T> execute(function: (FTPClient) -> T): T

companion object {
fun create(ftpUser: String, ftpPassword: String, ftpUrl: String, ftpPort: Int): FtpClient =
SimpleFtpClient(ftpUser, ftpPassword, ftpUrl, ftpPort)
fun create(ftpUser: String, ftpPassword: String, ftpUrl: String, ftpPort: Int): FtpClient {
val connectionPool = FTPClientPool(ftpUser, ftpPassword, ftpUrl, ftpPort)
return SimpleFtpClient(connectionPool)
}
}
}

private class SimpleFtpClient(
private val ftpUser: String,
private val ftpPassword: String,
private val ftpUrl: String,
private val ftpPort: Int,
private val ftpClientPool: FTPClientPool,
) : FtpClient {
override fun uploadFiles(folder: Path, files: List<Pair<Path, () -> InputStream>>) {
execute { ftp ->
ftpClientPool.execute { ftp ->
for ((path, inputStream) in files) {
ftp.createFtpFolder(path.parent)
inputStream().use { ftp.storeFile(path.toString(), it) }
}
}
}

/**
* As FTP does not support nested folder creation in a single path the full path is
* transverse and required missing folder are created.
*/
private fun FTPClient.createFtpFolder(path: Path) {
val paths = path.runningReduce { acc, value -> acc.resolve(value) }
paths.forEach { this.makeDirectory(it.toString()) }
}

/**
* Upload the given input stream in the provided FTP location. Stream is closed after transfer completion.
*/
override fun uploadFile(path: Path, source: () -> InputStream) {
execute { ftp -> source().use { ftp.storeFile(path.toString(), it) } }
ftpClientPool.execute { ftp -> source().use { ftp.storeFile(path.toString(), it) } }
}

/**
* Download the given file in the output stream. Output stream is NOT closed after completion.
*/
override fun downloadFile(path: Path, source: OutputStream) {
execute { ftp -> ftp.retrieveFile(path.toString(), source) }
ftpClientPool.execute { ftp -> ftp.retrieveFile(path.toString(), source) }
}

/**
* Create the given folder.
*/
override fun createFolder(path: Path) {
execute { ftp -> ftp.createFtpFolder(path) }
ftpClientPool.execute { ftp -> ftp.createFtpFolder(path) }
}

/**
* List the files in the given path.
*/
override fun listFiles(path: Path): List<FTPFile> {
return execute { ftp ->
return ftpClientPool.executeRestoringWorkingDirectory { ftp ->
ftp.changeWorkingDirectory(path.toString())
ftp.listFiles().toList()
}
Expand All @@ -114,38 +96,44 @@ private class SimpleFtpClient(
* Delete the file or folder in the given path.
*/
override fun deleteFile(path: Path) {
execute { ftp ->
ftpClientPool.executeRestoringWorkingDirectory { ftp ->
val fileDeleted = ftp.deleteFile(path.toString())
if (fileDeleted.not()) ftp.deleteDirectory(path)
}
}

/**
* As FTP does not support nested folder creation in a single path the full path is
* transverse and required missing folder are created.
*/
private fun FTPClient.createFtpFolder(path: Path) {
val paths = path.runningReduce { acc, value -> acc.resolve(value) }
paths.forEach { makeDirectory(it.toString()) }
}

/**
* As Ftp clients are re used we need to guarantee that, if the working directory is changed, it is restored after
* the operation is completed.
*/
private fun <T> FTPClientPool.executeRestoringWorkingDirectory(action: (FTPClient) -> T): T {
return execute {
val source = it.printWorkingDirectory()
try {
action(it)
} finally {
it.changeWorkingDirectory(source)
}
}
}

/**
* As delete multiple files are not supported by apache client its neccessary delete by iterating over each file.
*/
private fun FTPClient.deleteDirectory(dirPath: Path) {
changeWorkingDirectory(dirPath.toString())
listNames().forEach { deleteFile(it); }
listNames().forEach { deleteFile(it) }

changeToParentDirectory()
removeDirectory(dirPath.fileName.toString())
}

/**
* Executes operations creating a new Ftp Client class every time as
* @see <a href="https://cwiki.apache.org/confluence/display/COMMONS/Net+FrequentlyAskedQuestions">Documentation</a>
* class is not thread safe.
*/
override fun <T> execute(function: (FTPClient) -> T): T {
val ftp = ftpClient(3000.milliseconds, 3000.milliseconds)
logger.info { "Connecting to $ftpUrl, $ftpPort" }
ftp.connect(ftpUrl, ftpPort)
ftp.login(ftpUser, ftpPassword)
ftp.enterLocalPassiveMode()
val result = function(ftp)
ftp.logout()
ftp.disconnect()
logger.info { "Disconnecting from $ftpUrl, $ftpPort" }
return result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ebi.ac.uk.ftp

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.io.File
import java.nio.file.Paths
import kotlin.io.path.createTempFile
import kotlin.io.path.inputStream
import kotlin.io.path.outputStream
import kotlin.io.path.writeText

class FtpClientTest {
private val ftpServer = createFtpServer().apply { start() }
private val testInstance = FtpClient.create(FTP_USER, FTP_PASSWORD, ftpServer.getUrl(), ftpServer.ftpPort)

@Test
fun `upload a file, list it and download it`() {
val tempFile = createTempFile()
tempFile.writeText("test-file")
tempFile.writeText("test-file")

val rootPath = Paths.get("")
val filePath = rootPath.resolve("test-file.txt")

testInstance.uploadFile(filePath, { tempFile.inputStream() })

val files = testInstance.listFiles(rootPath)
assertThat(files).hasSize(1)
val file = files.first()
assertThat(file.name).isEqualTo("test-file.txt")

val outputFile = createTempFile()
outputFile.outputStream().use { testInstance.downloadFile(filePath, it) }
assertThat(outputFile).hasSameContentAs(tempFile)
}

companion object {
fun createFtpServer(): FtpServer {
return FtpServer.createServer(
FtpConfig(
sslConfig = SslConfig(File(this::class.java.getResource("/mykeystore.jks").toURI()), "123456"),
userName = FTP_USER,
password = FTP_PASSWORD
)
)
}

const val FTP_USER = "ftpUser"
const val FTP_PASSWORD = "ftpPassword"
}
}

This file was deleted.

Binary file not shown.
Loading

0 comments on commit 2724872

Please sign in to comment.