Skip to content

Commit

Permalink
1.4.1 (#14)
Browse files Browse the repository at this point in the history
* Updated info logging for BQ, DP and GCS clients
  • Loading branch information
tharwaninitin authored Jan 17, 2023
1 parent 89bae0e commit 231144b
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 46 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ This project is compiled with scala versions 2.12.17, 2.13.10, 3.2.1
__SBT__
``` scala mdoc
libraryDependencies ++= List(
"com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.4.0",
"com.github.tharwaninitin" %% "gcp4zio-dp" % "1.4.0",
"com.github.tharwaninitin" %% "gcp4zio-bq" % "1.4.0",
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.4.0",
"com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.4.0"
"com.github.tharwaninitin" %% "gcp4zio-gcs" % "1.4.1",
"com.github.tharwaninitin" %% "gcp4zio-dp" % "1.4.1",
"com.github.tharwaninitin" %% "gcp4zio-bq" % "1.4.1",
"com.github.tharwaninitin" %% "gcp4zio-pubsub" % "1.4.1",
"com.github.tharwaninitin" %% "gcp4zio-monitoring" % "1.4.1"
)
```
__Maven__
```
<dependency>
<groupId>com.github.tharwaninitin</groupId>
<artifactId>gcp4zio-gcs_2.12</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>
</dependency>
```
# GCP4ZIO API's
Expand Down Expand Up @@ -152,12 +152,12 @@ job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint"))
import gcp4zio.bq._

// Execute DML/DDL query on Bigquery
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)")
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit

val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """)
val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit

// Execute SELECT query on Bigquery
val task3: RIO[BQ, Iterable[String]] = BQ.getData("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)
// Fetching data from Bigquery
val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)

(task1 *> task2 *> task3).provide(BQ.live())
```
Expand Down
8 changes: 4 additions & 4 deletions docs/readme.template.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ job.provide(DPJob.live("dpCluster", "gcpProject", "gcpRegion", "dpEndpoint"))
import gcp4zio.bq._

// Execute DML/DDL query on Bigquery
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)")
val task1: RIO[BQ, Unit] = BQ.executeQuery("CREATE TABLE dataset1.test1 (column1 STRING)").unit

val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """)
val task2: RIO[BQ, Unit] = BQ.executeQuery(""" INSERT INTO dataset1.test1 VALUES ("value1") """).unit

// Execute SELECT query on Bigquery
val task3: RIO[BQ, Iterable[String]] = BQ.getData("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)
// Fetching data from Bigquery
val task3: RIO[BQ, Iterable[String]] = BQ.fetchResults("SELECT * FROM dataset1.test1")(rs => rs.get("column1").getStringValue)

(task1 *> task2 *> task3).provide(BQ.live())
```
Expand Down
16 changes: 8 additions & 8 deletions modules/bq/src/main/scala/gcp4zio/bq/BQ.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package gcp4zio
package bq

import com.google.cloud.bigquery.{FieldValueList, JobInfo, Schema}
import com.google.cloud.bigquery.{FieldValueList, Job, JobInfo, Schema}
import zio.{RIO, Task, TaskLayer, ZIO, ZLayer}

trait BQ {
Expand All @@ -11,9 +11,9 @@ trait BQ {
* SQL query(INSERT, CREATE) to execute
* @return
*/
def executeQuery(query: String): Task[Unit]
def executeQuery(query: String): Task[Job]

/** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries
/** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows
* @param query
* SQL query(SELECT) to execute
* @param fn
Expand All @@ -22,7 +22,7 @@ trait BQ {
* Scala Type for output rows
* @return
*/
def getData[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]]
def fetchResults[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]]

/** Load data into BigQuery from GCS
* @param sourcePath
Expand Down Expand Up @@ -89,9 +89,9 @@ object BQ {
* SQL query(INSERT, CREATE) to execute
* @return
*/
def executeQuery(query: String): RIO[BQ, Unit] = ZIO.environmentWithZIO(_.get.executeQuery(query))
def executeQuery(query: String): RIO[BQ, Job] = ZIO.environmentWithZIO(_.get.executeQuery(query))

/** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries
/** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows
* @param query
* SQL query(SELECT) to execute
* @param fn
Expand All @@ -100,8 +100,8 @@ object BQ {
* Scala Type for output rows
* @return
*/
def getData[T](query: String)(fn: FieldValueList => T): RIO[BQ, Iterable[T]] =
ZIO.environmentWithZIO(_.get.getData[T](query)(fn))
def fetchResults[T](query: String)(fn: FieldValueList => T): RIO[BQ, Iterable[T]] =
ZIO.environmentWithZIO(_.get.fetchResults[T](query)(fn))

/** Load data into BigQuery from GCS
* @param sourcePath
Expand Down
6 changes: 3 additions & 3 deletions modules/bq/src/main/scala/gcp4zio/bq/BQClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ object BQClient {
val envPath: String = sys.env.getOrElse("GOOGLE_APPLICATION_CREDENTIALS", "NOT_SET_IN_ENV")
path match {
case Some(p) =>
logger.info("Using GCP credentials from values passed in function")
logger.info("Using credentials from values passed in function for BigQuery Client")
getBQ(p)
case None =>
if (envPath == "NOT_SET_IN_ENV") {
logger.info("Using GCP credentials from local sdk")
logger.info("Using credentials from local sdk for BigQuery Client")
BigQueryOptions.getDefaultInstance.getService
} else {
logger.info("Using GCP credentials from environment variable GOOGLE_APPLICATION_CREDENTIALS")
logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for BigQuery Client")
getBQ(envPath)
}
}
Expand Down
11 changes: 5 additions & 6 deletions modules/bq/src/main/scala/gcp4zio/bq/BQImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ case class BQImpl(client: BigQuery) extends BQ {
* SQL query(INSERT, CREATE) to execute
* @return
*/
def executeQuery(query: String): Task[Unit] = ZIO.attempt {
def executeQuery(query: String): Task[Job] = ZIO.attempt {
val queryConfig: QueryJobConfiguration = QueryJobConfiguration
.newBuilder(query)
.setUseLegacySql(false)
Expand All @@ -57,13 +57,12 @@ case class BQImpl(client: BigQuery) extends BQ {
logger.error(queryJob.getStatus.getState.toString)
throw new RuntimeException(s"Error ${queryJob.getStatus.getError.getMessage}")
} else {
logger.info(s"Job State: ${queryJob.getStatus.getState}")
// val stats = queryJob.getStatistics.asInstanceOf[QueryStatistics]
// query_logger.info(s"Query Plan : ${stats.getQueryPlan}")
logger.info(s"Executed query successfully")
queryJob
}
}

/** Execute SQL query on BigQuery, this API returns rows. So it can be used to run any SELECT queries
/** This API can be used to run any SQL(SELECT) query on BigQuery to fetch rows
* @param query
* SQL query(SELECT) to execute
* @param fn
Expand All @@ -72,7 +71,7 @@ case class BQImpl(client: BigQuery) extends BQ {
* Scala Type for output rows
* @return
*/
def getData[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] = ZIO.attempt {
def fetchResults[T](query: String)(fn: FieldValueList => T): Task[Iterable[T]] = ZIO.attempt {
val queryConfig: QueryJobConfiguration = QueryJobConfiguration
.newBuilder(query)
.setUseLegacySql(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object BQQueryTestSuite {
},
test("Execute BQ Query and get data") {
val task =
BQ.getData("SELECT * FROM dev.ratings")(rs =>
BQ.fetchResults("SELECT * FROM dev.ratings")(rs =>
Rating(
rs.get("userId").getLongValue,
rs.get("movieId").getLongValue,
Expand Down
6 changes: 4 additions & 2 deletions modules/dp/src/main/scala/gcp4zio/dp/DPClusterClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ object DPClusterClient {
*/
def apply(endpoint: String): RIO[Scope, ClusterControllerClient] = ZIO.fromAutoCloseable(ZIO.attempt {
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS") match {
case Some(_) => ClusterControllerClient.create(ClusterControllerSettings.newBuilder().setEndpoint(endpoint).build())
case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS")
case Some(_) =>
logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for Dataproc Cluster Client")
ClusterControllerClient.create(ClusterControllerSettings.newBuilder().setEndpoint(endpoint).build())
case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS")
}
})
}
6 changes: 4 additions & 2 deletions modules/dp/src/main/scala/gcp4zio/dp/DPJobClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ object DPJobClient {
*/
def apply(endpoint: String): RIO[Scope, JobControllerClient] = ZIO.fromAutoCloseable(ZIO.attempt {
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS") match {
case Some(_) => JobControllerClient.create(JobControllerSettings.newBuilder().setEndpoint(endpoint).build())
case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS")
case Some(_) =>
logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for Dataproc Job Client")
JobControllerClient.create(JobControllerSettings.newBuilder().setEndpoint(endpoint).build())
case None => throw new RuntimeException("Set environment variable GOOGLE_APPLICATION_CREDENTIALS")
}
})
}
6 changes: 3 additions & 3 deletions modules/gcs/src/main/scala/gcp4zio/gcs/GCSClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ object GCSClient {

path match {
case Some(p) =>
logger.info("Using GCP credentials from values passed in function")
logger.info("Using credentials from values passed in function for GCS Client")
getStorage(p)
case None =>
if (envPath == "NOT_SET_IN_ENV") {
logger.info("Using GCP credentials from local sdk")
logger.info("Using credentials from local sdk for GCS Client")
StorageOptions.newBuilder().build().getService
} else {
logger.info("Using GCP credentials from environment variable GOOGLE_APPLICATION_CREDENTIALS")
logger.info("Using credentials from GOOGLE_APPLICATION_CREDENTIALS for GCS Client")
getStorage(envPath)
}
}
Expand Down
12 changes: 6 additions & 6 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ object Versions {

val ScalaJavaCollectionCompat = "2.9.0"

val ZioVersion = "2.0.5"
val ZioVersion = "2.0.6"
val ZioLogVersion = "2.1.7"

val LogbackVersion = "1.4.5"
val GcpBqVersion = "2.20.1"
val GcpDpVersion = "4.4.0"
val GcpGcsVersion = "2.16.0"
val GcpPubSubVersion = "1.122.2"
val CloudMonitorVersion = "3.8.0"
val GcpBqVersion = "2.20.2"
val GcpDpVersion = "4.5.0"
val GcpGcsVersion = "2.17.1"
val GcpPubSubVersion = "1.123.0"
val CloudMonitorVersion = "3.9.0"
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "1.4.0"
ThisBuild / version := "1.4.1"

0 comments on commit 231144b

Please sign in to comment.