
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

bool checkNextEvent () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks the current event as cached. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 36 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 127 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, MillePedeFileConverter_cfg::e, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", "")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool> ("alwaysStartFromFirstLS", false)),
64  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
65  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
66  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
67  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
68  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
69  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
72  eventID_(),
75  tcds_pointer_(nullptr),
77 {
78  char thishost[256];
79  gethostname(thishost, 255);
80  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
81  << std::endl << (eventChunkSize_/1048576)
82  << " MB on host " << thishost;
83 
84  long autoRunNumber = -1;
85  if (fileListMode_) {
86  autoRunNumber = initFileList();
87  if (!fileListLoopMode_) {
88  if (autoRunNumber<0)
89  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
90  //override run number
91  runNumber_ = (edm::RunNumber_t)autoRunNumber;
92  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
93  }
94  }
95 
97  setNewRun();
98  //todo:autodetect from file name (assert if names differ)
101 
102  //make sure that chunk size is N * block size
107 
108  if (!numBuffers_)
109  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
110  "no reading enabled with numBuffers parameter 0";
111 
115 
116  if (!crc32c_hw_test())
117  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
118 
119  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
120  if (fileListMode_) {
121  try {
123  } catch(cms::Exception const&) {
124  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
125  }
126  }
127  else {
129  if (!fms_) {
130  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
131  }
132  }
133 
135  if (!daqDirector_)
136  throw cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
137 
139  if (useFileBroker_)
140  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
141  //set DaqDirector to delete files in preGlobalEndLumi callback
143  if (fms_) {
145  fms_->setInputSource(this);
148  }
149  //should delete chunks when run stops
150  for (unsigned int i=0;i<numBuffers_;i++) {
152  }
153 
154  quit_threads_ = false;
155 
156  for (unsigned int i=0;i<numConcurrentReads_;i++)
157  {
158  std::unique_lock<std::mutex> lk(startupLock_);
159  //issue a memory fence here and in threads (constructor was segfaulting without this)
160  thread_quit_signal.push_back(false);
161  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
162  cvReader_.push_back(new std::condition_variable);
163  tid_active_.push_back(0);
164  threadInit_.store(false,std::memory_order_release);
165  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
166  startupCv_.wait(lk);
167  }
168 
169  runAuxiliary()->setProcessHistoryID(processHistoryID_);
170 }
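The worker start-up loop in the listing launches each reader thread while holding `startupLock_` and then blocks on `startupCv_`, presumably until the new thread reports that it has initialised. A minimal sketch of that handshake follows. It assumes a hypothetical `WorkerPool` class and a `ready_` flag, neither of which is part of CMSSW, and it uses the predicate form of `wait` so that a spurious wakeup cannot release the constructor early (the listing uses the plain form).

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the source's worker bookkeeping.
class WorkerPool {
public:
  explicit WorkerPool(unsigned int nWorkers) {
    for (unsigned int i = 0; i < nWorkers; ++i) {
      std::unique_lock<std::mutex> lk(startupLock_);
      ready_ = false;
      workers_.emplace_back(&WorkerPool::run, this, i);
      // Releases the lock while waiting; returns only once ready_ is true.
      startupCv_.wait(lk, [this] { return ready_; });
      ++started_;
    }
  }

  ~WorkerPool() {
    for (auto& t : workers_) t.join();
  }

  unsigned int started() const { return started_; }

private:
  // Each worker only signals that it is up; a real reader would then
  // go on to wait for jobs.
  void run(unsigned int /*tid*/) {
    std::lock_guard<std::mutex> lk(startupLock_);
    ready_ = true;
    startupCv_.notify_one();
  }

  std::mutex startupLock_;
  std::condition_variable startupCv_;
  bool ready_ = false;
  unsigned int started_ = 0;
  std::vector<std::thread> workers_;  // declared last: threads use the members above
};
```

Constructing `WorkerPool pool(3);` returns only after all three workers have signalled, so `pool.started()` is then 3.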
FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 172 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

173 {
174  quit_threads_=true;
175 
176  //delete any remaining open files
177  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
178  deleteFile(it->second->fileName_);
179  delete it->second;
180  }
182  readSupervisorThread_->join();
183  }
184  else {
185  //join aux threads in case the supervisor thread was not started
186  for (unsigned int i=0;i<workerThreads_.size();i++) {
187  std::unique_lock<std::mutex> lk(mReader_);
188  thread_quit_signal[i]=true;
189  cvReader_[i]->notify_one();
190  lk.unlock();
191  workerThreads_[i]->join();
192  delete workerThreads_[i];
193  }
194  }
195  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
196  /*
197  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
198  InputChunk *ch;
199  while (!freeChunks_.try_pop(ch)) {}
200  delete ch;
201  }
202  */
203 }
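When the supervisor thread was never started, the destructor above stops each worker itself: it sets the worker's quit flag under `mReader_`, notifies the worker's condition variable, releases the lock and joins. The sketch below shows the same pattern for a single thread. `QuittableWorker` and its members are hypothetical names chosen for illustration, and the worker body is reduced to waiting for the quit flag.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that sleeps until it is told to quit.
class QuittableWorker {
public:
  QuittableWorker() : thread_(&QuittableWorker::run, this) {}
  ~QuittableWorker() { stop(); }

  // Sets the quit flag under the lock, wakes the worker, then joins it.
  void stop() {
    {
      std::lock_guard<std::mutex> lk(m_);
      quit_ = true;
    }
    cv_.notify_one();
    if (thread_.joinable()) thread_.join();
  }

  // Safe to read after stop(): join() orders the worker's write before it.
  bool exited() const { return exited_; }

private:
  void run() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return quit_; });
    exited_ = true;
  }

  std::mutex m_;
  std::condition_variable cv_;
  bool quit_ = false;
  bool exited_ = false;
  std::thread thread_;  // declared last so the members above exist first
};
```

Because the flag is written while holding the mutex and the wait checks it as a predicate, the notification cannot be lost even if `stop()` runs before the worker reaches `wait`.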

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 223 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, runEdmFileComparison::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

224 {
226  {
227  //this thread opens new files and dispatches reading to worker readers
228  //threadInit_.store(false,std::memory_order_release);
229  std::unique_lock<std::mutex> lk(startupLock_);
230  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
232  startupCv_.wait(lk);
233  }
234  //signal hltd to start event accounting
235  if (!currentLumiSection_)
238  switch (nextEvent() ) {
240  //maybe create EoL file in working directory before ending run
241  struct stat buf;
242  if (!useFileBroker_ && currentLumiSection_ > 0) {
243  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
244  if (eolFound) {
246  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
247  if ( !found ) {
249  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
250  close(eol_fd);
252  }
253  }
254  }
255  //also create EoR file in FU data directory
256  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
257  if (!eorFound) {
258  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(eor_fd);
260  }
262  eventsThisLumi_=0;
264  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
265  return false;
266  }
268  //this is not reachable
269  return true;
270  }
272  //std::cout << "--------------NEW LUMI---------------" << std::endl;
273  return true;
274  }
275  default: {
276  if (!getLSFromFilename_) {
277  //get new lumi from file header
278  if (event_->lumi() > currentLumiSection_) {
280  eventsThisLumi_=0;
281  maybeOpenNewLumiSection( event_->lumi() );
282  }
283  }
286  else
287  eventRunNumber_=event_->run();
288  L1EventID_ = event_->event();
289 
290  setEventCached();
291 
292  return true;
293  }
294  }
295 }
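On run end, the listing creates empty end-of-lumisection and end-of-run marker files if `stat` reports that they do not exist yet. A small sketch of that "create if absent" step is given below. `touchMarkerFile` is a hypothetical helper, not a CMSSW function, and unlike the listing it checks the descriptor returned by `open` before closing it.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Creates an empty marker file at `path` unless one already exists.
// Returns true if the file exists when the function returns.
bool touchMarkerFile(const std::string& path) {
  struct stat buf;
  if (stat(path.c_str(), &buf) == 0)
    return true;  // already present, nothing to do

  // Same flags and permission bits as in the listing (rw for user, group, other).
  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0)
    return false;  // creation failed, e.g. missing directory or permissions
  close(fd);
  return true;
}
```

The check and the creation are two separate system calls, so another process may create the file in between; with `O_CREAT` and no `O_EXCL` that case still succeeds and leaves the existing file untouched.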
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 613 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, callgraph::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

614 {
615  //no deletion in file list mode
616  if (fileListMode_) return;
617 
618  const boost::filesystem::path filePath(fileName);
619  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
620  try {
621  //sometimes this fails but file gets deleted
622  boost::filesystem::remove(filePath);
623  }
624  catch (const boost::filesystem::filesystem_error& ex)
625  {
626  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
627  << ". Trying again.";
628  usleep(100000);
629  try {
630  boost::filesystem::remove(filePath);
631  }
632  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
633  }
634  catch (std::exception& ex)
635  {
636  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
637  << ". Trying again.";
638  usleep(100000);
639  try {
640  boost::filesystem::remove(filePath);
641  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
642  }
643 }
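`deleteFile` tolerates a removal call that reports failure even though the file is gone: it waits 100 ms, tries once more, and ignores a second error. The sketch below illustrates the same "retry once, then judge by the final state" idea with the standard C library instead of Boost.Filesystem. `removeWithRetry` and `fileExists` are hypothetical helpers introduced only for this example.

```cpp
#include <cassert>
#include <chrono>
#include <cstdio>
#include <string>
#include <sys/stat.h>
#include <thread>

// True if `path` currently exists on disk.
bool fileExists(const std::string& path) {
  struct stat buf;
  return stat(path.c_str(), &buf) == 0;
}

// Tries to remove `path`, retrying once after a short pause.
// Returns true if the file is gone afterwards, including the case
// where it was never there.
bool removeWithRetry(const std::string& path,
                     std::chrono::milliseconds pause = std::chrono::milliseconds(100)) {
  if (std::remove(path.c_str()) == 0)
    return true;
  if (!fileExists(path))
    return true;  // an error was reported, but nothing is left to delete
  std::this_thread::sleep_for(pause);
  std::remove(path.c_str());  // second attempt; the result is judged below
  return !fileExists(path);
}
```

Returning the final state rather than the status of each call matches the intent of the comments in the listing, where the first attempt may fail although the file has been deleted.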
bool FedRawDataInputSource::exceptionState ( )
inline private

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 205 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

206 {
208  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
209  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
210  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
211  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
212  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
213  desc.addUntracked<bool> ("alwaysStartFromFirstLS",false)->setComment("Force source to start from LS 1 if server provides higher lumisection number");
214  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
215  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
216  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
217  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
218  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
219  desc.setAllowAnything();
220  descriptions.add("source", desc);
221 }
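`eventChunkSize` and `eventChunkBlock` are given in MB and multiplied by 1048576 in the constructor, whose comment also asks that the chunk size be N times the block size. The lines that enforce this are not shown in the listing, so the sketch below is only one plausible way to do it: it rejects a block larger than the chunk and rounds the chunk down to a whole number of blocks. `ChunkLayout` and `makeChunkLayout` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Sizes in bytes plus the number of block-sized reads per chunk.
struct ChunkLayout {
  std::uint32_t chunkBytes;
  std::uint32_t blockBytes;
  std::uint32_t readBlocks;
};

// Converts MB parameters to bytes and makes the chunk a multiple of the block.
// Assumption: rounding down is acceptable; the real source may differ.
ChunkLayout makeChunkLayout(std::uint32_t chunkMB, std::uint32_t blockMB) {
  constexpr std::uint32_t kMB = 1048576;
  if (blockMB == 0 || blockMB > chunkMB)
    throw std::invalid_argument("block size must be in the range 1..chunk size");

  ChunkLayout layout;
  layout.blockBytes = blockMB * kMB;
  layout.readBlocks = chunkMB / blockMB;  // integer division rounds down
  layout.chunkBytes = layout.readBlocks * layout.blockBytes;
  return layout;
}
```

With the defaults (32 and 32) this yields one read of 33554432 bytes per chunk; with a 12 MB block the 32 MB chunk shrinks to two blocks, 25165824 bytes.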
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 722 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, FEDRawDataCollection::FEDData(), l1t::stage2::layer2::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and ntuplemaker::time.

Referenced by read().

723 {
725  timeval stv;
726  gettimeofday(&stv,nullptr);
727  time = stv.tv_sec;
728  time = (time << 32) + stv.tv_usec;
729  edm::Timestamp tstamp(time);
730 
731  uint32_t eventSize = event_->eventSize();
732  unsigned char* event = (unsigned char*)event_->payload();
733  GTPEventID_=0;
734  tcds_pointer_ = nullptr;
735  while (eventSize > 0) {
736  assert(eventSize>=FEDTrailer::length);
737  eventSize -= FEDTrailer::length;
738  const FEDTrailer fedTrailer(event + eventSize);
739  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
740  assert(eventSize>=fedSize - FEDHeader::length);
741  eventSize -= (fedSize - FEDHeader::length);
742  const FEDHeader fedHeader(event + eventSize);
743  const uint16_t fedId = fedHeader.sourceID();
744  if(fedId>FEDNumbering::MAXFEDID)
745  {
746  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
747  }
748  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
749  tcds_pointer_ = event + eventSize;
750  }
751  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
752  if (evf::evtn::evm_board_sense(event + eventSize,fedSize))
753  GTPEventID_ = evf::evtn::get(event + eventSize,true);
754  else
755  GTPEventID_ = evf::evtn::get(event + eventSize,false);
756  //evf::evtn::evm_board_setformat(fedSize);
757  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
758  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
759  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
760  }
761  //take event ID from GTPE FED
762  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
763  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
764  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
765  }
766  }
767  FEDRawData& fedData = rawData.FEDData(fedId);
768  fedData.resize(fedSize);
769  memcpy(fedData.data(), event + eventSize, fedSize);
770  }
771  assert(eventSize == 0);
772 
773  return tstamp;
774 }
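`fillFEDRawDataCollection` walks the event payload from the end: it reads a trailer, takes the fragment length from it (in 8-byte words), steps back to the fragment's header and repeats until no bytes remain. The sketch below reproduces that backward walk on a deliberately simplified layout. This layout is an assumption made for the example and is not the real FED format: the first two header bytes hold the source ID and the first four trailer bytes hold the length in 8-byte words, both in native byte order. `makeFragment` and `walkFragments` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <vector>

constexpr std::size_t kHeaderLen = 8;   // bytes, as in the simplified layout
constexpr std::size_t kTrailerLen = 8;  // bytes

// Builds one fragment: header + payloadWords * 8 bytes + trailer.
std::vector<unsigned char> makeFragment(std::uint16_t id, std::size_t payloadWords) {
  const std::uint32_t words = static_cast<std::uint32_t>(payloadWords + 2);
  std::vector<unsigned char> frag(static_cast<std::size_t>(words) * 8, 0);
  std::memcpy(frag.data(), &id, sizeof id);
  std::memcpy(frag.data() + frag.size() - kTrailerLen, &words, sizeof words);
  return frag;
}

// Walks the buffer from its end and returns (source ID, size in bytes)
// for each fragment, last fragment first.
std::vector<std::pair<std::uint16_t, std::size_t>> walkFragments(const unsigned char* event,
                                                                 std::size_t eventSize) {
  std::vector<std::pair<std::uint16_t, std::size_t>> out;
  while (eventSize > 0) {
    if (eventSize < kTrailerLen)
      throw std::runtime_error("truncated trailer");
    std::uint32_t words = 0;
    std::memcpy(&words, event + eventSize - kTrailerLen, sizeof words);
    const std::size_t fragSize = static_cast<std::size_t>(words) * 8;
    if (fragSize < kHeaderLen + kTrailerLen || fragSize > eventSize)
      throw std::runtime_error("bad fragment length");
    eventSize -= fragSize;  // eventSize is now the offset of this fragment's header
    std::uint16_t id = 0;
    std::memcpy(&id, event + eventSize, sizeof id);
    out.emplace_back(id, fragSize);
  }
  return out;
}
```

The sketch throws on inconsistent lengths where the listing uses `assert`, which keeps the check active in builds that define `NDEBUG`.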
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1338 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1339 {
1340  std::lock_guard<std::mutex> lock(monlock_);
1341  auto itr = sourceEventsReport_.find(lumi);
1342  if (itr!=sourceEventsReport_.end()) {
1343  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1344  if (erase)
1345  sourceEventsReport_.erase(itr);
1346  return ret;
1347  }
1348  else
1349  return std::pair<bool,unsigned int>(false,0);
1350 }
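`getEventReport` is a mutex-guarded lookup in a per-lumisection map that can optionally erase the entry it returns. A self-contained sketch of that pattern follows. `EventReport` is a hypothetical class, and its `report` method simply overwrites the count, which is an assumption because `reportEventsThisLumiInSource` is not shown on this page.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical thread-safe store of event counts per lumisection.
class EventReport {
public:
  // Records the number of events seen for `lumi` (overwrites any old value).
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns (true, count) if `lumi` is known, otherwise (false, 0).
  // With erase == true the entry is removed after being read.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end())
      return std::make_pair(false, 0u);
    const std::pair<bool, unsigned int> ret(true, itr->second);
    if (erase)
      counts_.erase(itr);
    return ret;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Copying the value into `ret` before erasing matters: after `erase(itr)` the iterator is invalid, so the count must not be read through it.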
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1381 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, callgraph::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1382 {
1383  if (fileListIndex_ < fileNames_.size()) {
1384  nextFile = fileNames_[fileListIndex_];
1385  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1386  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1387  boost::filesystem::path fileName = nextFile;
1388  std::string fileStem = fileName.stem().string();
1389  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1390  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1391 
1392  if (!fileListLoopMode_)
1393  ls = boost::lexical_cast<unsigned int>(fileStem);
1394  else //always starting from LS 1 in loop mode
1395  ls = 1 + loopModeIterationInc_;
1396 
1397  //fsize = 0;
1398  //lockWaitTime = 0;
1399  fileListIndex_++;
1401  }
1402  else {
1403  if (!fileListLoopMode_)
1405  else {
1406  //loop through files until interrupted
1408  fileListIndex_=0;
1409  return getFile(ls,nextFile,fsize,lockWaitTime);
1410  }
1411  }
1412 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
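The listing strips a `file://` or `file:` prefix and then derives the lumisection from the digits that follow `ls` in the file stem. The sketch below shows one way to write that parsing with explicit `std::string::npos` comparisons. This matters because `std::string::find` returns `npos` (a non-zero value) when nothing is found and `0` when the match is at the start of the string. The helper names and the example stem layout `run<N>_ls<N>_index<N>` are assumptions made for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Strip a leading "file://" or "file:" scheme from a file name.
std::string stripFileScheme(const std::string& name) {
  if (name.compare(0, 7, "file://") == 0) return name.substr(7);
  if (name.compare(0, 5, "file:") == 0) return name.substr(5);
  return name;
}

// Extract the digits that follow "ls" in a stem such as
// "run000123_ls0045_index000002" (assumed naming scheme).
// Returns false if there is no "ls" token or no digits after it.
bool parseLumiFromStem(const std::string& stem, unsigned int& lumi) {
  const std::size_t pos = stem.find("ls");
  if (pos == std::string::npos) return false;
  const std::size_t begin = pos + 2;
  assert(begin <= stem.size());  // holds because "ls" was found at pos
  std::size_t end = begin;
  while (end < stem.size() && stem[end] >= '0' && stem[end] <= '9') ++end;
  if (end == begin) return false;
  lumi = static_cast<unsigned int>(std::stoul(stem.substr(begin, end - begin)));
  return true;
}
```

Returning a `bool` instead of throwing lets the caller decide how to treat a file whose name does not follow the expected scheme.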
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 350 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, threadError(), mps_check::timeout, verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

351 {
352 
354  if (!currentFile_)
355  {
358  if (!fileQueue_.try_pop(currentFile_))
359  {
360  //sleep until wakeup (only in single-buffer mode) or timeout
361  std::unique_lock<std::mutex> lkw(mWakeup_);
362  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
364  }
365  status = currentFile_->status_;
366  if ( status == evf::EvFDaqDirector::runEnded)
367  {
369  delete currentFile_;
370  currentFile_=nullptr;
371  return status;
372  }
373  else if ( status == evf::EvFDaqDirector::runAbort)
374  {
375  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
376  }
377  else if (status == evf::EvFDaqDirector::newLumi)
378  {
380  if (getLSFromFilename_) {
383  eventsThisLumi_=0;
385  }
386  }
387  else {//let this be picked up from next event
389  }
390 
391  delete currentFile_;
392  currentFile_=nullptr;
393  return status;
394  }
395  else if (status == evf::EvFDaqDirector::newFile) {
397  }
398  else
399  assert(false);
400  }
402 
403  //file is empty
404  if (!currentFile_->fileSize_) {
406  //try to open new lumi
407  assert(currentFile_->nChunks_==0);
408  if (getLSFromFilename_)
411  eventsThisLumi_=0;
413  }
414  //immediately delete empty file
416  delete currentFile_;
417  currentFile_=nullptr;
419  }
420 
421  //file is finished
424  //release last chunk (it is never released elsewhere)
427  {
428  throw cms::Exception("FedRawDataInputSource::getNextEvent")
429  << "Fully processed " << currentFile_->nProcessed_
430  << " from the file " << currentFile_->fileName_
431  << " but according to BU JSON there should be "
432  << currentFile_->nEvents_ << " events";
433  }
434  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
435  if (singleBufferMode_) {
436  std::unique_lock<std::mutex> lkw(mWakeup_);
437  cvWakeup_.notify_one();
438  }
441  //put the file in pending delete list;
442  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
443  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
444  }
445  else {
446  //in single-thread and stream jobs, events are already processed
448  delete currentFile_;
449  }
450  currentFile_=nullptr;
452  }
453 
454 
455  //file is too short
457  {
458  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
459  "Premature end of input file while reading event header";
460  }
461  if (singleBufferMode_) {
462 
463  //should already be there
466  usleep(10000);
468  }
470 
471  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
472 
473  //conditions when read amount is not sufficient for the header to fit
474  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
476  {
478 
479  if (detectedFRDversion_==0) {
480  detectedFRDversion_=*((uint32*)dataPosition);
481  if (detectedFRDversion_>5)
482  throw cms::Exception("FedRawDataInputSource::getNextEvent")
483  << "Unknown FRD version -: " << detectedFRDversion_;
484  assert(detectedFRDversion_>=1);
485  }
486 
487  //recalculate chunk position
488  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
489  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
490  {
491  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
492  "Premature end of input file while reading event header";
493  }
494  }
495 
496  event_.reset( new FRDEventMsgView(dataPosition) );
497  if (event_->size()>eventChunkSize_) {
498  throw cms::Exception("FedRawDataInputSource::getNextEvent")
499  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
500  << " run:" << event_->run() << " of size:" << event_->size()
501  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
502  }
503 
504  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
505 
507  {
508  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
509  "Premature end of input file while reading event data";
510  }
511  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
513  //recalculate chunk position
514  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
515  event_.reset( new FRDEventMsgView(dataPosition) );
516  }
517  currentFile_->bufferPosition_ += event_->size();
518  currentFile_->chunkPosition_ += event_->size();
519  //last chunk is released when this function is invoked next time
520 
521  }
522  //multibuffer mode:
523  else
524  {
525  //wait for the current chunk to become added to the vector
528  usleep(10000);
530  }
532 
533  //check if header is at the boundary of two chunks
534  chunkIsFree_ = false;
535  unsigned char *dataPosition;
536 
537  //read header, copy it to a single chunk if necessary
538  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
539 
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  if (event_->size()>eventChunkSize_) {
542  throw cms::Exception("FedRawDataInputSource::getNextEvent")
543  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
544  << " run:" << event_->run() << " of size:" << event_->size()
545  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
546  }
547 
548  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
549 
551  {
552  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
553  "Premature end of input file while reading event data";
554  }
555 
556  if (chunkEnd) {
557  //header was at the chunk boundary, we will have to move payload as well
558  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
559  chunkIsFree_ = true;
560  }
561  else {
562  //header was contiguous, but check if payload fits the chunk
563  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
564  //rewind to header start position
565  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
566  //copy event to a chunk start and move pointers
567  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
568  assert(chunkEnd);
569  chunkIsFree_=true;
570  //header is moved
571  event_.reset( new FRDEventMsgView(dataPosition) );
572  }
573  else {
574  //everything is in a single chunk, only move pointers forward
575  chunkEnd = currentFile_->advance(dataPosition,msgSize);
576  assert(!chunkEnd);
577  chunkIsFree_=false;
578  }
579  }
580  }//end multibuffer mode
582 
583  if (verifyChecksum_ && event_->version() >= 5)
584  {
585  uint32_t crc=0;
586  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
587  if ( crc != event_->crc32c() ) {
589  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
590  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
591  " but calculated 0x" << crc;
592  }
593  }
594  else if ( verifyAdler32_ && event_->version() >= 3)
595  {
596  uint32_t adler = adler32(0L,Z_NULL,0);
597  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
598 
599  if ( adler != event_->adler32() ) {
601  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
602  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
603  " but calculated 0x" << adler;
604  }
605  }
607 
609 
611 }
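The last step of the listing recomputes a checksum over the event payload and compares it with the value stored in the event header. The sketch below illustrates that comparison for the Adler-32 case. It uses a plain, self-written Adler-32 (as defined in RFC 1950) as a stand-in for zlib's `adler32()`, and the function names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Plain Adler-32 as defined in RFC 1950; a stand-in for zlib's adler32().
std::uint32_t adler32Simple(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Compare the checksum of a payload with the value taken from its header.
bool payloadMatches(const unsigned char* payload, std::size_t len,
                    std::uint32_t expected) {
  assert(payload != nullptr || len == 0);
  return adler32Simple(payload, len) == expected;
}
```

In production code the library routine is preferable, since it is optimised and processes the input in larger blocks.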
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
uint32_t bufferPosition_
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
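In multi-buffer mode the listing of getNextEvent() distinguishes three situations: the header straddles a chunk boundary, the header is contiguous but the payload does not fit in the rest of the chunk, or the whole event lies inside one chunk. The sketch below expresses that decision as a pure function. The enum, the function name, and the exact boundary conditions are assumptions, since the implementation of `InputFile::advance()` is not shown here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical classification of where an event sits relative to a chunk.
enum class EventPlacement { HeaderSplit, PayloadSplit, Contiguous };

// chunkPosition is the offset of the event header inside the current chunk.
EventPlacement classifyEvent(std::uint32_t chunkSize,
                             std::uint32_t chunkPosition,
                             std::uint32_t headerSize,
                             std::uint32_t payloadSize) {
  assert(chunkPosition <= chunkSize);
  const std::uint32_t remaining = chunkSize - chunkPosition;
  if (remaining < headerSize)
    return EventPlacement::HeaderSplit;   // header crosses into the next chunk
  if (remaining - headerSize < payloadSize)
    return EventPlacement::PayloadSplit;  // header fits, payload does not
  return EventPlacement::Contiguous;      // only pointers need to advance
}
```

In the first two cases the listing copies data so that the event becomes contiguous and sets `chunkIsFree_` to true; in the third case it only advances the pointers.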
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1352 of file FedRawDataInputSource.cc.

References fileNames_.

Referenced by FedRawDataInputSource().

1353 {
1354  std::sort(fileNames_.begin(),fileNames_.end(),
1355  [](std::string a, std::string b) {
1356  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1357  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1358  return b > a;});
1359 
1360  if (!fileNames_.empty()) {
1361  //get run number from first file in the vector
1363  std::string fileStem = fileName.stem().string();
1364  auto end = fileStem.find("_");
1365  if (fileStem.find("run")==0) {
1366  std::string runStr = fileStem.substr(3,end-3);
1367  try {
1368  //get long to support test run numbers < 2^32
1369  long rval = boost::lexical_cast<long>(runStr);
1370  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1371  return rval;
1372  }
1373  catch( boost::bad_lexical_cast const& ) {
1374  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1375  }
1376  }
1377  }
1378  return -1;
1379 }
std::vector< std::string > fileNames_
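initFileList() orders the file list by file name and then reads the run number from the stem of the first file. A minimal sketch of both steps follows. The helper names are hypothetical, the comparator here drops the directory part entirely, and the digit check replaces `boost::lexical_cast` with standard-library calls.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Return the part of a path after the last '/'.
std::string baseName(const std::string& path) {
  const std::size_t slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Sort paths by file name, ignoring the directory part.
void sortByBaseName(std::vector<std::string>& files) {
  std::sort(files.begin(), files.end(),
            [](const std::string& a, const std::string& b) {
              return baseName(a) < baseName(b);
            });
}

// Parse "run<digits>[_...]" and return the run number, or -1 on failure.
long runNumberFromStem(const std::string& stem) {
  if (stem.compare(0, 3, "run") != 0) return -1;
  const std::size_t end = stem.find('_');
  assert(end >= 3);  // the "run" prefix contains no underscore
  const std::string digits =
      stem.substr(3, end == std::string::npos ? std::string::npos : end - 3);
  if (digits.empty() ||
      digits.find_first_not_of("0123456789") != std::string::npos)
    return -1;
  try {
    return std::stol(digits);
  } catch (const std::out_of_range&) {
    return -1;  // more digits than fit into a long
  }
}
```

Taking the comparator arguments by const reference avoids copying both strings on every comparison.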
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 297 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, runEdmFileComparison::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNextEvent(), and getNextEvent().

298 {
300  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
301 
302  if (!useFileBroker_) {
303  if ( currentLumiSection_ > 0) {
304  const std::string fuEoLS =
306  struct stat buf;
307  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
308  if ( !found ) {
310  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
311  close(eol_fd);
312  daqDirector_->createBoLSFile(lumiSection,false);
314  }
315  }
316  else daqDirector_->createBoLSFile(lumiSection,true);//needed for initial lumisection
317  }
318 
319  currentLumiSection_ = lumiSection;
320 
322 
323  timeval tv;
324  gettimeofday(&tv, nullptr);
325  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
326 
327  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
329  runAuxiliary()->run(),
330  lumiSection, lsopentime,
332 
333  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
334  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
335 
336  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
337  }
338 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:497
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:341
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:503
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:246
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:349
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:249
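When a new lumisection is opened, the listing builds its timestamp as microseconds since the epoch from the `tv_sec` and `tv_usec` fields of `gettimeofday()`. The same arithmetic can be sketched as below, together with a `std::chrono` variant. Both function names are hypothetical, and the assumption that `system_clock` counts from the Unix epoch is only guaranteed by the standard from C++20 on.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Combine whole seconds and a microsecond remainder into one 64-bit value.
std::uint64_t toMicroseconds(std::uint64_t seconds, std::uint64_t usec) {
  assert(usec < 1000000ULL);  // the remainder must be below one second
  return seconds * 1000000ULL + usec;
}

// Current wall-clock time in microseconds, obtained via std::chrono.
std::uint64_t nowMicroseconds() {
  using namespace std::chrono;
  const auto sinceEpoch = system_clock::now().time_since_epoch();
  return static_cast<std::uint64_t>(
      duration_cast<microseconds>(sinceEpoch).count());
}
```

Widening the seconds to 64 bits before multiplying, as the listing also does with its casts, avoids overflow on platforms where the seconds field is 32 bits wide.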
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 340 of file FedRawDataInputSource.cc.

References getNextEvent(), evf::EvFDaqDirector::noFile, and edm::shutdown_flag.

Referenced by checkNextEvent().

341 {
343  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
344  {
345  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
346  }
347  return status;
348 }
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus getNextEvent()
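nextEvent() keeps calling getNextEvent() while the result is `noFile` and leaves the loop early once a shutdown has been requested. The sketch below reproduces that loop shape in isolation. The enum `FileStatus` here is a reduced, hypothetical stand-in for `evf::EvFDaqDirector::FileStatus`, and the polling function is passed in as a callback so the example can run without the framework.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Reduced stand-in for evf::EvFDaqDirector::FileStatus.
enum class FileStatus { noFile, newFile, runEnded };

// Poll until something other than noFile is returned, or until the
// shutdown flag is observed after an unsuccessful poll.
FileStatus pollUntilReady(const std::function<FileStatus()>& poll,
                          const std::atomic<bool>& shutdown) {
  assert(static_cast<bool>(poll));  // an empty callback cannot be polled
  FileStatus status = FileStatus::noFile;
  while ((status = poll()) == FileStatus::noFile) {
    if (shutdown.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

The loop always polls at least once, so a caller that sets the flag before the call still receives the result of one poll.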
void FedRawDataInputSource::read ( edm::EventPrincipal & eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 646 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

647 {
649  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
650  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
651 
652  if (useL1EventID_){
654  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
656  aux.setProcessHistoryID(processHistoryID_);
657  makeEvent(eventPrincipal, aux);
658  }
659  else if(tcds_pointer_==nullptr){
660  assert(GTPEventID_);
662  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
664  aux.setProcessHistoryID(processHistoryID_);
665  makeEvent(eventPrincipal, aux);
666  }
667  else{
668  const FEDHeader fedHeader(tcds_pointer_);
669  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
672  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
675  makeEvent(eventPrincipal, aux);
676  }
677 
678 
679 
680  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
681 
682  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
684 
685  eventsThisLumi_++;
687 
688  //resize vector if needed
689  while (streamFileTracker_.size() <= eventPrincipal.streamID())
690  streamFileTracker_.push_back(-1);
691 
693 
694  //this old file check runs no more often than every 10 events
695  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
696  //delete files that are not in processing
697  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
698  auto it = filesToDelete_.begin();
699  while (it!=filesToDelete_.end()) {
700  bool fileIsBeingProcessed = false;
701  for (unsigned int i=0;i<nStreams_;i++) {
702  if (it->first == streamFileTracker_.at(i)) {
703  fileIsBeingProcessed = true;
704  break;
705  }
706  }
707  if (!fileIsBeingProcessed) {
708  deleteFile(it->second->fileName_);
709  delete it->second;
710  it = filesToDelete_.erase(it);
711  }
712  else it++;
713  }
714 
715  }
717  chunkIsFree_=false;
719  return;
720 }
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
static const uint32_t length
Definition: FEDHeader.h:54
void setInState(FastMonitoringThread::InputState inputState)
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:211
Definition: TCDSRaw.h:16
std::vector< int > streamFileTracker_
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
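The cleanup at the end of read() walks the pending-delete list and removes every file whose index is no longer recorded for any stream. The sketch below isolates that loop. The function name, the `PendingFile` alias, and the removal callback are hypothetical; the callback stands in for `deleteFile()` and for freeing the `InputFile` object.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <string>
#include <utility>
#include <vector>

// (file index, file name) pair waiting for deletion.
using PendingFile = std::pair<int, std::string>;

// Remove every pending file whose index is not tracked by any stream.
// Returns the number of files handed to removeFile.
std::size_t releaseUnusedFiles(
    std::list<PendingFile>& pending,
    const std::vector<int>& streamFileTracker,
    const std::function<void(const std::string&)>& removeFile) {
  assert(static_cast<bool>(removeFile));
  std::size_t released = 0;
  for (auto it = pending.begin(); it != pending.end();) {
    const bool inUse = std::find(streamFileTracker.begin(),
                                 streamFileTracker.end(),
                                 it->first) != streamFileTracker.end();
    if (inUse) {
      ++it;  // a stream still processes events from this file
      continue;
    }
    removeFile(it->second);
    it = pending.erase(it);  // erase returns the next valid iterator
    ++released;
  }
  return released;
}
```

Using the iterator returned by `erase` keeps the traversal valid while elements are removed, which is the same idiom the listing uses.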
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile * file)
private

Definition at line 1273 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1274 {
1275 
1276  if (fileDescriptor_<0) {
1277  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1278  bufferInputRead_ = 0;
1279  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1280  if (fileDescriptor_>=0)
1281  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1282  else
1283  {
1284  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1285  << file->fileName_ << " fd:" << fileDescriptor_;
1286  }
1287  }
1288 
1289  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1290  uint32_t existingSize = 0;
1291  for (unsigned int i=0;i<readBlocks_;i++)
1292  {
1293  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1294  bufferInputRead_+=last;
1295  existingSize+=last;
1296  }
1297  }
1298  else {
1299  const uint32_t chunksize = file->chunkPosition_;
1300  const uint32_t blockcount=chunksize/eventChunkBlock_;
1301  const uint32_t leftsize = chunksize%eventChunkBlock_;
1302  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1303  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1304 
1305  for (uint32_t i=0;i<blockcount;i++) {
1306  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1307  bufferInputRead_+=last;
1308  existingSize+=last;
1309  }
1310  if (leftsize) {
1311  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1312  bufferInputRead_+=last;
1313  }
1314  file->chunkPosition_=0;//data was moved to beginning of the chunk
1315  }
1316  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1317  if (fileDescriptor_!=-1)
1318  {
1319  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1320  close(fileDescriptor_);
1321  fileDescriptor_=-1;
1322  }
1323  }
1324 }
#define LogDebug(id)
uint32_t chunkPosition_
void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
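When `chunkPosition_` is non-zero, the listing moves the unread tail of the chunk to its start and refills the freed space with a number of full-size block reads plus one shorter read. The arithmetic of that branch can be sketched as below. The struct and function names are hypothetical, and the sketch only computes the plan; it performs no I/O and does not cover the `chunkPosition_ == 0` branch.

```cpp
#include <cassert>
#include <cstdint>

// Sizes involved in refilling a partially consumed chunk.
struct RefillPlan {
  std::uint32_t keptBytes;   // unread bytes moved to the chunk start
  std::uint32_t fullBlocks;  // number of reads of blockSize bytes
  std::uint32_t tailBytes;   // size of the final, shorter read (may be 0)
};

// chunkPosition is the number of bytes already consumed from the chunk,
// which equals the number of bytes that become free after the move.
RefillPlan planRefill(std::uint32_t chunkSize, std::uint32_t blockSize,
                      std::uint32_t chunkPosition) {
  assert(blockSize > 0);
  assert(chunkPosition > 0 && chunkPosition <= chunkSize);
  RefillPlan plan;
  plan.keptBytes = chunkSize - chunkPosition;
  plan.fullBlocks = chunkPosition / blockSize;
  plan.tailBytes = chunkPosition % blockSize;
  return plan;
}
```

The three fields always add up to the chunk size: `keptBytes + fullBlocks * blockSize + tailBytes == chunkSize`. A real reader would additionally have to handle short or failed reads, since POSIX `read()` may return fewer bytes than requested or -1.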
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 780 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, InputFile::chunks_, cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), mps_fire::i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, createfilelist::int, LogDebug, eostools::ls(), maxBufferedFiles_, min(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, callgraph::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, trackingPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

781 {
782  bool stop=false;
783  unsigned int currentLumiSection = 0;
784  //threadInit_.exchange(true,std::memory_order_acquire);
785 
786  {
787  std::unique_lock<std::mutex> lk(startupLock_);
788  startupCv_.notify_one();
789  }
790 
791  uint32_t ls=0;
792  uint32_t monLS=1;
793  uint32_t lockCount=0;
794  uint64_t sumLockWaitTimeUs=0.;
795 
796  while (!stop) {
797 
798  //wait for at least one free thread and chunk
799  int counter=0;
801  {
802  //report state to monitoring
803  if (fms_) {
804  bool copy_active=false;
805  for (auto j : tid_active_) if (j) copy_active=true;
807  else if (freeChunks_.empty()) {
808  if (copy_active)
810  else
812  }
813  else {
814  if (copy_active)
816  else
818  }
819  }
820  std::unique_lock<std::mutex> lkw(mWakeup_);
821  //sleep until woken up by condition or a timeout
822  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
823  counter++;
824  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
825  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
826  }
827  else {
828  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
829  }
830  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
831  }
832 
833  if (stop) break;
834 
835  //look for a new file
836  std::string nextFile;
837  uint32_t fileSizeIndex;
838  int64_t fileSizeFromJson;
839 
840  if (fms_) {
844  }
845 
847 
848  int serverEventsInNewFile_=-1;
849 
850  int backoff_exp=0;
851 
852  while (status == evf::EvFDaqDirector::noFile) {
853  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
854  stop=true;
855  break;
856  }
857 
858  uint64_t thisLockWaitTimeUs=0.;
859  if (fileListMode_) {
860  //return LS if LS not set, otherwise return file
861  status = getFile(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
862  }
863  else if (!useFileBroker_)
864  status = daqDirector_->updateFuLock(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
865  else {
866  status = daqDirector_->getNextFromFileBroker(currentLumiSection,ls,nextFile,serverEventsInNewFile_,fileSizeFromJson,thisLockWaitTimeUs);
867  }
868 
870 
871  //cycle through all remaining LS even if no files get assigned
872  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
873 
874  //monitoring of lock wait time
875  if (thisLockWaitTimeUs>0.)
876  sumLockWaitTimeUs+=thisLockWaitTimeUs;
877  lockCount++;
878  if (ls>monLS) {
879  monLS=ls;
880  if (lockCount)
881  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
882  lockCount=0;
883  sumLockWaitTimeUs=0;
884  }
885 
886  //check again for any remaining index/EoLS files after EoR file is seen
889  usleep(100000);
890  //now all files should have appeared in ramdisk, check again if any raw files were left behind
891  status = daqDirector_->updateFuLock(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
892  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
893  }
894 
895  if ( status == evf::EvFDaqDirector::runEnded) {
897  stop=true;
898  break;
899  }
900 
901  //error from filelocking function
902  if (status == evf::EvFDaqDirector::runAbort) {
904  stop=true;
905  break;
906  }
907  //queue new lumisection
908  if( getLSFromFilename_) {
909  if (ls > currentLumiSection) {
910  if (!useFileBroker_) {
911  //file locking
912  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
913  currentLumiSection = ls;
914  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
915  }
916  else {
917  //new file service
918  if (currentLumiSection==0 && !alwaysStartFromFirstLS_) {
919  if (ls < 100) {
920  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
921  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
922 
923  for (unsigned int nextLS=lsToStart;nextLS<=ls;nextLS++)
925  }
926  else {
927  //start from current LS
929  }
930  }
931  else {
932  //queue all lumisections after last one seen to avoid gaps
933  for (unsigned int nextLS=currentLumiSection+1;nextLS<=ls;nextLS++) {
935  }
936  }
937  currentLumiSection = ls;
938  }
939  }
940  //else
941  if( currentLumiSection>0 && ls < currentLumiSection) {
942  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
944  stop=true;
945  break;
946  }
947  }
948 
949  int dbgcount=0;
950  if (status == evf::EvFDaqDirector::noFile) {
952  dbgcount++;
953  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
954  if (!useFileBroker_) usleep(100000);
955  else {
956  backoff_exp = std::min(4,backoff_exp); // max 1.6 seconds
957  //backoff_exp=0; // disabled!
958  int sleeptime = (int) (100000. * pow(2,backoff_exp));
959  usleep(sleeptime);
960  backoff_exp++;
961  }
962  }
963  else backoff_exp=0;
964  }
965  if ( status == evf::EvFDaqDirector::newFile ) {
967  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
968 
969 
970  std::string rawFile;
971  //file service will report raw extension
972  if (useFileBroker_) rawFile = nextFile;
973  else {
974  boost::filesystem::path rawFilePath(nextFile);
975  rawFile = rawFilePath.replace_extension(".raw").string();
976  }
977 
978  struct stat st;
979  int stat_res = stat(rawFile.c_str(),&st);
980  if (stat_res==-1) {
981  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
982  setExceptionState_=true;
983  break;
984  }
985  uint64_t fileSize=st.st_size;
986 
987  if (fms_) {
991  }
992  int eventsInNewFile;
993  if (fileListMode_) {
994  if (fileSize==0) eventsInNewFile=0;
995  else eventsInNewFile=-1;
996  }
997  else {
999  if (!useFileBroker_)
1000  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1001  else
1002  eventsInNewFile = serverEventsInNewFile_;
1003  assert( eventsInNewFile>=0 );
1004  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1005  }
1006 
1007  if (!singleBufferMode_) {
1008  //calculate number of needed chunks
1009  unsigned int neededChunks = fileSize/eventChunkSize_;
1010  if (fileSize%eventChunkSize_) neededChunks++;
1011 
1012  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1014  fileQueue_.push(newInputFile);
1015 
1016  for (unsigned int i=0;i<neededChunks;i++) {
1017 
1018  if (fms_) {
1019  bool copy_active=false;
1020  for (auto j : tid_active_) if (j) copy_active=true;
1021  if (copy_active)
1023  else
1025  }
1026  //get thread
1027  unsigned int newTid = 0xffffffff;
1028  while (!workerPool_.try_pop(newTid)) {
1029  usleep(100000);
1030  if (quit_threads_.load(std::memory_order_relaxed)) {stop=true;break;}
1031  }
1032 
1033  if (fms_) {
1034  bool copy_active=false;
1035  for (auto j : tid_active_) if (j) copy_active=true;
1036  if (copy_active)
1038  else
1040  }
1041  InputChunk * newChunk = nullptr;
1042  while (!freeChunks_.try_pop(newChunk)) {
1043  usleep(100000);
1044  if (quit_threads_.load(std::memory_order_relaxed)) {stop=true;break;}
1045  }
1046 
1047  if (newChunk == nullptr) {
1048  //return unused tid if we received shutdown (nullptr chunk)
1049  if (newTid!=0xffffffff) workerPool_.push(newTid);
1050  stop = true;
1051  break;
1052  }
1053  if (stop) break;
1055 
1056  std::unique_lock<std::mutex> lk(mReader_);
1057 
1058  unsigned int toRead = eventChunkSize_;
1059  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1060  newChunk->reset(i*eventChunkSize_,toRead,i);
1061 
1062  workerJob_[newTid].first=newInputFile;
1063  workerJob_[newTid].second=newChunk;
1064 
1065  //wake up the worker thread
1066  cvReader_[newTid]->notify_one();
1067  }
1068  }
1069  else {
1070  if (!eventsInNewFile) {
1071  //still queue file for lumi update
1072  std::unique_lock<std::mutex> lkw(mWakeup_);
1073  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1075  fileQueue_.push(newInputFile);
1076  cvWakeup_.notify_one();
1077  break;
1078  }
1079  //in single-buffer mode put single chunk in the file and let the main thread read the file
1080  InputChunk * newChunk;
1081  //should be available immediately
1082  while(!freeChunks_.try_pop(newChunk)) {
1083  usleep(100000);
1084  if (quit_threads_.load(std::memory_order_relaxed)) break;
1085  }
1086 
1087  std::unique_lock<std::mutex> lkw(mWakeup_);
1088 
1089  unsigned int toRead = eventChunkSize_;
1090  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1091  newChunk->reset(0,toRead,0);
1092  newChunk->readComplete_=true;
1093 
1094  //push file and wakeup main thread
1095  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1096  newInputFile->chunks_[0]=newChunk;
1098  fileQueue_.push(newInputFile);
1099  cvWakeup_.notify_one();
1100  }
1101  }
1102  }
1104  //make sure threads finish reading
1105  unsigned numFinishedThreads = 0;
1106  while (numFinishedThreads < workerThreads_.size()) {
1107  unsigned int tid;
1108  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1109  std::unique_lock<std::mutex> lk(mReader_);
1110  thread_quit_signal[tid]=true;
1111  cvReader_[tid]->notify_one();
1112  numFinishedThreads++;
1113  }
1114  for (unsigned int i=0;i<workerThreads_.size();i++) {
1115  workerThreads_[i]->join();
1116  delete workerThreads_[i];
1117  }
1118 }
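
The file-broker branch of the listing above (lines 956 to 960) sleeps for 100 ms times a power of two, caps the exponent at 4 (1.6 s), and resets it when a status other than noFile arrives. A minimal sketch of that computation follows. The helper name nextBackoffSleepUs is hypothetical and not part of FedRawDataInputSource, and an integer shift is used here in place of the pow() call in the listing.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper (not in the original source): returns the sleep time in
// microseconds for the current backoff exponent and then advances the exponent,
// following the order of operations in the listing.
inline int nextBackoffSleepUs(int& backoffExp) {
  assert(backoffExp >= 0);
  backoffExp = std::min(4, backoffExp);            // cap: 100 ms * 2^4 = 1.6 s
  const int sleepUs = 100000 * (1 << backoffExp);  // integer shift instead of pow()
  ++backoffExp;                                    // next retry waits twice as long
  return sleepUs;
}
```

A caller would pass the returned value to usleep() and set the exponent back to 0 once a file or a new lumisection is reported.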
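
In multi-buffer mode, readSupervisor() splits each file into fixed-size chunks (lines 1009 to 1010 and 1058 to 1060 of the listing): the chunk count is the file size divided by eventChunkSize_, rounded up, and only the last chunk may be shorter. The arithmetic can be sketched on its own as below. ChunkSpan and splitIntoChunks are hypothetical names introduced for illustration, and 64-bit integers are used throughout, whereas the listing keeps offsets and sizes in unsigned int.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical type for illustration: the byte range covered by one chunk.
struct ChunkSpan {
  std::uint64_t offset;  // start of the chunk within the file
  std::uint64_t toRead;  // number of bytes belonging to this chunk
};

// Splits fileSize bytes into chunks of chunkSize bytes; the last chunk holds
// the remainder when fileSize is not a multiple of chunkSize.
inline std::vector<ChunkSpan> splitIntoChunks(std::uint64_t fileSize, std::uint64_t chunkSize) {
  assert(chunkSize > 0);
  std::uint64_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize) ++needed;  // round up for a partial last chunk
  std::vector<ChunkSpan> spans;
  spans.reserve(static_cast<std::size_t>(needed));
  for (std::uint64_t i = 0; i < needed; ++i) {
    std::uint64_t toRead = chunkSize;
    if (i == needed - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    spans.push_back({i * chunkSize, toRead});
  }
  return spans;
}
```

An empty file yields no chunks, which matches the listing: the chunk loop is simply not entered when neededChunks is 0.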
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1120 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, mps_fire::i, init, plotBeamSpotDB::last, LogDebug, mReader_, cmsPerfSuiteHarvest::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1121 {
1122  bool init = true;
1123  threadInit_.exchange(true,std::memory_order_acquire);
1124 
1125  while (true) {
1126 
1127  tid_active_[tid]=false;
1128  std::unique_lock<std::mutex> lk(mReader_);
1129  workerJob_[tid].first=nullptr;
1130  workerJob_[tid].second=nullptr;
1131 
1132  assert(!thread_quit_signal[tid]);//should never get it here
1133  workerPool_.push(tid);
1134 
1135  if (init) {
1136  std::unique_lock<std::mutex> lk(startupLock_);
1137  init = false;
1138  startupCv_.notify_one();
1139  }
1140  cvReader_[tid]->wait(lk);
1141 
1142  if (thread_quit_signal[tid]) return;
1143  tid_active_[tid]=true;
1144 
1145  InputFile * file;
1146  InputChunk * chunk;
1147 
1148  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1149 
1150  file = workerJob_[tid].first;
1151  chunk = workerJob_[tid].second;
1152 
1153  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1154  off_t pos = 0;
1155 
1156 
1157  if (fileDescriptor<0) {
1158  edm::LogError("FedRawDataInputSource") <<
1159  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<" error: " << strerror(errno);
1160  setExceptionState_=true;
1161  continue;
1162  }
1163  pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1164  if (pos==-1) {
1165  edm::LogError("FedRawDataInputSource") <<
1166  "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1167  " to offset " << chunk->offset_ << " error: " << strerror(errno);
1168  setExceptionState_=true;
1169  continue;
1170  }
1171 
1172  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1173 
1174  unsigned int bufferLeft = 0;
1176  for (unsigned int i=0;i<readBlocks_;i++)
1177  {
1178  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1179  if (last<0) {
1180  edm::LogError("FedRawDataInputSource") <<
1181  "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " error: " << strerror(errno);
1182  setExceptionState_=true;
1183  break;
1184  }
1185  if ( last > 0 )
1186  bufferLeft+=last;
1187  if (last < eventChunkBlock_) {
1188  if (!(chunk->usedSize_==i*eventChunkBlock_+last)) {
1189  edm::LogError("FedRawDataInputSource") <<
1190  "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last <<
1191  " expectedChunkSize:" << chunk->usedSize_ << " readChunkSize:" << (i*eventChunkBlock_+last) << " error: " << strerror(errno);
1192  setExceptionState_=true;
1193  }
1194  break;
1195  }
1196  }
1197  if (setExceptionState_) continue;
1198 
1200  auto diff = end-start;
1201  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1202  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1203  close(fileDescriptor);
1204 
1205  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1206  assert(detectedFRDversion_<=5);
1207  chunk->readComplete_=true;//atomic flag: set only after the buffer has been filled, so the chunk becomes available for processing in a consistent state
1208  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1209 
1210  }
1211 }
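
The inner loop of readWorker() (lines 1176 to 1196) reads the chunk in blocks of eventChunkBlock_ bytes and stops at the first short read. A minimal sketch of that pattern using plain POSIX calls follows. The function name readBlocks and its parameters are hypothetical, and the consistency check against usedSize_ and the logging in the listing are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (not in the original source): seeks to offset and reads
// at most maxBlocks blocks of blockSize bytes into buf. The caller must provide
// room for maxBlocks * blockSize bytes. Returns the number of bytes read, or -1
// if lseek() or read() fails.
inline ssize_t readBlocks(int fd, off_t offset, unsigned char* buf,
                          std::size_t blockSize, unsigned int maxBlocks) {
  assert(buf != nullptr && blockSize > 0);
  if (lseek(fd, offset, SEEK_SET) == static_cast<off_t>(-1)) return -1;
  std::size_t filled = 0;
  for (unsigned int i = 0; i < maxBlocks; ++i) {
    const ssize_t last = ::read(fd, buf + filled, blockSize);
    if (last < 0) return -1;                                // read error
    filled += static_cast<std::size_t>(last);
    if (static_cast<std::size_t>(last) < blockSize) break;  // short read: stop, as the listing does
  }
  return static_cast<ssize_t>(filled);
}
```

A caller in the style of the listing would compare the returned byte count with the expected chunk size and flag an error on a mismatch.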
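
readWorker() and readSupervisor() hand jobs over through workerJob_, a per-thread condition variable in cvReader_, and the thread_quit_signal flags. The sketch below shows the same kind of handshake for a single worker, under stated assumptions: WorkerSlot, workerLoop, submitAndWait and stopWorker are hypothetical names, the "job" is a stand-in integer rather than a file chunk, and the waits use a predicate, whereas the listing calls wait(lk) without one.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical slot shared by one supervisor and one worker.
struct WorkerSlot {
  std::mutex m;
  std::condition_variable cv;
  bool hasJob = false;  // set by the supervisor, cleared by the worker
  bool done = false;    // set by the worker when the result is ready
  bool quit = false;    // analogous in spirit to thread_quit_signal
  int job = 0;
  int result = 0;
};

// Worker side: sleep until a job or a quit request arrives.
inline void workerLoop(WorkerSlot& s) {
  std::unique_lock<std::mutex> lk(s.m);
  while (true) {
    s.cv.wait(lk, [&] { return s.hasJob || s.quit; });  // predicate guards against spurious wakeups
    if (s.quit) return;
    s.result = s.job * 2;  // stand-in for reading a chunk
    s.hasJob = false;
    s.done = true;
    s.cv.notify_all();
  }
}

// Supervisor side: publish a job, wake the worker, wait for completion.
inline int submitAndWait(WorkerSlot& s, int job) {
  std::unique_lock<std::mutex> lk(s.m);
  s.job = job;
  s.done = false;
  s.hasJob = true;
  s.cv.notify_all();
  s.cv.wait(lk, [&] { return s.done; });
  return s.result;
}

// Supervisor side: ask the worker to exit; the caller then joins the thread.
inline void stopWorker(WorkerSlot& s) {
  {
    std::lock_guard<std::mutex> lk(s.m);
    s.quit = true;
  }
  s.cv.notify_all();
}
```

With a predicate, a wakeup that arrives before any job has been published sends the worker back to sleep instead of letting it proceed with an empty slot.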
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1327 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1328 {
1329 
1330  std::lock_guard<std::mutex> lock(monlock_);
1331  auto itr = sourceEventsReport_.find(lumi);
1332  if (itr!=sourceEventsReport_.end())
1333  itr->second+=events;
1334  else
1336 }
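
reportEventsThisLumiInSource() accumulates event counts per lumisection in a map guarded by monlock_. The else branch is not shown in the listing above, so the sketch below assumes that a lumisection seen for the first time starts from zero. LumiEventCounter and its methods are hypothetical names, not part of the class.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Hypothetical class for illustration: a mutex-protected per-lumisection counter.
class LumiEventCounter {
public:
  // Adds events to the count for lumi, creating the entry if it is missing.
  void add(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] += events;  // operator[] value-initializes a missing entry to 0
  }

  // Returns the accumulated count, or 0 for a lumisection never reported.
  unsigned int get(unsigned int lumi) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = counts_.find(lumi);
    return it == counts_.end() ? 0u : it->second;
  }

private:
  mutable std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```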
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 776 of file FedRawDataInputSource.cc.

777 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1213 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by getNextEvent().

1214 {
1215  quit_threads_=true;
1216  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1217 
1218 }

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 39 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Definition at line 38 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 78 of file FedRawDataInputSource.h.

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 119 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private
std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 155 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 104 of file FedRawDataInputSource.h.

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 84 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 83 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 85 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 172 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::useFileBroker_
private
const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private