Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport + storage changes #903

Merged
merged 17 commits into from
Jun 12, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

import os
import sys
import tempfile

from opencensus.ext.azure.common.protocol import BaseObject

INGESTION_ENDPOINT = 'ingestionendpoint'
INSTRUMENTATION_KEY = 'instrumentationkey'
TEMPDIR_PREFIX = "opencensus-python-"


def process_options(options):
# Connection string/ikey
code_cs = parse_connection_string(options.connection_string)
code_ikey = options.instrumentation_key
env_cs = parse_connection_string(
Expand All @@ -46,6 +49,13 @@ def process_options(options):
or 'https://dc.services.visualstudio.com'
options.endpoint = endpoint + '/v2/track'

# storage path
# When the caller did not choose a storage location, default to a directory
# under the system temp dir named TEMPDIR_PREFIX + instrumentation key.
if options.storage_path is None:
    # The fallback must be parenthesized: `+` binds tighter than `or`, so
    # `TEMPDIR_PREFIX + key or ""` evaluates `TEMPDIR_PREFIX + key` first
    # and raises TypeError when the key is None instead of using "".
    options.storage_path = os.path.join(
        tempfile.gettempdir(),
        TEMPDIR_PREFIX + (options.instrumentation_key or ""),
    )


def parse_connection_string(connection_string):
if connection_string is None:
Expand Down Expand Up @@ -98,12 +108,7 @@ def __init__(self, *args, **kwargs):
proxy=None,
storage_maintenance_period=60,
storage_max_size=50*1024*1024, # 50MiB
storage_path=os.path.join(
os.path.expanduser('~'),
'.opencensus',
'.azure',
os.path.basename(sys.argv[0]) or '.console',
),
storage_path=None,
storage_retention_period=7*24*60*60,
timeout=10.0, # networking timeout in seconds
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,23 @@ class LocalFileBlob(object):
def __init__(self, fullpath):
self.fullpath = fullpath

def delete(self, silent=False):
def delete(self):
try:
os.remove(self.fullpath)
except Exception:
if not silent:
raise
pass # keep silent

def get(self, silent=False):
def get(self):
try:
with open(self.fullpath, 'r') as file:
return tuple(
json.loads(line.strip())
for line in file.readlines()
)
except Exception:
if not silent:
raise
pass # keep silent

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
try:
fullpath = self.fullpath + '.tmp'
with open(fullpath, 'w') as file:
Expand All @@ -59,8 +57,7 @@ def put(self, data, lease_period=0, silent=False):
os.rename(fullpath, self.fullpath)
return self
except Exception:
if not silent:
raise
pass # keep silent

def lease(self, period):
timestamp = _now() + _seconds(period)
Expand Down Expand Up @@ -90,11 +87,11 @@ def __init__(
self.maintenance_period = maintenance_period
self.retention_period = retention_period
self.write_timeout = write_timeout
self._maintenance_routine(silent=False)
# Run maintenance routine once upon instantiating
self._maintenance_routine()
self._maintenance_task = PeriodicTask(
interval=self.maintenance_period,
function=self._maintenance_routine,
kwargs={'silent': True},
)
self._maintenance_task.daemon = True
self._maintenance_task.start()
Expand All @@ -109,19 +106,18 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
    """Call ``close()`` on leaving the ``with`` block.

    Returns None, so an exception raised inside the block is not
    suppressed.
    """
    self.close()

def _maintenance_routine(self, silent=False):
def _maintenance_routine(self):
try:
if not os.path.isdir(self.path):
os.makedirs(self.path)
except Exception:
if not silent:
raise
# Race case will throw OSError which we can ignore
pass
try:
for blob in self.gets():
pass
except Exception:
if not silent:
raise
pass # keep silent

def gets(self):
now = _now()
Expand Down Expand Up @@ -164,7 +160,7 @@ def get(self):
pass
return None

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
if not self._check_storage_size():
return None
blob = LocalFileBlob(os.path.join(
Expand All @@ -174,7 +170,7 @@ def put(self, data, lease_period=0, silent=False):
'{:08x}'.format(random.getrandbits(32)), # thread-safe random
),
))
return blob.put(data, lease_period=lease_period, silent=silent)
return blob.put(data, lease_period=lease_period)

def _check_storage_size(self):
size = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def _transmit_from_storage(self):
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get() # TODO: handle error
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
blob.lease(result)
else:
blob.delete(silent=True)
blob.delete()

def _transmit(self, envelopes):
"""
Expand All @@ -41,6 +41,8 @@ def _transmit(self, envelopes):
Return the next retry time in seconds for retryable failure.
This function should never throw exception.
"""
if not envelopes:
return 0
try:
response = requests.post(
url=self.options.endpoint,
Expand All @@ -51,8 +53,11 @@ def _transmit(self, envelopes):
},
timeout=self.options.timeout,
)
except requests.Timeout:
logger.warning('Request time out. Ingestion service may be backed up. Retrying.')
return self.options.minimum_retry_interval
except Exception as ex: # TODO: consider RequestException
logger.warning('Transient client side error %s.', ex)
logger.warning('Transient client side error %s. Retrying.', ex)
lzchen marked this conversation as resolved.
Show resolved Hide resolved
# client side error (retryable)
return self.options.minimum_retry_interval

Expand Down
181 changes: 0 additions & 181 deletions contrib/opencensus-ext-azure/tests/test_azure_trace_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import shutil
import unittest
Expand Down Expand Up @@ -734,183 +733,3 @@ def test_span_data_to_envelope(self):
self.assertFalse(envelope.data.baseData.success)

exporter._stop()

def test_transmission_nothing(self):
    """Run ``_transmit_from_storage`` without this test storing anything.

    ``requests.post`` is patched to return None; the test passes as long
    as the call does not raise.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    with mock.patch('requests.post', return_value=None):
        exporter._transmit_from_storage()
    exporter._stop()

def test_transmission_pre_exception(self):
    """Transmit one stored batch while ``requests.post`` is patched to throw.

    Asserts that ``storage.get()`` then yields None and that exactly one
    entry is still present in the storage directory.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    with mock.patch('requests.post', throw(Exception)):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 1)
    exporter._stop()

@mock.patch('requests.post', return_value=mock.Mock())
def test_transmission_lease_failure(self, requests_mock):
    """Transmit while ``LocalFileBlob.lease`` is patched to return False.

    Asserts that the stored batch can still be fetched from storage
    afterwards.
    """
    requests_mock.return_value = MockResponse(200, 'unknown')
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    lease_target = (
        'opencensus.ext.azure.common.storage.LocalFileBlob.lease'
    )
    with mock.patch(lease_target, return_value=False):
        exporter._transmit_from_storage()
    self.assertTrue(exporter.storage.get())
    exporter._stop()

def test_transmission_exception(self):
    """Transmit one batch when the mocked response has no ``text`` attribute.

    Asserts that ``storage.get()`` then yields None and that the storage
    directory is empty.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    # Build the response first, then strip `text` so reading it fails.
    textless_response = MockResponse(200, None)
    del textless_response.text
    with mock.patch('requests.post', return_value=textless_response):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 0)
    exporter._stop()

def test_transmission_200(self):
    """Store two batches and transmit with a mocked status-200 response.

    Asserts that ``storage.get()`` then yields None and that the storage
    directory is empty.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    for _ in range(2):
        exporter.storage.put([1, 2, 3])
    ok_response = MockResponse(200, 'unknown')
    with mock.patch('requests.post', return_value=ok_response):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 0)
    exporter._stop()

def test_transmission_206(self):
    """Transmit one batch with a mocked 206 response whose body is 'unknown'.

    Asserts that ``storage.get()`` then yields None and that one entry is
    still present in the storage directory.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    partial_response = MockResponse(206, 'unknown')
    with mock.patch('requests.post', return_value=partial_response):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 1)
    exporter._stop()

def test_transmission_206_500(self):
    """Transmit five items with a 206 body reporting a 400 and a 500 error.

    The mocked body flags index 0 with status 400 and index 2 with status
    500. Asserts that one entry remains in the storage directory and that
    reading it back yields ``(3,)``.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3, 4, 5])
    rejected_item = {
        'index': 0,
        'statusCode': 400,
        'message': '',
    }
    server_error_item = {
        'index': 2,
        'statusCode': 500,
        'message': 'Internal Server Error',
    }
    body = json.dumps({
        'itemsReceived': 5,
        'itemsAccepted': 3,
        'errors': [rejected_item, server_error_item],
    })
    with mock.patch('requests.post', return_value=MockResponse(206, body)):
        exporter._transmit_from_storage()
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 1)
    self.assertEqual(exporter.storage.get().get(), (3,))
    exporter._stop()

def test_transmission_206_no_retry(self):
    """Transmit three items with a 206 body reporting a single 400 error.

    Asserts that the storage directory is empty afterwards.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    rejected_item = {
        'index': 0,
        'statusCode': 400,
        'message': '',
    }
    body = json.dumps({
        'itemsReceived': 3,
        'itemsAccepted': 2,
        'errors': [rejected_item],
    })
    with mock.patch('requests.post', return_value=MockResponse(206, body)):
        exporter._transmit_from_storage()
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 0)
    exporter._stop()

def test_transmission_206_bogus(self):
    """Transmit five items with a 206 body whose error entry is malformed.

    The error entry carries only 'foo' and 'bar' keys. Asserts that
    ``storage.get()`` then yields None and that the storage directory is
    empty.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3, 4, 5])
    malformed_item = {
        'foo': 0,
        'bar': 1,
    }
    body = json.dumps({
        'itemsReceived': 5,
        'itemsAccepted': 3,
        'errors': [malformed_item],
    })
    with mock.patch('requests.post', return_value=MockResponse(206, body)):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 0)
    exporter._stop()

def test_transmission_400(self):
    """Transmit one batch with a mocked status-400 response.

    Asserts that the storage directory is empty afterwards.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    bad_request = MockResponse(400, '{}')
    with mock.patch('requests.post', return_value=bad_request):
        exporter._transmit_from_storage()
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 0)
    exporter._stop()

def test_transmission_500(self):
    """Transmit one batch with a mocked status-500 response.

    Asserts that ``storage.get()`` then yields None and that one entry is
    still present in the storage directory.
    """
    storage_dir = os.path.join(TEST_FOLDER, self.id())
    exporter = trace_exporter.AzureExporter(
        instrumentation_key='12345678-1234-5678-abcd-12345678abcd',
        storage_path=storage_dir,
    )
    exporter.storage.put([1, 2, 3])
    server_error = MockResponse(500, '{}')
    with mock.patch('requests.post', return_value=server_error):
        exporter._transmit_from_storage()
    self.assertIsNone(exporter.storage.get())
    leftover = os.listdir(exporter.storage.path)
    self.assertEqual(len(leftover), 1)
    exporter._stop()


class MockResponse(object):
    """Plain object exposing ``status_code`` and ``text`` attributes.

    The tests install instances of it as the return value of the patched
    ``requests.post``.
    """

    def __init__(self, status_code, text):
        # Both are ordinary instance attributes, so a test may `del` one.
        self.status_code, self.text = status_code, text
Loading