FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
FedRawDataInputSource derives from edm::RawInputSource, which derives from edm::InputSource, which derives from edm::ProductRegistryHelper.

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper > 
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirer * resourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper > 
thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate () const
 
ProductRegistry & productRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal >
const 
runPrincipal () const
 
void setEventCached ()
 Marks the current event as cached (see eventCached()). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 124 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", verifyAdler32_)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
69  eventID_(),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
81 
83  setNewRun();
86 
87  dpd_ = new DataPointDefinition();
88  std::string defLabel = "data";
89  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
90 
91  //make sure that chunk size is N * block size
96 
97  if (!numBuffers_)
98  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
99  "no reading enabled with numBuffers parameter 0";
100 
104 
105  if (!crc32c_hw_test())
106  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
107 
108  //get handles to DaqDirector and FastMonitoringService because they aren't accessible in the readSupervisor thread
109 
110  try {
112  } catch (...){
113  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
114  assert(0);//test
115  }
116 
117  try {
119  //set DaqDirector to delete files in preGlobalEndLumi callback
121  if (fms_) daqDirector_->setFMS(fms_);
122  } catch (...){
123  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
124  assert(0);//test
125  }
126 
127  //should delete chunks when run stops
128  for (unsigned int i=0;i<numBuffers_;i++) {
130  }
131 
132  quit_threads_ = false;
133 
134  for (unsigned int i=0;i<numConcurrentReads_;i++)
135  {
136  std::unique_lock<std::mutex> lk(startupLock_);
137  //issue a memory fence here and in threads (constructor was segfaulting without this)
138  thread_quit_signal.push_back(false);
139  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
140  cvReader_.push_back(new std::condition_variable);
141  threadInit_.store(false,std::memory_order_release);
142  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
143  startupCv_.wait(lk);
144  }
145 
146  runAuxiliary()->setProcessHistoryID(processHistoryID_);
147 }
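
The initializer list converts the eventChunkSize and eventChunkBlock parameters from MB to bytes with a factor of 1048576, and the constructor body notes that the chunk size should be N * block size and rejects numBuffers equal to 0. The following is a minimal sketch of those checks, not the code from FedRawDataInputSource.cc. ChunkLayout and makeChunkLayout are hypothetical names, and throwing on a chunk size that is not a multiple of the block size is an assumption, since the listing does not show how the real constructor handles that case.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical helper types; they are not part of FedRawDataInputSource.
struct ChunkLayout {
  std::uint64_t chunkBytes;      // size of one read-ahead chunk in bytes
  std::uint64_t blockBytes;      // size of one read block in bytes
  std::uint64_t blocksPerChunk;  // the "N" in "chunk size is N * block size"
};

constexpr std::uint64_t kBytesPerMB = 1048576;  // same factor as in the initializer list

// Converts the MB-valued parameters to bytes and validates them.
ChunkLayout makeChunkLayout(unsigned int chunkMB, unsigned int blockMB, unsigned int numBuffers) {
  if (numBuffers == 0)
    throw std::invalid_argument("no reading enabled with numBuffers parameter 0");
  if (chunkMB == 0 || blockMB == 0)
    throw std::invalid_argument("chunk and block sizes must be non-zero");
  // Assumption: a chunk that is not a whole number of blocks is rejected.
  if (chunkMB % blockMB != 0)
    throw std::invalid_argument("chunk size must be N * block size");

  const std::uint64_t blocks = chunkMB / blockMB;
  const ChunkLayout layout{chunkMB * kBytesPerMB, blockMB * kBytesPerMB, blocks};
  assert(layout.chunkBytes == layout.blocksPerChunk * layout.blockBytes);
  return layout;
}
```

With the defaults visible in the listing (eventChunkSize of 32 and eventChunkBlock equal to the chunk size), makeChunkLayout(32, 32, 2) describes a 33554432-byte chunk that is read as a single block.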
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 149 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

150 {
151  quit_threads_=true;
152 
153  //delete any remaining open files
154  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
155  deleteFile(it->second->fileName_);
156  delete it->second;
157  }
159  readSupervisorThread_->join();
160  }
161  else {
162  //join aux threads in case the supervisor thread was not started
163  for (unsigned int i=0;i<workerThreads_.size();i++) {
164  std::unique_lock<std::mutex> lk(mReader_);
165  thread_quit_signal[i]=true;
166  cvReader_[i]->notify_one();
167  lk.unlock();
168  workerThreads_[i]->join();
169  delete workerThreads_[i];
170  }
171  }
172  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
173  /*
174  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
175  InputChunk *ch;
176  while (!freeChunks_.try_pop(ch)) {}
177  delete ch;
178  }
179  */
180 }
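
The constructor starts each reader worker while holding startupLock_ and waits on startupCv_ until the worker reports in. The destructor sets the per-worker quit flag under mReader_, notifies that worker's condition variable, and joins the thread. A minimal sketch of this start-up and shutdown handshake follows. WorkerPool and all of its members are hypothetical, and the sketch swaps in predicate-based waits and value-owned std::thread objects in place of the bare wait() calls and raw pointers seen in the listings.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the reader workers; names are illustrative only.
class WorkerPool {
public:
  explicit WorkerPool(std::size_t n) : quit_(n, false), cv_(n) {
    threads_.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
      std::unique_lock<std::mutex> lk(startupLock_);
      started_ = false;
      threads_.emplace_back(&WorkerPool::run, this, i);
      // The predicate protects against spurious wakeups.
      startupCv_.wait(lk, [this] { return started_; });
    }
  }

  ~WorkerPool() { stop(); }

  // Signals every worker to quit and joins it; safe to call more than once.
  void stop() {
    for (std::size_t i = 0; i < threads_.size(); ++i) {
      {
        std::lock_guard<std::mutex> lk(m_);
        quit_[i] = true;
      }
      cv_[i].notify_one();
      if (threads_[i].joinable()) threads_[i].join();
    }
  }

  std::size_t finished() const {
    std::lock_guard<std::mutex> lk(m_);
    return finished_;
  }

private:
  void run(std::size_t id) {
    assert(id < cv_.size());
    {
      // Blocks until the constructor is waiting on startupCv_.
      std::lock_guard<std::mutex> lk(startupLock_);
      started_ = true;
    }
    startupCv_.notify_one();

    // Sleep until this worker's quit flag is set.
    std::unique_lock<std::mutex> lk(m_);
    cv_[id].wait(lk, [this, id] { return static_cast<bool>(quit_[id]); });
    ++finished_;
  }

  std::mutex startupLock_;
  std::condition_variable startupCv_;
  bool started_ = false;

  mutable std::mutex m_;
  std::vector<bool> quit_;
  std::vector<std::condition_variable> cv_;
  std::size_t finished_ = 0;
  std::vector<std::thread> threads_;
};
```

Because the quit flag is checked in the wait predicate, a worker that has not yet reached its wait when stop() runs still sees the flag and exits.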

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 182 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

183 {
185  {
186  //this thread opens new files and dispatches reading to worker readers
187  //threadInit_.store(false,std::memory_order_release);
188  std::unique_lock<std::mutex> lk(startupLock_);
189  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
191  startupCv_.wait(lk);
192  }
193  //signal hltd to start event accounting
196 
197  switch (nextEvent() ) {
199 
200  //maybe create EoL file in working directory before ending run
201  struct stat buf;
202  if ( currentLumiSection_ > 0 ) {
203  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
204  if (eolFound) {
206  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
207  if ( !found ) {
209  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
210  close(eol_fd);
212  }
213  }
214  }
215  //also create EoR file in FU data directory
216  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
217  if (!eorFound) {
218  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
219  close(eor_fd);
220  }
222  eventsThisLumi_=0;
224  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
225  return false;
226  }
228  //this is not reachable
229  return true;
230  }
232  //std::cout << "--------------NEW LUMI---------------" << std::endl;
233  return true;
234  }
235  default: {
236  if (!getLSFromFilename_) {
237  //get new lumi from file header
238  if (event_->lumi() > currentLumiSection_) {
240  eventsThisLumi_=0;
241  maybeOpenNewLumiSection( event_->lumi() );
242  }
243  }
244  eventRunNumber_=event_->run();
245  L1EventID_ = event_->event();
246 
247  setEventCached();
248 
249  return true;
250  }
251  }
252 }
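
The listing above omits the case labels of the switch on nextEvent(). Judging from the References list and the surviving comments, the branches appear to be runEnded (returns false), noFile (marked as not reachable), newLumi, and a default branch that caches the event. The sketch below restates that control flow under those assumptions. FileStatus, SourceState and handleStatus are hypothetical names and do not reproduce evf::EvFDaqDirector::FileStatus; runAbort is left out because getNextEvent() is shown throwing for it.

```cpp
#include <cassert>

// Hypothetical mirror of the status values named in the References list.
enum class FileStatus { noFile, sameFile, newFile, newLumi, runEnded };

// Hypothetical subset of the source state touched by the switch.
struct SourceState {
  bool eventCached = false;
  unsigned int eventsThisLumi = 0;
};

// Returns true if the framework should keep asking for items.
bool handleStatus(FileStatus status, SourceState& state) {
  switch (status) {
    case FileStatus::runEnded:
      state.eventsThisLumi = 0;  // the listing resets the counter before returning
      return false;
    case FileStatus::noFile:     // marked "not reachable" in the listing
    case FileStatus::newLumi:
      return true;
    case FileStatus::sameFile:   // assumed to fall into the default branch
    case FileStatus::newFile:
      state.eventCached = true;  // corresponds to setEventCached()
      return true;
  }
  assert(false && "unhandled FileStatus");
  return false;
}
```

Only the run-ended status stops processing; every other status returns true, and only the event-carrying statuses mark an event as cached.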
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 254 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

255 {
256  //used for backpressure mechanisms and monitoring
257  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
258  struct stat buf;
259  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
260  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
261  close(bol_fd);
262  }
263 }
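
createBoLSFile() creates an empty marker file with open() and close() and does not inspect the descriptor that open() returns. A hedged variant that reports failure is sketched here. touchMarker is a hypothetical helper, the permission bits are copied from the listing, and the boolean return value is an addition that the original function (which returns void) does not have.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: returns true if the marker file exists after the call.
bool touchMarker(const std::string& path, bool checkIfExists) {
  assert(!path.empty());

  struct stat buf;
  // Skip creation when asked to check first and the file is already there.
  if (checkIfExists && stat(path.c_str(), &buf) == 0) return true;

  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0) return false;  // unlike the listing, surface the failure to the caller
  close(fd);
  return true;
}
```

A caller could log or retry when touchMarker() returns false, for example when the target directory is missing.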
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 577 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and python.multivaluedict::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

578 {
579  const boost::filesystem::path filePath(fileName);
580  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
581  try {
582  //sometimes this fails but file gets deleted
583  boost::filesystem::remove(filePath);
584  }
585  catch (const boost::filesystem::filesystem_error& ex)
586  {
587  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
588  << ". Trying again.";
589  usleep(100000);
590  try {
591  boost::filesystem::remove(filePath);
592  }
593  catch (...) {/*file gets deleted first time but exception is still thrown*/}
594  }
595  catch (std::exception& ex)
596  {
597  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
598  << ". Trying again.";
599  usleep(100000);
600  try {
601  boost::filesystem::remove(filePath);
602  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
603  }
604 }
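
deleteFile() retries boost::filesystem::remove once after a 100 ms pause, and its comments note that the file is sometimes deleted even though an exception was thrown. The sketch below expresses the same retry idea with the non-throwing std::filesystem overloads (C++17) instead of Boost. That substitution, the name removeWithRetry, and the explicit existence check are assumptions of this sketch rather than the behaviour of the original function.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <system_error>
#include <thread>

// Hypothetical helper: returns true if the path no longer exists afterwards.
bool removeWithRetry(const std::filesystem::path& p, int attempts = 2,
                     std::chrono::milliseconds pause = std::chrono::milliseconds(100)) {
  assert(attempts > 0);
  for (int i = 0; i < attempts; ++i) {
    std::error_code ec;
    std::filesystem::remove(p, ec);  // non-throwing overload
    if (!ec) return true;            // removed now, or it was already gone

    // An error was reported; check whether the file disappeared anyway.
    std::error_code existsEc;
    if (!std::filesystem::exists(p, existsEc) && !existsEc) return true;

    if (i + 1 < attempts) std::this_thread::sleep_for(pause);
  }
  return false;
}
```

Checking for existence after a reported error makes the "deleted but still reported as failed" case explicit instead of relying on an empty catch block.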
bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 72 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection & rawData)
private

Definition at line 676 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), HLT_25ns14e33_v1_cff::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and cond::rpcobgas::time.

Referenced by read().

677 {
679  timeval stv;
680  gettimeofday(&stv,0);
681  time = stv.tv_sec;
682  time = (time << 32) + stv.tv_usec;
683  edm::Timestamp tstamp(time);
684 
685  uint32_t eventSize = event_->eventSize();
686  char* event = (char*)event_->payload();
687  GTPEventID_=0;
688  tcds_pointer_ = 0;
689  while (eventSize > 0) {
690  assert(eventSize>=sizeof(fedt_t));
691  eventSize -= sizeof(fedt_t);
692  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
693  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
694  assert(eventSize>=fedSize - sizeof(fedt_t));
695  eventSize -= (fedSize - sizeof(fedt_t));
696  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
697  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
698  if(fedId>FEDNumbering::MAXFEDID)
699  {
700  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
701  }
702  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
703  tcds_pointer_ = (unsigned char *)(event + eventSize );
704  }
705  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
706  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
707  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
708  else
709  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
710  //evf::evtn::evm_board_setformat(fedSize);
711  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
712  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
713  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
714  }
715  //take event ID from GTPE FED
716  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
717  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
718  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
719  }
720  }
721  FEDRawData& fedData = rawData.FEDData(fedId);
722  fedData.resize(fedSize);
723  memcpy(fedData.data(), event + eventSize, fedSize);
724  }
725  assert(eventSize == 0);
726 
727  return tstamp;
728 }
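
fillFEDRawDataCollection() walks the event payload from the end: it reads a trailer, takes the fragment length (counted in 8-byte words) from it, steps back to the fragment header, and reads the source ID there. The sketch below shows that backward walk on a simplified layout. The Header and Trailer structs, the mask and shift constants, and splitFragments are assumptions made for illustration and are not the definitions from fed_header.h or fed_trailer.h. The sketch also throws on malformed input where the listing uses assert().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Simplified, hypothetical fragment layout used only in this sketch.
struct Header  { std::uint32_t sourceid;  std::uint32_t eventid; };
struct Trailer { std::uint32_t conscheck; std::uint32_t eventsize; };

constexpr std::uint32_t kSizeMask = 0x00ffffff;  // assumed: length in 8-byte words
constexpr std::uint32_t kIdMask   = 0x000fff00;  // assumed: source ID field
constexpr unsigned int  kIdShift  = 8;

// Splits a payload into per-source fragments, walking from the last trailer backwards.
std::map<std::uint16_t, std::vector<unsigned char>>
splitFragments(const unsigned char* event, std::size_t eventSize) {
  assert(event != nullptr || eventSize == 0);
  std::map<std::uint16_t, std::vector<unsigned char>> out;

  while (eventSize > 0) {
    if (eventSize < sizeof(Trailer)) throw std::runtime_error("truncated trailer");
    Trailer t;
    std::memcpy(&t, event + eventSize - sizeof(Trailer), sizeof(t));

    // The trailer counts the whole fragment (header + payload + trailer) in 8-byte words.
    const std::size_t fragSize = static_cast<std::size_t>(t.eventsize & kSizeMask) << 3;
    if (fragSize < sizeof(Header) + sizeof(Trailer) || fragSize > eventSize)
      throw std::runtime_error("inconsistent fragment size");

    eventSize -= fragSize;  // now the offset of this fragment's header
    Header h;
    std::memcpy(&h, event + eventSize, sizeof(h));
    const std::uint16_t id = static_cast<std::uint16_t>((h.sourceid & kIdMask) >> kIdShift);

    out[id].assign(event + eventSize, event + eventSize + fragSize);
  }
  return out;
}
```

Reading the structs with memcpy avoids the unaligned pointer casts of the original, and the loop ends exactly at offset 0 when every trailer length is consistent, which is the condition the final assert(eventSize == 0) in the listing checks.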
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 316 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

317 {
318 
320  if (!currentFile_)
321  {
322  if (!streamFileTrackerPtr_) {
326  }
327 
329  if (!fileQueue_.try_pop(currentFile_))
330  {
331  //sleep until wakeup (only in single-buffer mode) or timeout
332  std::unique_lock<std::mutex> lkw(mWakeup_);
333  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
335  }
336  status = currentFile_->status_;
337  if ( status == evf::EvFDaqDirector::runEnded)
338  {
339  delete currentFile_;
340  currentFile_=nullptr;
341  return status;
342  }
343  else if ( status == evf::EvFDaqDirector::runAbort)
344  {
345  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
346  }
347  else if (status == evf::EvFDaqDirector::newLumi)
348  {
349  if (getLSFromFilename_) {
352  eventsThisLumi_=0;
354  }
355  }
356  else {//let this be picked up from next event
358  }
359 
360  delete currentFile_;
361  currentFile_=nullptr;
362  return status;
363  }
364  else if (status == evf::EvFDaqDirector::newFile) {
367  }
368  else
369  assert(0);
370  }
371 
372  //file is empty
373  if (!currentFile_->fileSize_) {
375  //try to open new lumi
377  if (getLSFromFilename_)
380  eventsThisLumi_=0;
382  }
383  //immediately delete empty file
385  delete currentFile_;
386  currentFile_=nullptr;
388  }
389 
390  //file is finished
393  //release last chunk (it is never released elsewhere)
396  {
397  throw cms::Exception("FedRawDataInputSource::getNextEvent")
398  << "Fully processed " << currentFile_->nProcessed_
399  << " from the file " << currentFile_->fileName_
400  << " but according to BU JSON there should be "
401  << currentFile_->nEvents_ << " events";
402  }
403  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
404  if (singleBufferMode_) {
405  std::unique_lock<std::mutex> lkw(mWakeup_);
406  cvWakeup_.notify_one();
407  }
410  //put the file in pending delete list;
411  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
412  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
413  }
414  else {
415  //in single-thread and stream jobs, events are already processed
417  delete currentFile_;
418  }
419  currentFile_=nullptr;
421  }
422 
423 
424  //file is too short
426  {
427  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
428  "Premature end of input file while reading event header";
429  }
430  if (singleBufferMode_) {
431 
432  //should already be there
434  usleep(10000);
436  }
437 
438  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
439 
440  //conditions when read amount is not sufficient for the header to fit
441  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
443  {
445 
446  if (detectedFRDversion_==0) {
447  detectedFRDversion_=*((uint32*)dataPosition);
448  if (detectedFRDversion_>5)
449  throw cms::Exception("FedRawDataInputSource::getNextEvent")
450  << "Unknown FRD version -: " << detectedFRDversion_;
451  assert(detectedFRDversion_>=1);
452  }
453 
454  //recalculate chunk position
455  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
456  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
457  {
458  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
459  "Premature end of input file while reading event header";
460  }
461  }
462 
463  event_.reset( new FRDEventMsgView(dataPosition) );
464  if (event_->size()>eventChunkSize_) {
465  throw cms::Exception("FedRawDataInputSource::getNextEvent")
466  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
467  << " run:" << event_->run() << " of size:" << event_->size()
468  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
469  }
470 
471  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
472 
474  {
475  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
476  "Premature end of input file while reading event data";
477  }
478  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
480  //recalculate chunk position
481  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
482  event_.reset( new FRDEventMsgView(dataPosition) );
483  }
484  currentFile_->bufferPosition_ += event_->size();
485  currentFile_->chunkPosition_ += event_->size();
486  //last chunk is released when this function is invoked next time
487 
488  }
489  //multibuffer mode:
490  else
491  {
492  //wait for the current chunk to be added to the vector
494  usleep(10000);
496  }
497 
498  //check if header is at the boundary of two chunks
499  chunkIsFree_ = false;
500  unsigned char *dataPosition;
501 
502  //read header, copy it to a single chunk if necessary
503  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
504 
505  event_.reset( new FRDEventMsgView(dataPosition) );
506  if (event_->size()>eventChunkSize_) {
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
509  << " run:" << event_->run() << " of size:" << event_->size()
510  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
511  }
512 
513  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
514 
516  {
517  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
518  "Premature end of input file while reading event data";
519  }
520 
521  if (chunkEnd) {
522  //header was at the chunk boundary, we will have to move payload as well
523  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
524  chunkIsFree_ = true;
525  }
526  else {
527  //header was contiguous, but check if payload fits the chunk
528  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
529  //rewind to header start position
530  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
531  //copy event to a chunk start and move pointers
532  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
533  assert(chunkEnd);
534  chunkIsFree_=true;
535  //header is moved
536  event_.reset( new FRDEventMsgView(dataPosition) );
537  }
538  else {
539  //everything is in a single chunk, only move pointers forward
540  chunkEnd = currentFile_->advance(dataPosition,msgSize);
541  assert(!chunkEnd);
542  chunkIsFree_=false;
543  }
544  }
545  }//end multibuffer mode
546 
547  if (verifyChecksum_ && event_->version() >= 5)
548  {
549  uint32_t crc=0;
550  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
551  if ( crc != event_->crc32c() ) {
553  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
554  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
555  " but calculated 0x" << crc;
556  }
557  }
558  else if ( verifyAdler32_ && event_->version() >= 3)
559  {
560  uint32_t adler = adler32(0L,Z_NULL,0);
561  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
562 
563  if ( adler != event_->adler32() ) {
565  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
566  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
567  " but calculated 0x" << adler;
568  }
569  }
570 
571 
573 
575 }
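The Adler-32 branch in the listing above seeds the checksum with adler32(0L, Z_NULL, 0) and then folds in the event payload before comparing against the value stored in the event header. As a rough, library-free illustration of what that comparison checks, the sketch below computes Adler-32 by hand. The names adler32Of and payloadMatches are hypothetical and are not part of CMSSW or zlib.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: plain Adler-32 over a byte range. The running sums start
// at a = 1, b = 0, which is the value zlib's adler32(0L, Z_NULL, 0) returns.
inline std::uint32_t adler32Of(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical helper: true when the payload hashes to the expected checksum.
inline bool payloadMatches(const unsigned char* payload, std::size_t len, std::uint32_t expected) {
  assert(payload != nullptr || len == 0);
  return adler32Of(payload, len) == expected;
}
```

In the listing a mismatch is reported by throwing cms::Exception; the boolean return here only keeps the sketch free of framework dependencies.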
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 730 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), fed_dqm_sourceclient-live_cfg::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

731 {
733  try {
734  // assemble json destination path
736 
737  //TODO:should be ported to use fffnaming
738  std::ostringstream fileNameWithPID;
739  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
740  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
741  jsonDestPath /= fileNameWithPID.str();
742 
743  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
744  << jsonDestPath;
745  try {
746  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
747  }
748  catch (const boost::filesystem::filesystem_error& ex)
749  {
750  // Input dir gone?
751  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
752  // << " Maybe the file is not yet visible by FU. Trying again in one second";
753  sleep(1);
754  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
755  }
757 
758  try {
759  //sometimes this fails but file gets deleted
760  boost::filesystem::remove(jsonSourcePath);
761  }
762  catch (const boost::filesystem::filesystem_error& ex)
763  {
764  // Input dir gone?
765  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
766  }
767  catch (std::exception& ex)
768  {
769  // Input dir gone?
770  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
771  }
772 
773  boost::filesystem::ifstream ij(jsonDestPath);
774  Json::Value deserializeRoot;
776 
777  if (!reader.parse(ij, deserializeRoot))
778  throw std::runtime_error("Cannot deserialize input JSON file");
779 
780  //read BU JSON
782  DataPoint dp;
783  dp.deserialize(deserializeRoot);
784  bool success = false;
785  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
786  if (dpd_->getNames().at(i)=="NEvents")
787  if (i<dp.getData().size()) {
788  data = dp.getData()[i];
789  success=true;
790  }
791  }
792  if (!success) {
793  if (dp.getData().size())
794  data = dp.getData()[0];
795  else
796  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
797  " error reading number of events from BU JSON -: No input value " << data;
798  }
799  return boost::lexical_cast<int>(data);
800 
801  }
802  catch (const boost::filesystem::filesystem_error& ex)
803  {
804  // Input dir gone?
806  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
807  }
808  catch (const std::runtime_error& e)
809  {
810  // Another process grabbed the file and NFS did not register this
812  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
813  }
814 
815  catch( boost::bad_lexical_cast const& ) {
816  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
817  << "Input value is -: " << data;
818  }
819 
820  catch (const std::exception& e)
821  {
822  // BU run directory disappeared?
824  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURRED!!!! -: " << e.what();
825  }
826 
827  return -1;
828 }
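The destination file name in the listing above is assembled from the stem of the source path, a "_pid" suffix, the process ID zero-padded to five digits, and the ".jsn" extension. A minimal sketch of that formatting step, assuming a hypothetical helper named jsonNameWithPid and a made-up stem in the usage note:

```cpp
#include <cassert>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper mirroring the naming step in the listing:
// "<stem>_pid<pid zero-padded to at least 5 digits>.jsn".
inline std::string jsonNameWithPid(const std::string& stem, int pid) {
  assert(pid >= 0);
  std::ostringstream name;
  // std::setw applies only to the next inserted value (the pid);
  // it pads short values but never truncates longer ones.
  name << stem << "_pid" << std::setfill('0') << std::setw(5) << pid << ".jsn";
  return name.str();
}
```

For example, jsonNameWithPid("index000003", 42) yields "index000003_pid00042.jsn", while a six-digit PID is written out in full.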
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 265 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

266 {
268  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
269 
270  if ( currentLumiSection_ > 0 ) {
271  const std::string fuEoLS =
273  struct stat buf;
274  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
275  if ( !found ) {
277  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
278  close(eol_fd);
279  createBoLSFile(lumiSection,false);
281  }
282  }
283  else createBoLSFile(lumiSection,true);//needed for initial lumisection
284 
285  currentLumiSection_ = lumiSection;
286 
288 
289  timeval tv;
290  gettimeofday(&tv, 0);
291  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
292 
293  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
295  runAuxiliary()->run(),
296  lumiSection, lsopentime,
298 
299  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
300  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
301 
302  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
303  }
304 }
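The lumi section open time in the listing above is built as tv_sec * 1000000 + tv_usec, with both terms cast to unsigned long long first. A small sketch of that arithmetic, using a hypothetical helper named toMicroseconds, shows why the 64-bit width matters: the product exceeds 32 bits for any realistic epoch time.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: microseconds since the epoch from a (seconds, microseconds)
// pair, matching the tv_sec * 1000000 + tv_usec expression in the listing.
inline std::uint64_t toMicroseconds(std::uint64_t seconds, std::uint64_t microseconds) {
  assert(microseconds < 1000000);  // tv_usec is a sub-second remainder
  return seconds * 1000000ULL + microseconds;
}
```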
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 306 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

307 {
309  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
310  {
311  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
312  }
313  return status;
314 }
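The loop in the listing above keeps calling getNextEvent() while it reports noFile and leaves early once edm::shutdown_flag is set. The same pattern can be sketched in isolation as follows. The FileStatus enum here is a simplified stand-in for evf::EvFDaqDirector::FileStatus, and pollUntilReady is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Simplified stand-in for evf::EvFDaqDirector::FileStatus (assumption).
enum class FileStatus { noFile, newFile, runEnded };

// Hypothetical helper: keep polling while the source reports noFile,
// but stop as soon as the shutdown flag is raised.
inline FileStatus pollUntilReady(const std::function<FileStatus()>& poll,
                                 const std::atomic<bool>& shutdown) {
  assert(static_cast<bool>(poll));
  FileStatus status;
  while ((status = poll()) == FileStatus::noFile) {
    // A relaxed load suffices here: the flag is only a stop hint and
    // does not publish any other data to this thread.
    if (shutdown.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

Note that the flag is checked only after a poll has returned noFile, so at least one poll always happens, and the caller sees noFile when the loop was cut short by a shutdown.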
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource > )
override private virtual

Definition at line 833 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

834 {
835  InputSource::rewind();
839 }
void FedRawDataInputSource::preForkReleaseResources ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 830 of file FedRawDataInputSource.cc.

831 {}
void FedRawDataInputSource::read ( edm::EventPrincipal & eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 607 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

608 {
609  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
610  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
611 
612  if (useL1EventID_){
614  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
616  aux.setProcessHistoryID(processHistoryID_);
617  makeEvent(eventPrincipal, aux);
618  }
619  else if(tcds_pointer_==0){
622  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
624  aux.setProcessHistoryID(processHistoryID_);
625  makeEvent(eventPrincipal, aux);
626  }
627  else{
628  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
631  processGUID());
633  makeEvent(eventPrincipal, aux);
634  }
635 
636 
637 
638  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
639 
640  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
641  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
642  // daqProvenanceHelper_.dummyProvenance_);
643 
644  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
646 
647  eventsThisLumi_++;
648 
649  //this old file check runs no more often than every 10 events
650  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
651  //delete files that are not in processing
652  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
653  auto it = filesToDelete_.begin();
654  while (it!=filesToDelete_.end()) {
655  bool fileIsBeingProcessed = false;
656  for (unsigned int i=0;i<nStreams_;i++) {
657  if (it->first == streamFileTrackerPtr_->at(i)) {
658  fileIsBeingProcessed = true;
659  break;
660  }
661  }
662  if (!fileIsBeingProcessed) {
663  deleteFile(it->second->fileName_);
664  delete it->second;
665  it = filesToDelete_.erase(it);
666  }
667  else it++;
668  }
669 
670  }
672  chunkIsFree_=false;
673  return;
674 }
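The cleanup at the end of the listing above walks filesToDelete_ and removes every entry whose file index is not referenced by any stream in the stream file tracker. The erase-while-iterating idiom it relies on can be sketched on its own. Here pruneUntracked is a hypothetical helper, and file names stand in for the InputFile pointers held by the real list.

```cpp
#include <algorithm>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: drop every pending entry whose file index is not tracked
// by any stream, and return the names of the entries that were dropped.
inline std::vector<std::string> pruneUntracked(std::list<std::pair<int, std::string>>& pending,
                                               const std::vector<int>& streamTracker) {
  std::vector<std::string> removed;
  auto it = pending.begin();
  while (it != pending.end()) {
    const bool inUse =
        std::find(streamTracker.begin(), streamTracker.end(), it->first) != streamTracker.end();
    if (!inUse) {
      removed.push_back(it->second);
      it = pending.erase(it);  // erase returns the iterator to the next element
    } else {
      ++it;
    }
  }
  return removed;
}
```

In the listing the same walk runs under fileDeleteLock_, since the list is shared with the code that queues finished files.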
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile * file)
private

Definition at line 1185 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1186 {
1187 
1188  if (fileDescriptor_<0) {
1189  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1190  bufferInputRead_ = 0;
1191  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1192  if (fileDescriptor_>=0)
1193  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1194  else
1195  {
1196  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1197  << file->fileName_ << " fd:" << fileDescriptor_;
1198  }
1199  }
1200 
1201  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1202  uint32_t existingSize = 0;
1203  for (unsigned int i=0;i<readBlocks_;i++)
1204  {
1205  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1206  bufferInputRead_+=last;
1207  existingSize+=last;
1208  }
1209  }
1210  else {
1211  const uint32_t chunksize = file->chunkPosition_;
1212  const uint32_t blockcount=chunksize/eventChunkBlock_;
1213  const uint32_t leftsize = chunksize%eventChunkBlock_;
1214  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1215  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1216 
1217  for (uint32_t i=0;i<blockcount;i++) {
1218  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1219  bufferInputRead_+=last;
1220  existingSize+=last;
1221  }
1222  if (leftsize) {
1223  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1224  bufferInputRead_+=last;
1225  existingSize+=last;
1226  }
1227  file->chunkPosition_=0;//data was moved to beginning of the chunk
1228  }
1229  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1230  if (fileDescriptor_!=-1)
1231  {
1232  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1233  close(fileDescriptor_);
1234  fileDescriptor_=-1;
1235  }
1236  }
1237 }
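When chunkPosition_ is non-zero, the listing above moves the unread tail of the chunk to the front with memmove and then reads as many bytes as were consumed to refill the freed space. The sketch below illustrates that compact-and-refill step against an in-memory source. ByteSource and compactAndRefill are hypothetical names, and unlike the listing the refill is done in one call instead of eventChunkBlock_-sized pieces.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical in-memory stand-in for the file descriptor used in the listing.
struct ByteSource {
  std::vector<unsigned char> bytes;
  std::size_t offset = 0;
  // Copies up to `count` bytes into `dst` and returns how many were copied.
  std::size_t read(unsigned char* dst, std::size_t count) {
    const std::size_t n = std::min(count, bytes.size() - offset);
    if (n > 0) std::memcpy(dst, bytes.data() + offset, n);
    offset += n;
    return n;
  }
};

// Moves the unread tail [position, buffer.size()) to the front of the buffer and
// refills the freed space from `src`. Returns the number of valid bytes afterwards.
inline std::size_t compactAndRefill(std::vector<unsigned char>& buffer, std::size_t position,
                                    ByteSource& src) {
  assert(position <= buffer.size());
  const std::size_t kept = buffer.size() - position;
  // memmove (not memcpy) because source and destination ranges may overlap.
  if (kept > 0) std::memmove(buffer.data(), buffer.data() + position, kept);
  const std::size_t got = src.read(buffer.data() + kept, position);
  return kept + got;
}
```

The listing adds the return value of ::read directly to its byte counters; the sketch returns the valid byte count instead so that a short read near the end of the input is visible to the caller.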
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 845 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

846 {
847  bool stop=false;
848  unsigned int currentLumiSection = 0;
849  //threadInit_.exchange(true,std::memory_order_acquire);
850 
851  {
852  std::unique_lock<std::mutex> lk(startupLock_);
853  startupCv_.notify_one();
854  }
855 
856  while (!stop) {
857 
858  //wait for at least one free thread and chunk
859  int counter=0;
861  {
862  std::unique_lock<std::mutex> lkw(mWakeup_);
863  //sleep until woken up by condition or a timeout
864  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
865  counter++;
866  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
867  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
868  }
869  else {
870  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
871  }
872  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
873  }
874 
875  if (stop) break;
876 
877  //look for a new file
878  std::string nextFile;
879  uint32_t ls=0;
880  uint32_t fileSize;
881 
882  uint32_t monLS=1;
883  uint32_t lockCount=0;
884  uint64_t sumLockWaitTimeUs=0.;
885 
887 
889 
890  while (status == evf::EvFDaqDirector::noFile) {
891  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
892  stop=true;
893  break;
894  }
895 
896  uint64_t thisLockWaitTimeUs=0.;
897  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
898  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
899 
900  //monitoring of lock wait time
901  if (thisLockWaitTimeUs>0.)
902  sumLockWaitTimeUs+=thisLockWaitTimeUs;
903  lockCount++;
904  if (ls>monLS) {
905  monLS=ls;
906  if (lockCount)
907  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
908  lockCount=0;
909  sumLockWaitTimeUs=0;
910  }
911 
912  //check again for any remaining index/EoLS files after EoR file is seen
913  if ( status == evf::EvFDaqDirector::runEnded) {
914  usleep(100000);
915  //now all files should have appeared in ramdisk, check again if any raw files were left behind
916  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
917  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
918  }
919 
920  if ( status == evf::EvFDaqDirector::runEnded) {
922  stop=true;
923  break;
924  }
925 
926  //queue new lumisection
927  if( getLSFromFilename_ && ls > currentLumiSection) {
928  currentLumiSection = ls;
929  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
930  }
931 
932  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
933  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
935  stop=true;
936  break;
937  }
938 
939  int dbgcount=0;
940  if (status == evf::EvFDaqDirector::noFile) {
941  dbgcount++;
942  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
943  usleep(100000);
944  }
945  }
946  if ( status == evf::EvFDaqDirector::newFile ) {
947  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
948 
949 
950  boost::filesystem::path rawFilePath(nextFile);
951  std::string rawFile = rawFilePath.replace_extension(".raw").string();
952 
953  struct stat st;
954  stat(rawFile.c_str(),&st);
955  fileSize=st.st_size;
956 
957  int eventsInNewFile = grabNextJsonFile(nextFile);
958  if (fms_) fms_->stoppedLookingForFile(ls);
959  assert( eventsInNewFile>=0 );
960  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
961 
962  if (!singleBufferMode_) {
963  //calculate number of needed chunks
964  unsigned int neededChunks = fileSize/eventChunkSize_;
965  if (fileSize%eventChunkSize_) neededChunks++;
966 
967  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
969  fileQueue_.push(newInputFile);
970 
971  for (unsigned int i=0;i<neededChunks;i++) {
972 
973  //get thread
974  unsigned int newTid = 0xffffffff;
975  while (!workerPool_.try_pop(newTid)) {
976  usleep(100000);
977  }
978 
979  InputChunk * newChunk = nullptr;
980  while (!freeChunks_.try_pop(newChunk)) {
981  usleep(100000);
982  if (quit_threads_.load(std::memory_order_relaxed)) break;
983  }
984 
985  if (newChunk == nullptr) {
986  //return unused tid if we received shutdown (nullptr chunk)
987  if (newTid!=0xffffffff) workerPool_.push(newTid);
988  stop = true;
989  break;
990  }
991 
992  std::unique_lock<std::mutex> lk(mReader_);
993 
994  unsigned int toRead = eventChunkSize_;
995  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
996  newChunk->reset(i*eventChunkSize_,toRead,i);
997 
998  workerJob_[newTid].first=newInputFile;
999  workerJob_[newTid].second=newChunk;
1000 
1001  //wake up the worker thread
1002  cvReader_[newTid]->notify_one();
1003  }
1004  }
1005  else {
1006  if (!eventsInNewFile) {
1007  //still queue file for lumi update
1008  std::unique_lock<std::mutex> lkw(mWakeup_);
1009  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1011  fileQueue_.push(newInputFile);
1012  cvWakeup_.notify_one();
1013  return;
1014  }
1015  //in single-buffer mode put single chunk in the file and let the main thread read the file
1016  InputChunk * newChunk;
1017  //should be available immediately
1018  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1019 
1020  std::unique_lock<std::mutex> lkw(mWakeup_);
1021 
1022  unsigned int toRead = eventChunkSize_;
1023  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1024  newChunk->reset(0,toRead,0);
1025  newChunk->readComplete_=true;
1026 
1027  //push file and wakeup main thread
1028  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1029  newInputFile->chunks_[0]=newChunk;
1031  fileQueue_.push(newInputFile);
1032  cvWakeup_.notify_one();
1033  }
1034  }
1035  }
1036  //make sure threads finish reading
1037  unsigned numFinishedThreads = 0;
1038  while (numFinishedThreads < workerThreads_.size()) {
1039  unsigned int tid;
1040  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1041  std::unique_lock<std::mutex> lk(mReader_);
1042  thread_quit_signal[tid]=true;
1043  cvReader_[tid]->notify_one();
1044  numFinishedThreads++;
1045  }
1046  for (unsigned int i=0;i<workerThreads_.size();i++) {
1047  workerThreads_[i]->join();
1048  delete workerThreads_[i];
1049  }
1050 }
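In multi-buffer mode the listing above splits each file into ceil(fileSize / eventChunkSize_) chunks, where chunk i starts at offset i * eventChunkSize_ and the last chunk reads only the remainder. A minimal sketch of that arithmetic, using a hypothetical ChunkPlan struct and planChunks helper:

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record describing one read job.
struct ChunkPlan {
  std::uint32_t offset;  // byte offset of the chunk within the file
  std::uint32_t toRead;  // number of bytes to read into the chunk
};

// Hypothetical helper: split a file into ceil(fileSize / chunkSize) chunks,
// with the last chunk covering only the remainder when the sizes do not divide evenly.
inline std::vector<ChunkPlan> planChunks(std::uint32_t fileSize, std::uint32_t chunkSize) {
  assert(chunkSize > 0);
  std::uint32_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize) ++needed;
  std::vector<ChunkPlan> plan;
  plan.reserve(needed);
  for (std::uint32_t i = 0; i < needed; ++i) {
    std::uint32_t toRead = chunkSize;
    if (i == needed - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    plan.push_back(ChunkPlan{i * chunkSize, toRead});
  }
  return plan;
}
```

For a 10-byte file and 4-byte chunks this gives three jobs: (0, 4), (4, 4) and (8, 2). In the listing each such job is handed to a worker thread taken from workerPool_.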
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1052 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, eventChunkBlock_, InputChunk::fileIndex_, InputFile::fileName_, LogDebug, mReader_, InputChunk::offset_, readBlocks_, InputChunk::readComplete_, setExceptionState_, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

{
  bool init = true;
  threadInit_.exchange(true,std::memory_order_acquire);

  while (1) {

    std::unique_lock<std::mutex> lk(mReader_);
    workerJob_[tid].first=nullptr;
    workerJob_[tid].second=nullptr;

    assert(!thread_quit_signal[tid]);//should never get it here
    workerPool_.push(tid);

    if (init) {
      std::unique_lock<std::mutex> lk(startupLock_);
      init = false;
      startupCv_.notify_one();
    }
    cvReader_[tid]->wait(lk);

    if (thread_quit_signal[tid]) return;

    InputFile * file;
    InputChunk * chunk;

    assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);

    file = workerJob_[tid].first;
    chunk = workerJob_[tid].second;

    int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
    off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);

    if (fileDescriptor>=0)
      LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
    else
    {
      edm::LogError("FedRawDataInputSource") <<
        "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
        " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
      setExceptionState_=true;
      return;
    }

    unsigned int bufferLeft = 0;
    auto start = std::chrono::high_resolution_clock::now();
    for (unsigned int i=0;i<readBlocks_;i++)
    {
      const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
      if ( last > 0 )
        bufferLeft+=last;
      if (last < eventChunkBlock_) {
        assert(chunk->usedSize_==i*eventChunkBlock_+last);
        break;
      }
    }
    auto end = std::chrono::high_resolution_clock::now();
    auto diff = end-start;
    std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
    LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
    close(fileDescriptor);

    if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
    chunk->readComplete_=true;//atomic, so the buffer fill is sequenced before the chunk becomes available for processing
    file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index

  }
}
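
The block-wise read performed by readWorker() can be sketched as a standalone helper. This is an illustration under stated assumptions rather than the CMSSW code: readInBlocks and its parameters are hypothetical names, and unlike the listing it reports a failed read() to the caller. A read shorter than the block size is treated as end of data, which holds for regular files; a production reader would also need to handle EINTR.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/types.h>
#include <unistd.h>

// Reads at most maxBlocks blocks of blockSize bytes from fd into buf.
// buf must have room for maxBlocks * blockSize bytes.
// Returns the number of bytes stored, or -1 if read() reported an error.
ssize_t readInBlocks(int fd, unsigned char* buf, std::size_t blockSize, unsigned maxBlocks) {
  assert(buf != nullptr && blockSize > 0);
  std::size_t filled = 0;
  for (unsigned i = 0; i < maxBlocks; ++i) {
    const ssize_t n = ::read(fd, buf + filled, blockSize);
    if (n < 0) return -1;  // propagate the I/O error instead of ignoring it
    filled += static_cast<std::size_t>(n);
    if (static_cast<std::size_t>(n) < blockSize) break;  // short read: no more data
  }
  return static_cast<ssize_t>(filled);
}
```

Checking the sign of the read() result before comparing it with the block size avoids relying on the implicit conversion between ssize_t and an unsigned block size.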
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 841 of file FedRawDataInputSource.cc.

{}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1125 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

{
  quit_threads_=true;
  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";

}
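
How a reader-thread failure can reach the consuming thread is sketched below. This is a simplified illustration, not the CMSSW code: ReaderState, reportFailure, checkWorker and demoErrorPropagation are hypothetical names, std::runtime_error stands in for cms::Exception, and the failure flag is a std::atomic<bool>, whereas setExceptionState_ is declared as a plain bool on this page. The worker only records the failure; the consumer notices it later, raises the quit flag for the remaining threads, and throws.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <thread>

// Hypothetical shared state between a reader thread and its consumer.
struct ReaderState {
  std::atomic<bool> failed{false};  // set by the worker on an I/O error
  std::atomic<bool> quit{false};    // asks all remaining threads to stop
};

// Worker side: record the failure and return without throwing.
void reportFailure(ReaderState& s) { s.failed.store(true); }

// Consumer side: turn a recorded failure into an exception.
void checkWorker(ReaderState& s) {
  if (s.failed.load()) {
    s.quit.store(true);
    throw std::runtime_error("file reader thread error");
  }
}

// Usage: a worker fails, then the consumer observes the failure.
bool demoErrorPropagation() {
  ReaderState s;
  std::thread worker([&s] { reportFailure(s); });
  worker.join();
  try {
    checkWorker(s);
  } catch (const std::runtime_error&) {
    assert(s.quit.load());
    return true;
  }
  return false;
}
```

Throwing only on the consumer side keeps the exception on a thread that can handle it; an exception escaping a std::thread function would terminate the process.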

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 155 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 81 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 114 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 151 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private