16 #include <boost/algorithm/string.hpp> 17 #include <boost/filesystem/fstream.hpp> 50 #include <boost/lexical_cast.hpp> 56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<
std::
string> (
"buDefPath",
std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned
int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned
int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
67 fileListMode_(pset.getUntrackedParameter<bool> (
"fileListMode",
false)),
68 fileListLoopMode_(pset.getUntrackedParameter<bool> (
"fileListLoopMode",
false)),
74 currentLumiSection_(0),
80 gethostname(thishost, 255);
81 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " 83 <<
" MB on host " << thishost;
85 long autoRunNumber = -1;
90 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
103 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
115 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
124 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
125 "no reading enabled with numBuffers parameter 0";
132 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
139 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
145 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
// Raise the error: constructing the exception without `throw` builds a temporary and discards it,
// so the failure would go unnoticed. This matches the FastMonitoringService check, which throws.
151 throw cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
174 cvReader_.push_back(
new std::condition_variable);
176 threadInit_.store(
false,std::memory_order_release);
199 std::unique_lock<std::mutex> lk(
mReader_);
220 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
221 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
222 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
223 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
224 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
225 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
226 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
227 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
228 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
229 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
231 descriptions.
add(
"source", desc);
260 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
263 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
278 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
316 if (checkIfExists==
false ||
stat(fuBoLS.c_str(), &buf) != 0) {
317 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
331 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
334 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
347 gettimeofday(&tv, 0);
348 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
353 lumiSection, lsopentime,
359 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
384 std::unique_lock<std::mutex> lkw(
mWakeup_);
398 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
454 <<
" but according to BU JSON there should be " 459 std::unique_lock<std::mutex> lkw(
mWakeup_);
482 "Premature end of input file while reading event header";
502 if (detectedFRDversion_==0) {
503 detectedFRDversion_=*((
uint32*)dataPosition);
504 if (detectedFRDversion_>5)
507 assert(detectedFRDversion_>=1);
515 "Premature end of input file while reading event header";
522 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
523 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
524 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
532 "Premature end of input file while reading event data";
558 unsigned char *dataPosition;
566 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
567 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
568 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
576 "Premature end of input file while reading event data";
610 if ( crc !=
event_->crc32c() ) {
613 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
614 " but calculated 0x" << crc;
619 uint32_t adler = adler32(0
L,Z_NULL,0);
620 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
622 if ( adler !=
event_->adler32() ) {
625 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
626 " but calculated 0x" << adler;
647 catch (
const boost::filesystem::filesystem_error& ex)
649 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
650 <<
". Trying again.";
655 catch (
const boost::filesystem::filesystem_error&) {}
659 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
660 <<
". Trying again.";
721 bool fileIsBeingProcessed =
false;
724 fileIsBeingProcessed =
true;
728 if (!fileIsBeingProcessed) {
747 gettimeofday(&stv,0);
749 time = (time << 32) + stv.tv_usec;
752 uint32_t eventSize =
event_->eventSize();
753 char*
event = (
char*)
event_->payload();
756 while (eventSize > 0) {
757 assert(eventSize>=
sizeof(
fedt_t));
758 eventSize -=
sizeof(
fedt_t);
761 assert(eventSize>=fedSize -
sizeof(
fedt_t));
762 eventSize -= (fedSize -
sizeof(
fedt_t));
767 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
780 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
790 memcpy(fedData.
data(),
event + eventSize, fedSize);
792 assert(eventSize == 0);
805 std::ostringstream fileNameWithPID;
806 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid" 807 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
808 jsonDestPath /= fileNameWithPID.str();
810 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to " 815 catch (
const boost::filesystem::filesystem_error& ex)
818 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
829 catch (
const boost::filesystem::filesystem_error& ex)
832 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
837 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
840 boost::filesystem::ifstream ij(jsonDestPath);
844 std::stringstream ss;
846 if (!reader.
parse(ss.str(), deserializeRoot)) {
847 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
849 <<
"CONTENT:\n" << ss.str()<<
".";
850 throw std::runtime_error(
"Cannot deserialize input JSON file");
869 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
870 " error reading number of events from BU JSON -: No input value " <<
data;
872 return boost::lexical_cast<
int>(
data);
875 catch (
const boost::filesystem::filesystem_error& ex)
879 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
// Catch by const reference so the exception object is neither copied nor sliced.
881 catch (const std::runtime_error&
e)
885 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
888 catch( boost::bad_lexical_cast
const& ) {
889 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. " 890 <<
"Input value is -: " <<
data;
897 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
910 unsigned int currentLumiSection = 0;
920 uint32_t lockCount=0;
931 bool copy_active=
false;
932 for (
auto j :
tid_active_)
if (j) copy_active=
true;
947 std::unique_lock<std::mutex> lkw(
mWakeup_);
952 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
983 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
993 if (thisLockWaitTimeUs>0.)
994 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1001 sumLockWaitTimeUs=0;
1022 currentLumiSection =
ls;
1027 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1037 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1043 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1047 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1050 int stat_res =
stat(rawFile.c_str(),&st);
1052 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1056 fileSize=st.st_size;
1063 int eventsInNewFile;
1065 if (fileSize==0) eventsInNewFile=0;
1066 else eventsInNewFile=-1;
1070 assert( eventsInNewFile>=0 );
1071 assert((eventsInNewFile>0) == (fileSize>0));
1079 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1083 for (
unsigned int i=0;
i<neededChunks;
i++) {
1086 bool copy_active=
false;
1087 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1094 unsigned int newTid = 0xffffffff;
1100 bool copy_active=
false;
1101 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1113 if (newChunk ==
nullptr) {
1121 std::unique_lock<std::mutex> lk(
mReader_);
1124 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1125 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1135 if (!eventsInNewFile) {
1137 std::unique_lock<std::mutex> lkw(
mWakeup_);
1138 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1147 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1149 std::unique_lock<std::mutex> lkw(
mWakeup_);
1153 newChunk->
reset(0,toRead,0);
1157 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1158 newInputFile->
chunks_[0]=newChunk;
1167 unsigned numFinishedThreads = 0;
1170 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1171 std::unique_lock<std::mutex> lk(
mReader_);
1174 numFinishedThreads++;
1185 threadInit_.exchange(
true,std::memory_order_acquire);
1190 std::unique_lock<std::mutex> lk(
mReader_);
1215 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1216 off_t
pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1219 if (fileDescriptor>=0)
1220 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1224 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1225 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1231 unsigned int bufferLeft = 0;
1245 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1246 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1247 close(fileDescriptor);
1260 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1268 while (!waitForChunk(currentChunk_)) {
1270 if (parent_->exceptionState()) parent_->threadError();
1273 dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1274 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1276 if (currentLeft < size) {
1279 while (!waitForChunk(currentChunk_+1)) {
1281 if (parent_->exceptionState()) parent_->threadError();
1284 dataPosition-=chunkPosition_;
1285 assert(dataPosition==chunks_[currentChunk_]->buf_);
1286 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1287 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1289 bufferPosition_+=
size;
1290 chunkPosition_=size-currentLeft;
1295 chunkPosition_+=
size;
1296 bufferPosition_+=
size;
1304 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1305 assert(size - offset < chunks_[currentChunk_]->size_);
1306 memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1307 chunkPosition_+=
size;
1308 bufferPosition_+=
size;
1312 chunkPosition_-=
size;
1313 bufferPosition_-=
size;
1325 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1328 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1334 uint32_t existingSize = 0;
1349 for (uint32_t
i=0;
i<blockcount;
i++) {
1363 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1377 itr->second+=events;
1387 auto && ret = std::pair<bool,unsigned int>(
true,itr->second);
1393 return std::pair<bool,unsigned int>(
false,0);
1400 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1401 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1408 auto end = fileStem.find(
"_");
1409 if (fileStem.find(
"run")==0) {
1413 long rval = boost::lexical_cast<
long>(runStr);
1414 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1417 catch( boost::bad_lexical_cast
const& ) {
1418 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1429 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1430 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
// std::string::find returns npos (non-zero, hence truthy) on a miss and 0 for a match at the
// very start, so its result must be compared against npos rather than used as a boolean.
// Previously a stem without "ls" lost its first character and a stem starting with "ls" was left untouched.
1433 if (fileStem.find(
"ls") != std::string::npos) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1434 if (fileStem.find(
"_") != std::string::npos) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1437 ls = boost::lexical_cast<
unsigned int>(fileStem);
1453 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void setInStateSup(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
#define FED_EVSZ_EXTRACT(a)
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
StreamID streamID() const
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
def getRunNumber(filename)
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const