Skip to content

Commit

Permalink
Use twisted logging not python logging
Browse files Browse the repository at this point in the history
The python logger is not passing to the twisted
logger by default or with the Log2Twisted setup.

From the twisted documentation
https://docs.twisted.org/en/twisted-18.7.0/core/howto/logger.html#compatibility-with-standard-library-logging
It's suggested not to do this anyway.

This commit moves all the logs to the twisted
logger twisted.logger.Logger.
  • Loading branch information
jacomago committed Apr 18, 2024
1 parent 360dc57 commit 75c68cb
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 83 deletions.
1 change: 1 addition & 0 deletions server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
build
10 changes: 5 additions & 5 deletions server/recceiver/announce.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import struct

from twisted.internet import protocol, reactor
import logging
from twisted.logger import Logger

_log = logging.getLogger(__name__)
_log = Logger(__name__)


_Ann = struct.Struct('>HH4sHHI')
Expand Down Expand Up @@ -51,14 +51,14 @@ def sendOne(self):
self.D = self.reactor.callLater(self.delay, self.sendOne)
for A in self.udps:
try:
_log.debug('announce to %s',A)
_log.debug('announce to {s}',s=A)
self.transport.write(self.msg, A)
try:
self.udpErr.remove(A)
_log.warn('announce OK to %s',A)
_log.warn('announce OK to {s}',s=A)
except KeyError:
pass
except:
if A not in self.udpErr:
self.udpErr.add(A)
_log.exception('announce Error to %s',A)
_log.exception('announce Error to {s}',s=A)
7 changes: 4 additions & 3 deletions server/recceiver/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
from twisted.internet import reactor, defer
from twisted.internet.error import CannotListenError
from twisted.application import service
from twisted.logger import Logger

from .recast import CastFactory
from .udpbcast import SharedUDP
from .announce import Announcer
from .processors import ProcessorController

_log = logging.getLogger(__name__)
_log = Logger(__name__)

class Log2Twisted(logging.StreamHandler):
"""Print logging module stream to the twisted log
Expand Down Expand Up @@ -89,7 +90,7 @@ def privilegedStartService(self):

# Find out which port is in use
addr = self.tcp.getHost()
_log.info('RecService listening on ', addr)
_log.info('RecService listening on {addr}', addr=addr)

self.key = random.randint(0,0xffffffff)

Expand Down Expand Up @@ -142,7 +143,7 @@ def makeService(self, opts):
lvl = logging.WARN
else:
if not isinstance(lvl, (int, )):
_log.info("Invalid loglevel", lvlname)
print("Invalid loglevel", lvlname)
lvl = logging.WARN

fmt = conf.get('logformat', "%(levelname)s:%(name)s %(message)s")
Expand Down
81 changes: 41 additions & 40 deletions server/recceiver/cfstore.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-

import logging
from twisted.logger import Logger
import socket
_log = logging.getLogger(__name__)
_log = Logger(__name__)

from zope.interface import implementer

Expand Down Expand Up @@ -39,7 +39,7 @@
@implementer(interfaces.IProcessor)
class CFProcessor(service.Service):
def __init__(self, name, conf):
_log.info("CF_INIT %s", name)
_log.info("CF_INIT {name}", name=name)
self.name, self.conf = name, conf
self.channel_dict = defaultdict(list)
self.iocs = dict()
Expand Down Expand Up @@ -148,7 +148,7 @@ def waitForThread(_ignored):

def chainError(err):
if not err.check(defer.CancelledError):
_log.error("CF_COMMIT FAILURE: %s", err)
_log.error("CF_COMMIT FAILURE: {s}", s=err)
if self.cancelled:
if not err.check(defer.CancelledError):
raise defer.CancelledError()
Expand All @@ -167,9 +167,9 @@ def chainResult(_ignored):

def _commitWithThread(self, TR):
if not self.running:
raise defer.CancelledError('CF Processor is not running (TR: %s:%s)', TR.src.host, TR.src.port)
raise defer.CancelledError('CF Processor is not running (TR: {host}:{port})', host=TR.src.host, port=TR.src.port)

_log.info("CF_COMMIT: %s", TR)
_log.info("CF_COMMIT: {TR}", TR=TR)
"""
a dictionary with a list of records with their associated property info
pvInfo
Expand All @@ -195,7 +195,7 @@ def _commitWithThread(self, TR):
for rid, (recinfos) in TR.recinfos.items():
# find intersection of these sets
if rid not in pvInfo:
_log.warn('IOC: %s: PV not found for recinfo with RID: %s', iocid, rid)
_log.warn('IOC: {iocid}: PV not found for recinfo with RID: {rid}', iocid=iocid, rid=rid)
continue
recinfo_wl = [p for p in self.whitelist if p in recinfos.keys()]
if recinfo_wl:
Expand All @@ -207,7 +207,7 @@ def _commitWithThread(self, TR):

for rid, alias in TR.aliases.items():
if rid not in pvInfo:
_log.warn('IOC: %s: PV not found for alias with RID: %s', iocid, rid)
_log.warn('IOC: {iocid}: PV not found for alias with RID: {rid}', iocid=iocid, rid=rid)
continue
pvInfo[rid]['aliases'] = alias

Expand All @@ -220,19 +220,19 @@ def _commitWithThread(self, TR):
pvInfo[rid]['infoProperties'] = list()
pvInfo[rid]['infoProperties'].append(property)
else:
_log.debug('EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s', epics_env_var_name, iocName)
_log.debug('EPICS environment var {env_var} listed in environment_vars setting list not found in this IOC: {iocName}', env_var=epics_env_var_name, iocName=iocName)

delrec = list(TR.delrec)
_log.debug("Delete records: %s", delrec)
_log.debug("Delete records: {s}", s=delrec)


pvInfoByName = {}
for rid, (info) in pvInfo.items():
if info["pvName"] in pvInfoByName:
_log.warn("Commit contains multiple records with PV name: %s (%s)", info["pvName"], iocid)
_log.warn("Commit contains multiple records with PV name: {pv} ({iocid})", pv=info["pvName"], iocid=iocid)
continue
pvInfoByName[info["pvName"]] = info
_log.debug("Add record: %s: %s", rid, info)
_log.debug("Add record: {rid}: {info}", rid=rid, info=info)

if TR.initial:
"""Add IOC to source list """
Expand Down Expand Up @@ -267,7 +267,7 @@ def remove_channel(self, a, iocid):
if self.iocs[iocid]['channelcount'] == 0:
self.iocs.pop(iocid, None)
elif self.iocs[iocid]['channelcount'] < 0:
_log.error("Channel count negative: %s", iocid)
_log.error("Channel count negative: {s}", s=iocid)
if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs
del self.channel_dict[a]

Expand All @@ -287,22 +287,22 @@ def clean_service(self):
new_channels = []
for ch in channels or []:
new_channels.append(ch[u'name'])
_log.info("Total channels to update: %s", len(new_channels))
_log.info("Total channels to update: {nChannels}", nChannels=len(new_channels))
while len(new_channels) > 0:
_log.debug('Update "pvStatus" property to "Inactive" for %s channels', min(len(new_channels), 10000))
_log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000))
self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"},
channelNames=new_channels[:10000])
new_channels = new_channels[10000:]
_log.info("CF Clean Completed")
return
except RequestException as e:
_log.error("Clean service failed: %s", e)

_log.info("Clean service retry in %s seconds", min(60, sleep))
time.sleep(min(60, sleep))
_log.error("Clean service failed: {s}", s=e)
retry_seconds = min(60, sleep)
_log.info("Clean service retry in {retry_seconds} seconds", retry_seconds=retry_seconds)
time.sleep(retry_seconds)
sleep *= 1.5
if self.running == 0 and sleep >= retry_limit:
_log.info("Abandoning clean after %s seconds", retry_limit)
_log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit)
return


Expand All @@ -322,7 +322,7 @@ def dict_to_file(dict, iocs, conf):


def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime):
_log.info("CF Update IOC: %s", iocid)
_log.info("CF Update IOC: {iocid}", iocid=iocid)

# Consider making this function a class methed then 'proc' simply becomes 'self'
client = proc.client
Expand All @@ -338,7 +338,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
owner = iocs[iocid]["owner"]
iocTime = iocs[iocid]["time"]
else:
_log.warn('IOC Env Info not found: %s', iocid)
_log.warn('IOC Env Info not found: {iocid}', iocid=iocid)

if hostName is None or iocName is None:
raise Exception('missing hostName or iocName')
Expand All @@ -348,7 +348,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io

channels = []
"""A list of channels in channelfinder with the associated hostName and iocName"""
_log.debug('Find existing channels by IOCID: %s', iocid)
_log.debug('Find existing channels by IOCID: {iocid}', iocid=iocid)
old = client.findByArgs(prepareFindArgs(conf, [('iocid', iocid)]))
if proc.cancelled:
raise defer.CancelledError()
Expand All @@ -363,7 +363,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties'])
channels.append(ch)
_log.debug("Add existing channel to previous IOC: %s", channels[-1])
_log.debug("Add existing channel to previous IOC: {s}", s=channels[-1])
"""In case alias exist, also delete them"""
if (conf.get('alias', 'default') == 'on'):
if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]:
Expand All @@ -375,15 +375,15 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
if (conf.get('recordType', 'default') == 'on'):
ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties'])
channels.append(a)
_log.debug("Add existing alias to previous IOC: %s", channels[-1])
_log.debug("Add existing alias to previous IOC: {s}", s=channels[-1])

else:
"""Orphan the channel : mark as inactive, keep the old hostName and iocName"""
ch[u'properties'] = __merge_property_lists([{u'name': 'pvStatus', u'owner': owner, u'value': 'Inactive'},
{u'name': 'time', u'owner': owner, u'value': iocTime}],
ch[u'properties'])
channels.append(ch)
_log.debug("Add orphaned channel with no IOC: %s", channels[-1])
_log.debug("Add orphaned channel with no IOC: {s}", s=channels[-1])
"""Also orphan any alias"""
if (conf.get('alias', 'default') == 'on'):
if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]:
Expand All @@ -392,7 +392,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
{u'name': 'time', u'owner': owner, u'value': iocTime}],
a[u'properties'])
channels.append(a)
_log.debug("Add orphaned alias with no IOC: %s", channels[-1])
_log.debug("Add orphaned alias with no IOC: {s}", s=channels[-1])
else:
if ch[u'name'] in new: # case: channel in old and new
"""
Expand All @@ -403,7 +403,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
{u'name': 'time', u'owner': owner, u'value': iocTime}],
ch[u'properties'])
channels.append(ch)
_log.debug("Add existing channel with same IOC: %s", channels[-1])
_log.debug("Add existing channel with same IOC: {s}", s=channels[-1])
new.remove(ch[u'name'])

"""In case, alias exist"""
Expand All @@ -427,7 +427,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
u'owner': owner,
u'properties': aprops})
new.remove(a[u'name'])
_log.debug("Add existing alias with same IOC: %s", channels[-1])
_log.debug("Add existing alias with same IOC: {s}", s=channels[-1])
# now pvNames contains a list of pv's new on this host/ioc
"""A dictionary representing the current channelfinder information associated with the pvNames"""
existingChannels = {}
Expand All @@ -450,7 +450,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
searchStrings.append(searchString)

for eachSearchString in searchStrings:
_log.debug('Find existing channels by name: %s', eachSearchString)
_log.debug('Find existing channels by name: {search}', search=eachSearchString)
for ch in client.findByArgs(prepareFindArgs(conf, [('~name', eachSearchString)])):
existingChannels[ch["name"]] = ch
if proc.cancelled:
Expand All @@ -468,7 +468,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
existingChannel = existingChannels[pv]
existingChannel["properties"] = __merge_property_lists(newProps, existingChannel["properties"])
channels.append(existingChannel)
_log.debug("Add existing channel with different IOC: %s", channels[-1])
_log.debug("Add existing channel with different IOC: {s}", s=channels[-1])
"""in case, alias exists, update their properties too"""
if (conf.get('alias', 'default') == 'on'):
if pv in pvInfoByName and "aliases" in pvInfoByName[pv]:
Expand All @@ -484,14 +484,14 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
channels.append({u'name': a,
u'owner': owner,
u'properties': alProps})
_log.debug("Add existing alias with different IOC: %s", channels[-1])
_log.debug("Add existing alias with different IOC: {s}", s=channels[-1])

else:
"""New channel"""
channels.append({u'name': pv,
u'owner': owner,
u'properties': newProps})
_log.debug("Add new channel: %s", channels[-1])
_log.debug("Add new channel: {s}", s=channels[-1])
if (conf.get('alias', 'default') == 'on'):
if pv in pvInfoByName and "aliases" in pvInfoByName[pv]:
alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}]
Expand All @@ -501,8 +501,8 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
channels.append({u'name': a,
u'owner': owner,
u'properties': alProps})
_log.debug("Add new alias: %s", channels[-1])
_log.info("Total channels to update: %s %s", len(channels), iocName)
_log.debug("Add new alias: {s}", s=channels[-1])
_log.info("Total channels to update: {nChannels} {iocName}", nChannels=len(channels), iocName=iocName)
if len(channels) != 0:
client.set(channels=channels)
else:
Expand Down Expand Up @@ -552,7 +552,7 @@ def prepareFindArgs(conf, args):


def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime):
_log.info("Polling %s begins...", iocName)
_log.info("Polling {iocName} begins...", iocName=iocName)
sleep = 1
success = False
while not success:
Expand All @@ -561,9 +561,10 @@ def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io
success = True
return success
except RequestException as e:
_log.error("ChannelFinder update failed: %s", e)
_log.info("ChannelFinder update retry in %s seconds", min(60, sleep))
_log.error("ChannelFinder update failed: {s}", s=e)
retry_seconds = min(60, sleep)
_log.info("ChannelFinder update retry in {retry_seconds} seconds", retry_seconds=retry_seconds)
#_log.debug(str(channels_dict))
time.sleep(min(60, sleep))
time.sleep(retry_seconds)
sleep *= 1.5
_log.info("Polling %s complete", iocName)
_log.info("Polling {iocName} complete", iocName=iocName)
4 changes: 2 additions & 2 deletions server/recceiver/dbstore.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-

import itertools
import logging
from twisted.logger import Logger

from zope.interface import implementer

Expand All @@ -11,7 +11,7 @@

from . import interfaces

_log = logging.getLogger(__name__)
_log = Logger(__name__)

__all__ = ['DBProcessor']

Expand Down
Loading

0 comments on commit 75c68cb

Please sign in to comment.