Skip to content

Commit

Permalink
Service Bus message transfer fixes (#96)
Browse files Browse the repository at this point in the history
* Support message delivery tag

* Added headers

* Removed null init

* Added memory cleanup

* Fix build error

* Moved delivery tag to message

* Cython fixes

* Binary type

* Attempt to set message tag

* Converted to AMQP_VALUE

* Syntax fixes

* Build error

* Renamed value

* Get tag type

* Fixed name

* Extract tag bytes

* Some C cleanup

* More logging

* Updated test

* pylint fix

* More logging

* More logging

* More logging

* Fixed print formatting

* More logging

* Syntax error

* TLSIO logging

* Log socket error

* Added sleep

* Fixed sleep

* Reduced sleep

* Another attempt

* Ping CI

* Attempt to move outgoing flow

* Moved send flow frame

* Removed debug logging

* Update link status

* Fix diff

* pylint fixes

* Py2.7

* Updated status description

* Fixed executor

* Some review feedback
  • Loading branch information
annatisch authored Oct 2, 2019
1 parent d2c88f4 commit 8c2f790
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 21 deletions.
4 changes: 3 additions & 1 deletion samples/test_azure_event_hubs_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ def test_event_hubs_client_receive_sync(live_eventhub_config):
assert len(batch) <= 10
for message in batch:
annotations = message.annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
log.info("Sequence Number: {}, Delivery tag: {}".format(
annotations.get(b'x-opt-sequence-number'),
message.delivery_tag))
batch = receive_client.receive_message_batch(max_batch_size=10)
log.info("Finished receiving")

Expand Down
3 changes: 3 additions & 0 deletions src/link.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ cdef class cLink(StructBase):
self._value_error()
return value_factory(value)

cpdef do_work(self):
c_link.link_dowork(self._c_value)

cpdef set_prefetch_count(self, stdint.uint32_t prefetch):
if c_link.link_set_max_link_credit(self._c_value, prefetch) != 0:
self._value_error("Unable to set link credit.")
Expand Down
10 changes: 10 additions & 0 deletions src/message.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,16 @@ cdef class cMessage(StructBase):
if c_message.message_set_message_format(self._c_value, value) != 0:
self._value_error()

@property
def delivery_tag(self):
cdef c_amqpvalue.AMQP_VALUE value
if c_message.message_get_delivery_tag(self._c_value, &value) == 0:
if <void*>value == NULL:
return None
return value_factory(value)
else:
self._value_error()

cpdef add_body_data(self, bytes value):
cdef c_message.BINARY_DATA _binary
length = len(value)
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/azure-uamqp-c/inc/azure_uamqp_c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ extern "C" {
MOCKABLE_FUNCTION(, int, message_get_body_type, MESSAGE_HANDLE, message, MESSAGE_BODY_TYPE*, body_type);
MOCKABLE_FUNCTION(, int, message_set_message_format, MESSAGE_HANDLE, message, uint32_t, message_format);
MOCKABLE_FUNCTION(, int, message_get_message_format, MESSAGE_HANDLE, message, uint32_t*, message_format);
MOCKABLE_FUNCTION(, int, message_set_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE, delivery_tag_value);
MOCKABLE_FUNCTION(, int, message_get_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE*, delivery_tag_value);

#ifdef __cplusplus
}
Expand Down
13 changes: 11 additions & 2 deletions src/vendor/azure-uamqp-c/src/amqp_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,17 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
desc_value = amqpvalue_get_map_value(map, desc_key);
if (desc_value != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
AMQP_TYPE amqp_type = amqpvalue_get_type(desc_value);
if (amqp_type == AMQP_TYPE_STRING)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
}
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
Expand Down
11 changes: 6 additions & 5 deletions src/vendor/azure-uamqp-c/src/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,6 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t

link_instance->current_link_credit--;
link_instance->delivery_count++;
if (link_instance->current_link_credit == 0)
{
link_instance->current_link_credit = link_instance->max_link_credit;
send_flow(link_instance);
}

more = false;
/* Attempt to get more flag, default to false */
Expand Down Expand Up @@ -1640,6 +1635,12 @@ void link_dowork(LINK_HANDLE link)
{
tickcounter_ms_t current_tick;

if (link->current_link_credit <= 0)
{
link->current_link_credit = link->max_link_credit;
send_flow(link);
}

if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
{
LogError("Cannot get tick counter value");
Expand Down
104 changes: 104 additions & 0 deletions src/vendor/azure-uamqp-c/src/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typedef struct MESSAGE_INSTANCE_TAG
application_properties application_properties;
annotations footer;
uint32_t message_format;
AMQP_VALUE delivery_tag;
} MESSAGE_INSTANCE;

MESSAGE_BODY_TYPE internal_get_body_type(MESSAGE_HANDLE message)
Expand Down Expand Up @@ -119,6 +120,7 @@ MESSAGE_HANDLE message_create(void)
result->body_amqp_value = NULL;
result->body_amqp_sequence_items = NULL;
result->body_amqp_sequence_count = 0;
result->delivery_tag = NULL;

/* Codes_SRS_MESSAGE_01_135: [ By default a message on which `message_set_message_format` was not called shall have message format set to 0. ]*/
result->message_format = 0;
Expand Down Expand Up @@ -229,6 +231,17 @@ MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message)
}
}

if ((result != NULL) && (source_message->delivery_tag != NULL))
{
result->delivery_tag = amqpvalue_clone(source_message->delivery_tag);
if (result->delivery_tag == NULL)
{
LogError("Cannot clone message delivery tag");
message_destroy(result);
result = NULL;
}
}

if ((result != NULL) && (source_message->body_amqp_data_count > 0))
{
size_t i;
Expand Down Expand Up @@ -375,6 +388,11 @@ void message_destroy(MESSAGE_HANDLE message)
amqpvalue_destroy(message->body_amqp_value);
}

if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

/* Codes_SRS_MESSAGE_01_136: [ If the message body is made of several AMQP data items, they shall all be freed. ]*/
free_all_body_data_items(message);

Expand Down Expand Up @@ -1447,3 +1465,89 @@ int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format)

return result;
}

int message_set_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE delivery_tag_value)
{
int result;

if (message == NULL)
{
LogError("NULL message");
result = __FAILURE__;
}
else
{
if (delivery_tag_value == NULL)
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
message->delivery_tag = NULL;
}

/* Codes_SRS_MESSAGE_01_053: [ On success it shall return 0. ]*/
result = 0;
}
else
{

AMQP_VALUE new_delivery_tag = amqpvalue_clone(delivery_tag_value);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = __FAILURE__;
}
else
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}

message->delivery_tag = new_delivery_tag;

/* Codes_SRS_MESSAGE_01_102: [ On success it shall return 0. ]*/
result = 0;
}
}
}

return result;
}

int message_get_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE *delivery_tag_value)
{
int result;

if ((message == NULL) ||
(delivery_tag_value == NULL))
{
LogError("Bad arguments: message = %p, delivery_tag = %p",
message, delivery_tag_value);
result = __FAILURE__;
}
else
{
if (message->delivery_tag == NULL)
{
*delivery_tag_value = NULL;
result = 0;
}
else
{
AMQP_VALUE new_delivery_tag = amqpvalue_clone(message->delivery_tag);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = __FAILURE__;
}
else
{
*delivery_tag_value = new_delivery_tag;
result = 0;
}
}
}

return result;
}
50 changes: 37 additions & 13 deletions src/vendor/azure-uamqp-c/src/message_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
AMQP_VALUE result = NULL;
MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;

(void)transfer;
if (message_receiver->on_message_received != NULL)
{
MESSAGE_HANDLE message = message_create();
Expand All @@ -236,35 +235,60 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
}
else
{
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
delivery_tag received_message_tag;
if (transfer_get_delivery_tag(transfer, &received_message_tag) != 0)
{
LogError("Cannot create AMQP value decoder");
LogError("Could not get the delivery tag from the transfer performative");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
message_receiver->decoded_message = message;
message_receiver->decode_error = false;
if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
AMQP_VALUE delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag);
if (delivery_tag_value == NULL)
{
LogError("Cannot decode bytes");
LogError("Could not create delivery tag value");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else if (message_set_delivery_tag(message, delivery_tag_value) != 0)
{
LogError("Could not set message delivery tag");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
if (message_receiver->decode_error)
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
{
LogError("Error decoding message");
LogError("Cannot create AMQP value decoder");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
result = message_receiver->on_message_received(message_receiver->callback_context, message);
message_receiver->decoded_message = message;
message_receiver->decode_error = false;
if (amqpvalue_decode_bytes(amqpvalue_decoder, payload_bytes, payload_size) != 0)
{
LogError("Cannot decode bytes");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
if (message_receiver->decode_error)
{
LogError("Error decoding message");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
else
{
result = message_receiver->on_message_received(message_receiver->callback_context, message);
}
}

amqpvalue_decoder_destroy(amqpvalue_decoder);
}
}

amqpvalue_decoder_destroy(amqpvalue_decoder);
amqpvalue_destroy(delivery_tag_value);
}
}

message_destroy(message);
Expand Down
1 change: 1 addition & 0 deletions src/vendor/inc/c_link.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cdef extern from "azure_uamqp_c/link.h":

LINK_HANDLE link_create(c_session.SESSION_HANDLE session, const char* name, c_amqp_definitions.role role, c_amqpvalue.AMQP_VALUE source, c_amqpvalue.AMQP_VALUE target)
void link_destroy(LINK_HANDLE handle)
void link_dowork(LINK_HANDLE link)
int link_set_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode snd_settle_mode)
int link_get_snd_settle_mode(LINK_HANDLE link, c_amqp_definitions.sender_settle_mode* snd_settle_mode)
int link_set_rcv_settle_mode(LINK_HANDLE link, c_amqp_definitions.receiver_settle_mode rcv_settle_mode)
Expand Down
2 changes: 2 additions & 0 deletions src/vendor/inc/c_message.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ cdef extern from "azure_uamqp_c/message.h":
int message_get_body_type(MESSAGE_HANDLE message, MESSAGE_BODY_TYPE_TAG* body_type)
int message_set_message_format(MESSAGE_HANDLE message, stdint.uint32_t message_format)
int message_get_message_format(MESSAGE_HANDLE message, stdint.uint32_t* message_format)
int message_get_delivery_tag(MESSAGE_HANDLE message, c_amqpvalue.AMQP_VALUE* delivery_tag)



5 changes: 5 additions & 0 deletions tests/test_c_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ def test_body_value():

body = message.get_body_value()
assert body.type == c_uamqp.AMQPType.StringValue


def test_delivery_tag():
message = c_uamqp.create_message()
assert not message.delivery_tag
2 changes: 2 additions & 0 deletions uamqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# license information.
#--------------------------------------------------------------------------

# pylint: disable=no-member

import logging
import sys

Expand Down
4 changes: 4 additions & 0 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ async def _client_ready_async(self):
properties=self._link_properties,
error_policy=self._error_policy,
encoding=self._encoding,
executor=self._connection._executor,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async())
return False
Expand Down Expand Up @@ -567,6 +568,7 @@ async def _client_run_async(self):
:rtype: bool
"""
# pylint: disable=protected-access
await self.message_handler.work_async()
self._waiting_messages = 0
async with self._pending_messages_lock:
self._pending_messages = await self._filter_pending_async()
Expand Down Expand Up @@ -812,6 +814,7 @@ async def _client_ready_async(self):
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
executor=self._connection._executor,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async())
return False
Expand All @@ -832,6 +835,7 @@ async def _client_run_async(self):
:rtype: bool
"""
await self.message_handler.work_async()
await self._connection.work_async()
now = self._counter.get_current_ms()
if self._last_activity_timestamp and not self._was_message_received:
Expand Down
Loading

0 comments on commit 8c2f790

Please sign in to comment.