Updated dependencies and dataproc module
tharwaninitin committed Dec 20, 2023
1 parent f416e0c commit 01b26b3
Showing 9 changed files with 137 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,6 +1,6 @@
version = 3.3.1
runner.dialect = scala212
maxColumn = 130
maxColumn = 150
project.git = true
align.preset = more
assumeStandardLibraryStripMargin = true
8 changes: 4 additions & 4 deletions build.sbt
@@ -5,8 +5,8 @@ import Versions._
Global / onChangedBuildSource := ReloadOnSourceChanges

lazy val commonSettings = Seq(
scalaVersion := Scala212,
crossScalaVersions := AllScalaVersions,
scalaVersion := scala212,
crossScalaVersions := allScalaVersions,
dependencyUpdatesFailBuild := true,
dependencyUpdatesFilter -= moduleFilter(organization = "org.scala-lang"),
scalacOptions ++= {
@@ -66,7 +66,7 @@ lazy val examples = (project in file("examples"))
.settings(
name := "examples",
publish / skip := true,
libraryDependencies ++= List("ch.qos.logback" % "logback-classic" % LogbackVersion)
libraryDependencies ++= List("ch.qos.logback" % "logback-classic" % logbackVersion)
)
.dependsOn(dp, gcs, pubsub, batch)

@@ -76,7 +76,7 @@ lazy val docs = project
.settings(
name := "gcp4zio-docs",
publish / skip := true,
mdocVariables := Map("VERSION" -> version.value, "Scala212" -> Scala212, "Scala213" -> Scala213, "Scala3" -> Scala3),
mdocVariables := Map("VERSION" -> version.value, "Scala212" -> scala212, "Scala213" -> scala213, "Scala3" -> scala3),
mdocIn := new File("docs/readme.template.md"),
mdocOut := new File("README.md")
)
@@ -24,7 +24,7 @@ object BQLoadExportTestSuite {
},
test("Run BQLoad CSV") {
val schema: Option[Schema] = Encoder[RatingCSV]
val step = BQ.loadTable(inputFileCsv, CSV(), Some(gcpProject), outputDataset, outputTable, schema = schema)
val step = BQ.loadTable(inputFileCsv, CSV(), Some(gcpProject), outputDataset, outputTable, schema = schema)
assertZIO(step.foldZIO(ex => ZIO.fail(ex.getMessage), _ => ZIO.succeed("ok")))(equalTo("ok"))
},
test("Run BQExport CSV") {
39 changes: 39 additions & 0 deletions modules/dp/src/main/scala/gcp4zio/dp/DPCluster.scala
@@ -5,17 +5,56 @@ import com.google.cloud.dataproc.v1.Cluster
import zio.{Task, TaskLayer, ZIO, ZLayer}

trait DPCluster {

/** Submits a request to create Dataproc Cluster
* @param cluster
* name of the cluster to be created
* @param props
* cluster props
* @return
*/
def createDataproc(cluster: String, props: ClusterProps): Task[Cluster]

/** Submits a request to delete Dataproc Cluster
* @param cluster
* name of the cluster to be deleted
* @return
*/
def deleteDataproc(cluster: String): Task[Unit]
}

object DPCluster {

/** Submits a request to create Dataproc Cluster
*
* @param cluster
* name of the cluster to be created
* @param props
* cluster props
* @return
*/
def createDataproc(cluster: String, props: ClusterProps): ZIO[DPCluster, Throwable, Cluster] =
ZIO.environmentWithZIO(_.get.createDataproc(cluster, props))

/** Submits a request to delete Dataproc Cluster
*
* @param cluster
* name of the cluster to be deleted
* @return
*/
def deleteDataproc(cluster: String): ZIO[DPCluster, Throwable, Unit] =
ZIO.environmentWithZIO(_.get.deleteDataproc(cluster))

/** Creates the live layer required for all [[DPCluster]] APIs
*
* @param project
* GCP projectID
* @param region
* GCP Region name
* @param endpoint
* GCP Dataproc API endpoint
* @return
*/
def live(project: String, region: String, endpoint: String): TaskLayer[DPCluster] =
ZLayer.scoped(DPClusterClient(endpoint).map(dp => DPClusterImpl(dp, project, region)))
}
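
For context, a minimal usage sketch of the DPCluster API above (not part of this commit). The project, region, endpoint, cluster and bucket names are placeholders, and treating bucketName as the only required field of ClusterProps is an assumption based on its use in DPClusterImpl:

import gcp4zio.dp._
import zio._

object DPClusterExample extends ZIOAppDefault {
  // Placeholder values: substitute your own project, region and staging bucket.
  private val program: ZIO[DPCluster, Throwable, Unit] = for {
    _ <- DPCluster.createDataproc("example-cluster", ClusterProps(bucketName = "example-staging-bucket"))
    _ <- DPCluster.deleteDataproc("example-cluster")
  } yield ()

  override def run: ZIO[Any, Throwable, Unit] =
    program.provide(DPCluster.live("example-project", "us-central1", "us-central1-dataproc.googleapis.com:443"))
}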
106 changes: 47 additions & 59 deletions modules/dp/src/main/scala/gcp4zio/dp/DPClusterImpl.scala
@@ -8,69 +8,57 @@ import scala.jdk.CollectionConverters._

case class DPClusterImpl(client: ClusterControllerClient, project: String, region: String) extends DPCluster {

def createDataproc(clusterName: String, props: ClusterProps): Task[Cluster] = ZIO
.fromFutureJava {
val endPointConfig = EndpointConfig.newBuilder().setEnableHttpPortAccess(true)
val softwareConfig = SoftwareConfig.newBuilder().setImageVersion(props.imageVersion)
val diskConfigM =
DiskConfig
.newBuilder()
.setBootDiskType(props.bootDiskType)
.setBootDiskSizeGb(props.masterBootDiskSizeGb)
val diskConfigW =
DiskConfig
.newBuilder()
.setBootDiskType(props.bootDiskType)
.setBootDiskSizeGb(props.workerBootDiskSizeGb)
private def createClusterConfig(props: ClusterProps): ClusterConfig = {
val endPointConfig = EndpointConfig.newBuilder().setEnableHttpPortAccess(true)
val softwareConfig = SoftwareConfig.newBuilder().setImageVersion(props.imageVersion)

val diskConfigM = DiskConfig.newBuilder().setBootDiskType(props.bootDiskType).setBootDiskSizeGb(props.masterBootDiskSizeGb)
val diskConfigW = DiskConfig.newBuilder().setBootDiskType(props.bootDiskType).setBootDiskSizeGb(props.workerBootDiskSizeGb)

val gceClusterBuilder = props.subnetUri match {
case Some(value) =>
GceClusterConfig
.newBuilder()
.setInternalIpOnly(props.internalIpOnly)
.setSubnetworkUri(value)
.addAllTags(props.networkTags.asJava)
.addServiceAccountScopes("https://www.googleapis.com/auth/cloud-platform")
case None =>
GceClusterConfig
.newBuilder()
.setInternalIpOnly(props.internalIpOnly)
.addAllTags(props.networkTags.asJava)
.addServiceAccountScopes("https://www.googleapis.com/auth/cloud-platform")
}
val gceClusterBuilder: GceClusterConfig.Builder = props.subnetUri match {
case Some(value) =>
GceClusterConfig.newBuilder().setInternalIpOnly(props.internalIpOnly).setSubnetworkUri(value).addAllTags(props.networkTags.asJava)
case None =>
GceClusterConfig.newBuilder().setInternalIpOnly(props.internalIpOnly).addAllTags(props.networkTags.asJava)
}

val gceClusterConfig: GceClusterConfig.Builder = props.serviceAccount match {
case Some(value) => gceClusterBuilder.setServiceAccount(value)
case _ => gceClusterBuilder.addServiceAccountScopes("https://www.googleapis.com/auth/cloud-platform")
}

val gceClusterConfig = props.serviceAccount match {
case Some(value) => gceClusterBuilder.setServiceAccount(value)
case _ => gceClusterBuilder
}
val masterConfig = InstanceGroupConfig.newBuilder
.setMachineTypeUri(props.masterMachineType)
.setNumInstances(props.masterNumInstance)
.setDiskConfig(diskConfigM)
.build

val masterConfig = InstanceGroupConfig.newBuilder
.setMachineTypeUri(props.masterMachineType)
.setNumInstances(props.masterNumInstance)
.setDiskConfig(diskConfigM)
.build
val workerConfig = InstanceGroupConfig.newBuilder
.setMachineTypeUri(props.workerMachineType)
.setNumInstances(props.workerNumInstance)
.setDiskConfig(diskConfigW)
.build
val clusterConfigBuilder = ClusterConfig.newBuilder
.setMasterConfig(masterConfig)
.setWorkerConfig(workerConfig)
.setSoftwareConfig(softwareConfig)
.setConfigBucket(props.bucketName)
.setGceClusterConfig(gceClusterConfig)
.setEndpointConfig(endPointConfig)
val workerConfig = InstanceGroupConfig.newBuilder
.setMachineTypeUri(props.workerMachineType)
.setNumInstances(props.workerNumInstance)
.setDiskConfig(diskConfigW)
.build

val clusterConfig = props.idleDeletionDurationSecs match {
case Some(value) =>
clusterConfigBuilder
.setLifecycleConfig(
LifecycleConfig.newBuilder().setIdleDeleteTtl(Duration.newBuilder().setSeconds(value))
)
.build
case _ => clusterConfigBuilder.build
}
val clusterConfigBuilder = ClusterConfig.newBuilder
.setMasterConfig(masterConfig)
.setWorkerConfig(workerConfig)
.setSoftwareConfig(softwareConfig)
.setConfigBucket(props.bucketName)
.setGceClusterConfig(gceClusterConfig)
.setEndpointConfig(endPointConfig)

props.idleDeletionDurationSecs match {
case Some(value) =>
clusterConfigBuilder
.setLifecycleConfig(LifecycleConfig.newBuilder().setIdleDeleteTtl(Duration.newBuilder().setSeconds(value)))
.build
case _ => clusterConfigBuilder.build
}
}

def createDataproc(clusterName: String, props: ClusterProps): Task[Cluster] = ZIO
.fromFutureJava {
val clusterConfig = createClusterConfig(props)

val cluster = Cluster.newBuilder.setClusterName(clusterName).setConfig(clusterConfig).build

15 changes: 5 additions & 10 deletions modules/dp/src/main/scala/gcp4zio/dp/DPJob.scala
@@ -7,8 +7,7 @@ import java.time.Duration

trait DPJob {

/** Submits a Spark Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use
* [[trackJobProgress]])
/** Submits a Spark Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use [[trackJobProgress]])
* @param args
* command line arguments which will be passed to spark application
* @param mainClass
@@ -21,8 +20,7 @@
*/
def submitSparkJob(args: List[String], mainClass: String, libs: List[String], conf: Map[String, String]): Task[Job]

/** Submits a Hive Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use
* [[trackJobProgress]])
/** Submits a Hive Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use [[trackJobProgress]])
* @param query
* Hive SQL query to run
* @return
@@ -41,8 +39,7 @@

object DPJob {

/** Submits a Spark Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use
* [[trackJobProgress]])
/** Submits a Spark Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use [[trackJobProgress]])
* @param args
* command line arguments which will be passed to spark application
* @param mainClass
@@ -81,16 +78,14 @@ object DPJob {
_ <- ZIO.environmentWithZIO[DPJob](_.get.trackJobProgress(job, trackingInterval))
} yield job

/** Submits a Hive Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use
* [[trackJobProgress]])
/** Submits a Hive Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use [[trackJobProgress]])
* @param query
* Hive SQL query to run
* @return
*/
def submitHiveJob(query: String): RIO[DPJob, Job] = ZIO.environmentWithZIO(_.get.submitHiveJob(query))

/** Submits a Hive Job in Dataproc Cluster. (This API will not wait for Job Completion, to wait for Job completion use
* [[trackJobProgress]])
/** Submits a Hive Job in Dataproc Cluster and waits till Job Completion
* @param query
* Hive SQL query to run
* @param trackingInterval
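
For context, a hedged usage sketch of the DPJob API above (not part of this commit). The jar path and arguments are placeholders, the live layer for DPJob is assumed to be provided elsewhere in this module, and the trackJobProgress(job, interval) signature is inferred from the calls visible in this file:

import gcp4zio.dp._
import zio._
import java.time.Duration

// Submit SparkPi, then poll every 10 seconds until the job completes.
val sparkPi: ZIO[DPJob, Throwable, Unit] = for {
  job <- DPJob.submitSparkJob(
    List("1000"),                                                   // command line args for the Spark application
    "org.apache.spark.examples.SparkPi",                            // main class
    List("file:///usr/lib/spark/examples/jars/spark-examples.jar"), // libs available on the cluster
    Map.empty[String, String]                                       // additional Spark configuration
  )
  _ <- ZIO.environmentWithZIO[DPJob](_.get.trackJobProgress(job, Duration.ofSeconds(10)))
} yield ()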
8 changes: 4 additions & 4 deletions modules/dp/src/main/scala/gcp4zio/dp/package.scala
@@ -14,10 +14,10 @@ package object dp {
idleDeletionDurationSecs: Option[Long] = Some(1800L),
masterMachineType: String = "n1-standard-4",
workerMachineType: String = "n1-standard-4",
imageVersion: String = "1.5-debian10",
bootDiskType: String = "pd-ssd",
masterBootDiskSizeGb: Int = 400,
workerBootDiskSizeGb: Int = 200,
imageVersion: String = "2.1-debian11",
bootDiskType: String = "pd-standard", // pd-standard, pd-balanced, pd-ssd // https://cloud.google.com/compute/docs/disks#disk-types
masterBootDiskSizeGb: Int = 100,
workerBootDiskSizeGb: Int = 100,
masterNumInstance: Int = 1,
workerNumInstance: Int = 3
)
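
The new defaults above favour pd-standard disks and smaller boot volumes; callers who want the previous behaviour can override per cluster. A minimal sketch (treating bucketName as a required field is an assumption based on its use in DPClusterImpl):

// Restore the pre-commit disk defaults for one specific cluster.
val props: ClusterProps = ClusterProps(
  bucketName           = "example-staging-bucket", // placeholder config bucket
  bootDiskType         = "pd-ssd",                 // previous default
  masterBootDiskSizeGb = 400,                      // previous default
  workerBootDiskSizeGb = 200                       // previous default
)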
42 changes: 21 additions & 21 deletions project/Dependencies.scala
@@ -3,41 +3,41 @@ import sbt._

object Dependencies {

lazy val coreLibs = List(
"dev.zio" %% "zio" % ZioVersion,
"dev.zio" %% "zio-streams" % ZioVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % ScalaJavaCollectionCompat,
"dev.zio" %% "zio-logging-slf4j" % ZioLogVersion
lazy val coreLibs: List[ModuleID] = List(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % scalaJavaCollectionCompat,
"dev.zio" %% "zio-logging-slf4j" % zioLogVersion
)

lazy val bqLibs = List(
"com.google.cloud" % "google-cloud-bigquery" % GcpBqVersion
lazy val bqLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-bigquery" % gcpBqVersion
)

lazy val dpLibs = List(
"com.google.cloud" % "google-cloud-dataproc" % GcpDpVersion
lazy val dpLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-dataproc" % gcpDpVersion
)

lazy val gcsLibs = List(
"com.google.cloud" % "google-cloud-storage" % GcpGcsVersion
lazy val gcsLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-storage" % gcpGcsVersion
)

lazy val pubSubLibs = List(
"com.google.cloud" % "google-cloud-pubsub" % GcpPubSubVersion
lazy val pubSubLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-pubsub" % gcpPubSubVersion
)

lazy val monitoringLibs = List(
"com.google.cloud" % "google-cloud-monitoring" % CloudMonitorVersion
lazy val monitoringLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-monitoring" % cloudMonitorVersion
)

lazy val batchLibs = List(
"com.google.cloud" % "google-cloud-batch" % BatchVersion
lazy val batchLibs: List[ModuleID] = List(
"com.google.cloud" % "google-cloud-batch" % batchVersion
)

lazy val testLibs = List(
"dev.zio" %% "zio-test" % ZioVersion,
"dev.zio" %% "zio-test-sbt" % ZioVersion,
"ch.qos.logback" % "logback-classic" % LogbackVersion
lazy val testLibs: List[ModuleID] = List(
"dev.zio" %% "zio-test" % zioVersion,
"dev.zio" %% "zio-test-sbt" % zioVersion,
"ch.qos.logback" % "logback-classic" % logbackVersion
).map(_ % Test)

}
29 changes: 15 additions & 14 deletions project/Versions.scala
@@ -1,19 +1,20 @@
object Versions {
val Scala212 = "2.12.17"
val Scala213 = "2.13.10"
val Scala3 = "3.2.1"
val AllScalaVersions = List(Scala212, Scala213, Scala3)
val scala212 = "2.12.17"
val scala213 = "2.13.10"
val scala3 = "3.2.1"

val ScalaJavaCollectionCompat = "2.10.0"
val allScalaVersions: List[String] = List(scala212, scala213, scala3)

val ZioVersion = "2.0.13"
val ZioLogVersion = "2.1.12"
val scalaJavaCollectionCompat = "2.10.0"

val LogbackVersion = "1.4.7"
val GcpBqVersion = "2.25.0"
val GcpDpVersion = "4.13.0"
val GcpGcsVersion = "2.22.1"
val GcpPubSubVersion = "1.123.11"
val CloudMonitorVersion = "3.17.0"
val BatchVersion = "0.16.0"
val zioVersion = "2.0.20"
val zioLogVersion = "2.1.16"

val logbackVersion = "1.4.14"
val gcpBqVersion = "2.35.0"
val gcpDpVersion = "4.29.0"
val gcpGcsVersion = "2.30.1"
val gcpPubSubVersion = "1.125.13"
val cloudMonitorVersion = "3.33.0"
val batchVersion = "0.32.0"
}
