-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Updated BQ, PubSub, DP modules and added Batch Module (#15)
- Loading branch information
1 parent
231144b
commit 8642073
Showing
29 changed files
with
560 additions
and
173 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,9 @@ | ||
import org.slf4j.{Logger, LoggerFactory} | ||
import zio.logging.backend.SLF4J | ||
import zio.{Runtime, ULayer} | ||
|
||
trait ApplicationLogger { | ||
lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName) | ||
protected val logger: Logger = LoggerFactory.getLogger(getClass.getName) | ||
|
||
protected val zioSlf4jLogger: ULayer[Unit] = Runtime.removeDefaultLoggers >>> SLF4J.slf4j | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import gcp4zio.batch.Batch | ||
import zio._ | ||
import scala.jdk.CollectionConverters._ | ||
|
||
// export GOOGLE_APPLICATION_CREDENTIALS= | ||
// export GCP_PROJECT= | ||
// export GCP_REGION= | ||
|
||
object GCPCloudBatch extends ZIOAppDefault with ApplicationLogger { | ||
|
||
// https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/batch/snippets/src/main/java | ||
// https://cloud.google.com/batch/docs/create-run-basic-job | ||
|
||
override val bootstrap: ULayer[Unit] = zioSlf4jLogger | ||
|
||
val gcpProject: String = sys.env("GCP_PROJECT") | ||
val gcpRegion: String = sys.env("GCP_REGION") | ||
|
||
val layer: TaskLayer[Batch] = Batch.live(gcpProject, gcpRegion) | ||
|
||
override def run: Task[Unit] = (Batch | ||
.createJob( | ||
"example-container-job", | ||
"gcr.io/google-containers/busybox", | ||
List("-c", "echo Hello world! This is task ${BATCH_TASK_INDEX}. This job has a total of ${BATCH_TASK_COUNT} tasks."), | ||
Some("/bin/sh"), | ||
None | ||
) *> | ||
Batch.listJobs | ||
.map(_.foreach(job => logger.info(job.getName + " " + job.getStatus.getStatusEventsList.asScala.toList.mkString)))) | ||
.provide(layer) | ||
|
||
// override def run: Task[Unit] = Batch.deleteJob("example-container-job").provide(layer).unit | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package gcp4zio.batch | ||
|
||
import com.google.cloud.batch.v1.Job | ||
import zio.{RIO, Task, TaskLayer, ZIO, ZLayer} | ||
|
||
trait Batch { | ||
def createJob( | ||
name: String, | ||
image: String, | ||
commands: List[String], | ||
entrypoint: Option[String], | ||
serviceAccount: Option[String] | ||
): Task[Job] | ||
|
||
def deleteJob(name: String): Task[Unit] | ||
|
||
def listJobs: Task[Iterable[Job]] | ||
} | ||
|
||
object Batch { | ||
def createJob( | ||
name: String, | ||
image: String, | ||
commands: List[String], | ||
entrypoint: Option[String], | ||
serviceAccount: Option[String] | ||
): RIO[Batch, Job] = | ||
ZIO.serviceWithZIO(_.createJob(name, image, commands, entrypoint, serviceAccount)) | ||
|
||
def deleteJob(name: String): RIO[Batch, Unit] = ZIO.serviceWithZIO(_.deleteJob(name)) | ||
|
||
def listJobs: RIO[Batch, Iterable[Job]] = ZIO.serviceWithZIO(_.listJobs) | ||
|
||
def live(project: String, region: String): TaskLayer[Batch] = | ||
ZLayer.scoped(BatchClient().map(batch => BatchImpl(batch, project, region))) | ||
} |
15 changes: 15 additions & 0 deletions
15
modules/batch/src/main/scala/gcp4zio/batch/BatchClient.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package gcp4zio.batch | ||
|
||
import com.google.cloud.batch.v1.BatchServiceClient | ||
import zio.{RIO, Scope, ZIO} | ||
|
||
object BatchClient { | ||
|
||
/** Returns AutoCloseable BatchServiceClient object wrapped in ZIO | ||
* @return | ||
* RIO[Scope, BatchServiceClient] | ||
*/ | ||
def apply(): RIO[Scope, BatchServiceClient] = ZIO.fromAutoCloseable(ZIO.attempt { | ||
BatchServiceClient.create() | ||
}) | ||
} |
119 changes: 119 additions & 0 deletions
119
modules/batch/src/main/scala/gcp4zio/batch/BatchImpl.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package gcp4zio.batch | ||
|
||
import com.google.cloud.batch.v1.AllocationPolicy.{InstancePolicy, InstancePolicyOrTemplate, NetworkInterface, NetworkPolicy} | ||
import com.google.cloud.batch.v1.LogsPolicy.Destination | ||
import com.google.cloud.batch.v1.Runnable.Container | ||
import com.google.cloud.batch.v1._ | ||
import com.google.protobuf.Duration | ||
import zio.{Task, ZIO} | ||
import scala.jdk.CollectionConverters._ | ||
|
||
case class BatchImpl(client: BatchServiceClient, project: String, region: String) extends Batch { | ||
override def createJob( | ||
name: String, | ||
image: String, | ||
commands: List[String], | ||
entrypoint: Option[String], | ||
serviceAccount: Option[String] | ||
): Task[Job] = | ||
ZIO.attempt { | ||
// Define what will be done as part of the job. | ||
|
||
val container = entrypoint match { | ||
case Some(value) => | ||
Container.newBuilder().setImageUri(image).addAllCommands(commands.asJava).setEntrypoint(value).build() | ||
case None => Container.newBuilder().setImageUri(image).addAllCommands(commands.asJava).build() | ||
} | ||
|
||
val runnable = Runnable.newBuilder().setContainer(container).build() | ||
|
||
// We can specify what resources are requested by each task. | ||
// In milliseconds per cpu-second. This means the task requires 2 whole CPUs. | ||
// In MiB. | ||
val computeResource = ComputeResource.newBuilder().setCpuMilli(2000).setMemoryMib(16).build() | ||
|
||
// Jobs can be divided into tasks. In this case, we have only one task. | ||
val task = TaskSpec | ||
.newBuilder() | ||
.addRunnables(runnable) | ||
.setComputeResource(computeResource) | ||
.setMaxRetryCount(0) | ||
.setMaxRunDuration(Duration.newBuilder().setSeconds(3600).build()) | ||
.build() | ||
|
||
// Tasks are grouped inside a job using TaskGroups. | ||
// Currently, it's possible to have only one task group. | ||
val taskGroup = TaskGroup.newBuilder().setTaskCount(1).setTaskSpec(task).build() | ||
|
||
// Policies are used to define on what kind of virtual machines the tasks will run on. | ||
// In this case, we tell the system to use "e2-standard-4" machine type. | ||
// Read more about machine types here: https://cloud.google.com/compute/docs/machine-types | ||
val instancePolicy = InstancePolicy.newBuilder().setMachineType("e2-standard-4").build() | ||
|
||
val sa = serviceAccount.map(ServiceAccount.newBuilder().setEmail(_).build()) | ||
|
||
val ni = NetworkInterface | ||
.newBuilder() | ||
.setNetwork("global/networks/default") | ||
.setSubnetwork("regions/us-central1/subnetworks/default") | ||
.setNoExternalIpAddress(true) | ||
.build() | ||
|
||
val np = NetworkPolicy.newBuilder().addNetworkInterfaces(ni) | ||
|
||
val allocationPolicy = | ||
sa match { | ||
case Some(value) => | ||
AllocationPolicy | ||
.newBuilder() | ||
.addInstances(InstancePolicyOrTemplate.newBuilder().setPolicy(instancePolicy).build()) | ||
.setServiceAccount(value) | ||
.setNetwork(np) | ||
.build() | ||
case None => | ||
AllocationPolicy | ||
.newBuilder() | ||
.addInstances(InstancePolicyOrTemplate.newBuilder().setPolicy(instancePolicy).build()) | ||
.setNetwork(np) | ||
.build() | ||
} | ||
|
||
val job = Job | ||
.newBuilder() | ||
.addTaskGroups(taskGroup) | ||
.setAllocationPolicy(allocationPolicy) | ||
.putLabels("env", "testing") | ||
.putLabels("type", "container") | ||
// We use Cloud Logging as it's an out of the box available option. | ||
.setLogsPolicy(LogsPolicy.newBuilder().setDestination(Destination.CLOUD_LOGGING).build()) | ||
.build() | ||
|
||
val createJobRequest = CreateJobRequest | ||
.newBuilder() | ||
// The job's parent is the region in which the job will run. | ||
.setParent(String.format("projects/%s/locations/%s", project, region)) | ||
.setJob(job) | ||
.setJobId(name) | ||
.build() | ||
|
||
logger.info(s"Creating batch job") | ||
|
||
val jobResult = client.createJobCallable().call(createJobRequest) | ||
|
||
logger.info(s"Successfully created the job ${jobResult.getName}") | ||
|
||
jobResult | ||
} | ||
|
||
override def deleteJob(name: String): Task[Unit] = { | ||
val jobName = s"projects/$project/locations/$region/jobs/$name" | ||
logger.info(s"Deleting batch job $jobName") | ||
ZIO.fromFutureJava(client.deleteJobAsync(jobName)) *> ZIO.logInfo(s"Deleted batch job: $jobName") | ||
} | ||
|
||
override def listJobs: Task[Iterable[Job]] = ZIO.attempt { | ||
val parent = s"projects/$project/locations/$region" | ||
logger.info(s"Listing batch jobs under $parent") | ||
client.listJobs(parent).iterateAll().asScala | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package gcp4zio | ||
|
||
import org.slf4j.{Logger, LoggerFactory} | ||
|
||
package object batch { | ||
private[batch] lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName) | ||
} |
Oops, something went wrong.