CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal >
const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 125 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
69  eventID_(),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
81 
83  setNewRun();
86 
87  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
88  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
89  struct stat statbuf;
90  if (stat(defPath_.c_str(), &statbuf)) {
91  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
92  if (stat(defPath_.c_str(), &statbuf)) {
93  defPath_ = defPathSuffix;
94  }
95  }
96 
97  dpd_ = new DataPointDefinition();
98  std::string defLabel = "data";
99  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
100 
101  //make sure that chunk size is N * block size
106 
107  if (!numBuffers_)
108  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
109  "no reading enabled with numBuffers parameter 0";
110 
114 
115  if (!crc32c_hw_test())
116  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
117 
118  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
119 
120  try {
122  } catch (...){
123  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
124  assert(0);//test
125  }
126 
127  try {
129  //set DaqDirector to delete files in preGlobalEndLumi callback
131  if (fms_) daqDirector_->setFMS(fms_);
132  } catch (...){
133  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
134  assert(0);//test
135  }
136 
137  //should delete chunks when run stops
138  for (unsigned int i=0;i<numBuffers_;i++) {
140  }
141 
142  quit_threads_ = false;
143 
144  for (unsigned int i=0;i<numConcurrentReads_;i++)
145  {
146  std::unique_lock<std::mutex> lk(startupLock_);
147  //issue a memory fence here and in threads (constructor was segfaulting without this)
148  thread_quit_signal.push_back(false);
149  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
150  cvReader_.push_back(new std::condition_variable);
151  threadInit_.store(false,std::memory_order_release);
152  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
153  startupCv_.wait(lk);
154  }
155 
156  runAuxiliary()->setProcessHistoryID(processHistoryID_);
157 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool crc32c_hw_test()
Definition: crc32c.cc:354
jsoncollector::DataPointDefinition * dpd_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:345
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:257
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:348
void readWorker(unsigned int tid)
ProcessHistoryRegistry & processHistoryRegistryForUpdate() const
Definition: InputSource.h:346
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 159 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

160 {
161  quit_threads_=true;
162 
163  //delete any remaining open files
164  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
165  deleteFile(it->second->fileName_);
166  delete it->second;
167  }
169  readSupervisorThread_->join();
170  }
171  else {
172  //join aux threads in case the supervisor thread was not started
173  for (unsigned int i=0;i<workerThreads_.size();i++) {
174  std::unique_lock<std::mutex> lk(mReader_);
175  thread_quit_signal[i]=true;
176  cvReader_[i]->notify_one();
177  lk.unlock();
178  workerThreads_[i]->join();
179  delete workerThreads_[i];
180  }
181  }
182  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
183  /*
184  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
185  InputChunk *ch;
186  while (!freeChunks_.try_pop(ch)) {}
187  delete ch;
188  }
189  */
190 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 207 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

208 {
210  {
211  //this thread opens new files and dispatches reading to worker readers
212  //threadInit_.store(false,std::memory_order_release);
213  std::unique_lock<std::mutex> lk(startupLock_);
214  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
216  startupCv_.wait(lk);
217  }
218  //signal hltd to start event accounting
221 
222  switch (nextEvent() ) {
224 
225  //maybe create EoL file in working directory before ending run
226  struct stat buf;
227  if ( currentLumiSection_ > 0 ) {
228  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
229  if (eolFound) {
231  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
232  if ( !found ) {
234  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
235  close(eol_fd);
237  }
238  }
239  }
240  //also create EoR file in FU data directory
241  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
242  if (!eorFound) {
243  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
244  close(eor_fd);
245  }
247  eventsThisLumi_=0;
249  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
250  return false;
251  }
253  //this is not reachable
254  return true;
255  }
257  //std::cout << "--------------NEW LUMI---------------" << std::endl;
258  return true;
259  }
260  default: {
261  if (!getLSFromFilename_) {
262  //get new lumi from file header
263  if (event_->lumi() > currentLumiSection_) {
265  eventsThisLumi_=0;
266  maybeOpenNewLumiSection( event_->lumi() );
267  }
268  }
269  eventRunNumber_=event_->run();
270  L1EventID_ = event_->event();
271 
272  setEventCached();
273 
274  return true;
275  }
276  }
277 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void createProcessingNotificationMaybe() const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:379
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:360
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 279 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

280 {
281  //used for backpressure mechanisms and monitoring
282  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
283  struct stat buf;
284  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
285  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
286  close(bol_fd);
287  }
288 }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 602 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, LogDebug, cmsHarvester::path, and python.multivaluedict::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

603 {
604  const boost::filesystem::path filePath(fileName);
605  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
606  try {
607  //sometimes this fails but file gets deleted
608  boost::filesystem::remove(filePath);
609  }
610  catch (const boost::filesystem::filesystem_error& ex)
611  {
612  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
613  << ". Trying again.";
614  usleep(100000);
615  try {
616  boost::filesystem::remove(filePath);
617  }
618  catch (...) {/*file gets deleted first time but exception is still thrown*/}
619  }
620  catch (std::exception& ex)
621  {
622  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
623  << ". Trying again.";
624  usleep(100000);
625  try {
626  boost::filesystem::remove(filePath);
627  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
628  }
629 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 73 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 192 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), and edm::ParameterSetDescription::setComment().

193 {
195  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
196  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
197  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
198  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
199  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
200  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
201  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
202  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
203  desc.setAllowAnything();
204  descriptions.add("source", desc);
205 }
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 701 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), HLT_25ns14e33_v1_cff::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and cond::rpcobgas::time.

Referenced by read().

702 {
704  timeval stv;
705  gettimeofday(&stv,0);
706  time = stv.tv_sec;
707  time = (time << 32) + stv.tv_usec;
708  edm::Timestamp tstamp(time);
709 
710  uint32_t eventSize = event_->eventSize();
711  char* event = (char*)event_->payload();
712  GTPEventID_=0;
713  tcds_pointer_ = 0;
714  while (eventSize > 0) {
715  assert(eventSize>=sizeof(fedt_t));
716  eventSize -= sizeof(fedt_t);
717  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
718  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
719  assert(eventSize>=fedSize - sizeof(fedt_t));
720  eventSize -= (fedSize - sizeof(fedt_t));
721  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
722  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
723  if(fedId>FEDNumbering::MAXFEDID)
724  {
725  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
726  }
727  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
728  tcds_pointer_ = (unsigned char *)(event + eventSize );
729  }
730  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
731  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
732  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
733  else
734  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
735  //evf::evtn::evm_board_setformat(fedSize);
736  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
737  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
738  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
739  }
740  //take event ID from GTPE FED
741  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
742  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
743  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
744  }
745  }
746  FEDRawData& fedData = rawData.FEDData(fedId);
747  fedData.resize(fedSize);
748  memcpy(fedData.data(), event + eventSize, fedSize);
749  }
750  assert(eventSize == 0);
751 
752  return tstamp;
753 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
assert(m_qm.get())
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 341 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

342 {
343 
345  if (!currentFile_)
346  {
347  if (!streamFileTrackerPtr_) {
351  }
352 
354  if (!fileQueue_.try_pop(currentFile_))
355  {
356  //sleep until wakeup (only in single-buffer mode) or timeout
357  std::unique_lock<std::mutex> lkw(mWakeup_);
358  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
360  }
361  status = currentFile_->status_;
362  if ( status == evf::EvFDaqDirector::runEnded)
363  {
364  delete currentFile_;
365  currentFile_=nullptr;
366  return status;
367  }
368  else if ( status == evf::EvFDaqDirector::runAbort)
369  {
370  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
371  }
372  else if (status == evf::EvFDaqDirector::newLumi)
373  {
374  if (getLSFromFilename_) {
377  eventsThisLumi_=0;
379  }
380  }
381  else {//let this be picked up from next event
383  }
384 
385  delete currentFile_;
386  currentFile_=nullptr;
387  return status;
388  }
389  else if (status == evf::EvFDaqDirector::newFile) {
392  }
393  else
394  assert(0);
395  }
396 
397  //file is empty
398  if (!currentFile_->fileSize_) {
400  //try to open new lumi
402  if (getLSFromFilename_)
405  eventsThisLumi_=0;
407  }
408  //immediately delete empty file
410  delete currentFile_;
411  currentFile_=nullptr;
413  }
414 
415  //file is finished
418  //release last chunk (it is never released elsewhere)
421  {
422  throw cms::Exception("FedRawDataInputSource::getNextEvent")
423  << "Fully processed " << currentFile_->nProcessed_
424  << " from the file " << currentFile_->fileName_
425  << " but according to BU JSON there should be "
426  << currentFile_->nEvents_ << " events";
427  }
428  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
429  if (singleBufferMode_) {
430  std::unique_lock<std::mutex> lkw(mWakeup_);
431  cvWakeup_.notify_one();
432  }
435  //put the file in pending delete list;
436  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
437  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
438  }
439  else {
440  //in single-thread and stream jobs, events are already processed
442  delete currentFile_;
443  }
444  currentFile_=nullptr;
446  }
447 
448 
449  //file is too short
451  {
452  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
453  "Premature end of input file while reading event header";
454  }
455  if (singleBufferMode_) {
456 
457  //should already be there
459  usleep(10000);
461  }
462 
463  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
464 
465  //conditions when read amount is not sufficient for the header to fit
466  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
468  {
470 
471  if (detectedFRDversion_==0) {
472  detectedFRDversion_=*((uint32*)dataPosition);
473  if (detectedFRDversion_>5)
474  throw cms::Exception("FedRawDataInputSource::getNextEvent")
475  << "Unknown FRD version -: " << detectedFRDversion_;
476  assert(detectedFRDversion_>=1);
477  }
478 
479  //recalculate chunk position
480  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
481  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
482  {
483  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
484  "Premature end of input file while reading event header";
485  }
486  }
487 
488  event_.reset( new FRDEventMsgView(dataPosition) );
489  if (event_->size()>eventChunkSize_) {
490  throw cms::Exception("FedRawDataInputSource::getNextEvent")
491  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
492  << " run:" << event_->run() << " of size:" << event_->size()
493  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
494  }
495 
496  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
497 
499  {
500  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
501  "Premature end of input file while reading event data";
502  }
503  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
505  //recalculate chunk position
506  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
507  event_.reset( new FRDEventMsgView(dataPosition) );
508  }
509  currentFile_->bufferPosition_ += event_->size();
510  currentFile_->chunkPosition_ += event_->size();
511  //last chunk is released when this function is invoked next time
512 
513  }
514  //multibuffer mode:
515  else
516  {
517  //wait for the current chunk to become added to the vector
519  usleep(10000);
521  }
522 
523  //check if header is at the boundary of two chunks
524  chunkIsFree_ = false;
525  unsigned char *dataPosition;
526 
527  //read header, copy it to a single chunk if necessary
528  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
529 
530  event_.reset( new FRDEventMsgView(dataPosition) );
531  if (event_->size()>eventChunkSize_) {
532  throw cms::Exception("FedRawDataInputSource::getNextEvent")
533  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
534  << " run:" << event_->run() << " of size:" << event_->size()
535  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
536  }
537 
538  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
539 
541  {
542  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
543  "Premature end of input file while reading event data";
544  }
545 
546  if (chunkEnd) {
547  //header was at the chunk boundary, we will have to move payload as well
548  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
549  chunkIsFree_ = true;
550  }
551  else {
552  //header was contiguous, but check if payload fits the chunk
553  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
554  //rewind to header start position
555  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
556  //copy event to a chunk start and move pointers
557  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
558  assert(chunkEnd);
559  chunkIsFree_=true;
560  //header is moved
561  event_.reset( new FRDEventMsgView(dataPosition) );
562  }
563  else {
564  //everything is in a single chunk, only move pointers forward
565  chunkEnd = currentFile_->advance(dataPosition,msgSize);
566  assert(!chunkEnd);
567  chunkIsFree_=false;
568  }
569  }
570  }//end multibuffer mode
571 
572  if (verifyChecksum_ && event_->version() >= 5)
573  {
574  uint32_t crc=0;
575  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
576  if ( crc != event_->crc32c() ) {
578  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
579  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
580  " but calculated 0x" << crc;
581  }
582  }
583  else if ( verifyAdler32_ && event_->version() >= 3)
584  {
585  uint32_t adler = adler32(0L,Z_NULL,0);
586  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
587 
588  if ( adler != event_->adler32() ) {
590  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
591  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
592  " but calculated 0x" << adler;
593  }
594  }
595 
596 
598 
600 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 755 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

756 {
758  try {
759  // assemble json destination path
761 
762  //TODO:should be ported to use fffnaming
763  std::ostringstream fileNameWithPID;
764  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
765  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
766  jsonDestPath /= fileNameWithPID.str();
767 
768  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
769  << jsonDestPath;
770  try {
771  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
772  }
773  catch (const boost::filesystem::filesystem_error& ex)
774  {
775  // Input dir gone?
776  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
777  // << " Maybe the file is not yet visible by FU. Trying again in one second";
778  sleep(1);
779  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
780  }
782 
783  try {
784  //sometimes this fails but file gets deleted
785  boost::filesystem::remove(jsonSourcePath);
786  }
787  catch (const boost::filesystem::filesystem_error& ex)
788  {
789  // Input dir gone?
790  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
791  }
792  catch (std::exception& ex)
793  {
794  // Input dir gone?
795  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
796  }
797 
798  boost::filesystem::ifstream ij(jsonDestPath);
799  Json::Value deserializeRoot;
801 
802  if (!reader.parse(ij, deserializeRoot))
803  throw std::runtime_error("Cannot deserialize input JSON file");
804 
805  //read BU JSON
807  DataPoint dp;
808  dp.deserialize(deserializeRoot);
809  bool success = false;
810  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
811  if (dpd_->getNames().at(i)=="NEvents")
812  if (i<dp.getData().size()) {
813  data = dp.getData()[i];
814  success=true;
815  }
816  }
817  if (!success) {
818  if (dp.getData().size())
819  data = dp.getData()[0];
820  else
821  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
822  " error reading number of events from BU JSON -: No input value " << data;
823  }
824  return boost::lexical_cast<int>(data);
825 
826  }
827  catch (const boost::filesystem::filesystem_error& ex)
828  {
829  // Input dir gone?
831  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
832  }
833  catch (std::runtime_error e)
834  {
835  // Another process grabbed the file and NFS did not register this
837  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
838  }
839 
840  catch( boost::bad_lexical_cast const& ) {
841  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
842  << "Input value is -: " << data;
843  }
844 
845  catch (std::exception e)
846  {
847  // BU run directory disappeared?
849  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
850  }
851 
852  return -1;
853 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 290 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

291 {
293  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
294 
295  if ( currentLumiSection_ > 0 ) {
296  const std::string fuEoLS =
298  struct stat buf;
299  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
300  if ( !found ) {
302  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
303  close(eol_fd);
304  createBoLSFile(lumiSection,false);
306  }
307  }
308  else createBoLSFile(lumiSection,true);//needed for initial lumisection
309 
310  currentLumiSection_ = lumiSection;
311 
313 
314  timeval tv;
315  gettimeofday(&tv, 0);
316  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
317 
318  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
320  runAuxiliary()->run(),
321  lumiSection, lsopentime,
323 
324  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
325  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
326 
327  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
328  }
329 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:352
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:257
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:360
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:260
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 331 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

332 {
334  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
335  {
336  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
337  }
338  return status;
339 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 858 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

859 {
860  InputSource::rewind();
864 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:348
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 855 of file FedRawDataInputSource.cc.

856 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 632 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

633 {
634  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
635  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
636 
637  if (useL1EventID_){
639  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
641  aux.setProcessHistoryID(processHistoryID_);
642  makeEvent(eventPrincipal, aux);
643  }
644  else if(tcds_pointer_==0){
647  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
649  aux.setProcessHistoryID(processHistoryID_);
650  makeEvent(eventPrincipal, aux);
651  }
652  else{
653  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
656  processGUID());
658  makeEvent(eventPrincipal, aux);
659  }
660 
661 
662 
663  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
664 
665  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
666  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
667  // daqProvenanceHelper_.dummyProvenance_);
668 
669  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
671 
672  eventsThisLumi_++;
673 
674  //this old file check runs no more often than every 10 events
675  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
676  //delete files that are not in processing
677  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
678  auto it = filesToDelete_.begin();
679  while (it!=filesToDelete_.end()) {
680  bool fileIsBeingProcessed = false;
681  for (unsigned int i=0;i<nStreams_;i++) {
682  if (it->first == streamFileTrackerPtr_->at(i)) {
683  fileIsBeingProcessed = true;
684  break;
685  }
686  }
687  if (!fileIsBeingProcessed) {
688  deleteFile(it->second->fileName_);
689  delete it->second;
690  it = filesToDelete_.erase(it);
691  }
692  else it++;
693  }
694 
695  }
697  chunkIsFree_=false;
698  return;
699 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:212
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1210 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1211 {
1212 
1213  if (fileDescriptor_<0) {
1214  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1215  bufferInputRead_ = 0;
1216  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1217  if (fileDescriptor_>=0)
1218  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1219  else
1220  {
1221  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1222  << file->fileName_ << " fd:" << fileDescriptor_;
1223  }
1224  }
1225 
1226  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1227  uint32_t existingSize = 0;
1228  for (unsigned int i=0;i<readBlocks_;i++)
1229  {
1230  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1231  bufferInputRead_+=last;
1232  existingSize+=last;
1233  }
1234  }
1235  else {
1236  const uint32_t chunksize = file->chunkPosition_;
1237  const uint32_t blockcount=chunksize/eventChunkBlock_;
1238  const uint32_t leftsize = chunksize%eventChunkBlock_;
1239  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1240  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1241 
1242  for (uint32_t i=0;i<blockcount;i++) {
1243  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1244  bufferInputRead_+=last;
1245  existingSize+=last;
1246  }
1247  if (leftsize) {
1248  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1249  bufferInputRead_+=last;
1250  existingSize+=last;
1251  }
1252  file->chunkPosition_=0;//data was moved to beginning of the chunk
1253  }
1254  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1255  if (fileDescriptor_!=-1)
1256  {
1257  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1258  close(fileDescriptor_);
1259  fileDescriptor_=-1;
1260  }
1261  }
1262 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 870 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

871 {
872  bool stop=false;
873  unsigned int currentLumiSection = 0;
874  //threadInit_.exchange(true,std::memory_order_acquire);
875 
876  {
877  std::unique_lock<std::mutex> lk(startupLock_);
878  startupCv_.notify_one();
879  }
880 
881  while (!stop) {
882 
883  //wait for at least one free thread and chunk
884  int counter=0;
886  {
887  std::unique_lock<std::mutex> lkw(mWakeup_);
888  //sleep until woken up by condition or a timeout
889  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
890  counter++;
891  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
892  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
893  }
894  else {
895  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
896  }
897  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
898  }
899 
900  if (stop) break;
901 
902  //look for a new file
903  std::string nextFile;
904  uint32_t ls=0;
905  uint32_t fileSize;
906 
907  uint32_t monLS=1;
908  uint32_t lockCount=0;
909  uint64_t sumLockWaitTimeUs=0.;
910 
912 
914 
915  while (status == evf::EvFDaqDirector::noFile) {
916  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
917  stop=true;
918  break;
919  }
920 
921  uint64_t thisLockWaitTimeUs=0.;
922  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
923  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
924 
925  //monitoring of lock wait time
926  if (thisLockWaitTimeUs>0.)
927  sumLockWaitTimeUs+=thisLockWaitTimeUs;
928  lockCount++;
929  if (ls>monLS) {
930  monLS=ls;
931  if (lockCount)
932  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
933  lockCount=0;
934  sumLockWaitTimeUs=0;
935  }
936 
937  //check again for any remaining index/EoLS files after EoR file is seen
938  if ( status == evf::EvFDaqDirector::runEnded) {
939  usleep(100000);
940  //now all files should have appeared in ramdisk, check again if any raw files were left behind
941  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
942  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
943  }
944 
945  if ( status == evf::EvFDaqDirector::runEnded) {
947  stop=true;
948  break;
949  }
950 
951  //queue new lumisection
952  if( getLSFromFilename_ && ls > currentLumiSection) {
953  currentLumiSection = ls;
954  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
955  }
956 
957  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
958  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
960  stop=true;
961  break;
962  }
963 
964  int dbgcount=0;
965  if (status == evf::EvFDaqDirector::noFile) {
966  dbgcount++;
967  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
968  usleep(100000);
969  }
970  }
971  if ( status == evf::EvFDaqDirector::newFile ) {
972  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
973 
974 
975  boost::filesystem::path rawFilePath(nextFile);
976  std::string rawFile = rawFilePath.replace_extension(".raw").string();
977 
978  struct stat st;
979  stat(rawFile.c_str(),&st);
980  fileSize=st.st_size;
981 
982  int eventsInNewFile = grabNextJsonFile(nextFile);
983  if (fms_) fms_->stoppedLookingForFile(ls);
984  assert( eventsInNewFile>=0 );
985  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
986 
987  if (!singleBufferMode_) {
988  //calculate number of needed chunks
989  unsigned int neededChunks = fileSize/eventChunkSize_;
990  if (fileSize%eventChunkSize_) neededChunks++;
991 
992  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
994  fileQueue_.push(newInputFile);
995 
996  for (unsigned int i=0;i<neededChunks;i++) {
997 
998  //get thread
999  unsigned int newTid = 0xffffffff;
1000  while (!workerPool_.try_pop(newTid)) {
1001  usleep(100000);
1002  }
1003 
1004  InputChunk * newChunk = nullptr;
1005  while (!freeChunks_.try_pop(newChunk)) {
1006  usleep(100000);
1007  if (quit_threads_.load(std::memory_order_relaxed)) break;
1008  }
1009 
1010  if (newChunk == nullptr) {
1011  //return unused tid if we received shutdown (nullptr chunk)
1012  if (newTid!=0xffffffff) workerPool_.push(newTid);
1013  stop = true;
1014  break;
1015  }
1016 
1017  std::unique_lock<std::mutex> lk(mReader_);
1018 
1019  unsigned int toRead = eventChunkSize_;
1020  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1021  newChunk->reset(i*eventChunkSize_,toRead,i);
1022 
1023  workerJob_[newTid].first=newInputFile;
1024  workerJob_[newTid].second=newChunk;
1025 
1026  //wake up the worker thread
1027  cvReader_[newTid]->notify_one();
1028  }
1029  }
1030  else {
1031  if (!eventsInNewFile) {
1032  //still queue file for lumi update
1033  std::unique_lock<std::mutex> lkw(mWakeup_);
1034  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1036  fileQueue_.push(newInputFile);
1037  cvWakeup_.notify_one();
1038  return;
1039  }
1040  //in single-buffer mode put single chunk in the file and let the main thread read the file
1041  InputChunk * newChunk;
1042  //should be available immediately
1043  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1044 
1045  std::unique_lock<std::mutex> lkw(mWakeup_);
1046 
1047  unsigned int toRead = eventChunkSize_;
1048  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1049  newChunk->reset(0,toRead,0);
1050  newChunk->readComplete_=true;
1051 
1052  //push file and wakeup main thread
1053  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1054  newInputFile->chunks_[0]=newChunk;
1056  fileQueue_.push(newInputFile);
1057  cvWakeup_.notify_one();
1058  }
1059  }
1060  }
1061  //make sure threads finish reading
1062  unsigned numFinishedThreads = 0;
1063  while (numFinishedThreads < workerThreads_.size()) {
1064  unsigned int tid;
1065  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1066  std::unique_lock<std::mutex> lk(mReader_);
1067  thread_quit_signal[tid]=true;
1068  cvReader_[tid]->notify_one();
1069  numFinishedThreads++;
1070  }
1071  for (unsigned int i=0;i<workerThreads_.size();i++) {
1072  workerThreads_[i]->join();
1073  delete workerThreads_[i];
1074  }
1075 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
unsigned long long uint64_t
Definition: Time.h:15
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1077 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1078 {
1079  bool init = true;
1080  threadInit_.exchange(true,std::memory_order_acquire);
1081 
1082  while (1) {
1083 
1084  std::unique_lock<std::mutex> lk(mReader_);
1085  workerJob_[tid].first=nullptr;
1086  workerJob_[tid].first=nullptr;
1087 
1088  assert(!thread_quit_signal[tid]);//should never get it here
1089  workerPool_.push(tid);
1090 
1091  if (init) {
1092  std::unique_lock<std::mutex> lk(startupLock_);
1093  init = false;
1094  startupCv_.notify_one();
1095  }
1096  cvReader_[tid]->wait(lk);
1097 
1098  if (thread_quit_signal[tid]) return;
1099 
1100  InputFile * file;
1101  InputChunk * chunk;
1102 
1103  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1104 
1105  file = workerJob_[tid].first;
1106  chunk = workerJob_[tid].second;
1107 
1108  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1109  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1110 
1111 
1112  if (fileDescriptor>=0)
1113  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1114  else
1115  {
1116  edm::LogError("FedRawDataInputSource") <<
1117  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1118  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1119  setExceptionState_=true;
1120  return;
1121 
1122  }
1123 
1124  unsigned int bufferLeft = 0;
1126  for (unsigned int i=0;i<readBlocks_;i++)
1127  {
1128  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1129  if ( last > 0 )
1130  bufferLeft+=last;
1131  if (last < eventChunkBlock_) {
1132  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1133  break;
1134  }
1135  }
1137  auto diff = end-start;
1138  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1139  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1140  close(fileDescriptor);
1141 
1142  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1144  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1145  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1146 
1147  }
1148 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 866 of file FedRawDataInputSource.cc.

867 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1150 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1151 {
1152  quit_threads_=true;
1153  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1154 
1155 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 82 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 115 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 152 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 155 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private