From 731d28201a272734b9ddb169ce3e462c8c9da936 Mon Sep 17 00:00:00 2001 From: Rick Fast Date: Sun, 30 Oct 2016 06:30:36 -0500 Subject: [PATCH] transaction support (#181) --- .../com/orbitz/consul/KeyValueClient.java | 74 +++++++++++++++++++ .../com/orbitz/consul/model/kv/Operation.java | 43 +++++++++++ .../com/orbitz/consul/model/kv/TxError.java | 22 ++++++ .../orbitz/consul/model/kv/TxResponse.java | 22 ++++++ .../java/com/orbitz/consul/model/kv/Verb.java | 18 +++++ .../orbitz/consul/option/ConsistencyMode.java | 14 +++- .../consul/util/Base64EncodingSerializer.java | 22 ++++++ .../java/com/orbitz/consul/KeyValueTests.java | 22 ++++++ 8 files changed, 236 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/orbitz/consul/model/kv/Operation.java create mode 100644 src/main/java/com/orbitz/consul/model/kv/TxError.java create mode 100644 src/main/java/com/orbitz/consul/model/kv/TxResponse.java create mode 100644 src/main/java/com/orbitz/consul/model/kv/Verb.java create mode 100644 src/main/java/com/orbitz/consul/util/Base64EncodingSerializer.java diff --git a/src/main/java/com/orbitz/consul/KeyValueClient.java b/src/main/java/com/orbitz/consul/KeyValueClient.java index 3fa036e2..a44b86ea 100644 --- a/src/main/java/com/orbitz/consul/KeyValueClient.java +++ b/src/main/java/com/orbitz/consul/KeyValueClient.java @@ -1,16 +1,21 @@ package com.orbitz.consul; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.UnsignedLongs; import com.orbitz.consul.async.ConsulResponseCallback; import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.kv.Operation; +import com.orbitz.consul.model.kv.TxResponse; import com.orbitz.consul.model.kv.Value; import com.orbitz.consul.model.session.SessionInfo; import com.orbitz.consul.option.DeleteOptions; +import com.orbitz.consul.option.ConsistencyMode; import com.orbitz.consul.option.ImmutablePutOptions; import com.orbitz.consul.option.PutOptions; import com.orbitz.consul.option.QueryOptions; +import com.orbitz.consul.util.Jackson; import okhttp3.MediaType; import okhttp3.RequestBody; import org.apache.commons.lang3.StringUtils; @@ -360,6 +365,55 @@ public boolean releaseLock(final String key, final String sessionId) { return putValue(key, "", 0, ImmutablePutOptions.builder().release(sessionId).build()); } + /** + * Performs a Consul transaction. + * + * PUT /v1/tx + * + * @param operations A list of KV operations. + * @return A {@link ConsulResponse} containing results and potential errors. + */ + public ConsulResponse performTransaction(Operation... operations) { + return performTransaction(ConsistencyMode.DEFAULT, operations); + } + + /** + * Performs a Consul transaction. + * + * PUT /v1/tx + * + * @param consistency The consistency to use for the transaction. + * @param operations A list of KV operations. + * @return A {@link ConsulResponse} containing results and potential errors. + */ + public ConsulResponse performTransaction(ConsistencyMode consistency, Operation... operations) { + Map query = consistency == ConsistencyMode.DEFAULT + ? ImmutableMap.of() + : ImmutableMap.of(consistency.toParam().get(), "true"); + + try { + return extractConsulResponse(api.performTransaction(RequestBody.create(MediaType.parse("application/json"), + Jackson.MAPPER.writeValueAsString(kv(operations))), query)); + } catch (JsonProcessingException e) { + throw new ConsulException(e); + } + } + + /** + * Wraps {@link Operation} in a "KV": { } block. + * @param operations An array of ops. + * @return An array of wrapped ops. + */ + static Kv[] kv(Operation... operations) { + Kv[] kvs = new Kv[operations.length]; + + for (int i = 0; i < operations.length; i ++) { + kvs[i] = new Kv(operations[i]); + } + + return kvs; + } + /** * Retrofit API interface. */ @@ -384,5 +438,25 @@ Call putValue(@Path("key") String key, @DELETE("kv/{key}") Call deleteValues(@Path("key") String key, @QueryMap Map query); + + @PUT("txn") + @Headers("Content-Type: application/json") + Call performTransaction(@Body RequestBody body, + @QueryMap Map query); + } + + /** + * Wrapper for Transaction KV entry. + */ + static class Kv { + private Operation kv; + + private Kv(Operation operation) { + kv = operation; + } + + public Operation getKv() { + return kv; + } } } diff --git a/src/main/java/com/orbitz/consul/model/kv/Operation.java b/src/main/java/com/orbitz/consul/model/kv/Operation.java new file mode 100644 index 00000000..cae1068e --- /dev/null +++ b/src/main/java/com/orbitz/consul/model/kv/Operation.java @@ -0,0 +1,43 @@ +package com.orbitz.consul.model.kv; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.base.Optional; +import com.orbitz.consul.util.Base64EncodingDeserializer; +import com.orbitz.consul.util.Base64EncodingSerializer; +import org.immutables.value.Value; + +import java.math.BigInteger; + +@Value.Immutable +@JsonDeserialize(as = ImmutableOperation.class) +@JsonSerialize(as = ImmutableOperation.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public abstract class Operation { + + @JsonProperty("Verb") + public abstract String verb(); + + @JsonProperty("Key") + public abstract Optional key(); + + @JsonProperty("Value") + @JsonSerialize(using = Base64EncodingSerializer.class) + @JsonDeserialize(using = Base64EncodingDeserializer.class) + public abstract Optional value(); + + @JsonProperty("Flags") + public abstract Optional flags(); + + @JsonProperty("Index") + public abstract Optional index(); + + @JsonProperty("Session") + public abstract Optional session(); + + public static ImmutableOperation.Builder builder(Verb verb) { + return ImmutableOperation.builder().verb(verb.toValue()); + } +} diff --git a/src/main/java/com/orbitz/consul/model/kv/TxError.java b/src/main/java/com/orbitz/consul/model/kv/TxError.java new file mode 100644 index 00000000..96484d13 --- /dev/null +++ b/src/main/java/com/orbitz/consul/model/kv/TxError.java @@ -0,0 +1,22 @@ +package com.orbitz.consul.model.kv; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.base.Optional; + +import java.math.BigInteger; + +@org.immutables.value.Value.Immutable +@JsonDeserialize(as = ImmutableTxError.class) +@JsonSerialize(as = ImmutableTxError.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public abstract class TxError { + + @JsonProperty("OpIndex") + public abstract Optional opIndex(); + + @JsonProperty("What") + public abstract Optional what(); +} diff --git a/src/main/java/com/orbitz/consul/model/kv/TxResponse.java b/src/main/java/com/orbitz/consul/model/kv/TxResponse.java new file mode 100644 index 00000000..9ec46eb9 --- /dev/null +++ b/src/main/java/com/orbitz/consul/model/kv/TxResponse.java @@ -0,0 +1,22 @@ +package com.orbitz.consul.model.kv; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.util.List; +import java.util.Map; + +@org.immutables.value.Value.Immutable +@JsonDeserialize(as = ImmutableTxResponse.class) +@JsonSerialize(as = ImmutableTxResponse.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public abstract class TxResponse { + + @JsonProperty("Results") + public abstract List> results(); + + @JsonProperty("Errors") + public abstract List errors(); +} diff --git a/src/main/java/com/orbitz/consul/model/kv/Verb.java b/src/main/java/com/orbitz/consul/model/kv/Verb.java new file mode 100644 index 00000000..fb7df024 --- /dev/null +++ b/src/main/java/com/orbitz/consul/model/kv/Verb.java @@ -0,0 +1,18 @@ +package com.orbitz.consul.model.kv; + +public enum Verb { + + SET("set"), CHECK_AND_SET("cas"), LOCK("lock"), UNLOCK("unlock"), GET("get"), + GET_TREE("get-tree"), CHECK_INDEX("check-index"), CHECK_SESSION("check-session"), + DELETE("delete"), DELETE_TREE("delete-tree"), DELETE_CHECK_AND_SET("delete-cas"); + + private String value; + + Verb(String value) { + this.value = value; + } + + public String toValue() { + return value; + } +} diff --git a/src/main/java/com/orbitz/consul/option/ConsistencyMode.java b/src/main/java/com/orbitz/consul/option/ConsistencyMode.java index 94945796..cc371f17 100644 --- a/src/main/java/com/orbitz/consul/option/ConsistencyMode.java +++ b/src/main/java/com/orbitz/consul/option/ConsistencyMode.java @@ -1,5 +1,17 @@ package com.orbitz.consul.option; +import com.google.common.base.Optional; + public enum ConsistencyMode { - DEFAULT, STALE, CONSISTENT + DEFAULT(null), STALE("stale"), CONSISTENT("consistent"); + + private String param; + + ConsistencyMode(String param) { + this.param = param; + } + + public Optional toParam() { + return Optional.fromNullable(param); + } } diff --git a/src/main/java/com/orbitz/consul/util/Base64EncodingSerializer.java b/src/main/java/com/orbitz/consul/util/Base64EncodingSerializer.java new file mode 100644 index 00000000..711f27d2 --- /dev/null +++ b/src/main/java/com/orbitz/consul/util/Base64EncodingSerializer.java @@ -0,0 +1,22 @@ +package com.orbitz.consul.util; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.google.common.base.Optional; +import com.google.common.io.BaseEncoding; + +import java.io.IOException; + +public class Base64EncodingSerializer extends JsonSerializer> { + + @Override + public void serialize(Optional string, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + if (string.isPresent()) { + jsonGenerator.writeString(BaseEncoding.base64().encode(string.get().getBytes())); + } else { + jsonGenerator.writeNull(); + } + } +} diff --git a/src/test/java/com/orbitz/consul/KeyValueTests.java b/src/test/java/com/orbitz/consul/KeyValueTests.java index 1d7aa9c3..dd9f02de 100644 --- a/src/test/java/com/orbitz/consul/KeyValueTests.java +++ b/src/test/java/com/orbitz/consul/KeyValueTests.java @@ -3,12 +3,18 @@ import com.google.common.base.Optional; import com.orbitz.consul.async.ConsulResponseCallback; import com.orbitz.consul.model.ConsulResponse; +import com.orbitz.consul.model.kv.ImmutableOperation; +import com.orbitz.consul.model.kv.Operation; +import com.orbitz.consul.model.kv.TxResponse; import com.orbitz.consul.model.kv.Value; import com.orbitz.consul.model.session.ImmutableSession; import com.orbitz.consul.model.session.SessionCreatedResponse; import com.orbitz.consul.option.ImmutableDeleteOptions; import com.orbitz.consul.option.ImmutableDeleteOptions.Builder; import com.orbitz.consul.option.QueryOptions; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Ignore; import org.junit.Test; import java.net.UnknownHostException; @@ -316,4 +322,20 @@ public void onFailure(Throwable throwable) { keyValueClient.deleteKey(key); assertFalse(success.get()); } + + @Test + @Ignore + public void testBasicTxn() throws Exception { + KeyValueClient keyValueClient = client.keyValueClient(); + String key = UUID.randomUUID().toString(); + String value = Base64.encodeBase64String(RandomStringUtils.random(20).getBytes()); + Operation[] operation = new Operation[] {ImmutableOperation.builder().verb("set") + .key(key) + .value(value).build()}; + + ConsulResponse response = keyValueClient.performTransaction(operation); + + assertEquals(value, keyValueClient.getValueAsString(key).get()); + assertEquals(response.getIndex(), keyValueClient.getValue(key).get().getModifyIndex()); + } }