15 #include <boost/algorithm/string.hpp>
16 #include <boost/filesystem/fstream.hpp>
45 #include <boost/lexical_cast.hpp>
47 using namespace jsoncollector;
51 edm::RawInputSource(pset, desc),
52 defPath_(pset.getUntrackedParameter<std::
string> (
"buDefPath", std::
string(getenv(
"CMSSW_BASE"))+
"/src/EventFilter/Utilities/plugins/budef.jsd")),
53 eventChunkSize_(pset.getUntrackedParameter<unsigned int> (
"eventChunkSize",16)*1048576),
54 eventChunkBlock_(pset.getUntrackedParameter<unsigned int> (
"eventChunkBlock",eventChunkSize_/1048576)*1048576),
55 numBuffers_(pset.getUntrackedParameter<unsigned int> (
"numBuffers",1)),
56 getLSFromFilename_(pset.getUntrackedParameter<bool> (
"getLSFromFilename",
true)),
57 verifyAdler32_(pset.getUntrackedParameter<bool> (
"verifyAdler32",
true)),
58 testModeNoBuilderUnit_(edm::Service<evf::
EvFDaqDirector>()->getTestModeNoBuilderUnit()),
64 currentLumiSection_(0),
69 gethostname(thishost, 255);
70 edm::LogInfo(
"FedRawDataInputSource") <<
"Construction. read-ahead chunk size -: "
72 <<
" MB on host " << thishost;
74 edm::LogInfo(
"FedRawDataInputSource") <<
"Test mode is ON!";
83 DataPointDefinition::getDataPointDefinitionFor(
defPath_,
dpd_,&defLabel);
92 throw cms::Exception(
"FedRawDataInputSource::FedRawDataInputSource") <<
93 "no reading enabled with numBuffers parameter 0";
103 edm::LogWarning(
"FedRawDataInputSource") <<
"FastMonitoringService not found";
114 edm::LogWarning(
"FedRawDataInputSource") <<
"EvFDaqDirector not found";
131 cvReader_.push_back(
new std::condition_variable);
132 threadInit_.store(
false,std::memory_order_release);
155 std::unique_lock<std::mutex> lk(
mReader_);
193 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
196 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
211 edm::LogInfo(
"FedRawDataInputSource") <<
"----------------RUN ENDED----------------";
249 bool found = (stat(fuEoLS.c_str(), &buf) == 0);
252 int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
263 gettimeofday(&tv, 0);
264 const edm::Timestamp lsopentime( (
unsigned long long) tv.tv_sec * 1000000 + (
unsigned long long) tv.tv_usec );
269 lumiSection, lsopentime,
275 edm::LogInfo(
"FedRawDataInputSource") <<
"New lumi section was opened. LUMI -: "<< lumiSection;
291 const size_t headerSize[4] = {0,2*
sizeof(
uint32),(4 + 1024) *
sizeof(
uint32),7*
sizeof(
uint32)};
306 std::unique_lock<std::mutex> lkw(
mWakeup_);
307 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !
currentFile_)
369 <<
" but according to BU JSON there should be "
374 std::unique_lock<std::mutex> lkw(
mWakeup_);
397 "Premature end of input file while reading event header";
414 if (detectedFRDversion_==0) {
415 detectedFRDversion_=*((
uint32*)dataPosition);
416 assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
424 "Premature end of input file while reading event header";
431 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
432 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
433 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
441 "Premature end of input file while reading event data";
465 unsigned char *dataPosition;
473 <<
" event id:"<<
event_->event()<<
" lumi:" <<
event_->lumi()
474 <<
" run:" <<
event_->run() <<
" of size:" <<
event_->size()
475 <<
" bytes does not fit into a chunk of size:" <<
eventChunkSize_ <<
" bytes";
483 "Premature end of input file while reading event data";
514 uint32_t adler = adler32(0
L,Z_NULL,0);
515 adler = adler32(adler,(Bytef*)
event_->payload(),
event_->eventSize());
517 if ( adler !=
event_->adler32() ) {
519 "Found a wrong Adler32 checksum: expected 0x" << std::hex <<
event_->adler32() <<
520 " but calculated 0x" << adler;
537 catch (
const boost::filesystem::filesystem_error& ex)
539 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
540 <<
". Trying again.";
549 edm::LogError(
"FedRawDataInputSource") <<
" - deleteFile std::exception CAUGHT -: " << ex.what()
550 <<
". Trying again.";
589 bool fileIsBeingProcessed =
false;
592 fileIsBeingProcessed =
true;
596 if (!fileIsBeingProcessed) {
613 uint32_t eventSize =
event_->eventSize();
614 char*
event = (
char*)
event_->payload();
616 while (eventSize > 0) {
617 eventSize -=
sizeof(
fedt_t);
620 eventSize -= (fedSize -
sizeof(
fedh_t));
627 tstamp =
edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
629 FEDRawData& fedData = rawData->FEDData(fedId);
631 memcpy(fedData.
data(),
event + eventSize, fedSize);
633 assert(eventSize == 0);
646 std::ostringstream fileNameWithPID;
647 fileNameWithPID << jsonSourcePath.stem().string() <<
"_pid"
648 << std::setfill(
'0') << std::setw(5) << getpid() <<
".jsn";
649 jsonDestPath /= fileNameWithPID.str();
651 LogDebug(
"FedRawDataInputSource") <<
"JSON rename -: " << jsonSourcePath <<
" to "
660 catch (
const boost::filesystem::filesystem_error& ex)
663 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
675 catch (
const boost::filesystem::filesystem_error& ex)
678 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
683 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile std::exception CAUGHT -: " << ex.what();
688 boost::filesystem::ifstream ij(jsonDestPath);
692 if (!reader.
parse(ij, deserializeRoot))
693 throw std::runtime_error(
"Cannot deserialize input JSON file");
711 throw cms::Exception(
"FedRawDataInputSource::grabNextJsonFile") <<
712 " error reading number of events from BU JSON -: No input value " <<
data;
714 return boost::lexical_cast<
int>(
data);
717 catch (
const boost::filesystem::filesystem_error& ex)
721 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
722 <<
" - Maybe the BU run dir disappeared? Ending process with code 0...";
725 catch (std::runtime_error
e)
729 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - runtime Exception -: " << e.what();
732 catch( boost::bad_lexical_cast
const& ) {
733 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - error parsing number of events from BU JSON. "
734 <<
"Input value is -: " <<
data;
741 edm::LogError(
"FedRawDataInputSource") <<
"grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
752 edm::LogInfo(
"FedRawDataInputSource") <<
"Instead of delete, RENAME -: " << fileName
753 <<
" to: " << destination.string();
754 boost::filesystem::rename(source,destination);
755 boost::filesystem::rename(source.replace_extension(
".jsn"),destination.replace_extension(
".jsn"));
763 InputSource::rewind();
776 unsigned int currentLumiSection = 0;
790 std::unique_lock<std::mutex> lkw(
mWakeup_);
792 if (
cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
795 LogDebug(
"FedRawDataInputSource") <<
"No free chunks or threads...";
830 currentLumiSection =
ls;
835 edm::LogError(
"FedRawDataInputSource") <<
"Got old LS ("<<ls<<
") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<
". Aborting execution."<<std::endl;
843 if (!(dbgcount%20))
LogDebug(
"FedRawDataInputSource") <<
"No file for me... sleep and try again...";
848 LogDebug(
"FedRawDataInputSource") <<
"The director says to grab -: " << nextFile;
852 std::string rawFile = rawFilePath.replace_extension(
".raw").string();
855 stat(rawFile.c_str(),&st);
860 assert( eventsInNewFile>=0 );
861 assert((eventsInNewFile>0) == (fileSize>0));
868 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,
this);
871 for (
unsigned int i=0;
i<neededChunks;
i++) {
874 unsigned int newTid = 0xffffffff;
885 if (newChunk ==
nullptr) {
892 std::unique_lock<std::mutex> lk(
mReader_);
895 if (
i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%
eventChunkSize_;
896 newChunk->
reset(
i*eventChunkSize_,toRead,
i);
906 if (!eventsInNewFile) {
908 std::unique_lock<std::mutex> lkw(
mWakeup_);
909 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,
this);
917 while(!
freeChunks_.try_pop(newChunk)) usleep(100000);
919 std::unique_lock<std::mutex> lkw(
mWakeup_);
923 newChunk->
reset(0,toRead,0);
927 InputFile * newInputFile =
new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,
this);
928 newInputFile->
chunks_[0]=newChunk;
935 unsigned numFinishedThreads = 0;
939 std::unique_lock<std::mutex> lk(
mReader_);
942 numFinishedThreads++;
953 threadInit_.exchange(
true,std::memory_order_acquire);
957 std::unique_lock<std::mutex> lk(
mReader_);
981 int fileDescriptor = open(file->
fileName_.c_str(), O_RDONLY);
982 off_t pos = lseek(fileDescriptor,chunk->
offset_,SEEK_SET);
985 if (fileDescriptor>=0)
986 LogDebug(
"FedRawDataInputSource") <<
"Reader thread opened file -: TID: " << tid <<
" file: " << file->
fileName_ <<
" at offset " << pos;
990 "readWorker failed to open file -: " << file->
fileName_ <<
" fd:" << fileDescriptor <<
991 " or seek to offset " << chunk->
offset_ <<
", lseek returned:" << pos;
997 unsigned int bufferLeft = 0;
1011 std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(
diff);
1012 LogDebug(
"FedRawDataInputSource") <<
" finished reading block -: " << (bufferLeft >> 20) <<
" MB" <<
" in " << msec.count() <<
" ms ("<< (bufferLeft >> 20)/
double(msec.count())<<
" GB/s)";
1013 close(fileDescriptor);
1026 throw cms::Exception(
"FedRawDataInputSource:threadError") <<
" file reader thread error ";
1042 if (currentLeft < size) {
1091 LogDebug(
"FedRawDataInputSource") <<
"opened file -: " << std::endl << file->
fileName_;
1094 throw cms::Exception(
"FedRawDataInputSource:readNextChunkIntoBuffer") <<
"failed to open file " << std::endl
1100 uint32_t existingSize = 0;
1115 for (uint32_t
i=0;
i<blockcount;
i++) {
1130 LogDebug(
"FedRawDataInputSource") <<
"Closing input file -: " << std::endl << file->
fileName_;
static const char runNumber_[]
std::vector< std::string > & getData()
unsigned int getgpshigh(const unsigned char *)
tuple start
Check for commandline option errors.
void startedLookingForFile()
static Timestamp invalidTimestamp()
std::vector< int > * getStreamFileTracker()
volatile std::atomic< bool > shutdown_flag
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
bool isSingleStreamThread()
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
U second(std::pair< T, U > const &p)
static Timestamp beginOfTime()
void evm_board_setformat(size_t size)
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
void resize(size_t newsize)
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
#define FED_EVSZ_EXTRACT(a)
BranchDescription const & branchDescription() const
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void deserialize(Json::Value &root)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
unsigned long long uint64_t
void setProcessHistoryID(ProcessHistoryID const &phid)
void stoppedLookingForFile(unsigned int lumi)
char data[epos_bytes_allocation]
static std::atomic< unsigned int > counter
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Unserialize a JSON document into a Value.
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
std::string getEoRFilePathOnFU() const
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)