From c1a994febb1845b269b86dc9d75e467981bdb2d8 Mon Sep 17 00:00:00 2001
From: Giacomo Govi
Date: Thu, 10 Nov 2016 16:14:05 +0100
Subject: [PATCH 1/2] Various fixes for the copy GT functions

---
 CondCore/Utilities/python/conddblib.py |   1 +
 CondCore/Utilities/scripts/conddb      | 130 +++++++++++++++----------
 2 files changed, 78 insertions(+), 53 deletions(-)

diff --git a/CondCore/Utilities/python/conddblib.py b/CondCore/Utilities/python/conddblib.py
index 27493b36b32e3..3cbc321e70dda 100644
--- a/CondCore/Utilities/python/conddblib.py
+++ b/CondCore/Utilities/python/conddblib.py
@@ -369,6 +369,7 @@ def get_dbtype(self,theType):
     def session(self):
         s = self._session()
         s.get_dbtype = self.get_dbtype
+        s._is_sqlite = self._is_sqlite
         return s
 
     @property
diff --git a/CondCore/Utilities/scripts/conddb b/CondCore/Utilities/scripts/conddb
index 7cdecadba0bcf..ffe7ec14586c1 100755
--- a/CondCore/Utilities/scripts/conddb
+++ b/CondCore/Utilities/scripts/conddb
@@ -123,7 +123,6 @@ def _run_editor(editor, tempfd):
 
 def _parse_timestamp(timestamp):
-
     try:
         return datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
     except ValueError:
@@ -403,7 +402,9 @@ def output_table(args, table, headers, filters=None, output_file=None, no_first_
         filters = [None] * len(headers)
 
     def max_length_filter(s):
-        s = str(s).replace('\n', '\\n')
+        #s = str(s).replace('\n', '\\n')
+        s = str(s).replace('\n', ' ')
+        s = s.replace(chr(13), ' ')
         return '%s...' % s[:conddb.name_length] if ( len(s) > conddb.name_length and not no_max_length ) else s
 
     new_table = [[] for i in range(len(table))]
@@ -901,39 +902,49 @@ def diff(args):
     if not (is_tag1 and is_tag2) and not (is_global_tag1 and is_global_tag2):
         raise Exception('There are no tag or global tag pairs named %s and %s in the database(s).' % (args.first, args.second))
 
-def convertRunToTimes(startRun, stopRun=None):
-
-    if not stopRun :
-        stopRun = startRun + 1
-
-    startTime1, stopTime1 = runToTime(startRun)
-    startTime2, stopTime2 = runToTime(stopRun)
-
-    timeMap = { 'start' : {
+def convertRunToTimes( fromRun, toRun ):
+
+    fromTime = None
+    fromLumi = None
+    toTime = None
+    toLumi = None
+    # the time we get may be a bit delayed (7-10 sec according to Salvatore)
+    if fromRun is not None:
+        startTime1, stopTime1 = runToTime( fromRun )
+        fromTime = startTime1-15.
+        fromLumi = fromRun<<32|0x1
+    if toRun is not None:
+        startTime2, stopTime2 = runToTime( toRun )
+        toTime = stopTime2+15.
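
Note: the lumi-type since built above ("fromRun<<32|0x1") packs the run number into the
upper 32 bits of a 64-bit value, with lumisection 1 in the lower 32 bits. A minimal sketch
of the encoding and its inverse, in plain Python; the helper names are illustrative and not
part of the patch:

    def run_to_lumi_since(run, lumisection=1):
        # run number in the upper 32 bits, lumisection number in the lower 32 bits
        return (run << 32) | lumisection

    def lumi_since_to_run_lumi(since):
        # inverse operation: split the packed value back into its two parts
        return since >> 32, since & 0xFFFFFFFF

    assert run_to_lumi_since(283408) == 283408 << 32 | 0x1
    assert lumi_since_to_run_lumi(run_to_lumi_since(283408)) == (283408, 1)
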
+        toLumi = toRun<<32|0x1
+
+    timeMap = { 'from' : {
                 'hash' : None,
-                'run' : startRun,
-                'time' : startTime1-15., # the time we get may be a bit delayed (7-10 sec according to Salvatore)
-                'lumi' : startRun<<32|0x1,
+                'run' : fromRun,
+                'time' : fromTime, # the time we get may be a bit delayed (7-10 sec according to Salvatore)
+                'lumi' : fromLumi,
                 },
-                'stop' : {
+                'to' : {
                 'hash' : None,
-                'run' : stopRun,
-                'time' : stopTime2+15., # the time we get may be a bit delayed (7-10 sec according to Salvatore)
-                'lumi' : stopRun<<32|0x1,
+                'run' : toRun,
+                'time' : toTime, # the time we get may be a bit delayed (7-10 sec according to Salvatore)
+                'lumi' : toLumi,
                 }
                 }
-    logging.debug("convertRunToTimes> start: %s stop %s \n timeMap: %s " % (startRun, stopRun, str(timeMap)))
+    logging.debug("convertRunToTimes> from: %s to: %s \n timeMap: %s " % (fromRun, toRun, str(timeMap)))
 
     return timeMap
 
 def runToTime(runNr):
 
     connStr = conddb._getCMSFrontierSQLAlchemyConnectionString('PromptProd', 'CMS_CONDITIONS')
-
-    connection = conddb.connect(connStr)
+    url = conddb.make_url( connStr, True )
+    connection = conddb.connect(url)
     session = connection.session()
+    IOV = session.get_dbtype(conddb.IOV)
+
     startIOV = session.query(IOV.insertion_time).filter(IOV.tag_name == 'runinfo_start_31X_hlt', IOV.since == runNr).all()
     stopIOV = session.query(IOV.insertion_time).filter(IOV.tag_name == 'runinfo_31X_hlt', IOV.since == runNr).all()
@@ -950,30 +961,36 @@ def _update_tag_log(session,the_tag,the_timestamp,the_action,note):
     TagLog = session.get_dbtype(conddb.TagLog)
     session.add(TagLog(tag_name=the_tag, event_time=the_timestamp, action=the_action, user_name=userName, host_name=hostName, command=userCommand, user_text=note ))
 
-def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromIOV=None, toIOV=None, timeMap=None):
-    logging.info('Copying tag %s to %s ...', str_db_object(args.db, first), str_db_object(args.destdb, second))
-
+def _copy_tag(args, copyTime, session1, session2, first, second, fromIOV=None, toIOV=None, timeMap=None):
     Tag1 = session1.get_dbtype(conddb.Tag)
     Tag2 = session2.get_dbtype(conddb.Tag)
 
     # Copy the tag
     tag = _rawdict(session1.query(Tag1).get(first))
     tag['name'] = second
-    if isSQLite:
+
+    if session2._is_sqlite:
         if tag['end_of_validity'] >= maxSince:
             tag['end_of_validity'] = -1
     else:
-        if tag['end_of_validity'] == -1:
+        if tag['end_of_validity'] == -1 or tag['end_of_validity'] > maxSince:
             tag['end_of_validity'] = maxSince
     tag['insertion_time'] = copyTime
     tag['modification_time'] = copyTime
 
+    if timeMap:
+        fromIOV = timeMap['from'][ tag['time_type'].lower().strip() ]
+        toIOV = timeMap['to'][ tag['time_type'].lower().strip() ]
+    if fromIOV is None:
         fromIOV = 1
-    if timeMap and not fromIOV and not toIOV:
-        fromIOV = timeMap['start'][ tag['time_type'].lower().strip() ]
-        toIOV = timeMap['stop'] [ tag['time_type'].lower().strip() ]
+    selectStr = 'from since=%s' % fromIOV
+    if toIOV is not None:
+        selectStr += ' to since=%s' % toIOV
+    if args.snapshot is not None:
+        selectStr += ' selecting insertion time < %s' % args.snapshot
+    logging.info('Copying tag %s to %s %s', str_db_object(args.db, first), str_db_object(args.destdb, second), selectStr)
 
     query = session2.query(Tag2).filter(Tag2.name == second )
     destExists = False
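
Note: the end_of_validity translation above reconciles the two conventions for an open-ended
tag: SQLite destinations use -1, while Oracle destinations use the maximum since value. A
minimal sketch of the rule; the function name is illustrative, and the value of maxSince is
an assumption (the largest valid 64-bit since):

    maxSince = 2**64 - 1  # assumption: the open-ended marker used by the script

    def normalize_end_of_validity(end_of_validity, dest_is_sqlite):
        # SQLite convention: -1 means "no end of validity";
        # Oracle convention: maxSince (or anything beyond it) means the same thing.
        if dest_is_sqlite:
            return -1 if end_of_validity >= maxSince else end_of_validity
        return maxSince if (end_of_validity == -1 or end_of_validity > maxSince) else end_of_validity

    assert normalize_end_of_validity(-1, dest_is_sqlite=False) == maxSince
    assert normalize_end_of_validity(maxSince, dest_is_sqlite=True) == -1
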
@@ -1029,27 +1046,7 @@ def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromI
             scalar()
 
         logging.debug('The closest smaller IOV than the given starting one (--from %s) is %s...', fromVal, prev_iov)
-
-    # Copy the distinct payloads referenced in the IOVs of the tag
-    # FIXME: Put the DISTINCT query as a subquery (we can't directly use distinct on BLOBs)
-    query = session1.query(IOV1.payload_hash).filter(IOV1.tag_name == first)
-    if prev_iov is not None:
-        query = query.filter(IOV1.since >= prev_iov)
-    if toIOV is not None:
-        toVal = toIOV
-        logging.debug("filtering with TO %s of type %s for tag: %s to " % (toIOV, tag['time_type'], str(tag['name'])) )
-        query = query.filter(IOV1.since <= toVal)
-    query = query.distinct()
-    for (payload_hash, ) in query:
-        if _exists(session2, Payload2.hash, payload_hash):
-            logging.info('Skipping copy of payload %s to %s since it already exists...', str_db_object(args.db, payload_hash), str_db_object(args.destdb, payload_hash))
-        else:
-            logging.info('Copying payload %s to %s ...', str_db_object(args.db, payload_hash), str_db_object(args.destdb, payload_hash))
-            payload = _rawdict(session1.query(Payload1).filter(Payload1.hash == payload_hash).one())
-            payload['insertion_time'] = copyTime
-            session2.add(Payload2(** payload))
-
-    # Copy the IOVs of the tag
+    # Select the input IOVs
     query = session1.query(IOV1).filter(IOV1.tag_name == first)
     if prev_iov is not None:
         query = query.filter(IOV1.since >= prev_iov)
@@ -1058,6 +1055,7 @@
         query = query.filter(_inserted_before(IOV1,args.snapshot))
     query = query.order_by(IOV1.since, IOV1.insertion_time.desc())
     iovs = {}
+    hashes = set()
     for iov in query:
         iov = _rawdict(iov)
         iov['tag_name'] = second
@@ -1073,10 +1071,28 @@
                 first_iov = False
         since = iov['since']
+
         if since not in iovs.keys():
             # updating the insertion time to the execution time.
             # Because of that, for a given since, only the most recent will be added.
             iovs[since] = iov['payload_hash']
+            hashes.add( iov['payload_hash'] )
+        else:
+            logging.warning('Skipping older IOV with since %s...', since)
+    logging.info('Found %s IOVs and %s referenced payloads to copy.', len(iovs.keys()), len(hashes))
+
+    # Copy the payloads referenced in the selected IOVs
+    np = 0
+    for h in hashes:
+        if _exists(session2, Payload2.hash, h):
+            logging.info('Skipping copy of payload %s to %s since it already exists...', str_db_object(args.db, h), str_db_object(args.destdb, h))
+        else:
+            logging.info('Copying payload %s to %s ...', str_db_object(args.db, h), str_db_object(args.destdb, h))
+            payload = _rawdict(session1.query(Payload1).filter(Payload1.hash == h).one())
+            payload['insertion_time'] = copyTime
+            session2.add(Payload2(** payload))
+            np += 1
+    logging.info('%s payload(s) copied.', np)
+
     # Calculate if extra iovs are needed - for the override mode ( they will have already their payloads copied )
     extraiovs = {}
     if args.override:
        # the interval to be overriden is defined by the new iov set boundaries,
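
Note: the IOV loop above relies on the ordering of the query (ascending since, descending
insertion time): the first row seen for a given since is the most recent one, so later
duplicates can simply be skipped while the payload hashes of the kept rows are collected. A
minimal sketch of the same deduplication over plain tuples (the data is illustrative):

    # (since, insertion_time, payload_hash), pre-sorted like the query:
    # ascending since, descending insertion_time
    rows = [
        (1, '2016-11-10', 'hash-a'),
        (5, '2016-11-09', 'hash-b'),
        (5, '2016-11-01', 'hash-old'),  # older duplicate of since=5, skipped
    ]

    iovs = {}
    hashes = set()
    for since, _, payload_hash in rows:
        if since not in iovs:
            iovs[since] = payload_hash
            hashes.add(payload_hash)

    assert iovs == {1: 'hash-a', 5: 'hash-b'}
    assert hashes == {'hash-a', 'hash-b'}
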
@@ -1098,7 +1114,7 @@
                 if newSince < since:
                     extraiovs[since] = iovs[newSince]
                     break
-
+    # Copy the set of IOVs collected
     niovs = 0
     for k,v in iovs.items():
         logging.debug('Copying IOV %s -> %s...', k, v)
@@ -1148,7 +1164,7 @@ def copy(args):
     copyTime = datetime.datetime.now()
 
-    niovs = _copy_tag(args, copyTime, session1, session2, connection2.is_sqlite, args.first, args.second, getattr(args, 'from'), args.to)
+    niovs = _copy_tag(args, copyTime, session1, session2, args.first, args.second, getattr(args, 'from'), args.to)
 
     _confirm_changes(args)
 
     note = args.note
@@ -1165,6 +1181,7 @@ def copy(args):
     if args.second is None:
         args.second = args.first
 
+    # 'from' is a keyword!
    timeMap = convertRunToTimes(getattr(args, 'from'), args.to)
 
     logging.info('Copying global tag %s to %s ...', str_db_object(args.db, args.first), str_db_object(args.destdb, args.second))
@@ -1177,6 +1194,12 @@ def copy(args):
     global_tag = _rawdict(session1.query(GlobalTag1).get(args.first))
     global_tag['name'] = args.second
     global_tag['validity'] = 0 # XXX: SQLite does not work with long ints...
+    if args.snapshot is None:
+        args.snapshot = str(global_tag['snapshot_time'].strftime("%Y-%m-%d %H:%M:%S"))
+    else:
+        global_tag['snapshot_time'] = _parse_timestamp(args.snapshot)
+    if _exists(session2, GlobalTag2.name, args.second):
+        raise Exception('A GlobalTag named "%s" already exists in %s' % (args.second, args.destdb))
     session2.add(GlobalTag2(**global_tag))
 
     # Copy the tags of the global tag
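
Note: the snapshot handling above works in both directions: without --snapshot, the global
tag's own snapshot_time becomes the insertion-time cut used when copying its tags, and with
an explicit --snapshot the copied global tag records that time instead. A minimal sketch of
the rule (standard library only; the function name is illustrative, and only the plain
'%Y-%m-%d %H:%M:%S' format is handled here):

    import datetime

    def resolve_snapshot(arg_snapshot, gt_snapshot_time):
        # returns (cut used to filter IOV insertion times, snapshot_time stored in the copy)
        if arg_snapshot is None:
            return gt_snapshot_time.strftime('%Y-%m-%d %H:%M:%S'), gt_snapshot_time
        return arg_snapshot, datetime.datetime.strptime(arg_snapshot, '%Y-%m-%d %H:%M:%S')

    cut, stored = resolve_snapshot(None, datetime.datetime(2016, 11, 10, 16, 14, 5))
    assert cut == '2016-11-10 16:14:05'
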
@@ -1189,7 +1212,8 @@ def copy(args):
             logging.warn('Skipping copy of tag %s to %s since it already exists... *The tags may differ in content*', str_db_object(args.db, tag), str_db_object(args.destdb, tag))
         else:
             logging.debug('going to copy tag %s to %s ...', str_db_object(args.db, tag), str_db_object(args.destdb, tag))
-            _copy_tag(args, session1, session2, tag, tag, timeMap=timeMap)
+            copyTime = datetime.datetime.now()
+            _copy_tag(args, copyTime, session1, session2, tag, tag, timeMap=timeMap)
 
     # Copy the map of the global tag
     query = session1.query(GlobalTagMap1).filter(GlobalTagMap1.global_tag_name == args.first)

From 433a9cb8a12afc3240aadef513cbcc5380a8479e Mon Sep 17 00:00:00 2001
From: Giacomo Govi
Date: Wed, 16 Nov 2016 12:21:02 +0100
Subject: [PATCH 2/2] Workaround for sqlalchemy GLOBAL_ keyword

---
 CondCore/Utilities/python/conddblib.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/CondCore/Utilities/python/conddblib.py b/CondCore/Utilities/python/conddblib.py
index 3cbc321e70dda..905cc9073b53f 100644
--- a/CondCore/Utilities/python/conddblib.py
+++ b/CondCore/Utilities/python/conddblib.py
@@ -273,6 +273,8 @@ class IOV:
                 'payload_hash':(DbRef(Payload,'hash'),_Col.pk) }
 
+# The string 'GLOBAL' (upper case) is a keyword in sqlalchemy: used in the model, it makes the two GT tables unreadable (bug).
+# The only choice is to use lower-case names and rename the tables in sqlite after creation.
 class GlobalTag:
     __tablename__ = 'global_tag'
     columns = { 'name':(sqlalchemy.String(name_length),_Col.pk),
@@ -282,7 +284,6 @@ class GlobalTag:
                 'insertion_time':(sqlalchemy.TIMESTAMP,_Col.notNull),
                 'snapshot_time':(sqlalchemy.TIMESTAMP,_Col.notNull) }
 
-
 class GlobalTagMap:
     __tablename__ = 'global_tag_map'
     columns = { 'global_tag_name':(DbRef(GlobalTag,'name'),_Col.pk),
@@ -348,6 +349,7 @@ def __init__(self, url, init=False):
                             'cms_orcon_prod',
                             'cmsintr_lb',
                            }
+        self._url = url
         self._backendName = ('sqlite' if self._is_sqlite else 'oracle' )
         self._schemaName = ( None if self._is_sqlite else schema_name )
         logging.debug(' ... using db "%s", schema "%s"' % (url, self._schemaName) )
@@ -429,7 +431,19 @@ def init(self, drop=False):
         self.get_dbtype(GlobalTag).__table__.create(bind = self.engine)
         self.get_dbtype(GlobalTagMap).__table__.create(bind = self.engine)
         #self.metadata.create_all(self.engine)
-
+        if self.is_sqlite:
+            # Horrible hack, but no choice because of the sqlalchemy bug (see comment in the model)
+            import sqlite3
+            import string
+            conn = sqlite3.connect( self._url.database )
+            c = conn.cursor()
+            stmt = string.Template('ALTER TABLE $before RENAME TO $after')
+            c.execute( stmt.substitute( before=GlobalTag.__tablename__, after='TMP0' ) )
+            c.execute( stmt.substitute( before='TMP0', after=GlobalTag.__tablename__.upper() ) )
+            c.execute( stmt.substitute( before=GlobalTagMap.__tablename__, after='TMP1' ) )
+            c.execute( stmt.substitute( before='TMP1', after=GlobalTagMap.__tablename__.upper() ) )
+            conn.commit()
+            conn.close()
 
         # TODO: Create indexes
         #logger.debug('Creating indexes...')
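
Note: the rename dance in init() can be reproduced in isolation. SQLite compares table names
case-insensitively, which is presumably why the patch goes through a temporary name instead
of renaming global_tag directly to GLOBAL_TAG. A minimal sketch using only the standard
library (in-memory database, illustrative schema):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    c = conn.cursor()
    c.execute('CREATE TABLE global_tag (name TEXT PRIMARY KEY)')
    # two-step rename, as in the patch: lower-case name -> TMP0 -> upper-case name
    c.execute('ALTER TABLE global_tag RENAME TO TMP0')
    c.execute('ALTER TABLE TMP0 RENAME TO GLOBAL_TAG')
    tables = [row[0] for row in c.execute("SELECT name FROM sqlite_master WHERE type='table'")]
    assert tables == ['GLOBAL_TAG']
    conn.close()
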