Skip to content

Commit

Permalink
GH-244: Add UserRecordResponse adapter for UserRecordResult
Browse files Browse the repository at this point in the history
Fixes: #244

The `UserRecordResult` has an `attempts()` property which might be used post-put request logic
in the output channel.

* Introduce a `KinesisResponse` extension to adapt a `UserRecordResult`.
This way the further flow logic may consult low-level result from KPL via existing `AwsHeaders.SERVICE_RESULT`
header in the reply message
  • Loading branch information
artembilan committed Jul 16, 2024
1 parent 6f5b58f commit f64f77c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,6 @@
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
Expand All @@ -50,6 +49,7 @@
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.aws.support.UserRecordResponse;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.mapping.HeaderMapper;
Expand Down Expand Up @@ -305,7 +305,7 @@ else if (message.getPayload() instanceof UserRecord userRecord) {

@Override
protected Map<String, ?> additionalOnSuccessHeaders(AwsRequest request, AwsResponse response) {
if (response instanceof PutRecordResponse putRecordResponse) {
if (response instanceof UserRecordResponse putRecordResponse) {
return Map.of(AwsHeaders.SHARD, putRecordResponse.shardId(),
AwsHeaders.SEQUENCE_NUMBER, putRecordResponse.sequenceNumber());
}
Expand Down Expand Up @@ -367,14 +367,10 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
}
}

private CompletableFuture<PutRecordResponse> handleUserRecord(UserRecord userRecord) {
private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
return listenableFutureToCompletableFuture(recordResult)
.thenApply(result ->
PutRecordResponse.builder()
.shardId(result.getShardId())
.sequenceNumber(result.getSequenceNumber())
.build());
.thenApply(UserRecordResponse::new);
}

private PutRecordRequest buildPutRecordRequest(Message<?> message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.aws.support;

import java.util.List;

import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.SdkField;
import software.amazon.awssdk.services.kinesis.model.KinesisResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

/**
* The {@link KinesisResponse} adapter for the KPL {@link UserRecordResult} response.
*
* @author Artem Bilan
*
* @since 3.0.8
*/
public class UserRecordResponse extends KinesisResponse {

private final String shardId;

private final String sequenceNumber;

private final List<Attempt> attempts;

public UserRecordResponse(UserRecordResult userRecordResult) {
super(PutRecordResponse.builder());
this.shardId = userRecordResult.getShardId();
this.sequenceNumber = userRecordResult.getSequenceNumber();
this.attempts = userRecordResult.getAttempts();
}

public String shardId() {
return this.shardId;
}

public String sequenceNumber() {
return this.sequenceNumber;
}

public List<Attempt> attempts() {
return this.attempts;
}

@Override
public AwsResponse.Builder toBuilder() {
throw new UnsupportedOperationException();
}

@Override
public List<SdkField<?>> sdkFields() {
throw new UnsupportedOperationException();
}

}

0 comments on commit f64f77c

Please sign in to comment.