Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service Bus message transfer fixes #96

Merged
merged 44 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8f3c772
Support message delivery tag
annatisch Sep 19, 2019
fbacfa0
Added headers
annatisch Sep 19, 2019
2b73ec8
Removed null init
annatisch Sep 19, 2019
f486ec1
Added memory cleanup
annatisch Sep 19, 2019
5e80d63
Fix build error
annatisch Sep 19, 2019
cb68069
Moved delivery tag to message
annatisch Sep 23, 2019
b6693e6
Cython fixes
annatisch Sep 23, 2019
8bd0d5f
Binary type
annatisch Sep 23, 2019
9ce9c57
Attempt to set message tag
annatisch Sep 23, 2019
ecc9e11
Converted to AMQP_VALUE
annatisch Sep 23, 2019
3072807
Syntax fixes
annatisch Sep 23, 2019
b911094
Build error
annatisch Sep 23, 2019
f8ce342
Renamed value
annatisch Sep 23, 2019
4e2341a
Get tag type
annatisch Sep 24, 2019
8907a7b
Fixed name
annatisch Sep 24, 2019
5680a4a
Extract tag bytes
annatisch Sep 24, 2019
c6999b1
Some C cleanup
annatisch Sep 24, 2019
7920ff8
More logging
annatisch Sep 25, 2019
3ab1ad4
Updated test
annatisch Sep 25, 2019
056156b
pylint fix
annatisch Sep 25, 2019
1845be8
More logging
annatisch Sep 25, 2019
da3b5d1
More logging
annatisch Sep 25, 2019
fe86db8
More logging
annatisch Sep 25, 2019
9f8b605
Fixed print formatting
annatisch Sep 25, 2019
c001347
More logging
annatisch Sep 25, 2019
b750356
Syntax error
annatisch Sep 25, 2019
a99a20d
TLSIO logging
annatisch Sep 25, 2019
c705df2
Log socket error
annatisch Sep 25, 2019
de13960
Added sleep
annatisch Sep 25, 2019
8e9368f
Fixed sleep
annatisch Sep 25, 2019
84b2a7d
Reduced sleep
annatisch Sep 25, 2019
a39383a
Another attempt
annatisch Sep 26, 2019
c5ee77a
Ping CI
annatisch Sep 26, 2019
5cc7311
Attempt to move outgoing flow
annatisch Sep 26, 2019
a4bf82d
Moved send flow frame
annatisch Sep 26, 2019
df6dc51
Removed debug logging
annatisch Sep 26, 2019
54d7269
Update link status
annatisch Sep 27, 2019
c403706
Fix diff
annatisch Sep 27, 2019
cdfc56e
pylint fixes
annatisch Sep 27, 2019
7d22a64
Py2.7
annatisch Sep 27, 2019
ac1f7e8
Updated status description
annatisch Sep 27, 2019
29e2641
Fixed executor
annatisch Sep 27, 2019
d1d1340
Some review feedback
annatisch Sep 30, 2019
51d144a
Merge branch 'v1.2.3' into debug
annatisch Sep 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion samples/test_azure_event_hubs_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ def test_event_hubs_client_receive_sync(live_eventhub_config):
assert len(batch) <= 10
for message in batch:
annotations = message.annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
log.info("Sequence Number: {}, Delivery tag: {}".format(
annotations.get(b'x-opt-sequence-number'),
message.delivery_tag))
batch = receive_client.receive_message_batch(max_batch_size=10)
log.info("Finished receiving")

Expand Down
3 changes: 3 additions & 0 deletions src/link.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ cdef class cLink(StructBase):
self._value_error()
return value_factory(value)

cpdef do_work(self):
c_link.link_dowork(self._c_value)

cpdef set_prefetch_count(self, stdint.uint32_t prefetch):
if c_link.link_set_max_link_credit(self._c_value, prefetch) != 0:
self._value_error("Unable to set link credit.")
Expand Down
10 changes: 10 additions & 0 deletions src/message.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ cdef class cMessage(StructBase):
if c_message.message_set_message_format(self._c_value, value) != 0:
self._value_error()

@property
def delivery_tag(self):
annatisch marked this conversation as resolved.
Show resolved Hide resolved
cdef c_amqpvalue.AMQP_VALUE value
if c_message.message_get_delivery_tag(self._c_value, &value) == 0:
if <void*>value == NULL:
return None
return value_factory(value)
else:
self._value_error()

cpdef add_body_data(self, bytes value):
cdef c_message.BINARY_DATA _binary
length = len(value)
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ extern "C" {
MOCKABLE_FUNCTION(, int, message_get_body_type, MESSAGE_HANDLE, message, MESSAGE_BODY_TYPE*, body_type);
MOCKABLE_FUNCTION(, int, message_set_message_format, MESSAGE_HANDLE, message, uint32_t, message_format);
MOCKABLE_FUNCTION(, int, message_get_message_format, MESSAGE_HANDLE, message, uint32_t*, message_format);
MOCKABLE_FUNCTION(, int, message_set_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE, delivery_tag_value);
MOCKABLE_FUNCTION(, int, message_get_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE*, delivery_tag_value);

#ifdef __cplusplus
}
Expand Down
13 changes: 11 additions & 2 deletions src/vendor/azure-uamqp-c/src/amqp_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,17 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
desc_value = amqpvalue_get_map_value(map, desc_key);
if (desc_value != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
AMQP_TYPE amqp_type = amqpvalue_get_type(desc_value);
if (amqp_type == AMQP_TYPE_STRING)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
}
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
Expand Down
11 changes: 6 additions & 5 deletions src/vendor/azure-uamqp-c/src/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,6 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t

link_instance->current_link_credit--;
link_instance->delivery_count++;
if (link_instance->current_link_credit == 0)
{
link_instance->current_link_credit = link_instance->max_link_credit;
send_flow(link_instance);
}

more = false;
/* Attempt to get more flag, default to false */
Expand Down Expand Up @@ -1640,6 +1635,12 @@ void link_dowork(LINK_HANDLE link)
{
tickcounter_ms_t current_tick;

if (link->current_link_credit <= 0)
{
link->current_link_credit = link->max_link_credit;
send_flow(link);
}

if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
{
LogError("Cannot get tick counter value");
Expand Down
104 changes: 104 additions & 0 deletions src/vendor/azure-uamqp-c/src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typedef struct MESSAGE_INSTANCE_TAG
application_properties application_properties;
annotations footer;
uint32_t message_format;
AMQP_VALUE delivery_tag;
} MESSAGE_INSTANCE;

MESSAGE_BODY_TYPE internal_get_body_type(MESSAGE_HANDLE message)
Expand Down Expand Up @@ -119,6 +120,7 @@ MESSAGE_HANDLE message_create(void)
result->body_amqp_value = NULL;
result->body_amqp_sequence_items = NULL;
result->body_amqp_sequence_count = 0;
result->delivery_tag = NULL;

/* Codes_SRS_MESSAGE_01_135: [ By default a message on which `message_set_message_format` was not called shall have message format set to 0. ]*/
result->message_format = 0;
Expand Down Expand Up @@ -229,6 +231,17 @@ MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message)
}
}

if ((result != NULL) && (source_message->delivery_tag != NULL))
{
result->delivery_tag = amqpvalue_clone(source_message->delivery_tag);
if (result->delivery_tag == NULL)
{
LogError("Cannot clone message delivery tag");
message_destroy(result);
result = NULL;
}
}

if ((result != NULL) && (source_message->body_amqp_data_count > 0))
{
size_t i;
Expand Down Expand Up @@ -375,6 +388,11 @@ void message_destroy(MESSAGE_HANDLE message)
amqpvalue_destroy(message->body_amqp_value);
}

if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

/* Codes_SRS_MESSAGE_01_136: [ If the message body is made of several AMQP data items, they shall all be freed. ]*/
free_all_body_data_items(message);

Expand Down Expand Up @@ -1447,3 +1465,89 @@ int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format)

return result;
}

int message_set_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE delivery_tag_value)
{
int result;

if (message == NULL)
{
LogError("NULL message");
result = __FAILURE__;
}
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
else
{
if (delivery_tag_value == NULL)
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
message->delivery_tag = NULL;
}

/* Codes_SRS_MESSAGE_01_053: [ On success it shall return 0. ]*/
result = 0;
}
else
{

annatisch marked this conversation as resolved.
Show resolved Hide resolved
AMQP_VALUE new_delivery_tag = amqpvalue_clone(delivery_tag_value);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = __FAILURE__;
}
else
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

message->delivery_tag = new_delivery_tag;

/* Codes_SRS_MESSAGE_01_102: [ On success it shall return 0. ]*/
result = 0;
}
}
}

return result;
}

int message_get_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE *delivery_tag_value)
{
int result;

if ((message == NULL) ||
(delivery_tag_value == NULL))
{
LogError("Bad arguments: message = %p, delivery_tag = %p",
message, delivery_tag_value);
result = __FAILURE__;
}
else
{
if (message->delivery_tag == NULL)
{
*delivery_tag_value = NULL;
result = 0;
}
else
{
AMQP_VALUE new_delivery_tag = amqpvalue_clone(message->delivery_tag);
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = __FAILURE__;
}
else
{
*delivery_tag_value = new_delivery_tag;
result = 0;
}
}
}

return result;
}
50 changes: 37 additions & 13 deletions src/vendor/azure-uamqp-c/src/message_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
AMQP_VALUE result = NULL;
MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;

(void)transfer;
if (message_receiver->on_message_received != NULL)
{
MESSAGE_HANDLE message = message_create();
Expand All @@ -236,35 +235,60 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
}
else
{
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
delivery_tag received_message_tag;
if (transfer_get_delivery_tag(transfer, &received_message_tag) != 0)
{
LogError("Cannot create AMQP value decoder");
LogError("Could not get the delivery tag from the transfer performative");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
message_receiver->decoded_message = message;
message_receiver->decode_error = false;
if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
AMQP_VALUE delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag);
if (delivery_tag_value == NULL)
{
LogError("Cannot decode bytes");
LogError("Could not create delivery tag value");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else if (message_set_delivery_tag(message, delivery_tag_value) != 0)
{
LogError("Could not set message delivery tag");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
if (message_receiver->decode_error)
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
{
LogError("Error decoding message");
LogError("Cannot create AMQP value decoder");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
result = message_receiver->on_message_received(message_receiver->callback_context, message);
message_receiver->decoded_message = message;
message_receiver->decode_error = false;
if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
{
LogError("Cannot decode bytes");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
if (message_receiver->decode_error)
{
LogError("Error decoding message");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
result = message_receiver->on_message_received(message_receiver->callback_context, message);
}
}

amqpvalue_decoder_destroy(amqpvalue_decoder);
}
}

amqpvalue_decoder_destroy(amqpvalue_decoder);
amqpvalue_destroy(delivery_tag_value);
}
}

message_destroy(message);
Expand Down
1 change: 1 addition & 0 deletions src/vendor/inc/c_link.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cdef extern from "azure_uamqp_c/link.h":

LINK_HANDLE link_create(c_session.SESSION_HANDLE session, const char* name, c_amqp_definitions.role role, c_amqpvalue.AMQP_VALUE source, c_amqpvalue.AMQP_VALUE target)
void link_destroy(LINK_HANDLE handle)
void link_dowork(LINK_HANDLE link)
int link_set_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode snd_settle_mode)
int link_get_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode* snd_settle_mode)
int link_set_rcv_settle_mode(LINK_HANDLE link, c_amqp_definitions.receiver_settle_mode rcv_settle_mode)
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/inc/c_message.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ cdef extern from "azure_uamqp_c/message.h":
int message_get_body_type(MESSAGE_HANDLE message, MESSAGE_BODY_TYPE_TAG* body_type)
int message_set_message_format(MESSAGE_HANDLE message, stdint.uint32_t message_format)
int message_get_message_format(MESSAGE_HANDLE message, stdint.uint32_t* message_format)
int message_get_delivery_tag(MESSAGE_HANDLE message, c_amqpvalue.AMQP_VALUE* delivery_tag)



5 changes: 5 additions & 0 deletions tests/test_c_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ def test_body_value():

body = message.get_body_value()
assert body.type == c_uamqp.AMQPType.StringValue


def test_delivery_tag():
message = c_uamqp.create_message()
assert not message.delivery_tag
2 changes: 2 additions & 0 deletions uamqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# license information.
#--------------------------------------------------------------------------

# pylint: disable=no-member

import logging
import sys

Expand Down
4 changes: 4 additions & 0 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ async def _client_ready_async(self):
properties=self._link_properties,
error_policy=self._error_policy,
encoding=self._encoding,
executor=self._connection._executor,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async())
return False
Expand Down Expand Up @@ -567,6 +568,7 @@ async def _client_run_async(self):
:rtype: bool
"""
# pylint: disable=protected-access
await self.message_handler.work_async()
self._waiting_messages = 0
async with self._pending_messages_lock:
self._pending_messages = await self._filter_pending_async()
Expand Down Expand Up @@ -812,6 +814,7 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
executor=self._connection._executor,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async())
return False
Expand All @@ -832,6 +835,7 @@ async def _client_run_async(self):

:rtype: bool
"""
await self.message_handler.work_async()
await self._connection.work_async()
now = self._counter.get_current_ms()
if self._last_activity_timestamp and not self._was_message_received:
Expand Down
Loading