Skip to content

Commit

Permalink
Merge pull request #208 from PADME-Experiment/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
eleonardi authored Mar 16, 2020
2 parents a9bf58e + 56ad801 commit 7a5260a
Show file tree
Hide file tree
Showing 87 changed files with 7,185 additions and 5,981 deletions.
2 changes: 1 addition & 1 deletion Level1/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ CPPFLAGS = $(ROOTCFLAGS) -I$(IDIR) -I$(PADMEROOTDIR)/include -I/usr/include
DEPEND = $(CXX) -MM

# Define parameters for link command (-lrt is needed for old Scientific Linux 6 linker)
LIBS = -L$(PADMEROOTDIR)/lib -lPadmeRoot -L/usr/lib64/mysql -lmysqlclient -lrt
LIBS = -L$(PADMEROOTDIR)/lib -lPadmeRoot -L/usr/lib64/mysql -lmysqlclient -lrt -lz
LDFLAGS = -O4 $(ROOTLDFLAGS) $(LIBS) $(ROOTLIBS)

# Get list of files to process
Expand Down
63 changes: 15 additions & 48 deletions Level1/PadmeLevel1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
int main(int argc, char* argv[])
{

int rc; // DB library return code
//int rc; // DB library return code

// Set standard output/error in unbuffered mode
setbuf(stdout,NULL);
Expand Down Expand Up @@ -52,7 +52,7 @@ int main(int argc, char* argv[])
fprintf (stderr, "Error while processing option '-r'. Run number set to %d (must be >=0).\n", runnr);
exit(1);
}
fprintf(stdout,"Merging files from run %d\n",runnr);
fprintf(stdout,"Filtering files from run %d\n",runnr);
cfg->SetRunNumber(runnr);
break;
case 'n':
Expand Down Expand Up @@ -113,24 +113,6 @@ int main(int argc, char* argv[])
exit(1);
}

// If this is an official run, connect to DB and get id of merger process
// N.B. merger id is needed to assign root files in DB
if (cfg->RunNumber()) {

// Get handle to DB
DBService* db = DBService::GetInstance();

// Get id of merger for future DB accesses
int merger_id = 0;
rc = db->GetMergerId(merger_id,cfg->RunNumber());
if (rc != DBSERVICE_OK) {
printf("ERROR retrieving from DB id of merger process for run %d. Aborting\n",cfg->RunNumber());
exit(1);
}
cfg->SetMergerId(merger_id);

}

// Connect to root services
RootIO* root = new RootIO();
if ( root->Init() != ROOTIO_OK ) {
Expand All @@ -146,6 +128,11 @@ int main(int argc, char* argv[])
// We are now ready to process data: get start time
time_t time_start;
time(&time_start);
//printf("=== PadmeLevel1 starting on %s UTC ===\n",format_time(time_start));
printf("=== PadmeLevel1 starting on %s UTC ===\n",cfg->FormatTime(time_start));

printf("DBINFO - %s - process_set_status %d\n",cfg->FormatTime(time(0)),DB_STATUS_RUNNING);
printf("DBINFO - %s - process_set_time_start %s\n",cfg->FormatTime(time(0)),cfg->FormatTime(time_start));

// Define counters for input stream size and number of events
unsigned long int input_size = 0;
Expand Down Expand Up @@ -567,34 +554,14 @@ int main(int argc, char* argv[])
printf("Events written: %u (%6.1f events/sec)\n",root->GetTotalEvents(),evtpsec);
printf("Bytes written: %llu (%10.1f bytes/sec)\n",root->GetTotalSize(),bytepsec);

//// If input was from a real run, update DB
//if (cfg->RunNumber()) {
//
// // Get handle to DB
// DBService* db = DBService::GetInstance();
//
// // Update merger status
// rc = db->SetMergerStatus(3,cfg->MergerId());
// if (rc != DBSERVICE_OK) {
// printf("ERROR setting merger status in DB. Aborting\n");
// exit(1);
// }
//
// // Update merger stop time
// rc = db->SetMergerTime("STOP",cfg->MergerId());
// if (rc != DBSERVICE_OK) {
// printf("ERROR setting merger stop time in DB. Aborting\n");
// exit(1);
// }
//
// // Update DB with final counters (files created, events written, data written)
// rc = db->UpdateMergerInfo(root->GetTotalFiles(),root->GetTotalEvents(),root->GetTotalSize(),cfg->MergerId());
// if (rc != DBSERVICE_OK) {
// printf("ERROR updating DB with number of files (n=%u) number of events (n=%u) and output size (size=%llu) for merger id %d. Aborting\n",root->GetTotalFiles(),root->GetTotalEvents(),root->GetTotalSize(),cfg->MergerId());
// exit(1);
// }
//
//}
printf("DBINFO - %s - process_set_status %d\n",cfg->FormatTime(time(0)),DB_STATUS_FINISHED);
printf("DBINFO - %s - process_set_time_stop %s\n",cfg->FormatTime(time(0)),cfg->FormatTime(time_stop));
printf("DBINFO - %s - process_set_n_files %d\n",cfg->FormatTime(time(0)),root->GetTotalFiles());
printf("DBINFO - %s - process_set_total_events %d\n",cfg->FormatTime(time(0)),root->GetTotalEvents());
printf("DBINFO - %s - process_set_total_size %lld\n",cfg->FormatTime(time(0)),root->GetTotalSize());

// Show exit time
printf("=== PadmeLevel1 exiting on %s UTC ===\n",cfg->FormatTime(time(0)));

exit(0);

Expand Down
113 changes: 27 additions & 86 deletions Level1/PadmeMerger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,19 @@
#define F_GETPIPE_SZ 1032
#define PIPESIZE_MB 16

void fmt_time(char buf[20],time_t* t)
{
struct tm* tgm = gmtime(t);
sprintf(buf,"%04d-%02d-%02d %02d:%02d:%02d",1900+tgm->tm_year,tgm->tm_mon+1,tgm->tm_mday,tgm->tm_hour,tgm->tm_min,tgm->tm_sec);
}

int main(int argc, char* argv[])
{

int rc; // DB library retrun code
//int rc; // DB library return code

time_t time_start, time_stop, time_first, time_last;
char t_fmt[20]; // Formatted time string "YYYY-MM-DD hh:mm:ss"
//time_t time_start, time_stop, time_first, time_last;
time_t time_start, time_first, time_last;
//char t_fmt[20]; // Formatted time string "YYYY-MM-DD hh:mm:ss"

// Set standard output/error in unbuffered mode
setbuf(stdout,NULL);
setbuf(stderr,NULL);

time(&time_start); fmt_time(t_fmt,&time_start);
printf("=== PadmeMerger starting on %s UTC ===\n",t_fmt);

// Make sure we are on a sane machine (int: 32bits, long int: 64bits)
if (sizeof(int) < 4 || sizeof(long int) < 8) {
printf("*** ERROR *** On this machine int is %lu bytes and long int is %lu bytes. Aborting.\n",sizeof(int),sizeof(long int));
Expand All @@ -62,8 +54,6 @@ int main(int argc, char* argv[])
int run_number = cfg->RunNumber();
std::string input_stream_list = cfg->InputStreamList();
std::string output_stream_list = cfg->OutputStreamList();
//std::string raw_file_header = cfg->RawFileHeader();
//unsigned int n_events_per_file = cfg->NEventsPerFile();
unsigned int verbose = cfg->Verbose();

// Parse options
Expand Down Expand Up @@ -130,29 +120,27 @@ int main(int argc, char* argv[])
}
}

// Check if input file list was defined
if (cfg->InputStreamList().compare("")==0) {
// Check if input and output file lists were defined and are accessible
if ( cfg->InputStreamList().compare("") == 0 ) {
printf("ERROR no input file list defined. Aborting\n");
exit(1);
}

// If this is an official run, connect to DB and get id of merger
if (cfg->RunNumber()) {

// Get handle to DB
DBService* db = DBService::GetInstance();

// Get id of merger for future DB accesses
int merger_id = 0;
rc = db->GetMergerId(merger_id,cfg->RunNumber());
if (rc != DBSERVICE_OK) {
printf("ERROR retrieving from DB id of merger process for run %d. Aborting\n",cfg->RunNumber());
exit(1);
}
cfg->SetMergerId(merger_id);

if ( access( cfg->InputStreamList().c_str(), F_OK ) != 0 ) {
printf("ERROR input file list %s is not accessible. Aborting\n",cfg->InputStreamList().c_str());
exit(1);
}
if ( cfg->OutputStreamList().compare("") == 0 ) {
printf("ERROR no output file list defined. Aborting\n");
exit(1);
}
if ( access( cfg->OutputStreamList().c_str(), F_OK ) != 0 ) {
printf("ERROR output file list %s is not accessible. Aborting\n",cfg->OutputStreamList().c_str());
exit(1);
}

time(&time_start);
printf("=== PadmeMerger starting on %s UTC ===\n",cfg->FormatTime(time_start));

ADCBoard* board;
std::vector<ADCBoard*> boards;

Expand Down Expand Up @@ -274,27 +262,8 @@ int main(int argc, char* argv[])
list.close();
printf("- Using a total of %u output Level1 streams\n",NOutputStreams);

// Everything is set: tell DB merger has started
if (cfg->RunNumber()) {

// Get handle to DB
DBService* db = DBService::GetInstance();

// Update merger status
rc = db->SetMergerStatus(2,cfg->MergerId());
if (rc != DBSERVICE_OK) {
printf("ERROR setting merger status in DB. Aborting\n");
exit(1);
}

// Update merger start time
rc = db->SetMergerTime("START",cfg->MergerId());
if (rc != DBSERVICE_OK) {
printf("ERROR setting merger start time in DB. Aborting\n");
exit(1);
}

}
printf("DBINFO - %s - process_set_status %d\n",cfg->FormatTime(time(0)),DB_STATUS_RUNNING);
printf("DBINFO - %s - process_set_time_start %s\n",cfg->FormatTime(time(0)),cfg->FormatTime(time_start));

unsigned int CurrentOutputStream = 0; // First event will be sent to first output stream

Expand Down Expand Up @@ -614,9 +583,6 @@ int main(int argc, char* argv[])
}
}

//clock_gettime(CLOCK_REALTIME,&sys_time);
//printf("After output %ld.%09ld\n",sys_time.tv_sec,sys_time.tv_nsec);

// Update counters for this stream
output_stream_nevents[CurrentOutputStream]++;

Expand Down Expand Up @@ -724,38 +690,13 @@ int main(int argc, char* argv[])
}
printf("Total Events %7u Data %11.1f MiB Rates %6.1f evt/s %7.3f MiB/s\n",NumberOfEvents,size_mib,event_rate,data_rate);

// If input was from a real run, update DB
if (cfg->RunNumber()) {

// Get handle to DB
DBService* db = DBService::GetInstance();

// Update merger status
rc = db->SetMergerStatus(3,cfg->MergerId());
if (rc != DBSERVICE_OK) {
printf("ERROR setting merger status in DB. Aborting\n");
exit(1);
}

// Update merger stop time
rc = db->SetMergerTime("STOP",cfg->MergerId());
if (rc != DBSERVICE_OK) {
printf("ERROR setting merger stop time in DB. Aborting\n");
exit(1);
}

// Update DB with final counters (files created, events written, data written)
//rc = db->UpdateMergerInfo(root->GetTotalFiles(),root->GetTotalEvents(),root->GetTotalSize(),cfg->MergerId());
//if (rc != DBSERVICE_OK) {
// printf("ERROR updating DB with number of files (n=%u) number of events (n=%u) and output size (size=%lu) for merger id %d. Aborting\n",root->GetTotalFiles(),root->GetTotalEvents(),root->GetTotalSize(),cfg->MergerId());
// exit(1);
//}

}
printf("DBINFO - %s - process_set_status %d\n",cfg->FormatTime(time(0)),DB_STATUS_FINISHED);
printf("DBINFO - %s - process_set_time_stop %s\n",cfg->FormatTime(time(0)),cfg->FormatTime(time_last));
printf("DBINFO - %s - process_set_total_events %d\n",cfg->FormatTime(time(0)),NumberOfEvents);
printf("DBINFO - %s - process_set_total_size %llu\n",cfg->FormatTime(time(0)),total_output_size);

// Show exit time
time(&time_stop); fmt_time(t_fmt,&time_stop);
printf("=== PadmeMerger exiting on %s UTC ===\n",t_fmt);
printf("=== PadmeMerger exiting on %s UTC ===\n",cfg->FormatTime(time(0)));

exit(0);
}
8 changes: 5 additions & 3 deletions Level1/include/Configuration.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,22 @@ public:
void SetNEventsPerFile(unsigned int n) { fNEventsPerFile = n; }
unsigned int NEventsPerFile() { return fNEventsPerFile; }

void SetMergerId(int i) { fMergerId = i; }
int MergerId() { return fMergerId; }
void SetProcessId(int i) { fProcessId = i; }
int ProcessId() { return fProcessId; }

void SetVerbose(unsigned int v) { fVerbose = v; }
unsigned int Verbose() { return fVerbose; }

void SetDebugScale(unsigned int v) { fDebugScale = v; }
unsigned int DebugScale() { return fDebugScale; }

char* FormatTime(const time_t);

private:

int fRunNumber;

int fMergerId;
int fProcessId;

std::string fInputStream;

Expand Down
32 changes: 23 additions & 9 deletions Level1/include/DBService.hh
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
#ifndef DBService_H
#define DBService_H

#include <vector>
#include <string>
#include <mysql/mysql.h>

#define DBSERVICE_OK 0
#define DBSERVICE_ERROR 1
#define DBSERVICE_SQLERROR 2
#define DBSERVICE_CONNECTERROR 3

//#include <vector>
//#include <string>
//#include <mysql/mysql.h>

//// Return codes for DB service calls
//#define DBSERVICE_OK 0
//#define DBSERVICE_ERROR 1
//#define DBSERVICE_SQLERROR 2
//#define DBSERVICE_CONNECTERROR 3

// Definition of process status values in DB
#define DB_STATUS_IDLE 0
#define DB_STATUS_INITIALIZING 1
#define DB_STATUS_INIT_FAIL 2
#define DB_STATUS_INITIALIZED 3
#define DB_STATUS_ABORTED 4
#define DB_STATUS_RUNNING 5
#define DB_STATUS_RUN_FAIL 6
#define DB_STATUS_FINISHED 7
#define DB_STATUS_CLOSE_FAIL 8
#define DB_STATUS_UNKNOWN 9
/*
class DBService
{
Expand Down Expand Up @@ -52,4 +65,5 @@ private:
MYSQL* fDBHandle;
};
*/
#endif
6 changes: 5 additions & 1 deletion Level1/include/RootIO.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <string>
#include <vector>

#include "TFile.h"
#include "TTree.h"
Expand Down Expand Up @@ -60,9 +61,10 @@ private:
Int_t ChangeOutFile();
Int_t CloseOutFile();
Int_t SetOutFile();
unsigned long int GetAdler32(TString);

Configuration* fConfig;
DBService* fDB;
//DBService* fDB;

UInt_t fOutEventsTotal;
ULong_t fOutSizeTotal;
Expand All @@ -75,6 +77,8 @@ private:
ULong_t fOutFileSize;
UInt_t fNMaxEvtsPerOutFile;

std::vector<TString> fOutFileList;

TFile* fTFileHandle;
TTree* fTTreeMain;
TRawEvent* fTRawEvent;
Expand Down
12 changes: 11 additions & 1 deletion Level1/src/Configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Configuration::Configuration()
{
// Set default configuration parameters
fRunNumber = 0;
fMergerId = -1;
fProcessId = -1;
fInputStream = "";
fInputStreamList = "";
fOutputStreamList = "";
Expand All @@ -24,3 +24,13 @@ Configuration* Configuration::GetInstance()
if ( fInstance == 0 ) { fInstance = new Configuration(); }
return fInstance;
}

char* Configuration::FormatTime(const time_t tt)
{
static char tform[20];
struct tm* t = gmtime(&tt);
sprintf(tform,"%04d/%02d/%02d %02d:%02d:%02d",
1900+t->tm_year,1+t->tm_mon,t->tm_mday,
t->tm_hour,t->tm_min,t->tm_sec);
return tform;
}
Loading

0 comments on commit 7a5260a

Please sign in to comment.