16 #include <boost/algorithm/string.hpp> 17 #include <boost/filesystem/fstream.hpp> 52 #include <boost/lexical_cast.hpp> 56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<
std::
string> (
"buDefPath",
"")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned
int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned
int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned
int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<
bool> (
"getLSFromFilename",
true)),
63 alwaysStartFromFirstLS_(pset.getUntrackedParameter<
bool> (
"alwaysStartFromFirstLS",
false)),
64 verifyAdler32_(pset.getUntrackedParameter<
bool> (
"verifyAdler32",
true)),
65 verifyChecksum_(pset.getUntrackedParameter<
bool> (
"verifyChecksum",
true)),
66 useL1EventID_(pset.getUntrackedParameter<
bool> (
"useL1EventID",
false)),
68 fileListMode_(pset.getUntrackedParameter<
bool> (
"fileListMode",
false)),
69 fileListLoopMode_(pset.getUntrackedParameter<
bool> (
"fileListLoopMode",
false)),
74 currentLumiSection_(0),
79 gethostname(thishost, 255);
80 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: " 82 <<
" MB on host " << thishost;
84 long autoRunNumber = -1;
89 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
109 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
110 "no reading enabled with numBuffers parameter 0";
117 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
124 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
130 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
136 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
140 edm::LogInfo(
"FedRawDataInputSource") <<
"EvFDaqDirector/Source configured to use file service";
162 cvReader_.push_back(
new std::condition_variable);
164 threadInit_.store(
false,std::memory_order_release);
187 std::unique_lock<std::mutex> lk(
mReader_);
208 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
209 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
210 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
211 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
212 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
213 desc.
addUntracked<
unsigned int> (
"alwaysStartFromfirstLS",
false)->setComment(
"Force source to start from LS 1 if server provides higher lumisection number");
214 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
215 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
216 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
217 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
218 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
220 descriptions.
add(
"source", desc);
246 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
249 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
264 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
307 bool found = (
stat(fuEoLS.c_str(), &buf) == 0);
310 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
324 gettimeofday(&tv,
nullptr);
325 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
330 lumiSection, lsopentime,
336 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
361 std::unique_lock<std::mutex> lkw(
mWakeup_);
375 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
431 <<
" but according to BU JSON there should be " 436 std::unique_lock<std::mutex> lkw(
mWakeup_);
459 "Premature end of input file while reading event header";
479 if (detectedFRDversion_==0) {
480 detectedFRDversion_=*((
uint32*)dataPosition);
481 if (detectedFRDversion_>5)
484 assert(detectedFRDversion_>=1);
492 "Premature end of input file while reading event header";
499 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
500 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
501 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
509 "Premature end of input file while reading event data";
535 unsigned char *dataPosition;
543 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
544 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
545 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
553 "Premature end of input file while reading event data";
587 if ( crc !=
event_->crc32c() ) {
590 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
591 " but calculated 0x" << crc;
596 uint32_t adler = adler32(0
L,Z_NULL,0);
597 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
599 if ( adler !=
event_->adler32() ) {
602 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
603 " but calculated 0x" << adler;
624 catch (
const boost::filesystem::filesystem_error& ex)
626 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
627 <<
". Trying again.";
632 catch (
const boost::filesystem::filesystem_error&) {}
636 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
637 <<
". Trying again.";
672 static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.
triggerType()),
700 bool fileIsBeingProcessed =
false;
703 fileIsBeingProcessed =
true;
707 if (!fileIsBeingProcessed) {
726 gettimeofday(&stv,
nullptr);
728 time = (time << 32) + stv.tv_usec;
731 uint32_t eventSize =
event_->eventSize();
732 unsigned char*
event = (
unsigned char*)
event_->payload();
735 while (eventSize > 0) {
746 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
759 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
769 memcpy(fedData.
data(),
event + eventSize, fedSize);
771 assert(eventSize == 0);
783 unsigned int currentLumiSection = 0;
793 uint32_t lockCount=0;
804 bool copy_active=
false;
805 for (
auto j :
tid_active_)
if (j) copy_active=
true;
820 std::unique_lock<std::mutex> lkw(
mWakeup_);
825 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
837 uint32_t fileSizeIndex;
838 int64_t fileSizeFromJson;
848 int serverEventsInNewFile_=-1;
861 status =
getFile(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
875 if (thisLockWaitTimeUs>0.)
876 sumLockWaitTimeUs+=thisLockWaitTimeUs;
909 if (ls > currentLumiSection) {
913 currentLumiSection =
ls;
923 for (
unsigned int nextLS=lsToStart;nextLS<=
ls;nextLS++)
933 for (
unsigned int nextLS=currentLumiSection+1;nextLS<=
ls;nextLS++) {
937 currentLumiSection =
ls;
941 if( currentLumiSection>0 && ls < currentLumiSection) {
942 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
953 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
956 backoff_exp =
std::min(4,backoff_exp);
958 int sleeptime = (
int) (100000. *
pow(2,backoff_exp));
967 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
975 rawFile = rawFilePath.replace_extension(
".raw").string();
979 int stat_res =
stat(rawFile.c_str(),&st);
981 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
994 if (fileSize==0) eventsInNewFile=0;
995 else eventsInNewFile=-1;
1002 eventsInNewFile = serverEventsInNewFile_;
1003 assert( eventsInNewFile>=0 );
1004 assert((eventsInNewFile>0) == (fileSize>0));
1012 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1016 for (
unsigned int i=0;
i<neededChunks;
i++) {
1019 bool copy_active=
false;
1020 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1027 unsigned int newTid = 0xffffffff;
1030 if (
quit_threads_.load(std::memory_order_relaxed)) {stop=
true;
break;}
1034 bool copy_active=
false;
1035 for (
auto j :
tid_active_)
if (j) copy_active=
true;
1044 if (
quit_threads_.load(std::memory_order_relaxed)) {stop=
true;
break;}
1047 if (newChunk ==
nullptr) {
1056 std::unique_lock<std::mutex> lk(
mReader_);
1059 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1060 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1070 if (!eventsInNewFile) {
1072 std::unique_lock<std::mutex> lkw(
mWakeup_);
1073 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1087 std::unique_lock<std::mutex> lkw(
mWakeup_);
1091 newChunk->
reset(0,toRead,0);
1095 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1096 newInputFile->
chunks_[0]=newChunk;
1105 unsigned numFinishedThreads = 0;
1108 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1109 std::unique_lock<std::mutex> lk(
mReader_);
1112 numFinishedThreads++;
1123 threadInit_.exchange(
true,std::memory_order_acquire);
1128 std::unique_lock<std::mutex> lk(
mReader_);
1153 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1157 if (fileDescriptor<0) {
1159 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1163 pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1166 "readWorker failed to seek file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1167 " to offset " << chunk->
offset_ <<
" error: " << strerror(errno);
1172 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1174 unsigned int bufferLeft = 0;
1181 "readWorker failed to read file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
" error: " << strerror(errno);
1190 "readWorker failed to read file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
" last:" << last <<
1201 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1202 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1203 close(fileDescriptor);
1216 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1224 while (!waitForChunk(currentChunk_)) {
1226 if (parent_->exceptionState()) parent_->threadError();
1229 dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1230 size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1232 if (currentLeft < size) {
1235 while (!waitForChunk(currentChunk_+1)) {
1237 if (parent_->exceptionState()) parent_->threadError();
1240 dataPosition-=chunkPosition_;
1241 assert(dataPosition==chunks_[currentChunk_]->buf_);
1242 memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1243 memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1245 bufferPosition_+=
size;
1246 chunkPosition_=size-currentLeft;
1251 chunkPosition_+=
size;
1252 bufferPosition_+=
size;
1260 assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1261 assert(size - offset < chunks_[currentChunk_]->size_);
1262 memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1263 chunkPosition_+=
size;
1264 bufferPosition_+=
size;
1268 chunkPosition_-=
size;
1269 bufferPosition_-=
size;
1281 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1284 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1290 uint32_t existingSize = 0;
1305 for (uint32_t
i=0;
i<blockcount;
i++) {
1319 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1333 itr->second+=events;
1343 std::pair<bool,unsigned int> ret(
true,itr->second);
1349 return std::pair<bool,unsigned int>(
false,0);
1356 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1357 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1364 auto end = fileStem.find(
"_");
1365 if (fileStem.find(
"run")==0) {
1369 long rval = boost::lexical_cast<
long>(runStr);
1370 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1373 catch( boost::bad_lexical_cast
const& ) {
1374 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1385 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1386 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1389 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1390 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1393 ls = boost::lexical_cast<
unsigned int>(fileStem);
1409 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
static const uint32_t length
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
void setInStateSup(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
bool useFileBroker() const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
StreamID streamID() const
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
unsigned long long TimeValue_t
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
def getRunNumber(filename)
unsigned int getLumisectionToStart() const
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
unsigned int gtpe_get(const unsigned char *)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
Power< A, B >::type pow(const A &a, const B &b)
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const