Skip to content
This repository has been archived by the owner on Jul 27, 2023. It is now read-only.

Commit

Permalink
transaction support (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
rickfast committed Oct 30, 2016
1 parent ec8b90e commit 731d282
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 1 deletion.
74 changes: 74 additions & 0 deletions src/main/java/com/orbitz/consul/KeyValueClient.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.orbitz.consul;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.UnsignedLongs;
import com.orbitz.consul.async.ConsulResponseCallback;
import com.orbitz.consul.model.ConsulResponse;
import com.orbitz.consul.model.kv.Operation;
import com.orbitz.consul.model.kv.TxResponse;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.SessionInfo;
import com.orbitz.consul.option.DeleteOptions;
import com.orbitz.consul.option.ConsistencyMode;
import com.orbitz.consul.option.ImmutablePutOptions;
import com.orbitz.consul.option.PutOptions;
import com.orbitz.consul.option.QueryOptions;
import com.orbitz.consul.util.Jackson;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -360,6 +365,55 @@ public boolean releaseLock(final String key, final String sessionId) {
return putValue(key, "", 0, ImmutablePutOptions.builder().release(sessionId).build());
}

/**
* Performs a Consul transaction.
*
* PUT /v1/tx
*
* @param operations A list of KV operations.
* @return A {@link ConsulResponse} containing results and potential errors.
*/
public ConsulResponse<TxResponse> performTransaction(Operation... operations) {
return performTransaction(ConsistencyMode.DEFAULT, operations);
}

/**
* Performs a Consul transaction.
*
* PUT /v1/tx
*
* @param consistency The consistency to use for the transaction.
* @param operations A list of KV operations.
* @return A {@link ConsulResponse} containing results and potential errors.
*/
public ConsulResponse<TxResponse> performTransaction(ConsistencyMode consistency, Operation... operations) {
Map<String, String> query = consistency == ConsistencyMode.DEFAULT
? ImmutableMap.<String, String>of()
: ImmutableMap.of(consistency.toParam().get(), "true");

try {
return extractConsulResponse(api.performTransaction(RequestBody.create(MediaType.parse("application/json"),
Jackson.MAPPER.writeValueAsString(kv(operations))), query));
} catch (JsonProcessingException e) {
throw new ConsulException(e);
}
}

/**
* Wraps {@link Operation} in a <code>"KV": { }</code> block.
* @param operations An array of ops.
* @return An array of wrapped ops.
*/
static Kv[] kv(Operation... operations) {
Kv[] kvs = new Kv[operations.length];

for (int i = 0; i < operations.length; i ++) {
kvs[i] = new Kv(operations[i]);
}

return kvs;
}

/**
* Retrofit API interface.
*/
Expand All @@ -384,5 +438,25 @@ Call<Boolean> putValue(@Path("key") String key,
@DELETE("kv/{key}")
Call<Void> deleteValues(@Path("key") String key,
@QueryMap Map<String, Object> query);

@PUT("txn")
@Headers("Content-Type: application/json")
Call<TxResponse> performTransaction(@Body RequestBody body,
@QueryMap Map<String, String> query);
}

/**
* Wrapper for Transaction KV entry.
*/
static class Kv {
private Operation kv;

private Kv(Operation operation) {
kv = operation;
}

public Operation getKv() {
return kv;
}
}
}
43 changes: 43 additions & 0 deletions src/main/java/com/orbitz/consul/model/kv/Operation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.orbitz.consul.model.kv;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Optional;
import com.orbitz.consul.util.Base64EncodingDeserializer;
import com.orbitz.consul.util.Base64EncodingSerializer;
import org.immutables.value.Value;

import java.math.BigInteger;

@Value.Immutable
@JsonDeserialize(as = ImmutableOperation.class)
@JsonSerialize(as = ImmutableOperation.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class Operation {

@JsonProperty("Verb")
public abstract String verb();

@JsonProperty("Key")
public abstract Optional<String> key();

@JsonProperty("Value")
@JsonSerialize(using = Base64EncodingSerializer.class)
@JsonDeserialize(using = Base64EncodingDeserializer.class)
public abstract Optional<String> value();

@JsonProperty("Flags")
public abstract Optional<Long> flags();

@JsonProperty("Index")
public abstract Optional<BigInteger> index();

@JsonProperty("Session")
public abstract Optional<String> session();

public static ImmutableOperation.Builder builder(Verb verb) {
return ImmutableOperation.builder().verb(verb.toValue());
}
}
22 changes: 22 additions & 0 deletions src/main/java/com/orbitz/consul/model/kv/TxError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.orbitz.consul.model.kv;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Optional;

import java.math.BigInteger;

@org.immutables.value.Value.Immutable
@JsonDeserialize(as = ImmutableTxError.class)
@JsonSerialize(as = ImmutableTxError.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class TxError {

@JsonProperty("OpIndex")
public abstract Optional<BigInteger> opIndex();

@JsonProperty("What")
public abstract Optional<String> what();
}
22 changes: 22 additions & 0 deletions src/main/java/com/orbitz/consul/model/kv/TxResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.orbitz.consul.model.kv;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.util.List;
import java.util.Map;

@org.immutables.value.Value.Immutable
@JsonDeserialize(as = ImmutableTxResponse.class)
@JsonSerialize(as = ImmutableTxResponse.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class TxResponse {

@JsonProperty("Results")
public abstract List<Map<String, Value>> results();

@JsonProperty("Errors")
public abstract List<TxError> errors();
}
18 changes: 18 additions & 0 deletions src/main/java/com/orbitz/consul/model/kv/Verb.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.orbitz.consul.model.kv;

public enum Verb {

SET("set"), CHECK_AND_SET("cas"), LOCK("lock"), UNLOCK("unlock"), GET("get"),
GET_TREE("get-tree"), CHECK_INDEX("check-index"), CHECK_SESSION("check-session"),
DELETE("delete"), DELETE_TREE("delete-tree"), DELETE_CHECK_AND_SET("delete-cas");

private String value;

Verb(String value) {
this.value = value;
}

public String toValue() {
return value;
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/orbitz/consul/option/ConsistencyMode.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
package com.orbitz.consul.option;

import com.google.common.base.Optional;

public enum ConsistencyMode {
DEFAULT, STALE, CONSISTENT
DEFAULT(null), STALE("stale"), CONSISTENT("consistent");

private String param;

ConsistencyMode(String param) {
this.param = param;
}

public Optional<String> toParam() {
return Optional.fromNullable(param);
}
}
22 changes: 22 additions & 0 deletions src/main/java/com/orbitz/consul/util/Base64EncodingSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.orbitz.consul.util;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.common.base.Optional;
import com.google.common.io.BaseEncoding;

import java.io.IOException;

public class Base64EncodingSerializer extends JsonSerializer<Optional<String>> {

@Override
public void serialize(Optional<String> string, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
if (string.isPresent()) {
jsonGenerator.writeString(BaseEncoding.base64().encode(string.get().getBytes()));
} else {
jsonGenerator.writeNull();
}
}
}
22 changes: 22 additions & 0 deletions src/test/java/com/orbitz/consul/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
import com.google.common.base.Optional;
import com.orbitz.consul.async.ConsulResponseCallback;
import com.orbitz.consul.model.ConsulResponse;
import com.orbitz.consul.model.kv.ImmutableOperation;
import com.orbitz.consul.model.kv.Operation;
import com.orbitz.consul.model.kv.TxResponse;
import com.orbitz.consul.model.kv.Value;
import com.orbitz.consul.model.session.ImmutableSession;
import com.orbitz.consul.model.session.SessionCreatedResponse;
import com.orbitz.consul.option.ImmutableDeleteOptions;
import com.orbitz.consul.option.ImmutableDeleteOptions.Builder;
import com.orbitz.consul.option.QueryOptions;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Ignore;
import org.junit.Test;

import java.net.UnknownHostException;
Expand Down Expand Up @@ -316,4 +322,20 @@ public void onFailure(Throwable throwable) {
keyValueClient.deleteKey(key);
assertFalse(success.get());
}

@Test
@Ignore
public void testBasicTxn() throws Exception {
KeyValueClient keyValueClient = client.keyValueClient();
String key = UUID.randomUUID().toString();
String value = Base64.encodeBase64String(RandomStringUtils.random(20).getBytes());
Operation[] operation = new Operation[] {ImmutableOperation.builder().verb("set")
.key(key)
.value(value).build()};

ConsulResponse<TxResponse> response = keyValueClient.performTransaction(operation);

assertEquals(value, keyValueClient.getValueAsString(key).get());
assertEquals(response.getIndex(), keyValueClient.getValue(key).get().getModifyIndex());
}
}

0 comments on commit 731d282

Please sign in to comment.