From 231144bdf1855f2baaa08000d8a409a2f506989b Mon Sep 17 00:00:00 2001 From: Nitin Tharwani <52317230+tharwaninitin@users.noreply.github.com> Date: Tue, 17 Jan 2023 20:43:57 +0530 Subject: [PATCH] 1.4.1 (#14) * Updated info logging for BQ, DP and GCS clients --- README.md | 20 +++++++++---------- docs/readme.template.md | 8 ++++---- modules/bq/src/main/scala/gcp4zio/bq/BQ.scala | 16 +++++++-------- .../src/main/scala/gcp4zio/bq/BQClient.scala | 6 +++--- .../bq/src/main/scala/gcp4zio/bq/BQImpl.scala | 11 +++++----- .../scala/gcp4zio/bq/BQQueryTestSuite.scala | 2 +- .../scala/gcp4zio/dp/DPClusterClient.scala | 6 ++++-- .../main/scala/gcp4zio/dp/DPJobClient.scala | 6 ++++-- .../main/scala/gcp4zio/gcs/GCSClient.scala | 6 +++--- project/Versions.scala | 12 +++++------ version.sbt | 2 +- 11 files changed, 49 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 19ef311..28c8d2d 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,11 @@ This project is compiled with scala versions 2.12.17, 2.13.10, 3.2.1 __SBT__ ``` scala mdoc libraryDependencies ++= List( - "com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.4.0", - "com.github.tharwaninitin" %% "gcp4zio-dp" % "1.4.0", - "com.github.tharwaninitin" %% "gcp4zio-bq" % "1.4.0", - "com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.4.0", - "com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.4.0" + "com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.4.1", + "com.github.tharwaninitin" %% "gcp4zio-dp" % "1.4.1", + "com.github.tharwaninitin" %% "gcp4zio-bq" % "1.4.1", + "com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.4.1", + "com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.4.1" ) ``` __Maven__ @@ -33,7 +33,7 @@ __Maven__ com.github.tharwaninitin gcp4zio-gcs_2.12 - 1.4.0 + 1.4.1 ``` # GCP4ZIO API's @@ -152,12 +152,12 @@ job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint")) import gcp4zio.bq._ // Execute DML/DDL query on Bigquery -val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)") +val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit -val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """) +val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit -// Execute SELECT query on Bigquery -val task3: RIO[BQ, Iterable[String]] = BQ.getData("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue) +// Fetching data from Bigquery +val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue) (task1 *> task2 *> task3).provide(BQ.live()) ``` diff --git a/docs/readme.template.md b/docs/readme.template.md index fb32d54..4145cd4 100644 --- a/docs/readme.template.md +++ b/docs/readme.template.md @@ -152,12 +152,12 @@ job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint")) import gcp4zio.bq._ // Execute DML/DDL query on Bigquery -val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)") +val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit -val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """) +val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit -// Execute SELECT query on Bigquery -val task3: RIO[BQ, Iterable[String]] = BQ.getData("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue) +// Fetching data from Bigquery +val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue) (task1 *> task2 *> task3).provide(BQ.live()) ``` diff --git a/modules/bq/src/main/scala/gcp4zio/bq/BQ.scala b/modules/bq/src/main/scala/gcp4zio/bq/BQ.scala index 1cd48cf..dadaf88 100644 --- a/modules/bq/src/main/scala/gcp4zio/bq/BQ.scala +++ b/modules/bq/src/main/scala/gcp4zio/bq/BQ.scala @@ -1,7 +1,7 @@ package gcp4zio package bq -import com.google.cloud.bigquery.{FieldValueList, JobInfo, Schema} +import com.google.cloud.bigquery.{FieldValueList, Job, JobInfo, Schema} import zio.{RIO, Task, TaskLayer, ZIO, ZLayer} trait BQ { @@ -11,9 +11,9 @@ trait BQ { * SQL query(INSERT, CREATE) to execute * @return */ - def executeQuery(query: String): Task[Unit] + def executeQuery(query: String): Task[Job] - /** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries + /** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows * @param query * SQL query(SELECT) to execute * @param fn @@ -22,7 +22,7 @@ trait BQ { * Scala Type for output rows * @return */ - def getData[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] + def fetchResults[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] /** Load data into BigQuery from GCS * @param sourcePath @@ -89,9 +89,9 @@ object BQ { * SQL query(INSERT, CREATE) to execute * @return */ - def executeQuery(query: String): RIO[BQ, Unit] = ZIO.environmentWithZIO(_.get.executeQuery(query)) + def executeQuery(query: String): RIO[BQ, Job] = ZIO.environmentWithZIO(_.get.executeQuery(query)) - /** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries + /** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows * @param query * SQL query(SELECT) to execute * @param fn @@ -100,8 +100,8 @@ object BQ { * Scala Type for output rows * @return */ - def getData[T](query: String)(fn: FieldValueList => T): RIO[BQ, Iterable[T]] = - ZIO.environmentWithZIO(_.get.getData[T](query)(fn)) + def fetchResults[T](query: String)(fn: FieldValueList => T): RIO[BQ, Iterable[T]] = + ZIO.environmentWithZIO(_.get.fetchResults[T](query)(fn)) /** Load data into BigQuery from GCS * @param sourcePath diff --git a/modules/bq/src/main/scala/gcp4zio/bq/BQClient.scala b/modules/bq/src/main/scala/gcp4zio/bq/BQClient.scala index 0e0bc89..f1bfe46 100644 --- a/modules/bq/src/main/scala/gcp4zio/bq/BQClient.scala +++ b/modules/bq/src/main/scala/gcp4zio/bq/BQClient.scala @@ -22,14 +22,14 @@ object BQClient { val envPath: String = sys.env.getOrElse("GOOGLE_APPLICATION_CREDENTIALS", "NOT_SET_IN_ENV") path match { case Some(p) => - logger.info("Using GCP credentials from values passed in function") + logger.info("Using credentials from values passed in function for BigQuery Client") getBQ(p) case None => if (envPath == "NOT_SET_IN_ENV") { - logger.info("Using GCP credentials from local sdk") + logger.info("Using credentials from local sdk for BigQuery Client") BigQueryOptions.getDefaultInstance.getService } else { - logger.info("Using GCP credentials from environment variable GOOGLE_APPLICATION_CREDENTIALS") + logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for BigQuery Client") getBQ(envPath) } } diff --git a/modules/bq/src/main/scala/gcp4zio/bq/BQImpl.scala b/modules/bq/src/main/scala/gcp4zio/bq/BQImpl.scala index 9ecd115..3f40b62 100644 --- a/modules/bq/src/main/scala/gcp4zio/bq/BQImpl.scala +++ b/modules/bq/src/main/scala/gcp4zio/bq/BQImpl.scala @@ -35,7 +35,7 @@ case class BQImpl(client: BigQuery) extends BQ { * SQL query(INSERT, CREATE) to execute * @return */ - def executeQuery(query: String): Task[Unit] = ZIO.attempt { + def executeQuery(query: String): Task[Job] = ZIO.attempt { val queryConfig: QueryJobConfiguration = QueryJobConfiguration .newBuilder(query) .setUseLegacySql(false) @@ -57,13 +57,12 @@ case class BQImpl(client: BigQuery) extends BQ { logger.error(queryJob.getStatus.getState.toString) throw new RuntimeException(s"Error ${queryJob.getStatus.getError.getMessage}") } else { - logger.info(s"Job State: ${queryJob.getStatus.getState}") - // val stats = queryJob.getStatistics.asInstanceOf[QueryStatistics] - // query_logger.info(s"Query Plan : ${stats.getQueryPlan}") + logger.info(s"Executed query successfully") + queryJob } } - /** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries + /** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows * @param query * SQL query(SELECT) to execute * @param fn @@ -72,7 +71,7 @@ case class BQImpl(client: BigQuery) extends BQ { * Scala Type for output rows * @return */ - def getData[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] = ZIO.attempt { + def fetchResults[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] = ZIO.attempt { val queryConfig: QueryJobConfiguration = QueryJobConfiguration .newBuilder(query) .setUseLegacySql(false) diff --git a/modules/bq/src/test/scala/gcp4zio/bq/BQQueryTestSuite.scala b/modules/bq/src/test/scala/gcp4zio/bq/BQQueryTestSuite.scala index 818f4f3..f2fa13c 100644 --- a/modules/bq/src/test/scala/gcp4zio/bq/BQQueryTestSuite.scala +++ b/modules/bq/src/test/scala/gcp4zio/bq/BQQueryTestSuite.scala @@ -14,7 +14,7 @@ object BQQueryTestSuite { }, test("Execute BQ Query and get data") { val task = - BQ.getData("SELECT * FROM dev.ratings")(rs => + BQ.fetchResults("SELECT * FROM dev.ratings")(rs => Rating( rs.get("userId").getLongValue, rs.get("movieId").getLongValue, diff --git a/modules/dp/src/main/scala/gcp4zio/dp/DPClusterClient.scala b/modules/dp/src/main/scala/gcp4zio/dp/DPClusterClient.scala index 35ae059..1bd6e1e 100644 --- a/modules/dp/src/main/scala/gcp4zio/dp/DPClusterClient.scala +++ b/modules/dp/src/main/scala/gcp4zio/dp/DPClusterClient.scala @@ -14,8 +14,10 @@ object DPClusterClient { */ def apply(endpoint: String): RIO[Scope, ClusterControllerClient] = ZIO.fromAutoCloseable(ZIO.attempt { sys.env.get("GOOGLE_APPLICATION_CREDENTIALS") match { - case Some(_) => ClusterControllerClient.create(ClusterControllerSettings.newBuilder().setEndpoint(endpoint).build()) - case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS") + case Some(_) => + logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for Dataproc Cluster Client") + ClusterControllerClient.create(ClusterControllerSettings.newBuilder().setEndpoint(endpoint).build()) + case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS") } }) } diff --git a/modules/dp/src/main/scala/gcp4zio/dp/DPJobClient.scala b/modules/dp/src/main/scala/gcp4zio/dp/DPJobClient.scala index 9ed77ee..84d52e1 100644 --- a/modules/dp/src/main/scala/gcp4zio/dp/DPJobClient.scala +++ b/modules/dp/src/main/scala/gcp4zio/dp/DPJobClient.scala @@ -14,8 +14,10 @@ object DPJobClient { */ def apply(endpoint: String): RIO[Scope, JobControllerClient] = ZIO.fromAutoCloseable(ZIO.attempt { sys.env.get("GOOGLE_APPLICATION_CREDENTIALS") match { - case Some(_) => JobControllerClient.create(JobControllerSettings.newBuilder().setEndpoint(endpoint).build()) - case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS") + case Some(_) => + logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for Dataproc Job Client") + JobControllerClient.create(JobControllerSettings.newBuilder().setEndpoint(endpoint).build()) + case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS") } }) } diff --git a/modules/gcs/src/main/scala/gcp4zio/gcs/GCSClient.scala b/modules/gcs/src/main/scala/gcp4zio/gcs/GCSClient.scala index 5107944..e2cd663 100644 --- a/modules/gcs/src/main/scala/gcp4zio/gcs/GCSClient.scala +++ b/modules/gcs/src/main/scala/gcp4zio/gcs/GCSClient.scala @@ -33,14 +33,14 @@ object GCSClient { path match { case Some(p) => - logger.info("Using GCP credentials from values passed in function") + logger.info("Using credentials from values passed in function for GCS Client") getStorage(p) case None => if (envPath == "NOT_SET_IN_ENV") { - logger.info("Using GCP credentials from local sdk") + logger.info("Using credentials from local sdk for GCS Client") StorageOptions.newBuilder().build().getService } else { - logger.info("Using GCP credentials from environment variable GOOGLE_APPLICATION_CREDENTIALS") + logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for GCS Client") getStorage(envPath) } } diff --git a/project/Versions.scala b/project/Versions.scala index 5809b44..b9cfbd9 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -6,13 +6,13 @@ object Versions { val ScalaJavaCollectionCompat = "2.9.0" - val ZioVersion = "2.0.5" + val ZioVersion = "2.0.6" val ZioLogVersion = "2.1.7" val LogbackVersion = "1.4.5" - val GcpBqVersion = "2.20.1" - val GcpDpVersion = "4.4.0" - val GcpGcsVersion = "2.16.0" - val GcpPubSubVersion = "1.122.2" - val CloudMonitorVersion = "3.8.0" + val GcpBqVersion = "2.20.2" + val GcpDpVersion = "4.5.0" + val GcpGcsVersion = "2.17.1" + val GcpPubSubVersion = "1.123.0" + val CloudMonitorVersion = "3.9.0" } diff --git a/version.sbt b/version.sbt index e808296..a9bc84b 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "1.4.0" +ThisBuild / version := "1.4.1"