
Commit e698849
Fix up the DBS3Uploader and modify it to support an upload only mode. Kill
off SQLite in DBS3Buffer and DBSBuffer.
Stephen Foulkes committed Jan 23, 2013
1 parent d525feb commit e698849
Showing 103 changed files with 201 additions and 2,354 deletions.
49 changes: 31 additions & 18 deletions src/python/WMComponent/DBS3Buffer/DBSBufferBlock.py
@@ -43,16 +43,13 @@ def __init__(self, name, location, das):


self.data = {'dataset_conf_list': [], # List of dataset configurations
'file_conf_list': [], # List of files, with the configuration for each
'file_conf_list': [], # List of files, the configuration for each
'files': [], # List of file objects
'block': {}, # Dict of block info
'block_parent_list': [], # List of block parents
'processing_era': {}, # Dict of processing era info
'ds_parent_list': [], # List of parent datasets
'acquisition_era': {}, # Dict of acquisition era information
'primds': {}, # Dict of primary dataset info
'dataset': {}, # Dict of processed dataset info
'physics_group_name': {}, # Physics Group Name
'file_parent_list': []} # List of file parents

self.files = []
@@ -68,6 +65,10 @@ def __init__(self, name, location, das):
self.data['block']['origin_site_name'] = location
self.data['block']['open_for_writing'] = 0 # If we're sending a block, it better be open

self.data['block']['create_by'] = "WMAgent"
self.data['block']['creation_date'] = int(time.time())
self.data['block']['block_size'] = 0
self.data['block']['file_count'] = 0
return
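After this change the block carries its DBS3 bookkeeping fields (create_by, creation_date, block_size, file_count) from the moment it is constructed. A minimal usage sketch — it assumes a WMCore checkout on the PYTHONPATH, and the name/location/das values are hypothetical placeholders:

    from WMComponent.DBS3Buffer.DBSBufferBlock import DBSBufferBlock

    block = DBSBufferBlock(name = "/Primary/Processed-v1/RAW#1234",
                           location = "T1_US_FNAL_Buffer",
                           das = None)

    # The new fields are populated at creation time:
    assert block.data['block']['create_by'] == "WMAgent"
    assert block.data['block']['block_size'] == 0   # grows as files are added
    assert block.data['block']['file_count'] == 0   # grows as files are added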


@@ -89,9 +90,6 @@ def addFile(self, dbsFile):
Add a DBSBufferFile object to our block
"""



if dbsFile['id'] in [x['id'] for x in self.files]:
msg = "Duplicate file inserted into DBSBlock: %i\n" % (dbsFile['id'])
msg += "Ignoring this file for now!\n"
@@ -103,12 +101,21 @@ def addFile(self, dbsFile):
return

self.files.append(dbsFile)
self.data['block']['block_size'] += dbsFile['size']
self.data['block']['file_count'] += 1

# Assemble information for the file itself
fileDict = {}
fileDict['file_type'] = 'EDM'
fileDict['logical_file_name'] = dbsFile['lfn']
fileDict['file_size'] = dbsFile['size']
fileDict['event_count'] = dbsFile['events']
fileDict['adler32'] = "NOTSET"
fileDict['md5'] = "NOTSET"
fileDict['last_modified_by'] = "WMAgent"
fileDict['last_modification_date'] = int(time.time())
fileDict['auto_cross_section'] = 0.0

# Do the checksums
for cktype in dbsFile['checksums'].keys():
cksum = dbsFile['checksums'][cktype]
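addFile() now keeps running totals on the block and fills per-file defaults before the checksum loop overwrites adler32. A standalone sketch of that bookkeeping — plain dicts stand in for the real DBSBufferFile objects, and the values are made up for illustration:

    import time

    block = {'block_size': 0, 'file_count': 0}
    files = []

    for dbsFile in [{'lfn': '/store/data/a.root', 'size': 1024, 'events': 10},
                    {'lfn': '/store/data/b.root', 'size': 2048, 'events': 20}]:
        block['block_size'] += dbsFile['size']   # running byte total for the block
        block['file_count'] += 1                 # DBS3 wants the count up front

        files.append({'file_type': 'EDM',
                      'logical_file_name': dbsFile['lfn'],
                      'file_size': dbsFile['size'],
                      'event_count': dbsFile['events'],
                      'adler32': "NOTSET",       # replaced by the checksum loop
                      'last_modified_by': "WMAgent",
                      'last_modification_date': int(time.time())})

    assert block == {'block_size': 3072, 'file_count': 2}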
@@ -198,14 +205,15 @@ def addDatasetParent(self, parent):
self.data['ds_parent_list'].append({'parent_dataset': parent})
return

def setProcessingVer(self, era):
def setProcessingVer(self, procVer):
"""
_setProcessingEra_
_setProcessingVer_
Set the block's processing era
Set the block's processing version.
"""

self.data['processing_era']['processing_version'] = era
self.data["processing_era"]["processing_version"] = procVer
self.data["processing_era"]["create_by"] = "WMAgent"
self.data["processing_era"]["description"] = ""
return

def setAcquisitionEra(self, era, date = 123456789):
@@ -214,7 +222,6 @@ def setAcquisitionEra(self, era, date = 123456789):
Set the acquisition era for the block
"""

self.data['acquisition_era']['acquisition_era_name'] = era
self.data['acquisition_era']['start_date'] = date
return
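setProcessingVer() now takes a processing version rather than an era and stamps create_by/description defaults alongside it. A hedged usage sketch, continuing the block object from the earlier sketch (the values are illustrative):

    block.setProcessingVer(procVer = 1)          # fills processing_era, plus
                                                 # create_by/description defaults
    block.setAcquisitionEra(era = "Run2012D",    # fills acquisition_era
                            date = 1358900000)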
@@ -235,11 +242,8 @@ def hasDataset(self):
Check and see if the dataset has been properly set
"""

return self.data['dataset'].get('dataset', False)



def setDataset(self, datasetName, primaryType,
datasetType, physicsGroup = None, overwrite = False, valid = 1):
"""
@@ -248,7 +252,6 @@ def setDataset(self, datasetName, primaryType,
Set all the information concerning a single dataset, including
the primary, processed and tier info
"""

if self.hasDataset() and not overwrite:
# Do nothing, we already have a dataset
return
@@ -276,7 +279,8 @@ def setDataset(self, datasetName, primaryType,
# Do the primary dataset
self.data['primds']['primary_ds_name'] = primary
self.data['primds']['primary_ds_type'] = primaryType

self.data['primds']['create_by'] = "WMAgent"
self.data['primds']['creation_date'] = int(time.time())

# Do the processed
self.data['dataset']['physics_group_name'] = physicsGroup
@@ -285,6 +289,11 @@ def setDataset(self, datasetName, primaryType,
self.data['dataset']['dataset_access_type'] = datasetType
self.data['dataset']['dataset'] = datasetName

# Add misc meta data.
self.data['dataset']['create_by'] = "WMAgent"
self.data['dataset']['last_modified_by'] = "WMAgent"
self.data['dataset']['creation_date'] = int(time.time())
self.data['dataset']['last_modification_date'] = int(time.time())
return
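Continuing the same sketch: setDataset() splits the dataset path into its primary/processed/tier pieces and, after this commit, also stamps the audit fields shown above. The argument values here are illustrative only:

    block.setDataset(datasetName = "/Primary/Processed-v1/RAW",
                     primaryType = "data",
                     datasetType = "VALID")

    assert block.data['primds']['create_by'] == "WMAgent"
    assert block.data['dataset']['last_modified_by'] == "WMAgent"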


@@ -383,5 +392,9 @@ def FillFromDBSBuffer(self, blockInfo):
self.startTime = blockInfo.get('creation_date')
self.inBuff = True

if 'status' in blockInfo.keys():
self.status = blockInfo['status']
del blockInfo['status']

for key in blockInfo.keys():
self.data['block'][key] = blockInfo.get(key)
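The 'status' key is now popped out of blockInfo before the generic copy loop, so it lands on the block object itself instead of inside the 'block' dict that gets uploaded to DBS3. A standalone sketch of the pattern, with plain dicts standing in for the real objects:

    blockInfo = {'block_name': '/Primary/Processed-v1/RAW#1234',
                 'creation_date': 1358900000,
                 'status': 'Open'}

    status = None
    if 'status' in blockInfo:
        status = blockInfo['status']
        del blockInfo['status']          # keep it out of the upload payload

    uploadBlock = {}
    for key in blockInfo.keys():
        uploadBlock[key] = blockInfo[key]

    assert status == 'Open' and 'status' not in uploadBlock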
23 changes: 8 additions & 15 deletions src/python/WMComponent/DBS3Buffer/DBSBufferUtil.py
@@ -2,16 +2,12 @@
"""
_DBSBufferUtil_
APIs related to using the DBSBuffer
APIs related to using the DBSBuffer.
"""




import logging
import os
import threading

from WMCore.WMFactory import WMFactory
from WMCore.DAOFactory import DAOFactory
from WMComponent.DBS3Buffer.DBSBufferFile import DBSBufferFile
@@ -208,18 +204,17 @@ def findUploadableDAS(self):



def findOpenBlocks(self):
def findOpenBlocks(self, dbs3OnlyUpload = False):
"""
_findOpenBlocks_
This should find all blocks.
"""

myThread = threading.currentThread()
existingTransaction = self.beginTransaction()

openBlocks = self.daoFactory(classname = "GetOpenBlocks")
result = openBlocks.execute(conn = self.getDBConn(),
result = openBlocks.execute(dbs3OnlyUpload, conn = self.getDBConn(),
transaction=self.existingTransaction())

self.commitTransaction(existingTransaction)
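findOpenBlocks() now accepts a dbs3OnlyUpload flag and forwards it to the GetOpenBlocks DAO. A hypothetical call, assuming an already-configured DBSBufferUtil instance (database setup omitted):

    from WMComponent.DBS3Buffer.DBSBufferUtil import DBSBufferUtil

    bufferUtil = DBSBufferUtil()                # setup/configuration assumed

    allOpen = bufferUtil.findOpenBlocks()       # default: every open block
    uploadOnly = bufferUtil.findOpenBlocks(dbs3OnlyUpload = True)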
@@ -250,7 +245,7 @@ def loadBlocksByDAS(self, das):



def loadBlocks(self, blocknames):
def loadBlocks(self, blocknames, dbs3UploadOnly):
"""
_loadBlocksByDAS_
@@ -266,12 +261,11 @@ def loadBlocks(self, blocknames, dbs3UploadOnly):
existingTransaction = self.beginTransaction()

findBlocks = self.daoFactory(classname = "LoadBlocks")
result = findBlocks.execute(blocknames = blocknames,
result = findBlocks.execute(blocknames, dbs3UploadOnly,
conn = self.getDBConn(),
transaction=self.existingTransaction())

self.commitTransaction(existingTransaction)

return result


@@ -321,13 +315,12 @@ def findUploadableFilesByDAS(self, das):
return dbsFiles


def updateBlocks(self, blocks):
def updateBlocks(self, blocks, dbs3UploadOnly = False):
"""
_updateBlocks_
Update the blocks in DBSBuffer
"""

if len(blocks) < 1:
# Nothing to do
return
@@ -336,7 +329,7 @@ def updateBlocks(self, blocks, dbs3UploadOnly = False):
existingTransaction = self.beginTransaction()

updateBlock = self.daoFactory(classname = "UpdateBlocks")
updateBlock.execute(blocks = blocks, conn = self.getDBConn(),
updateBlock.execute(blocks, dbs3UploadOnly, conn = self.getDBConn(),
transaction=self.existingTransaction())
self.commitTransaction(existingTransaction)
return
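loadBlocks() and updateBlocks() thread the same upload-only flag down to their DAOs; note that for loadBlocks the flag is a required positional argument, while updateBlocks defaults it to False. A hypothetical continuation of the call above — the block name is illustrative, and in the real uploader updateBlocks receives block objects rather than the raw loadBlocks result:

    blockNames = ['/Primary/Processed-v1/RAW#1234']
    blockInfo = bufferUtil.loadBlocks(blockNames, dbs3UploadOnly = True)
    bufferUtil.updateBlocks(blockInfo, dbs3UploadOnly = True)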
