Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hub Track 2 Support #74

Merged
merged 22 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3974358
Missing application_properties in BatchMessage fixes
yunhaoling Mar 21, 2019
4001f61
Missing data bytes in BatchMessage fixes
yunhaoling Mar 24, 2019
4bfd710
Updated pylint version
annatisch Mar 25, 2019
f38f900
improve type checking to enable message-like objects in BatchMessage
yunhaoling Mar 25, 2019
3f4c16a
pylint error fixes
yunhaoling Mar 26, 2019
f752efe
remove Python 3.4 from the travis config as Python 3.4 won't be suppo…
yunhaoling Mar 26, 2019
4061360
merge to fix macos build issue
yunhaoling Mar 28, 2019
592881b
Merge remote-tracking branch 'azure_origin/master'
yunhaoling Apr 26, 2019
eb81eac
Fix memory leak in sending BatchEventData
yunhaoling Apr 29, 2019
633bb6b
improve solution by typecasting NULL to AMQP types
yunhaoling May 1, 2019
a5222fa
init commit for websocket support
yunhaoling May 3, 2019
5b4eabf
Websocket functionality done, but further design is needed
yunhaoling May 4, 2019
4a59dfa
Reorganzie WebSocket related code
yunhaoling May 8, 2019
0a9ac20
Remove unnecessary tls http_proxy setting
yunhaoling May 10, 2019
3385445
Module import and proxy username password update
yunhaoling May 10, 2019
5f24397
Update comment
May 13, 2019
50ceffb
Update code
yunhaoling May 14, 2019
8210ea1
Merge remote-tracking branch 'azure_origin/dev' into dev-websocket
yunhaoling May 14, 2019
17d2290
Fix pylint error
yunhaoling May 14, 2019
a751722
OAuth support
yunhaoling May 17, 2019
6af1651
Add Message app_prop checking
May 20, 2019
ee0d8d4
Remove scope
yunhaoling May 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/constants.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ cimport c_amqp_management
APACHE = "apache.org"
AMQP_BATCH_MESSAGE_FORMAT = 0x80013700
AMQPS_PORT = 5671
AMQP_WSS_PORT = 443
AUTH_EXPIRATION_SECS = 60 * 60
AUTH_REFRESH_SECS = 48 * 60 # 80% of expiration period
MAX_PARTITION_KEY_LENGTH = 128
Expand Down
24 changes: 24 additions & 0 deletions src/vendor/inc/c_wsio.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

from libc cimport stdint

cimport c_xio


cdef extern from "azure_c_shared_utility/wsio.h":

ctypedef struct WSIO_CONFIG_TAG:
const char* hostname
int port
const char* resource_name
const char* protocol
const c_xio.IO_INTERFACE_DESCRIPTION* underlying_io_interface
void* underlying_io_parameters

ctypedef WSIO_CONFIG_TAG WSIO_CONFIG

const c_xio.IO_INTERFACE_DESCRIPTION* wsio_get_interface_description()
58 changes: 58 additions & 0 deletions src/wsio.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

# C imports
cimport c_wsio
cimport c_tlsio
cimport c_xio


DEFAULT_WS_PORT = 443
cdef const char* DEFAULT_WS_PROTOCOL_NAME = "AMQPWSB10"
cdef const char* DEFAULT_WS_RELATIVE_PATH = "/$servicebus/websocket/"


cdef class WSIOConfig(object):

cdef c_wsio.WSIO_CONFIG _c_value

def __cinit__(self):
self._c_value = c_wsio.WSIO_CONFIG(NULL, DEFAULT_WS_PORT, DEFAULT_WS_RELATIVE_PATH, DEFAULT_WS_PROTOCOL_NAME, NULL, NULL)

@property
def hostname(self):
return self._c_value.hostname

@hostname.setter
def hostname(self, const char* value):
self._c_value.hostname = value

@property
def port(self):
return self._c_value.port

@port.setter
def port(self, int port):
self._c_value.port = port

@property
def resource_name(self):
return self._c_value.resource_name

@resource_name.setter
def resource_name(self, const char* value):
self._c_value.resource_name = value

@property
def protocol(self):
return self._c_value.protocol

@protocol.setter
def protocol(self, const char* value):
self._c_value.protocol = value

cpdef set_tlsio_config(self, IOInterfaceDescription underlying_io_interface, TLSIOConfig underlying_io_parameters):
self._c_value.underlying_io_interface = underlying_io_interface._c_value
self._c_value.underlying_io_parameters = &underlying_io_parameters._c_value
9 changes: 9 additions & 0 deletions src/xio.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@ import logging

# C imports
cimport c_xio
cimport c_wsio
cimport c_sasl_mechanism


_logger = logging.getLogger(__name__)


cpdef xio_from_wsioconfig(WSIOConfig io_config):
cdef const c_xio.IO_INTERFACE_DESCRIPTION* ws_io_interface
ws_io_interface = c_wsio.wsio_get_interface_description()
xio = XIO()
xio.create(ws_io_interface, &io_config._c_value)
return xio


cpdef xio_from_tlsioconfig(IOInterfaceDescription io_desc, TLSIOConfig io_config):
xio = XIO()
xio.create(io_desc._c_value, &io_config._c_value)
Expand Down
1 change: 1 addition & 0 deletions uamqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from uamqp.client import AMQPClient, SendClient, ReceiveClient
from uamqp.sender import MessageSender
from uamqp.receiver import MessageReceiver
from uamqp.constants import TransportType

try:
from uamqp.async_ops import ConnectionAsync
Expand Down
4 changes: 2 additions & 2 deletions uamqp/authentication/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
#--------------------------------------------------------------------------

from .common import AMQPAuth, SASLPlain, SASLAnonymous
from .cbs_auth import TokenRetryPolicy, CBSAuthMixin, SASTokenAuth
from .cbs_auth import TokenRetryPolicy, CBSAuthMixin, SASTokenAuth, JWTTokenAuth
try:
from .cbs_auth_async import CBSAsyncAuthMixin, SASTokenAsync
from .cbs_auth_async import CBSAsyncAuthMixin, SASTokenAsync, JWTTokenAsync
except (ImportError, SyntaxError):
pass # No Python 3.4+ support
106 changes: 105 additions & 1 deletion uamqp/authentication/cbs_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import time

from uamqp import Session, c_uamqp, compat, constants, errors, utils
from uamqp.constants import TransportType

from .common import _SASL, AMQPAuth

Expand Down Expand Up @@ -192,6 +193,10 @@ class SASTokenAuth(AMQPAuth, CBSAuthMixin):
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
tranport type is explictly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
Expand All @@ -208,6 +213,7 @@ def __init__(self, audience, uri, token,
verify=None,
token_type=b"servicebus.windows.net:sastoken",
http_proxy=None,
transport_type=TransportType.Amqp,
encoding='UTF-8'): # pylint: disable=no-member
self._retry_policy = retry_policy
self._encoding = encoding
Expand Down Expand Up @@ -238,7 +244,7 @@ def __init__(self, audience, uri, token,
self.timeout = timeout
self.retries = 0
self.sasl = _SASL()
self.set_tlsio(self.hostname, port, http_proxy)
self.set_io(self.hostname, port, http_proxy, transport_type)

def update_token(self):
"""If a username and password are present - attempt to use them to
Expand Down Expand Up @@ -267,6 +273,7 @@ def from_shared_access_key(
retry_policy=TokenRetryPolicy(),
verify=None,
http_proxy=None,
transport_type=TransportType.Amqp,
encoding='UTF-8'):
"""Attempt to create a CBS token session using a Shared Access Key such
as is used to connect to Azure services.
Expand Down Expand Up @@ -295,6 +302,10 @@ def from_shared_access_key(
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
tranport type is explictly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
Expand All @@ -319,4 +330,97 @@ def from_shared_access_key(
retry_policy=retry_policy,
verify=verify,
http_proxy=http_proxy,
transport_type=transport_type,
encoding=encoding)


class JWTTokenAuth(AMQPAuth, CBSAuthMixin):
"""CBS authentication using JWT tokens.

:param audience: The token audience field. For JWT tokens
this is usually the URI.
:type audience: str or bytes
:param uri: The AMQP endpoint URI. This must be provided as
a decoded string.
:type uri: str
:param get_token: The callback function used for getting and refreshing
tokens. It should return a valid jwt token each time it is called.
:type get_token: function or functools.partial
:param expires_in: The total remaining seconds until the token
expires - default for JWT token generated by AAD is 3600s (1 hour).
:type expires_in: ~datetime.timedelta
:param expires_at: The timestamp at which the JWT token will expire
formatted as seconds since epoch.
:type expires_at: float
:param port: The TLS port - default for AMQP is 5671.
:type port: int
:param timeout: The timeout in seconds in which to negotiate the token.
The default value is 10 seconds.
:type timeout: int
:param retry_policy: The retry policy for the PUT token request. The default
retry policy has 3 retries.
:type retry_policy: ~uamqp.authentication.cbs_auth.TokenRetryPolicy
:param verify: The path to a user-defined certificate.
:type verify: str
:param token_type: The type field of the token request.
Default value is `b"jwt"`.
:type token_type: bytes
:param http_proxy: HTTP proxy configuration. This should be a dictionary with
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
tranport type is explictly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
"""

def __init__(self, audience, uri,
get_token,
expires_in=datetime.timedelta(seconds=constants.AUTH_EXPIRATION_SECS),
expires_at=None,
port=constants.DEFAULT_AMQPS_PORT,
timeout=10,
retry_policy=TokenRetryPolicy(),
verify=None,
token_type=b"jwt",
http_proxy=None,
transport_type=TransportType.Amqp,
encoding='UTF-8'): # pylint: disable=no-member
self._retry_policy = retry_policy
self._encoding = encoding
self.uri = uri
parsed = compat.urlparse(uri) # pylint: disable=no-member

self.cert_file = verify
self.hostname = parsed.hostname.encode(self._encoding)

if not get_token or not callable(get_token):
raise ValueError("get_token must be a callable object.")

self.get_token = get_token
self.audience = self._encode(audience)
self.token_type = self._encode(token_type)
self.token = self._encode(self.get_token())
if not expires_at and not expires_in:
raise ValueError("Must specify either 'expires_at' or 'expires_in'.")
if not expires_at:
self.expires_in = expires_in
self.expires_at = time.time() + expires_in.seconds
else:
self.expires_at = expires_at
expires_in = expires_at - time.time()
if expires_in < 1:
raise ValueError("Token has already expired.")
self.expires_in = datetime.timedelta(seconds=expires_in)
self.timeout = timeout
self.retries = 0
self.sasl = _SASL()
self.set_io(self.hostname, port, http_proxy, transport_type)

def update_token(self):
self.expires_at = time.time() + self.expires_in.seconds
self.token = self._encode(self.get_token())
52 changes: 50 additions & 2 deletions uamqp/authentication/cbs_auth_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from uamqp.utils import get_running_loop
from uamqp.async_ops import SessionAsync

from .cbs_auth import CBSAuthMixin, SASTokenAuth
from .cbs_auth import CBSAuthMixin, SASTokenAuth, JWTTokenAuth

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -168,8 +168,56 @@ class SASTokenAsync(SASTokenAuth, CBSAsyncAuthMixin):
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
tranport type is explictly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
"""


class JWTTokenAsync(JWTTokenAuth, CBSAsyncAuthMixin):
"""CBS authentication using JWT tokens.

:param audience: The token audience field. For JWT tokens
this is usually the URI.
:type audience: str or bytes
:param uri: The AMQP endpoint URI. This must be provided as
a decoded string.
:type uri: str
:param get_token: The callback function used for getting and refreshing
tokens. It should return a valid jwt token each time it is called.
:type get_token: function or functools.partial
:param expires_in: The total remaining seconds until the token
expires - default for JWT token generated by AAD is 3600s (1 hour).
:type expires_in: ~datetime.timedelta
:param expires_at: The timestamp at which the JWT token will expire
formatted as seconds since epoch.
:type expires_at: float
:param port: The TLS port - default for AMQP is 5671.
:type port: int
:param timeout: The timeout in seconds in which to negotiate the token.
The default value is 10 seconds.
:type timeout: int
:param retry_policy: The retry policy for the PUT token request. The default
retry policy has 3 retries.
:type retry_policy: ~uamqp.authentication.cbs_auth.TokenRetryPolicy
:param verify: The path to a user-defined certificate.
:type verify: str
:param token_type: The type field of the token request.
Default value is `b"jwt"`.
:type token_type: bytes
:param http_proxy: HTTP proxy configuration. This should be a dictionary with
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
tranport type is explictly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
"""

Loading