Skip to content

Commit

Permalink
drop alpakka references
Browse files Browse the repository at this point in the history
  • Loading branch information
majk-p committed May 29, 2023
1 parent d3b01cc commit 9b8e3c1
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ lazy val activemq = module("activemq", directory = "connectors")
resolvers ++= Resolver.sonatypeOssRepos("snapshots"),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+99-44451f91-SNAPSHOT",
// "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "4.0.0", // 5.x.x contains akka-streams +2.7.x which is licensed under BUSL 1.1
"org.apache.activemq" % "activemq-pool" % Versions.ActiveMq,
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.stream.connectors.{jms => alpakka}
import org.apache.pekko.stream.connectors.{jms => pekkojms}

private[activemq] object common {

def toAlpakkaDestination: (String, Jms.Type) => alpakka.Destination = {
case (name, Jms.Type.Topic) => alpakka.Topic(name)
case (name, Jms.Type.Queue) => alpakka.Queue(name)
def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = {
case (name, Jms.Type.Topic) => pekkojms.Topic(name)
case (name, Jms.Type.Queue) => pekkojms.Queue(name)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer
import org.apache.pekko.stream.connectors.{jms => alpakka}
import org.apache.pekko.stream.connectors.{jms => pekkojms}
import org.apache.pekko.stream.scaladsl.RestartSource
import cats.ApplicativeThrow
import cats.effect.Async
Expand Down Expand Up @@ -48,12 +48,12 @@ private[activemq] object consumer {
for {
JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source))

jmsConsumerSettings = alpakka
jmsConsumerSettings = pekkojms
.JmsConsumerSettings(as, connectionFactory)
.withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2)
.withSessionCount(settings.parallelSessions)
.withFailStreamOnAckTimeout(true)
.withDestination(common.toAlpakkaDestination(name, sourceType))
.withDestination(common.toPekkoDestination(name, sourceType))

txEnvelope <- RestartSource
.withBackoff(settings.restartSettings.toAkka) { () =>
Expand All @@ -72,7 +72,7 @@ private[activemq] object consumer {
)
}

private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: alpakka.TxEnvelope): F[Option[CommittableMessage[F]]] = {
private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = {
val commit = Sync[F].delay(txEnvelope.commit())
val rollback = Sync[F].delay(txEnvelope.rollback())
txEnvelope.message match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.ocadotechnology.pass4s.connectors.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer
import org.apache.pekko.stream.connectors.{jms => alpakka}
import org.apache.pekko.stream.connectors.{jms => pekkojms}
import cats.ApplicativeThrow
import cats.effect.Concurrent
import cats.effect.Resource
Expand All @@ -43,7 +43,7 @@ private[activemq] object producer {

private type Attempt = Either[Throwable, Unit]
private type Promise[F[_]] = Deferred[F, Attempt]
private type JmsPayload[F[_]] = alpakka.JmsEnvelope[Promise[F]]
private type JmsPayload[F[_]] = pekkojms.JmsEnvelope[Promise[F]]

def createMessageProducer[F[_]: Async](
connectionFactory: jms.ConnectionFactory,
Expand All @@ -65,9 +65,9 @@ private[activemq] object producer {
for {
jmsDestination <- extractJmsDestination[F](message.destination)
promise <- Deferred[F, Attempt]
alpakkaMessage = alpakka.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata)
alpakkaDestination = common.toAlpakkaDestination(jmsDestination.name, jmsDestination.destinationType)
_ <- enqueue(alpakkaMessage.to(alpakkaDestination))
pekkoMessage = pekkojms.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata)
pekkoDestination = common.toPekkoDestination(jmsDestination.name, jmsDestination.destinationType)
_ <- enqueue(pekkoMessage.to(pekkoDestination))
_ <- promise.get.rethrow
} yield ()

Expand All @@ -87,7 +87,7 @@ private[activemq] object producer {
* that the promise completion and Ref update are atomic
*/
Stream.eval((Ref.of[F, Set[JmsPayload[F]]](Set.empty), Semaphore[F](n = 1)).tupled).flatMap { case (inflightMessages, semaphore) =>
val jmsProducerSettings = alpakka
val jmsProducerSettings = pekkojms
.JmsProducerSettings(as, connectionFactory)
.withTopic("Pass4s.Default") // default destination is obligatory, but always overridden

Expand Down

0 comments on commit 9b8e3c1

Please sign in to comment.