Skip to content

Commit

Permalink
Version 1.1.0 (#9)
Browse files Browse the repository at this point in the history
* Added PubSub Module
* Updated GCS Module (Added pubsub notifications api's)
  • Loading branch information
tharwaninitin authored Oct 24, 2022
1 parent 2a03575 commit 24201de
Show file tree
Hide file tree
Showing 51 changed files with 1,133 additions and 96 deletions.
17 changes: 16 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,19 @@ jobs:
distribution: 'adopt'
java-version: ${{ matrix.java-version }}
- name: Run GCS Tests
run: sbt ";project gcs; +Test/compile"
run: sbt ";project gcs; +Test/compile"
pubsub:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [ 8, 11, 14 ]
fail-fast: false
steps:
- uses: actions/checkout@v2
- name: Set up JDK
uses: actions/setup-java@v2
with:
distribution: 'adopt'
java-version: ${{ matrix.java-version }}
- name: Run PubSub Tests
run: sbt ";project pubsub; +Test/compile"
18 changes: 0 additions & 18 deletions .github/workflows/publish.yml

This file was deleted.

15 changes: 11 additions & 4 deletions .github/workflows/semver.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ jobs:
semvercheck:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up JDK
uses: actions/setup-java@v2
uses: actions/setup-java@v3
with:
distribution: 'adopt'
java-version: 11
- name: Run Semantic Versioning Policy Check
run: sbt versionPolicyCheck
cache: 'sbt'
# - name: Run Semantic Versioning Policy Check for BQ
# run: sbt bq/versionPolicyCheck
# - name: Run Semantic Versioning Policy Check for DP
# run: sbt dp/versionPolicyCheck
- name: Run Semantic Versioning Policy Check for PUBSUB
run: sbt pubsub/versionPolicyCheck
# - name: Run Semantic Versioning Policy Check for GCS
# run: sbt gcs/versionPolicyCheck
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,29 @@
[![License](http://img.shields.io/:license-Apache%202-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Tests](https://github.com/tharwaninitin/gcp4zio/actions/workflows/ci.yml/badge.svg)](https://github.com/tharwaninitin/gcp4zio/actions/workflows/ci.yml)
[![Semantic Versioning Policy Check](https://github.com/tharwaninitin/gcp4zio/actions/workflows/semver.yml/badge.svg)](https://github.com/tharwaninitin/gcp4zio/actions/workflows/semver.yml)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio)
[![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio_2.12)
[![gcp4zio Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio)

**Gcp4zio** is simple Scala interface to Google Cloud API based on ZIO.

Add the latest release as a dependency to your project

[![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio)
| Module Name | Latest Version | Documentation | Scala Versions |
|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| GCS | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-gcs_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-gcs) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-gcs_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-gcs_2.12) | [![gcp4zio-gcs Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-gcs/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-gcs) |
| DP | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-dp_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-dp) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-dp_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-dp_2.12) | [![gcp4zio-dp Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-dp/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-dp) |
| BQ | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-bq_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-bq) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-bq_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-bq_2.12) | [![gcp4zio-bq Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-bq/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-bq) |
| PubSub | [![Latest Version](https://maven-badges.herokuapp.com/maven-central/com.github.tharwaninitin/gcp4zio-pubsub_2.12/badge.svg)](https://mvnrepository.com/artifact/com.github.tharwaninitin/gcp4zio-pubsub) | [![javadoc](https://javadoc.io/badge2/com.github.tharwaninitin/gcp4zio-pubsub_2.12/javadoc.svg)](https://javadoc.io/doc/com.github.tharwaninitin/gcp4zio-pubsub_2.12) | [![gcp4zio-pubsub Scala version support](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub/latest-by-scala-version.svg)](https://index.scala-lang.org/tharwaninitin/gcp4zio/gcp4zio-pubsub) |





__SBT__
```
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "gcp4zio-gcs" % x.x.x,
"com.github.tharwaninitin" %% "gcp4zio-dp" % x.x.x
"com.github.tharwaninitin" %% "gcp4zio-bq" % x.x.x
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % x.x.x
)
```
__Maven__
Expand Down Expand Up @@ -65,3 +73,7 @@ GCSApi.copyObjectsGCStoGCS(
```scala
//TODO
```
***PubSub API***
```scala
//TODO
```
20 changes: 16 additions & 4 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
IF (breaking changes) // major version will be bumped up
## STEP 1 => Update Readme.md

## STEP 2 => Merge PR into main

set ThisBuild / versionPolicyIntention := Compatibility.None
release
## STEP 3 => Create tag(x.x.x) in main branch locally for the latest version and update remotely (use GitHub Desktop)

## STEP 4 => Follow below release process locally from the main branch
IF (breaking changes) // major version will be bumped up
```
set ThisBuild / versionPolicyIntention := Compatibility.None
release
```
ELSE // patch version will be bumped up
```
release
```

## STEP 5 => Create release using the above tag (Using github.com)

release
## STEP 6 => Create new branch (vnext) from main with version(+1) changes
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ lazy val gcp4zio = (project in file("."))
crossScalaVersions := Nil, // crossScalaVersions must be set to Nil on the aggregating project
publish / skip := true
)
.aggregate(bq, dp, gcs)
.aggregate(bq, dp, gcs, pubsub)

lazy val bq = (project in file("modules/bq"))
.settings(commonSettings)
Expand All @@ -39,3 +39,9 @@ lazy val dp = (project in file("modules/dp"))
lazy val gcs = (project in file("modules/gcs"))
.settings(commonSettings)
.settings(name := "gcp4zio-gcs", libraryDependencies ++= coreLibs ++ gcsLibs ++ testLibs)

lazy val pubsub = (project in file("modules/pubsub"))
.settings(commonSettings)
.settings(name := "gcp4zio-pubsub", libraryDependencies ++= coreLibs ++ pubSubLibs ++ testLibs)

addCommandAlias("cd", "project")
1 change: 1 addition & 0 deletions examples.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ lazy val examples = (project in file("examples"))
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "gcp4zio-dp" % version.value,
"com.github.tharwaninitin" %% "gcp4zio-gcs" % version.value,
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % version.value,
"ch.qos.logback" % "logback-classic" % LogbackVersion
),
scalacOptions ++= {
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Run PubSub Emulator in one terminal
```shell
gcloud beta emulators pubsub start --project=testproject --host-port=0.0.0.0:8085
```

Run below commands in other terminal
```shell
export GCS_PROJECT=testproject
export TOPIC=testtopic
export SUBSCRIPTION=testsub

$(gcloud beta emulators pubsub env-init)

sbt "project examples; runMain PS"
```

1 change: 0 additions & 1 deletion examples/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<pattern>%d{"yyyy-MM-dd'T'HH:mm:ss,SSS"} [%thread] %highlight(%-5level) %cyan(%logger{50}) - %msg %n</pattern>
</encoder>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.net.URI
// export DP_CLUSTER=
// export DP_ENDPOINT=

object Main extends ZIOAppDefault with ApplicationLogger {
object DPGCS extends ZIOAppDefault with ApplicationLogger {
val gcpProject: String = sys.env("GCP_PROJECT")
val gcpRegion: String = sys.env("GCP_REGION")
val dpCluster: String = sys.env("DP_CLUSTER")
Expand Down
64 changes: 64 additions & 0 deletions examples/src/main/scala/PS.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import gcp4zio.pubsub.publisher.{MessageEncoder, PSPublisher}
import gcp4zio.pubsub.subscriber.PSSubscriber
import gcp4zio.pubsub.subscription.PSSubscription
import gcp4zio.pubsub.topic.PSTopic
import zio._
import zio.logging.backend.SLF4J
import java.nio.charset.Charset

@SuppressWarnings(Array("org.wartremover.warts.ToString"))
object PS extends ZIOAppDefault {

override val bootstrap = Runtime.removeDefaultLoggers >>> SLF4J.slf4j

lazy val gcsProject: String = sys.env("GCS_PROJECT")
lazy val subscription: String = sys.env("SUBSCRIPTION")
lazy val topic: String = sys.env("TOPIC")

private val createTopic = PSTopic.createTopic(gcsProject, topic)

private val deleteTopic = PSTopic.deleteTopic(gcsProject, topic)

private val createSubscription = PSSubscription.createPullSubscription(gcsProject, subscription, topic)

private val deleteSubscription = PSSubscription.deleteSubscription(gcsProject, subscription)

implicit val encoder: MessageEncoder[String] = (a: String) => Right(a.getBytes(Charset.defaultCharset()))

private val produceMessages = Random.nextInt
.flatMap(ri => PSPublisher.produce(s"Test Message $ri"))
.tap(msgId => ZIO.logInfo(s"Message ID $msgId published"))
.repeat(Schedule.spaced(5.seconds) && Schedule.forever)

private val consumeMessages = PSSubscriber.subscribe
.mapZIO { msg =>
ZIO.logInfo(msg.value.toString) *> msg.ack
}
.take(10)
.runDrain

private val setup = for {
_ <- createTopic.tap(t => ZIO.logInfo(s"Created Topic ${t.toString}"))
_ <- createSubscription.tap(s => ZIO.logInfo(s"Created Subscription ${s.toString}"))
} yield ()

private val flow = for {
_ <- ZIO.logInfo("Starting Publisher") *> produceMessages.fork
_ <- ZIO.logInfo("Starting Subscriber") *> consumeMessages
} yield ()

private val cleanup = for {
_ <- deleteSubscription.zipLeft(ZIO.logInfo(s"Deleted Subscription"))
_ <- deleteTopic.zipLeft(ZIO.logInfo(s"Deleted Topic"))
} yield ()

override def run: ZIO[Scope, Throwable, Unit] =
(setup *> flow)
.ensuring(cleanup.ignore)
.provideSome[Scope](
PSTopic.test,
PSSubscription.test,
PSPublisher.test(gcsProject, topic),
PSSubscriber.test(gcsProject, subscription)
)
}
1 change: 0 additions & 1 deletion modules/bq/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<pattern>%d{"yyyy-MM-dd'T'HH:mm:ss,SSS"} [%thread] %highlight(%-5level) %cyan(%logger{50}) - %msg %n</pattern>
</encoder>
Expand Down
1 change: 0 additions & 1 deletion modules/dp/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<pattern>%d{"yyyy-MM-dd'T'HH:mm:ss,SSS"} [%thread] %highlight(%-5level) %cyan(%logger{50}) - %msg %n</pattern>
</encoder>
Expand Down
31 changes: 30 additions & 1 deletion modules/gcs/src/main/scala/gcp4zio/gcs/GCSApi.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package gcp4zio
package gcs

import com.google.cloud.storage.Blob
import com.google.cloud.storage.NotificationInfo.PayloadFormat
import com.google.cloud.storage.{Blob, Notification, NotificationInfo}
import com.google.cloud.storage.Storage.{BlobListOption, BlobTargetOption, BlobWriteOption}
import zio.stream._
import zio._
Expand Down Expand Up @@ -36,6 +37,17 @@ trait GCSApi {
parallelism: Int,
overwrite: Boolean
): Task[Unit]
def getPSNotification(bucket: String, notificationId: String): Task[Notification]
def createPSNotification(
bucket: String,
topic: String,
customAttributes: Map[String, String],
eventType: Option[NotificationInfo.EventType],
objectNamePrefix: Option[String],
payloadFormat: NotificationInfo.PayloadFormat
): Task[Notification]
def deletePSNotification(bucket: String, prefix: String): Task[Boolean]
def listPSNotification(bucket: String): Task[List[Notification]]
}

object GCSApi {
Expand Down Expand Up @@ -86,4 +98,21 @@ object GCSApi {
overwrite: Boolean
): ZIO[GCSEnv, Throwable, Unit] =
ZIO.environmentWithZIO(_.get.copyObjectsLOCALtoGCS(srcPath, targetBucket, targetPrefix, parallelism, overwrite))
def getPSNotification(bucket: String, notificationId: String): ZIO[GCSEnv, Throwable, Notification] =
ZIO.environmentWithZIO(_.get.getPSNotification(bucket, notificationId))
def createPSNotification(
bucket: String,
topic: String,
customAttributes: Map[String, String] = Map[String, String]().empty,
eventType: Option[NotificationInfo.EventType] = None,
objectNamePrefix: Option[String] = None,
payloadFormat: NotificationInfo.PayloadFormat = PayloadFormat.JSON_API_V1
): ZIO[GCSEnv, Throwable, Notification] =
ZIO.environmentWithZIO(
_.get.createPSNotification(bucket, topic, customAttributes, eventType, objectNamePrefix, payloadFormat)
)
def deletePSNotification(bucket: String, prefix: String): ZIO[GCSEnv, Throwable, Boolean] =
ZIO.environmentWithZIO(_.get.deletePSNotification(bucket, prefix))
def listPSNotification(bucket: String): ZIO[GCSEnv, Throwable, List[Notification]] =
ZIO.environmentWithZIO(_.get.listPSNotification(bucket: String))
}
48 changes: 47 additions & 1 deletion modules/gcs/src/main/scala/gcp4zio/gcs/GCSLive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package gcp4zio
package gcs

import com.google.cloud.storage.Storage.{BlobListOption, BlobTargetOption, BlobWriteOption}
import com.google.cloud.storage.{Blob, BlobId, BlobInfo, Storage}
import com.google.cloud.storage.{Blob, BlobId, BlobInfo, Notification, NotificationInfo, Storage}
import zio._
import zio.stream._
import java.io.{IOException, InputStream, OutputStream}
import java.nio.channels.Channels
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._

@SuppressWarnings(Array("org.wartremover.warts.ToString"))
case class GCSLive(client: Storage) extends GCSApi {
Expand Down Expand Up @@ -126,6 +127,51 @@ case class GCSLive(client: Storage) extends GCSApi {
}
.runDrain
}

override def getPSNotification(bucket: String, notificationId: String): Task[Notification] = ZIO.attempt {
client.getNotification(bucket, notificationId)
}

override def createPSNotification(
bucket: String,
topic: String,
customAttributes: Map[String, String],
eventType: Option[NotificationInfo.EventType],
objectNamePrefix: Option[String],
payloadFormat: NotificationInfo.PayloadFormat
): Task[Notification] = ZIO.attempt {
val prefix = objectNamePrefix.getOrElse("")
val notificationInfo =
eventType.fold {
NotificationInfo
.newBuilder(topic)
.setCustomAttributes(customAttributes.asJava)
.setEventTypes()
.setObjectNamePrefix(prefix)
.setPayloadFormat(payloadFormat)
.build
} { event =>
NotificationInfo
.newBuilder(topic)
.setCustomAttributes(customAttributes.asJava)
.setEventTypes(event)
.setObjectNamePrefix(prefix)
.setPayloadFormat(payloadFormat)
.build
}

logger.info(s"Creating Notification Configuration for gs://$bucket/$prefix")
client.createNotification(bucket, notificationInfo)
}

override def deletePSNotification(bucket: String, notificationId: String): Task[Boolean] = ZIO.attempt {
logger.info(s"Deleting Notification Configuration, ID : $notificationId")
client.deleteNotification(bucket, notificationId)
}

override def listPSNotification(bucket: String): Task[List[Notification]] = ZIO.attempt {
client.listNotifications(bucket).asScala.toList
}
}

object GCSLive {
Expand Down
Loading

0 comments on commit 24201de

Please sign in to comment.