Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration apply queueing #4590

Merged
merged 6 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ namespace AppInstaller::SQLite::Builder
// Specify the ordering to use.
StatementBuilder& OrderBy(std::string_view column);
StatementBuilder& OrderBy(const QualifiedColumn& column);
StatementBuilder& OrderBy(std::initializer_list<std::string_view> columns);

// Specify the ordering behavior.
StatementBuilder& Ascending();
Expand Down
6 changes: 6 additions & 0 deletions src/AppInstallerSharedLib/SQLiteStatementBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,12 @@ namespace AppInstaller::SQLite::Builder
return *this;
}

StatementBuilder& StatementBuilder::OrderBy(std::initializer_list<std::string_view> columns)
{
OutputColumns(m_stream, " ORDER BY ", columns);
return *this;
}

StatementBuilder& StatementBuilder::Ascending()
{
m_stream << " ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,149 @@ public void ApplySet_Progress()
this.VerifySummaryEvent(configurationSet, result, ConfigurationUnitResultSource.Precondition);
}

/// <summary>
/// Ensures that multiple apply operations are sequenced.
/// </summary>
[Fact]
public void ApplySet_Sequenced()
{
ConfigurationSet configurationSet = this.ConfigurationSet();
ConfigurationUnit configurationUnitApply = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSet.Units = new ConfigurationUnit[] { configurationUnitApply };

ManualResetEvent startProcessing = new ManualResetEvent(true);
TestConfigurationProcessorFactory factory = new TestConfigurationProcessorFactory();
factory.CreateSetProcessorDelegate = (f, c) =>
{
WaitOn(startProcessing);
return f.DefaultCreateSetProcessor(c);
};

TestConfigurationSetProcessor setProcessor = factory.CreateTestProcessor(configurationSet);
TestConfigurationUnitProcessor unitProcessorApply = setProcessor.CreateTestProcessor(configurationUnitApply);
unitProcessorApply.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent applyEventWaiting = new ManualResetEvent(false);
ManualResetEvent completeApplyEvent = new ManualResetEvent(false);
unitProcessorApply.ApplySettingsDelegate = () =>
{
applyEventWaiting.Set();
WaitOn(completeApplyEvent);
return new ApplySettingsResultInstance(configurationUnitApply);
};

ConfigurationSet configurationSetThatWaits = this.ConfigurationSet();
ConfigurationUnit configurationUnitThatWaits = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSetThatWaits.Units = new ConfigurationUnit[] { configurationUnitThatWaits };

TestConfigurationSetProcessor setThatWaitsProcessor = factory.CreateTestProcessor(configurationSetThatWaits);
TestConfigurationUnitProcessor unitThatWaitsProcessor = setProcessor.CreateTestProcessor(configurationUnitThatWaits);
unitThatWaitsProcessor.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent waitingUnitApply = new ManualResetEvent(false);
unitThatWaitsProcessor.ApplySettingsDelegate = () =>
{
WaitOn(waitingUnitApply);
return new ApplySettingsResultInstance(configurationUnitThatWaits);
};

ConfigurationProcessor processor = this.CreateConfigurationProcessorWithDiagnostics(factory);

var applySetOperation = processor.ApplySetAsync(configurationSet, ApplyConfigurationSetFlags.None);
WaitOn(applyEventWaiting);

startProcessing.Reset();
var waitingSetOperation = processor.ApplySetAsync(configurationSetThatWaits, ApplyConfigurationSetFlags.None);
AutoResetEvent waitingProgress = new AutoResetEvent(false);
ConfigurationSetState progressState = ConfigurationSetState.Unknown;
waitingSetOperation.Progress += (result, changeData) =>
{
if (changeData.Change == ConfigurationSetChangeEventType.SetStateChanged)
{
progressState = changeData.SetState;
waitingProgress.Set();
}
};

startProcessing.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.Pending, progressState);

completeApplyEvent.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.InProgress, progressState);

waitingUnitApply.Set();
WaitOn(waitingProgress);
Assert.Equal(ConfigurationSetState.Completed, progressState);
}

/// <summary>
/// Ensures that a consistency check apply is not blocked.
/// </summary>
[Fact]
public void ApplySet_ConsistencyCheckNotSequenced()
{
ConfigurationSet configurationSet = this.ConfigurationSet();
ConfigurationUnit configurationUnitApply = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSet.Units = new ConfigurationUnit[] { configurationUnitApply };

ManualResetEvent startProcessing = new ManualResetEvent(true);
TestConfigurationProcessorFactory factory = new TestConfigurationProcessorFactory();
factory.CreateSetProcessorDelegate = (f, c) =>
{
WaitOn(startProcessing);
return f.DefaultCreateSetProcessor(c);
};

TestConfigurationSetProcessor setProcessor = factory.CreateTestProcessor(configurationSet);
TestConfigurationUnitProcessor unitProcessorApply = setProcessor.CreateTestProcessor(configurationUnitApply);
unitProcessorApply.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent applyEventWaiting = new ManualResetEvent(false);
ManualResetEvent completeApplyEvent = new ManualResetEvent(false);
unitProcessorApply.ApplySettingsDelegate = () =>
{
applyEventWaiting.Set();
WaitOn(completeApplyEvent);
return new ApplySettingsResultInstance(configurationUnitApply);
};

ConfigurationSet configurationSetThatWaits = this.ConfigurationSet();
ConfigurationUnit configurationUnitThatWaits = this.ConfigurationUnit().Assign(new { Intent = ConfigurationUnitIntent.Apply });
configurationSetThatWaits.Units = new ConfigurationUnit[] { configurationUnitThatWaits };

TestConfigurationSetProcessor setThatWaitsProcessor = factory.CreateTestProcessor(configurationSetThatWaits);
TestConfigurationUnitProcessor unitThatWaitsProcessor = setProcessor.CreateTestProcessor(configurationUnitThatWaits);
unitThatWaitsProcessor.TestSettingsDelegate = () => new TestSettingsResultInstance(configurationUnitApply) { TestResult = ConfigurationTestResult.Negative };

ManualResetEvent waitingUnitApply = new ManualResetEvent(false);
unitThatWaitsProcessor.ApplySettingsDelegate = () =>
{
WaitOn(waitingUnitApply);
return new ApplySettingsResultInstance(configurationUnitThatWaits);
};

ConfigurationProcessor processor = this.CreateConfigurationProcessorWithDiagnostics(factory);

var applySetOperation = processor.ApplySetAsync(configurationSet, ApplyConfigurationSetFlags.None);
WaitOn(applyEventWaiting);

startProcessing.Reset();
var waitingSetOperation = processor.ApplySetAsync(configurationSetThatWaits, ApplyConfigurationSetFlags.PerformConsistencyCheckOnly);
Assert.True(waitingSetOperation.AsTask().Wait(10000));

completeApplyEvent.Set();
}

private static void WaitOn(WaitHandle waitable)
{
if (!waitable.WaitOne(10000))
{
throw new TimeoutException();
}
}

private struct ExpectedConfigurationChangeData
{
public ConfigurationSetChangeEventType Change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "GetConfigurationUnitDetailsResult.h"
#include "GetConfigurationSetDetailsResult.h"
#include "DefaultSetGroupProcessor.h"
#include "ConfigurationSequencer.h"

#include <AppInstallerErrors.h>
#include <AppInstallerStrings.h>
Expand Down Expand Up @@ -520,7 +521,24 @@ namespace winrt::Microsoft::Management::Configuration::implementation

try
{
// TODO: Send pending when blocked by another configuration run
ConfigurationSequencer sequencer{ m_database };

if (!WI_IsFlagSet(flags, ApplyConfigurationSetFlags::PerformConsistencyCheckOnly))
{
if (sequencer.Enqueue(configurationSet))
{
try
{
progress.Progress(implementation::ConfigurationSetChangeData::Create(ConfigurationSetState::Pending));
}
CATCH_LOG();

sequencer.Wait(progress);
}
}

progress.ThrowIfCancelled();

try
{
progress.Progress(implementation::ConfigurationSetChangeData::Create(ConfigurationSetState::InProgress));
Expand Down
164 changes: 164 additions & 0 deletions src/Microsoft.Management.Configuration/ConfigurationSequencer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "pch.h"
#include "ConfigurationSequencer.h"
#include <AppInstallerStrings.h>

using namespace std::chrono_literals;

namespace winrt::Microsoft::Management::Configuration::implementation
{
ConfigurationSequencer::ConfigurationSequencer(ConfigurationDatabase& database) : m_database(database) {}

ConfigurationSequencer::~ConfigurationSequencer()
{
// Best effort attempt to remove our queue row
try
{
m_database.RemoveQueueItem(m_queueItemObjectName);
}
CATCH_LOG();
}

// This function creates necessary objects and records this operation into the table.
// It then performs the equivalent of `Wait` with a timeout of 0.
bool ConfigurationSequencer::Enqueue(const Configuration::ConfigurationSet& configurationSet)
{
// Create an arbitrarily named object
std::wstring objectName = L"WinGetConfigQueue_" + AppInstaller::Utility::CreateNewGuidNameWString();
m_queueItemObjectName = AppInstaller::Utility::ConvertToUTF8(objectName);
m_queueItemObject.create(wil::EventOptions::None, objectName.c_str());

m_database.AddQueueItem(configurationSet, m_queueItemObjectName);

// Create shared mutex
constexpr PCWSTR applyMutexName = L"WinGetConfigQueueApplyMutex";

for (int i = 0; !m_applyMutex && i < 2; ++i)
{
if (!m_applyMutex.try_create(applyMutexName, 0, SYNCHRONIZE))
{
m_applyMutex.try_open(applyMutexName, SYNCHRONIZE);
}
}

THROW_LAST_ERROR_IF(!m_applyMutex);

// Probe for an empty queue
DWORD status = 0;
m_applyMutexScope = m_applyMutex.acquire(&status, 0);
THROW_LAST_ERROR_IF(status == WAIT_FAILED);

if (status == WAIT_TIMEOUT)
{
return true;
}

if (GetQueuePosition() == 0)
{
m_database.SetActiveQueueItem(m_queueItemObjectName);
return false;
}
else
{
m_applyMutexScope.reset();
return true;
}
}

// The configuration queue consists of a table in the shared database and cooperative handling of said table.
// At any moment, the active processor must be holding a common named mutex.
// Each active queue entry also holds their own arbitrarily named object, recorded in the table.
//
// The general mechanism to wait is:
// 1. Wait on common named mutex
// 2. Check if first in queue, including probing arbitrary named objects of entries ahead of us
// 3. If not first, wait for X * queue position, where X is sufficiently high to prevent contention on main mutex
void ConfigurationSequencer::Wait(AppInstaller::WinRT::AsyncCancellation& cancellation)
{
THROW_HR_IF(E_NOT_VALID_STATE, !m_applyMutex);

wil::unique_event cancellationEvent;
cancellationEvent.create();

HANDLE waitHandles[2];
waitHandles[0] = cancellationEvent.get();
waitHandles[1] = m_applyMutex.get();

cancellation.Callback([&]() { cancellationEvent.SetEvent(); });
auto clearCancelCallback = wil::scope_exit([&cancellation]() { cancellation.Callback([]() {}); });

for (;;)
{
DWORD waitResult = WaitForMultipleObjects(ARRAYSIZE(waitHandles), waitHandles, FALSE, INFINITE);
THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED);

if (waitResult == WAIT_OBJECT_0)
{
// Cancellation
break;
}
else if (waitResult == WAIT_OBJECT_0 + 1 || waitResult == WAIT_ABANDONED_0 + 1)
{
// We now hold the apply mutex
wil::mutex_release_scope_exit applyMutexScope{ m_applyMutex.get() };

size_t queuePosition = GetQueuePosition();
if (queuePosition == 0)
{
m_applyMutexScope = std::move(applyMutexScope);
m_database.SetActiveQueueItem(m_queueItemObjectName);
break;
}
else
{
applyMutexScope.reset();
std::this_thread::sleep_for(queuePosition * 100ms);
}
}
}
}

size_t ConfigurationSequencer::GetQueuePosition()
{
auto queueItems = m_database.GetQueueItems();

// If we get no queue items at all, we assume that the database doesn't support queueing.
if (queueItems.empty())
{
return 0;
}

size_t result = 0;
bool found = false;

for (const auto& item : queueItems)
{
if (item.ObjectName == m_queueItemObjectName)
{
found = true;
break;
}

std::wstring objectName = AppInstaller::Utility::ConvertToUTF16(item.ObjectName);
QueueObjectType itemObject;
if (itemObject.try_open(objectName.c_str(), SYNCHRONIZE))
{
++result;
}
else
{
// Best effort attempt to remove the dead queue row
try
{
m_database.RemoveQueueItem(item.ObjectName);
}
CATCH_LOG();
}
}

THROW_HR_IF(E_NOT_SET, !found);

return result;
}
}
Loading
Loading