16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 fileNames_(pset.getUntrackedParameter<std::vector<std::
string>> (
"fileNames",std::vector<std::
string>())),
67 fileListMode_(pset.getUntrackedParameter<bool> (
"fileListMode",
false)),
68 fileListLoopMode_(pset.getUntrackedParameter<bool> (
"fileListLoopMode",
false)),
70 fuOutputDir_(std::
string()),
74 currentLumiSection_(0),
80 gethostname(thishost, 255);
81 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
83 <<
" MB on host " << thishost;
85 long autoRunNumber = -1;
90 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
103 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
106 if (stat(
defPath_.c_str(), &statbuf)) {
108 if (stat(
defPath_.c_str(), &statbuf)) {
115 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
124 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
125 "no reading enabled with numBuffers parameter 0";
132 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
139 edm::LogInfo(
"FedRawDataInputSource") <<
"No FastMonitoringService found in the configuration";
145 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
151 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
174 cvReader_.push_back(
new std::condition_variable);
176 threadInit_.store(
false,std::memory_order_release);
199 std::unique_lock<std::mutex> lk(
mReader_);
220 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
221 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
222 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
223 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
224 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
225 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
226 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
227 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
228 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
229 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
231 descriptions.
add(
"source", desc);
260 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
263 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
278 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
316 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
317 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
331 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
334 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
347 gettimeofday(&tv, 0);
348 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
353 lumiSection, lsopentime,
359 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
390 std::unique_lock<std::mutex> lkw(
mWakeup_);
404 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
461 <<
" but according to BU JSON there should be "
466 std::unique_lock<std::mutex> lkw(
mWakeup_);
489 "Premature end of input file while reading event header";
509 if (detectedFRDversion_==0) {
510 detectedFRDversion_=*((
uint32*)dataPosition);
511 if (detectedFRDversion_>5)
514 assert(detectedFRDversion_>=1);
522 "Premature end of input file while reading event header";
529 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
530 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
531 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
539 "Premature end of input file while reading event data";
565 unsigned char *dataPosition;
573 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
574 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
575 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
583 "Premature end of input file while reading event data";
617 if ( crc !=
event_->crc32c() ) {
620 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
621 " but calculated 0x" << crc;
626 uint32_t adler = adler32(0
L,Z_NULL,0);
627 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
629 if ( adler !=
event_->adler32() ) {
632 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
633 " but calculated 0x" << adler;
654 catch (
const boost::filesystem::filesystem_error& ex)
656 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
657 <<
". Trying again.";
662 catch (
const boost::filesystem::filesystem_error&) {}
666 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
667 <<
". Trying again.";
726 bool fileIsBeingProcessed =
false;
729 fileIsBeingProcessed =
true;
733 if (!fileIsBeingProcessed) {
752 gettimeofday(&stv,0);
754 time = (time << 32) + stv.tv_usec;
757 uint32_t eventSize =
event_->eventSize();
758 char*
event = (
char*)
event_->payload();
761 while (eventSize > 0) {
763 eventSize -=
sizeof(
fedt_t);
767 eventSize -= (fedSize -
sizeof(
fedt_t));
772 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
785 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
795 memcpy(fedData.
data(),
event + eventSize, fedSize);
810 std::ostringstream fileNameWithPID;
811 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
812 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
813 jsonDestPath /= fileNameWithPID.str();
815 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
820 catch (
const boost::filesystem::filesystem_error& ex)
823 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
834 catch (
const boost::filesystem::filesystem_error& ex)
837 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
842 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
845 boost::filesystem::ifstream ij(jsonDestPath);
849 std::stringstream
ss;
851 if (!reader.
parse(ss.str(), deserializeRoot)) {
852 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
854 <<
"CONTENT:\n" << ss.str()<<
".";
855 throw std::runtime_error(
"Cannot deserialize input JSON file");
874 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
875 " error reading number of events from BU JSON -: No input value " <<
data;
877 return boost::lexical_cast<
int>(
data);
880 catch (
const boost::filesystem::filesystem_error& ex)
884 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
886 catch (std::runtime_error
e)
890 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
893 catch( boost::bad_lexical_cast
const& ) {
894 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
895 <<
"Input value is -: " <<
data;
902 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
913 InputSource::rewind();
926 unsigned int currentLumiSection = 0;
936 uint32_t lockCount=0;
947 bool copy_active=
false;
963 std::unique_lock<std::mutex> lkw(
mWakeup_);
968 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
999 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
1009 if (thisLockWaitTimeUs>0.)
1010 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1017 sumLockWaitTimeUs=0;
1038 currentLumiSection =
ls;
1043 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1053 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1059 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1063 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1066 int stat_res = stat(rawFile.c_str(),&st);
1068 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1072 fileSize=st.st_size;
1079 int eventsInNewFile;
1081 if (fileSize==0) eventsInNewFile=0;
1082 else eventsInNewFile=-1;
1086 assert( eventsInNewFile>=0 );
1087 assert((eventsInNewFile>0) == (fileSize>0));
1101 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1105 for (
unsigned int i=0;
i<neededChunks;
i++) {
1108 bool copy_active=
false;
1116 unsigned int newTid = 0xffffffff;
1122 bool copy_active=
false;
1135 if (newChunk ==
nullptr) {
1143 std::unique_lock<std::mutex> lk(
mReader_);
1146 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1147 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1157 if (!eventsInNewFile) {
1159 std::unique_lock<std::mutex> lkw(
mWakeup_);
1160 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1169 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1171 std::unique_lock<std::mutex> lkw(
mWakeup_);
1175 newChunk->
reset(0,toRead,0);
1179 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1180 newInputFile->
chunks_[0]=newChunk;
1189 unsigned numFinishedThreads = 0;
1192 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1193 std::unique_lock<std::mutex> lk(
mReader_);
1196 numFinishedThreads++;
1207 threadInit_.exchange(
true,std::memory_order_acquire);
1212 std::unique_lock<std::mutex> lk(
mReader_);
1237 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1238 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1241 if (fileDescriptor>=0)
1242 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1246 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1247 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1253 unsigned int bufferLeft = 0;
1267 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1268 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1269 close(fileDescriptor);
1282 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1298 if (currentLeft < size) {
1347 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1350 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1356 uint32_t existingSize = 0;
1371 for (uint32_t
i=0;
i<blockcount;
i++) {
1385 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1399 itr->second+=events;
1409 auto &&
ret = std::pair<bool,unsigned int>(
true,itr->second);
1415 return std::pair<bool,unsigned int>(
false,0);
1422 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1423 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1430 auto end = fileStem.find(
"_");
1431 if (fileStem.find(
"run")==0) {
1435 long rval = boost::lexical_cast<
long>(runStr);
1436 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1439 catch( boost::bad_lexical_cast
const& ) {
1440 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1451 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1452 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1455 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1456 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1459 ls = boost::lexical_cast<
unsigned int>(fileStem);
1475 return getFile(ls,nextFile,fsize,lockWaitTime);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
void setInStateSup(FastMonitoringThread::InputState inputState)
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const