Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mgmt: fix playback fail #15074

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ public void canDeleteZoneWithExplicitETag() throws Exception {

@Test
public void canCreateRecordSetsWithDefaultETag() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
final Region region = Region.US_EAST;
final String topLevelDomain = "www.contoso" + generateRandomResourceName("z", 10) + ".com";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ public void canDeleteZoneWithExplicitETag() {

@Test
public void canCreateRecordSetsWithDefaultETag() {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
final Region region = Region.US_EAST;
final String topLevelDomain = "www.contoso" + generateRandomResourceName("z", 10) + ".com";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,6 @@ public void canCRUDRedisCache() throws Exception {

@Test
public void canCRUDLinkedServers() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}

RedisCache rgg =
redisManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.resourcemanager.resources.fluentcore.model.Indexable;
import com.azure.resourcemanager.resources.fluentcore.utils.SdkContext;
import com.azure.resourcemanager.resources.fluentcore.utils.Utils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -399,7 +400,20 @@ private Flux<Indexable> invokeReadyTasksAsync(final InvocationContext context) {
}
readyTaskEntry = super.getNext();
}
return Flux.mergeDelayError(32, observables.toArray(new Flux[0]));
return Flux.mergeOrdered(32,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some difference on "DelayError" part as well. Might need to check a bit on processFaultedTaskAsync method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergeOrdered will automatically delay error to the last, due to it cannot compare item with error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the mergeOrdered test about it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If convenient, try write a few tests for it (positive cases and negative cases)?

(some, other) -> {
if (rootTaskEntry.key().endsWith(some.key())) { // rootTaskEntry will be proxy-<UUID>
if (rootTaskEntry.key().endsWith(other.key())) {
return 0;
}
return 1;
} else if (rootTaskEntry.key().endsWith(other.key())) {
return -1;
ChenTanyi marked this conversation as resolved.
Show resolved Hide resolved
} else {
return some.key().compareTo(other.key());
}
},
(Flux<Indexable>[]) observables.toArray(new Flux[0]));
}

/**
Expand Down Expand Up @@ -435,7 +449,8 @@ private Flux<Indexable> invokeTaskAsync(final TaskGroupEntry<TaskItem> entry, fi
} else {
taskObservable = entry.invokeTaskAsync(ignoreCachedResult, context);
}
return taskObservable.flatMapMany((indexable) -> Flux.just(indexable),
return Utils.flatMapSequential(taskObservable.flux(),
(indexable) -> Flux.just(indexable),
(throwable) -> processFaultedTaskAsync(entry, throwable, context),
() -> processCompletedTaskAsync(entry, context));
}
Expand Down Expand Up @@ -463,24 +478,23 @@ private Flux<Indexable> invokeAfterPostRunAsync(final TaskGroupEntry<TaskItem> e
}
final boolean isFaulted = entry.hasFaultedDescentDependencyTasks() || isGroupCancelled.get();

return proxyTaskItem.invokeAfterPostRunAsync(isFaulted)
.flatMapMany(indexable -> Flux.error(
new IllegalStateException("This onNext should never be called")),
(error) -> processFaultedTaskAsync(entry, error, context),
() -> {
if (isFaulted) {
if (entry.hasFaultedDescentDependencyTasks()) {
return processFaultedTaskAsync(entry,
new ErroredDependencyTaskException(), context);
} else {
return processFaultedTaskAsync(entry, taskCancelledException, context);
}
} else {
return Flux.concat(Flux.just(proxyTaskItem.result()),
processCompletedTaskAsync(entry, context));
}
});

return Utils.flatMapSequential(proxyTaskItem.invokeAfterPostRunAsync(isFaulted).flux(),
indexable -> Flux.error(
new IllegalStateException("This onNext should never be called")),
(error) -> processFaultedTaskAsync(entry, error, context),
() -> {
if (isFaulted) {
if (entry.hasFaultedDescentDependencyTasks()) {
return processFaultedTaskAsync(entry,
new ErroredDependencyTaskException(), context);
} else {
return processFaultedTaskAsync(entry, taskCancelledException, context);
}
} else {
return Flux.concat(Flux.just(proxyTaskItem.result()),
processCompletedTaskAsync(entry, context));
}
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,10 +1460,12 @@ protected Mono<Indexable> invokeTaskAsync(TaskGroup.InvocationContext context) {
down.await();

boolean b1 = seen.equals(new ArrayList<>(Arrays.asList(new String[]{"A", "C", "B", "C"})));
boolean b2 = seen.equals(new ArrayList<>(Arrays.asList(new String[]{"C", "A", "B", "C"})));
// boolean b2 = seen.equals(new ArrayList<>(Arrays.asList(new String[]{"C", "A", "B", "C"})));
boolean b2 = seen.equals(new ArrayList<>(Arrays.asList(new String[]{"A", "B", "C", "C"}))); // due to new compare reason, 'C' will always after 'A' and 'C' could return after 'B'

if (!b1 && !b2) {
Assertions.assertTrue(false, "Emission order should be either [A, C, B, C] or [C, A, B, C] but got " + seen);
// Assertions.assertTrue(false, "Emission order should be either [A, C, B, C] or [C, A, B, C] but got " + seen);
Assertions.assertTrue(false, "Emission order should be either [A, C, B, C] or [A, B, C, C] but got " + seen);
}

Assertions.assertEquals(beforeGroupInvokeCntB[0], 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,17 @@ public void testFlatMapSequentialDelayError() {
// .expectNext(1, 3, 7)
// .verifyErrorMessage("Test");
}

@Test
public void testMergeOrdered() {
StepVerifier.create(
Flux.mergeOrdered(
Flux.just(1, 5),
Flux.just(4, 3),
Flux.just(2).mergeWith(Flux.error(new RuntimeException("Test")))
)
)
.expectNext(1, 2, 4, 3, 5)
.verifyErrorMessage("Test");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public class SqlSampleTests extends SamplesTestBase {

@Test
public void testManageSqlDatabase() {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
Assertions.assertTrue(ManageSqlDatabase.runSample(azure));
}

Expand All @@ -44,9 +41,6 @@ public void testManageSqlDatabasesAcrossDifferentDataCenters() {

@Test
public void testManageSqlFirewallRules() {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
Assertions.assertTrue(ManageSqlFirewallRules.runSample(azure));
}

Expand All @@ -70,9 +64,6 @@ public void testManageSqlImportExportDatabase() {

@Test
public void testManageSqlWithRecoveredOrRestoredDatabase() {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
// This test can take significant time to run since it depends on the availability of certain resources on the service side.
Assertions.assertTrue(ManageSqlWithRecoveredOrRestoredDatabase.runSample(azure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
public class TrafficManagerSampleTests extends SamplesTestBase {
@Test
public void testManageSimpleTrafficManager() {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
Assertions.assertTrue(ManageSimpleTrafficManager.runSample(azure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ public class SqlServerOperationsTests extends SqlServerTest {

@Test
public void canCRUDSqlSyncMember() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
final String dbName = "dbSample";
final String dbSyncName = "dbSync";
final String dbMemberName = "dbMember";
Expand Down Expand Up @@ -153,9 +150,6 @@ public void canCRUDSqlSyncMember() throws Exception {

@Test
public void canCRUDSqlSyncGroup() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
final String dbName = "dbSample";
final String dbSyncName = "dbSync";
final String syncGroupName = "groupName";
Expand Down Expand Up @@ -876,9 +870,6 @@ public void canCRUDSqlServer() throws Exception {

@Test
public void canUseCoolShortcutsForResourceCreation() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
String database2Name = "database2";
String database1InEPName = "database1InEP";
String database2InEPName = "database2InEP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,9 +1095,6 @@ public void createStorageAccount() throws Exception {

@Test
public void testTrafficManager() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
new TestTrafficManager(azure.publicIpAddresses())
.runTest(azure.trafficManagerProfiles(), azure.resourceGroups());
}
Expand All @@ -1115,17 +1112,11 @@ public void testRedis() throws Exception {

@Test
public void testDnsZones() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
new TestDns().runTest(azure.dnsZones(), azure.resourceGroups());
}

@Test
public void testPrivateDnsZones() throws Exception {
if (isPlaybackMode()) {
return; // TODO: fix playback random fail
}
new TestPrivateDns().runTest(azure.privateDnsZones(), azure.resourceGroups());
}

Expand Down