Skip to content

Commit

Permalink
Bump versions and fix minor issues
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Jan 10, 2025
1 parent 0373b3c commit dcd8131
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 23 deletions.
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ version := "1.0"

scalaVersion := "2.13.15"

val pekkoVersion = "1.1.2"
val pekkoVersion = "1.1.3"
val pekkoHTTPVersion = "1.1.0"
val pekkoConnectorVersion = "1.0.2"
val pekkoConnectorKafkaVersion = "1.1.0"

val kafkaVersion = "3.8.0"
val kafkaVersion = "3.8.1"
val activemqVersion = "5.18.5" // We are stuck with 5.x
val artemisVersion = "2.37.0"
val testContainersVersion = "1.20.4"
Expand Down Expand Up @@ -124,8 +124,8 @@ libraryDependencies ++= Seq(
"org.testcontainers" % "localstack" % testContainersVersion,
"org.testcontainers" % "clickhouse" % testContainersVersion,

"com.clickhouse" % "clickhouse-jdbc" % "0.6.5",
"com.crobox.clickhouse" %% "client" % "1.2.2",
"com.clickhouse" % "clickhouse-jdbc" % "0.7.2",
"com.crobox.clickhouse" %% "client" % "1.2.6",

"org.opensearch" % "opensearch-testcontainers" % "2.0.1",
"com.github.dasniko" % "testcontainers-keycloak" % "3.5.1",
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.10.6
sbt.version=1.10.7
2 changes: 1 addition & 1 deletion src/main/java/util/ConnectionStatusChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private long testSockets(String hostAddress, int port) {
*/
private boolean testPing(String hostAddress) throws IOException, InterruptedException {
String os = System.getProperty("os.name");
String pingOption = (os.equals("Windows 10")) ? "n" : "c";
String pingOption = (os.startsWith("windows")) ? "n" : "c";

String[] cmd = {"ping", "-" + pingOption, "1", hostAddress};
Process p1 = java.lang.Runtime.getRuntime().exec(cmd);
Expand Down
13 changes: 9 additions & 4 deletions src/main/scala/alpakka/file/uploader/DirectoryWatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.commons.io.monitor.{FileAlterationListenerAdaptor, FileAlterat
import org.apache.pekko.actor.{ActorSystem, Terminated}
import org.apache.pekko.stream.connectors.file.scaladsl.Directory
import org.apache.pekko.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.apache.pekko.stream.{OverflowStrategy, QueueOfferResult}
import org.apache.pekko.stream.{ActorAttributes, OverflowStrategy, QueueOfferResult, Supervision}
import org.slf4j.{Logger, LoggerFactory}

import java.io.File
Expand Down Expand Up @@ -43,6 +43,7 @@ class DirectoryWatcher(uploadDir: Path, processedDir: Path) {
private val uploadSourceQueue: SourceQueueWithComplete[Path] = Source
.queue[Path](bufferSize = 1000, OverflowStrategy.backpressure, maxConcurrentOffers = 1000)
.mapAsync(1)(path => uploadAndMove(path))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.to(Sink.ignore)
.run()

Expand Down Expand Up @@ -103,11 +104,15 @@ class DirectoryWatcher(uploadDir: Path, processedDir: Path) {
}

private def uploadAndMove(path: Path) = {
if (path.toFile.isFile) {
if (Files.exists(path) && path.toFile.isFile && Files.isReadable(path)) {
logger.info(s"About to upload and move file: $path")
uploader.upload(path.toFile).andThen { case _ => move(path) }
val response = uploader.upload(path.toFile).andThen { case _ => move(path) }
logger.info(s"Successfully uploaded and moved file: $path")
response
} else {
Future.successful("Do nothing on dir")
val msg = s"Do nothing for: $path (because unreadable file or dir)"
logger.info(msg)
Future.successful(msg)
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/main/scala/alpakka/influxdb/InfluxdbReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ class InfluxdbReader(baseURL: String, token: String, org: String = "testorg", bu
implicit val system: ActorSystem = actorSystem
implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher

//import system.dispatcher

val deciderFlow: Supervision.Decider = {
case NonFatal(e) =>
logger.info(s"Stream failed with: ${e.getMessage}, going to restart")
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private static void browserClient(Integer port) throws IOException, InterruptedE
if (os.equals("mac os x")) {
String[] cmd = {"open", tabuxURL};
Runtime.getRuntime().exec(cmd);
} else if (os.equals("windows 10")) {
} else if (os.startsWith("windows")) {
String[] cmd = {"cmd /c start", tabuxURL};
Runtime.getRuntime().exec(cmd);
} else {
Expand Down
20 changes: 12 additions & 8 deletions src/test/scala/alpakka/file/DirectoryWatcherSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,39 +29,43 @@ final class DirectoryWatcherSpec extends AsyncWordSpec with Matchers with Before
var processedDir: Path = _

"DirectoryWatcher" should {
"detect_files_on_startup" in {
"detect_files_on_startup_in_parent_dir" in {
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2) shouldBe true
}

"detect_added_files_at_runtime_in_parent" in {
copyTestFileToDir(uploadDir)
"detect_added_file_at_runtime_in_parent_dir" in {
watcher = DirectoryWatcher(uploadDir, processedDir)
copyTestFileToDir(uploadDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 1) shouldBe true
}

"detect_added_files_at_runtime_in_subdir" in {
copyTestFileToDir(uploadDir.resolve("subdir"))
"detect_added_files_at_runtime_in_sub_dir" in {
watcher = DirectoryWatcher(uploadDir, processedDir)
copyTestFileToDir(uploadDir.resolve("subdir"))
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 1) shouldBe true
}

"detect_added_nested_subdir_at_runtime_with_files_in_subdir" in {
watcher = DirectoryWatcher(uploadDir, processedDir)
val tmpDir = Files.createTempDirectory("tmp")
val sourcePath = Paths.get("src/main/resources/testfile.jpg")
val targetPath = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
val targetPath2 = tmpDir.resolve(createUniqueFileName(sourcePath.getFileName))
Files.copy(sourcePath, targetPath)
Files.copy(sourcePath, targetPath2)

val targetDir = Files.createDirectories(uploadDir.resolve("subdir").resolve("nestedDirWithFiles"))
FileUtils.copyDirectory(tmpDir.toFile, targetDir.toFile)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 2) shouldBe true
}

"handle_large_number_of_files_in_parent_dir" in {
(1 to 1000).foreach(_ => copyTestFileToDir(uploadDir))
watcher = DirectoryWatcher(uploadDir, processedDir)
waitForCondition(3.seconds)(watcher.countFilesProcessed() == 2 + 2) shouldBe true
waitForCondition(5.seconds)(watcher.countFilesProcessed() == 2 + 1000) shouldBe true
}

"handle invalid parent directory path" in {
"handle_invalid_parent_directory_path" in {
val invalidParentDir = Paths.get("/path/to/non-existent/directory")
val processedDir = Files.createTempDirectory("processed")

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/alpakka/firehose/FirehoseEchoIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static void browserClient() throws IOException {
if (os.equals("mac os x")) {
String[] cmd = {"open", elasticsearchEndpoint};
Runtime.getRuntime().exec(cmd);
} else if (os.equals("windows 10")) {
} else if (os.startsWith("windows")) {
String[] cmd = {"cmd /c start", elasticsearchEndpoint};
Runtime.getRuntime().exec(cmd);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/alpakka/influxdb/InfluxdbIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private static void browserClient() throws IOException {
if (os.equals("mac os x")) {
String[] cmd = {"open", influxURL};
Runtime.getRuntime().exec(cmd);
} else if (os.equals("windows 10")) {
} else if (os.startsWith("windows")) {
String[] cmd = {"cmd /c start", influxURL};
Runtime.getRuntime().exec(cmd);
} else {
Expand Down

0 comments on commit dcd8131

Please sign in to comment.