CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal >
const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 123 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
62  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
64  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
68  eventID_(),
71  tcds_pointer_(0),
72  eventsThisLumi_(0),
73  dpd_(nullptr)
74 {
75  char thishost[256];
76  gethostname(thishost, 255);
77  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
78  << std::endl << (eventChunkSize_/1048576)
79  << " MB on host " << thishost;
81  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
82 
84  setNewRun();
87 
88  dpd_ = new DataPointDefinition();
89  std::string defLabel = "data";
90  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
91 
92  //make sure that chunk size is N * block size
97 
98  if (!numBuffers_)
99  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
100  "no reading enabled with numBuffers parameter 0";
101 
104 
105  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
106 
107  try {
109  } catch (...){
110  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
111  assert(0);//test
112  }
113 
114  try {
116  //set DaqDirector to delete files in preGlobalEndLumi callback
119  if (fms_) daqDirector_->setFMS(fms_);
120  } catch (...){
121  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
122  assert(0);//test
123  }
124 
125  //should delete chunks when run stops
126  for (unsigned int i=0;i<numBuffers_;i++) {
128  }
129 
130  quit_threads_ = false;
131 
132  for (unsigned int i=0;i<numConcurrentReads_;i++)
133  {
134  std::unique_lock<std::mutex> lk(startupLock_);
135  //issue a memory fence here and in threads (constructor was segfaulting without this)
136  thread_quit_signal.push_back(false);
137  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
138  cvReader_.push_back(new std::condition_variable);
139  threadInit_.store(false,std::memory_order_release);
140  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
141  startupCv_.wait(lk);
142  }
143 
144  runAuxiliary()->setProcessHistoryID(processHistoryID_);
145 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
jsoncollector::DataPointDefinition * dpd_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:175
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 147 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

148 {
149  quit_threads_=true;
150 
151  //delete any remaining open files
152  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
153  deleteFile(it->second->fileName_);
154  delete it->second;
155  }
157  readSupervisorThread_->join();
158  }
159  else {
160  //join aux threads in case the supervisor thread was not started
161  for (unsigned int i=0;i<workerThreads_.size();i++) {
162  std::unique_lock<std::mutex> lk(mReader_);
163  thread_quit_signal[i]=true;
164  cvReader_[i]->notify_one();
165  lk.unlock();
166  workerThreads_[i]->join();
167  delete workerThreads_[i];
168  }
169  }
170  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
171  /*
172  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
173  InputChunk *ch;
174  while (!freeChunks_.try_pop(ch)) {}
175  delete ch;
176  }
177  */
178 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 180 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, and AlCaHLTBitMon_QueryRunRegistry::string.

181 {
183  {
184  //this thread opens new files and dispatches reading to worker readers
185  //threadInit_.store(false,std::memory_order_release);
186  std::unique_lock<std::mutex> lk(startupLock_);
187  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
189  startupCv_.wait(lk);
190  }
191  switch (nextEvent() ) {
193 
194  //maybe create EoL file in working directory before ending run
195  struct stat buf;
196  if ( currentLumiSection_ > 0 ) {
197  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
198  if (eolFound) {
200  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
201  if ( !found ) {
203  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
204  close(eol_fd);
206  }
207  }
208  }
209  //also create EoR file in FU data directory
210  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
211  if (!eorFound) {
212  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
213  close(eor_fd);
214  }
216  eventsThisLumi_=0;
218  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
219  return false;
220  }
222  //this is not reachable
223  return true;
224  }
226  //std::cout << "--------------NEW LUMI---------------" << std::endl;
227  return true;
228  }
229  default: {
230  if (!getLSFromFilename_) {
231  //get new lumi from file header
232  if (event_->lumi() > currentLumiSection_) {
234  eventsThisLumi_=0;
235  maybeOpenNewLumiSection( event_->lumi() );
236  }
237  }
238  eventRunNumber_=event_->run();
239  L1EventID_ = event_->event();
240 
241  setEventCached();
242 
243  return true;
244  }
245  }
246 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 248 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

249 {
250  //used for backpressure mechanisms and monitoring
251  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
252  struct stat buf;
253  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
254  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
255  close(bol_fd);
256  }
257 }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 553 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, cmsHarvester::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

554 {
555  const boost::filesystem::path filePath(fileName);
556  if (!testModeNoBuilderUnit_) {
557  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
558  try {
559  //sometimes this fails but file gets deleted
560  boost::filesystem::remove(filePath);
561  }
562  catch (const boost::filesystem::filesystem_error& ex)
563  {
564  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
565  << ". Trying again.";
566  usleep(100000);
567  try {
568  boost::filesystem::remove(filePath);
569  }
570  catch (...) {/*file gets deleted first time but exception is still thrown*/}
571  }
572  catch (std::exception& ex)
573  {
574  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
575  << ". Trying again.";
576  usleep(100000);
577  try {
578  boost::filesystem::remove(filePath);
579  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
580  }
581  } else {
583  }
584 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 73 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 656 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), HLT_25ns14e33_v1_cff::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and cond::rpcobgas::time.

Referenced by read().

657 {
659  timeval stv;
660  gettimeofday(&stv,0);
661  time = stv.tv_sec;
662  time = (time << 32) + stv.tv_usec;
663  edm::Timestamp tstamp(time);
664 
665  uint32_t eventSize = event_->eventSize();
666  char* event = (char*)event_->payload();
667  GTPEventID_=0;
668  tcds_pointer_ = 0;
669  while (eventSize > 0) {
670  eventSize -= sizeof(fedt_t);
671  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
672  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
673  eventSize -= (fedSize - sizeof(fedh_t));
674  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
675  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
676  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
677  tcds_pointer_ = (unsigned char *)(event + eventSize );
678  }
679  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
680  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
681  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
682  else
683  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
684  //evf::evtn::evm_board_setformat(fedSize);
685  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
686  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
687  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
688  }
689  //take event ID from GTPE FED
690  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
691  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
692  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
693  }
694  }
695  FEDRawData& fedData = rawData.FEDData(fedId);
696  fedData.resize(fedSize);
697  memcpy(fedData.data(), event + eventSize, fedSize);
698  }
699  assert(eventSize == 0);
700 
701  return tstamp;
702 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
struct fedh_struct fedh_t
unsigned int get(const unsigned char *, bool)
assert(m_qm.get())
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 310 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, and InputFile::waitForChunk().

Referenced by nextEvent().

311 {
312  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
313 
315  if (!currentFile_)
316  {
317  if (!streamFileTrackerPtr_) {
321  }
322 
324  if (!fileQueue_.try_pop(currentFile_))
325  {
326  //sleep until wakeup (only in single-buffer mode) or timeout
327  std::unique_lock<std::mutex> lkw(mWakeup_);
328  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
330  }
331  status = currentFile_->status_;
332  if ( status == evf::EvFDaqDirector::runEnded)
333  {
334  delete currentFile_;
335  currentFile_=nullptr;
336  return status;
337  }
338  else if ( status == evf::EvFDaqDirector::runAbort)
339  {
340  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
341  }
342  else if (status == evf::EvFDaqDirector::newLumi)
343  {
344  if (getLSFromFilename_) {
347  eventsThisLumi_=0;
349  }
350  }
351  else {//let this be picked up from next event
353  }
354 
355  delete currentFile_;
356  currentFile_=nullptr;
357  return status;
358  }
359  else if (status == evf::EvFDaqDirector::newFile) {
362  }
363  else
364  assert(0);
365  }
366 
367  //file is empty
368  if (!currentFile_->fileSize_) {
369  //try to open new lumi
371  if (getLSFromFilename_)
374  eventsThisLumi_=0;
376  }
377  //immediately delete empty file
379  delete currentFile_;
380  currentFile_=nullptr;
382  }
383 
384  //file is finished
386  //release last chunk (it is never released elsewhere)
389  {
390  throw cms::Exception("FedRawDataInputSource::getNextEvent")
391  << "Fully processed " << currentFile_->nProcessed_
392  << " from the file " << currentFile_->fileName_
393  << " but according to BU JSON there should be "
394  << currentFile_->nEvents_ << " events";
395  }
396  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
397  if (singleBufferMode_) {
398  std::unique_lock<std::mutex> lkw(mWakeup_);
399  cvWakeup_.notify_one();
400  }
403  //put the file in pending delete list;
404  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
405  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
406  }
407  else {
408  //in single-thread and stream jobs, events are already processed
410  delete currentFile_;
411  }
412  currentFile_=nullptr;
414  }
415 
416 
417  //file is too short
419  {
420  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
421  "Premature end of input file while reading event header";
422  }
423  if (singleBufferMode_) {
424 
425  //should already be there
427  usleep(10000);
429  }
430 
431  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
432 
433  //conditions when read amount is not sufficient for the header to fit
434  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
435  || eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
436  {
438 
439  if (detectedFRDversion_==0) {
440  detectedFRDversion_=*((uint32*)dataPosition);
441  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
442  }
443 
444  //recalculate chunk position
445  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
446  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
447  {
448  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
449  "Premature end of input file while reading event header";
450  }
451  }
452 
453  event_.reset( new FRDEventMsgView(dataPosition) );
454  if (event_->size()>eventChunkSize_) {
455  throw cms::Exception("FedRawDataInputSource::getNextEvent")
456  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
457  << " run:" << event_->run() << " of size:" << event_->size()
458  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
459  }
460 
461  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
462 
464  {
465  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
466  "Premature end of input file while reading event data";
467  }
468  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
470  //recalculate chunk position
471  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
472  event_.reset( new FRDEventMsgView(dataPosition) );
473  }
474  currentFile_->bufferPosition_ += event_->size();
475  currentFile_->chunkPosition_ += event_->size();
476  //last chunk is released when this function is invoked next time
477 
478  }
479  //multibuffer mode:
480  else
481  {
482  //wait for the current chunk to become added to the vector
484  usleep(10000);
486  }
487 
488  //check if header is at the boundary of two chunks
489  chunkIsFree_ = false;
490  unsigned char *dataPosition;
491 
492  //read header, copy it to a single chunk if necessary
493  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
494 
495  event_.reset( new FRDEventMsgView(dataPosition) );
496  if (event_->size()>eventChunkSize_) {
497  throw cms::Exception("FedRawDataInputSource::getNextEvent")
498  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
499  << " run:" << event_->run() << " of size:" << event_->size()
500  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
501  }
502 
503  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
504 
506  {
507  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
508  "Premature end of input file while reading event data";
509  }
510 
511  if (chunkEnd) {
512  //header was at the chunk boundary, we will have to move payload as well
513  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
514  chunkIsFree_ = true;
515  }
516  else {
517  //header was contiguous, but check if payload fits the chunk
518  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
519  //rewind to header start position
520  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
521  //copy event to a chunk start and move pointers
522  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
523  assert(chunkEnd);
524  chunkIsFree_=true;
525  //header is moved
526  event_.reset( new FRDEventMsgView(dataPosition) );
527  }
528  else {
529  //everything is in a single chunk, only move pointers forward
530  chunkEnd = currentFile_->advance(dataPosition,msgSize);
531  assert(!chunkEnd);
532  chunkIsFree_=false;
533  }
534  }
535  }//end multibuffer mode
536 
537  if ( verifyAdler32_ && event_->version() >= 3 )
538  {
539  uint32_t adler = adler32(0L,Z_NULL,0);
540  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
541 
542  if ( adler != event_->adler32() ) {
543  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
544  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
545  " but calculated 0x" << adler;
546  }
547  }
549 
551 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 704 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

705 {
707  try {
708  // assemble json destination path
710 
711  //TODO:should be ported to use fffnaming
712  std::ostringstream fileNameWithPID;
713  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
714  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
715  jsonDestPath /= fileNameWithPID.str();
716 
717  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
718  << jsonDestPath;
719 
721  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
722  else {
723  try {
724  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
725  }
726  catch (const boost::filesystem::filesystem_error& ex)
727  {
728  // Input dir gone?
729  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
730  // << " Maybe the file is not yet visible by FU. Trying again in one second";
731  sleep(1);
732  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
733  }
735 
736 
737  try {
738  //sometimes this fails but file gets deleted
739  boost::filesystem::remove(jsonSourcePath);
740  }
741  catch (const boost::filesystem::filesystem_error& ex)
742  {
743  // Input dir gone?
744  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
745  }
746  catch (std::exception& ex)
747  {
748  // Input dir gone?
749  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
750  }
751 
752  }
753 
754  boost::filesystem::ifstream ij(jsonDestPath);
755  Json::Value deserializeRoot;
757 
758  if (!reader.parse(ij, deserializeRoot))
759  throw std::runtime_error("Cannot deserialize input JSON file");
760 
761  //read BU JSON
763  DataPoint dp;
764  dp.deserialize(deserializeRoot);
765  bool success = false;
766  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
767  if (dpd_->getNames().at(i)=="NEvents")
768  if (i<dp.getData().size()) {
769  data = dp.getData()[i];
770  success=true;
771  }
772  }
773  if (!success) {
774  if (dp.getData().size())
775  data = dp.getData()[0];
776  else
777  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
778  " error reading number of events from BU JSON -: No input value " << data;
779  }
780  return boost::lexical_cast<int>(data);
781 
782  }
783  catch (const boost::filesystem::filesystem_error& ex)
784  {
785  // Input dir gone?
787  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
788  }
789  catch (std::runtime_error e)
790  {
791  // Another process grabbed the file and NFS did not register this
793  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
794  }
795 
796  catch( boost::bad_lexical_cast const& ) {
797  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
798  << "Input value is -: " << data;
799  }
800 
801  catch (std::exception e)
802  {
803  // BU run directory disappeared?
805  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
806  }
807 
808  return -1;
809 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 259 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

260 {
262  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
263 
264  if ( currentLumiSection_ > 0 ) {
265  const std::string fuEoLS =
267  struct stat buf;
268  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
269  if ( !found ) {
271  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
272  close(eol_fd);
273  createBoLSFile(lumiSection,false);
275  }
276  }
277  else createBoLSFile(lumiSection,true);//needed for initial lumisection
278 
279  currentLumiSection_ = lumiSection;
280 
282 
283  timeval tv;
284  gettimeofday(&tv, 0);
285  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
286 
287  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
289  runAuxiliary()->run(),
290  lumiSection, lsopentime,
292 
293  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
294  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
295 
296  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
297  }
298 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 300 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

301 {
303  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
304  {
305  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
306  }
307  return status;
308 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 825 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

826 {
827  InputSource::rewind();
831 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 822 of file FedRawDataInputSource.cc.

823 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 587 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

588 {
589  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
590  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
591 
592  if (useL1EventID_){
594  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
596  aux.setProcessHistoryID(processHistoryID_);
597  makeEvent(eventPrincipal, aux);
598  }
599  else if(tcds_pointer_==0){
602  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
604  aux.setProcessHistoryID(processHistoryID_);
605  makeEvent(eventPrincipal, aux);
606  }
607  else{
608  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
611  processGUID());
613  makeEvent(eventPrincipal, aux);
614  }
615 
616 
617 
618  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
619 
620  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
621  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
622  // daqProvenanceHelper_.dummyProvenance_);
623 
624  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
626 
627  eventsThisLumi_++;
628 
629  //this old file check runs no more often than every 10 events
630  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
631  //delete files that are not in processing
632  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
633  auto it = filesToDelete_.begin();
634  while (it!=filesToDelete_.end()) {
635  bool fileIsBeingProcessed = false;
636  for (unsigned int i=0;i<nStreams_;i++) {
637  if (it->first == streamFileTrackerPtr_->at(i)) {
638  fileIsBeingProcessed = true;
639  break;
640  }
641  }
642  if (!fileIsBeingProcessed) {
643  deleteFile(it->second->fileName_);
644  delete it->second;
645  it = filesToDelete_.erase(it);
646  }
647  else it++;
648  }
649 
650  }
652  chunkIsFree_=false;
653  return;
654 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:218
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1155 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1156 {
1157 
1158  if (fileDescriptor_<0) {
1159  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1160  bufferInputRead_ = 0;
1161  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1162  if (fileDescriptor_>=0)
1163  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1164  else
1165  {
1166  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1167  << file->fileName_ << " fd:" << fileDescriptor_;
1168  }
1169  }
1170 
1171  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1172  uint32_t existingSize = 0;
1173  for (unsigned int i=0;i<readBlocks_;i++)
1174  {
1175  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1176  bufferInputRead_+=last;
1177  existingSize+=last;
1178  }
1179  }
1180  else {
1181  const uint32_t chunksize = file->chunkPosition_;
1182  const uint32_t blockcount=chunksize/eventChunkBlock_;
1183  const uint32_t leftsize = chunksize%eventChunkBlock_;
1184  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1185  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1186 
1187  for (uint32_t i=0;i<blockcount;i++) {
1188  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1189  bufferInputRead_+=last;
1190  existingSize+=last;
1191  }
1192  if (leftsize) {
1193  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1194  bufferInputRead_+=last;
1195  existingSize+=last;
1196  }
1197  file->chunkPosition_=0;//data was moved to beginning of the chunk
1198  }
1199  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1200  if (fileDescriptor_!=-1)
1201  {
1202  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1203  close(fileDescriptor_);
1204  fileDescriptor_=-1;
1205  }
1206  }
1207 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 837 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

838 {
839  bool stop=false;
840  unsigned int currentLumiSection = 0;
841  //threadInit_.exchange(true,std::memory_order_acquire);
842 
843  {
844  std::unique_lock<std::mutex> lk(startupLock_);
845  startupCv_.notify_one();
846  }
847 
848  while (!stop) {
849 
850  //wait for at least one free thread and chunk
851  int counter=0;
852  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
853  {
854  std::unique_lock<std::mutex> lkw(mWakeup_);
855  //sleep until woken up by condition or a timeout
856  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
857  counter++;
858  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
859  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
860  }
861  else {
862  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
863  }
864  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
865  }
866 
867  if (stop) break;
868 
869  //look for a new file
870  std::string nextFile;
871  uint32_t ls;
872  uint32_t fileSize;
873 
875 
877 
878  while (status == evf::EvFDaqDirector::noFile) {
879  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
880  stop=true;
881  break;
882  }
883 
884  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
885 
886  //check again for any remaining index/EoLS files after EoR file is seen
887  if ( status == evf::EvFDaqDirector::runEnded) {
888  usleep(100000);
889  //now all files should have appeared in ramdisk, check again if any raw files were left behind
890  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
891  }
892 
893  if ( status == evf::EvFDaqDirector::runEnded) {
895  stop=true;
896  break;
897  }
898 
899  //queue new lumisection
900  if( getLSFromFilename_ && ls > currentLumiSection) {
901  currentLumiSection = ls;
902  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
903  }
904 
905  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
906  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
908  stop=true;
909  break;
910  }
911 
912  int dbgcount=0;
913  if (status == evf::EvFDaqDirector::noFile) {
914  dbgcount++;
915  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
916  usleep(100000);
917  }
918  }
919  if ( status == evf::EvFDaqDirector::newFile ) {
920  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
921 
922 
923  boost::filesystem::path rawFilePath(nextFile);
924  std::string rawFile = rawFilePath.replace_extension(".raw").string();
925 
926  struct stat st;
927  stat(rawFile.c_str(),&st);
928  fileSize=st.st_size;
929 
930  int eventsInNewFile = grabNextJsonFile(nextFile);
931  if (fms_) fms_->stoppedLookingForFile(ls);
932  assert( eventsInNewFile>=0 );
933  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
934 
935  if (!singleBufferMode_) {
936  //calculate number of needed chunks
937  unsigned int neededChunks = fileSize/eventChunkSize_;
938  if (fileSize%eventChunkSize_) neededChunks++;
939 
940  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
941  fileQueue_.push(newInputFile);
942 
943  for (unsigned int i=0;i<neededChunks;i++) {
944 
945  //get thread
946  unsigned int newTid = 0xffffffff;
947  while (!workerPool_.try_pop(newTid)) {
948  usleep(100000);
949  }
950 
951  InputChunk * newChunk = nullptr;
952  while (!freeChunks_.try_pop(newChunk)) {
953  usleep(100000);
954  if (quit_threads_.load(std::memory_order_relaxed)) break;
955  }
956 
957  if (newChunk == nullptr) {
958  //return unused tid if we received shutdown (nullptr chunk)
959  if (newTid!=0xffffffff) workerPool_.push(newTid);
960  stop = true;
961  break;
962  }
963 
964  std::unique_lock<std::mutex> lk(mReader_);
965 
966  unsigned int toRead = eventChunkSize_;
967  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
968  newChunk->reset(i*eventChunkSize_,toRead,i);
969 
970  workerJob_[newTid].first=newInputFile;
971  workerJob_[newTid].second=newChunk;
972 
973  //wake up the worker thread
974  cvReader_[newTid]->notify_one();
975  }
976  }
977  else {
978  if (!eventsInNewFile) {
979  //still queue file for lumi update
980  std::unique_lock<std::mutex> lkw(mWakeup_);
981  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
982  fileQueue_.push(newInputFile);
983  cvWakeup_.notify_one();
984  return;
985  }
986  //in single-buffer mode put single chunk in the file and let the main thread read the file
987  InputChunk * newChunk;
988  //should be available immediately
989  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
990 
991  std::unique_lock<std::mutex> lkw(mWakeup_);
992 
993  unsigned int toRead = eventChunkSize_;
994  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
995  newChunk->reset(0,toRead,0);
996  newChunk->readComplete_=true;
997 
998  //push file and wakeup main thread
999  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1000  newInputFile->chunks_[0]=newChunk;
1001  fileQueue_.push(newInputFile);
1002  cvWakeup_.notify_one();
1003  }
1004  }
1005  }
1006  //make sure threads finish reading
1007  unsigned numFinishedThreads = 0;
1008  while (numFinishedThreads < workerThreads_.size()) {
1009  unsigned int tid;
1010  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1011  std::unique_lock<std::mutex> lk(mReader_);
1012  thread_quit_signal[tid]=true;
1013  cvReader_[tid]->notify_one();
1014  numFinishedThreads++;
1015  }
1016  for (unsigned int i=0;i<workerThreads_.size();i++) {
1017  workerThreads_[i]->join();
1018  delete workerThreads_[i];
1019  }
1020 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1022 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1023 {
1024  bool init = true;
1025  threadInit_.exchange(true,std::memory_order_acquire);
1026 
1027  while (1) {
1028 
1029  std::unique_lock<std::mutex> lk(mReader_);
1030  workerJob_[tid].first=nullptr;
1031  workerJob_[tid].first=nullptr;
1032 
1033  assert(!thread_quit_signal[tid]);//should never get it here
1034  workerPool_.push(tid);
1035 
1036  if (init) {
1037  std::unique_lock<std::mutex> lk(startupLock_);
1038  init = false;
1039  startupCv_.notify_one();
1040  }
1041  cvReader_[tid]->wait(lk);
1042 
1043  if (thread_quit_signal[tid]) return;
1044 
1045  InputFile * file;
1046  InputChunk * chunk;
1047 
1048  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1049 
1050  file = workerJob_[tid].first;
1051  chunk = workerJob_[tid].second;
1052 
1053  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1054  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1055 
1056 
1057  if (fileDescriptor>=0)
1058  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1059  else
1060  {
1061  edm::LogError("FedRawDataInputSource") <<
1062  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1063  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1064  setExceptionState_=true;
1065  return;
1066 
1067  }
1068 
1069  unsigned int bufferLeft = 0;
1071  for (unsigned int i=0;i<readBlocks_;i++)
1072  {
1073  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1074  if ( last > 0 )
1075  bufferLeft+=last;
1076  if (last < eventChunkBlock_) {
1077  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1078  break;
1079  }
1080  }
1082  auto diff = end-start;
1083  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1084  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1085  close(fileDescriptor);
1086 
1087  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1089  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1090  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1091 
1092  }
1093 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 811 of file FedRawDataInputSource.cc.

References daqDirector_, roll_playback::destination, evf::EvFDaqDirector::getJumpFilePath(), cmsHarvester::path, and source.

Referenced by deleteFile().

812 {
815 
816  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
817  << " to: " << destination.string();
818  boost::filesystem::rename(source,destination);
819  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
820 }
string destination
tuple path
else: Piece not in the list, fine.
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:42
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 833 of file FedRawDataInputSource.cc.

834 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1095 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1096 {
1097  quit_threads_=true;
1098  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1099 
1100 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 82 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 113 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 150 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 161 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private