[S3] feature: minio support #38

Merged · 2 commits · Nov 18, 2020
26 changes: 26 additions & 0 deletions aws-s3/pureharm-docker-min-io.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

# Script that sets up a local minIO server for testing.
CONTAINER_NAME=pureharm_test_minio # Name of the docker container used to run minIO
EXPOSED_PORT=31312 # the port exposed on the host machine
INTERNAL_PORT=9000 # the default port on which minIO starts within the container
MINIO_ACCESS_KEY=AKIAIOSFODOO3EXAMPLE
MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRACABEXAMPLEKEY

# actual script #
if [ "$(docker ps -aq -f name=$CONTAINER_NAME)" ]; then
if [ ! "$(docker ps -aq -f name=$CONTAINER_NAME -f status=exited)" ]; then
echo "Stopping minIO container"
docker stop $CONTAINER_NAME
fi
echo "Starting minIO container"
docker start $CONTAINER_NAME
else
echo "Creating & starting minIO container — no stable data mapping done"
docker run -d \
--name $CONTAINER_NAME \
-p $EXPOSED_PORT:$INTERNAL_PORT \
-e "MINIO_ACCESS_KEY=$MINIO_ACCESS_KEY" \
-e "MINIO_SECRET_KEY=$MINIO_SECRET_KEY" \
minio/minio server /data
fi
10 changes: 10 additions & 0 deletions aws-s3/src/it/resources/reference.conf
@@ -16,4 +16,14 @@ test-live.pureharm.aws.s3 {

api-call-timeout = 10 seconds
api-call-timeout = ${?LIVE_TEST_PUREHARM_AWS_API_CALL_TIMEOUT}
}

test-live.pureharm.aws.s3.minio {
region = "eu-central-1"
access-key-id = "AKIAIOSFODOO3EXAMPLE" # test-only credential from pureharm-docker-min-io.sh; safe to commit, minIO only runs locally
secret-access-key = "wJalrXUtnFEMI/K7MDENG/bPxRACABEXAMPLEKEY" # test-only credential from pureharm-docker-min-io.sh; safe to commit, minIO only runs locally
bucket = "minio-bucket"
api-call-attempt-timeout = 2 seconds
api-call-timeout = 10 seconds
endpoint-override = "http://localhost:31312"
}
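
This namespace is consumed by the new S3MinIOTest below. As a minimal sketch of the wiring (assuming the implicit BlockingShifter and logger that the test harness provides are already in scope):

import busymachines.pureharm.effects._
import busymachines.pureharm.aws.s3._

// Reads the "test-live.pureharm.aws.s3.minio" block above; endpoint-override
// points the underlying S3 client at the local minIO instead of AWS.
def minioClient: Resource[IO, (S3Config, AmazonS3Client[IO])] =
  for {
    config <- S3Config.fromNamespaceR[IO]("test-live.pureharm.aws.s3.minio")
    client <- AmazonS3Client.resource[IO](config)
  } yield (config, client)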
129 changes: 129 additions & 0 deletions aws-s3/src/it/scala/busymachines/pureharm/aws/s3/S3MinIOTest.scala
@@ -0,0 +1,129 @@
/** Copyright (c) 2017-2019 BusyMachines
*
* See company homepage at: https://www.busymachines.com/
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package busymachines.pureharm.aws.s3

import io.chrisdavenport.log4cats.StructuredLogger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import busymachines.pureharm.effects._
import busymachines.pureharm.effects.implicits._
import busymachines.pureharm.testkit._

/** --- This test expects a local minIO server ---
 *
 * Before running it, start the local docker container via
 * aws-s3/pureharm-docker-min-io.sh. minIO listens on port 9000 inside the
 * container, which is exposed on the host as localhost:31312, the endpoint
 * configured in test-live.pureharm.aws.s3.minio.
 *
 * Unlike the live S3 test, this configuration can be committed, since it
 * only targets the local docker container.
 *
 * @author Lorand Szakacs, https://github.com/lorandszakacs
 * @since 22 May 2019
 */
final class S3MinIOTest extends PureharmTestWithResource {
private val UTF_8 = java.nio.charset.StandardCharsets.UTF_8
implicit val l: StructuredLogger[IO] = Slf4jLogger.getLogger[IO]

override type ResourceType = (S3Config, AmazonS3Client[IO])

override def resource(meta: MetaData): Resource[IO, ResourceType] = for {
config <- S3Config.fromNamespaceR[IO]("test-live.pureharm.aws.s3.minio")
s3Client <- AmazonS3Client.resource[IO](config)
_ <- Resource.liftF(l.info(s"creating minio bucket"))
_ <- Resource.liftF(s3Client.deleteBucket(config.bucket).attempt.void) // just in case
_ <- Resource.make(s3Client.createBucket(config.bucket))(_ => s3Client.deleteBucket(config.bucket))
_ <- Resource.liftF(l.info(s"created minio bucket"))
} yield (config, s3Client)

private val f1S3Key: S3FileKey =
S3FileKey("folder", "subfolder", "file.txt").unsafeGet()

private val f2S3Key: S3FileKey =
S3FileKey("folder", "file_copy.txt").unsafeGet()

private val f1_contents: S3BinaryContent = S3BinaryContent(
"GOOGLE_MURRAY_BOOKCHIN".getBytes(java.nio.charset.StandardCharsets.UTF_8)
)

test("minio upload + get + delete") { case (config, client) =>
for {
_ <- l.info(s"acquired client resource: ${client.toString}")
_ <-
client
.put(config.bucket, f1S3Key, f1_contents)
.void
.handleErrorWith(e => l.error(e)(s"PUT failed w/: $e"))
_ <- l.info(s"1 — after PUT — trying GET")
got <- client.get(config.bucket, f1S3Key)
_ <- l.info(s"2 — after GET — we got back: ${new String(got, UTF_8)}")
_ <- l.info(s"2 — after GET — we expect: ${new String(f1_contents, UTF_8)}")
_ <- IO(assert(f1_contents.toList == got.toList)).onErrorF(l.info("comparison failed :(("))

_ <- l.info("---- deleting file ----")
_ <- client.delete(config.bucket, f1S3Key)
_ <- l.info("---- DELETED — now trying to get back file to see----")
_ <-
client
.get(config.bucket, f1S3Key)
.flatMap(g => l.error(s"SHOULD HAVE DELETED, but got: ${new String(g, UTF_8)}"))
.void
.handleErrorWith(t => l.info(s"AFTER DELETE — expected failure, and got it: ${t.toString}"))
} yield succeed
}

test("minio copy + exists + list") { case (config, client) =>
for {
_ <- l.info(s"acquired client resource: ${client.toString}")
_ <-
client
.put(config.bucket, f1S3Key, f1_contents)
.void
.handleErrorWith(e => l.error(e)(s"PUT failed w/: $e"))
_ <- l.info(s"1 — trying COPY")
_ <-
client
.copy(config.bucket, f1S3Key, config.bucket, f2S3Key)
.void
.handleErrorWith(e => l.error(e)(s"COPY failed w/: $e"))
_ <- l.info(s"2 - after COPY - trying EXISTS")
exists <-
client
.exists(config.bucket, f2S3Key)
.handleErrorWith(e => l.error(e)(s"EXISTS failed w/: $e").map(_ => false))
_ <- l.info(s"2 — after EXISTS — we got back: $exists")
_ <- l.info(s"2 — after EXISTS — we expect: true")
_ <- IO(assert(exists)).onErrorF(l.info("comparison failed :(("))
_ <- l.info(s"3 - after EXISTS - trying LIST")
listReqPrefix = S3Path.unsafe("folder")
listResult <-
client
.list(config.bucket, listReqPrefix)
.handleErrorWith(e => l.error(e)(s"LIST failed w/: $e").map(_ => List()))
_ <- l.info(s"3 — after LIST — we got back: ${listResult.mkString("[", ",", "]")}")
_ <- l.info(s"3 — after LIST — we expect: ${List(f1S3Key, f2S3Key).mkString("[", ",", "]")}")
_ <- IO(assert(listResult.toSet == Set(f1S3Key, f2S3Key))).onErrorF(l.info("comparison failed :(("))
_ <- l.info("---- cleanup ----")
_ <-
client
.delete(config.bucket, f1S3Key)
.handleErrorWith(e => l.error(e)(s"DELETE failed w/: $e"))
_ <-
client
.delete(config.bucket, f2S3Key)
.handleErrorWith(e => l.error(e)(s"DELETE failed w/: $e"))
} yield succeed
}

}
@@ -21,6 +21,10 @@ package busymachines.pureharm.aws.s3
*/
trait AmazonS3Client[F[_]] {

def createBucket(bucket: S3Bucket): F[Unit]

def deleteBucket(bucket: S3Bucket): F[Unit]

def put(bucket: S3Bucket, key: S3FileKey, content: S3BinaryContent): F[Unit]

def get(bucket: S3Bucket, key: S3FileKey): F[S3BinaryContent]
@@ -87,6 +91,12 @@ object AmazonS3Client {
implicit private val shifter: BlockingShifter[F],
) extends AmazonS3Client[F] {

override def createBucket(bucket: S3Bucket): F[Unit] =
shifter.blockOn(internals.ImpureJavaS3.createBucket(s3Client)(bucket, config.region))

override def deleteBucket(bucket: S3Bucket): F[Unit] =
shifter.blockOn(internals.ImpureJavaS3.deleteBucket(s3Client)(bucket, config.region))

override def put(bucket: S3Bucket, key: S3FileKey, content: S3BinaryContent): F[Unit] =
shifter.blockOn(internals.ImpureJavaS3.put(s3Client)(bucket, key, content).void)

@@ -18,10 +18,10 @@ package busymachines.pureharm.aws.s3

import busymachines.pureharm.aws.core._
import busymachines.pureharm.effects._

/** @author Lorand Szakacs, https://github.com/lorandszakacs
* @since 10 Jul 2019
*/

final case class S3Config(
region: AmazonRegion,
accessKeyID: S3AccessKeyID,
@@ -30,6 +30,7 @@ final case class S3Config(
apiCallAttemptTimeout: S3ApiCallAttemptTimeout,
apiCallTimeout: S3ApiCallTimeout,
headers: List[AmazonRequestHeader] = List.empty,
endpointOverride: Option[S3EndpointOverride],
)

import busymachines.pureharm.config._
@@ -16,6 +16,7 @@
*/
package busymachines.pureharm.aws.s3.internals

import busymachines.pureharm.aws.core.AmazonRegion
import busymachines.pureharm.aws.s3._
import busymachines.pureharm.effects._
import busymachines.pureharm.effects.implicits._
@@ -124,6 +125,36 @@ private[s3] object ImpureJavaS3 {
} yield ()
}

def createBucket[F[_]: Async](client: S3AsyncClient)(
bucket: S3Bucket,
region: AmazonRegion,
): F[Unit] = {
val req = CreateBucketRequest
.builder()
.bucket(bucket)
.createBucketConfiguration(
CreateBucketConfiguration
.builder()
.locationConstraint(region)
.build()
)
.build()

Interop.toF(Sync[F].delay(client.createBucket(req))).void
}

def deleteBucket[F[_]: Async](client: S3AsyncClient)(
bucket: S3Bucket,
region: AmazonRegion,
): F[Unit] = {
val req = DeleteBucketRequest
.builder()
.bucket(bucket)
.build()

Interop.toF(Sync[F].delay(client.deleteBucket(req))).void
}

private def asyncBytesTransformer[F[_]: Sync]: F[AsyncResponseTransformer[GetObjectResponse, S3BinaryContent]] =
for {
bf <- Sync[F].delay(AsyncResponseTransformer.toBytes[GetObjectResponse])
@@ -65,7 +65,10 @@ private[s3] object PureJavaS3 {
.asyncConfiguration(asyncConfig)
.overrideConfiguration(overrideConfiguration)

val client: S3AsyncClient = builder.build()
val client: S3AsyncClient = config.endpointOverride match {
case None => builder.build()
case Some(endpoint) => builder.endpointOverride(endpoint).build()
}
client
}

@@ -50,4 +50,7 @@ package object s3 {
object S3DownloadURL extends PhantomType[String]
type S3DownloadURL = S3DownloadURL.Type

object S3EndpointOverride extends PhantomType[java.net.URI]
type S3EndpointOverride = S3EndpointOverride.Type

}
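
For reference, a hypothetical construction of the new phantom type (assuming the usual PhantomType apply); when S3Config.endpointOverride is defined, this is the URI that PureJavaS3 passes to builder.endpointOverride instead of using the default AWS endpoint:

// hypothetical, for illustration: wrap the local minIO endpoint from reference.conf
val localMinio: S3EndpointOverride =
  S3EndpointOverride(java.net.URI.create("http://localhost:31312"))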