
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource (image not reproduced):
FedRawDataInputSource inherits edm::RawInputSource, which inherits edm::InputSource, which inherits edm::ProductRegistryHelper.

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirer * resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef ProductRegistryHelper::TypeLabelList TypeLabelList
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 129 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, cppFunctionSkipper::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
69  eventID_(),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
81 
83  setNewRun();
86 
87  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
88  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
89  struct stat statbuf;
90  if (stat(defPath_.c_str(), &statbuf)) {
91  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
92  if (stat(defPath_.c_str(), &statbuf)) {
93  defPath_ = defPathSuffix;
94  }
95  }
96 
97  dpd_ = new DataPointDefinition();
98  std::string defLabel = "data";
99  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
100 
101  //make sure that chunk size is N * block size
106 
107  if (!numBuffers_)
108  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
109  "no reading enabled with numBuffers parameter 0";
110 
114 
115  if (!crc32c_hw_test())
116  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
117 
118  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
120  if (!fms_) {
121  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
122  }
123 
125  if (!daqDirector_)
126  throw cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
127 
128  //set DaqDirector to delete files in preGlobalEndLumi callback
130  if (fms_) daqDirector_->setFMS(fms_);
131 
132  fms_->setInputSource(this);
133  //should delete chunks when run stops
134  for (unsigned int i=0;i<numBuffers_;i++) {
136  }
137 
138  quit_threads_ = false;
139 
140  for (unsigned int i=0;i<numConcurrentReads_;i++)
141  {
142  std::unique_lock<std::mutex> lk(startupLock_);
143  //issue a memory fence here and in threads (constructor was segfaulting without this)
144  thread_quit_signal.push_back(false);
145  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
146  cvReader_.push_back(new std::condition_variable);
147  threadInit_.store(false,std::memory_order_release);
148  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
149  startupCv_.wait(lk);
150  }
151 
152  runAuxiliary()->setProcessHistoryID(processHistoryID_);
153 }
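
The worker start-up loop at the end of the constructor launches one thread at a time and blocks on startupCv_ until that thread reports in. The following is a minimal, standalone sketch of that handshake, not the class's actual code: startWorkersWithHandshake, workerReady and started are hypothetical names, and the wait uses a predicate (which the listing above does not) so that a spurious wakeup cannot release the starter early.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper, not part of FedRawDataInputSource: starts n workers one
// at a time and returns the number of workers that signalled they were running.
inline unsigned int startWorkersWithHandshake(unsigned int n) {
  std::mutex startupLock;
  std::condition_variable startupCv;
  bool workerReady = false;   // guarded by startupLock
  unsigned int started = 0;   // guarded by startupLock
  std::vector<std::thread> workers;

  for (unsigned int i = 0; i < n; ++i) {
    // Hold the lock before spawning so the worker cannot signal
    // before the starter is actually waiting.
    std::unique_lock<std::mutex> lk(startupLock);
    workerReady = false;
    workers.emplace_back([&] {
      std::lock_guard<std::mutex> guard(startupLock);
      workerReady = true;
      ++started;
      startupCv.notify_one();
    });
    // wait() releases the lock while blocked; the predicate guards
    // against spurious wakeups.
    startupCv.wait(lk, [&] { return workerReady; });
  }
  for (auto& t : workers) t.join();
  return started;
}
```

Calling startWorkersWithHandshake(4) returns only after all four workers have run their start-up section, which is the property the constructor relies on before it touches per-thread state.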
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 155 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

156 {
157  quit_threads_=true;
158 
159  //delete any remaining open files
160  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
161  deleteFile(it->second->fileName_);
162  delete it->second;
163  }
165  readSupervisorThread_->join();
166  }
167  else {
168  //join aux threads in case the supervisor thread was not started
169  for (unsigned int i=0;i<workerThreads_.size();i++) {
170  std::unique_lock<std::mutex> lk(mReader_);
171  thread_quit_signal[i]=true;
172  cvReader_[i]->notify_one();
173  lk.unlock();
174  workerThreads_[i]->join();
175  delete workerThreads_[i];
176  }
177  }
178  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
179  /*
180  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
181  InputChunk *ch;
182  while (!freeChunks_.try_pop(ch)) {}
183  delete ch;
184  }
185  */
186 }
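
The else branch of the destructor stops each worker by setting its quit flag under the reader mutex, notifying its condition variable, and joining the thread. A minimal sketch of that shutdown pattern follows, assuming a single worker; DemoWorker and its members are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical single-worker illustration of "flag, notify, join".
struct DemoWorker {
  std::mutex m;
  std::condition_variable cv;
  bool quit = false;     // guarded by m
  bool exited = false;   // written by the worker under m, read after join()
  std::thread th;

  void start() {
    th = std::thread([this] {
      std::unique_lock<std::mutex> lk(m);
      // Sleep until asked to quit; the predicate handles spurious wakeups
      // and the case where stop() ran before the wait started.
      cv.wait(lk, [this] { return quit; });
      exited = true;
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> lk(m);
      quit = true;
    }
    cv.notify_one();
    th.join();  // after join() the worker's writes are visible here
  }
};
```

Setting the flag while holding the mutex is what prevents a lost wakeup: the worker either sees the flag before it sleeps or is already waiting when the notification arrives.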

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 203 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

204 {
206  {
207  //this thread opens new files and dispatches reading to worker readers
208  //threadInit_.store(false,std::memory_order_release);
209  std::unique_lock<std::mutex> lk(startupLock_);
210  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
212  startupCv_.wait(lk);
213  }
214  //signal hltd to start event accounting
217 
218  switch (nextEvent() ) {
220 
221  //maybe create EoL file in working directory before ending run
222  struct stat buf;
223  if ( currentLumiSection_ > 0 ) {
224  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
225  if (eolFound) {
227  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
228  if ( !found ) {
230  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
231  close(eol_fd);
233  }
234  }
235  }
236  //also create EoR file in FU data directory
237  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
238  if (!eorFound) {
239  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
240  close(eor_fd);
241  }
243  eventsThisLumi_=0;
245  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
246  return false;
247  }
249  //this is not reachable
250  return true;
251  }
253  //std::cout << "--------------NEW LUMI---------------" << std::endl;
254  return true;
255  }
256  default: {
257  if (!getLSFromFilename_) {
258  //get new lumi from file header
259  if (event_->lumi() > currentLumiSection_) {
261  eventsThisLumi_=0;
262  maybeOpenNewLumiSection( event_->lumi() );
263  }
264  }
265  eventRunNumber_=event_->run();
266  L1EventID_ = event_->event();
267 
268  setEventCached();
269 
270  return true;
271  }
272  }
273 }
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 275 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

276 {
277  //used for backpressure mechanisms and monitoring
278  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
279  struct stat buf;
280  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
281  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
282  close(bol_fd);
283  }
284 }
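
createBoLSFile creates an empty marker file with open() and close() and does not check the returned descriptor. The sketch below shows the same idea with the descriptor checked; touchMarkerFile is a hypothetical helper, and the paths used with it are placeholders rather than real BoLS paths.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: create an empty marker file.
// Returns true if the file exists once the call completes.
inline bool touchMarkerFile(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && stat(path.c_str(), &buf) == 0) {
    return true;  // marker already present, nothing to do
  }
  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0) {
    return false;  // e.g. missing directory or no permission
  }
  close(fd);
  return true;
}
```

Checking the descriptor avoids calling close() on -1 and lets the caller log or react when the marker could not be written.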
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 598 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

599 {
600  const boost::filesystem::path filePath(fileName);
601  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
602  try {
603  //sometimes this fails but file gets deleted
604  boost::filesystem::remove(filePath);
605  }
606  catch (const boost::filesystem::filesystem_error& ex)
607  {
608  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
609  << ". Trying again.";
610  usleep(100000);
611  try {
612  boost::filesystem::remove(filePath);
613  }
614  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
615  }
616  catch (std::exception& ex)
617  {
618  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
619  << ". Trying again.";
620  usleep(100000);
621  try {
622  boost::filesystem::remove(filePath);
623  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
624  }
625 }
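
deleteFile retries the removal once after a 100 ms pause and duplicates the retry in two catch blocks. As an alternative sketch, the same retry can be written as a loop using the non-throwing std::filesystem::remove overload from C++17. This swaps in std::filesystem for boost::filesystem and is not the code used by the class; removeWithRetry and maxAttempts are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <string>
#include <system_error>
#include <thread>

// Hypothetical helper: try to remove a path, pausing 100 ms between attempts.
// Returns true once a remove call completes without error, which includes
// the case where the path no longer exists.
inline bool removeWithRetry(const std::string& fileName, int maxAttempts = 2) {
  namespace fs = std::filesystem;
  const fs::path filePath(fileName);
  for (int attempt = 0; attempt < maxAttempts; ++attempt) {
    std::error_code ec;
    fs::remove(filePath, ec);  // non-throwing overload; errors land in ec
    if (!ec) {
      return true;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  return false;
}
```

Because a missing path is not reported as an error by this overload, the case noted in the listing's comments (the first attempt fails but the file is gone anyway) resolves cleanly on the second pass.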
bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 74 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 188 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), and edm::ParameterSetDescription::setComment().

189 {
191  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
192  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
193  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
194  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
195  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
196  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
197  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
198  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
199  desc.setAllowAnything();
200  descriptions.add("source", desc);
201 }
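
eventChunkSize and eventChunkBlock are given in megabytes (the constructor multiplies both by 1048576), and the block size must not exceed the chunk size. The constructor also carries the comment "make sure that chunk size is N * block size", but the corresponding lines are not shown on this page. The sketch below is one plausible way to enforce both constraints, assuming the chunk is rounded down to a whole number of blocks; ChunkLayout and makeChunkLayout are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical result type: sizes in bytes plus the number of reads per chunk.
struct ChunkLayout {
  uint64_t chunkBytes;
  uint64_t blockBytes;
  uint64_t readBlocks;
};

// Hypothetical helper: convert MB parameters to bytes and make the chunk
// size an exact multiple of the block size (rounding the chunk down).
inline ChunkLayout makeChunkLayout(unsigned int chunkMB, unsigned int blockMB) {
  constexpr uint64_t kMiB = 1048576;
  if (blockMB == 0 || blockMB > chunkMB) {
    throw std::invalid_argument("block size must be non-zero and <= chunk size");
  }
  const uint64_t blockBytes = blockMB * kMiB;   // 64-bit, so large values do not wrap
  const uint64_t readBlocks = chunkMB / blockMB;
  return ChunkLayout{readBlocks * blockBytes, blockBytes, readBlocks};
}
```

With the defaults (32, 32) this yields one 32 MB read per chunk; with (64, 24) the chunk shrinks to 48 MB so that it is filled by exactly two block reads.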
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 697 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), stage2MP7BufferRaw_cfi::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

698 {
699  edm::TimeValue_t time;
700  timeval stv;
701  gettimeofday(&stv,0);
702  time = stv.tv_sec;
703  time = (time << 32) + stv.tv_usec;
704  edm::Timestamp tstamp(time);
705 
706  uint32_t eventSize = event_->eventSize();
707  char* event = (char*)event_->payload();
708  GTPEventID_=0;
709  tcds_pointer_ = 0;
710  while (eventSize > 0) {
711  assert(eventSize>=sizeof(fedt_t));
712  eventSize -= sizeof(fedt_t);
713  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
714  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
715  assert(eventSize>=fedSize - sizeof(fedt_t));
716  eventSize -= (fedSize - sizeof(fedt_t));
717  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
718  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
719  if(fedId>FEDNumbering::MAXFEDID)
720  {
721  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
722  }
723  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
724  tcds_pointer_ = (unsigned char *)(event + eventSize );
725  }
726  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
727  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
728  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
729  else
730  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
731  //evf::evtn::evm_board_setformat(fedSize);
732  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
733  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
734  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
735  }
736  //take event ID from GTPE FED
737  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
738  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
739  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
740  }
741  }
742  FEDRawData& fedData = rawData.FEDData(fedId);
743  fedData.resize(fedSize);
744  memcpy(fedData.data(), event + eventSize, fedSize);
745  }
746  assert(eventSize == 0);
747 
748  return tstamp;
749 }
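
fillFEDRawDataCollection walks the event payload from the end towards the start: it reads a FED trailer, takes the fragment length from it (in 8-byte words), steps back by that length to reach the header, and reads the source ID there. The standalone sketch below reproduces that backward walk on simplified data. DemoFedHeader, DemoFedTrailer and the two bit-field helpers are stand-ins with assumed layouts; the real definitions are in fed_header.h and fed_trailer.h and may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Simplified stand-ins for fedh_t / fedt_t (8 bytes each). Assumed layout.
struct DemoFedHeader  { uint32_t sourceid;  uint32_t eventid;  };
struct DemoFedTrailer { uint32_t conscheck; uint32_t eventsize; };

// Assumed bit fields, for this sketch only.
inline uint32_t demoSizeWords(uint32_t eventsize) { return eventsize & 0x00ffffffu; }
inline uint16_t demoSourceId(uint32_t sourceid) {
  return static_cast<uint16_t>((sourceid >> 8) & 0xfffu);
}

// Split a concatenation of FED fragments, walking backwards from the last trailer.
inline std::map<uint16_t, std::vector<unsigned char>>
splitFragments(const unsigned char* event, uint32_t eventSize) {
  std::map<uint16_t, std::vector<unsigned char>> out;
  while (eventSize > 0) {
    if (eventSize < sizeof(DemoFedTrailer)) throw std::runtime_error("truncated trailer");
    DemoFedTrailer trailer;
    std::memcpy(&trailer, event + eventSize - sizeof(trailer), sizeof(trailer));
    const uint32_t fedSize = demoSizeWords(trailer.eventsize) << 3;  // words to bytes
    // Checking the size before subtracting avoids unsigned underflow.
    if (fedSize < sizeof(DemoFedHeader) + sizeof(DemoFedTrailer) || fedSize > eventSize)
      throw std::runtime_error("inconsistent fragment size");
    eventSize -= fedSize;  // eventSize now indexes the fragment header
    DemoFedHeader header;
    std::memcpy(&header, event + eventSize, sizeof(header));
    out[demoSourceId(header.sourceid)].assign(event + eventSize, event + eventSize + fedSize);
  }
  return out;
}

// Test-data builder: header + payloadWords zeroed words + trailer.
inline std::vector<unsigned char> makeDemoFragment(uint16_t fedId, uint32_t payloadWords) {
  const uint32_t totalWords = payloadWords + 2;
  std::vector<unsigned char> frag(totalWords * 8u, 0);
  const DemoFedHeader h{static_cast<uint32_t>(fedId) << 8, 0};
  const DemoFedTrailer t{0, totalWords};
  std::memcpy(frag.data(), &h, sizeof(h));
  std::memcpy(frag.data() + frag.size() - sizeof(t), &t, sizeof(t));
  return frag;
}
```

Unlike the listing, the sketch reports malformed input with exceptions rather than assert(), so the size checks remain active in builds where assertions are compiled out.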
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1271 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1272 {
1273  std::lock_guard<std::mutex> lock(monlock_);
1274  auto itr = sourceEventsReport_.find(lumi);
1275  if (itr!=sourceEventsReport_.end()) {
1276  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1277  if (erase)
1278  sourceEventsReport_.erase(itr);
1279  return ret;
1280  }
1281  else
1282  return std::pair<bool,unsigned int>(false,0);
1283 }
tuple ret
prodAgent to be discontinued
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > sourceEventsReport_
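
The lookup-then-optionally-erase pattern used by getEventReport() can be sketched in isolation as follows. This is a minimal illustration, not the class itself: EventReportBook, report() and get() are hypothetical names, and the real counters are filled elsewhere in FedRawDataInputSource.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the per-lumisection event counters.
class EventReportBook {
public:
  // Record the number of events seen for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Return {true, count} if the lumisection is known, {false, 0} otherwise.
  // When erase is true, the entry is removed after its value has been copied.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end())
      return {false, 0u};
    std::pair<bool, unsigned int> ret(true, itr->second);
    if (erase)
      counts_.erase(itr);
    return ret;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Copying the value before erasing matters: the iterator is invalid once the entry is gone, so the result has to be built first, as the listing above does.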
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 337 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

338 {
339 
341  if (!currentFile_)
342  {
343  if (!streamFileTrackerPtr_) {
347  }
348 
350  if (!fileQueue_.try_pop(currentFile_))
351  {
352  //sleep until wakeup (only in single-buffer mode) or timeout
353  std::unique_lock<std::mutex> lkw(mWakeup_);
354  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
356  }
357  status = currentFile_->status_;
358  if ( status == evf::EvFDaqDirector::runEnded)
359  {
360  delete currentFile_;
361  currentFile_=nullptr;
362  return status;
363  }
364  else if ( status == evf::EvFDaqDirector::runAbort)
365  {
366  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
367  }
368  else if (status == evf::EvFDaqDirector::newLumi)
369  {
370  if (getLSFromFilename_) {
373  eventsThisLumi_=0;
375  }
376  }
377  else {//let this be picked up from next event
379  }
380 
381  delete currentFile_;
382  currentFile_=nullptr;
383  return status;
384  }
385  else if (status == evf::EvFDaqDirector::newFile) {
388  }
389  else
390  assert(0);
391  }
392 
393  //file is empty
394  if (!currentFile_->fileSize_) {
396  //try to open new lumi
398  if (getLSFromFilename_)
401  eventsThisLumi_=0;
403  }
404  //immediately delete empty file
406  delete currentFile_;
407  currentFile_=nullptr;
409  }
410 
411  //file is finished
414  //release last chunk (it is never released elsewhere)
417  {
418  throw cms::Exception("FedRawDataInputSource::getNextEvent")
419  << "Fully processed " << currentFile_->nProcessed_
420  << " from the file " << currentFile_->fileName_
421  << " but according to BU JSON there should be "
422  << currentFile_->nEvents_ << " events";
423  }
424  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
425  if (singleBufferMode_) {
426  std::unique_lock<std::mutex> lkw(mWakeup_);
427  cvWakeup_.notify_one();
428  }
431  //put the file in pending delete list;
432  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
433  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
434  }
435  else {
436  //in single-thread and stream jobs, events are already processed
438  delete currentFile_;
439  }
440  currentFile_=nullptr;
442  }
443 
444 
445  //file is too short
447  {
448  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
449  "Premature end of input file while reading event header";
450  }
451  if (singleBufferMode_) {
452 
453  //should already be there
455  usleep(10000);
457  }
458 
459  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
460 
461  //conditions when read amount is not sufficient for the header to fit
462  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
464  {
466 
467  if (detectedFRDversion_==0) {
468  detectedFRDversion_=*((uint32*)dataPosition);
469  if (detectedFRDversion_>5)
470  throw cms::Exception("FedRawDataInputSource::getNextEvent")
471  << "Unknown FRD version -: " << detectedFRDversion_;
472  assert(detectedFRDversion_>=1);
473  }
474 
475  //recalculate chunk position
476  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
477  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
478  {
479  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
480  "Premature end of input file while reading event header";
481  }
482  }
483 
484  event_.reset( new FRDEventMsgView(dataPosition) );
485  if (event_->size()>eventChunkSize_) {
486  throw cms::Exception("FedRawDataInputSource::getNextEvent")
487  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
488  << " run:" << event_->run() << " of size:" << event_->size()
489  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
490  }
491 
492  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
493 
495  {
496  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
497  "Premature end of input file while reading event data";
498  }
499  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
501  //recalculate chunk position
502  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
503  event_.reset( new FRDEventMsgView(dataPosition) );
504  }
505  currentFile_->bufferPosition_ += event_->size();
506  currentFile_->chunkPosition_ += event_->size();
507  //last chunk is released when this function is invoked next time
508 
509  }
510  //multibuffer mode:
511  else
512  {
513  //wait for the current chunk to become added to the vector
515  usleep(10000);
517  }
518 
519  //check if header is at the boundary of two chunks
520  chunkIsFree_ = false;
521  unsigned char *dataPosition;
522 
523  //read header, copy it to a single chunk if necessary
524  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
525 
526  event_.reset( new FRDEventMsgView(dataPosition) );
527  if (event_->size()>eventChunkSize_) {
528  throw cms::Exception("FedRawDataInputSource::getNextEvent")
529  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
530  << " run:" << event_->run() << " of size:" << event_->size()
531  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
532  }
533 
534  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
535 
537  {
538  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
539  "Premature end of input file while reading event data";
540  }
541 
542  if (chunkEnd) {
543  //header was at the chunk boundary, we will have to move payload as well
544  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
545  chunkIsFree_ = true;
546  }
547  else {
548  //header was contiguous, but check if payload fits the chunk
549  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
550  //rewind to header start position
551  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
552  //copy event to a chunk start and move pointers
553  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
554  assert(chunkEnd);
555  chunkIsFree_=true;
556  //header is moved
557  event_.reset( new FRDEventMsgView(dataPosition) );
558  }
559  else {
560  //everything is in a single chunk, only move pointers forward
561  chunkEnd = currentFile_->advance(dataPosition,msgSize);
562  assert(!chunkEnd);
563  chunkIsFree_=false;
564  }
565  }
566  }//end multibuffer mode
567 
568  if (verifyChecksum_ && event_->version() >= 5)
569  {
570  uint32_t crc=0;
571  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
572  if ( crc != event_->crc32c() ) {
574  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
575  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
576  " but calculated 0x" << crc;
577  }
578  }
579  else if ( verifyAdler32_ && event_->version() >= 3)
580  {
581  uint32_t adler = adler32(0L,Z_NULL,0);
582  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
583 
584  if ( adler != event_->adler32() ) {
586  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
587  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
588  " but calculated 0x" << adler;
589  }
590  }
591 
592 
594 
596 }
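
The three multibuffer cases in the listing (header split across chunks, payload not fitting after a contiguous header, everything contiguous) can be summarised as a small decision function. This is a sketch under assumptions: Placement and classify() are hypothetical names, and the header is assumed to straddle the boundary exactly when its end would lie beyond the chunk size; the actual test is made inside InputFile::advance(), which is not shown here.

```cpp
#include <cstddef>

// Hypothetical labels for the three situations handled in multibuffer mode.
enum class Placement {
  HeaderSplit,   // header crosses the chunk boundary: header and payload are moved
  PayloadSplit,  // header is contiguous but the payload does not fit: rewind and copy
  Contiguous     // whole event lies in one chunk: only pointers move forward
};

// chunkSize:   bytes per chunk
// pos:         offset of the event header inside the current chunk (pos <= chunkSize)
// headerSize:  size of the event header for the detected format version
// payloadSize: event size minus header size
Placement classify(std::size_t chunkSize, std::size_t pos,
                   std::size_t headerSize, std::size_t payloadSize) {
  if (pos + headerSize > chunkSize)
    return Placement::HeaderSplit;
  const std::size_t afterHeader = pos + headerSize;
  if (chunkSize - afterHeader < payloadSize)
    return Placement::PayloadSplit;
  return Placement::Contiguous;
}
```

With a 100-byte chunk, a 10-byte header and a 20-byte payload, an event starting at offset 95 is a header split, one starting at offset 80 is a payload split, and one starting at offset 70 fits exactly.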
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
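
The Adler-32 branch of the checksum verification compares a checksum computed over the event payload with the value stored in the event header. The listing relies on zlib's adler32(); the sketch below writes the algorithm out by hand only so that it is self-contained. adler32Sketch() and payloadMatches() are hypothetical helper names.

```cpp
#include <cstddef>
#include <cstdint>

// Plain Adler-32 as defined in RFC 1950, written out for illustration only.
std::uint32_t adler32Sketch(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// True when the payload checksum agrees with the value carried in the header.
bool payloadMatches(const unsigned char* payload, std::size_t len,
                    std::uint32_t expected) {
  return adler32Sketch(payload, len) == expected;
}
```

A mismatch corresponds to the "Found a wrong Adler32 checksum" exception in the listing; the CRC32C branch follows the same compare-and-throw shape with a different checksum function.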
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 751 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), fed_dqm_sourceclient-live_cfg::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

752 {
754  try {
755  // assemble json destination path
757 
758  //TODO:should be ported to use fffnaming
759  std::ostringstream fileNameWithPID;
760  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
761  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
762  jsonDestPath /= fileNameWithPID.str();
763 
764  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
765  << jsonDestPath;
766  try {
767  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
768  }
769  catch (const boost::filesystem::filesystem_error& ex)
770  {
771  // Input dir gone?
772  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
773  // << " Maybe the file is not yet visible by FU. Trying again in one second";
774  sleep(1);
775  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
776  }
778 
779  try {
780  //sometimes this fails but file gets deleted
781  boost::filesystem::remove(jsonSourcePath);
782  }
783  catch (const boost::filesystem::filesystem_error& ex)
784  {
785  // Input dir gone?
786  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
787  }
788  catch (std::exception& ex)
789  {
790  // Input dir gone?
791  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
792  }
793 
794  boost::filesystem::ifstream ij(jsonDestPath);
795  Json::Value deserializeRoot;
797 
798  if (!reader.parse(ij, deserializeRoot))
799  throw std::runtime_error("Cannot deserialize input JSON file");
800 
801  //read BU JSON
803  DataPoint dp;
804  dp.deserialize(deserializeRoot);
805  bool success = false;
806  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
807  if (dpd_->getNames().at(i)=="NEvents")
808  if (i<dp.getData().size()) {
809  data = dp.getData()[i];
810  success=true;
811  }
812  }
813  if (!success) {
814  if (dp.getData().size())
815  data = dp.getData()[0];
816  else
817  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
818  " error reading number of events from BU JSON -: No input value " << data;
819  }
820  return boost::lexical_cast<int>(data);
821 
822  }
823  catch (const boost::filesystem::filesystem_error& ex)
824  {
825  // Input dir gone?
827  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
828  }
829  catch (std::runtime_error e)
830  {
831  // Another process grabbed the file and NFS did not register this
833  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
834  }
835 
836  catch( boost::bad_lexical_cast const& ) {
837  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
838  << "Input value is -: " << data;
839  }
840 
841  catch (std::exception e)
842  {
843  // BU run directory disappeared?
845  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
846  }
847 
848  return -1;
849 }
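
The destination file name built in grabNextJsonFile() is the source file stem followed by "_pid", the process id zero-padded to at least five digits, and the ".jsn" extension. A minimal sketch of that formatting, with pidStampedName() as a hypothetical helper and the stem in the usage note as a made-up example:

```cpp
#include <iomanip>
#include <sstream>
#include <string>

// Build "<stem>_pid<zero-padded pid>.jsn".
// std::setw applies only to the next inserted value, so only the pid is padded.
std::string pidStampedName(const std::string& stem, long pid) {
  std::ostringstream out;
  out << stem << "_pid" << std::setfill('0') << std::setw(5) << pid << ".jsn";
  return out.str();
}
```

For example, pidStampedName("example_index000002", 42) yields "example_index000002_pid00042.jsn". The width is a minimum, so a six-digit pid is written in full rather than truncated.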
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
auto dp
Definition: deltaR.h:22
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
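
The event count is taken from the value whose column name is "NEvents", falling back to the first value when no such column is found, and failing when there are no values at all. The sketch below illustrates that selection with plain vectors. eventCount() is a hypothetical helper; it uses std::stoi where the listing uses boost::lexical_cast, which is stricter about trailing characters, and it returns on the first match whereas the listing keeps scanning.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// names:  column names from the data point definition
// values: string values read from the JSON file, in the same order
int eventCount(const std::vector<std::string>& names,
               const std::vector<std::string>& values) {
  for (std::size_t i = 0; i < names.size() && i < values.size(); ++i) {
    if (names[i] == "NEvents")
      return std::stoi(values[i]);
  }
  // No "NEvents" column with a matching value: fall back to the first value.
  if (values.empty())
    throw std::runtime_error("no input value for the event count");
  return std::stoi(values.front());
}
```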
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 286 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

287 {
289  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
290 
291  if ( currentLumiSection_ > 0 ) {
292  const std::string fuEoLS =
294  struct stat buf;
295  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
296  if ( !found ) {
298  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
299  close(eol_fd);
300  createBoLSFile(lumiSection,false);
302  }
303  }
304  else createBoLSFile(lumiSection,true);//needed for initial lumisection
305 
306  currentLumiSection_ = lumiSection;
307 
309 
310  timeval tv;
311  gettimeofday(&tv, 0);
312  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
313 
314  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
316  runAuxiliary()->run(),
317  lumiSection, lsopentime,
319 
320  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
321  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
322 
323  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
324  }
325 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
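
The lumisection opening time in the listing is a microsecond count formed from the two fields filled by gettimeofday(). The casts to unsigned long long happen before the multiplication, so the product cannot overflow a narrower integer type. A small sketch of the same arithmetic, with toMicroseconds() as a hypothetical helper:

```cpp
#include <cstdint>

// Combine whole seconds and the sub-second microsecond part into one count.
// Both operands are widened to 64 bits before multiplying.
std::uint64_t toMicroseconds(std::uint64_t seconds, std::uint64_t microseconds) {
  return seconds * 1000000ULL + microseconds;
}
```

For instance, 1500000000 seconds and 250000 microseconds combine to 1500000000250000.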
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 327 of file FedRawDataInputSource.cc.

References getNextEvent(), load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

328 {
330  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
331  {
332  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
333  }
334  return status;
335 }
volatile std::atomic< bool > shutdown_flag
void load(int perCUT=90)
Definition: getMaxPt.h:59
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
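
nextEvent() keeps polling getNextEvent() while the answer is noFile and gives up early once the shutdown flag is set. The same loop shape can be sketched with a generic poll function. Status and pollUntilReady() are hypothetical names; in the listing the status type is evf::EvFDaqDirector::FileStatus and the flag is edm::shutdown_flag.

```cpp
#include <atomic>
#include <functional>

// Hypothetical, reduced set of states for illustration.
enum class Status { NoFile, NewFile, RunEnded };

// Call poll() until it reports something other than NoFile,
// or until shutdown is observed after an unsuccessful poll.
Status pollUntilReady(const std::function<Status()>& poll,
                      const std::atomic<bool>& shutdown) {
  Status status;
  while ((status = poll()) == Status::NoFile) {
    if (shutdown.load(std::memory_order_relaxed))
      break;
  }
  return status;
}
```

The flag is checked only after a poll has returned NoFile, so at least one poll always happens and a shutdown request returns NoFile to the caller. The getNextEvent() listing shows a 100 ms condition-variable wait when the file queue is empty, which suggests the loop is not a tight spin.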
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource > )
override private virtual

Definition at line 854 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

855 {
856  InputSource::rewind();
860 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
void FedRawDataInputSource::preForkReleaseResources ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 851 of file FedRawDataInputSource.cc.

852 {}
void FedRawDataInputSource::read ( edm::EventPrincipal & eventPrincipal )
override protected virtual

Implements edm::RawInputSource.

Definition at line 628 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

629 {
630  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
631  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
632 
633  if (useL1EventID_){
635  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
637  aux.setProcessHistoryID(processHistoryID_);
638  makeEvent(eventPrincipal, aux);
639  }
640  else if(tcds_pointer_==0){
643  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
645  aux.setProcessHistoryID(processHistoryID_);
646  makeEvent(eventPrincipal, aux);
647  }
648  else{
649  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
652  processGUID());
654  makeEvent(eventPrincipal, aux);
655  }
656 
657 
658 
659  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
660 
661  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
662  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
663  // daqProvenanceHelper_.dummyProvenance_);
664 
665  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
667 
668  eventsThisLumi_++;
669 
670  //this old file check runs no more often than every 10 events
671  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
672  //delete files that are not in processing
673  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
674  auto it = filesToDelete_.begin();
675  while (it!=filesToDelete_.end()) {
676  bool fileIsBeingProcessed = false;
677  for (unsigned int i=0;i<nStreams_;i++) {
678  if (it->first == streamFileTrackerPtr_->at(i)) {
679  fileIsBeingProcessed = true;
680  break;
681  }
682  }
683  if (!fileIsBeingProcessed) {
684  deleteFile(it->second->fileName_);
685  delete it->second;
686  it = filesToDelete_.erase(it);
687  }
688  else it++;
689  }
690 
691  }
693  chunkIsFree_=false;
694  return;
695 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
def move
Definition: eostools.py:510
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
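
The periodic cleanup in read() walks the pending-delete list and removes every file whose index no stream is still tracking. A self-contained sketch of that scan, assuming file names instead of InputFile pointers; purgeIdleFiles() is a hypothetical helper and returns the removed names rather than deleting anything on disk.

```cpp
#include <algorithm>
#include <list>
#include <string>
#include <utility>
#include <vector>

// pending:           (file index, file name) pairs waiting for deletion
// streamFileTracker: file index currently used by each stream
// Returns the names removed from pending, in list order.
std::vector<std::string> purgeIdleFiles(
    std::list<std::pair<int, std::string>>& pending,
    const std::vector<int>& streamFileTracker) {
  std::vector<std::string> removed;
  auto it = pending.begin();
  while (it != pending.end()) {
    const bool inUse = std::find(streamFileTracker.begin(),
                                 streamFileTracker.end(),
                                 it->first) != streamFileTracker.end();
    if (!inUse) {
      removed.push_back(it->second);
      it = pending.erase(it);  // erase returns the next valid iterator
    } else {
      ++it;
    }
  }
  return removed;
}
```

Using the iterator returned by erase() is what keeps the loop valid while elements are removed, exactly as in the listing.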
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1206 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1207 {
1208 
1209  if (fileDescriptor_<0) {
1210  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1211  bufferInputRead_ = 0;
1212  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1213  if (fileDescriptor_>=0)
1214  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1215  else
1216  {
1217  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1218  << file->fileName_ << " fd:" << fileDescriptor_;
1219  }
1220  }
1221 
1222  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1223  uint32_t existingSize = 0;
1224  for (unsigned int i=0;i<readBlocks_;i++)
1225  {
1226  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1227  bufferInputRead_+=last;
1228  existingSize+=last;
1229  }
1230  }
1231  else {
1232  const uint32_t chunksize = file->chunkPosition_;
1233  const uint32_t blockcount=chunksize/eventChunkBlock_;
1234  const uint32_t leftsize = chunksize%eventChunkBlock_;
1235  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1236  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1237 
1238  for (uint32_t i=0;i<blockcount;i++) {
1239  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1240  bufferInputRead_+=last;
1241  existingSize+=last;
1242  }
1243  if (leftsize) {
1244  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1245  bufferInputRead_+=last;
1246  }
1247  file->chunkPosition_=0;//data was moved to beginning of the chunk
1248  }
1249  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1250  if (fileDescriptor_!=-1)
1251  {
1252  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1253  close(fileDescriptor_);
1254  fileDescriptor_=-1;
1255  }
1256  }
1257 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 866 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

867 {
868  bool stop=false;
869  unsigned int currentLumiSection = 0;
870  //threadInit_.exchange(true,std::memory_order_acquire);
871 
872  {
873  std::unique_lock<std::mutex> lk(startupLock_);
874  startupCv_.notify_one();
875  }
876 
877  while (!stop) {
878 
879  //wait for at least one free thread and chunk
880  int counter=0;
882  {
883  std::unique_lock<std::mutex> lkw(mWakeup_);
884  //sleep until woken up by condition or a timeout
885  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
886  counter++;
887  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
888  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
889  }
890  else {
891  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
892  }
893  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
894  }
895 
896  if (stop) break;
897 
898  //look for a new file
899  std::string nextFile;
900  uint32_t ls=0;
901  uint32_t fileSize;
902 
903  uint32_t monLS=1;
904  uint32_t lockCount=0;
905  uint64_t sumLockWaitTimeUs=0.;
906 
908 
910 
911  while (status == evf::EvFDaqDirector::noFile) {
912  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
913  stop=true;
914  break;
915  }
916 
917  uint64_t thisLockWaitTimeUs=0.;
918  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
919  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
920 
921  //monitoring of lock wait time
922  if (thisLockWaitTimeUs>0.)
923  sumLockWaitTimeUs+=thisLockWaitTimeUs;
924  lockCount++;
925  if (ls>monLS) {
926  monLS=ls;
927  if (lockCount)
928  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
929  lockCount=0;
930  sumLockWaitTimeUs=0;
931  }
932 
933  //check again for any remaining index/EoLS files after EoR file is seen
934  if ( status == evf::EvFDaqDirector::runEnded) {
935  usleep(100000);
936  //now all files should have appeared in ramdisk, check again if any raw files were left behind
937  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
938  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
939  }
940 
941  if ( status == evf::EvFDaqDirector::runEnded) {
943  stop=true;
944  break;
945  }
946 
947  //queue new lumisection
948  if( getLSFromFilename_ && ls > currentLumiSection) {
949  currentLumiSection = ls;
950  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
951  }
952 
953  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
954  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
956  stop=true;
957  break;
958  }
959 
960  int dbgcount=0;
961  if (status == evf::EvFDaqDirector::noFile) {
962  dbgcount++;
963  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
964  usleep(100000);
965  }
966  }
    if ( status == evf::EvFDaqDirector::newFile ) {
      LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;

      boost::filesystem::path rawFilePath(nextFile);
      std::string rawFile = rawFilePath.replace_extension(".raw").string();

      struct stat st;
      stat(rawFile.c_str(),&st);
      fileSize=st.st_size;

      int eventsInNewFile = grabNextJsonFile(nextFile);
      if (fms_) fms_->stoppedLookingForFile(ls);
      assert( eventsInNewFile>=0 );
      assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty

      if (!singleBufferMode_) {
        //calculate number of needed chunks
        unsigned int neededChunks = fileSize/eventChunkSize_;
        if (fileSize%eventChunkSize_) neededChunks++;

        InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
        fileQueue_.push(newInputFile);

        for (unsigned int i=0;i<neededChunks;i++) {

          //get thread
          unsigned int newTid = 0xffffffff;
          while (!workerPool_.try_pop(newTid)) {
            usleep(100000);
          }

          InputChunk * newChunk = nullptr;
          while (!freeChunks_.try_pop(newChunk)) {
            usleep(100000);
            if (quit_threads_.load(std::memory_order_relaxed)) break;
          }

          if (newChunk == nullptr) {
            //return unused tid if we received shutdown (nullptr chunk)
            if (newTid!=0xffffffff) workerPool_.push(newTid);
            stop = true;
            break;
          }

          std::unique_lock<std::mutex> lk(mReader_);

          unsigned int toRead = eventChunkSize_;
          if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
          newChunk->reset(i*eventChunkSize_,toRead,i);

          workerJob_[newTid].first=newInputFile;
          workerJob_[newTid].second=newChunk;

          //wake up the worker thread
          cvReader_[newTid]->notify_one();
        }
      }
      else {
        if (!eventsInNewFile) {
          //still queue file for lumi update
          std::unique_lock<std::mutex> lkw(mWakeup_);
          InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
          fileQueue_.push(newInputFile);
          cvWakeup_.notify_one();
          return;
        }
        //in single-buffer mode put single chunk in the file and let the main thread read the file
        InputChunk * newChunk;
        //should be available immediately
        while(!freeChunks_.try_pop(newChunk)) usleep(100000);

        std::unique_lock<std::mutex> lkw(mWakeup_);

        unsigned int toRead = eventChunkSize_;
        if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
        newChunk->reset(0,toRead,0);
        newChunk->readComplete_=true;

        //push file and wakeup main thread
        InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
        newInputFile->chunks_[0]=newChunk;
        fileQueue_.push(newInputFile);
        cvWakeup_.notify_one();
      }
    }
  }
  //make sure threads finish reading
  unsigned numFinishedThreads = 0;
  while (numFinishedThreads < workerThreads_.size()) {
    unsigned int tid;
    while (!workerPool_.try_pop(tid)) {usleep(10000);}
    std::unique_lock<std::mutex> lk(mReader_);
    thread_quit_signal[tid]=true;
    cvReader_[tid]->notify_one();
    numFinishedThreads++;
  }
  for (unsigned int i=0;i<workerThreads_.size();i++) {
    workerThreads_[i]->join();
    delete workerThreads_[i];
  }
}
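
The chunk bookkeeping in readSupervisor() is a ceiling division for the chunk count, with a shorter final chunk when the file size is not a multiple of the chunk size. It can be illustrated in isolation. The following is a minimal sketch, not the CMSSW implementation: `ChunkPlan` and `planChunks` are hypothetical names introduced here, and only the arithmetic mirrors the listing.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the (offset, size, index) triple that the
// listing passes to InputChunk::reset().
struct ChunkPlan {
  std::uint32_t offset;     // byte offset of the chunk within the file
  std::uint32_t toRead;     // number of bytes this chunk should hold
  std::uint32_t fileIndex;  // position of the chunk within the file
};

// Splits a file of fileSize bytes into chunks of at most chunkSize bytes.
std::vector<ChunkPlan> planChunks(std::uint32_t fileSize, std::uint32_t chunkSize) {
  assert(chunkSize > 0);  // a zero chunk size would divide by zero

  // Ceiling division: one extra chunk for a trailing partial block.
  std::uint32_t neededChunks = fileSize / chunkSize;
  if (fileSize % chunkSize) neededChunks++;

  std::vector<ChunkPlan> plan;
  plan.reserve(neededChunks);
  for (std::uint32_t i = 0; i < neededChunks; i++) {
    std::uint32_t toRead = chunkSize;
    // Only the last chunk can be short.
    if (i == neededChunks - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    plan.push_back({i * chunkSize, toRead, i});
  }
  return plan;
}
```

For a 10-byte file and 4-byte chunks this yields three chunks of 4, 4 and 2 bytes at offsets 0, 4 and 8; an empty file yields no chunks, which matches the assertion in the listing that a file without events must be empty.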
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1073 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, plotBeamSpotDB::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

{
  bool init = true;
  threadInit_.exchange(true,std::memory_order_acquire);

  while (1) {

    std::unique_lock<std::mutex> lk(mReader_);
    //clear both halves of the job slot before returning to the pool
    workerJob_[tid].first=nullptr;
    workerJob_[tid].second=nullptr;

    assert(!thread_quit_signal[tid]);//should never get it here
    workerPool_.push(tid);

    if (init) {
      std::unique_lock<std::mutex> lk(startupLock_);
      init = false;
      startupCv_.notify_one();
    }
    cvReader_[tid]->wait(lk);

    if (thread_quit_signal[tid]) return;

    InputFile * file;
    InputChunk * chunk;

    assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);

    file = workerJob_[tid].first;
    chunk = workerJob_[tid].second;

    int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
    off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);

    if (fileDescriptor>=0)
      LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
    else
    {
      edm::LogError("FedRawDataInputSource") <<
        "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
        " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
      setExceptionState_=true;
      return;

    }

    unsigned int bufferLeft = 0;
    auto start = std::chrono::high_resolution_clock::now();//statement missing from the listing; inferred from the use of start below, clock type assumed
    for (unsigned int i=0;i<readBlocks_;i++)
    {
      const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
      if ( last > 0 )
        bufferLeft+=last;
      if (last < eventChunkBlock_) {
        assert(chunk->usedSize_==i*eventChunkBlock_+last);
        break;
      }
    }
    auto end = std::chrono::high_resolution_clock::now();//statement missing from the listing; inferred from the use of end below, clock type assumed
    auto diff = end-start;
    std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
    LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
    close(fileDescriptor);

    if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
    chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing
    file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index

  }
}
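
The read loop in readWorker() fills a chunk one block at a time and stops at the first short read. A minimal standalone sketch of that pattern follows, assuming a POSIX system. `readBlocks` is a hypothetical helper, not a CMSSW function. Unlike the listing, it checks the descriptor before calling lseek() and returns an empty buffer on failure instead of setting an exception flag.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Reads up to blocks * blockSize bytes starting at offset, one block per
// read() call, and returns the bytes actually read. An empty vector is
// returned if the file cannot be opened or positioned.
std::vector<unsigned char> readBlocks(const std::string& path, off_t offset,
                                      std::size_t blockSize, unsigned int blocks) {
  assert(blockSize > 0);  // a zero block size would never make progress
  std::vector<unsigned char> buf;

  const int fd = ::open(path.c_str(), O_RDONLY);
  if (fd < 0) return buf;  // check the descriptor before using it
  if (::lseek(fd, offset, SEEK_SET) != offset) {
    ::close(fd);
    return buf;
  }

  buf.resize(blockSize * blocks);
  std::size_t filled = 0;
  for (unsigned int i = 0; i < blocks; i++) {
    const ssize_t last = ::read(fd, buf.data() + filled, blockSize);
    if (last > 0) filled += static_cast<std::size_t>(last);
    // A short (or failed) read ends the loop: end of file or error.
    if (last < static_cast<ssize_t>(blockSize)) break;
  }
  ::close(fd);

  buf.resize(filled);  // keep only the bytes that were actually read
  return buf;
}
```

Casting the block size to ssize_t before the comparison avoids the signed/unsigned comparison that the listing performs between `last` and `eventChunkBlock_`, where a return value of -1 would be converted to a large unsigned number.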
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int lumi, unsigned int events )
private

Definition at line 1260 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

{

  std::lock_guard<std::mutex> lock(monlock_);
  auto itr = sourceEventsReport_.find(lumi);
  if (itr!=sourceEventsReport_.end())
    itr->second+=events;
  else
    sourceEventsReport_[lumi]=events;//first report for this lumi (statement missing from the listing; inferred from the dangling else)
}
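
The per-lumi accumulation above, together with the `getEventReport (unsigned int lumi, bool erase)` accessor declared for this class, can be sketched as a small self-contained counter. This is an illustration under stated assumptions, not the CMSSW code: `LumiEventCounter` is a hypothetical class, and the behaviour of `report()` is guessed from the signature of getEventReport() alone, since its body is not shown here.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical helper class; the member names echo the listing but the class
// itself is not part of CMSSW.
class LumiEventCounter {
public:
  // Adds events to the running total for lumi, creating the entry on first use.
  void add(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(monlock_);
    // operator[] value-initializes a missing entry to 0, so the explicit
    // find()/else branch of the listing collapses into one statement.
    sourceEventsReport_[lumi] += events;
  }

  // Returns (found, count) for lumi, optionally erasing the entry.
  // Assumed behaviour, modelled on the getEventReport() signature only.
  std::pair<bool, unsigned int> report(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(monlock_);
    auto itr = sourceEventsReport_.find(lumi);
    if (itr == sourceEventsReport_.end()) return {false, 0u};
    const unsigned int count = itr->second;
    if (erase) sourceEventsReport_.erase(itr);
    return {true, count};
  }

private:
  std::mutex monlock_;
  std::map<unsigned int, unsigned int> sourceEventsReport_;
};
```

Both methods take the same mutex, so a monitoring thread can call `report()` while the source thread calls `add()`.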
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 862 of file FedRawDataInputSource.cc.

{}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1146 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

{
  quit_threads_=true;
  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";

}
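
threadError() sets the shared quit flag before throwing, so the supervisor loop, which polls `quit_threads_` with `memory_order_relaxed`, can still observe the shutdown request after the exception has unwound the caller. A minimal sketch of that ordering follows. `ReaderState`, `signalThreadError` and `shouldKeepRunning` are hypothetical names, and `std::runtime_error` stands in for `cms::Exception`.

```cpp
#include <atomic>
#include <stdexcept>

// Hypothetical shared state; quit plays the role of quit_threads_.
struct ReaderState {
  std::atomic<bool> quit{false};
};

// Raise the flag first, then throw: the flag outlives the unwinding stack.
[[noreturn]] void signalThreadError(ReaderState& state) {
  state.quit = true;
  throw std::runtime_error("file reader thread error");
}

// Polling side, as in the supervisor's wait loops.
bool shouldKeepRunning(const ReaderState& state) {
  return !state.quit.load(std::memory_order_relaxed);
}
```

A relaxed load is enough for a flag that is only polled. It would not be enough if other data had to become visible together with the flag.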

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 119 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 156 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

Definition at line 83 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private