16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
50 #include <boost/lexical_cast.hpp>
52 using namespace jsoncollector;
56 edm::RawInputSource(pset, desc),
57 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
58 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",32)*1048576),
59 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",32)*1048576),
60 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",2)),
61 maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> (
"maxBufferedFiles",2)),
62 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
63 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
64 verifyChecksum_(pset.getUntrackedParameter<bool> (
"verifyChecksum",
true)),
65 useL1EventID_(pset.getUntrackedParameter<bool> (
"useL1EventID",
false)),
66 fileNames_(pset.getUntrackedParameter<std::vector<std::
string>> (
"fileNames",std::vector<std::
string>())),
67 fileListMode_(pset.getUntrackedParameter<bool> (
"fileListMode",
false)),
68 runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
69 fuOutputDir_(std::
string()),
73 currentLumiSection_(0),
79 gethostname(thishost, 255);
80 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
82 <<
" MB on host " << thishost;
84 long autoRunNumber = -1;
88 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Run number not found from filename";
100 std::string defPathSuffix =
"src/EventFilter/Utilities/plugins/budef.jsd";
103 if (stat(
defPath_.c_str(), &statbuf)) {
105 if (stat(
defPath_.c_str(), &statbuf)) {
112 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
121 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
122 "no reading enabled with numBuffers parameter 0";
129 edm::LogError(
"FedRawDataInputSource::FedRawDataInputSource") <<
"Intel crc32c checksum computation unavailable";
140 throw cms::Exception(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
146 cms::Exception(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
169 cvReader_.push_back(
new std::condition_variable);
171 threadInit_.store(
false,std::memory_order_release);
194 std::unique_lock<std::mutex> lk(
mReader_);
215 desc.
setComment(
"File-based Filter Farm input source for reading raw data from BU ramdisk");
216 desc.
addUntracked<
unsigned int> (
"eventChunkSize",32)->setComment(
"Input buffer (chunk) size");
217 desc.
addUntracked<
unsigned int> (
"eventChunkBlock",32)->setComment(
"Block size used in a single file read call (must be smaller or equal to buffer size)");
218 desc.
addUntracked<
unsigned int> (
"numBuffers",2)->setComment(
"Number of buffers used for reading input");
219 desc.
addUntracked<
unsigned int> (
"maxBufferedFiles",2)->setComment(
"Maximum number of simultaneously buffered raw files");
220 desc.
addUntracked<
bool> (
"verifyAdler32",
true)->setComment(
"Verify event Adler32 checksum with FRDv3 or v4");
221 desc.
addUntracked<
bool> (
"verifyChecksum",
true)->setComment(
"Verify event CRC-32C checksum of FRDv5 or higher");
222 desc.
addUntracked<
bool> (
"useL1EventID",
false)->setComment(
"Use L1 event ID from FED header if true or from TCDS FED if false");
223 desc.
addUntracked<
bool> (
"fileListMode",
false)->setComment(
"Use fileNames parameter to directly specify raw files to open");
224 desc.
addUntracked<std::vector<std::string>> (
"fileNames", std::vector<std::string>())->
setComment(
"file list used when fileListMode is enabled");
226 descriptions.
add(
"source", desc);
255 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
258 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
273 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
308 if (checkIfExists==
false || stat(fuBoLS.c_str(), &buf) != 0) {
309 int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
323 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
326 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
339 gettimeofday(&tv, 0);
340 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
345 lumiSection, lsopentime,
351 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
382 std::unique_lock<std::mutex> lkw(
mWakeup_);
396 throw cms::Exception(
"FedRawDataInputSource::getNextEvent") <<
"Run has been aborted by the input source reader thread";
453 <<
" but according to BU JSON there should be "
458 std::unique_lock<std::mutex> lkw(
mWakeup_);
481 "Premature end of input file while reading event header";
501 if (detectedFRDversion_==0) {
502 detectedFRDversion_=*((
uint32*)dataPosition);
503 if (detectedFRDversion_>5)
506 assert(detectedFRDversion_>=1);
514 "Premature end of input file while reading event header";
521 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
522 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
523 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
531 "Premature end of input file while reading event data";
557 unsigned char *dataPosition;
565 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
566 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
567 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
575 "Premature end of input file while reading event data";
609 if ( crc !=
event_->crc32c() ) {
612 "Found a wrong crc32c checksum: expected 0x" << std::hex <<
event_->crc32c() <<
613 " but calculated 0x" << crc;
618 uint32_t adler = adler32(0
L,Z_NULL,0);
619 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
621 if ( adler !=
event_->adler32() ) {
624 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
625 " but calculated 0x" << adler;
646 catch (
const boost::filesystem::filesystem_error& ex)
648 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
649 <<
". Trying again.";
654 catch (
const boost::filesystem::filesystem_error&) {}
658 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
659 <<
". Trying again.";
718 bool fileIsBeingProcessed =
false;
721 fileIsBeingProcessed =
true;
725 if (!fileIsBeingProcessed) {
744 gettimeofday(&stv,0);
746 time = (time << 32) + stv.tv_usec;
749 uint32_t eventSize =
event_->eventSize();
750 char*
event = (
char*)
event_->payload();
753 while (eventSize > 0) {
755 eventSize -=
sizeof(
fedt_t);
759 eventSize -= (fedSize -
sizeof(
fedt_t));
764 throw cms::Exception(
"FedRawDataInputSource::fillFEDRawDataCollection") <<
"Out of range FED ID : " <<
fedId;
777 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
787 memcpy(fedData.
data(),
event + eventSize, fedSize);
802 std::ostringstream fileNameWithPID;
803 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
804 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
805 jsonDestPath /= fileNameWithPID.str();
807 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
812 catch (
const boost::filesystem::filesystem_error& ex)
815 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
826 catch (
const boost::filesystem::filesystem_error& ex)
829 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
834 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
837 boost::filesystem::ifstream ij(jsonDestPath);
841 std::stringstream
ss;
843 if (!reader.
parse(ss.str(), deserializeRoot)) {
844 edm::LogError(
"FedRawDataInputSource") <<
"Failed to deserialize JSON file -: " << jsonDestPath
846 <<
"CONTENT:\n" << ss.str()<<
".";
847 throw std::runtime_error(
"Cannot deserialize input JSON file");
866 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
867 " error reading number of events from BU JSON -: No input value " <<
data;
869 return boost::lexical_cast<
int>(
data);
872 catch (
const boost::filesystem::filesystem_error& ex)
876 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
878 catch (std::runtime_error
e)
882 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - runtime Exception -: " << e.what();
885 catch( boost::bad_lexical_cast
const& ) {
886 edm::LogError(
"FedRawDataInputSource") <<
"grabNextJsonFile - error parsing number of events from BU JSON. "
887 <<
"Input value is -: " <<
data;
894 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
905 InputSource::rewind();
918 unsigned int currentLumiSection = 0;
928 uint32_t lockCount=0;
939 bool copy_active=
false;
955 std::unique_lock<std::mutex> lkw(
mWakeup_);
960 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
991 status =
getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
1001 if (thisLockWaitTimeUs>0.)
1002 sumLockWaitTimeUs+=thisLockWaitTimeUs;
1009 sumLockWaitTimeUs=0;
1030 currentLumiSection =
ls;
1035 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
1045 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
1051 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
1055 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
1058 int stat_res = stat(rawFile.c_str(),&st);
1060 edm::LogError(
"FedRawDataInputSource") <<
"Can not stat file (" << errno <<
"):-"<< rawFile << std::endl;
1065 fileSize=st.st_size;
1072 int eventsInNewFile;
1074 if (fileSize==0) eventsInNewFile=0;
1075 else eventsInNewFile=-1;
1079 assert( eventsInNewFile>=0 );
1080 assert((eventsInNewFile>0) == (fileSize>0));
1094 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
1098 for (
unsigned int i=0;
i<neededChunks;
i++) {
1101 bool copy_active=
false;
1109 unsigned int newTid = 0xffffffff;
1115 bool copy_active=
false;
1128 if (newChunk ==
nullptr) {
1136 std::unique_lock<std::mutex> lk(
mReader_);
1139 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
1140 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
1150 if (!eventsInNewFile) {
1152 std::unique_lock<std::mutex> lkw(
mWakeup_);
1153 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
1162 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
1164 std::unique_lock<std::mutex> lkw(
mWakeup_);
1168 newChunk->
reset(0,toRead,0);
1172 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
1173 newInputFile->
chunks_[0]=newChunk;
1182 unsigned numFinishedThreads = 0;
1185 while (!
workerPool_.try_pop(tid)) {usleep(10000);}
1186 std::unique_lock<std::mutex> lk(
mReader_);
1189 numFinishedThreads++;
1200 threadInit_.exchange(
true,std::memory_order_acquire);
1205 std::unique_lock<std::mutex> lk(
mReader_);
1230 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
1231 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
1234 if (fileDescriptor>=0)
1235 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
1239 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
1240 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
1246 unsigned int bufferLeft = 0;
1260 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1261 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1262 close(fileDescriptor);
1275 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1291 if (currentLeft < size) {
1340 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1343 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1349 uint32_t existingSize = 0;
1364 for (uint32_t
i=0;
i<blockcount;
i++) {
1378 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
1392 itr->second+=events;
1402 auto &&
ret = std::pair<bool,unsigned int>(
true,itr->second);
1408 return std::pair<bool,unsigned int>(
false,0);
1415 if (a.rfind(
"/")!=std::string::npos) a=a.substr(a.rfind(
"/"));
1416 if (
b.rfind(
"/")!=std::string::npos)
b=
b.substr(
b.rfind(
"/"));
1423 auto end = fileStem.find(
"_");
1424 if (fileStem.find(
"run")==0) {
1428 long rval = boost::lexical_cast<
long>(runStr);
1429 edm::LogInfo(
"FedRawDataInputSource") <<
"Autodetected run number in fileListMode -: "<<
rval;
1432 catch( boost::bad_lexical_cast
const& ) {
1433 edm::LogWarning(
"FedRawDataInputSource") <<
"Unable to autodetect run number in fileListMode from file -: "<<
fileName;
1444 if (nextFile.find(
"file://")==0) nextFile=nextFile.substr(7);
1445 else if (nextFile.find(
"file:")==0) nextFile=nextFile.substr(5);
1448 if (fileStem.find(
"ls")) fileStem = fileStem.substr(fileStem.find(
"ls")+2);
1449 if (fileStem.find(
"_")) fileStem = fileStem.substr(0,fileStem.find(
"_"));
1450 ls = boost::lexical_cast<
unsigned int>(fileStem);
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
void setInStateSup(FastMonitoringThread::InputState inputState)
volatile std::atomic< bool > shutdown_flag
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void setComment(std::string const &value)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool evm_board_sense(const unsigned char *p, size_t size)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
void setInputSource(FedRawDataInputSource *inputSource)
unsigned long long int rval
unsigned long long uint64_t
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
std::string getBoLSFilePathOnFU(const unsigned int ls) const
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const