
Gcp4zio


Gcp4zio is a simple Scala interface to the Google Cloud API, built on top of ZIO.

Add the latest release as a dependency to your project:

Module                 Artifact
Google Cloud Storage   gcp4zio-gcs
Dataproc               gcp4zio-dp
BigQuery               gcp4zio-bq
PubSub                 gcp4zio-pubsub
Cloud Monitoring       gcp4zio-monitoring

This project is tested with Scala versions 2.13.12 and 3.3.1, and with Java versions 17 and 21.

SBT

libraryDependencies ++= List(
  "com.github.tharwaninitin" %% "gcp4zio-gcs"        % "1.5.1",
  "com.github.tharwaninitin" %% "gcp4zio-dp"         % "1.5.1",
  "com.github.tharwaninitin" %% "gcp4zio-bq"         % "1.5.1",
  "com.github.tharwaninitin" %% "gcp4zio-pubsub"     % "1.5.1",
  "com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.5.1"
)

Maven

<dependency>
    <groupId>com.github.tharwaninitin</groupId>
    <artifactId>gcp4zio-gcs_2.13</artifactId>
    <version>1.5.1</version>
</dependency>

GCP4ZIO APIs

# To run the examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of your service account JSON key file.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json

Google Cloud Storage API

CRUD Operations

import gcp4zio.gcs._
import java.nio.file.Paths
import zio._

// Upload single object from local to provided bucket at provided prefix
val localPath1 = Paths.get("/local/path/to/file1.csv")
GCS.putObject("targetBucket", "temp/gcs/prefix/file1.csv", localPath1)

// Download single object from bucket at provided prefix to local
val localPath2 = Paths.get("/local/path/to/file2.csv")
GCS.getObject("srcBucket", "temp/gcs/prefix/file1.csv", localPath2)

// Delete single object from bucket at provided prefix
GCS.deleteObject("gcsBucket", "temp/gcs/prefix/file1.csv")
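
Each call above returns a ZIO effect that needs a GCS layer before it can run. A minimal wiring sketch, assuming a GCS.live() layer constructor analogous to the live layers the other modules expose below:

import gcp4zio.gcs._
import java.nio.file.Paths
import zio._

// Sketch: upload a file, then delete the object, supplying the layer at the edge.
// GCS.live() is an assumption here, by analogy with DPCluster.live / BQ.live below.
val program = for {
  _ <- GCS.putObject("targetBucket", "temp/gcs/prefix/file1.csv", Paths.get("/local/path/to/file1.csv"))
  _ <- GCS.deleteObject("targetBucket", "temp/gcs/prefix/file1.csv")
} yield ()

program.provide(GCS.live())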

CRUD Operations (Streaming)

import gcp4zio.gcs._
import com.google.cloud.storage.Storage.BlobWriteOption

// Source: stream the object's bytes from GCS using a 4096-byte chunk size
val src = GCS.getObject("gcsBucket", "temp/gcs/prefix/file1.csv", 4096)

// Sink: write incoming bytes to a new GCS object, failing if it already exists
val opts = List(BlobWriteOption.doesNotExist())
val sink = GCS.putObject("gcsBucket", "temp/test/ratings2.csv", opts)

// Run the source stream into the sink
src.run(sink)
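
Because the source is a ZStream of bytes and the sink a ZSink, ordinary zio-streams pipelines compose in between; a sketch that gzips the object while copying (GCS.live() is again an assumed layer name):

import zio.stream.ZPipeline

// Sketch: compress the bytes in transit and write them to a new object.
// Reuses src and opts from above; GCS.live() is assumed.
val compressedCopy = src
  .via(ZPipeline.gzip())
  .run(GCS.putObject("gcsBucket", "temp/test/ratings2.csv.gz", opts))
  .provide(GCS.live())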

Copy Objects from GCS to GCS

import gcp4zio.gcs._

// Copy single object from source bucket to target bucket
GCS.copyObjectsGCStoGCS(
  srcBucket = "srcBucket",
  srcPrefix = Some("temp/gcs/prefix/file1.csv"),
  targetBucket = "targetBucket",
  targetPrefix = Some("temp2/gcs/prefix/file1.csv")
)

// Copy all objects from source bucket to target bucket
GCS.copyObjectsGCStoGCS(
  srcBucket = "srcBucket",
  targetBucket = "targetBucket"
)

// Copy all objects from source bucket with prefix to target bucket
GCS.copyObjectsGCStoGCS(
  srcBucket = "srcBucket",
  srcPrefix = Some("temp/gcs/prefix"),
  targetBucket = "targetBucket"
)
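
Each copy is a plain ZIO effect, so the usual combinators attach directly; for example, a retry policy for transient failures:

import gcp4zio.gcs._
import zio._

// Sketch: retry a bucket-to-bucket copy up to 3 times with exponential backoff.
val copyWithRetry = GCS
  .copyObjectsGCStoGCS(srcBucket = "srcBucket", targetBucket = "targetBucket")
  .retry(Schedule.exponential(1.second) && Schedule.recurs(3))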

Dataproc API

Dataproc Cluster

import gcp4zio.dp._

// Create Dataproc Cluster Properties
val props = new ClusterProps("dataproc-logs")
// Create Dataproc Cluster
val createTask = DPCluster.createDataproc("cluster1", props)

// Delete Dataproc Cluster
val deleteTask = DPCluster.deleteDataproc("cluster1")

(createTask *> deleteTask).provide(DPCluster.live("gcpProject", "gcpRegion"))
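
Creation and deletion compose with any workload in between; bracketing them with ZIO.acquireReleaseWith guarantees the cluster is torn down even when the work fails. A sketch, with the workload left as a placeholder:

import gcp4zio.dp._
import zio._

// Sketch: create the cluster, run some work, and always delete the cluster.
// `runJobs` is a placeholder effect standing in for real job submissions.
val runJobs: Task[Unit] = ZIO.unit

val managed = ZIO.acquireReleaseWith(
  DPCluster.createDataproc("cluster1", new ClusterProps("dataproc-logs"))
)(_ => DPCluster.deleteDataproc("cluster1").orDie)(_ => runJobs)

managed.provide(DPCluster.live("gcpProject", "gcpRegion"))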

Dataproc Job

import gcp4zio.dp._

val libs = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")
val mainClass = "org.apache.spark.examples.SparkPi"

// Submit a Spark job with its arguments, main class, jars, and configuration
val job = DPJob.executeSparkJob(List("1000"), mainClass, libs, conf)

job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint"))
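
Since the submission is an ordinary ZIO effect, standard operators such as timeout apply; a minimal sketch (the 30-minute bound is illustrative):

import zio._

// Sketch: fail the submission if the Spark job does not finish within 30 minutes.
val boundedJob = job
  .timeout(30.minutes)
  .someOrFail(new RuntimeException("Spark job timed out"))

boundedJob.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint"))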

BigQuery API

Running SQL Queries in BigQuery

import gcp4zio.bq._

// Execute a DDL query on BigQuery
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit

// Execute a DML query on BigQuery
val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit

// Fetch data from BigQuery, mapping each row with the provided function
val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)

(task1 *> task2 *> task3).provide(BQ.live())
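
fetchResults takes a row-mapping function, so multi-column rows can be decoded into domain types using the same FieldValueList accessors; a sketch with an illustrative table and case class:

import gcp4zio.bq._

// Sketch: decode each row into a case class. The table and columns are illustrative.
final case class Rating(userId: String, score: Long)

val ratings: RIO[BQ, Iterable[Rating]] =
  BQ.fetchResults("SELECT user_id, score FROM dataset1.ratings") { rs =>
    Rating(rs.get("user_id").getStringValue, rs.get("score").getLongValue)
  }

ratings.provide(BQ.live())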

Loading/Exporting data from/to GCS

import gcp4zio.bq._
import gcp4zio.bq.FileType.PARQUET

// Load a PARQUET file from GCS into BigQuery
val step = BQ.loadTable("inputFilePathParquet", PARQUET, Some("gcpProject"), "outputDataset", "outputTable")

step.provide(BQ.live())
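
The load step is again just an effect, so standard ZIO operators such as timed can instrument it:

import zio._

// Sketch: log how long the load took. `step` is the effect defined above.
val timedLoad = step.timed.flatMap { case (took, _) =>
  ZIO.logInfo(s"BigQuery load finished in ${took.toSeconds}s")
}

timedLoad.provide(BQ.live())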

PubSub API

Topic

import gcp4zio.pubsub.topic._

// Create PubSub Topic
PSTopic.createTopic(project = "gcsProjectId", topic = "topicName")

// Add an IAM policy binding to an existing topic, granting a Pub/Sub role to a member
PSTopic.addIAMPolicyBindingToTopic(project = "gcsProjectId", topic = "topicName", member = "example@principalAccount.com", role = "roles/<IAM_Role>")

// Delete PubSub Topic
PSTopic.deleteTopic(project = "gcsProjectId", topic = "topicName")
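
These operations are ZIO effects as well and need a topic layer when run; the PSTopic.live() constructor below is an assumption, by analogy with the live layers of the other modules:

import gcp4zio.pubsub.topic._

// Sketch: create a topic and then delete it. PSTopic.live() is assumed.
val topicLifecycle =
  PSTopic.createTopic(project = "gcsProjectId", topic = "topicName") *>
    PSTopic.deleteTopic(project = "gcsProjectId", topic = "topicName")

topicLifecycle.provide(PSTopic.live())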

Subscription

import gcp4zio.pubsub.subscription._

// Create Pull Subscription
PSSubscription.createPullSubscription(
    project = "gcsProjectId", 
    subscription = "subName", 
    topic = "topicName",
    ackDeadlineSeconds = 20 // default 10 seconds
  )

// Create Push Subscription
PSSubscription.createPushSubscription(
    project = "gcsProjectId",
    subscription = "subName",
    topic = "topicName",
    ackDeadlineSeconds = 20, // default 10 seconds
    pushEndpoint = "https://example.com/push"
  )

// Create Bigquery Subscription
PSSubscription.createBQSubscription(
    project = "gcsProjectId",
    subscription = "subName",
    topic = "topicName",
    bqTableId = "projectId:datasetId.tableId"
  )

// Delete Subscription
PSSubscription.deleteSubscription(
  project = "gcsProjectId",
  subscription = "subName"
)

Publisher

import gcp4zio.pubsub.publisher._

// Create an encoder for publishing String messages to the topic
implicit val encoder: MessageEncoder[String] = (a: String) => Right(a.getBytes(java.nio.charset.Charset.defaultCharset()))

// Publish message to topic
val publishMsg = PSPublisher.produce[String]("String Message")

// Provide Publisher layer
publishMsg.provide(PSPublisher.live("gcsProjectId", "topic"))
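
MessageEncoder is just a function from a value to bytes, so any domain type can be published by supplying its own encoder; a sketch for an illustrative case class and wire format:

import gcp4zio.pubsub.publisher._
import java.nio.charset.StandardCharsets

// Sketch: publish a domain event as UTF-8 bytes.
// The Event type and its comma-separated format are illustrative only.
final case class Event(id: String, payload: String)

implicit val eventEncoder: MessageEncoder[Event] =
  (e: Event) => Right(s"${e.id},${e.payload}".getBytes(StandardCharsets.UTF_8))

PSPublisher.produce[Event](Event("42", "hello")).provide(PSPublisher.live("gcsProjectId", "topic"))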

Subscriber

import gcp4zio.pubsub.subscriber._
import zio._

// Create stream to consume messages from the subscription
val subscriberStream = PSSubscriber.subscribe("gcsProjectId", "subscription")

// Print first 10 messages from stream to console
val task = subscriberStream
  .mapZIO(msg => ZIO.logInfo(msg.value.toString) *> msg.ack)
  .take(10)
  .runDrain
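
Acknowledgement is explicit, so a handler can run before the ack; a sketch with a placeholder handler:

import zio._

// Sketch: ack only after processing succeeds. `process` is a placeholder;
// a failed message stays unacked and is redelivered after the ack deadline.
def process(payload: String): Task[Unit] = ZIO.logInfo(s"handling $payload")

val safeConsumer = subscriberStream
  .mapZIO(msg => process(msg.value.toString) *> msg.ack)
  .runDrain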

See the examples in this repository for end-to-end usage of the PubSub APIs.

Monitoring API

import gcp4zio.monitoring._
import com.google.monitoring.v3.TimeInterval

// Fetch Cloud Monitoring time-series data for the given metric type
Monitoring.getMetric(
  project = "gcsProjectId",
  metric = "compute.googleapis.com/instance/cpu/usage_time",
  interval = TimeInterval.getDefaultInstance // provide a TimeInterval with explicit start and end times
)
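
TimeInterval is the standard google-cloud monitoring v3 protobuf, so an explicit window can be built with its builder; a sketch for the trailing hour (the Monitoring.live() layer name is assumed by analogy with the other modules):

import gcp4zio.monitoring._
import com.google.monitoring.v3.TimeInterval
import com.google.protobuf.util.Timestamps

// Sketch: query CPU usage time for the trailing hour. Monitoring.live() is assumed.
val now = System.currentTimeMillis()
val lastHour = TimeInterval
  .newBuilder()
  .setStartTime(Timestamps.fromMillis(now - 60 * 60 * 1000))
  .setEndTime(Timestamps.fromMillis(now))
  .build()

Monitoring
  .getMetric("gcsProjectId", "compute.googleapis.com/instance/cpu/usage_time", lastHour)
  .provide(Monitoring.live())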