Skip to content

Commit

Permalink
Revisited transaction model
Browse files Browse the repository at this point in the history
  • Loading branch information
ggovi committed Dec 7, 2021
1 parent 5c649ef commit 87d7095
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 95 deletions.
24 changes: 12 additions & 12 deletions CondCore/DBOutputService/interface/OnlineDBOutputService.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ namespace cond {

~OnlineDBOutputService() override;

cond::Iov_t preLoadIov(const std::string& recordName, cond::Time_t targetTime);

//
template <typename PayloadType>
bool writeIOVForNextLumisection(const PayloadType& payload, const std::string& recordName) {
cond::Time_t targetTime = getLastLumiProcessed() + m_latencyInLumisections;
cond::Time_t writeIOVForNextLumisection(const PayloadType& payload, const std::string& recordName) {
auto& rec = PoolDBOutputService::lookUpRecord( recordName );
cond::Time_t lastTime = getLastLumiProcessed();
auto unpkLastTime = cond::time::unpack( lastTime );
cond::Time_t targetTime = cond::time::lumiTime( unpkLastTime.first, unpkLastTime.second+m_latencyInLumisections);
auto t0 = std::chrono::high_resolution_clock::now();
logger().logInfo() << "Updating lumisection " << targetTime;
cond::Hash payloadId = PoolDBOutputService::writeOneIOV<PayloadType>(payload, targetTime, recordName);
bool ret = true;
PoolDBOutputService::commitTransaction();
if (payloadId.empty()) {
return false;
return 0;
}
auto t1 = std::chrono::high_resolution_clock::now();
auto w_lat = std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0).count();
Expand All @@ -55,7 +55,7 @@ namespace cond {
// check the pre-loaded iov
logger().logInfo() << "Preloading lumisection " << targetTime;
auto t2 = std::chrono::high_resolution_clock::now();
cond::Iov_t usedIov = preLoadIov(recordName, targetTime);
cond::Iov_t usedIov = preLoadIov(rec, targetTime);
auto t3 = std::chrono::high_resolution_clock::now();
logger().logInfo() << "Iov for preloaded lumisection " << targetTime << " is " << usedIov.since;
auto p_lat = std::chrono::duration_cast<std::chrono::microseconds>(t3 - t2).count();
Expand All @@ -65,12 +65,12 @@ namespace cond {
<< usedIov.since << "). A revert is required.";
PoolDBOutputService::eraseSinceTime(payloadId, targetTime, recordName);
PoolDBOutputService::commitTransaction();
ret = false;
targetTime = 0;
}
auto t4 = std::chrono::high_resolution_clock::now();
auto t_lat = std::chrono::duration_cast<std::chrono::microseconds>(t4 - t0).count();
logger().logInfo() << "Total update time: " << t_lat << " microsecs.";
return ret;
return targetTime;
}

//
Expand All @@ -83,9 +83,9 @@ namespace cond {
}

private:
cond::Time_t getLastLumiProcessed();
cond::Iov_t preLoadIov(const PoolDBOutputService::Record& record, cond::Time_t targetTime);

cond::persistency::Session getReadOnlyCache(cond::Time_t targetTime);
cond::Time_t getLastLumiProcessed();

private:
cond::Time_t m_runNumber;
Expand Down
52 changes: 32 additions & 20 deletions CondCore/DBOutputService/interface/PoolDBOutputService.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace cond {
Hash thePayloadHash("");
try {
this->initDB();
Record& myrecord = this->lookUpRecord(recordName);
auto& myrecord = this->getRecord(recordName);
m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
bool newTag = isNewTagRequest(recordName);
if (myrecord.m_onlyAppendUpdatePolicy && !newTag) {
Expand All @@ -92,10 +92,6 @@ namespace cond {
appendSinceTime(thePayloadHash, time, myrecord);
}
if (m_autoCommit) {
if (m_writeTransactionDelay) {
m_logger.logWarning() << "Waiting " << m_writeTransactionDelay << "s before commit the changes...";
::sleep(m_writeTransactionDelay);
}
doCommitTransaction();
}
} catch (const std::exception& er) {
Expand Down Expand Up @@ -123,7 +119,7 @@ namespace cond {
cond::persistency::TransactionScope scope(m_session.transaction());
try {
this->initDB();
Record& myrecord = this->lookUpRecord(recordName);
auto& myrecord = this->getRecord(recordName);
m_logger.logInfo() << "Tag mapped to record " << recordName << ": " << myrecord.m_tag;
bool newTag = isNewTagRequest(recordName);
cond::Time_t lastSince;
Expand Down Expand Up @@ -173,16 +169,19 @@ namespace cond {
template <typename T>
void createOneIOV(const T& payload, cond::Time_t firstSinceTime, const std::string& recordName) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
Record& myrecord = this->lookUpRecord(recordName);
if (!myrecord.m_isNewTag) {
cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
}
doStartTransaction();
cond::persistency::TransactionScope scope(m_session.transaction());
try {
this->initDB();
this->initDB();
auto& myrecord = this->getRecord(recordName);
if (!myrecord.m_isNewTag) {
cond::throwException(myrecord.m_tag + " is not a new tag", "PoolDBOutputService::createNewIOV");
}
Hash payloadId = m_session.storePayload(payload);
createNewIOV(payloadId, cond::demangledName(typeid(payload)), firstSinceTime, myrecord);
if (m_autoCommit) {
doCommitTransaction();
}
} catch (const std::exception& er) {
cond::throwException(std::string(er.what()), "PoolDBOutputService::createNewIov");
}
Expand All @@ -201,15 +200,23 @@ namespace cond {
template <typename T>
void appendOneIOV(const T& payload, cond::Time_t sinceTime, const std::string& recordName) {
std::lock_guard<std::recursive_mutex> lock(m_mutex);
Record& myrecord = this->lookUpRecord(recordName);
if (myrecord.m_isNewTag) {
cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
"PoolDBOutputService::appendSinceTime");
}
doStartTransaction();
cond::persistency::TransactionScope scope(m_session.transaction());
try {
bool dbexists = this->initDB( true );
if(!dbexists){
cond::throwException(std::string("Target database does not exist."),
"PoolDBOutputService::appendSinceTime");
}
auto& myrecord = this->lookUpRecord(recordName);
if (myrecord.m_isNewTag) {
cond::throwException(std::string("Cannot append to non-existing tag ") + myrecord.m_tag,
"PoolDBOutputService::appendSinceTime");
}
appendSinceTime(m_session.storePayload(payload), sinceTime, myrecord);
if (m_autoCommit) {
doCommitTransaction();
}
} catch (const std::exception& er) {
cond::throwException(std::string(er.what()), "PoolDBOutputService::appendSinceTime");
}
Expand Down Expand Up @@ -262,7 +269,6 @@ namespace cond {

cond::persistency::Logger& logger() { return m_logger; }

private:
struct Record {
Record()
: m_tag(), m_isNewTag(true), m_idName(), m_timetype(cond::runnumber), m_onlyAppendUpdatePolicy(false) {}
Expand All @@ -272,9 +278,14 @@ namespace cond {
bool m_isNewTag;
std::string m_idName;
cond::TimeType m_timetype;
unsigned int m_refreshTime = 0;
bool m_onlyAppendUpdatePolicy;
};

const Record& lookUpRecord(const std::string& recordName);

private:

//
void doStartTransaction();
void doCommitTransaction();
Expand All @@ -292,7 +303,7 @@ namespace cond {
// Note: the iov index appended to MUST pre-existing and the existing
// conditions data are retrieved from the DB
//
bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, Record& record);
bool appendSinceTime(const std::string& payloadId, cond::Time_t sinceTime, const Record& record);

//use these to control transaction interval
void preEventProcessing(edm::StreamContext const&);
Expand All @@ -303,9 +314,10 @@ namespace cond {

void fillRecord(edm::ParameterSet& pset, const std::string& gTimeTypeStr);

void initDB();
bool initDB( bool readOnly=false );

Record& getRecord(const std::string& recordName);

Record& lookUpRecord(const std::string& recordName);
cond::UserLogInfo& lookUpUserLogInfo(const std::string& recordName);

private:
Expand Down
15 changes: 4 additions & 11 deletions CondCore/DBOutputService/src/OnlineDBOutputService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,14 @@ cond::Time_t cond::service::OnlineDBOutputService::getLastLumiProcessed() {
return lastLumiProcessed;
}

cond::Iov_t cond::service::OnlineDBOutputService::preLoadIov(const std::string& recordName, cond::Time_t targetTime) {
cond::persistency::Session session = getReadOnlyCache(targetTime);
cond::Iov_t cond::service::OnlineDBOutputService::preLoadIov(const PoolDBOutputService::Record& record, cond::Time_t targetTime) {
auto transId = cond::time::transactionIdForLumiTime( targetTime, record.m_refreshTime, m_frontierKey );
cond::persistency::Session session = PoolDBOutputService::newReadOnlySession(m_preLoadConnectionString, transId);
cond::persistency::TransactionScope transaction(session.transaction());
transaction.start(true);
cond::persistency::IOVProxy proxy = session.readIov(PoolDBOutputService::tag(recordName));
cond::persistency::IOVProxy proxy = session.readIov(record.m_tag);
auto iov = proxy.getInterval(targetTime);
transaction.commit();
return iov;
}

cond::persistency::Session cond::service::OnlineDBOutputService::getReadOnlyCache(cond::Time_t targetTime) {
std::stringstream transId;
transId << targetTime;
if (!m_frontierKey.empty()) {
transId << "_" << m_frontierKey;
}
return PoolDBOutputService::newReadOnlySession(m_preLoadConnectionString, transId.str());
}
Loading

0 comments on commit 87d7095

Please sign in to comment.