Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v7r0] Accounting and Monitoring changed #3929

Closed
wants to merge 13 commits into from
2 changes: 1 addition & 1 deletion AccountingSystem/Client/Types/Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self):
('ProcessingType', 'VARCHAR(256)'),
('Site', 'VARCHAR(32)'),
('FinalMajorStatus', 'VARCHAR(32)'),
('FinalMinorStatus', 'VARCHAR(64)')
('FinalMinorStatus', 'VARCHAR(256)')
]
self.definitionAccountingFields = [('CPUTime', "INT UNSIGNED"),
('NormCPUTime', "INT UNSIGNED"),
Expand Down
41 changes: 21 additions & 20 deletions Resources/MessageQueue/MQCommunication.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
""" General Message Queue Interface to create Consumers and Producers
"""

__RCSID__ = "$Id$"

from DIRAC import gLogger, S_OK
from DIRAC.Resources.MessageQueue.MQProducer import MQProducer
from DIRAC.Resources.MessageQueue.MQConsumer import MQConsumer
from DIRAC.Resources.MessageQueue.MQConnectionManager import MQConnectionManager
from DIRAC.Resources.MessageQueue.Utilities import getMQParamsFromCS
from DIRAC.Resources.MessageQueue.Utilities import generateDefaultCallback

connectionManager = MQConnectionManager() #To manage the active MQ connections.
__RCSID__ = "$Id$"

connectionManager = MQConnectionManager() # To manage the active MQ connections.

def createConsumer( mqURI, callback = generateDefaultCallback() ):

def createConsumer(mqURI, callback=generateDefaultCallback()):
"""
Function creates MQConsumer. All parameters are taken from the
Configuration Service based on the mqURI value.
Expand All @@ -27,16 +28,17 @@ def createConsumer( mqURI, callback = generateDefaultCallback() ):
S_OK/S_ERROR: with the consumer object in S_OK.

"""
result = _setupConnection( mqURI = mqURI, mType = "consumer" )
result = _setupConnection(mqURI=mqURI, mType="consumer")
if not result['OK']:
gLogger.error( 'Failed to createConsumer:', result['Message'] )
gLogger.error('Failed to createConsumer:', result['Message'])
return result
return S_OK( MQConsumer( mqManager = connectionManager,
mqURI = mqURI,
consumerId = result['Value'],
callback = callback ) )
return S_OK(MQConsumer(mqManager=connectionManager,
mqURI=mqURI,
consumerId=result['Value'],
callback=callback))


def createProducer( mqURI ):
def createProducer(mqURI):
"""
Function creates MQProducer. All parameters are taken from
the Configuration Service based on the mqURI value.
Expand All @@ -48,15 +50,15 @@ def createProducer( mqURI ):
Returns:
S_OK/S_ERROR: with the producer object in S_OK.
"""
result = _setupConnection( mqURI = mqURI, mType = "producer" )
result = _setupConnection(mqURI=mqURI, mType="producer")
if not result['OK']:
gLogger.error( 'Failed to createProducer:', result['Message'] )
return result
return S_OK( MQProducer( mqManager = connectionManager,
mqURI = mqURI,
producerId = result['Value'] ) )
return S_OK(MQProducer(mqManager=connectionManager,
mqURI=mqURI,
producerId=result['Value']))


def _setupConnection( mqURI, mType ):
def _setupConnection(mqURI, mType):
""" Function sets up the active MQ connection. All parameters are taken
from the Configuration Service based on the mqURI
value and the messenger Type mType.
Expand All @@ -69,9 +71,8 @@ def _setupConnection( mqURI, mType ):
Returns:
S_OK/S_ERROR: with the value of the messenger Id ( e.g. 'consumer4' ) in S_OK.
"""
result = getMQParamsFromCS( mqURI = mqURI )
result = getMQParamsFromCS(mqURI=mqURI)
if not result['OK']:
gLogger.error( 'Failed to setupConnection:', '%s' % ( result['Message'] ) )
return result
params = result['Value']
return connectionManager.startConnection( mqURI, params, mType )
return connectionManager.startConnection(mqURI, params, mType)