CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal >
const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 124 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
62  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
63  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
64  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
65  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
69  eventID_(),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
82  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
83 
85  setNewRun();
88 
89  dpd_ = new DataPointDefinition();
90  std::string defLabel = "data";
91  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
92 
93  //make sure that chunk size is N * block size
98 
99  if (!numBuffers_)
100  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
101  "no reading enabled with numBuffers parameter 0";
102 
105 
106  if (!crc32c_hw_test())
107  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
108 
109  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
110 
111  try {
113  } catch (...){
114  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
115  assert(0);//test
116  }
117 
118  try {
120  //set DaqDirector to delete files in preGlobalEndLumi callback
123  if (fms_) daqDirector_->setFMS(fms_);
124  } catch (...){
125  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
126  assert(0);//test
127  }
128 
129  //should delete chunks when run stops
130  for (unsigned int i=0;i<numBuffers_;i++) {
132  }
133 
134  quit_threads_ = false;
135 
136  for (unsigned int i=0;i<numConcurrentReads_;i++)
137  {
138  std::unique_lock<std::mutex> lk(startupLock_);
139  //issue a memory fence here and in threads (constructor was segfaulting without this)
140  thread_quit_signal.push_back(false);
141  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
142  cvReader_.push_back(new std::condition_variable);
143  threadInit_.store(false,std::memory_order_release);
144  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
145  startupCv_.wait(lk);
146  }
147 
148  runAuxiliary()->setProcessHistoryID(processHistoryID_);
149 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool crc32c_hw_test()
Definition: crc32c.cc:354
jsoncollector::DataPointDefinition * dpd_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:175
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 151 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

152 {
153  quit_threads_=true;
154 
155  //delete any remaining open files
156  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
157  deleteFile(it->second->fileName_);
158  delete it->second;
159  }
161  readSupervisorThread_->join();
162  }
163  else {
164  //join aux threads in case the supervisor thread was not started
165  for (unsigned int i=0;i<workerThreads_.size();i++) {
166  std::unique_lock<std::mutex> lk(mReader_);
167  thread_quit_signal[i]=true;
168  cvReader_[i]->notify_one();
169  lk.unlock();
170  workerThreads_[i]->join();
171  delete workerThreads_[i];
172  }
173  }
174  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
175  /*
176  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
177  InputChunk *ch;
178  while (!freeChunks_.try_pop(ch)) {}
179  delete ch;
180  }
181  */
182 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 184 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

185 {
187  {
188  //this thread opens new files and dispatches reading to worker readers
189  //threadInit_.store(false,std::memory_order_release);
190  std::unique_lock<std::mutex> lk(startupLock_);
191  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
193  startupCv_.wait(lk);
194  }
195  switch (nextEvent() ) {
197 
198  //maybe create EoL file in working directory before ending run
199  struct stat buf;
200  if ( currentLumiSection_ > 0 ) {
201  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
202  if (eolFound) {
204  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
205  if ( !found ) {
207  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
208  close(eol_fd);
210  }
211  }
212  }
213  //also create EoR file in FU data directory
214  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
215  if (!eorFound) {
216  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
217  close(eor_fd);
218  }
220  eventsThisLumi_=0;
222  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
223  return false;
224  }
226  //this is not reachable
227  return true;
228  }
230  //std::cout << "--------------NEW LUMI---------------" << std::endl;
231  return true;
232  }
233  default: {
234  if (!getLSFromFilename_) {
235  //get new lumi from file header
236  if (event_->lumi() > currentLumiSection_) {
238  eventsThisLumi_=0;
239  maybeOpenNewLumiSection( event_->lumi() );
240  }
241  }
242  eventRunNumber_=event_->run();
243  L1EventID_ = event_->event();
244 
245  setEventCached();
246 
247  return true;
248  }
249  }
250 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 252 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

253 {
254  //used for backpressure mechanisms and monitoring
255  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
256  struct stat buf;
257  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
258  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(bol_fd);
260  }
261 }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 571 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, cmsHarvester::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

572 {
573  const boost::filesystem::path filePath(fileName);
574  if (!testModeNoBuilderUnit_) {
575  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
576  try {
577  //sometimes this fails but file gets deleted
578  boost::filesystem::remove(filePath);
579  }
580  catch (const boost::filesystem::filesystem_error& ex)
581  {
582  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
583  << ". Trying again.";
584  usleep(100000);
585  try {
586  boost::filesystem::remove(filePath);
587  }
588  catch (...) {/*file gets deleted first time but exception is still thrown*/}
589  }
590  catch (std::exception& ex)
591  {
592  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
593  << ". Trying again.";
594  usleep(100000);
595  try {
596  boost::filesystem::remove(filePath);
597  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
598  }
599  } else {
601  }
602 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 73 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 674 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), edm::hlt::Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), HLT_25ns14e33_v1_cff::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and cond::rpcobgas::time.

Referenced by read().

675 {
677  timeval stv;
678  gettimeofday(&stv,0);
679  time = stv.tv_sec;
680  time = (time << 32) + stv.tv_usec;
681  edm::Timestamp tstamp(time);
682 
683  uint32_t eventSize = event_->eventSize();
684  char* event = (char*)event_->payload();
685  GTPEventID_=0;
686  tcds_pointer_ = 0;
687  while (eventSize > 0) {
688  assert(eventSize>=sizeof(fedt_t));
689  eventSize -= sizeof(fedt_t);
690  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
691  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
692  assert(eventSize>=fedSize - sizeof(fedt_t));
693  eventSize -= (fedSize - sizeof(fedt_t));
694  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
695  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
696  if(fedId>FEDNumbering::MAXFEDID)
697  {
698  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
699  }
700  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
701  tcds_pointer_ = (unsigned char *)(event + eventSize );
702  }
703  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
704  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
705  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
706  else
707  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
708  //evf::evtn::evm_board_setformat(fedSize);
709  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
710  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
711  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
712  }
713  //take event ID from GTPE FED
714  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
715  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
716  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
717  }
718  }
719  FEDRawData& fedData = rawData.FEDData(fedId);
720  fedData.resize(fedSize);
721  memcpy(fedData.data(), event + eventSize, fedSize);
722  }
723  assert(eventSize == 0);
724 
725  return tstamp;
726 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
assert(m_qm.get())
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 314 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

315 {
316 
318  if (!currentFile_)
319  {
320  if (!streamFileTrackerPtr_) {
324  }
325 
327  if (!fileQueue_.try_pop(currentFile_))
328  {
329  //sleep until wakeup (only in single-buffer mode) or timeout
330  std::unique_lock<std::mutex> lkw(mWakeup_);
331  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
333  }
334  status = currentFile_->status_;
335  if ( status == evf::EvFDaqDirector::runEnded)
336  {
337  delete currentFile_;
338  currentFile_=nullptr;
339  return status;
340  }
341  else if ( status == evf::EvFDaqDirector::runAbort)
342  {
343  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
344  }
345  else if (status == evf::EvFDaqDirector::newLumi)
346  {
347  if (getLSFromFilename_) {
350  eventsThisLumi_=0;
352  }
353  }
354  else {//let this be picked up from next event
356  }
357 
358  delete currentFile_;
359  currentFile_=nullptr;
360  return status;
361  }
362  else if (status == evf::EvFDaqDirector::newFile) {
365  }
366  else
367  assert(0);
368  }
369 
370  //file is empty
371  if (!currentFile_->fileSize_) {
372  //try to open new lumi
374  if (getLSFromFilename_)
377  eventsThisLumi_=0;
379  }
380  //immediately delete empty file
382  delete currentFile_;
383  currentFile_=nullptr;
385  }
386 
387  //file is finished
389  //release last chunk (it is never released elsewhere)
392  {
393  throw cms::Exception("FedRawDataInputSource::getNextEvent")
394  << "Fully processed " << currentFile_->nProcessed_
395  << " from the file " << currentFile_->fileName_
396  << " but according to BU JSON there should be "
397  << currentFile_->nEvents_ << " events";
398  }
399  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
400  if (singleBufferMode_) {
401  std::unique_lock<std::mutex> lkw(mWakeup_);
402  cvWakeup_.notify_one();
403  }
406  //put the file in pending delete list;
407  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
408  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
409  }
410  else {
411  //in single-thread and stream jobs, events are already processed
413  delete currentFile_;
414  }
415  currentFile_=nullptr;
417  }
418 
419 
420  //file is too short
422  {
423  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
424  "Premature end of input file while reading event header";
425  }
426  if (singleBufferMode_) {
427 
428  //should already be there
430  usleep(10000);
432  }
433 
434  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
435 
436  //conditions when read amount is not sufficient for the header to fit
437  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
439  {
441 
442  if (detectedFRDversion_==0) {
443  detectedFRDversion_=*((uint32*)dataPosition);
444  if (detectedFRDversion_>5)
445  throw cms::Exception("FedRawDataInputSource::getNextEvent")
446  << "Unknown FRD version -: " << detectedFRDversion_;
447  assert(detectedFRDversion_>=1);
448  }
449 
450  //recalculate chunk position
451  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
452  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
453  {
454  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
455  "Premature end of input file while reading event header";
456  }
457  }
458 
459  event_.reset( new FRDEventMsgView(dataPosition) );
460  if (event_->size()>eventChunkSize_) {
461  throw cms::Exception("FedRawDataInputSource::getNextEvent")
462  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
463  << " run:" << event_->run() << " of size:" << event_->size()
464  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
465  }
466 
467  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
468 
470  {
471  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
472  "Premature end of input file while reading event data";
473  }
474  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
476  //recalculate chunk position
477  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
478  event_.reset( new FRDEventMsgView(dataPosition) );
479  }
480  currentFile_->bufferPosition_ += event_->size();
481  currentFile_->chunkPosition_ += event_->size();
482  //last chunk is released when this function is invoked next time
483 
484  }
485  //multibuffer mode:
486  else
487  {
488  //wait for the current chunk to become added to the vector
490  usleep(10000);
492  }
493 
494  //check if header is at the boundary of two chunks
495  chunkIsFree_ = false;
496  unsigned char *dataPosition;
497 
498  //read header, copy it to a single chunk if necessary
499  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
500 
501  event_.reset( new FRDEventMsgView(dataPosition) );
502  if (event_->size()>eventChunkSize_) {
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
505  << " run:" << event_->run() << " of size:" << event_->size()
506  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
507  }
508 
509  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
510 
512  {
513  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
514  "Premature end of input file while reading event data";
515  }
516 
517  if (chunkEnd) {
518  //header was at the chunk boundary, we will have to move payload as well
519  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
520  chunkIsFree_ = true;
521  }
522  else {
523  //header was contiguous, but check if payload fits the chunk
524  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
525  //rewind to header start position
526  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
527  //copy event to a chunk start and move pointers
528  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
529  assert(chunkEnd);
530  chunkIsFree_=true;
531  //header is moved
532  event_.reset( new FRDEventMsgView(dataPosition) );
533  }
534  else {
535  //everything is in a single chunk, only move pointers forward
536  chunkEnd = currentFile_->advance(dataPosition,msgSize);
537  assert(!chunkEnd);
538  chunkIsFree_=false;
539  }
540  }
541  }//end multibuffer mode
542 
543  if (verifyChecksum_ && event_->version() >= 5)
544  {
545  uint32_t crc=0;
546  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
547  if ( crc != event_->crc32c() ) {
548  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
549  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
550  " but calculated 0x" << crc;
551  }
552  }
553  else if ( verifyAdler32_ && event_->version() >= 3)
554  {
555  uint32_t adler = adler32(0L,Z_NULL,0);
556  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
557 
558  if ( adler != event_->adler32() ) {
559  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
560  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
561  " but calculated 0x" << adler;
562  }
563  }
564 
565 
567 
569 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 728 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

729 {
731  try {
732  // assemble json destination path
734 
735  //TODO:should be ported to use fffnaming
736  std::ostringstream fileNameWithPID;
737  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
738  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
739  jsonDestPath /= fileNameWithPID.str();
740 
741  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
742  << jsonDestPath;
743 
745  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
746  else {
747  try {
748  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
749  }
750  catch (const boost::filesystem::filesystem_error& ex)
751  {
752  // Input dir gone?
753  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
754  // << " Maybe the file is not yet visible by FU. Trying again in one second";
755  sleep(1);
756  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
757  }
759 
760 
761  try {
762  //sometimes this fails but file gets deleted
763  boost::filesystem::remove(jsonSourcePath);
764  }
765  catch (const boost::filesystem::filesystem_error& ex)
766  {
767  // Input dir gone?
768  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
769  }
770  catch (std::exception& ex)
771  {
772  // Input dir gone?
773  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
774  }
775 
776  }
777 
778  boost::filesystem::ifstream ij(jsonDestPath);
779  Json::Value deserializeRoot;
781 
782  if (!reader.parse(ij, deserializeRoot))
783  throw std::runtime_error("Cannot deserialize input JSON file");
784 
785  //read BU JSON
787  DataPoint dp;
788  dp.deserialize(deserializeRoot);
789  bool success = false;
790  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
791  if (dpd_->getNames().at(i)=="NEvents")
792  if (i<dp.getData().size()) {
793  data = dp.getData()[i];
794  success=true;
795  }
796  }
797  if (!success) {
798  if (dp.getData().size())
799  data = dp.getData()[0];
800  else
801  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
802  " error reading number of events from BU JSON -: No input value " << data;
803  }
804  return boost::lexical_cast<int>(data);
805 
806  }
807  catch (const boost::filesystem::filesystem_error& ex)
808  {
809  // Input dir gone?
811  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
812  }
813  catch (std::runtime_error e)
814  {
815  // Another process grabbed the file and NFS did not register this
817  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
818  }
819 
820  catch( boost::bad_lexical_cast const& ) {
821  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
822  << "Input value is -: " << data;
823  }
824 
825  catch (std::exception e)
826  {
827  // BU run directory disappeared?
829  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
830  }
831 
832  return -1;
833 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 263 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

264 {
266  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
267 
268  if ( currentLumiSection_ > 0 ) {
269  const std::string fuEoLS =
271  struct stat buf;
272  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
273  if ( !found ) {
275  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
276  close(eol_fd);
277  createBoLSFile(lumiSection,false);
279  }
280  }
281  else createBoLSFile(lumiSection,true);//needed for initial lumisection
282 
283  currentLumiSection_ = lumiSection;
284 
286 
287  timeval tv;
288  gettimeofday(&tv, 0);
289  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
290 
291  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
293  runAuxiliary()->run(),
294  lumiSection, lsopentime,
296 
297  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
298  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
299 
300  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
301  }
302 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 304 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

305 {
307  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
308  {
309  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
310  }
311  return status;
312 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 849 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

850 {
851  InputSource::rewind();
855 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 846 of file FedRawDataInputSource.cc.

847 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 605 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

606 {
607  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
608  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
609 
610  if (useL1EventID_){
612  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
614  aux.setProcessHistoryID(processHistoryID_);
615  makeEvent(eventPrincipal, aux);
616  }
617  else if(tcds_pointer_==0){
620  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
622  aux.setProcessHistoryID(processHistoryID_);
623  makeEvent(eventPrincipal, aux);
624  }
625  else{
626  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
629  processGUID());
631  makeEvent(eventPrincipal, aux);
632  }
633 
634 
635 
636  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
637 
638  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
639  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
640  // daqProvenanceHelper_.dummyProvenance_);
641 
642  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
644 
645  eventsThisLumi_++;
646 
647  //this old file check runs no more often than every 10 events
648  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
649  //delete files that are not in processing
650  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
651  auto it = filesToDelete_.begin();
652  while (it!=filesToDelete_.end()) {
653  bool fileIsBeingProcessed = false;
654  for (unsigned int i=0;i<nStreams_;i++) {
655  if (it->first == streamFileTrackerPtr_->at(i)) {
656  fileIsBeingProcessed = true;
657  break;
658  }
659  }
660  if (!fileIsBeingProcessed) {
661  deleteFile(it->second->fileName_);
662  delete it->second;
663  it = filesToDelete_.erase(it);
664  }
665  else it++;
666  }
667 
668  }
670  chunkIsFree_=false;
671  return;
672 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:218
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1179 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1180 {
1181 
1182  if (fileDescriptor_<0) {
1183  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1184  bufferInputRead_ = 0;
1185  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1186  if (fileDescriptor_>=0)
1187  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1188  else
1189  {
1190  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1191  << file->fileName_ << " fd:" << fileDescriptor_;
1192  }
1193  }
1194 
1195  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1196  uint32_t existingSize = 0;
1197  for (unsigned int i=0;i<readBlocks_;i++)
1198  {
1199  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1200  bufferInputRead_+=last;
1201  existingSize+=last;
1202  }
1203  }
1204  else {
1205  const uint32_t chunksize = file->chunkPosition_;
1206  const uint32_t blockcount=chunksize/eventChunkBlock_;
1207  const uint32_t leftsize = chunksize%eventChunkBlock_;
1208  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1209  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1210 
1211  for (uint32_t i=0;i<blockcount;i++) {
1212  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1213  bufferInputRead_+=last;
1214  existingSize+=last;
1215  }
1216  if (leftsize) {
1217  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1218  bufferInputRead_+=last;
1219  existingSize+=last;
1220  }
1221  file->chunkPosition_=0;//data was moved to beginning of the chunk
1222  }
1223  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1224  if (fileDescriptor_!=-1)
1225  {
1226  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1227  close(fileDescriptor_);
1228  fileDescriptor_=-1;
1229  }
1230  }
1231 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 861 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

862 {
863  bool stop=false;
864  unsigned int currentLumiSection = 0;
865  //threadInit_.exchange(true,std::memory_order_acquire);
866 
867  {
868  std::unique_lock<std::mutex> lk(startupLock_);
869  startupCv_.notify_one();
870  }
871 
872  while (!stop) {
873 
874  //wait for at least one free thread and chunk
875  int counter=0;
876  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
877  {
878  std::unique_lock<std::mutex> lkw(mWakeup_);
879  //sleep until woken up by condition or a timeout
880  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
881  counter++;
882  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
883  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
884  }
885  else {
886  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
887  }
888  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
889  }
890 
891  if (stop) break;
892 
893  //look for a new file
894  std::string nextFile;
895  uint32_t ls;
896  uint32_t fileSize;
897 
899 
901 
902  while (status == evf::EvFDaqDirector::noFile) {
903  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
904  stop=true;
905  break;
906  }
907 
908  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
909 
910  //check again for any remaining index/EoLS files after EoR file is seen
911  if ( status == evf::EvFDaqDirector::runEnded) {
912  usleep(100000);
913  //now all files should have appeared in ramdisk, check again if any raw files were left behind
914  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
915  }
916 
917  if ( status == evf::EvFDaqDirector::runEnded) {
919  stop=true;
920  break;
921  }
922 
923  //queue new lumisection
924  if( getLSFromFilename_ && ls > currentLumiSection) {
925  currentLumiSection = ls;
926  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
927  }
928 
929  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
930  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
932  stop=true;
933  break;
934  }
935 
936  int dbgcount=0;
937  if (status == evf::EvFDaqDirector::noFile) {
938  dbgcount++;
939  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
940  usleep(100000);
941  }
942  }
943  if ( status == evf::EvFDaqDirector::newFile ) {
944  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
945 
946 
947  boost::filesystem::path rawFilePath(nextFile);
948  std::string rawFile = rawFilePath.replace_extension(".raw").string();
949 
950  struct stat st;
951  stat(rawFile.c_str(),&st);
952  fileSize=st.st_size;
953 
954  int eventsInNewFile = grabNextJsonFile(nextFile);
955  if (fms_) fms_->stoppedLookingForFile(ls);
956  assert( eventsInNewFile>=0 );
957  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
958 
959  if (!singleBufferMode_) {
960  //calculate number of needed chunks
961  unsigned int neededChunks = fileSize/eventChunkSize_;
962  if (fileSize%eventChunkSize_) neededChunks++;
963 
964  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
965  fileQueue_.push(newInputFile);
966 
967  for (unsigned int i=0;i<neededChunks;i++) {
968 
969  //get thread
970  unsigned int newTid = 0xffffffff;
971  while (!workerPool_.try_pop(newTid)) {
972  usleep(100000);
973  }
974 
975  InputChunk * newChunk = nullptr;
976  while (!freeChunks_.try_pop(newChunk)) {
977  usleep(100000);
978  if (quit_threads_.load(std::memory_order_relaxed)) break;
979  }
980 
981  if (newChunk == nullptr) {
982  //return unused tid if we received shutdown (nullptr chunk)
983  if (newTid!=0xffffffff) workerPool_.push(newTid);
984  stop = true;
985  break;
986  }
987 
988  std::unique_lock<std::mutex> lk(mReader_);
989 
990  unsigned int toRead = eventChunkSize_;
991  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
992  newChunk->reset(i*eventChunkSize_,toRead,i);
993 
994  workerJob_[newTid].first=newInputFile;
995  workerJob_[newTid].second=newChunk;
996 
997  //wake up the worker thread
998  cvReader_[newTid]->notify_one();
999  }
1000  }
1001  else {
1002  if (!eventsInNewFile) {
1003  //still queue file for lumi update
1004  std::unique_lock<std::mutex> lkw(mWakeup_);
1005  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1006  fileQueue_.push(newInputFile);
1007  cvWakeup_.notify_one();
1008  return;
1009  }
1010  //in single-buffer mode put single chunk in the file and let the main thread read the file
1011  InputChunk * newChunk;
1012  //should be available immediately
1013  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1014 
1015  std::unique_lock<std::mutex> lkw(mWakeup_);
1016 
1017  unsigned int toRead = eventChunkSize_;
1018  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1019  newChunk->reset(0,toRead,0);
1020  newChunk->readComplete_=true;
1021 
1022  //push file and wakeup main thread
1023  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1024  newInputFile->chunks_[0]=newChunk;
1025  fileQueue_.push(newInputFile);
1026  cvWakeup_.notify_one();
1027  }
1028  }
1029  }
1030  //make sure threads finish reading
1031  unsigned numFinishedThreads = 0;
1032  while (numFinishedThreads < workerThreads_.size()) {
1033  unsigned int tid;
1034  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1035  std::unique_lock<std::mutex> lk(mReader_);
1036  thread_quit_signal[tid]=true;
1037  cvReader_[tid]->notify_one();
1038  numFinishedThreads++;
1039  }
1040  for (unsigned int i=0;i<workerThreads_.size();i++) {
1041  workerThreads_[i]->join();
1042  delete workerThreads_[i];
1043  }
1044 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1046 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1047 {
1048  bool init = true;
1049  threadInit_.exchange(true,std::memory_order_acquire);
1050 
1051  while (1) {
1052 
1053  std::unique_lock<std::mutex> lk(mReader_);
1054  workerJob_[tid].first=nullptr;
1055  workerJob_[tid].first=nullptr;
1056 
1057  assert(!thread_quit_signal[tid]);//should never get it here
1058  workerPool_.push(tid);
1059 
1060  if (init) {
1061  std::unique_lock<std::mutex> lk(startupLock_);
1062  init = false;
1063  startupCv_.notify_one();
1064  }
1065  cvReader_[tid]->wait(lk);
1066 
1067  if (thread_quit_signal[tid]) return;
1068 
1069  InputFile * file;
1070  InputChunk * chunk;
1071 
1072  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1073 
1074  file = workerJob_[tid].first;
1075  chunk = workerJob_[tid].second;
1076 
1077  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1078  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1079 
1080 
1081  if (fileDescriptor>=0)
1082  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1083  else
1084  {
1085  edm::LogError("FedRawDataInputSource") <<
1086  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1087  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1088  setExceptionState_=true;
1089  return;
1090 
1091  }
1092 
1093  unsigned int bufferLeft = 0;
1095  for (unsigned int i=0;i<readBlocks_;i++)
1096  {
1097  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1098  if ( last > 0 )
1099  bufferLeft+=last;
1100  if (last < eventChunkBlock_) {
1101  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1102  break;
1103  }
1104  }
1106  auto diff = end-start;
1107  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1108  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1109  close(fileDescriptor);
1110 
1111  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1113  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1114  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1115 
1116  }
1117 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 835 of file FedRawDataInputSource.cc.

References daqDirector_, roll_playback::destination, evf::EvFDaqDirector::getJumpFilePath(), cmsHarvester::path, and source.

Referenced by deleteFile().

836 {
839 
840  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
841  << " to: " << destination.string();
842  boost::filesystem::rename(source,destination);
843  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
844 }
string destination
tuple path
else: Piece not in the list, fine.
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:42
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 857 of file FedRawDataInputSource.cc.

858 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1119 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1120 {
1121  quit_threads_=true;
1122  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1123 
1124 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 155 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 82 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 114 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 151 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private