-
Notifications
You must be signed in to change notification settings - Fork 160
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement CloudEventMessageConverter for spring-amqp
- Loading branch information
Showing
7 changed files
with
378 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
spring/src/main/java/io/cloudevents/spring/amqp/CloudEventMessageConverter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Copyright 2020-Present The CloudEvents Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.cloudevents.spring.amqp; | ||
|
||
import io.cloudevents.CloudEvent; | ||
import io.cloudevents.CloudEventContext; | ||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.core.CloudEventUtils; | ||
import io.cloudevents.core.format.EventFormat; | ||
import io.cloudevents.core.message.MessageReader; | ||
import io.cloudevents.core.message.impl.GenericStructuredMessageReader; | ||
import io.cloudevents.core.message.impl.MessageUtils; | ||
import org.springframework.amqp.core.Message; | ||
import org.springframework.amqp.core.MessageProperties; | ||
import org.springframework.amqp.support.converter.MessageConverter; | ||
|
||
/** | ||
* A {@link MessageConverter} that can translate to and from a {@link Message} and a {@link CloudEvent}. | ||
* The {@link CloudEventContext} is canonicalized, with key names given a {@code ce-} prefix in the | ||
* {@link MessageProperties}. | ||
* | ||
* @author Lars Michele | ||
* @see io.cloudevents.spring.messaging.CloudEventMessageConverter used as stencil for the implementation | ||
*/ | ||
public class CloudEventMessageConverter implements MessageConverter { | ||
|
||
@Override | ||
public CloudEvent fromMessage(Message message) { | ||
return createMessageReader(message).toEvent(); | ||
} | ||
|
||
@Override | ||
public Message toMessage(Object object, MessageProperties messageProperties) { | ||
if (object instanceof CloudEvent) { | ||
CloudEvent event = (CloudEvent) object; | ||
return CloudEventUtils.toReader(event).read(new MessageBuilderMessageWriter(messageProperties)); | ||
} | ||
return null; | ||
} | ||
|
||
private MessageReader createMessageReader(Message message) { | ||
return MessageUtils.parseStructuredOrBinaryMessage( | ||
() -> contentType(message.getMessageProperties()), | ||
format -> structuredMessageReader(message, format), | ||
() -> version(message.getMessageProperties()), | ||
version -> binaryMessageReader(message, version) | ||
); | ||
} | ||
|
||
private String version(MessageProperties properties) { | ||
Object header = properties.getHeader(CloudEventsHeaders.SPEC_VERSION); | ||
return header == null ? null : header.toString(); | ||
} | ||
|
||
private MessageReader binaryMessageReader(Message message, SpecVersion version) { | ||
return new MessageBinaryMessageReader(version, message.getMessageProperties(), message.getBody()); | ||
} | ||
|
||
private MessageReader structuredMessageReader(Message message, EventFormat format) { | ||
return new GenericStructuredMessageReader(format, message.getBody()); | ||
} | ||
|
||
private String contentType(MessageProperties properties) { | ||
String contentType = properties.getContentType(); | ||
if (contentType == null) { | ||
Object header = properties.getHeader(CloudEventsHeaders.CONTENT_TYPE); | ||
return header == null ? null : header.toString(); | ||
} | ||
return contentType; | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
spring/src/main/java/io/cloudevents/spring/amqp/CloudEventsHeaders.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright 2020-Present The CloudEvents Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.cloudevents.spring.amqp; | ||
|
||
public class CloudEventsHeaders { | ||
|
||
/** | ||
* CloudEvent attributes MUST be prefixed with either "cloudEvents_" or "cloudEvents:" for use in the application-properties section. | ||
* @see <a href="https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md#3131-amqp-application-property-names"> | ||
* AMQP Protocol Binding for CloudEvents</a> | ||
* */ | ||
public static final String CE_PREFIX = "cloudEvents_"; | ||
|
||
public static final String SPEC_VERSION = CE_PREFIX + "specversion"; | ||
|
||
public static final String CONTENT_TYPE = CE_PREFIX + "datacontenttype"; | ||
} |
69 changes: 69 additions & 0 deletions
69
spring/src/main/java/io/cloudevents/spring/amqp/MessageBinaryMessageReader.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
/* | ||
* Copyright 2020-Present The CloudEvents Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.cloudevents.spring.amqp; | ||
|
||
import static io.cloudevents.spring.amqp.CloudEventsHeaders.*; | ||
import static org.springframework.amqp.support.AmqpHeaders.CONTENT_TYPE; | ||
|
||
import java.util.Map; | ||
import java.util.function.BiConsumer; | ||
|
||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.core.data.BytesCloudEventData; | ||
import io.cloudevents.core.impl.StringUtils; | ||
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl; | ||
import org.springframework.amqp.core.MessageProperties; | ||
|
||
/** | ||
* Utility for converting {@link MessageProperties} (message headers) to `CloudEvent` contexts. | ||
* | ||
* @author Lars Michele | ||
* @see io.cloudevents.spring.messaging.MessageBinaryMessageReader used as stencil for the implementation | ||
*/ | ||
class MessageBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> { | ||
|
||
private final Map<String, Object> headers; | ||
|
||
public MessageBinaryMessageReader(SpecVersion version, MessageProperties properties, byte[] payload) { | ||
super(version, payload == null ? null : BytesCloudEventData.wrap(payload)); | ||
this.headers = properties.getHeaders(); | ||
} | ||
|
||
@Override | ||
protected boolean isContentTypeHeader(String key) { | ||
return CONTENT_TYPE.equalsIgnoreCase(key); | ||
} | ||
|
||
@Override | ||
protected boolean isCloudEventsHeader(String key) { | ||
return key != null && key.length() > CE_PREFIX.length() && StringUtils.startsWithIgnoreCase(key, CE_PREFIX); | ||
} | ||
|
||
@Override | ||
protected String toCloudEventsKey(String key) { | ||
return key.substring(CE_PREFIX.length()).toLowerCase(); | ||
} | ||
|
||
@Override | ||
protected void forEachHeader(BiConsumer<String, Object> fn) { | ||
headers.forEach(fn); | ||
} | ||
|
||
@Override | ||
protected String toCloudEventsValue(Object value) { | ||
return value.toString(); | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
spring/src/main/java/io/cloudevents/spring/amqp/MessageBuilderMessageWriter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright 2020-Present The CloudEvents Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.cloudevents.spring.amqp; | ||
|
||
import static io.cloudevents.spring.amqp.CloudEventsHeaders.*; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
import io.cloudevents.CloudEventData; | ||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.core.format.EventFormat; | ||
import io.cloudevents.core.message.MessageWriter; | ||
import io.cloudevents.rw.CloudEventContextWriter; | ||
import io.cloudevents.rw.CloudEventRWException; | ||
import io.cloudevents.rw.CloudEventWriter; | ||
import org.springframework.amqp.core.Message; | ||
import org.springframework.amqp.core.MessageBuilder; | ||
import org.springframework.amqp.core.MessageProperties; | ||
|
||
/** | ||
* Internal utility class for copying <code>CloudEvent</code> context to {@link MessageProperties} (message | ||
* headers). | ||
* | ||
* @author Lars Michele | ||
* @see io.cloudevents.spring.messaging.MessageBuilderMessageWriter used as stencil for the implementation | ||
*/ | ||
class MessageBuilderMessageWriter implements CloudEventWriter<Message>, MessageWriter<MessageBuilderMessageWriter, Message> { | ||
|
||
private final Map<String, Object> headers = new HashMap<>(); | ||
|
||
public MessageBuilderMessageWriter(MessageProperties properties) { | ||
this.headers.putAll(properties.getHeaders()); | ||
} | ||
|
||
@Override | ||
public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException { | ||
headers.put(CONTENT_TYPE, format.serializedContentType()); | ||
return MessageBuilder.withBody(value).copyHeaders(headers).build(); | ||
} | ||
|
||
@Override | ||
public Message end(CloudEventData value) throws CloudEventRWException { | ||
return MessageBuilder.withBody(value == null ? new byte[0] : value.toBytes()).copyHeaders(headers).build(); | ||
} | ||
|
||
@Override | ||
public Message end() { | ||
return MessageBuilder.withBody(new byte[0]).copyHeaders(headers).build(); | ||
} | ||
|
||
@Override | ||
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException { | ||
headers.put(CE_PREFIX + name, value); | ||
return this; | ||
} | ||
|
||
@Override | ||
public MessageBuilderMessageWriter create(SpecVersion version) { | ||
headers.put(SPEC_VERSION, version.toString()); | ||
return this; | ||
} | ||
} |
4 changes: 4 additions & 0 deletions
4
spring/src/main/java/io/cloudevents/spring/amqp/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
/** | ||
* Provides classes related to working with Cloud Events within the context of Spring Amqp. | ||
*/ | ||
package io.cloudevents.spring.amqp; |
109 changes: 109 additions & 0 deletions
109
spring/src/test/java/io/cloudevents/spring/amqp/CloudEventMessageConverterTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Copyright 2019-2019 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.cloudevents.spring.amqp; | ||
|
||
import static org.assertj.core.api.Assertions.*; | ||
|
||
import java.net.URI; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Map; | ||
|
||
import io.cloudevents.CloudEvent; | ||
import io.cloudevents.SpecVersion; | ||
import io.cloudevents.core.builder.CloudEventBuilder; | ||
import io.cloudevents.rw.CloudEventRWException; | ||
import org.junit.jupiter.api.Test; | ||
import org.springframework.amqp.core.Message; | ||
import org.springframework.amqp.core.MessageBuilder; | ||
import org.springframework.amqp.core.MessageProperties; | ||
|
||
/** | ||
* @author Lars Michele | ||
* @see io.cloudevents.spring.messaging.CloudEventMessageConverterTests used as stencil for the implementation | ||
*/ | ||
class CloudEventMessageConverterTests { | ||
|
||
private static final String JSON = "{\"specversion\":\"1.0\"," // | ||
+ "\"id\":\"12345\"," // | ||
+ "\"source\":\"https://spring.io/events\"," // | ||
+ "\"type\":\"io.spring.event\"," // | ||
+ "\"datacontenttype\":\"application/json\"," // | ||
+ "\"data\":{\"value\":\"Dave\"}" // | ||
+ "}"; | ||
|
||
private final CloudEventMessageConverter converter = new CloudEventMessageConverter(); | ||
|
||
@Test | ||
void noSpecVersion() { | ||
Message message = MessageBuilder.withBody(new byte[0]).build(); | ||
assertThatExceptionOfType(CloudEventRWException.class).isThrownBy(() -> { | ||
assertThat(converter.fromMessage(message)).isNull(); | ||
}); | ||
} | ||
|
||
@Test | ||
void notValidCloudEvent() { | ||
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0").build(); | ||
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> { | ||
assertThat(converter.fromMessage(message)).isNull(); | ||
}); | ||
} | ||
|
||
@Test | ||
void validCloudEvent() { | ||
Message message = MessageBuilder.withBody(new byte[0]).setHeader("cloudEvents_specversion", "1.0") | ||
.setHeader("cloudEvents_id", "12345").setHeader("cloudEvents_source", "https://spring.io/events") | ||
.setHeader("cloudEvents_type", "io.spring.event").build(); | ||
CloudEvent event = converter.fromMessage(message); | ||
assertThat(event).isNotNull(); | ||
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1); | ||
assertThat(event.getId()).isEqualTo("12345"); | ||
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events")); | ||
assertThat(event.getType()).isEqualTo("io.spring.event"); | ||
} | ||
|
||
@Test | ||
void structuredCloudEvent() { | ||
byte[] payload = JSON.getBytes(StandardCharsets.UTF_8); | ||
Message message = MessageBuilder.withBody(payload) | ||
.setContentType("application/cloudevents+json").build(); | ||
CloudEvent event = converter.fromMessage(message); | ||
assertThat(event).isNotNull(); | ||
assertThat(event.getSpecVersion()).isEqualTo(SpecVersion.V1); | ||
assertThat(event.getId()).isEqualTo("12345"); | ||
assertThat(event.getSource()).isEqualTo(URI.create("https://spring.io/events")); | ||
assertThat(event.getType()).isEqualTo("io.spring.event"); | ||
} | ||
|
||
@Test | ||
void fromCloudEvent() { | ||
CloudEvent attributes = CloudEventBuilder.v1().withId("A234-1234-1234") | ||
.withSource(URI.create("https://spring.io/")).withType("org.springframework") | ||
.withData("hello".getBytes(StandardCharsets.UTF_8)).build(); | ||
Message message = converter.toMessage(attributes, new MessageProperties()); | ||
Map<String, ?> headers = message.getMessageProperties().getHeaders(); | ||
assertThat(headers.get("cloudEvents_id")).isEqualTo("A234-1234-1234"); | ||
assertThat(headers.get("cloudEvents_specversion")).isEqualTo("1.0"); | ||
assertThat(headers.get("cloudEvents_source")).isEqualTo("https://spring.io/"); | ||
assertThat(headers.get("cloudEvents_type")).isEqualTo("org.springframework"); | ||
assertThat("hello".getBytes(StandardCharsets.UTF_8)).isEqualTo(message.getBody()); | ||
} | ||
|
||
@Test | ||
void fromNonCloudEvent() { | ||
assertThat(converter.toMessage(new byte[0], new MessageProperties())).isNull(); | ||
} | ||
} |