Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: native Scala 3 modules for google pub sub and cloud storage #3350

Merged
merged 6 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/check-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
# v4.1.1
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
with: # https://github.com/olafurpg/setup-scala#faster-checkout-of-big-repos
fetch-depth: 100
fetch-tags: true
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0

- name: Cache Coursier cache
# https://github.com/coursier/cache-action/releases
Expand Down
24 changes: 20 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,11 @@ lazy val googleCommon = alpakkaProject(
"google.common",
Dependencies.GoogleCommon,
Test / fork := true,
headerSources / excludeFilter := HiddenFileFilter || "JwtSprayJsonParser.scala"
headerSources / excludeFilter := HiddenFileFilter || "JwtSprayJsonParser.scala",
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
)

lazy val googleCloudBigQuery = alpakkaProject(
Expand Down Expand Up @@ -246,7 +250,11 @@ lazy val googleCloudPubSub = alpakkaProject(
Dependencies.GooglePubSub,
Test / fork := true,
// See docker-compose.yml gcloud-pubsub-emulator_prep
Test / envVars := Map("PUBSUB_EMULATOR_HOST" -> "localhost", "PUBSUB_EMULATOR_PORT" -> "8538")
Test / envVars := Map("PUBSUB_EMULATOR_HOST" -> "localhost", "PUBSUB_EMULATOR_PORT" -> "8538"),
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).dependsOn(googleCommon)

lazy val googleCloudPubSubGrpc = alpakkaProject(
Expand All @@ -262,14 +270,22 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation"),
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).enablePlugins(AkkaGrpcPlugin).dependsOn(googleCommon)

lazy val googleCloudStorage = alpakkaProject(
"google-cloud-storage",
"google.cloud.storage",
Test / fork := true,
Dependencies.GoogleStorage
Dependencies.GoogleStorage,
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).dependsOn(googleCommon)

lazy val googleFcm = alpakkaProject("google-fcm", "google.firebase.fcm", Dependencies.GoogleFcm, Test / fork := true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PullResponse]
Expand Down Expand Up @@ -208,7 +208,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) =>
response.discardEntityBytes().future
Expand Down Expand Up @@ -261,7 +261,7 @@ private[pubsub] trait PubSubApi {
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PublishResponse]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ object PubSubConfig {
pullReturnImmediately = true,
pullMaxMessagesPerInternalBatch = 1000,
Some(
GoogleSettings().copy(
projectId = projectId,
credentials =
ServiceAccountCredentials(projectId, clientEmail, privateKey, Seq("https://www.googleapis.com/auth/pubsub"))
)
GoogleSettings
.apply()
.copy(
projectId = projectId,
credentials = ServiceAccountCredentials(projectId,
clientEmail,
privateKey,
Seq("https://www.googleapis.com/auth/pubsub"))
)
)
)

Expand All @@ -85,11 +89,15 @@ object PubSubConfig {
pullReturnImmediately = pullReturnImmediately,
pullMaxMessagesPerInternalBatch = pullMaxMessagesPerInternalBatch,
Some(
GoogleSettings().copy(
projectId = projectId,
credentials =
ServiceAccountCredentials(projectId, clientEmail, privateKey, Seq("https://www.googleapis.com/auth/pubsub"))
)
GoogleSettings
.apply()
.copy(
projectId = projectId,
credentials = ServiceAccountCredentials(projectId,
clientEmail,
privateKey,
Seq("https://www.googleapis.com/auth/pubsub"))
)
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import akka.stream.alpakka.googlecloud.storage._
import spray.json.{DefaultJsonProtocol, JsObject, JsValue, RootJsonFormat, RootJsonReader}

import scala.util.Try
import spray.json.enrichAny

@akka.annotation.InternalApi
object Formats extends DefaultJsonProtocol {

private final case class CustomerEncryption(encryptionAlgorithm: String, keySha256: String)
private implicit val customerEncryptionJsonFormat: RootJsonFormat[CustomerEncryption] = jsonFormat2(
CustomerEncryption
CustomerEncryption.apply
)

private final case class Owner(entity: String, entityId: Option[String])
private implicit val OwnerJsonFormat: RootJsonFormat[Owner] = jsonFormat2(Owner)
private implicit val OwnerJsonFormat: RootJsonFormat[Owner] = jsonFormat2(Owner.apply)

private final case class ProjectTeam(projectNumber: String, team: String)
private implicit val ProjectTeamJsonFormat: RootJsonFormat[ProjectTeam] = jsonFormat2(ProjectTeam)
private implicit val ProjectTeamJsonFormat: RootJsonFormat[ProjectTeam] = jsonFormat2(ProjectTeam.apply)

private final case class ObjectAccessControls(kind: String,
id: String,
Expand All @@ -40,7 +41,7 @@ object Formats extends DefaultJsonProtocol {
projectTeam: ProjectTeam,
etag: String)
private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] = jsonFormat13(
ObjectAccessControls
ObjectAccessControls.apply
)

/**
Expand Down Expand Up @@ -73,7 +74,7 @@ object Formats extends DefaultJsonProtocol {
)

private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] = jsonFormat18(
StorageObjectReadOnlyJson
StorageObjectReadOnlyJson.apply
)

// private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue
Expand All @@ -95,7 +96,7 @@ object Formats extends DefaultJsonProtocol {
)

private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] = jsonFormat14(
StorageObjectWriteableJson
StorageObjectWriteableJson.apply
)

private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] {
Expand Down Expand Up @@ -130,7 +131,7 @@ object Formats extends DefaultJsonProtocol {
items: Option[List[StorageObjectJson]]
)

private implicit val bucketInfoJsonFormat: RootJsonFormat[BucketInfoJson] = jsonFormat6(BucketInfoJson)
private implicit val bucketInfoJsonFormat: RootJsonFormat[BucketInfoJson] = jsonFormat6(BucketInfoJson.apply)

/**
* Google API rewrite response object
Expand All @@ -146,7 +147,9 @@ object Formats extends DefaultJsonProtocol {
resource: Option[StorageObjectJson]
)

private implicit val rewriteResponseFormat: RootJsonFormat[RewriteResponseJson] = jsonFormat6(RewriteResponseJson)
private implicit val rewriteResponseFormat: RootJsonFormat[RewriteResponseJson] = jsonFormat6(
RewriteResponseJson.apply
)

/**
* Google API bucket response object
Expand All @@ -162,7 +165,7 @@ object Formats extends DefaultJsonProtocol {
etag: String
)

implicit val bucketInfoFormat: RootJsonFormat[BucketInfo] = jsonFormat2(BucketInfo)
implicit val bucketInfoFormat: RootJsonFormat[BucketInfo] = jsonFormat2(BucketInfo.apply)

implicit object BucketListResultReads extends RootJsonReader[BucketListResult] {
override def read(json: JsValue): BucketListResult = {
Expand All @@ -177,7 +180,7 @@ object Formats extends DefaultJsonProtocol {
}

private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] = jsonFormat4(
BucketListResultJson
BucketListResultJson.apply
)

implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ import scala.concurrent.Future
metadata: Option[Map[String, String]] = None): Sink[ByteString, Future[StorageObject]] =
Sink
.fromMaterializer { (mat, attr) =>
implicit val settings = {
implicit val settings: GoogleSettings = {
val s = resolveSettings(mat, attr)
s.copy(requestSettings = s.requestSettings.copy(uploadChunkSize = chunkSize))
}
Expand All @@ -158,7 +158,7 @@ import scala.concurrent.Future
}: PartialFunction[HttpResponse, Future[StorageObject]]
}.withDefaultRetry

ResumableUpload[StorageObject](request).addAttributes(GoogleAttributes.settings(settings))
ResumableUpload[StorageObject](request)(um).addAttributes(GoogleAttributes.settings(settings))
}
.mapMaterializedValue(_.flatten)

Expand Down Expand Up @@ -236,7 +236,7 @@ import scala.concurrent.Future
getBucketPath(bucket) / "o" / objectName

implicit def unmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, T] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T]
Expand All @@ -248,7 +248,7 @@ import scala.concurrent.Future
}.withDefaultRetry

implicit def optionUnmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, Option[T]] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T].map(Some(_))
Expand Down Expand Up @@ -324,7 +324,7 @@ import scala.concurrent.Future
)
}

GoogleSettings(
GoogleSettings.apply(
legacySettings.projectId,
credentials,
settings.requestSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GCSExtSpec extends AnyFlatSpec with Matchers with LogCapturing {
).asJava
)

implicit val system = ActorSystem.create("gcs", config)
implicit val system: ActorSystem = ActorSystem.create("gcs", config)
val ext = GCSExt(system)

ext.settings.endpointUrl shouldBe endpointUrl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class GCStorageExtSpec extends AnyFlatSpec with Matchers with LogCapturing {
"alpakka.google.cloud.storage.token-scope" -> tokenScope
).asJava
)
implicit val system = ActorSystem.create("gcStorage", config)
implicit val system: ActorSystem = ActorSystem.create("gcStorage", config)
@nowarn("msg=deprecated")
val ext = GCStorageExt(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ object GoogleSettings {

}

final case class GoogleSettings @InternalApi private (projectId: String,
credentials: Credentials,
requestSettings: RequestSettings) {
final case class GoogleSettings @InternalApi private[akka] (projectId: String,
credentials: Credentials,
requestSettings: RequestSettings) {
def getProjectId = projectId
def getCredentials = credentials
def getRequestSettings = requestSettings
Expand Down Expand Up @@ -125,7 +125,7 @@ object RequestSettings {
apply(userIp.toScala, quotaUser.toScala, prettyPrint, chunkSize, retrySettings, forwardProxy.toScala)
}

final case class RequestSettings @InternalApi private (
final case class RequestSettings @InternalApi private[akka] (
userIp: Option[String],
quotaUser: Option[String],
prettyPrint: Boolean,
Expand Down Expand Up @@ -247,7 +247,7 @@ object ForwardProxy {
credentials: Option[BasicHttpCredentials],
trustPem: Option[String])(implicit system: ClassicActorSystemProvider): ForwardProxy = {
ForwardProxy(
trustPem.fold(Http(system).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
trustPem.fold(Http(system.classicSystem).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
ForwardProxyPoolSettings(scheme, host, port, credentials)(system.classicSystem)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private[alpakka] object ResumableUpload {
import implicits._

implicit val um: FromResponseUnmarshaller[Uri] = Unmarshaller.withMaterializer {
implicit ec => implicit mat => response: HttpResponse =>
implicit ec => implicit mat => (response: HttpResponse) =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
Expand All @@ -107,7 +107,7 @@ private[alpakka] object ResumableUpload {
)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = {
implicit val system: ActorSystem = mat.system

val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case PermanentRedirect =>
response.discardEntityBytes().future.map(_ => None)
Expand Down Expand Up @@ -145,7 +145,7 @@ private[alpakka] object ResumableUpload {
import implicits._

implicit val um: FromResponseUnmarshaller[Either[T, Long]] = Unmarshaller.withMaterializer {
implicit ec => implicit mat => response: HttpResponse =>
implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Credentials {
*/
def apply(c: Config)(implicit system: ClassicActorSystemProvider): Credentials = c.getString("provider") match {
case "application-default" =>
val log = Logging(system.classicSystem, getClass)
val log = Logging(system.classicSystem, classOf[Credentials])
try {
val creds = parseServiceAccount(c)
log.info("Using service account credentials")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ private[auth] object GoogleOAuth2 {
}

final case class JwtClaimContent(scope: String)
implicit val jwtClaimContentFormat: JsonFormat[JwtClaimContent] = jsonFormat1(JwtClaimContent)
implicit val jwtClaimContentFormat: JsonFormat[JwtClaimContent] = jsonFormat1(JwtClaimContent.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import akka.stream.alpakka.google.util.Retry
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

final case class GoogleOAuth2Exception private (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info)
final case class GoogleOAuth2Exception private[akka] (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info)

private[google] object GoogleOAuth2Exception {

private val internalFailure = "internal_failure"
private final case class OAuth2ErrorResponse(error: Option[String], error_description: Option[String])
private implicit val oAuth2ErrorResponseFormat: RootJsonFormat[OAuth2ErrorResponse] = jsonFormat2(OAuth2ErrorResponse)
private implicit val oAuth2ErrorResponseFormat: RootJsonFormat[OAuth2ErrorResponse] = jsonFormat2(
OAuth2ErrorResponse.apply
)

implicit val unmarshaller: FromResponseUnmarshaller[Throwable] =
Unmarshaller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object `X-Upload-Content-Type` extends ModeledCustomHeaderCompanion[`X-Upload-Co
)
}

final case class `X-Upload-Content-Type` private (contentType: ContentType)
final case class `X-Upload-Content-Type` private[akka] (contentType: ContentType)
extends ModeledCustomHeader[`X-Upload-Content-Type`]
with XUploadContentType {
override def value(): String = contentType.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class OAuth2CredentialsSpec
implicit val settings: RequestSettings = GoogleSettings().requestSettings
implicit val clock: Clock = Clock.systemUTC()

final object AccessTokenProvider {
object AccessTokenProvider {
@volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException)
}

Expand Down
Loading