-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathKinesisEcho.scala
167 lines (142 loc) · 6.17 KB
/
KinesisEcho.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package alpakka.kinesis
import org.apache.commons.validator.routines.UrlValidator
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.awsspi.PekkoHttpClient
import org.apache.pekko.stream.connectors.kinesis.scaladsl.{KinesisFlow, KinesisSource}
import org.apache.pekko.stream.connectors.kinesis.{KinesisFlowSettings, ShardIterator, ShardSettings}
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.util.ByteString
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsResultEntry, Record}
import java.net.URI
import java.nio.charset.StandardCharsets
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
/**
* Echo flow via a Kinesis "provisioned data stream" with 1 shard:
* - upload n binary elements
* - download n binary elements
*
* Run this class against your AWS account using hardcoded accessKey/secretKey
* Prerequisite: Create a "provisioned data stream" with 1 shard on AWS console
* or
* Run via [[alpakka.kinesis.KinesisEchoIT]] against localStack docker container
*
* Remarks:
* - No computation on the server side, just echo routing
* - Default data retention time is 24h, hence with the setting `TrimHorizon` we may receive old records...
* - A Kinesis data stream with 1 shard should preserve order, however duplicates may occur due to retries
* - Be warned that running against AWS (and thus setting up resources) can cost you money
*
* Doc:
* https://pekko.apache.org/docs/pekko-connectors/current/kinesis.html
*/
class KinesisEcho(urlWithMappedPort: URI = new URI(""), accessKey: String = "", secretKey: String = "", region: String = "") {
  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  implicit val system: ActorSystem = ActorSystem("KinesisEcho")
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // Name of the data stream and of the single shard that is read back
  val kinesisDataStreamName = "kinesisDataStreamProvisioned"
  val shardIdName = "shardId-000000000000"

  // Number of records uploaded by the producer and taken by the consumer
  val batchSize = 10

  /**
    * Kinesis client shared by producer and consumer.
    *
    * If `urlWithMappedPort` is a valid URL the client is pointed at that endpoint
    * (localStack) using the constructor-supplied credentials and region;
    * otherwise a client for AWS in region EU_WEST_1 is created.
    */
  implicit val awsKinesisClient: KinesisAsyncClient = {
    // ALLOW_LOCAL_URLS: the default validator rejects hosts such as `localhost`,
    // which would silently route a local endpoint to the AWS branch below
    val endpointValidator = new UrlValidator(UrlValidator.ALLOW_LOCAL_URLS)

    if (endpointValidator.isValid(urlWithMappedPort.toString)) {
      logger.info("Running against localStack on local container...")

      val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))

      KinesisAsyncClient
        .builder()
        .endpointOverride(urlWithMappedPort)
        .credentialsProvider(credentialsProvider)
        .region(Region.of(region))
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possible to configure retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build()
    } else {
      logger.info("Running against AWS...")

      // For now use hardcoded credentials
      // NOTE: "***" are placeholders and need to be replaced before running against AWS
      val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("***", "***"))

      KinesisAsyncClient
        .builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.EU_WEST_1)
        .httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
        // Possible to configure retry policy
        // see https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build()
    }
  }

  // Release the client together with the actor system
  system.registerOnTermination(awsKinesisClient.close())

  /**
    * Uploads `batchSize` records, then downloads `batchSize` records and
    * blocks (max. 60 seconds) until both steps are finished.
    *
    * The actor system is terminated in every case: on success, on failure
    * of the flow and on timeout.
    *
    * @return the number of downloaded records
    * @throws java.util.concurrent.TimeoutException if the flow does not complete within 60 seconds
    */
  def run(): Int = {
    val done = for {
      _ <- producerClient()
      consumed <- consumerClient()
    } yield consumed

    // Register the shutdown hook BEFORE blocking: Await.result rethrows a flow failure,
    // so a hook registered afterwards would never be installed and the actor system would leak
    terminateWhen(done)

    try {
      val result: Seq[String] = Await.result(done, 60.seconds)
      logger.info(s"Successfully downloaded: ${result.size} records")
      result.size
    } catch {
      case e: java.util.concurrent.TimeoutException =>
        // On timeout `done` is not completed, hence the hook above does not fire
        logger.error("Flow did not complete within 60 seconds. About to terminate...", e)
        system.terminate()
        throw e
    }
  }

  /**
    * Sends the elements `1 to batchSize` as records through the Kinesis flow.
    *
    * @return the result entries reported for the uploaded records
    */
  private def producerClient(): Future[Seq[PutRecordsResultEntry]] = {
    logger.info("About to start upload...")

    val defaultFlowSettings = KinesisFlowSettings.Defaults
    val kinesisFlow: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] =
      KinesisFlow(kinesisDataStreamName, defaultFlowSettings)

    val done: Future[Seq[PutRecordsResultEntry]] = Source(1 to batchSize)
      .map(each => convertToRecord(each))
      .via(kinesisFlow)
      .runWith(Sink.seq)

    // Match on the Try instead of calling `.get`, which would throw inside the callback on failure
    done.onComplete {
      case Success(entries) => logger.info(s"Successfully uploaded: ${entries.size} records")
      case Failure(e) => logger.error("Upload failed", e)
    }
    done
  }

  /**
    * Reads from the shard `shardIdName` and completes after `batchSize` records.
    *
    * @return the downloaded records, rendered via `convertToString`
    */
  private def consumerClient(): Future[Seq[String]] = {
    logger.info("About to start download...")

    val settings =
      ShardSettings(streamName = kinesisDataStreamName, shardId = shardIdName)
        .withRefreshInterval(1.second)
        .withLimit(500)
        // TrimHorizon: same as "earliest" in Kafka
        .withShardIterator(ShardIterator.TrimHorizon)

    val source: Source[Record, NotUsed] = KinesisSource.basic(settings, awsKinesisClient)

    source
      .map(each => convertToString(each))
      .wireTap(each => logger.debug(s"Downloaded: $each"))
      .take(batchSize)
      .runWith(Sink.seq)
  }

  /** Builds the request entry for element `each`, with payload `data-$each` and partition key `partition-key-$each`. */
  private def convertToRecord(each: Int): PutRecordsRequestEntry = {
    PutRecordsRequestEntry
      .builder()
      .partitionKey(s"partition-key-$each")
      .data(SdkBytes.fromByteBuffer(ByteString(s"data-$each").asByteBuffer))
      .build()
  }

  /** Renders the approximate arrival timestamp and the UTF-8 decoded payload of a record. */
  private def convertToString(record: Record): String = {
    val processingTimestamp = record.approximateArrivalTimestamp
    val data = record.data.asString(StandardCharsets.UTF_8)
    s"Processing time: $processingTimestamp. Data:$data"
  }

  /** Terminates the actor system as soon as `done` is completed, successfully or not. */
  private def terminateWhen(done: Future[Seq[String]]): Unit = {
    done.onComplete {
      case Success(_) =>
        logger.info("Flow Success. About to terminate...")
        system.terminate()
      case Failure(e) =>
        // Log at error level and pass the throwable to keep the stack trace
        logger.error(s"Flow Failure: $e. About to terminate...", e)
        system.terminate()
    }
  }
}
/**
  * Entry point: runs the echo flow with the default constructor arguments of [[KinesisEcho]].
  *
  * Uses an explicit `main` method instead of the `App` trait, to avoid the
  * initialization-order pitfalls of the `App` trait.
  */
object KinesisEcho {
  def main(args: Array[String]): Unit = {
    val echo = new KinesisEcho()
    echo.run()
    // TODO Add stream removal
  }
}