-
Notifications
You must be signed in to change notification settings - Fork 15
/
simple_throttler.py
74 lines (69 loc) · 3.33 KB
/
simple_throttler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import datetime
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.plugin_base import PluginBase
# logger
baseLogger = core_utils.setup_logger("simple_throttler")
# simple throttler
class SimpleThrottler(PluginBase):
# constructor
def __init__(self, **kwarg):
# logic type : AND: throttled if all rules are satisfied, OR: throttled if one rule is satisfied
self.logicType = "OR"
PluginBase.__init__(self, **kwarg)
self.dbProxy = DBProxy()
# check if to be throttled
def to_be_throttled(self, queue_config, queue_config_mapper=None):
tmpLog = self.make_logger(baseLogger, f"computingSite={queue_config.queueName}", method_name="to_be_throttled")
tmpLog.debug("start")
# set default return vale
if self.logicType == "OR":
retVal = False, "no rule was satisfied"
else:
retVal = True, "all rules were satisfied"
# loop over all rules
criteriaList = []
maxMissedList = []
timeNow = core_utils.naive_utcnow()
for rule in self.rulesForMissed:
# convert rule to criteria
if rule["level"] == "site":
criteria = dict()
criteria["siteName"] = queue_config.siteName
criteria["timeLimit"] = timeNow - datetime.timedelta(minutes=rule["timeWindow"])
criteriaList.append(criteria)
maxMissedList.append(rule["maxMissed"])
elif rule["level"] == "pq":
criteria = dict()
criteria["computingSite"] = queue_config.queueName
criteria["timeLimit"] = timeNow - datetime.timedelta(minutes=rule["timeWindow"])
criteriaList.append(criteria)
maxMissedList.append(rule["maxMissed"])
elif rule["level"] == "ce":
elmName = "computingElements"
if elmName not in queue_config.submitter:
tmpLog.debug(f"skipped since {elmName} is undefined in submitter config")
continue
for ce in queue_config.submitter[elmName]:
criteria = dict()
criteria["computingElement"] = ce
criteria["timeLimit"] = timeNow - datetime.timedelta(minutes=rule["timeWindow"])
criteriaList.append(criteria)
maxMissedList.append(rule["maxMissed"])
# loop over all criteria
for criteria, maxMissed in zip(criteriaList, maxMissedList):
nMissed = self.dbProxy.get_num_missed_workers(queue_config.queueName, criteria)
if nMissed > maxMissed:
if self.logicType == "OR":
tmpMsg = f"logic={self.logicType} and "
tmpMsg += f"nMissed={nMissed} > maxMissed={maxMissed} for {str(criteria)}"
retVal = True, tmpMsg
break
else:
if self.logicType == "AND":
tmpMsg = f"logic={self.logicType} and "
tmpMsg += f"nMissed={nMissed} <= maxMissed={maxMissed} for {str(criteria)}"
retVal = False, tmpMsg
break
tmpLog.debug("ret={0} : {1}".format(*retVal))
return retVal