diff --git a/sdk/digitaltwins/azure-digitaltwins-core/src/main/java/com/azure/digitaltwins/core/DigitalTwinsAsyncClient.java b/sdk/digitaltwins/azure-digitaltwins-core/src/main/java/com/azure/digitaltwins/core/DigitalTwinsAsyncClient.java index 684127700b1c7..b5b64768332eb 100644 --- a/sdk/digitaltwins/azure-digitaltwins-core/src/main/java/com/azure/digitaltwins/core/DigitalTwinsAsyncClient.java +++ b/sdk/digitaltwins/azure-digitaltwins-core/src/main/java/com/azure/digitaltwins/core/DigitalTwinsAsyncClient.java @@ -788,7 +788,7 @@ Mono> createModelsSinglePageAsync(List models, objectPagedResponse.getHeaders(), convertedList, null, - ((PagedResponseBase) objectPagedResponse).getDeserializedHeaders()); + ((ResponseBase) objectPagedResponse).getDeserializedHeaders()); } ); } @@ -909,7 +909,7 @@ Mono> listModelsNextSinglePageAsync(String nextLink, Co @ServiceMethod(returns = ReturnType.SINGLE) public Mono deleteModel(String modelId) { return deleteModelWithResponse(modelId) - .map(Response::getValue); + .flatMap(voidResponse -> Mono.empty()); } /** @@ -939,7 +939,7 @@ PagedFlux listRelationships(String digitalTwinId, String relationshipNam */ public Mono decommissionModel(String modelId) { return decommissionModelWithResponse(modelId) - .map(Response::getValue); + .flatMap(voidResponse -> Mono.empty()); } /** @@ -1047,7 +1047,7 @@ Mono> getComponentWithResponse(String digitalTwinId, @ServiceMethod(returns = ReturnType.SINGLE) public Mono updateComponent(String digitalTwinId, String componentPath, List componentUpdateOperations) { return updateComponentWithResponse(digitalTwinId, componentPath, componentUpdateOperations, new UpdateComponentRequestOptions()) - .map(DigitalTwinsResponse::getValue); + .flatMap(voidResponse -> Mono.empty()); } /** diff --git a/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/DigitalTwinsLifecycleAsyncSample.java b/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/DigitalTwinsLifecycleAsyncSample.java index fffe45c62005f..4b52b106f6e73 100644 --- a/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/DigitalTwinsLifecycleAsyncSample.java +++ b/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/DigitalTwinsLifecycleAsyncSample.java @@ -8,6 +8,9 @@ import com.azure.digitaltwins.core.implementation.models.ErrorResponseException; import com.azure.digitaltwins.core.implementation.serialization.BasicRelationship; import com.azure.identity.ClientSecretCredentialBuilder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpStatus; import java.io.IOException; @@ -15,10 +18,19 @@ import java.net.URL; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static com.azure.digitaltwins.core.SamplesConstants.*; +import static com.azure.digitaltwins.core.SamplesUtil.IgnoreConflictError; +import static com.azure.digitaltwins.core.SamplesUtil.IgnoreNotFoundError; +import static java.util.Arrays.asList; + /** * This sample creates all the models in \DTDL\Models folder in the ADT service instance and creates the corresponding twins in \DTDL\DigitalTwins folder. * The Diagram for the Hospital model looks like this: @@ -49,6 +61,7 @@ public class DigitalTwinsLifecycleAsyncSample { private static final String endpoint = System.getenv("DIGITAL_TWINS_ENDPOINT"); private static final int MaxWaitTimeAsyncOperationsInSeconds = 10; + private static final ObjectMapper mapper = new ObjectMapper(); private static final URL DtdlDirectoryUrl = DigitalTwinsLifecycleAsyncSample.class.getClassLoader().getResource("DTDL"); private static final Path DtDlDirectoryPath; @@ -88,108 +101,231 @@ public static void main(String[] args) throws IOException, InterruptedException // Ensure existing twins with the same name are deleted first deleteTwins(); + // Delete existing models + deleteAllModels(); + + // Create all the models + createAllModels(); + + // Get all models + listAllModels(); + // Create twin counterparts for all the models - createTwins(); + createAllTwins(); + + // TODO: Get all twins + // queryTwins(); + + // Create all the relationships + connectTwinsTogether(); + + // TODO: Creating event route + // createEventRoute(); + + // TODO: Get all event routes + // listEventRoutes(); + + // TODO: Deleting event route + // deleteEventRoute(); } /** * Delete a twin, and any relationships it might have. * @throws IOException If an I/O error is thrown when accessing the starting file. - * @throws InterruptedException If the current thread is interrupted while waiting to acquire permits on a semaphore. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. */ public static void deleteTwins() throws IOException, InterruptedException { System.out.println("DELETE DIGITAL TWINS"); Map twins = FileHelper.loadAllFilesInPath(TwinsPath); - final Semaphore deleteTwinsSemaphore = new Semaphore(0); - final Semaphore deleteRelationshipsSemaphore = new Semaphore(0); // Call APIs to clean up any pre-existing resources that might be referenced by this sample. If digital twin does not exist, ignore. - twins - .forEach((twinId, twinContent) -> { - // Call APIs to delete all relationships. - client.listRelationships(twinId, BasicRelationship.class) - .doOnComplete(deleteRelationshipsSemaphore::release) - .doOnError(throwable -> { - if (throwable instanceof ErrorResponseException && ((ErrorResponseException) throwable).getResponse().getStatusCode() == HttpStatus.SC_NOT_FOUND) { - deleteRelationshipsSemaphore.release(); - } else { - System.err.println("List relationships error: " + throwable); - } - }) - .subscribe( - relationship -> client.deleteRelationship(twinId, relationship.getId()) - .subscribe( - aVoid -> System.out.println("Found and deleted relationship: " + relationship.getId()), - throwable -> System.err.println("Delete relationship error: " + throwable) - )); - - // Call APIs to delete any incoming relationships. - client.listIncomingRelationships(twinId) - .doOnComplete(deleteRelationshipsSemaphore::release) - .doOnError(throwable -> { - if (throwable instanceof ErrorResponseException && ((ErrorResponseException) throwable).getResponse().getStatusCode() == HttpStatus.SC_NOT_FOUND) { - deleteRelationshipsSemaphore.release(); - } else { - System.err.println("List incoming relationships error: " + throwable); - } - }) - .subscribe( - incomingRelationship -> client.deleteRelationship(incomingRelationship.getSourceId(), incomingRelationship.getRelationshipId()) - .subscribe( - aVoid -> System.out.println("Found and deleted incoming relationship: " + incomingRelationship.getRelationshipId()), - throwable -> System.err.println("Delete incoming relationship error: " + throwable) - )); + // Once the async API terminates (either successfully, or with an error), the latch count is decremented, or the semaphore is released. + for (Map.Entry twin : twins.entrySet()) { + String twinId = twin.getKey(); + + // This list contains all the relationships that existing between the twins referenced by this sample. + List relationshipList = Collections.synchronizedList(new ArrayList<>()); + + // These semaphores indicate when the relationship list and relationship delete operations have completed. + // We cannot use a latch here since we do not know the no. of relationships that will be deleted (so we do cannot set the latch initial count). + Semaphore listRelationshipSemaphore = new Semaphore(0); + Semaphore deleteRelationshipsSemaphore = new Semaphore(0); + + // This latch is to ensure that we wait for the delete twin operation to complete, before proceeding. + CountDownLatch deleteTwinsLatch = new CountDownLatch(1); + + // Call APIs to retrieve all relationships. + client.listRelationships(twinId, BasicRelationship.class) + .doOnNext(relationshipList::add) + .doOnError(IgnoreNotFoundError) + .doOnTerminate(listRelationshipSemaphore::release) + .subscribe(); + + // Call APIs to retrieve all incoming relationships. + client.listIncomingRelationships(twinId) + .doOnNext(e -> relationshipList.add(mapper.convertValue(e, BasicRelationship.class))) + .doOnError(IgnoreNotFoundError) + .doOnTerminate(listRelationshipSemaphore::release) + .subscribe(); + + // Call APIs to delete all relationships. + if (listRelationshipSemaphore.tryAcquire(2, MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS)) { + relationshipList + .forEach(relationship -> client.deleteRelationship(relationship.getSourceId(), relationship.getId()) + .doOnSuccess(aVoid -> { + if (twinId.equals(relationship.getSourceId())) { + System.out.println("Found and deleted relationship: " + relationship.getId()); + } else { + System.out.println("Found and deleted incoming relationship: " + relationship.getId()); + } + }) + .doOnError(IgnoreNotFoundError) + .doOnTerminate(deleteRelationshipsSemaphore::release) + .subscribe()); + } + + // Verify that the relationships have been deleted. + if (deleteRelationshipsSemaphore.tryAcquire(relationshipList.size(), MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS)) { + // Now the digital twin should be safe to delete + + // Call APIs to delete the twins. + client.deleteDigitalTwin(twinId) + .doOnSuccess(aVoid -> System.out.println("Deleted digital twin: " + twinId)) + .doOnError(IgnoreNotFoundError) + .doOnTerminate(deleteTwinsLatch::countDown) + .subscribe(); + + // Wait until the latch count reaches zero, signifying that the async calls have completed successfully. + deleteTwinsLatch.await(MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); + } + } + } + + /** + * Delete models created by FullLifecycleSample for the ADT service instance. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. + */ + public static void deleteAllModels() throws InterruptedException { + System.out.println("DELETING MODELS"); + // This is to ensure models are deleted in an order such that no other models are referencing it. + List models = asList(RoomModelId, WifiModelId, BuildingModelId, FloorModelId, HvacModelId); + + // Call APIs to delete the models. + // Note that we are blocking the async API call. This is to ensure models are deleted in an order such that no other models are referencing it. + models + .forEach(modelId -> { try { - // Verify that the list relationships and list incoming relationships async operations have completed. - if (deleteRelationshipsSemaphore.tryAcquire(2, MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS)) { - // Now the digital twin should be safe to delete - - // Call APIs to delete the twins. - client.deleteDigitalTwin(twinId) - .doOnSuccess(aVoid -> { - System.out.println("Deleted digital twin: " + twinId); - deleteTwinsSemaphore.release(); - }) - .doOnError(throwable -> { - if (throwable instanceof ErrorResponseException && ((ErrorResponseException) throwable).getResponse().getStatusCode() == HttpStatus.SC_NOT_FOUND) { - deleteTwinsSemaphore.release(); - } else { - System.err.println("Could not delete digital twin " + twinId + " due to " + throwable); - } - }) - .subscribe(); + client.deleteModel(modelId).block(); + System.out.println("Deleted model: " + modelId); + } catch (ErrorResponseException ex) { + if (ex.getResponse().getStatusCode() != HttpStatus.SC_NOT_FOUND) { + System.err.println("Could not delete model " + modelId + " due to " + ex); } - } catch (InterruptedException e) { - throw new RuntimeException("Could not cleanup the pre-existing resources: ", e); } }); + } + + /** + * Loads all the models found in the Models directory into memory and uses CreateModelsAsync API to create all the models in the ADT service instance. + * @throws IOException If an I/O error is thrown when accessing the starting file. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. + */ + public static void createAllModels() throws IOException, InterruptedException { + System.out.println("CREATING MODELS"); + List modelsToCreate = new ArrayList<>(FileHelper.loadAllFilesInPath(ModelsPath).values()); + final CountDownLatch createModelsLatch = new CountDownLatch(1); - // Verify that a semaphore has been released for each delete async operation, signifying that the async call has completed successfully.. - boolean created = deleteTwinsSemaphore.tryAcquire(twins.size(), MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); - System.out.println("Twins deleted: " + created); + // Call API to create the models. For each async operation, once the operation is completed successfully, a latch is counted down. + client.createModels(modelsToCreate) + .doOnNext(modelData -> System.out.println("Created model: " + modelData.getId())) + .doOnError(IgnoreConflictError) + .doOnTerminate(createModelsLatch::countDown) + .subscribe(); + + // Wait until the latch count reaches zero, signifying that the async calls have completed successfully. + createModelsLatch.await(MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); + } + + /** + * Gets all the models within the ADT service instance. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. + */ + public static void listAllModels() throws InterruptedException { + System.out.println("LISTING MODELS"); + final CountDownLatch listModelsLatch = new CountDownLatch(1); + + // Call API to list the models. For each async operation, once the operation is completed successfully, a latch is counted down. + client.listModels() + .doOnNext(modelData -> System.out.println("Retrieved model: " + modelData.getId() + ", display name '" + modelData.getDisplayName().get("en") + "'," + + " upload time '" + modelData.getUploadTime() + "' and decommissioned '" + modelData.isDecommissioned() + "'")) + .doOnError(throwable -> System.err.println("List models error: " + throwable)) + .doOnTerminate(listModelsLatch::countDown) + .subscribe(); + + // Wait until the latch count reaches zero, signifying that the async calls have completed successfully. + listModelsLatch.await(MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); } /** * Creates all twins specified in the DTDL->DigitalTwins directory. * @throws IOException If an I/O error is thrown when accessing the starting file. - * @throws InterruptedException If the current thread is interrupted while waiting to acquire permits on a semaphore. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. */ - public static void createTwins() throws IOException, InterruptedException { + public static void createAllTwins() throws IOException, InterruptedException { System.out.println("CREATE DIGITAL TWINS"); Map twins = FileHelper.loadAllFilesInPath(TwinsPath); - final Semaphore createTwinsSemaphore = new Semaphore(0); + final CountDownLatch createTwinsLatch = new CountDownLatch(twins.size()); - // Call APIs to create the twins. For each async operation, once the operation is completed successfully, a semaphore is released. + // Call APIs to create the twins. For each async operation, once the operation is completed successfully, a latch is counted down. twins - .forEach((twinId, twinContent) -> client.createDigitalTwinWithResponse(twinId, twinContent) + .forEach((twinId, twinContent) -> client.createDigitalTwin(twinId, twinContent) .subscribe( - response -> System.out.println("Created digital twin: " + twinId + "\n\t Body: " + response.getValue()), + twin -> System.out.println("Created digital twin: " + twinId + "\n\t Body: " + twin), throwable -> System.err.println("Could not create digital twin " + twinId + " due to " + throwable), - createTwinsSemaphore::release)); + createTwinsLatch::countDown)); + + // Wait until the latch count reaches zero, signifying that the async calls have completed successfully. + createTwinsLatch.await(MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); + } + + /** + * Creates all the relationships defined in the DTDL->Relationships directory + * @throws IOException If an I/O error is thrown when accessing the starting file. + * @throws InterruptedException If the current thread is interrupted while waiting to acquire latch. + */ + public static void connectTwinsTogether() throws IOException, InterruptedException { + System.out.println("CONNECT DIGITAL TWINS"); + Map allRelationships = FileHelper.loadAllFilesInPath(RelationshipsPath); + final CountDownLatch connectTwinsLatch = new CountDownLatch(4); + + // For each relationship array we deserialize it first. + // We deserialize as BasicRelationship to get the entire custom relationship (custom relationship properties). + allRelationships.values().forEach( + relationshipContent -> { + try { + List relationships = mapper.readValue(relationshipContent, new TypeReference>() { }); + + // From loaded relationships, get the source Id and Id from each one, and create it with full relationship payload. + relationships + .forEach(relationship -> { + try { + client.createRelationship(relationship.getSourceId(), relationship.getId(), mapper.writeValueAsString(relationship)) + .doOnSuccess(s -> System.out.println("Linked twin " + relationship.getSourceId() + " to twin " + relationship.getTargetId() + " as " + relationship.getName())) + .doOnError(IgnoreConflictError) + .doOnTerminate(connectTwinsLatch::countDown) + .subscribe(); + } catch (JsonProcessingException e) { + throw new RuntimeException("JsonProcessingException while serializing relationship string from BasicRelationship: ", e); + } + }); + } catch (JsonProcessingException e) { + throw new RuntimeException("JsonProcessingException while deserializing relationship string to BasicRelationship: ", e); + } + } + ); - // Verify that a semaphore has been released for each async operation, signifying that the async call has completed successfully. - boolean created = createTwinsSemaphore.tryAcquire(twins.size(), MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); - System.out.println("Twins created: " + created); + // Wait until the latch count reaches zero, signifying that the async calls have completed successfully. + connectTwinsLatch.await(MaxWaitTimeAsyncOperationsInSeconds, TimeUnit.SECONDS); } } diff --git a/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/SamplesUtil.java b/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/SamplesUtil.java new file mode 100644 index 0000000000000..6ca813c4e48b8 --- /dev/null +++ b/sdk/digitaltwins/azure-digitaltwins-core/src/samples/java/com/azure/digitaltwins/core/SamplesUtil.java @@ -0,0 +1,23 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.digitaltwins.core; + +import com.azure.digitaltwins.core.implementation.models.ErrorResponseException; +import org.apache.http.HttpStatus; + +import java.util.function.Consumer; + +public class SamplesUtil { + public static final Consumer IgnoreNotFoundError = throwable -> { + if (!(throwable instanceof ErrorResponseException) || ((ErrorResponseException) throwable).getResponse().getStatusCode() != HttpStatus.SC_NOT_FOUND) { + System.err.println("Error received: " + throwable); + } + }; + + public static final Consumer IgnoreConflictError = throwable -> { + if (!(throwable instanceof ErrorResponseException) || ((ErrorResponseException) throwable).getResponse().getStatusCode() != HttpStatus.SC_CONFLICT) { + System.err.println("Error received: " + throwable); + } + }; +}