diff --git a/server/.gitignore b/server/.gitignore new file mode 100644 index 0000000..c795b05 --- /dev/null +++ b/server/.gitignore @@ -0,0 +1 @@ +build \ No newline at end of file diff --git a/server/recceiver/announce.py b/server/recceiver/announce.py index a6af907..1c3aee4 100644 --- a/server/recceiver/announce.py +++ b/server/recceiver/announce.py @@ -4,9 +4,9 @@ import struct from twisted.internet import protocol, reactor -import logging +from twisted.logger import Logger -_log = logging.getLogger(__name__) +_log = Logger(__name__) _Ann = struct.Struct('>HH4sHHI') @@ -51,14 +51,14 @@ def sendOne(self): self.D = self.reactor.callLater(self.delay, self.sendOne) for A in self.udps: try: - _log.debug('announce to %s',A) + _log.debug('announce to {s}',s=A) self.transport.write(self.msg, A) try: self.udpErr.remove(A) - _log.warn('announce OK to %s',A) + _log.warn('announce OK to {s}',s=A) except KeyError: pass except: if A not in self.udpErr: self.udpErr.add(A) - _log.exception('announce Error to %s',A) + _log.failure('announce Error to {s}',s=A) diff --git a/server/recceiver/application.py b/server/recceiver/application.py index 87f75c7..597022b 100644 --- a/server/recceiver/application.py +++ b/server/recceiver/application.py @@ -10,13 +10,14 @@ from twisted.internet import reactor, defer from twisted.internet.error import CannotListenError from twisted.application import service +from twisted.logger import Logger from .recast import CastFactory from .udpbcast import SharedUDP from .announce import Announcer from .processors import ProcessorController -_log = logging.getLogger(__name__) +_log = Logger(__name__) class Log2Twisted(logging.StreamHandler): """Print logging module stream to the twisted log @@ -89,7 +90,7 @@ def privilegedStartService(self): # Find out which port is in use addr = self.tcp.getHost() - _log.info('RecService listening on ', addr) + _log.info('RecService listening on {addr}', addr=addr) self.key = random.randint(0,0xffffffff) @@ -142,7 
+143,7 @@ def makeService(self, opts): lvl = logging.WARN else: if not isinstance(lvl, (int, )): - _log.info("Invalid loglevel", lvlname) + print("Invalid loglevel", lvlname) lvl = logging.WARN fmt = conf.get('logformat', "%(levelname)s:%(name)s %(message)s") diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 5ccae5a..097acaa 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- -import logging +from twisted.logger import Logger import socket -_log = logging.getLogger(__name__) +_log = Logger(__name__) from zope.interface import implementer @@ -39,7 +39,7 @@ @implementer(interfaces.IProcessor) class CFProcessor(service.Service): def __init__(self, name, conf): - _log.info("CF_INIT %s", name) + _log.info("CF_INIT {name}", name=name) self.name, self.conf = name, conf self.channel_dict = defaultdict(list) self.iocs = dict() @@ -148,7 +148,7 @@ def waitForThread(_ignored): def chainError(err): if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: %s", err) + _log.error("CF_COMMIT FAILURE: {s}", s=err) if self.cancelled: if not err.check(defer.CancelledError): raise defer.CancelledError() @@ -167,9 +167,9 @@ def chainResult(_ignored): def _commitWithThread(self, TR): if not self.running: - raise defer.CancelledError('CF Processor is not running (TR: %s:%s)', TR.src.host, TR.src.port) + raise defer.CancelledError('CF Processor is not running (TR: {host}:{port})'.format(host=TR.src.host, port=TR.src.port)) - _log.info("CF_COMMIT: %s", TR) + _log.info("CF_COMMIT: {TR}", TR=TR) """ a dictionary with a list of records with their associated property info pvInfo @@ -195,7 +195,7 @@ def _commitWithThread(self, TR): for rid, (recinfos) in TR.recinfos.items(): # find intersection of these sets if rid not in pvInfo: - _log.warn('IOC: %s: PV not found for recinfo with RID: %s', iocid, rid) + _log.warn('IOC: {iocid}: PV not found for recinfo with RID: {rid}', iocid=iocid, rid=rid) 
continue recinfo_wl = [p for p in self.whitelist if p in recinfos.keys()] if recinfo_wl: @@ -207,7 +207,7 @@ def _commitWithThread(self, TR): for rid, alias in TR.aliases.items(): if rid not in pvInfo: - _log.warn('IOC: %s: PV not found for alias with RID: %s', iocid, rid) + _log.warn('IOC: {iocid}: PV not found for alias with RID: {rid}', iocid=iocid, rid=rid) continue pvInfo[rid]['aliases'] = alias @@ -220,19 +220,19 @@ def _commitWithThread(self, TR): pvInfo[rid]['infoProperties'] = list() pvInfo[rid]['infoProperties'].append(property) else: - _log.debug('EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s', epics_env_var_name, iocName) + _log.debug('EPICS environment var {env_var} listed in environment_vars setting list not found in this IOC: {iocName}', env_var=epics_env_var_name, iocName=iocName) delrec = list(TR.delrec) - _log.debug("Delete records: %s", delrec) + _log.debug("Delete records: {s}", s=delrec) pvInfoByName = {} for rid, (info) in pvInfo.items(): if info["pvName"] in pvInfoByName: - _log.warn("Commit contains multiple records with PV name: %s (%s)", info["pvName"], iocid) + _log.warn("Commit contains multiple records with PV name: {pv} ({iocid})", pv=info["pvName"], iocid=iocid) continue pvInfoByName[info["pvName"]] = info - _log.debug("Add record: %s: %s", rid, info) + _log.debug("Add record: {rid}: {info}", rid=rid, info=info) if TR.initial: """Add IOC to source list """ @@ -267,7 +267,7 @@ def remove_channel(self, a, iocid): if self.iocs[iocid]['channelcount'] == 0: self.iocs.pop(iocid, None) elif self.iocs[iocid]['channelcount'] < 0: - _log.error("Channel count negative: %s", iocid) + _log.error("Channel count negative: {s}", s=iocid) if len(self.channel_dict[a]) <= 0: # case: channel has no more iocs del self.channel_dict[a] @@ -287,22 +287,22 @@ def clean_service(self): new_channels = [] for ch in channels or []: new_channels.append(ch[u'name']) - _log.info("Total channels to update: %s", 
len(new_channels)) + _log.info("Total channels to update: {nChannels}", nChannels=len(new_channels)) while len(new_channels) > 0: - _log.debug('Update "pvStatus" property to "Inactive" for %s channels', min(len(new_channels), 10000)) + _log.debug('Update "pvStatus" property to "Inactive" for {n_channels} channels', n_channels=min(len(new_channels), 10000)) self.client.update(property={u'name': 'pvStatus', u'owner': owner, u'value': "Inactive"}, channelNames=new_channels[:10000]) new_channels = new_channels[10000:] _log.info("CF Clean Completed") return except RequestException as e: - _log.error("Clean service failed: %s", e) - - _log.info("Clean service retry in %s seconds", min(60, sleep)) - time.sleep(min(60, sleep)) + _log.error("Clean service failed: {s}", s=e) + retry_seconds = min(60, sleep) + _log.info("Clean service retry in {retry_seconds} seconds", retry_seconds=retry_seconds) + time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after %s seconds", retry_limit) + _log.info("Abandoning clean after {retry_limit} seconds", retry_limit=retry_limit) return @@ -322,7 +322,7 @@ def dict_to_file(dict, iocs, conf): def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime): - _log.info("CF Update IOC: %s", iocid) + _log.info("CF Update IOC: {iocid}", iocid=iocid) # Consider making this function a class methed then 'proc' simply becomes 'self' client = proc.client @@ -338,7 +338,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io owner = iocs[iocid]["owner"] iocTime = iocs[iocid]["time"] else: - _log.warn('IOC Env Info not found: %s', iocid) + _log.warn('IOC Env Info not found: {iocid}', iocid=iocid) if hostName is None or iocName is None: raise Exception('missing hostName or iocName') @@ -348,7 +348,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels = [] """A list of channels in channelfinder with 
the associated hostName and iocName""" - _log.debug('Find existing channels by IOCID: %s', iocid) + _log.debug('Find existing channels by IOCID: {iocid}', iocid=iocid) old = client.findByArgs(prepareFindArgs(conf, [('iocid', iocid)])) if proc.cancelled: raise defer.CancelledError() @@ -363,7 +363,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[ch[u'name']][-1]]["recordType"]}), ch[u'properties']) channels.append(ch) - _log.debug("Add existing channel to previous IOC: %s", channels[-1]) + _log.debug("Add existing channel to previous IOC: {s}", s=channels[-1]) """In case alias exist, also delete them""" if (conf.get('alias', 'default') == 'on'): if ch[u'name'] in pvInfoByName and "aliases" in pvInfoByName[ch[u'name']]: @@ -375,7 +375,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io if (conf.get('recordType', 'default') == 'on'): ch[u'properties'] = __merge_property_lists(ch[u'properties'].append({u'name': 'recordType', u'owner': owner, u'value': iocs[channels_dict[a[u'name']][-1]]["recordType"]}), ch[u'properties']) channels.append(a) - _log.debug("Add existing alias to previous IOC: %s", channels[-1]) + _log.debug("Add existing alias to previous IOC: {s}", s=channels[-1]) else: """Orphan the channel : mark as inactive, keep the old hostName and iocName""" @@ -383,7 +383,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], ch[u'properties']) channels.append(ch) - _log.debug("Add orphaned channel with no IOC: %s", channels[-1]) + _log.debug("Add orphaned channel with no IOC: {s}", s=channels[-1]) """Also orphan any alias""" if (conf.get('alias', 'default') == 'on'): if ch[u'name'] in pvInfoByName and "aliases" in 
pvInfoByName[ch[u'name']]: @@ -392,7 +392,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], a[u'properties']) channels.append(a) - _log.debug("Add orphaned alias with no IOC: %s", channels[-1]) + _log.debug("Add orphaned alias with no IOC: {s}", s=channels[-1]) else: if ch[u'name'] in new: # case: channel in old and new """ @@ -403,7 +403,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io {u'name': 'time', u'owner': owner, u'value': iocTime}], ch[u'properties']) channels.append(ch) - _log.debug("Add existing channel with same IOC: %s", channels[-1]) + _log.debug("Add existing channel with same IOC: {s}", s=channels[-1]) new.remove(ch[u'name']) """In case, alias exist""" @@ -427,7 +427,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io u'owner': owner, u'properties': aprops}) new.remove(a[u'name']) - _log.debug("Add existing alias with same IOC: %s", channels[-1]) + _log.debug("Add existing alias with same IOC: {s}", s=channels[-1]) # now pvNames contains a list of pv's new on this host/ioc """A dictionary representing the current channelfinder information associated with the pvNames""" existingChannels = {} @@ -450,7 +450,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io searchStrings.append(searchString) for eachSearchString in searchStrings: - _log.debug('Find existing channels by name: %s', eachSearchString) + _log.debug('Find existing channels by name: {search}', search=eachSearchString) for ch in client.findByArgs(prepareFindArgs(conf, [('~name', eachSearchString)])): existingChannels[ch["name"]] = ch if proc.cancelled: @@ -468,7 +468,7 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io existingChannel = existingChannels[pv] existingChannel["properties"] = __merge_property_lists(newProps, existingChannel["properties"]) 
channels.append(existingChannel) - _log.debug("Add existing channel with different IOC: %s", channels[-1]) + _log.debug("Add existing channel with different IOC: {s}", s=channels[-1]) """in case, alias exists, update their properties too""" if (conf.get('alias', 'default') == 'on'): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: @@ -484,14 +484,14 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels.append({u'name': a, u'owner': owner, u'properties': alProps}) - _log.debug("Add existing alias with different IOC: %s", channels[-1]) + _log.debug("Add existing alias with different IOC: {s}", s=channels[-1]) else: """New channel""" channels.append({u'name': pv, u'owner': owner, u'properties': newProps}) - _log.debug("Add new channel: %s", channels[-1]) + _log.debug("Add new channel: {s}", s=channels[-1]) if (conf.get('alias', 'default') == 'on'): if pv in pvInfoByName and "aliases" in pvInfoByName[pv]: alProps = [{u'name': 'alias', u'owner': owner, u'value': pv}] @@ -501,8 +501,8 @@ def __updateCF__(proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io channels.append({u'name': a, u'owner': owner, u'properties': alProps}) - _log.debug("Add new alias: %s", channels[-1]) - _log.info("Total channels to update: %s %s", len(channels), iocName) + _log.debug("Add new alias: {s}", s=channels[-1]) + _log.info("Total channels to update: {nChannels} {iocName}", nChannels=len(channels), iocName=iocName) if len(channels) != 0: client.set(channels=channels) else: @@ -552,7 +552,7 @@ def prepareFindArgs(conf, args): def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, iocTime): - _log.info("Polling %s begins...", iocName) + _log.info("Polling {iocName} begins...", iocName=iocName) sleep = 1 success = False while not success: @@ -561,9 +561,10 @@ def poll(update, proc, pvInfoByName, delrec, hostName, iocName, iocid, owner, io success = True return success except RequestException as e: - 
_log.error("ChannelFinder update failed: %s", e) - _log.info("ChannelFinder update retry in %s seconds", min(60, sleep)) + _log.error("ChannelFinder update failed: {s}", s=e) + retry_seconds = min(60, sleep) + _log.info("ChannelFinder update retry in {retry_seconds} seconds", retry_seconds=retry_seconds) #_log.debug(str(channels_dict)) - time.sleep(min(60, sleep)) + time.sleep(retry_seconds) sleep *= 1.5 - _log.info("Polling %s complete", iocName) + _log.info("Polling {iocName} complete", iocName=iocName) diff --git a/server/recceiver/dbstore.py b/server/recceiver/dbstore.py index 189b7f6..852900d 100644 --- a/server/recceiver/dbstore.py +++ b/server/recceiver/dbstore.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- import itertools -import logging +from twisted.logger import Logger from zope.interface import implementer @@ -11,7 +11,7 @@ from . import interfaces -_log = logging.getLogger(__name__) +_log = Logger(__name__) __all__ = ['DBProcessor'] diff --git a/server/recceiver/processors.py b/server/recceiver/processors.py index 65a8355..5492779 100644 --- a/server/recceiver/processors.py +++ b/server/recceiver/processors.py @@ -1,9 +1,6 @@ # -*- coding: utf-8 -*- -import logging -_log = logging.getLogger(__name__) - - +from twisted.logger import Logger import sys from zope.interface import implementer @@ -24,6 +21,7 @@ from twisted.application import service from . 
import interfaces +_log = Logger(__name__) __all__ = [ 'ShowProcessor', @@ -83,7 +81,7 @@ def __init__(self, cfile=None): plugs = {} for plug in plugin.getPlugins(interfaces.IProcessorFactory): - _log.debug('Available plugin: %s', plug.name) + _log.debug('Available plugin: {name}', name=plug.name) plugs[plug.name] = plug self.procs = [] @@ -115,13 +113,13 @@ def commit(self, trans): def punish(err, B): if err.check(defer.CancelledError): - _log.debug('Cancel processing: %s: %s', B.name, trans) + _log.debug('Cancel processing: {name}: {trans}', name=B.name, trans=trans) return err try: self.procs.remove(B) - _log.error('Remove processor: %s: %s', B.name, err) + _log.error('Remove processor: {name}: {err}', name=B.name, err=err) except: - _log.debug('Remove processor: %s: aleady removed', B.name) + _log.debug('Remove processor: {name}: aleady removed', name=B.name) return err defers = [ defer.maybeDeferred(P.commit, trans).addErrback(punish, P) for P in self.procs ] @@ -142,7 +140,7 @@ def __init__(self, name, opts): def startService(self): service.Service.startService(self) - _log.info("Show processor '%s' starting", self.name) + _log.info("Show processor '{processor}' starting", processor=self.name) def commit(self, transaction): @@ -168,27 +166,25 @@ def releaseLock(result): def _commit(self, trans): - _log.debug("# Show processor '%s' commit", self.name) - if not _log.isEnabledFor(logging.INFO): - return - _log.info("# From %s:%d", trans.src.host, trans.src.port) + _log.debug("# Show processor '{name}' commit", name=self.name) + _log.info("# From {host}:{port}", host=trans.src.host,port=trans.src.port) if not trans.connected: _log.info("# connection lost") - for I in trans.infos.items(): - _log.info(" epicsEnvSet(\"%s\",\"%s\")", *I) + for item in trans.infos.items(): + _log.info(" epicsEnvSet('{name}','{value}')", name=item[0], value=item[1]) for rid, (rname, rtype) in trans.addrec.items(): - _log.info(" record(%s, \"%s\") {", rtype, rname) - for A in 
trans.aliases.get(rid, []): - _log.info(" alias(\"%s\")", A) - for I in trans.recinfos.get(rid, {}).items(): - _log.info(" info(%s,\"%s\")", *I) + _log.info(" record({rtype}, \"{rname}\") {{", rtype=rtype, rname=rname) + for alias in trans.aliases.get(rid, []): + _log.info(" alias(\"{alias}\")", alias=alias) + for item in trans.recinfos.get(rid, {}).items(): + _log.info(" info({name},\"{value}\")", name=item[0], value=item[1]) _log.info(" }") yield _log.info("# End") def stopService(self): service.Service.stopService(self) - _log.info("Show processor '%s' stopping", self.name) + _log.info("Show processor '{name}' stopping", name=self.name) @implementer(plugin.IPlugin, interfaces.IProcessorFactory) diff --git a/server/recceiver/recast.py b/server/recceiver/recast.py index 6f6fce2..07db729 100644 --- a/server/recceiver/recast.py +++ b/server/recceiver/recast.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- -import logging -_log = logging.getLogger(__name__) +from twisted.logger import Logger +_log = Logger(__name__) import sys, time if sys.version_info[0] < 3: @@ -111,7 +111,7 @@ def recvHeader(self, data): self.restartPingTimer() magic, msgid, blen = _Head.unpack(data) if magic!=_M: - _log.error('Protocol error! Bad magic %s',magic) + _log.error('Protocol error! Bad magic {magic}',magic=magic) self.transport.loseConnection() return self.msgid = msgid @@ -125,7 +125,7 @@ def recvHeader(self, data): def recvClientGreeting(self, body): cver, ctype, skey = _c_greet.unpack(body[:_c_greet.size]) if ctype!=0: - _log.error("I don't understand you! %s", ctype) + _log.error("I don't understand you! {s}", s=ctype) self.transport.loseConnection() return self.version = min(self.version, cver) @@ -137,7 +137,7 @@ def recvPong(self, body): nonce, = _ping.unpack(body[:_ping.size]) if nonce != self.nonce: - _log.error('pong nonce does not match! %s!=%s',nonce,self.nonce) + _log.error('pong nonce does not match! 
{pong_nonce}!={nonce}',pong_nonce=nonce,nonce=self.nonce) self.transport.loseConnection() else: _log.debug('pong nonce match') @@ -198,7 +198,7 @@ def recvDone(self, body): size_kb = self.uploadSize / 1024 rate_kbs = size_kb / elapsed_s src = "{}:{}".format(self.sess.ep.host, self.sess.ep.port) - _log.info('Done message from %s: uploaded %dkB in %.3fs (%dkB/s)', src, size_kb, elapsed_s, rate_kbs) + _log.info('Done message from {src}: uploaded {size_kb}kB in {elapsed_s}s ({rate_kbs}kB/s)', src=src, size_kb=size_kb, elapsed_s=elapsed_s, rate_kbs=rate_kbs) return self.getInitialState() @@ -241,7 +241,7 @@ class CollectionSession(object): reactor = reactor def __init__(self, proto, endpoint): - _log.info("Open session from %s",endpoint) + _log.info("Open session from {endpoint}",endpoint=endpoint) self.proto, self.ep = proto, endpoint self.TR = Transaction(self.ep, id(self)) self.TR.initial = True @@ -250,7 +250,7 @@ def __init__(self, proto, endpoint): self.dirty = False def close(self): - _log.info("Close session from %s", self.ep) + _log.info("Close session from {ep}", ep=self.ep) def suppressCancelled(err): if not err.check(defer.CancelledError): @@ -267,7 +267,7 @@ def suppressCancelled(err): self.flush() def flush(self, connected=True): - _log.info("Flush session from %s", self.ep) + _log.info("Flush session from {s}", s=self.ep) self.T = None if not self.dirty: return @@ -276,15 +276,15 @@ def flush(self, connected=True): self.dirty = False def commit(_ignored): - _log.info('Commit: %s', TR) + _log.info('Commit: {TR}', TR=TR) return defer.maybeDeferred(self.factory.commit, TR) def abort(err): if err.check(defer.CancelledError): - _log.info('Commit cancelled: %s', TR) + _log.info('Commit cancelled: {TR}', TR=TR) return err else: - _log.error('Commit failure: %s', err) + _log.error('Commit failure: {err}', err=err) self.proto.transport.loseConnection() raise defer.CancelledError()