Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded support for Eventhubs management family of APIs #5315

Merged
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
41a231d
Enabled multithreaded calls to eventhubs management APIs
LarryOsterman Feb 2, 2024
ab5f7fa
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 2, 2024
14ac359
Added partition client properties tests, removed unused variables in …
LarryOsterman Feb 2, 2024
72edbb8
Restructured properties APIs in eventhubs to simplify producer client…
LarryOsterman Feb 2, 2024
40dfecb
Clang fix
LarryOsterman Feb 2, 2024
1b7f2a6
Removed AMQP code which logged an incoming management message
LarryOsterman Feb 2, 2024
d1a41e7
Removed unused lambda capture fields.
LarryOsterman Feb 3, 2024
84a5429
Fixed test crash in LinkAttachDetach AMQP test
LarryOsterman Feb 5, 2024
8a4c96e
Added test cases for management authn failures
LarryOsterman Feb 5, 2024
2ac800f
clang-format
LarryOsterman Feb 5, 2024
99cc884
Don't emit body contents in AmqpMessage insertion operator
LarryOsterman Feb 6, 2024
9089926
Don't take numeric value parameters by value to ostream insertion ope…
LarryOsterman Feb 6, 2024
847296a
clang-format
LarryOsterman Feb 6, 2024
c90ca72
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 6, 2024
924744c
Merge branch 'main' into larryo/multithreaded_management
LarryOsterman Feb 6, 2024
b31ccd9
Updated changelog to reflect changes in this PR
LarryOsterman Feb 6, 2024
64cdeb1
Updated eventhubs dependency to match reality
LarryOsterman Feb 7, 2024
1307c98
Pull request feedback
LarryOsterman Feb 8, 2024
5964ef9
Fixed test crashes in management tests
LarryOsterman Feb 8, 2024
51e432b
clang-format
LarryOsterman Feb 8, 2024
db550b9
Improved code coverage
LarryOsterman Feb 8, 2024
198af85
compiler didnt notice an impossible branch
LarryOsterman Feb 8, 2024
60318a3
clang-format
LarryOsterman Feb 8, 2024
a373ae5
Better code coverage
LarryOsterman Feb 8, 2024
29a9c47
clang fixes
LarryOsterman Feb 8, 2024
5209f1f
amqpvalue_create_described does not clone its inputs
LarryOsterman Feb 9, 2024
395e1c3
Added value based tests for enumeration stream inserters
LarryOsterman Feb 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,23 @@

### Breaking Changes

- Claims Based Security authentication now longer throws a `std::runtime_error`, and instead follows the pattern of the rest of the AMQP library and returns an error.
- Authentication now throws `Azure::Core::Credentials::AuthenticationException` instead of `std::runtime_error`.
- Added `Cancelled` status to `CbsOperationResult` and `ManagementOperationStatus`.

### Bugs Fixed

- [[#5284]](https://github.com/Azure/azure-sdk-for-cpp/issues/5284) [azure-identity][azure-messaging-eventhubs] Impossible to catch exception resulting in SIGABRT signal.
- [[#5297]](https://github.com/Azure/azure-sdk-for-cpp/issues/5297): Enabled multiple simultaneous `ExecuteOperation` calls.
- Fixed crash when Link Detach message is received while link is being destroyed.
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

### Other Changes

- `std::ostream` inserter for message body no longer prints the body of the message.
- Tidied up the output of the `AmqpMessage` `std::ostream` inserter.
- Added several `std::ostream` inserters.
- Pass numeric values to `std::ostream` inserters by value not by reference.

## 1.0.0-beta.7 (2024-02-02)

### Features Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Mixed,
};

std::ostream& operator<<(std::ostream& os, SenderSettleMode const& mode);
std::ostream& operator<<(std::ostream& os, SenderSettleMode mode);

enum class ReceiverSettleMode
{
First,
Second,
};
std::ostream& operator<<(std::ostream& os, ReceiverSettleMode const& mode);
std::ostream& operator<<(std::ostream& os, ReceiverSettleMode mode);

}}}} // namespace Azure::Core::Amqp::_internal
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Error,
Failed,
InstanceClosed,
Cancelled,
};
std::ostream& operator<<(std::ostream& os, CbsOperationResult const& operationResult);
std::ostream& operator<<(std::ostream& os, CbsOperationResult operationResult);

enum class CbsOpenResult
{
Expand All @@ -27,7 +28,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Error,
Cancelled,
};
std::ostream& operator<<(std::ostream& os, CbsOpenResult const& operationResult);
std::ostream& operator<<(std::ostream& os, CbsOpenResult operationResult);

enum class CbsTokenType
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Error,
};

std::ostream& operator<<(std::ostream& stream, ConnectionState const value);
std::ostream& operator<<(std::ostream& stream, ConnectionState value);

class Connection;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Error,
};

std::ostream& operator<<(std::ostream& stream, LinkState const& linkState);
std::ostream& operator<<(std::ostream& stream, LinkState linkState);

enum class LinkTransferResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Error,
FailedBadStatus,
InstanceClosed,
Cancelled,
};

enum class ManagementOpenStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Closing,
Error,
};
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState const& state);
std::ostream& operator<<(std::ostream& stream, _internal::MessageReceiverState state);

class MessageReceiver;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Timeout,
Cancelled,
};
std::ostream& operator<<(std::ostream& stream, MessageSendStatus status);

enum class MessageSenderState
{
Invalid,
Expand All @@ -40,7 +42,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Closing,
Error,
};
std::ostream& operator<<(std::ostream& stream, MessageSenderState const& state);
std::ostream& operator<<(std::ostream& stream, MessageSenderState state);

class MessageSender;
class MessageSenderEvents {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
Unknown,
};

/**
* @brief ostream insertion operator for AmqpValueType.
*
* @param os - stream to insert to.
* @param value - value to insert.
*
* @returns the input ostream.
*/
std::ostream& operator<<(std::ostream& os, AmqpValueType value);

class AmqpArray;
class AmqpMap;
class AmqpList;
Expand Down
29 changes: 13 additions & 16 deletions sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
message,
context);
if (result.Status != ManagementOperationStatus::Ok)
{
throw std::runtime_error(
"Could not authenticate to client. Error Status: " + std::to_string(result.StatusCode)
+ " condition: " + result.Error.Condition.ToString()
+ " reason: " + result.Error.Description);
}
else
{
CbsOperationResult cbsResult;
switch (result.Status)
Expand All @@ -128,16 +121,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
case ManagementOperationStatus::InstanceClosed:
cbsResult = CbsOperationResult::InstanceClosed;
break;
case ManagementOperationStatus::Cancelled:
cbsResult = CbsOperationResult::Cancelled;
break;
default:
throw std::runtime_error("Unknown management operation status.");
}
Log::Stream(Logger::Level::Informational)
<< "CBS PutToken result: " << cbsResult << " status code: " << result.StatusCode
<< " Error: " << result.Error.Description << ".";
<< " Error: " << result.Error << ".";
return std::make_tuple(cbsResult, result.StatusCode, result.Error.Description);
}
else
{
return std::make_tuple(CbsOperationResult::Ok, result.StatusCode, result.Error.Description);
}
}
std::ostream& operator<<(std::ostream& os, CbsOperationResult const& operationResult)
std::ostream& operator<<(std::ostream& os, CbsOperationResult operationResult)
{
switch (operationResult)
{
Expand All @@ -156,14 +156,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
case CbsOperationResult::InstanceClosed:
os << "InstanceClosed";
break;
default:
os << "Unknown CbsOperationResult."
<< static_cast<std::underlying_type<CbsOperationResult>::type>(operationResult);
case CbsOperationResult::Cancelled:
os << "Cancelled";
break;
}
return os;
}

std::ostream& operator<<(std::ostream& os, CbsOpenResult const& openResult)
std::ostream& operator<<(std::ostream& os, CbsOpenResult openResult)
{
switch (openResult)
{
Expand All @@ -179,9 +179,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
case CbsOpenResult::Cancelled:
os << "Cancelled";
break;
default:
os << "Unknown CbsOpenResult."
<< static_cast<std::underlying_type<CbsOpenResult>::type>(openResult);
}
return os;
}
Expand Down
6 changes: 4 additions & 2 deletions sdk/core/azure-core-amqp/src/amqp/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
context);
if (std::get<0>(result) != CbsOperationResult::Ok)
{
throw std::runtime_error("Could not put Claims Based Security token.");
throw Azure::Core::Credentials::AuthenticationException(
"Could not authenticate client. Error Status: " + std::to_string(std::get<1>(result))
+ " reason: " + std::get<2>(result));
}
Log::Stream(Logger::Level::Verbose) << "Close CBS object";
claimsBasedSecurity->Close(context);
Expand All @@ -690,7 +692,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
m_tokenStore.emplace(audienceUrl, accessToken);
return accessToken;
}
catch (std::runtime_error const&)
catch (...)
{
// Ensure that the claims based security object is closed before we leave this scope.
claimsBasedSecurity->Close(context);
Expand Down
18 changes: 4 additions & 14 deletions sdk/core/azure-core-amqp/src/amqp/link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
#endif

std::ostream& operator<<(std::ostream& os, LinkState const& linkState)
std::ostream& operator<<(std::ostream& os, LinkState linkState)
{
switch (linkState)
{
Expand Down Expand Up @@ -209,7 +209,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Azure::Core::_internal::AzureNoReturnPath(
"Destroying link while link detach subscription is still active.");
}

auto lock{m_session->GetConnection()->Lock()};
if (m_link)
{
link_destroy(m_link);
Expand Down Expand Up @@ -512,19 +512,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
{
auto lock{m_session->GetConnection()->Lock()};
if (m_eventHandler)
if (link_attach(m_link, OnTransferReceivedFn, OnLinkStateChangedFn, OnLinkFlowOnFn, this))
{
if (link_attach(m_link, OnTransferReceivedFn, OnLinkStateChangedFn, OnLinkFlowOnFn, this))
{
throw std::runtime_error("Could not set attach properties.");
}
}
else
{
if (link_attach(m_link, nullptr, nullptr, nullptr, this))
{
throw std::runtime_error("Could not set attach properties.");
}
throw std::runtime_error("Could not set attach properties.");
}
}
// Mark the connection as async so that we can use the async APIs.
Expand Down
Loading
Loading