CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal >
const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 122 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
62  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
64  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
68  eventID_(),
71  tcds_pointer_(0),
72  eventsThisLumi_(0),
73  dpd_(nullptr)
74 {
75  char thishost[256];
76  gethostname(thishost, 255);
77  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
78  << std::endl << (eventChunkSize_/1048576)
79  << " MB on host " << thishost;
81  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
82 
84  setNewRun();
87 
88  dpd_ = new DataPointDefinition();
89  std::string defLabel = "data";
90  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
91 
92  //make sure that chunk size is N * block size
97 
98  if (!numBuffers_)
99  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
100  "no reading enabled with numBuffers parameter 0";
101 
104 
105  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
106 
107  try {
109  } catch (...){
110  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
111  assert(0);//test
112  }
113 
114  try {
116  //set DaqDirector to delete files in preGlobalEndLumi callback
119  if (fms_) daqDirector_->setFMS(fms_);
120  } catch (...){
121  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
122  assert(0);//test
123  }
124 
125  //should delete chunks when run stops
126  for (unsigned int i=0;i<numBuffers_;i++) {
128  }
129 
130  quit_threads_ = false;
131 
132  for (unsigned int i=0;i<numConcurrentReads_;i++)
133  {
134  std::unique_lock<std::mutex> lk(startupLock_);
135  //issue a memory fence here and in threads (constructor was segfaulting without this)
136  thread_quit_signal.push_back(false);
137  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
138  cvReader_.push_back(new std::condition_variable);
139  threadInit_.store(false,std::memory_order_release);
140  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
141  startupCv_.wait(lk);
142  }
143 
144  runAuxiliary()->setProcessHistoryID(processHistoryID_);
145 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
jsoncollector::DataPointDefinition * dpd_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:175
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 147 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

148 {
149  quit_threads_=true;
150 
151  //delete any remaining open files
152  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
153  deleteFile(it->second->fileName_);
154  delete it->second;
155  }
157  readSupervisorThread_->join();
158  }
159  else {
160  //join aux threads in case the supervisor thread was not started
161  for (unsigned int i=0;i<workerThreads_.size();i++) {
162  std::unique_lock<std::mutex> lk(mReader_);
163  thread_quit_signal[i]=true;
164  cvReader_[i]->notify_one();
165  lk.unlock();
166  workerThreads_[i]->join();
167  delete workerThreads_[i];
168  }
169  }
170  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
171  /*
172  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
173  InputChunk *ch;
174  while (!freeChunks_.try_pop(ch)) {}
175  delete ch;
176  }
177  */
178 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 180 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, and AlCaHLTBitMon_QueryRunRegistry::string.

181 {
183  {
184  //this thread opens new files and dispatches reading to worker readers
185  //threadInit_.store(false,std::memory_order_release);
186  std::unique_lock<std::mutex> lk(startupLock_);
187  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
189  startupCv_.wait(lk);
190  }
191  switch (nextEvent() ) {
193 
194  //maybe create EoL file in working directory before ending run
195  struct stat buf;
196  if ( currentLumiSection_ > 0 ) {
197  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
198  if (eolFound) {
200  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
201  if ( !found ) {
203  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
204  close(eol_fd);
206  }
207  }
208  }
209  //also create EoR file in FU data directory
210  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
211  if (!eorFound) {
212  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
213  close(eor_fd);
214  }
216  eventsThisLumi_=0;
218  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
219  return false;
220  }
222  //this is not reachable
223  return true;
224  }
226  //std::cout << "--------------NEW LUMI---------------" << std::endl;
227  return true;
228  }
229  default: {
230  if (!getLSFromFilename_) {
231  //get new lumi from file header
232  if (event_->lumi() > currentLumiSection_) {
234  eventsThisLumi_=0;
235  maybeOpenNewLumiSection( event_->lumi() );
236  }
237  }
238  eventRunNumber_=event_->run();
239  L1EventID_ = event_->event();
240 
241  setEventCached();
242 
243  return true;
244  }
245  }
246 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 540 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, cmsHarvester::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

541 {
542  const boost::filesystem::path filePath(fileName);
543  if (!testModeNoBuilderUnit_) {
544  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
545  try {
546  //sometimes this fails but file gets deleted
547  boost::filesystem::remove(filePath);
548  }
549  catch (const boost::filesystem::filesystem_error& ex)
550  {
551  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
552  << ". Trying again.";
553  usleep(100000);
554  try {
555  boost::filesystem::remove(filePath);
556  }
557  catch (...) {/*file gets deleted first time but exception is still thrown*/}
558  }
559  catch (std::exception& ex)
560  {
561  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
562  << ". Trying again.";
563  usleep(100000);
564  try {
565  boost::filesystem::remove(filePath);
566  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
567  }
568  } else {
570  }
571 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 72 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 643 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and cond::rpcobgas::time.

Referenced by read().

644 {
646  timeval stv;
647  gettimeofday(&stv,0);
648  time = stv.tv_sec;
649  time = (time << 32) + stv.tv_usec;
650  edm::Timestamp tstamp(time);
651 
652  uint32_t eventSize = event_->eventSize();
653  char* event = (char*)event_->payload();
654  GTPEventID_=0;
655  tcds_pointer_ = 0;
656  while (eventSize > 0) {
657  eventSize -= sizeof(fedt_t);
658  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
659  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
660  eventSize -= (fedSize - sizeof(fedh_t));
661  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
662  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
663  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
664  tcds_pointer_ = (unsigned char *)(event + eventSize );
665  }
666  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
667  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
668  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
669  else
670  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
671  //evf::evtn::evm_board_setformat(fedSize);
672  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
673  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
674  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
675  }
676  //take event ID from GTPE FED
677  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
678  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
679  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
680  }
681  }
682  FEDRawData& fedData = rawData.FEDData(fedId);
683  fedData.resize(fedSize);
684  memcpy(fedData.data(), event + eventSize, fedSize);
685  }
686  assert(eventSize == 0);
687 
688  return tstamp;
689 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
struct fedh_struct fedh_t
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 297 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, and InputFile::waitForChunk().

Referenced by nextEvent().

298 {
299  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
300 
302  if (!currentFile_)
303  {
304  if (!streamFileTrackerPtr_) {
308  }
309 
311  if (!fileQueue_.try_pop(currentFile_))
312  {
313  //sleep until wakeup (only in single-buffer mode) or timeout
314  std::unique_lock<std::mutex> lkw(mWakeup_);
315  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
317  }
318  status = currentFile_->status_;
319  if ( status == evf::EvFDaqDirector::runEnded)
320  {
321  delete currentFile_;
322  currentFile_=nullptr;
323  return status;
324  }
325  else if ( status == evf::EvFDaqDirector::runAbort)
326  {
327  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
328  }
329  else if (status == evf::EvFDaqDirector::newLumi)
330  {
331  if (getLSFromFilename_) {
334  eventsThisLumi_=0;
336  }
337  }
338  else {//let this be picked up from next event
340  }
341 
342  delete currentFile_;
343  currentFile_=nullptr;
344  return status;
345  }
346  else if (status == evf::EvFDaqDirector::newFile) {
349  }
350  else
351  assert(0);
352  }
353 
354  //file is empty
355  if (!currentFile_->fileSize_) {
356  //try to open new lumi
357  assert(currentFile_->nChunks_==0);
358  if (getLSFromFilename_)
361  eventsThisLumi_=0;
363  }
364  //immediately delete empty file
366  delete currentFile_;
367  currentFile_=nullptr;
369  }
370 
371  //file is finished
373  //release last chunk (it is never released elsewhere)
376  {
377  throw cms::Exception("FedRawDataInputSource::getNextEvent")
378  << "Fully processed " << currentFile_->nProcessed_
379  << " from the file " << currentFile_->fileName_
380  << " but according to BU JSON there should be "
381  << currentFile_->nEvents_ << " events";
382  }
383  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
384  if (singleBufferMode_) {
385  std::unique_lock<std::mutex> lkw(mWakeup_);
386  cvWakeup_.notify_one();
387  }
390  //put the file in pending delete list;
391  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
392  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
393  }
394  else {
395  //in single-thread and stream jobs, events are already processed
397  delete currentFile_;
398  }
399  currentFile_=nullptr;
401  }
402 
403 
404  //file is too short
406  {
407  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
408  "Premature end of input file while reading event header";
409  }
410  if (singleBufferMode_) {
411 
412  //should already be there
414  usleep(10000);
416  }
417 
418  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
419 
420  //conditions when read amount is not sufficient for the header to fit
421  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
422  || eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
423  {
425 
426  if (detectedFRDversion_==0) {
427  detectedFRDversion_=*((uint32*)dataPosition);
428  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
429  }
430 
431  //recalculate chunk position
432  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
433  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
434  {
435  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
436  "Premature end of input file while reading event header";
437  }
438  }
439 
440  event_.reset( new FRDEventMsgView(dataPosition) );
441  if (event_->size()>eventChunkSize_) {
442  throw cms::Exception("FedRawDataInputSource::getNextEvent")
443  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
444  << " run:" << event_->run() << " of size:" << event_->size()
445  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
446  }
447 
448  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
449 
451  {
452  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
453  "Premature end of input file while reading event data";
454  }
455  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
457  //recalculate chunk position
458  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
459  event_.reset( new FRDEventMsgView(dataPosition) );
460  }
461  currentFile_->bufferPosition_ += event_->size();
462  currentFile_->chunkPosition_ += event_->size();
463  //last chunk is released when this function is invoked next time
464 
465  }
466  //multibuffer mode:
467  else
468  {
469  //wait for the current chunk to become added to the vector
471  usleep(10000);
473  }
474 
475  //check if header is at the boundary of two chunks
476  chunkIsFree_ = false;
477  unsigned char *dataPosition;
478 
479  //read header, copy it to a single chunk if necessary
480  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
481 
482  event_.reset( new FRDEventMsgView(dataPosition) );
483  if (event_->size()>eventChunkSize_) {
484  throw cms::Exception("FedRawDataInputSource::getNextEvent")
485  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
486  << " run:" << event_->run() << " of size:" << event_->size()
487  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
488  }
489 
490  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
491 
493  {
494  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
495  "Premature end of input file while reading event data";
496  }
497 
498  if (chunkEnd) {
499  //header was at the chunk boundary, we will have to move payload as well
500  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
501  chunkIsFree_ = true;
502  }
503  else {
504  //header was contiguous, but check if payload fits the chunk
505  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
506  //rewind to header start position
507  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
508  //copy event to a chunk start and move pointers
509  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
510  assert(chunkEnd);
511  chunkIsFree_=true;
512  //header is moved
513  event_.reset( new FRDEventMsgView(dataPosition) );
514  }
515  else {
516  //everything is in a single chunk, only move pointers forward
517  chunkEnd = currentFile_->advance(dataPosition,msgSize);
518  assert(!chunkEnd);
519  chunkIsFree_=false;
520  }
521  }
522  }//end multibuffer mode
523 
524  if ( verifyAdler32_ && event_->version() >= 3 )
525  {
526  uint32_t adler = adler32(0L,Z_NULL,0);
527  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
528 
529  if ( adler != event_->adler32() ) {
530  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
531  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
532  " but calculated 0x" << adler;
533  }
534  }
536 
538 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 691 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

692 {
694  try {
695  // assemble json destination path
697 
698  //TODO:should be ported to use fffnaming
699  std::ostringstream fileNameWithPID;
700  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
701  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
702  jsonDestPath /= fileNameWithPID.str();
703 
704  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
705  << jsonDestPath;
706 
708  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
709  else {
710  try {
711  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
712  }
713  catch (const boost::filesystem::filesystem_error& ex)
714  {
715  // Input dir gone?
716  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
717  // << " Maybe the file is not yet visible by FU. Trying again in one second";
718  sleep(1);
719  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
720  }
722 
723 
724  try {
725  //sometimes this fails but file gets deleted
726  boost::filesystem::remove(jsonSourcePath);
727  }
728  catch (const boost::filesystem::filesystem_error& ex)
729  {
730  // Input dir gone?
731  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
732  }
733  catch (std::exception& ex)
734  {
735  // Input dir gone?
736  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
737  }
738 
739  }
740 
741  boost::filesystem::ifstream ij(jsonDestPath);
742  Json::Value deserializeRoot;
744 
745  if (!reader.parse(ij, deserializeRoot))
746  throw std::runtime_error("Cannot deserialize input JSON file");
747 
748  //read BU JSON
750  DataPoint dp;
751  dp.deserialize(deserializeRoot);
752  bool success = false;
753  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
754  if (dpd_->getNames().at(i)=="NEvents")
755  if (i<dp.getData().size()) {
756  data = dp.getData()[i];
757  success=true;
758  }
759  }
760  if (!success) {
761  if (dp.getData().size())
762  data = dp.getData()[0];
763  else
764  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
765  " error reading number of events from BU JSON -: No input value " << data;
766  }
767  return boost::lexical_cast<int>(data);
768 
769  }
770  catch (const boost::filesystem::filesystem_error& ex)
771  {
772  // Input dir gone?
774  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
775  }
776  catch (std::runtime_error e)
777  {
778  // Another process grabbed the file and NFS did not register this
780  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
781  }
782 
783  catch( boost::bad_lexical_cast const& ) {
784  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
785  << "Input value is -: " << data;
786  }
787 
788  catch (std::exception e)
789  {
790  // BU run directory disappeared?
792  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
793  }
794 
795  return -1;
796 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 248 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

249 {
251  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
252 
253  if ( currentLumiSection_ > 0 ) {
254  const std::string fuEoLS =
256  struct stat buf;
257  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
258  if ( !found ) {
260  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
261  close(eol_fd);
263  }
264  }
265 
266  currentLumiSection_ = lumiSection;
267 
269 
270  timeval tv;
271  gettimeofday(&tv, 0);
272  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
273 
274  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
276  runAuxiliary()->run(),
277  lumiSection, lsopentime,
279 
280  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
281  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
282 
283  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
284  }
285 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 287 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

288 {
290  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
291  {
292  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
293  }
294  return status;
295 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 812 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

813 {
814  InputSource::rewind();
818 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 809 of file FedRawDataInputSource.cc.

810 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 574 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

575 {
576  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
577  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
578 
579  if (useL1EventID_){
581  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
583  aux.setProcessHistoryID(processHistoryID_);
584  makeEvent(eventPrincipal, aux);
585  }
586  else if(tcds_pointer_==0){
587  assert(GTPEventID_);
589  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
591  aux.setProcessHistoryID(processHistoryID_);
592  makeEvent(eventPrincipal, aux);
593  }
594  else{
595  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
598  processGUID());
600  makeEvent(eventPrincipal, aux);
601  }
602 
603 
604 
605  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
606 
607  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
608  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
609  // daqProvenanceHelper_.dummyProvenance_);
610 
611  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
613 
614  eventsThisLumi_++;
615 
616  //this old file check runs no more often than every 10 events
617  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
618  //delete files that are not in processing
619  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
620  auto it = filesToDelete_.begin();
621  while (it!=filesToDelete_.end()) {
622  bool fileIsBeingProcessed = false;
623  for (unsigned int i=0;i<nStreams_;i++) {
624  if (it->first == streamFileTrackerPtr_->at(i)) {
625  fileIsBeingProcessed = true;
626  break;
627  }
628  }
629  if (!fileIsBeingProcessed) {
630  deleteFile(it->second->fileName_);
631  delete it->second;
632  it = filesToDelete_.erase(it);
633  }
634  else it++;
635  }
636 
637  }
639  chunkIsFree_=false;
640  return;
641 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:218
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1135 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1136 {
1137 
1138  if (fileDescriptor_<0) {
1139  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1140  bufferInputRead_ = 0;
1141  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1142  if (fileDescriptor_>=0)
1143  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1144  else
1145  {
1146  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1147  << file->fileName_ << " fd:" << fileDescriptor_;
1148  }
1149  }
1150 
1151  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1152  uint32_t existingSize = 0;
1153  for (unsigned int i=0;i<readBlocks_;i++)
1154  {
1155  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1156  bufferInputRead_+=last;
1157  existingSize+=last;
1158  }
1159  }
1160  else {
1161  const uint32_t chunksize = file->chunkPosition_;
1162  const uint32_t blockcount=chunksize/eventChunkBlock_;
1163  const uint32_t leftsize = chunksize%eventChunkBlock_;
1164  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1165  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1166 
1167  for (uint32_t i=0;i<blockcount;i++) {
1168  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1169  bufferInputRead_+=last;
1170  existingSize+=last;
1171  }
1172  if (leftsize) {
1173  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1174  bufferInputRead_+=last;
1175  existingSize+=last;
1176  }
1177  file->chunkPosition_=0;//data was moved to beginning of the chunk
1178  }
1179  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1180  if (fileDescriptor_!=-1)
1181  {
1182  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1183  close(fileDescriptor_);
1184  fileDescriptor_=-1;
1185  }
1186  }
1187 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 824 of file FedRawDataInputSource.cc.

References InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

825 {
826  bool stop=false;
827  unsigned int currentLumiSection = 0;
828  //threadInit_.exchange(true,std::memory_order_acquire);
829 
830  {
831  std::unique_lock<std::mutex> lk(startupLock_);
832  startupCv_.notify_one();
833  }
834 
835  while (!stop) {
836 
837  //wait for at least one free thread and chunk
838  int counter=0;
839  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
840  {
841  std::unique_lock<std::mutex> lkw(mWakeup_);
842  //sleep until woken up by condition or a timeout
843  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
844  counter++;
845  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
846  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
847  }
848  else {
849  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
850  }
851  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
852  }
853 
854  if (stop) break;
855 
856  //look for a new file
857  std::string nextFile;
858  uint32_t ls;
859  uint32_t fileSize;
860 
862 
864 
865  while (status == evf::EvFDaqDirector::noFile) {
866  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
867  stop=true;
868  break;
869  }
870  else
871  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
872 
873  if ( status == evf::EvFDaqDirector::runEnded) {
875  stop=true;
876  break;
877  }
878 
879  //queue new lumisection
880  if( getLSFromFilename_ && ls > currentLumiSection) {
881  currentLumiSection = ls;
882  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
883  }
884 
885  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
886  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
888  stop=true;
889  break;
890  }
891 
892  int dbgcount=0;
893  if (status == evf::EvFDaqDirector::noFile) {
894  dbgcount++;
895  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
896  usleep(100000);
897  }
898  }
899  if ( status == evf::EvFDaqDirector::newFile ) {
900  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
901 
902 
903  boost::filesystem::path rawFilePath(nextFile);
904  std::string rawFile = rawFilePath.replace_extension(".raw").string();
905 
906  struct stat st;
907  stat(rawFile.c_str(),&st);
908  fileSize=st.st_size;
909 
910  int eventsInNewFile = grabNextJsonFile(nextFile);
911  if (fms_) fms_->stoppedLookingForFile(ls);
912  assert( eventsInNewFile>=0 );
913  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
914 
915  if (!singleBufferMode_) {
916  //calculate number of needed chunks
917  unsigned int neededChunks = fileSize/eventChunkSize_;
918  if (fileSize%eventChunkSize_) neededChunks++;
919 
920  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
921  fileQueue_.push(newInputFile);
922 
923  for (unsigned int i=0;i<neededChunks;i++) {
924 
925  //get thread
926  unsigned int newTid = 0xffffffff;
927  while (!workerPool_.try_pop(newTid)) {
928  usleep(100000);
929  }
930 
931  InputChunk * newChunk = nullptr;
932  while (!freeChunks_.try_pop(newChunk)) {
933  usleep(100000);
934  if (quit_threads_.load(std::memory_order_relaxed)) break;
935  }
936 
937  if (newChunk == nullptr) {
938  //return unused tid if we received shutdown (nullptr chunk)
939  if (newTid!=0xffffffff) workerPool_.push(newTid);
940  stop = true;
941  break;
942  }
943 
944  std::unique_lock<std::mutex> lk(mReader_);
945 
946  unsigned int toRead = eventChunkSize_;
947  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
948  newChunk->reset(i*eventChunkSize_,toRead,i);
949 
950  workerJob_[newTid].first=newInputFile;
951  workerJob_[newTid].second=newChunk;
952 
953  //wake up the worker thread
954  cvReader_[newTid]->notify_one();
955  }
956  }
957  else {
958  if (!eventsInNewFile) {
959  //still queue file for lumi update
960  std::unique_lock<std::mutex> lkw(mWakeup_);
961  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
962  fileQueue_.push(newInputFile);
963  cvWakeup_.notify_one();
964  return;
965  }
966  //in single-buffer mode put single chunk in the file and let the main thread read the file
967  InputChunk * newChunk;
968  //should be available immediately
969  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
970 
971  std::unique_lock<std::mutex> lkw(mWakeup_);
972 
973  unsigned int toRead = eventChunkSize_;
974  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
975  newChunk->reset(0,toRead,0);
976  newChunk->readComplete_=true;
977 
978  //push file and wakeup main thread
979  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
980  newInputFile->chunks_[0]=newChunk;
981  fileQueue_.push(newInputFile);
982  cvWakeup_.notify_one();
983  }
984  }
985  }
986  //make sure threads finish reading
987  unsigned numFinishedThreads = 0;
988  while (numFinishedThreads < workerThreads_.size()) {
989  unsigned int tid;
990  while (!workerPool_.try_pop(tid)) {usleep(10000);}
991  std::unique_lock<std::mutex> lk(mReader_);
992  thread_quit_signal[tid]=true;
993  cvReader_[tid]->notify_one();
994  numFinishedThreads++;
995  }
996  for (unsigned int i=0;i<workerThreads_.size();i++) {
997  workerThreads_[i]->join();
998  delete workerThreads_[i];
999  }
1000 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1002 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1003 {
1004  bool init = true;
1005  threadInit_.exchange(true,std::memory_order_acquire);
1006 
1007  while (1) {
1008 
1009  std::unique_lock<std::mutex> lk(mReader_);
1010  workerJob_[tid].first=nullptr;
1011  workerJob_[tid].first=nullptr;
1012 
1013  assert(!thread_quit_signal[tid]);//should never get it here
1014  workerPool_.push(tid);
1015 
1016  if (init) {
1017  std::unique_lock<std::mutex> lk(startupLock_);
1018  init = false;
1019  startupCv_.notify_one();
1020  }
1021  cvReader_[tid]->wait(lk);
1022 
1023  if (thread_quit_signal[tid]) return;
1024 
1025  InputFile * file;
1026  InputChunk * chunk;
1027 
1028  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1029 
1030  file = workerJob_[tid].first;
1031  chunk = workerJob_[tid].second;
1032 
1033  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1034  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1035 
1036 
1037  if (fileDescriptor>=0)
1038  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1039  else
1040  {
1041  edm::LogError("FedRawDataInputSource") <<
1042  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1043  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1044  setExceptionState_=true;
1045  return;
1046 
1047  }
1048 
1049  unsigned int bufferLeft = 0;
1051  for (unsigned int i=0;i<readBlocks_;i++)
1052  {
1053  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1054  if ( last > 0 )
1055  bufferLeft+=last;
1056  if (last < eventChunkBlock_) {
1057  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1058  break;
1059  }
1060  }
1062  auto diff = end-start;
1063  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1064  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1065  close(fileDescriptor);
1066 
1067  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1068  assert(detectedFRDversion_<=3);
1069  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1070  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1071 
1072  }
1073 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
bool first
Definition: L1TdeRCT.cc:75
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 798 of file FedRawDataInputSource.cc.

References daqDirector_, roll_playback::destination, evf::EvFDaqDirector::getJumpFilePath(), cmsHarvester::path, and source.

Referenced by deleteFile().

799 {
802 
803  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
804  << " to: " << destination.string();
805  boost::filesystem::rename(source,destination);
806  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
807 }
string destination
tuple path
else: Piece not in the list, fine.
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:43
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 820 of file FedRawDataInputSource.cc.

821 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1075 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1076 {
1077  quit_threads_=true;
1078  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1079 
1080 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 81 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 112 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 161 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 149 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private