Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivotal ID # 186637755: Re use same FTP connection for multiple file uploads #786

Merged
merged 4 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import uk.ac.ebi.biostd.client.cluster.model.JobSpec

interface ClusterClient {
suspend fun triggerJobAsync(jobSpec: JobSpec): Try<Job>
suspend fun jobStatus(jobId: String): String

suspend fun triggerJobSync(jobSpec: JobSpec, checkJobInterval: Long = 30, maxSecondsDuration: Long = 60): Job

suspend fun jobStatus(jobId: String): String
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ class LocalClusterClient : ClusterClient {
}
}

override suspend fun jobStatus(jobId: String): String {
val process = activeProcess.getValue(jobId.toLong())
return if (process.isAlive) "RUNNING" else "DONE"
}

override suspend fun triggerJobSync(jobSpec: JobSpec, checkJobInterval: Long, maxSecondsDuration: Long): Job {
return withContext(Dispatchers.IO) {
val logFile = createTempFile().toFile()
Expand All @@ -34,6 +29,11 @@ class LocalClusterClient : ClusterClient {
}
}

override suspend fun jobStatus(jobId: String): String {
val process = activeProcess.getValue(jobId.toLong())
return if (process.isAlive) "RUNNING" else "DONE"
}

companion object {
internal const val LOCAL_QUEUE = "local"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import uk.ac.ebi.biostd.client.cluster.model.Job
import uk.ac.ebi.biostd.client.cluster.model.JobSpec
import java.time.Duration.ofSeconds

private const val DONE_STATUS = "DONE"

private val logger = KotlinLogging.logger {}

class RemoteClusterClient(
Expand All @@ -34,6 +32,14 @@ class RemoteClusterClient(
}
}

override suspend fun triggerJobSync(
jobSpec: JobSpec,
checkJobInterval: Long,
maxSecondsDuration: Long,
): Job {
return triggerJobAsync(jobSpec).fold({ throw it }, { await(it, checkJobInterval, maxSecondsDuration) })
}

override suspend fun jobStatus(jobId: String): String {
logger.info { "Checking Job id ='$jobId' status" }
return runInSession {
Expand All @@ -43,20 +49,23 @@ class RemoteClusterClient(
}
}

override suspend fun triggerJobSync(
jobSpec: JobSpec,
checkJobInterval: Long,
maxSecondsDuration: Long,
): Job {
suspend fun await(job: Job) = runInSession {
private suspend fun await(job: Job, checkJobInterval: Long, maxSecondsDuration: Long): Job {
return runInSession {
waitUntil(
interval = ofSeconds(checkJobInterval),
duration = ofSeconds(maxSecondsDuration)
) { jobStatus(job.id) == DONE_STATUS }
) {
val status = jobStatus(job.id)
val isDone = status == "DONE"
when {
isDone -> logger.info { "Job ${job.id} status is $status. Execution completed" }
else -> logger.info { "Job ${job.id} status is $status. Waiting for completion" }
}
return@waitUntil isDone
}

job
}

return triggerJobAsync(jobSpec).fold({ throw it }, { await(it) })
}

private fun asJobReturn(exitCode: Int, response: String, logsPath: String): Try<Job> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import kotlin.time.Duration.Companion.milliseconds

private val logger = KotlinLogging.logger {}

fun interface InputStreamSource {
fun inputStream(): InputStream
}

interface FtpClient {
/**
* Upload the given input stream in the provided FTP location. Stream is closed after transfer completion.
*/
fun uploadFile(path: Path, source: InputStreamSource)
fun uploadFiles(folder: Path, files: List<Pair<Path, () -> InputStream>>)

/**
* Upload the given input stream in the provided FTP location. Stream is closed after transfer completion.
*/
fun uploadFile(path: Path, source: () -> InputStream)

/**
* Download the given file in the output stream. Output stream is NOT closed after completion.
Expand Down Expand Up @@ -60,11 +61,29 @@ private class SimpleFtpClient(
private val ftpUrl: String,
private val ftpPort: Int,
) : FtpClient {
override fun uploadFiles(folder: Path, files: List<Pair<Path, () -> InputStream>>) {
execute { ftp ->
for ((path, inputStream) in files) {
ftp.createFtpFolder(path.parent)
inputStream().use { ftp.storeFile(path.toString(), it) }
}
}
}

/**
* As FTP does not support nested folder creation in a single path the full path is
* transverse and required missing folder are created.
*/
private fun FTPClient.createFtpFolder(path: Path) {
val paths = path.runningReduce { acc, value -> acc.resolve(value) }
paths.forEach { this.makeDirectory(it.toString()) }
}

/**
* Upload the given input stream in the provided FTP location. Stream is closed after transfer completion.
*/
override fun uploadFile(path: Path, source: InputStreamSource) {
execute { ftp -> source.inputStream().use { ftp.storeFile(path.toString(), it) } }
override fun uploadFile(path: Path, source: () -> InputStream) {
execute { ftp -> source().use { ftp.storeFile(path.toString(), it) } }
}

/**
Expand All @@ -75,12 +94,10 @@ private class SimpleFtpClient(
}

/**
* Create the given folder. As FTP does not support nested folder creation in a single path the full path is
* transverse and required missing folder are created.
* Create the given folder.
*/
override fun createFolder(path: Path) {
val paths = path.runningReduce { acc, value -> acc.resolve(value) }
execute { ftp -> paths.forEach { ftp.makeDirectory(it.toString()) } }
execute { ftp -> ftp.createFtpFolder(path) }
}

/**
Expand All @@ -99,24 +116,36 @@ private class SimpleFtpClient(
override fun deleteFile(path: Path) {
execute { ftp ->
val fileDeleted = ftp.deleteFile(path.toString())
if (fileDeleted.not()) ftp.removeDirectory(path.toString())
if (fileDeleted.not()) ftp.deleteDirectory(path)
}
}

/**
* As delete multiple files are not supported by apache client its neccessary delete by iterating over each file.
*/
private fun FTPClient.deleteDirectory(dirPath: Path) {
changeWorkingDirectory(dirPath.toString())
listNames().forEach { deleteFile(it); }

changeToParentDirectory()
removeDirectory(dirPath.fileName.toString())
}

/**
* Executes operations creating a new Ftp Client class every time as
* @see [documented](https://cwiki.apache.org/confluence/display/COMMONS/Net+FrequentlyAskedQuestions)
* @see <a href="https://cwiki.apache.org/confluence/display/COMMONS/Net+FrequentlyAskedQuestions">Documentation</a>
* class is not thread safe.
*/
override fun <T> execute(function: (FTPClient) -> T): T {
val ftp = ftpClient(3000.milliseconds, 3000.milliseconds)
logger.info { "connecting to $ftpUrl, $ftpPort" }
logger.info { "Connecting to $ftpUrl, $ftpPort" }
ftp.connect(ftpUrl, ftpPort)
ftp.login(ftpUser, ftpPassword)
ftp.enterLocalPassiveMode()
val result = function(ftp)
ftp.logout()
ftp.disconnect()
logger.info { "Disconnecting from $ftpUrl, $ftpPort" }
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class FtpFileService(
}

override fun uploadFiles(path: String, files: List<MultipartFile>) {
ftp.createFolder(basePath.resolve(path))
files.forEach { ftp.uploadFile(basePath.resolve(path).resolve(it.originalFilename!!)) { it.inputStream } }
val ftpFiles = files.map { basePath.resolve(path).resolve(it.originalFilename!!) to { it.inputStream } }
ftp.uploadFiles(basePath.resolve(path), ftpFiles)
}

override fun getFile(path: String, fileName: String): File {
Expand Down