Skip to content

Commit

Permalink
1.2.1 (#11)
Browse files Browse the repository at this point in the history
* Updated GCS Copy API to enable/disable detailed logging.
* Updated libraries.
  • Loading branch information
tharwaninitin authored Dec 4, 2022
1 parent 8738a24 commit 0b0bdb4
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/semver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ jobs:
# run: sbt pubsub/versionPolicyCheck
# - name: Run Semantic Versioning Policy Check for GCS
# run: sbt gcs/versionPolicyCheck
- name: Run Semantic Versioning Policy Check for Monitoring
run: sbt monitoring/versionPolicyCheck
# - name: Run Semantic Versioning Policy Check for Monitoring
# run: sbt monitoring/versionPolicyCheck
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,24 @@ Add the latest release as a dependency to your project
| PubSub | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-pubsub_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-pubsub) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-pubsub_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-pubsub_2.12) | [![gcp4zio-pubsub Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub) |
| Cloud Monitoring | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-monitoring_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-monitoring) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-monitoring_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-monitoring_2.12) | [![gcp4zio-monitoring Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-monitoring/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-monitoring) |

This project is compiled with scala versions 2.12.17, 2.13.10, 3.2.1

__SBT__
``` scala mdoc
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.2.0",
"com.github.tharwaninitin" %% "gcp4zio-dp" % "1.2.0",
"com.github.tharwaninitin" %% "gcp4zio-bq" % "1.2.0",
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.2.0",
"com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.2.0"
"com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.2.1",
"com.github.tharwaninitin" %% "gcp4zio-dp" % "1.2.1",
"com.github.tharwaninitin" %% "gcp4zio-bq" % "1.2.1",
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.2.1",
"com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.2.1"
)
```
__Maven__
```
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>gcp4zio-gcs_2.12</artifactId>
<version>1.2.0</version>
<version>1.2.1</version>
</dependency>
```
# GCP4ZIO API's
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ lazy val docs = project
.settings(
name := "gcp4zio-docs",
publish / skip := true,
mdocVariables := Map("VERSION" -> version.value),
mdocVariables := Map("VERSION" -> version.value, "Scala212" -> Scala212, "Scala213" -> Scala213, "Scala3" -> Scala3),
mdocIn := new File("docs/readme.template.md"),
mdocOut := new File("README.md")
)
Expand Down
2 changes: 2 additions & 0 deletions docs/readme.template.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Add the latest release as a dependency to your project
| PubSub | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-pubsub_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-pubsub) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-pubsub_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-pubsub_2.12) | [![gcp4zio-pubsub Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub) |
| Cloud Monitoring | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-monitoring_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-monitoring) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-monitoring_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-monitoring_2.12) | [![gcp4zio-monitoring Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-monitoring/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-monitoring) |

This project is compiled with scala versions @Scala212@, @Scala213@, @Scala3@

__SBT__
``` scala mdoc
libraryDependencies ++= List(
Expand Down
32 changes: 19 additions & 13 deletions modules/gcs/src/main/scala/gcp4zio/gcs/GCS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.nio.file.Path
trait GCS {
def getObject(bucket: String, prefix: String, file: Path): Task[Unit]
def getObject(bucket: String, prefix: String, chunkSize: Int): GCSStream
def putObject(bucket: String, prefix: String, file: Path, options: List[BlobTargetOption]): Task[Blob]
def putObject(bucket: String, prefix: String, file: Path, options: List[BlobTargetOption], log: Boolean): Task[Blob]
def putObject(bucket: String, prefix: String, options: List[BlobWriteOption]): GCSSink
def lookupObject(bucket: String, prefix: String): Task[Boolean]
def deleteObject(bucket: String, prefix: String): Task[Boolean]
Expand All @@ -23,15 +23,17 @@ trait GCS {
srcOptions: List[BlobListOption],
targetBucket: String,
targetPrefix: Option[String],
parallelism: Int
): Task[Unit]
parallelism: Int,
log: Boolean
): Task[Long]
def copyObjectsLOCALtoGCS(
srcPath: String,
targetBucket: String,
targetPrefix: String,
parallelism: Int,
overwrite: Boolean
): Task[Unit]
overwrite: Boolean,
log: Boolean
): Task[Long]
def getPSNotification(bucket: String, notificationId: String): Task[Notification]
def createPSNotification(
bucket: String,
Expand All @@ -54,9 +56,10 @@ object GCS {
bucket: String,
prefix: String,
file: Path,
options: List[BlobTargetOption] = List.empty
options: List[BlobTargetOption] = List.empty,
log: Boolean = false
): ZIO[GCS, Throwable, Blob] =
ZIO.environmentWithZIO(_.get.putObject(bucket, prefix, file, options))
ZIO.environmentWithZIO(_.get.putObject(bucket, prefix, file, options, log))
def putObject(bucket: String, prefix: String, options: List[BlobWriteOption]): GCSSinkWithEnv =
ZSink.environmentWithSink(_.get.putObject(bucket, prefix, options))
def lookupObject(bucket: String, prefix: String): ZIO[GCS, Throwable, Boolean] =
Expand All @@ -77,8 +80,9 @@ object GCS {
srcOptions: List[BlobListOption] = List.empty,
targetBucket: String,
targetPrefix: Option[String] = None,
parallelism: Int = 2
): ZIO[GCS, Throwable, Unit] =
parallelism: Int = 2,
log: Boolean = false
): ZIO[GCS, Throwable, Long] =
ZIO.environmentWithZIO(
_.get.copyObjectsGCStoGCS(
srcBucket,
Expand All @@ -87,17 +91,19 @@ object GCS {
srcOptions,
targetBucket,
targetPrefix,
parallelism
parallelism,
log
)
)
def copyObjectsLOCALtoGCS(
srcPath: String,
targetBucket: String,
targetPrefix: String,
parallelism: Int,
overwrite: Boolean
): ZIO[GCS, Throwable, Unit] =
ZIO.environmentWithZIO(_.get.copyObjectsLOCALtoGCS(srcPath, targetBucket, targetPrefix, parallelism, overwrite))
overwrite: Boolean,
log: Boolean = false
): ZIO[GCS, Throwable, Long] =
ZIO.environmentWithZIO(_.get.copyObjectsLOCALtoGCS(srcPath, targetBucket, targetPrefix, parallelism, overwrite, log))
def getPSNotification(bucket: String, notificationId: String): ZIO[GCS, Throwable, Notification] =
ZIO.environmentWithZIO(_.get.getPSNotification(bucket, notificationId))
def createPSNotification(
Expand Down
35 changes: 20 additions & 15 deletions modules/gcs/src/main/scala/gcp4zio/gcs/GCSImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ case class GCSImpl(client: Storage) extends GCS {
}
.catchAll(_ => ZIO.succeed(false))

override def putObject(bucket: String, prefix: String, file: Path, options: List[BlobTargetOption]): Task[Blob] = ZIO.attempt {
val blobId = BlobId.of(bucket, prefix)
val blobInfo = BlobInfo.newBuilder(blobId).build
logger.info(s"Copying object from local fs $file to gs://$bucket/$prefix")
client.create(blobInfo, Files.readAllBytes(file), options: _*)
}
override def putObject(bucket: String, prefix: String, file: Path, options: List[BlobTargetOption], log: Boolean): Task[Blob] =
ZIO.attempt {
val blobId = BlobId.of(bucket, prefix)
val blobInfo = BlobInfo.newBuilder(blobId).build
if (log) logger.info(s"Copying object from local fs $file to gs://$bucket/$prefix")
client.create(blobInfo, Files.readAllBytes(file), options: _*)
}

override def putObject(bucket: String, prefix: String, options: List[BlobWriteOption]): GCSSink = {
val os: ZIO[Scope, IOException, OutputStream] = ZIO
Expand Down Expand Up @@ -95,37 +96,41 @@ case class GCSImpl(client: Storage) extends GCS {
srcOptions: List[BlobListOption],
targetBucket: String,
targetPrefix: scala.Option[String],
parallelism: Int
): Task[Unit] = listObjects(srcBucket, srcPrefix, srcRecursive, srcOptions)
parallelism: Int,
log: Boolean
): Task[Long] = listObjects(srcBucket, srcPrefix, srcRecursive, srcOptions)
.mapZIOPar(parallelism) { blob =>
ZIO.attempt {
targetPrefix.fold {
logger.info(s"Copying object from gs://$srcBucket/${blob.getName} to gs://$targetBucket/${blob.getName}")
if (log) logger.info(s"Copying object from gs://$srcBucket/${blob.getName} to gs://$targetBucket/${blob.getName}")
blob.copyTo(targetBucket)
} { tp =>
val targetPath = getTargetPath(srcPrefix.getOrElse(""), tp, blob.getName)
logger.info(s"Copying object from gs://$srcBucket/${blob.getName} to gs://$targetBucket/$targetPath")
if (log) logger.info(s"Copying object from gs://$srcBucket/${blob.getName} to gs://$targetBucket/$targetPath")
blob.copyTo(targetBucket, targetPath)
}
}
}
.runDrain
.runCount
.tap(count => ZIO.succeed(logger.info(s"Copied $count files from GCS to GCS")))

override def copyObjectsLOCALtoGCS(
srcPath: String,
targetBucket: String,
targetPrefix: String,
parallelism: Int,
overwrite: Boolean
): Task[Unit] = {
overwrite: Boolean,
log: Boolean
): Task[Long] = {
val opts = if (overwrite) List.empty else List(BlobTargetOption.doesNotExist())
GCSImpl
.listLocalFsObjects(srcPath)
.mapZIOPar(parallelism) { path =>
val targetPath = getTargetPath(srcPath, targetPrefix, path.toString)
putObject(targetBucket, targetPath, path, opts)
putObject(targetBucket, targetPath, path, opts, log)
}
.runDrain
.runCount
.tap(count => ZIO.succeed(logger.info(s"Copied $count file/s from LOCAL to GCS")))
}

override def getPSNotification(bucket: String, notificationId: String): Task[Notification] = ZIO.attempt {
Expand Down
29 changes: 14 additions & 15 deletions modules/gcs/src/test/scala/gcp4zio/gcs/GCSCopyTestSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,30 @@ object GCSCopyTestSuite {
val spec: Spec[GCS, Any] =
suite("GCS Copy Apis")(
test("Execute copyObjectsLOCALtoGCS single file") {
val step = GCS.copyObjectsLOCALtoGCS(filePathCsv, gcsBucket, "temp/test/ratings.csv", 2, true)
val step = GCS.copyObjectsLOCALtoGCS(filePathCsv, gcsBucket, "temp/test/ratings.csv", 2, true, true)
assertZIO(step.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
},
test("Execute copyObjectsLOCALtoGCS directory") {
val step = GCS.copyObjectsLOCALtoGCS(filePathCsv.replaceAll("ratings.csv", ""), gcsBucket, "temp/test", 2, true)
assertZIO(step.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
},
test("Execute copyObjectsGCStoGCS single file") {
val step =
GCS.copyObjectsGCStoGCS(
srcBucket = gcsBucket,
srcPrefix = Some("temp/test/ratings.csv"),
targetBucket = gcsBucket,
targetPrefix = Some("temp2/test/ratings.csv")
)
val step = GCS.copyObjectsGCStoGCS(
srcBucket = gcsBucket,
srcPrefix = Some("temp/test/ratings.csv"),
targetBucket = gcsBucket,
targetPrefix = Some("temp2/test/ratings.csv"),
log = true
)
assertZIO(step.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
},
test("Execute copyObjectsGCStoGCS directory") {
val step =
GCS.copyObjectsGCStoGCS(
gcsBucket,
Some("temp/test/"),
targetBucket = gcsBucket,
targetPrefix = Some("temp2/test/")
)
val step = GCS.copyObjectsGCStoGCS(
gcsBucket,
Some("temp/test/"),
targetBucket = gcsBucket,
targetPrefix = Some("temp2/test/")
)
assertZIO(step.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
}
) @@ TestAspect.sequential
Expand Down
18 changes: 9 additions & 9 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ object Versions {
val Scala3 = "3.2.1"
val AllScalaVersions = List(Scala212, Scala213, Scala3)

val ScalaJavaCollectionCompat = "2.8.1"
val ScalaJavaCollectionCompat = "2.9.0"

val ZioVersion = "2.0.2"
val ZioLogVersion = "2.1.3"
val ZioVersion = "2.0.4"
val ZioLogVersion = "2.1.5"

val LogbackVersion = "1.4.4"
val GcpBqVersion = "2.18.2"
val GcpDpVersion = "4.2.0"
val GcpGcsVersion = "2.14.0"
val GcpPubSubVersion = "1.120.24"
val CloudMonitorVersion = "3.6.0"
val LogbackVersion = "1.4.5"
val GcpBqVersion = "2.19.1"
val GcpDpVersion = "4.3.0"
val GcpGcsVersion = "2.15.1"
val GcpPubSubVersion = "1.122.1"
val CloudMonitorVersion = "3.7.0"
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.7.2
sbt.version=1.8.0
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "1.2.0"
ThisBuild / version := "1.2.1"

0 comments on commit 0b0bdb4

Please sign in to comment.