Skip to content

Commit

Permalink
Transport + storage changes (#903)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Jun 12, 2020
1 parent fce9d22 commit 7ec4ff6
Show file tree
Hide file tree
Showing 16 changed files with 2,563 additions and 2,512 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ OpenCensus - A stats collection and distributed tracing framework
=================================================================

|gitter|
|travisci|
|circleci|
|pypi|
|compat_check_pypi|
|compat_check_github|


.. |travisci| image:: https://travis-ci.org/census-instrumentation/opencensus-python.svg?branch=master
:target: https://travis-ci.org/census-instrumentation/opencensus-python
.. |circleci| image:: https://circleci.com/gh/census-instrumentation/opencensus-python.svg?style=shield
:target: https://circleci.com/gh/census-instrumentation/opencensus-python
.. |gitter| image:: https://badges.gitter.im/census-instrumentation/lobby.svg
Expand Down
3 changes: 3 additions & 0 deletions contrib/opencensus-ext-azure/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Change default path of local storage
([#903](https://github.com/census-instrumentation/opencensus-python/pull/903))

## 1.0.1
Released 2019-11-26

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
# limitations under the License.

import os
import sys
import tempfile

from opencensus.ext.azure.common.protocol import BaseObject

INGESTION_ENDPOINT = 'ingestionendpoint'
INSTRUMENTATION_KEY = 'instrumentationkey'
TEMPDIR_PREFIX = "opencensus-python-"


def process_options(options):
# Connection string/ikey
code_cs = parse_connection_string(options.connection_string)
code_ikey = options.instrumentation_key
env_cs = parse_connection_string(
Expand All @@ -46,6 +48,14 @@ def process_options(options):
or 'https://dc.services.visualstudio.com'
options.endpoint = endpoint + '/v2/track'

# storage path
if options.storage_path is None:
TEMPDIR_SUFFIX = options.instrumentation_key or ""
options.storage_path = os.path.join(
tempfile.gettempdir(),
TEMPDIR_PREFIX + TEMPDIR_SUFFIX
)


def parse_connection_string(connection_string):
if connection_string is None:
Expand Down Expand Up @@ -98,12 +108,7 @@ def __init__(self, *args, **kwargs):
proxy=None,
storage_maintenance_period=60,
storage_max_size=50*1024*1024, # 50MiB
storage_path=os.path.join(
os.path.expanduser('~'),
'.opencensus',
'.azure',
os.path.basename(sys.argv[0]) or '.console',
),
storage_path=None,
storage_retention_period=7*24*60*60,
timeout=10.0, # networking timeout in seconds
)
32 changes: 14 additions & 18 deletions contrib/opencensus-ext-azure/opencensus/ext/azure/common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,23 @@ class LocalFileBlob(object):
def __init__(self, fullpath):
self.fullpath = fullpath

def delete(self, silent=False):
def delete(self):
try:
os.remove(self.fullpath)
except Exception:
if not silent:
raise
pass # keep silent

def get(self, silent=False):
def get(self):
try:
with open(self.fullpath, 'r') as file:
return tuple(
json.loads(line.strip())
for line in file.readlines()
)
except Exception:
if not silent:
raise
pass # keep silent

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
try:
fullpath = self.fullpath + '.tmp'
with open(fullpath, 'w') as file:
Expand All @@ -59,8 +57,7 @@ def put(self, data, lease_period=0, silent=False):
os.rename(fullpath, self.fullpath)
return self
except Exception:
if not silent:
raise
pass # keep silent

def lease(self, period):
timestamp = _now() + _seconds(period)
Expand Down Expand Up @@ -90,11 +87,11 @@ def __init__(
self.maintenance_period = maintenance_period
self.retention_period = retention_period
self.write_timeout = write_timeout
self._maintenance_routine(silent=False)
# Run maintenance routine once upon instantiating
self._maintenance_routine()
self._maintenance_task = PeriodicTask(
interval=self.maintenance_period,
function=self._maintenance_routine,
kwargs={'silent': True},
)
self._maintenance_task.daemon = True
self._maintenance_task.start()
Expand All @@ -109,19 +106,18 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
self.close()

def _maintenance_routine(self, silent=False):
def _maintenance_routine(self):
try:
if not os.path.isdir(self.path):
os.makedirs(self.path)
except Exception:
if not silent:
raise
# Race case will throw OSError which we can ignore
pass
try:
for blob in self.gets():
pass
except Exception:
if not silent:
raise
pass # keep silent

def gets(self):
now = _now()
Expand Down Expand Up @@ -164,7 +160,7 @@ def get(self):
pass
return None

def put(self, data, lease_period=0, silent=False):
def put(self, data, lease_period=0):
if not self._check_storage_size():
return None
blob = LocalFileBlob(os.path.join(
Expand All @@ -174,7 +170,7 @@ def put(self, data, lease_period=0, silent=False):
'{:08x}'.format(random.getrandbits(32)), # thread-safe random
),
))
return blob.put(data, lease_period=lease_period, silent=silent)
return blob.put(data, lease_period=lease_period)

def _check_storage_size(self):
size = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def _transmit_from_storage(self):
# give a few more seconds for blob lease operation
# to reduce the chance of race (for perf consideration)
if blob.lease(self.options.timeout + 5):
envelopes = blob.get() # TODO: handle error
envelopes = blob.get()
result = self._transmit(envelopes)
if result > 0:
blob.lease(result)
else:
blob.delete(silent=True)
blob.delete()

def _transmit(self, envelopes):
"""
Expand All @@ -41,6 +41,8 @@ def _transmit(self, envelopes):
Return the next retry time in seconds for retryable failure.
This function should never throw exception.
"""
if not envelopes:
return 0
try:
response = requests.post(
url=self.options.endpoint,
Expand All @@ -51,8 +53,13 @@ def _transmit(self, envelopes):
},
timeout=self.options.timeout,
)
except requests.Timeout:
logger.warning(
'Request time out. Ingestion may be backed up. Retrying.')
return self.options.minimum_retry_interval
except Exception as ex: # TODO: consider RequestException
logger.warning('Transient client side error %s.', ex)
logger.warning(
'Retrying due to transient client side error %s.', ex)
# client side error (retryable)
return self.options.minimum_retry_interval

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class ProcessorTimeMetric(object):
NAME = "\\Processor(_Total)\\% Processor Time"

@staticmethod
def get_value():
cpu_times_percent = psutil.cpu_times_percent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

class AvailableMemoryMetric(object):
NAME = "\\Memory\\Available Bytes"

@staticmethod
def get_value():
return psutil.virtual_memory().available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

class ProcessMemoryMetric(object):
NAME = "\\Process(??APP_WIN32_PROC??)\\Private Bytes"

@staticmethod
def get_value():
try:
Expand Down Expand Up @@ -54,6 +55,7 @@ def __call__(self):

class ProcessCPUMetric(object):
NAME = "\\Process(??APP_WIN32_PROC??)\\% Processor Time"

@staticmethod
def get_value():
try:
Expand Down
Loading

0 comments on commit 7ec4ff6

Please sign in to comment.