Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport + storage changes #903

Merged
merged 17 commits into from
Jun 12, 2020
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ OpenCensus - A stats collection and distributed tracing framework
=================================================================

|gitter|
|travisci|
|circleci|
|pypi|
|compat_check_pypi|
|compat_check_github|


.. |travisci| image:: https://travis-ci.org/census-instrumentation/opencensus-python.svg?branch=master
:target: https://travis-ci.org/census-instrumentation/opencensus-python
.. |circleci| image:: https://circleci.com/gh/census-instrumentation/opencensus-python.svg?style=shield
:target: https://circleci.com/gh/census-instrumentation/opencensus-python
.. |gitter| image:: https://badges.gitter.im/census-instrumentation/lobby.svg
Expand Down
3 changes: 3 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Change default path of local storage
([#903](https://github.com/census-instrumentation/opencensus-python/pull/903))

## 1.0.1
Released 2019-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
# limitations under the License.

import os
import sys
import tempfile

from opencensus.ext.azure.common.protocol import BaseObject

INGESTION_ENDPOINT = 'ingestionendpoint'
INSTRUMENTATION_KEY = 'instrumentationkey'
TEMPDIR_PREFIX = "opencensus-python-"


def process_options(options):
# Connection string/ikey
code_cs = parse_connection_string(options.connection_string)
code_ikey = options.instrumentation_key
env_cs = parse_connection_string(
Expand All @@ -46,6 +48,14 @@ def process_options(options):
or 'https://dc.services.visualstudio.com'
options.endpoint = endpoint + '/v2/track'

# storage path
if options.storage_path is None:
TEMPDIR_SUFFIX = options.instrumentation_key or ""
options.storage_path = os.path.join(
tempfile.gettempdir(),
TEMPDIR_PREFIX + TEMPDIR_SUFFIX
)


def parse_connection_string(connection_string):
if connection_string is None:
Expand Down Expand Up @@ -98,12 +108,7 @@ def __init__(self, *args, **kwargs):
proxy=None,
storage_maintenance_period=60,
storage_max_size=50*1024*1024, # 50MiB
storage_path=os.path.join(
os.path.expanduser('~'),
'.opencensus',
'.azure',
os.path.basename(sys.argv[0]) or '.console',
),
storage_path=None,
storage_retention_period=7*24*60*60,
timeout=10.0, # networking timeout in seconds
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,23 @@ class LocalFileBlob(object):
def __init__(self, fullpath):
self.fullpath = fullpath

def delete(self, silent=False):
def delete(self):
try:
os.remove(self.fullpath)
except Exception:
if not silent:
raise
pass # keep silent

def get(self, silent=False):
def get(self):
try:
with open(self.fullpath, 'r') as file:
return tuple(
json.loads(line.strip())
for line in file.readlines()
)
except Exception:
if not silent:
raise
pass # keep silent

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
try:
fullpath = self.fullpath + '.tmp'
with open(fullpath, 'w') as file:
Expand All @@ -59,8 +57,7 @@ def put(self, data, lease_period=0, silent=False):
os.rename(fullpath, self.fullpath)
return self
except Exception:
if not silent:
raise
pass # keep silent

def lease(self, period):
timestamp = _now() + _seconds(period)
Expand Down Expand Up @@ -90,11 +87,11 @@ def __init__(
self.maintenance_period = maintenance_period
self.retention_period = retention_period
self.write_timeout = write_timeout
self._maintenance_routine(silent=False)
# Run maintenance routine once upon instantiating
self._maintenance_routine()
self._maintenance_task = PeriodicTask(
interval=self.maintenance_period,
function=self._maintenance_routine,
kwargs={'silent': True},
)
self._maintenance_task.daemon = True
self._maintenance_task.start()
Expand All @@ -109,19 +106,18 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
self.close()

def _maintenance_routine(self, silent=False):
def _maintenance_routine(self):
try:
if not os.path.isdir(self.path):
os.makedirs(self.path)
except Exception:
if not silent:
raise
# Race case will throw OSError which we can ignore
pass
try:
for blob in self.gets():
pass
except Exception:
if not silent:
raise
pass # keep silent

def gets(self):
now = _now()
Expand Down Expand Up @@ -164,7 +160,7 @@ def get(self):
pass
return None

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
if not self._check_storage_size():
return None
blob = LocalFileBlob(os.path.join(
Expand All @@ -174,7 +170,7 @@ def put(self, data, lease_period=0, silent=False):
'{:08x}'.format(random.getrandbits(32)), # thread-safe random
),
))
return blob.put(data, lease_period=lease_period, silent=silent)
return blob.put(data, lease_period=lease_period)

def _check_storage_size(self):
size = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def _transmit_from_storage(self):
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get() # TODO: handle error
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
blob.lease(result)
else:
blob.delete(silent=True)
blob.delete()

def _transmit(self, envelopes):
"""
Expand All @@ -41,6 +41,8 @@ def _transmit(self, envelopes):
Return the next retry time in seconds for retryable failure.
This function should never throw exception.
"""
if not envelopes:
return 0
try:
response = requests.post(
url=self.options.endpoint,
Expand All @@ -51,8 +53,13 @@ def _transmit(self, envelopes):
},
timeout=self.options.timeout,
)
except requests.Timeout:
logger.warning(
'Request time out. Ingestion may be backed up. Retrying.')
return self.options.minimum_retry_interval
except Exception as ex: # TODO: consider RequestException
logger.warning('Transient client side error %s.', ex)
logger.warning(
'Retrying due to transient client side error %s.', ex)
# client side error (retryable)
return self.options.minimum_retry_interval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class ProcessorTimeMetric(object):
NAME = "\\Processor(_Total)\\% Processor Time"

@staticmethod
def get_value():
cpu_times_percent = psutil.cpu_times_percent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class AvailableMemoryMetric(object):
NAME = "\\Memory\\Available Bytes"

@staticmethod
def get_value():
return psutil.virtual_memory().available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

class ProcessMemoryMetric(object):
NAME = "\\Process(??APP_WIN32_PROC??)\\Private Bytes"

@staticmethod
def get_value():
try:
Expand Down Expand Up @@ -54,6 +55,7 @@ def __call__(self):

class ProcessCPUMetric(object):
NAME = "\\Process(??APP_WIN32_PROC??)\\% Processor Time"

@staticmethod
def get_value():
try:
Expand Down
Loading