Skip to content

Commit

Permalink
Merge pull request #1264 from ConsenSys/concurrent-legacy-pg-persists
Browse files Browse the repository at this point in the history
Concurrent legacy pg persists
  • Loading branch information
Krish1979 authored Apr 14, 2021
2 parents 42ce7ac + 2818fc1 commit 8f07952
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,35 @@ public Config call() throws Exception {

try(OutputStream outputStream = new TeeOutputStream(Files.newOutputStream(outputFile),System.out)) {
JaxbUtil.marshal(config, outputStream);
}

net.consensys.orion.config.Config orionConfig = orionKeyHelper.getOrionConfig();
//TODO: add any other orion config validations
Objects.requireNonNull(orionConfig.storage(),"Storage config is required. Not found in toml or env");
net.consensys.orion.config.Config orionConfig = orionKeyHelper.getOrionConfig();
//TODO: add any other orion config validations
Objects.requireNonNull(orionConfig.storage(), "Storage config is required. Not found in toml or env");

InboundDbHelper inboundDbHelper = InboundDbHelper.from(orionConfig);
InboundDbHelper inboundDbHelper = InboundDbHelper.from(orionConfig);

MigrationInfo migrationInfo = MigrationInfoFactory.create(inboundDbHelper);
System.out.println("Found "+ migrationInfo + " to migrate.");
MigrationInfo migrationInfo = MigrationInfoFactory.create(inboundDbHelper);
System.out.println("Found " + migrationInfo + " to migrate.");

if(migrationInfo.getRowCount() == 0) {
throw new IllegalStateException(String.format("No data found for %s. Check orion storage config string and/or storage env",inboundDbHelper.getStorageInfo()));
}
if (migrationInfo.getRowCount() == 0) {
throw new IllegalStateException(String.format("No data found for %s. Check orion storage config string and/or storage env", inboundDbHelper.getStorageInfo()));
}

MigrateDataCommand migrateDataCommand =
MigrateDataCommand migrateDataCommand =
new MigrateDataCommand(inboundDbHelper, tesseraJdbcOptions, orionKeyHelper);

Map<PayloadType,Long> outcome = migrateDataCommand.call();

System.out.println("=== Migration report ===");
System.out.printf("Migrated %s of %s transactions",outcome.get(PayloadType.ENCRYPTED_PAYLOAD),migrationInfo.getTransactionCount());
System.out.println();
System.out.printf("Migrated %s of %s privacy groups",outcome.get(PayloadType.PRIVACY_GROUP_PAYLOAD),migrationInfo.getPrivacyGroupCount());
System.out.println();
System.out.printf("Tessera config file %s",outputFile);
System.out.println();
assert outcome.get(PayloadType.ENCRYPTED_PAYLOAD) == migrationInfo.getTransactionCount();
assert outcome.get(PayloadType.PRIVACY_GROUP_PAYLOAD) == migrationInfo.getPrivacyGroupCount();
Map<PayloadType, Long> outcome = migrateDataCommand.call();

System.out.println("=== Migration report ===");
System.out.printf("Migrated %s of %s transactions", outcome.get(PayloadType.ENCRYPTED_PAYLOAD), migrationInfo.getTransactionCount());
System.out.println();
System.out.printf("Migrated %s of %s privacy groups", outcome.get(PayloadType.PRIVACY_GROUP_PAYLOAD), migrationInfo.getPrivacyGroupCount());
System.out.println();
System.out.printf("Tessera config file %s", outputFile);
System.out.println();
assert outcome.get(PayloadType.ENCRYPTED_PAYLOAD) == migrationInfo.getTransactionCount();
assert outcome.get(PayloadType.PRIVACY_GROUP_PAYLOAD) == migrationInfo.getPrivacyGroupCount();
}

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,10 @@ public PrivacyGroup createLegacyPrivacyGroup(PublicKey from, List<PublicKey> rec
.withState(PrivacyGroup.State.ACTIVE)
.build();

if (privacyGroupDAO.retrieve(groupIdBytes).isPresent()) {
return created;
}

final byte[] lookupId = privacyGroupUtil.generateLookupId(members);
final byte[] encodedData = privacyGroupUtil.encode(created);

privacyGroupDAO.save(new PrivacyGroupEntity(groupIdBytes, lookupId, encodedData));
privacyGroupDAO.retrieveOrSave(new PrivacyGroupEntity(groupIdBytes, lookupId, encodedData));

return created;
}
Expand Down Expand Up @@ -144,18 +140,20 @@ public void storePrivacyGroup(byte[] encodedData) {
final PrivacyGroup privacyGroup = privacyGroupUtil.decode(encodedData);

if (privacyGroup.getState() == PrivacyGroup.State.DELETED) {
privacyGroupDAO.retrieve(privacyGroup.getId().getBytes()).ifPresent(et -> {
et.setData(encodedData);
privacyGroupDAO.update(et);
});
privacyGroupDAO
.retrieve(privacyGroup.getId().getBytes())
.ifPresent(
et -> {
et.setData(encodedData);
privacyGroupDAO.update(et);
});
return;
}
final byte[] id = privacyGroup.getId().getBytes();
final byte[] lookupId = privacyGroupUtil.generateLookupId(privacyGroup.getMembers());
final PrivacyGroupEntity newEntity = new PrivacyGroupEntity(id, lookupId, encodedData);

privacyGroupDAO.save(newEntity);

}

@Override
Expand All @@ -172,8 +170,7 @@ public PrivacyGroup deletePrivacyGroup(PublicKey from, PrivacyGroup.Id privacyGr

final byte[] updatedData = privacyGroupUtil.encode(updated);
final byte[] lookupId = privacyGroupUtil.generateLookupId(updated.getMembers());
final PrivacyGroupEntity updatedEt =
new PrivacyGroupEntity(updated.getId().getBytes(), lookupId, updatedData);
final PrivacyGroupEntity updatedEt = new PrivacyGroupEntity(updated.getId().getBytes(), lookupId, updatedData);

final Set<PublicKey> localKeys = enclave.getPublicKeys();
final List<PublicKey> forwardingMembers =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ public void testCreatePrivacyGroup() {
final List<PublicKey> members = List.of(localKey, recipient1, recipient2);

doAnswer(
invocation -> {
Callable callable = invocation.getArgument(1);
callable.call();
return mock(PrivacyGroupEntity.class);
})
.when(privacyGroupDAO)
.save(any(), any());
invocation -> {
Callable callable = invocation.getArgument(1);
callable.call();
return mock(PrivacyGroupEntity.class);
})
.when(privacyGroupDAO)
.save(any(), any());

final PrivacyGroup privacyGroup =
privacyGroupManager.createPrivacyGroup("name", "description", localKey, members, new byte[1]);
privacyGroupManager.createPrivacyGroup("name", "description", localKey, members, new byte[1]);

// Verify entity being saved has the correct values
ArgumentCaptor<PrivacyGroupEntity> argCaptor = ArgumentCaptor.forClass(PrivacyGroupEntity.class);
Expand Down Expand Up @@ -115,10 +115,10 @@ public void testCreateFromKeyNotValid() {
final List<PublicKey> members = List.of(mock(PublicKey.class), mock(PublicKey.class));

assertThatThrownBy(
() ->
privacyGroupManager.createPrivacyGroup(
"name", "description", localKey, members, new byte[1]))
.isInstanceOf(PrivacyViolationException.class);
() ->
privacyGroupManager.createPrivacyGroup(
"name", "description", localKey, members, new byte[1]))
.isInstanceOf(PrivacyViolationException.class);
}

@Test
Expand All @@ -134,7 +134,7 @@ public void testCreateLegacyPrivacyGroup() {

// Verify entity being saved has the correct values
ArgumentCaptor<PrivacyGroupEntity> argCaptor = ArgumentCaptor.forClass(PrivacyGroupEntity.class);
verify(privacyGroupDAO).save(argCaptor.capture());
verify(privacyGroupDAO).retrieveOrSave(argCaptor.capture());
PrivacyGroupEntity savedEntity = argCaptor.getValue();
assertThat(savedEntity).isNotNull();
assertThat(savedEntity.getId()).isEqualTo("generatedId".getBytes());
Expand All @@ -146,12 +146,12 @@ public void testCreateLegacyPrivacyGroup() {
assertThat(privacyGroup.getId().getBytes()).isEqualTo("generatedId".getBytes());
assertThat(privacyGroup.getName()).isEqualTo("legacy");
assertThat(privacyGroup.getDescription())
.isEqualTo("Privacy groups to support the creation of groups by privateFor and privateFrom");
.isEqualTo("Privacy groups to support the creation of groups by privateFor and privateFrom");
assertThat(privacyGroup.getMembers()).containsAll(members).contains(localKey);
assertThat(privacyGroup.getType()).isEqualTo(PrivacyGroup.Type.LEGACY);
assertThat(privacyGroup.getState()).isEqualTo(PrivacyGroup.State.ACTIVE);

verify(privacyGroupDAO).retrieve("generatedId".getBytes());
verify(privacyGroupDAO).retrieveOrSave(any());
}

@Test
Expand All @@ -161,13 +161,13 @@ public void testLegacyPrivacyGroupExisted() {
when(privacyGroupUtil.generateId(anyList())).thenReturn("generatedId".getBytes());

when(privacyGroupDAO.retrieve("generatedId".getBytes()))
.thenReturn(Optional.of(mock(PrivacyGroupEntity.class)));
.thenReturn(Optional.of(mock(PrivacyGroupEntity.class)));

final PrivacyGroup privacyGroup = privacyGroupManager.createLegacyPrivacyGroup(localKey, members);

assertThat(privacyGroup).isNotNull();

verify(privacyGroupDAO).retrieve("generatedId".getBytes());
verify(privacyGroupDAO).retrieveOrSave(any());
}

@Test
Expand Down Expand Up @@ -333,13 +333,13 @@ public void testDeletePrivacyGroup() {
when(privacyGroupUtil.generateLookupId(any())).thenReturn("lookup".getBytes());

doAnswer(
invocation -> {
Callable callable = invocation.getArgument(1);
callable.call();
return mock(PrivacyGroupEntity.class);
})
.when(privacyGroupDAO)
.update(any(), any());
invocation -> {
Callable callable = invocation.getArgument(1);
callable.call();
return mock(PrivacyGroupEntity.class);
})
.when(privacyGroupDAO)
.update(any(), any());

PrivacyGroup result = privacyGroupManager.deletePrivacyGroup(from, PrivacyGroup.Id.fromBytes("id".getBytes()));

Expand All @@ -355,7 +355,7 @@ public void testDeletePrivacyGroup() {
assertThat(payloadCaptor.getValue()).isEqualTo("deletedData".getBytes());

assertThat(recipientsCaptor.getValue())
.containsAll(List.of(PublicKey.from("r1".getBytes()), PublicKey.from("r2".getBytes())));
.containsAll(List.of(PublicKey.from("r1".getBytes()), PublicKey.from("r2".getBytes())));

ArgumentCaptor<PrivacyGroup> argCaptor = ArgumentCaptor.forClass(PrivacyGroup.class);
verify(privacyGroupUtil).encode(argCaptor.capture());
Expand All @@ -371,10 +371,10 @@ public void testDeletePrivacyGroupNotExist() {
when(privacyGroupUtil.encode(any())).thenReturn("deletedData".getBytes());

assertThatThrownBy(
() ->
privacyGroupManager.deletePrivacyGroup(
mock(PublicKey.class), PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyGroupNotFoundException.class);
() ->
privacyGroupManager.deletePrivacyGroup(
mock(PublicKey.class), PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyGroupNotFoundException.class);

verify(privacyGroupDAO).retrieve("id".getBytes());
}
Expand All @@ -396,8 +396,9 @@ public void testDeletePrivacyGroupFromKeyNotBelong() {

when(privacyGroupUtil.decode("data".getBytes())).thenReturn(mockPG);

assertThatThrownBy(() -> privacyGroupManager.deletePrivacyGroup(from, PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyViolationException.class);
assertThatThrownBy(
() -> privacyGroupManager.deletePrivacyGroup(from, PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyViolationException.class);

verify(privacyGroupDAO).retrieve("id".getBytes());
}
Expand All @@ -419,10 +420,10 @@ public void testDeleteDeletedPrivacyGroup() {
when(privacyGroupUtil.encode(any())).thenReturn("deletedData".getBytes());

assertThatThrownBy(
() ->
privacyGroupManager.deletePrivacyGroup(
mock(PublicKey.class), PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyGroupNotFoundException.class);
() ->
privacyGroupManager.deletePrivacyGroup(
mock(PublicKey.class), PrivacyGroup.Id.fromBytes("id".getBytes())))
.isInstanceOf(PrivacyGroupNotFoundException.class);

verify(privacyGroupDAO).retrieve("id".getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.PersistenceException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

public class EntityManagerTemplate {

Expand Down Expand Up @@ -39,4 +42,28 @@ public <T> T execute(EntityManagerCallback<T> callback) {
entityManager.close();
}
}

public <T> T retrieveOrSave(Supplier<T> retriever, Supplier<T> factory) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();

return Optional.ofNullable(retriever.get())
.orElseGet(
() -> {
try {
transaction.begin();
T result = factory.get();
entityManager.persist(result);
transaction.commit();
return result;
} catch (PersistenceException ex) {
return Optional.ofNullable(retriever.get()).orElseThrow(() -> ex);
} catch (Throwable throwable) {
if (transaction.isActive()) transaction.rollback();
throw throwable;
} finally {
entityManager.close();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public interface PrivacyGroupDAO {
*/
Optional<PrivacyGroupEntity> retrieve(byte[] id);

/**
* Retrieve privacy group entity from database based on id If not already exists will persist entity to database
*
* @param entity
* @return persisted or retrieved entity
*/
PrivacyGroupEntity retrieveOrSave(PrivacyGroupEntity entity);

/**
* Retrieve matching privacy groups based on its lookup id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ public <T> PrivacyGroupEntity update(PrivacyGroupEntity entity, Callable<T> cons
public Optional<PrivacyGroupEntity> retrieve(byte[] id) {
return entityManagerTemplate.execute(
entityManager ->
entityManager
.createNamedQuery("PrivacyGroup.FindById", PrivacyGroupEntity.class)
.setParameter("id", id)
.getResultStream()
.findAny());
entityManager
.createNamedQuery("PrivacyGroup.FindById", PrivacyGroupEntity.class)
.setParameter("id", id)
.getResultStream()
.findAny());
}

@Override
public PrivacyGroupEntity retrieveOrSave(PrivacyGroupEntity entity) {
return entityManagerTemplate.retrieveOrSave(() -> retrieve(entity.getId()).orElse(null), () -> entity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void saveDoesntAllowNullEncyptedPayload() {
encryptedRawTransactionDAO.save(encryptedRawTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
String expectedMessage = String.format(testConfig.getRequiredFieldColumTemplate(), "ENCRYPTED_PAYLOAD");
String expectedMessage = String.format(testConfig.getRequiredFieldColumnTemplate(), "ENCRYPTED_PAYLOAD");
assertThat(ex).hasMessageContaining(expectedMessage).hasMessageContaining("ENCRYPTED_PAYLOAD");
}
}
Expand All @@ -86,7 +86,7 @@ public void saveDoesntAllowNullHash() {
encryptedRawTransactionDAO.save(encryptedRawTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
String expectedMessage = String.format(testConfig.getRequiredFieldColumTemplate(), "HASH");
String expectedMessage = String.format(testConfig.getRequiredFieldColumnTemplate(), "HASH");
assertThat(ex).hasMessageContaining(expectedMessage).hasMessageContaining("HASH");
}
}
Expand All @@ -104,7 +104,7 @@ public void saveDoesntAllowNullNonce() {
encryptedRawTransactionDAO.save(encryptedRawTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
String expectedMessage = String.format(testConfig.getRequiredFieldColumTemplate(), "NONCE");
String expectedMessage = String.format(testConfig.getRequiredFieldColumnTemplate(), "NONCE");
assertThat(ex).hasMessageContaining(expectedMessage).hasMessageContaining("NONCE");
}
}
Expand All @@ -122,7 +122,7 @@ public void saveDoesntAllowNullEncryptedKey() {
encryptedRawTransactionDAO.save(encryptedRawTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
String expectedMessage = String.format(testConfig.getRequiredFieldColumTemplate(), "ENCRYPTED_KEY");
String expectedMessage = String.format(testConfig.getRequiredFieldColumnTemplate(), "ENCRYPTED_KEY");
assertThat(ex).hasMessageContaining(expectedMessage).hasMessageContaining("ENCRYPTED_KEY");
}
}
Expand All @@ -140,7 +140,7 @@ public void saveDoesntAllowNullSender() {
encryptedRawTransactionDAO.save(encryptedRawTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
String expectedMessage = String.format(testConfig.getRequiredFieldColumTemplate(), "SENDER");
String expectedMessage = String.format(testConfig.getRequiredFieldColumnTemplate(), "SENDER");
assertThat(ex).hasMessageContaining(expectedMessage).hasMessageContaining("SENDER");
}
}
Expand All @@ -167,7 +167,7 @@ public void cannotPersistMultipleOfSameHash() {
encryptedRawTransactionDAO.save(duplicateTransaction);
failBecauseExceptionWasNotThrown(PersistenceException.class);
} catch (PersistenceException ex) {
assertThat(ex).hasMessageContaining(testConfig.getUniqueContraintViolationMessage());
assertThat(ex).hasMessageContaining(testConfig.getUniqueConstraintViolationMessage());
}
}

Expand Down
Loading

0 comments on commit 8f07952

Please sign in to comment.