CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::vector< std::string > const & getBUBaseDirs () const
 
std::vector< int > const & getBUBaseDirsNSources () const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFileName () const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &) const
 
std::string getStreamMergeType (std::string const &, MergeType defaultType) const
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
std::string const & runString () const
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Static Private Member Functions

static bool checkFileRead (char *buf, int infile, std::size_t buf_sz, std::string const &path)
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
std::vector< std::string > bu_base_dirs_all_
 
std::vector< int > bu_base_dirs_nSources_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 61 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 39 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dirs_all_, bu_base_dirs_nSources_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, Exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, mps_fire::i, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
43  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
44  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
45  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
46  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
47  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
48  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
49  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
50  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
51  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
52  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
53  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
54  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
55  hostname_(""),
56  bu_readlock_fd_(-1),
57  bu_writelock_fd_(-1),
61  bu_w_lock_stream(nullptr),
62  bu_r_lock_stream(nullptr),
63  fu_rw_lock_stream(nullptr),
66  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
67  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
68  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
71  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
76 
77  //save hostname for later
78  char hostname[33];
79  gethostname(hostname, 32);
80  hostname_ = hostname;
81 
82  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
83  if (fuLockPollIntervalPtr) {
84  try {
85  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
86  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
87  << " us";
88  } catch (const std::exception&) {
89  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
90  }
91  }
92 
93  //override file service parameter if specified by environment
94  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
95  if (fileBrokerParamPtr) {
96  try {
97  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
98  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
99  } catch (const std::exception&) {
100  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
101  }
102  }
103  if (useFileBroker_) {
104  if (fileBrokerHostFromCfg_) {
105  //find BU data address from hltd configuration
107  struct stat buf;
108  if (stat("/etc/appliance/bus.config", &buf) == 0) {
109  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
110  std::getline(busconfig, fileBrokerHost_);
111  }
112  if (fileBrokerHost_.empty())
113  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
114  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
115  throw cms::Exception("EvFDaqDirector")
116  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
117 
118  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
119  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
120  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
121  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
122  }
123 
124  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
125  if (startFromLSPtr) {
126  try {
127  startFromLS_ = std::stoul(std::string(startFromLSPtr));
128  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
129  } catch (const std::exception&) {
130  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
131  }
132  }
133 
134  //override file service parameter if specified by environment
135  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
136  if (fileBrokerUseLockParamPtr) {
137  try {
138  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
139  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
140  << fileBrokerUseLocalLock_;
141  } catch (const std::exception&) {
142  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
143  }
144  }
145 
146  // set number of streams in each BU's ramdisk
147  if (bu_base_dirs_nSources_.empty()) {
148  // default is 1 stream per ramdisk
149  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
150  bu_base_dirs_nSources_.push_back(1);
151  }
152  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
153  throw cms::Exception("DaqDirector")
154  << " Error while setting number of sources: size mismatch with BU base directory vector";
155  } else {
156  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
158  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
159  << " for ramdisk " << bu_base_dirs_all_[i];
160  }
161  }
162 
163  std::stringstream ss;
164  ss << "run" << std::setfill('0') << std::setw(6) << run_;
165  run_string_ = ss.str();
166  ss = std::stringstream();
167  ss << run_;
168  run_nstring_ = ss.str();
169  run_dir_ = base_dir_ + "/" + run_string_;
170  input_throttled_file_ = run_dir_ + "/input_throttle";
171  discard_ls_filestem_ = run_dir_ + "/discard_ls";
172  ss = std::stringstream();
173  ss << getpid();
174  pid_ = ss.str();
175  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::vector< int > bu_base_dirs_nSources_
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 352 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

352  {
353  //close server connection
354  if (socket_.get() && socket_->is_open()) {
355  boost::system::error_code ec;
356  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
357  socket_->close(ec);
358  }
359 
360  if (fulocal_rwlock_fd_ != -1) {
361  unlockFULocal();
362  close(fulocal_rwlock_fd_);
363  }
364 
365  if (fulocal_rwlock_fd2_ != -1) {
366  unlockFULocal2();
367  close(fulocal_rwlock_fd2_);
368  }
369  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 75 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

75 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

References bu_run_dir_.

Referenced by DAQSource::readSupervisor().

76 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References bu_run_open_dir_.

77 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 830 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

836  {
837  if (previousFileSize_ != 0) {
838  if (!fms_) {
840  }
841  if (fms_)
842  fms_->accumulateFileSize(ls, previousFileSize_);
843  previousFileSize_ = 0;
844  }
845  nextFile = "";
846 
847  //reached limit
848  if (maxLS >= 0 && ls > (unsigned int)maxLS)
849  return false;
850 
851  struct stat buf;
852  std::stringstream ss;
853 
854  // 1. Check suggested file
855  std::string nextFileJson = getInputJsonFilePath(ls, index);
856  if (stat(nextFileJson.c_str(), &buf) == 0) {
857  fsize = previousFileSize_ = buf.st_size;
858  nextFile = nextFileJson;
859  return true;
860  }
861  // 2. No file -> lumi ended? (and how many?)
862  else {
863  // 3. No file -> check for standalone raw file
864  std::string nextFileRaw = getRawFilePath(ls, index);
865  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
866  fsize = previousFileSize_ = buf.st_size;
867  nextFile = nextFileRaw;
868  return true;
869  }
870 
871  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
872 
873  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
874  // recheck that no raw file appeared in the meantime
875  if (stat(nextFileJson.c_str(), &buf) == 0) {
876  fsize = previousFileSize_ = buf.st_size;
877  nextFile = nextFileJson;
878  return true;
879  }
880  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
881  fsize = previousFileSize_ = buf.st_size;
882  nextFile = nextFileRaw;
883  return true;
884  }
885 
886  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
887  if (indexFilesInLS < 0)
888  //parsing failed
889  return false;
890  else {
891  //check index
892  if ((int)index < indexFilesInLS) {
893  //we have 2 files, and check for 1 failed... retry (2 will never be here)
894  edm::LogError("EvFDaqDirector")
895  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
896  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
897  setExceptionState = true;
898  return false;
899  }
900  }
901  // this lumi ended, check for files
902  ++ls;
903  index = 0;
904 
905  //reached limit
906  if (maxLS >= 0 && ls > (unsigned int)maxLS)
907  return false;
908 
909  nextFileJson = getInputJsonFilePath(ls, 0);
910  nextFileRaw = getRawFilePath(ls, 0);
911  if (stat(nextFileJson.c_str(), &buf) == 0) {
912  // a new file was found at new lumisection, index 0
913  fsize = previousFileSize_ = buf.st_size;
914  nextFile = nextFileJson;
915  return true;
916  }
917  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
918  fsize = previousFileSize_ = buf.st_size;
919  nextFile = nextFileRaw;
920  return true;
921  }
922  return false;
923  }
924  }
925  // no new file found
926  return false;
927  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkFileRead()

bool evf::EvFDaqDirector::checkFileRead ( char *  buf,
int  infile,
std::size_t  buf_sz,
std::string const &  path 
)
staticprivate

Definition at line 1122 of file EvFDaqDirector.cc.

References visDQMUpload::buf, timingPdfMaker::infile, castor_dqm_sourceclient_file_cfg::path, and fileinputsource_cfi::read.

Referenced by parseFRDFileHeader(), and rawFileHasHeader().

1122  {
1123  ssize_t sz_read = ::read(infile, buf, buf_sz);
1124  if (sz_read < 0) {
1125  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1126  if (infile != -1)
1127  close(infile);
1128  return false;
1129  }
1130  if ((size_t)sz_read < buf_sz) {
1131  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1132  if (infile != -1)
1133  close(infile);
1134  return false;
1135  }
1136  return true;
1137  }
Log< level::Error, false > LogError

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1542 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1549  {
1550  EvFDaqDirector::FileStatus fileStatus = noFile;
1551  serverError = false;
1552 
1553  boost::system::error_code ec;
1554  try {
1555  while (true) {
1556  //socket connect
1557  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1558  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1559 
1560  if (ec) {
1561  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1562  serverError = true;
1563  break;
1564  }
1565  }
1566 
1567  boost::asio::streambuf request;
1568  std::ostream request_stream(&request);
1569  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1570  if (maxLS >= 0) {
1571  std::stringstream spath;
1572  spath << path << "&stopls=" << maxLS;
1573  path = spath.str();
1574  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1575  }
1576  request_stream << "GET " << path << " HTTP/1.1\r\n";
1577  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1578  request_stream << "Accept: */*\r\n";
1579  request_stream << "Connection: keep-alive\r\n\r\n";
1580 
1581  boost::asio::write(*socket_, request, ec);
1582  if (ec) {
1583  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1584  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1585  //we got disconnected, try to reconnect to the server before writing the request
1586  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1587  if (ec) {
1588  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1589  serverError = true;
1590  break;
1591  }
1592  continue;
1593  }
1594  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1595  serverError = true;
1596  break;
1597  }
1598 
1599  boost::asio::streambuf response;
1600  boost::asio::read_until(*socket_, response, "\r\n", ec);
1601  if (ec) {
1602  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1603  serverError = true;
1604  break;
1605  }
1606 
1607  std::istream response_stream(&response);
1608 
1609  std::string http_version;
1610  response_stream >> http_version;
1611 
1612  response_stream >> serverHttpStatus;
1613 
1614  std::string status_message;
1615  std::getline(response_stream, status_message);
1616  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1617  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1618  serverError = true;
1619  break;
1620  }
1621  if (serverHttpStatus != 200) {
1622  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1623  serverError = true;
1624  break;
1625  }
1626 
1627  // Process the response headers.
1628  std::string header;
1629  while (std::getline(response_stream, header) && header != "\r") {
1630  }
1631 
1632  std::string fileInfo;
1633  std::map<std::string, std::string> serverMap;
1634  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1635  auto pos = fileInfo.find('=');
1636  if (pos == std::string::npos)
1637  continue;
1638  auto stitle = fileInfo.substr(0, pos);
1639  auto svalue = fileInfo.substr(pos + 1);
1640  serverMap[stitle] = svalue;
1641  }
1642 
1643  //check that response run number is correct
1644  auto server_version = serverMap.find("version");
1645  assert(server_version != serverMap.end());
1646 
1647  auto server_run = serverMap.find("runnumber");
1648  assert(server_run != serverMap.end());
1649  assert(run_nstring_ == server_run->second);
1650 
1651  auto server_state = serverMap.find("state");
1652  assert(server_state != serverMap.end());
1653 
1654  auto server_eols = serverMap.find("lasteols");
1655  assert(server_eols != serverMap.end());
1656 
1657  auto server_ls = serverMap.find("lumisection");
1658 
1659  int version_maj = 1;
1660  int version_min = 0;
1661  int version_rev = 0;
1662  {
1663  auto* s_ptr = server_version->second.c_str();
1664  if (!server_version->second.empty() && server_version->second[0] == '"')
1665  s_ptr++;
1666  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1667  if (res < 3) {
1668  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1669  if (res < 2) {
1670  res = sscanf(s_ptr, "%d", &version_maj);
1671  if (res < 1) {
1672  //expecting at least 1 number (major version)
1673  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1674  }
1675  }
1676  }
1677  }
1678 
1679  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1680  if (server_ls != serverMap.end())
1681  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1682  else
1683  serverLS = closedServerLS + 1;
1684 
1685  std::string s_state = server_state->second;
1686  if (s_state == "STARTING") //initial, always empty starting with LS 1
1687  {
1688  auto server_file = serverMap.find("file");
1689  assert(server_file == serverMap.end()); //no file with starting state
1690  fileStatus = noFile;
1691  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1692  } else if (s_state == "READY") {
1693  auto server_file = serverMap.find("file");
1694  if (server_file == serverMap.end()) {
1695  //can be returned by server if files from new LS already appeared but LS is not yet closed
1696  if (serverLS <= closedServerLS)
1697  serverLS = closedServerLS + 1;
1698  fileStatus = noFile;
1699  edm::LogInfo("EvFDaqDirector")
1700  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1701  } else {
1702  std::string filestem;
1703  std::string fileprefix;
1704  auto server_fileprefix = serverMap.find("fileprefix");
1705 
1706  if (server_fileprefix != serverMap.end()) {
1707  auto pssize = server_fileprefix->second.size();
1708  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1709  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1710  else
1711  fileprefix = server_fileprefix->second;
1712  }
1713 
1714  //remove string literals
1715  auto ssize = server_file->second.size();
1716  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1717  filestem = server_file->second.substr(1, ssize - 2);
1718  else
1719  filestem = server_file->second;
1720  assert(!filestem.empty());
1721  if (version_maj > 1) {
1722  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1723  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1724  nextFileJson = "";
1725  rawHeader = true;
1726  } else {
1727  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1728  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1729  nextFileJson = filestem + ".jsn";
1730  rawHeader = false;
1731  }
1732  fileStatus = newFile;
1733  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1734  << serverLS << " file:" << filestem;
1735  }
1736  } else if (s_state == "EOLS") {
1737  serverLS = closedServerLS + 1;
1738  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1739  fileStatus = noFile;
1740  } else if (s_state == "EOR") {
1741  //server_eor = serverMap.find("iseor");
1742  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1743  fileStatus = runEnded;
1744  } else if (s_state == "NORUN") {
1745  auto err_msg = serverMap.find("errormessage");
1746  if (err_msg != serverMap.end())
1747  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1748  else
1749  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1750  edm::LogWarning("EvFDaqDirector") << "executing run end";
1751  fileStatus = runEnded;
1752  } else if (s_state == "ERROR") {
1753  auto err_msg = serverMap.find("errormessage");
1754  if (err_msg != serverMap.end())
1755  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1756  else
1757  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1758  fileStatus = noFile;
1759  serverError = true;
1760  } else {
1761  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1762  fileStatus = noFile;
1763  serverError = true;
1764  }
1765 
1766  // Read until EOF, writing data to output as we go.
1767  if (!fileBrokerKeepAlive_) {
1768  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1769  }
1770  if (ec != boost::asio::error::eof) {
1771  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1772  serverError = true;
1773  }
1774  }
1775 
1776  break;
1777  }
1778 
1779  } catch (std::exception const& e) {
1780  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1781  serverError = true;
1782  }
1783 
1784  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1785  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1786  if (ec) {
1787  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1788  }
1789  socket_->close(ec);
1790  if (ec) {
1791  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1792  }
1793  }
1794 
1795  if (serverError) {
1796  if (socket_->is_open())
1797  socket_->close(ec);
1798  if (ec) {
1799  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1800  }
1801  fileStatus = noFile;
1802  sleep(1); //back-off if error detected
1803  }
1804 
1805  return fileStatus;
1806  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 982 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

// Creates the file named by getBoLSFilePathOnFU(lumiSection) via open(O_RDWR | O_CREAT) and closes it again.
// With checkIfExists == true the file is only created when stat() on the path fails;
// with checkIfExists == false open() is always called.
// NOTE(review): the value returned by open() is handed to close() without a check for -1.
982  {
983  //used for backpressure mechanisms and monitoring
984  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
985  struct stat buf;
986  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
// requested mode is read/write for user, group and others
987  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
988  close(bol_fd);
989  }
990  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 992 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

// Creates the EoLS file for currentLumiSection and/or the BoLS file for lumiSection.
//  - currentLumiSection > 0: acts only if the EoLS file of currentLumiSection does not exist yet;
//    then the EoLS file is created when doCreateEoLS is set and the BoLS file of lumiSection is
//    created (without existence check) when doCreateBoLS is set.
//  - currentLumiSection == 0: only the BoLS file of lumiSection is created (with existence check),
//    and only when doCreateBoLS is set.
// NOTE(review): the listing starts at line 995 although the definition is reported at line 992;
// the signature lines are not part of this extract.
995  {
996  if (currentLumiSection > 0) {
997  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
998  struct stat buf;
999  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
// an already existing EoLS file suppresses both the EoLS and the BoLS creation below
1000  if (!found) {
1001  if (doCreateEoLS) {
// NOTE(review): result of open() is not checked before close()
1002  int eol_fd =
1003  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1004  close(eol_fd);
1005  }
1006  if (doCreateBoLS)
1007  createBoLSFile(lumiSection, false);
1008  }
1009  } else if (doCreateBoLS) {
1010  createBoLSFile(lumiSection, true); //needed for initial lumisection
1011  }
1012  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 1997 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

// Creates (if not yet present) the file "<run_dir_>/processing" via open(O_RDWR | O_CREAT) and closes it.
// NOTE(review): the descriptor returned by open() is not checked for -1 before close().
1997  {
1998  std::string proc_flag = run_dir_ + "/processing";
1999  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2000  close(proc_flag_fd);
2001  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1958 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

// Creates the directory referred to by openPath (including parents) when it is not already a directory.
1958  {
1959  // create open dir if not already there
1960 
// NOTE(review): listing line 1961 is missing from this extract; it presumably declares `openPath`
// from getRunOpenDirPath() (named in the References list) - confirm against EvFDaqDirector.cc.
1962  if (!std::filesystem::is_directory(openPath)) {
1963  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1964  std::filesystem::create_directories(openPath);
1965  }
1966  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 379 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

// Declares the untracked configuration parameters of the service together with their default
// values and help comments, then registers the description under the label "EvFDaqDirector".
379  {
// NOTE(review): listing line 380 is missing from this extract; it presumably declares `desc`
// (an edm::ParameterSetDescription, judging by descriptions.add() below) - confirm against EvFDaqDirector.cc.
381  desc.setComment(
382  "Service used for file locking arbitration and for propagating information between other EvF components");
// directory and run selection
383  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
384  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
385  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
386  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
387  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
388  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
389  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
// file broker settings
390  desc.addUntracked<bool>("useFileBroker", false)
391  ->setComment("Use BU file service to grab input data instead of NFS file locking");
392  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
393  ->setComment("Allow service to discover BU address from hltd configuration");
394  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
395  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
396  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
397  ->setComment("Use keep alive to avoid using large number of sockets");
398  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
399  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
// locking, output checking and BU-mode settings
400  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
401  ->setComment("Lock polling interval in microseconds for the input directory file lock");
402  desc.addUntracked<bool>("outputAdler32Recheck", false)
403  ->setComment("Check Adler32 of per-process output files while micro-merging");
404  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
405  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
406  desc.addUntracked<std::string>("mergingPset", "")
407  ->setComment("Name of merging PSet to look for merging type definitions for streams");
408  descriptions.add("EvFDaqDirector", desc);
409  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 80 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

// Inline helper: forwards run_ to dirManager_.findRunDir() and returns its result.
80 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 535 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

// Returns run_dir_ + "/" + the name produced by fffnaming::bolsFileName(run_, ls).
535  {
536  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
537  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getBUBaseDirs()

std::vector<std::string> const& evf::EvFDaqDirector::getBUBaseDirs ( ) const
inline

Definition at line 194 of file EvFDaqDirector.h.

References bu_base_dirs_all_.

Referenced by DAQSource::DAQSource().

// Inline accessor returning the member bu_base_dirs_all_.
194 { return bu_base_dirs_all_; }
std::vector< std::string > bu_base_dirs_all_

◆ getBUBaseDirsNSources()

std::vector<int> const& evf::EvFDaqDirector::getBUBaseDirsNSources ( ) const
inline

Definition at line 195 of file EvFDaqDirector.h.

References bu_base_dirs_nSources_.

Referenced by DAQSource::DAQSource().

// Inline accessor returning the member bu_base_dirs_nSources_.
195 { return bu_base_dirs_nSources_; }
std::vector< int > bu_base_dirs_nSources_

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 464 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

464  {
// NOTE(review): listing line 465 (the function body) is missing from this extract. Per the References
// list it presumably combines run_dir_ with fffnaming::streamerDataFileNameWithPid(run_, ls, stream) -
// confirm against EvFDaqDirector.cc.
466  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 527 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

// Returns bu_run_dir_ + "/" + the name produced by fffnaming::eolsFileName(run_, ls).
527  {
528  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
529  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 531 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

// Returns run_dir_ + "/" + the name produced by fffnaming::eolsFileName(run_, ls).
531  {
532  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
533  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFileName()

std::string evf::EvFDaqDirector::getEoRFileName ( ) const

Definition at line 541 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), and run_.

Referenced by DAQSource::readSupervisor().

// Returns the bare file name from fffnaming::eorFileName(run_), without any directory prefix.
541 { return fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 539 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

// Returns bu_run_dir_ + "/" + fffnaming::eorFileName(run_).
539 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 543 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

// Returns run_dir_ + "/" + fffnaming::eorFileName(run_).
543 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 545 of file EvFDaqDirector.cc.

References bu_run_dir_.

// Returns the fixed path "hlt/fffParameters.jsn" below bu_run_dir_.
545 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 492 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

492  {
// NOTE(review): listing line 493 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::initFileNameWithPid() - confirm against EvFDaqDirector.cc.
494  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 496 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

496  {
// NOTE(review): listing line 497 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::initTempFileNameWithPid() - confirm against EvFDaqDirector.cc.
498  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 448 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

448  {
// NOTE(review): listing line 449 (the function body) is missing from this extract. Per the References
// list it presumably uses bu_run_dir_ and fffnaming::inputJsonFileName() - confirm against EvFDaqDirector.cc.
450  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1982 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Probes "<run_dir_>/<run_string_>_lsNNNN_EoLS.jsn" for NNNN = 1, 2, ... (zero-padded to 4 digits)
// and returns the first number for which stat() on that path fails.
1982  {
1983  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
// NOTE(review): listing line 1984 is missing from this extract; it presumably declares the
// std::string `fullpath` used below - confirm against EvFDaqDirector.cc.
1985  struct stat buf;
1986  unsigned int lscount = 1;
1987  do {
1988  std::stringstream ss;
1989  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1990  fullpath = ss.str();
// incremented before the existence test, hence the "- 1" in the return statement
1991  lscount++;
1992  } while (stat(fullpath.c_str(), &buf) == 0);
1993  return lscount - 1;
1994  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 484 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

484  {
// NOTE(review): listing line 485 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_, hostname_ and fffnaming::streamerDataChecksumFileNameWithInstance() -
// confirm against EvFDaqDirector.cc.
486  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 480 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

480  {
// NOTE(review): listing line 481 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_, hostname_ and fffnaming::streamerDataFileNameWithInstance() -
// confirm against EvFDaqDirector.cc.
482  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 510 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

511  {
// NOTE(review): listing line 512 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_, hostname_ and fffnaming::protocolBufferHistogramFileNameWithInstance() -
// confirm against EvFDaqDirector.cc.
513  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 523 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

523  {
// NOTE(review): listing line 524 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_, hostname_ and fffnaming::rootHistogramFileNameWithInstance() -
// confirm against EvFDaqDirector.cc.
525  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs,
bool  requireHeader = true 
)

Definition at line 1808 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Asks the file broker (contactFileBroker) for the next input file and creates the local
// BoLS/EoLS marker files for lumisections reported closed by the server.
// Returns the resulting FileStatus; `ls` is updated from the server-reported lumisection.
// NOTE(review): the listing starts at line 1816 although the definition is reported at line 1808, so the
// signature is not part of this extract. The names `nextFileRaw`, `serverEventsInNewFile` and
// `fileSizeFromMetadata` used below are not declared in the visible lines - presumably they are the
// parameter names of the .cc definition; confirm against EvFDaqDirector.cc.
1816  {
1817  EvFDaqDirector::FileStatus fileStatus = noFile;
1818 
1819  //int retval = -1;
1820  //int lock_attempts = 0;
1821  //long total_lock_attempts = 0;
1822 
// Stop-request handling: either of the two stop files existing triggers it.
1823  struct stat buf;
1824  int stopFileLS = -1;
1825  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1826  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1827  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1828  if (stopFileCheck == 0)
1829  stopFileLS = readLastLSEntry(stopFilePath_);
1830  else
1831  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ == 0 means no override has been latched yet
1832  if (!stop_ls_override_) {
1833  //if lumisection is higher than in stop file, should quit at next from current
1834  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1835  stopFileLS = stop_ls_override_ = ls;
1836  } else
1837  stopFileLS = stop_ls_override_;
1838  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1839  << stopFileLS;
1840  //return runEnded;
1841  } else //if file was removed before reaching stop condition, reset this
1842  stop_ls_override_ = 0;
1843 
1844  /* look for EoLS
1845  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1846  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1847  ls++;
1848  return noFile;
1849  }
1850  */
1851 
// NOTE(review): ts_lockbegin is filled here but not read anywhere in the visible listing.
1852  timeval ts_lockbegin;
1853  gettimeofday(&ts_lockbegin, nullptr);
1854 
// NOTE(review): serverLS, closedServerLS, serverHttpStatus and serverError are declared without
// initializers and are read after contactFileBroker() returns; this relies on that call always
// assigning them - confirm.
1855  std::string nextFileJson;
1856  uint32_t serverLS, closedServerLS;
1857  unsigned int serverHttpStatus;
1858  bool serverError;
1859 
1860  //local lock to force index json and EoLS files to appear in order
// NOTE(review): listing line 1861 is missing from this extract; presumably a condition on
// fileBrokerUseLocalLock_ (named in the References list) guarding the lock call - confirm.
1862  lockFULocal();
1863 
// maxLS is -1 when no stop was requested, otherwise the larger of the stop LS and the current LS
1864  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1865  bool rawHeader = false;
1866  fileStatus = contactFileBroker(
1867  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1868 
1869  if (serverError) {
1870  //do not update anything
// NOTE(review): listing line 1871 is missing from this extract; presumably the same
// fileBrokerUseLocalLock_ condition as before lockFULocal() - confirm.
1872  unlockFULocal();
1873  return noFile;
1874  }
1875 
1876  //handle creation of BoLS files if lumisection has changed
1877  if (currentLumiSection == 0) {
1878  if (fileStatus == runEnded)
1879  createLumiSectionFiles(closedServerLS, 0, true, false);
1880  else
1881  createLumiSectionFiles(serverLS, 0, true, false);
1882  } else {
1883  if (closedServerLS >= currentLumiSection) {
1884  //only BoLS files
1885  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1886  createLumiSectionFiles(i + 1, i, true, false);
1887  }
1888  }
1889 
1890  bool fileFound = true;
1891 
1892  if (fileStatus == newFile) {
// rawHeader is a bool; "> 0" is equivalent to testing it for true
1893  if (rawHeader > 0)
1894  serverEventsInNewFile = grabNextJsonFromRaw(
1895  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1896  else
1897  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1898  }
1899  //closing file in case of any error
1900  if (serverEventsInNewFile < 0 && rawFd != -1) {
1901  close(rawFd);
1902  rawFd = -1;
1903  }
1904 
1905  //can unlock because all files have been created locally
// NOTE(review): listing line 1906 is missing from this extract; presumably the same
// fileBrokerUseLocalLock_ condition as before lockFULocal() - confirm.
1907  unlockFULocal();
1908 
1909  if (!fileFound) {
1910  //catch condition where directory got deleted
1911  fileStatus = noFile;
// this declaration shadows the `buf` declared at listing line 1823
1912  struct stat buf;
1913  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1914  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1915  fileStatus = runEnded;
1916  }
1917  }
1918 
1919  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1920  //so that EoLS files can not appear locally before index files
1921  if (currentLumiSection == 0) {
1922  lockFULocal2();
1923  if (fileStatus == runEnded) {
1924  createLumiSectionFiles(closedServerLS, 0, false, true);
1925  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1926  } else {
1927  createLumiSectionFiles(serverLS, 0, false, true);
1928  }
1929  unlockFULocal2();
1930  } else {
1931  if (closedServerLS >= currentLumiSection) {
1932  //lock exclusive to create EoLS files
1933  lockFULocal2();
1934  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1935  createLumiSectionFiles(i + 1, i, false, true);
1936  unlockFULocal2();
1937  }
1938  }
1939 
// Update the caller's lumisection from the server response; `ls` is never moved backwards here.
1940  if (fileStatus == runEnded)
1941  ls = std::max(currentLumiSection, serverLS);
1942  else if (fileStatus == newFile) {
1943  assert(serverLS >= ls);
1944  ls = serverLS;
1945  } else if (fileStatus == noFile) {
1946  if (serverLS >= ls)
1947  ls = serverLS;
1948  else {
1949  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1950  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1951  sleep(1);
1952  }
1953  }
1954 
1955  return fileStatus;
1956  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 769 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

// Parses the JSON file BUEoLSFile and returns the entry at index eolsNFilesIndex_ of its data array
// converted with std::stoi. Returns -1 if the JSON cannot be parsed or the data array is too short.
// While readEolsDefinition_ is set, the definition file referenced by the JSON is looked up first
// to search its names for "NFiles".
// NOTE(review): std::stoi throws if `data` is not numeric; no handler is visible in this listing.
769  {
770  std::ifstream ij(BUEoLSFile);
771  Json::Value deserializeRoot;
// NOTE(review): listing line 772 is missing from this extract; it presumably declares `reader`
// (a Json::Reader, per the tooltips following the listing) - confirm against EvFDaqDirector.cc.
773 
774  if (!reader.parse(ij, deserializeRoot)) {
775  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
776  return -1;
777  }
778 
// NOTE(review): listing line 779 is missing from this extract; it presumably declares the
// std::string `data` that is assigned and passed to std::stoi below - confirm.
780  DataPoint dp;
781  dp.deserialize(deserializeRoot);
782 
783  //read definition
784  if (readEolsDefinition_) {
785  //std::string def = boost::algorithm::trim(dp.getDefinition());
786  std::string def = dp.getDefinition();
787  if (def.empty())
788  readEolsDefinition_ = false;
// Try `def` as a path (absolute when it starts with '/', otherwise below bu_run_dir_); if no such
// file exists, drop everything up to and including the first '/' and try again.
789  while (!def.empty()) {
// NOTE(review): listing line 790 is missing from this extract; it presumably declares `fullpath` - confirm.
791  if (def.find('/') == 0)
792  fullpath = def;
793  else
794  fullpath = bu_run_dir_ + '/' + def;
795  struct stat buf;
796  if (stat(fullpath.c_str(), &buf) == 0) {
797  DataPointDefinition eolsDpd;
798  std::string defLabel = "legend";
799  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
800  if (eolsDpd.getNames().empty()) {
801  //try with "data" label if "legend" format is not used
802  eolsDpd = DataPointDefinition();
803  defLabel = "data";
804  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
805  }
806  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
807  if (eolsDpd.getNames().at(i) == "NFiles")
// NOTE(review): listing line 808 is missing from this extract; it is presumably the statement
// controlled by the `if` above (likely storing i in eolsNFilesIndex_) - confirm. As printed, the
// listing wrongly reads as if line 809 were the body of that `if`.
809  readEolsDefinition_ = false;
810  break;
811  }
812  //check if we can still find definition
813  if (def.size() <= 1 || def.find('/') == std::string::npos) {
814  readEolsDefinition_ = false;
815  break;
816  }
817  def = def.substr(def.find('/') + 1);
818  }
819  }
820 
821  if (dp.getData().size() > eolsNFilesIndex_)
822  data = dp.getData()[eolsNFilesIndex_];
823  else {
824  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
825  return -1;
826  }
827  return std::stoi(data);
828  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 468 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

468  {
// NOTE(review): listing line 469 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::streamerDataFileNameWithPid() - confirm against EvFDaqDirector.cc.
470  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 488 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

// Returns run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
// the lumisection argument passed to the naming helper is the constant 0.
488  {
489  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
490  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 460 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

// Returns bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index).
460  {
461  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
462  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 472 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

472  {
// NOTE(review): listing line 473 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::streamerJsonFileNameWithPid() - confirm against EvFDaqDirector.cc.
474  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 500 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

501  {
// NOTE(review): listing line 502 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::protocolBufferHistogramFileNameWithPid() -
// confirm against EvFDaqDirector.cc.
503  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 456 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

// Returns bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index).
456  {
457  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
458  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 515 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

515  {
// NOTE(review): listing line 516 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::rootHistogramFileNameWithPid() - confirm against EvFDaqDirector.cc.
517  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 476 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

476  {
// NOTE(review): listing line 477 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::streamerJsonFileNameWithPid() - confirm against EvFDaqDirector.cc.
478  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 505 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

506  {
// NOTE(review): listing line 507 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::protocolBufferHistogramFileNameWithPid() -
// confirm against EvFDaqDirector.cc.
508  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 452 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

452  {
// NOTE(review): listing line 453 (the function body) is missing from this extract. Per the References
// list it presumably uses bu_run_dir_ and fffnaming::inputRawFileName() - confirm against EvFDaqDirector.cc.
454  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 519 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

519  {
// NOTE(review): listing line 520 (the function body) is missing from this extract. Per the References
// list it presumably uses run_dir_ and fffnaming::rootHistogramFileNameWithPid() - confirm against EvFDaqDirector.cc.
521  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

References run_.

// Inline accessor returning the member run_.
118 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

// Returns the "open" subdirectory path below run_dir_ (pure string concatenation, no filesystem access).
107 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 180 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Plain accessor: returns the startFromLS_ member.
// NOTE(review): despite the name, no environment lookup happens in this body - presumably startFromLS_ is filled elsewhere.
180 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  ) const
inline

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  ,
MergeType  defaultType 
) const
inline

Definition at line 188 of file EvFDaqDirector.h.

References MergeTypeNames_.

// Returns the entry of MergeTypeNames_ selected by defaultType.
// The (unnamed) stream-name parameter is not used by this body.
// NOTE(review): operator[] performs no bounds check, so defaultType must be a valid index into MergeTypeNames_.
188  {
189  return MergeTypeNames_[defaultType];
190  }
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1267 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

// Copies the JSON file at jsonSourcePath into baseRunDir() under the name
// "<stem of rawSourcePath>_pid<pid_, zero-padded to 5>.jsn", removes the source file,
// parses the copied JSON and returns std::stoi() of the "NEvents" field (or of the first
// data field when no "NEvents" entry is matched through dpd_->getNames()).
// Outputs:
//   fileFound        - set to true on entry; set to false only when the retried open of the
//                      source fails with errno == ENOENT.
//   fileSizeFromJson - set to -1, then to std::stol() of the "NBytes" field when it parses.
// Returns -1 on every failure path (open/copy/unlink/parse errors and caught exceptions).
// NOTE(review): source line 1360 is missing from this listing; presumably it declares `reader`.
1270  {
1271  fileFound = true;
1272 
1273  //should be ported to use fffnaming
1274  std::ostringstream fileNameWithPID;
1275  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1276  << std::setw(5) << pid_ << ".jsn";
1277 
1278  // assemble json destination path
1279  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1280 
1281  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1282 
1283  int infile = -1, outfile = -1;
1284 
1285  // The source open is attempted twice back-to-back; only the second failure is treated as an error.
1286  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1286  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1287  << strerror(errno);
1288  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1289  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1290  << jsonSourcePath << " : " << strerror(errno);
1291  // NOTE(review): errno is inspected after the logging statement above; confirm logging cannot modify it.
1291  if (errno == ENOENT)
1292  fileFound = false;
1293  return -1;
1294  }
1295  }
1296 
1297  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1298  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1299  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1300  if (errno == EEXIST) {
1301  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1302  << " : ";
1303  ::close(infile);
1304  return -1;
1305  }
1306  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1307  << strerror(errno);
1308  // Any other open error: if a file nevertheless exists at the destination, delete it and retry once.
1308  struct stat out_stat;
1309  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1310  edm::LogWarning("EvFDaqDirector")
1311  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1312  if (unlink(jsonDestPath.c_str()) == -1) {
1313  edm::LogWarning("EvFDaqDirector")
1314  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1315  }
1316  }
1317  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1318  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1319  << jsonDestPath << " : " << strerror(errno);
1320  ::close(infile);
1321  return -1;
1322  }
1323  }
1324  //copy contents
1325  const std::size_t buf_sz = 512;
1326  std::size_t tot_written = 0;
1327  std::unique_ptr<char[]> buf(new char[buf_sz]);
1328 
1329  // Outer loop reads up to buf_sz bytes; inner loop repeats write() until the whole chunk is written
1329  // (handles short writes). A failed write stores the negative result in sz_read to stop both loops.
1329  ssize_t sz, sz_read = 1, sz_write;
1330  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1331  sz_write = 0;
1332  do {
1333  assert(sz_read - sz_write > 0);
1334  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1335  sz_read = sz; // cause read loop termination
1336  break;
1337  }
1338  assert(sz > 0);
1339  sz_write += sz;
1340  tot_written += sz;
1341  } while (sz_write < sz_read);
1342  }
1343  close(infile);
1344  close(outfile);
1345 
1346  // NOTE(review): a read/write error after some bytes were already written still leaves tot_written > 0,
1346  // so the source file is unlinked even though the copy may be incomplete - confirm this is acceptable.
1346  if (tot_written > 0) {
1347  //leave file if it was empty for diagnosis
1348  if (unlink(jsonSourcePath.c_str()) == -1) {
1349  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1350  << strerror(errno);
1351  return -1;
1352  }
1353  } else {
1354  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1355  << jsonSourcePath;
1356  return -1;
1357  }
1358 
1359  Json::Value deserializeRoot;
1361 
1362  std::string data;
1363  std::stringstream ss;
1364  bool result;
1365  try {
1366  // Small file: the whole content is still in buf from the single read above, so it is parsed from memory.
1366  // NOTE(review): buf is allocated uninitialised and never NUL-terminated, yet buf.get() is handed over as a
1366  // bare char* here and to `ss << buf.get()` below. If that is interpreted as a C string it reads past the
1366  // tot_written bytes (and past the allocation when tot_written == buf_sz) - verify against the parse overload used.
1366  if (tot_written <= buf_sz) {
1367  result = reader.parse(buf.get(), deserializeRoot);
1368  } else {
1369  //json will normally not be bigger than buf_sz bytes
1370  try {
1371  std::ifstream ij(jsonDestPath);
1372  ss << ij.rdbuf();
1373  } catch (std::filesystem::filesystem_error const& ex) {
1374  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1375  return -1;
1376  }
1377  result = reader.parse(ss.str(), deserializeRoot);
1378  }
1379  if (!result) {
1380  if (tot_written <= buf_sz)
1381  ss << buf.get();
1382  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1383  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1384  << ss.str() << ".";
1385  return -1;
1386  }
1387 
1388  //read BU JSON
1389  DataPoint dp;
1390  dp.deserialize(deserializeRoot);
1391  // Find the data column whose definition name is "NEvents"; first match with an available value wins.
1391  bool success = false;
1392  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1393  if (dpd_->getNames().at(i) == "NEvents")
1394  if (i < dp.getData().size()) {
1395  data = dp.getData()[i];
1396  success = true;
1397  break;
1398  }
1399  }
1400  // Fallback: use the first data field when no "NEvents" column was matched.
1400  if (!success) {
1401  if (!dp.getData().empty())
1402  data = dp.getData()[0];
1403  else {
1404  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1405  << "grabNextJsonFile - "
1406  << " error reading number of events from BU JSON; No input value. data -: " << data;
1407  return -1;
1408  }
1409  }
1410 
1411  //try to read raw file size
1412  fileSizeFromJson = -1;
1413  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1414  if (dpd_->getNames().at(i) == "NBytes") {
1415  if (i < dp.getData().size()) {
1416  std::string dataSize = dp.getData()[i];
1417  try {
1418  fileSizeFromJson = std::stol(dataSize);
1419  } catch (const std::exception&) {
1420  //non-fatal currently, processing can continue without this value
1421  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1422  << "Input value is -: " << dataSize;
1423  }
1424  break;
1425  }
1426  }
1427  }
1428  // std::stoi throws std::invalid_argument / std::out_of_range; both are handled below and lead to -1.
1428  return std::stoi(data);
1429  } catch (const std::out_of_range& e) {
1430  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1431  << "Input value is -: " << data;
1432  } catch (const std::invalid_argument& e) {
1433  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1434  << "Input value is -: " << data;
1435  } catch (std::runtime_error const& e) {
1436  //Can be thrown by Json parser
1437  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1438  }
1439 
1440  catch (std::exception const& e) {
1441  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1442  } catch (...) {
1443  //unknown exception
1444  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1445  }
1446 
1447  // Reached only after one of the handlers above ran.
1447  return -1;
1448  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1450 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

// Copies jsonSourcePath into baseRunDir() as "<source stem>_pid<getpid(), zero-padded to 5>.jsn",
// calls unlockFULocal(), removes the source file (failures there are only logged), then parses the
// copy and returns std::stoi() of the "NEvents" field (or of the first data field as fallback).
// Returns -1 when any of the exception handlers at the end runs.
// NOTE(review): source line 1487 is missing from this listing; presumably it declares `reader`.
1450  {
1451  std::string data;
1452  try {
1453  // assemble json destination path
1454  std::filesystem::path jsonDestPath(baseRunDir());
1455 
1456  //should be ported to use fffnaming
1457  std::ostringstream fileNameWithPID;
1458  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1459  << ".jsn";
1460  jsonDestPath /= fileNameWithPID.str();
1461 
1462  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1463  // One retry after a 1 s sleep; a second failure propagates to the outer filesystem_error handler.
1463  try {
1464  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1465  } catch (std::filesystem::filesystem_error const& ex) {
1466  // Input dir gone?
1467  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1468  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1469  sleep(1);
1470  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1471  }
1472  unlockFULocal();
1473 
1474  try {
1475  //sometimes this fails but file gets deleted
1476  std::filesystem::remove(jsonSourcePath);
1477  } catch (std::filesystem::filesystem_error const& ex) {
1478  // Input dir gone?
1479  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1480  } catch (std::exception const& ex) {
1481  // Input dir gone?
1482  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1483  }
1484 
1485  std::ifstream ij(jsonDestPath);
1486  Json::Value deserializeRoot;
1488 
1489  std::stringstream ss;
1490  ss << ij.rdbuf();
1491  if (!reader.parse(ss.str(), deserializeRoot)) {
1492  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1493  << "\nERROR:\n"
1494  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1495  << ss.str() << ".";
1496  throw std::runtime_error("Cannot deserialize input JSON file");
1497  }
1498 
1499  //read BU JSON
1500  // NOTE(review): this declaration shadows the `data` declared before the try block. The out_of_range and
1500  // invalid_argument handlers below print the outer variable, which is never assigned, so they always log
1500  // an empty "Input value". Dropping this inner declaration would make the handlers report the real value.
1500  std::string data;
1501  DataPoint dp;
1502  dp.deserialize(deserializeRoot);
1503  bool success = false;
1504  // No break here: if several definition names equal "NEvents", the last one with a value is used.
1504  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1505  if (dpd_->getNames().at(i) == "NEvents")
1506  if (i < dp.getData().size()) {
1507  data = dp.getData()[i];
1508  success = true;
1509  }
1510  }
1511  if (!success) {
1512  if (!dp.getData().empty())
1513  data = dp.getData()[0];
1514  else
1515  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1516  << " error reading number of events from BU JSON -: No input value " << data;
1517  }
1518  return std::stoi(data);
1519  } catch (std::filesystem::filesystem_error const& ex) {
1520  // Input dir gone?
1521  // NOTE(review): unlockFULocal() may already have run at line 1472 when the exception originates later.
1521  unlockFULocal();
1522  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1523  } catch (std::runtime_error const& e) {
1524  // Another process grabbed the file and NFS did not register this
1525  unlockFULocal();
1526  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1527  } catch (const std::out_of_range&) {
1528  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1529  << "Input value is -: " << data;
1530  } catch (const std::invalid_argument&) {
1531  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1532  << "Input value is -: " << data;
1533  } catch (std::exception const& e) {
1534  // BU run directory disappeared?
1535  unlockFULocal();
1536  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1537  }
1538 
1539  return -1;
1540  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile,
bool  requireHeader = true 
)

Definition at line 1175 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

// Builds a JSON descriptor for a raw file from the raw file's own header.
// Steps: derive the JSON name from the raw file stem (text before the third '_' found from index 1 on,
// or the whole stem when fewer are found) plus "_pid<pid_, zero-padded to 5>.jsn" inside baseRunDir();
// call parseFRDFileHeader(); create the JSON file exclusively and write
// {"data":[<events>,<file size>,"<rawSourcePath>"]} into it.
// Outputs: fileFound (true on entry, false when parseFRDFileHeader returns 1),
//          fileSizeFromHeader (file size reported by the header, set only on success).
// Returns the event count from the header, or -1 on any failure.
1182  {
1183  fileFound = true;
1184 
1185  //take only first three tokens delimited by "_" in the renamed raw file name
1186  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1187  size_t pos = 0, n_tokens = 0;
1188  // Empty-bodied loop: advances pos to up to three successive '_' positions; pos ends as npos if fewer exist,
1188  // in which case substr(0, pos) below yields the whole stem.
1188  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1189  }
1190  std::string reducedJsonStem = jsonStem.substr(0, pos);
1191 
1192  std::ostringstream fileNameWithPID;
1193  //should be ported to use fffnaming
1194  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1195 
1196  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1197 
1198  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1199 
1200  //parse RAW file header if it exists
1201  uint32_t lsFromRaw;
1202  int32_t nbEventsWrittenRaw;
1203  int64_t fileSizeFromRaw;
1204  uint16_t rawDataType;
1205  // The literal `true` is the `retry` argument of parseFRDFileHeader.
1205  auto ret = parseFRDFileHeader(rawSourcePath,
1206  rawFd,
1207  rawHeaderSize,
1208  rawDataType,
1209  lsFromRaw,
1210  nbEventsWrittenRaw,
1211  fileSizeFromRaw,
1212  requireHeader,
1213  true,
1214  closeFile);
1215  if (ret != 0) {
1216  if (ret == 1)
1217  fileFound = false;
1218  return -1;
1219  }
1220 
1221  int outfile;
1222  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1223  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1224  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1225  if (errno == EEXIST) {
1226  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1227  << " : ";
1228  return -1;
1229  }
1230  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1231  << strerror(errno);
1232  // Any other open error: remove a possibly half-created destination and retry the open once.
1232  struct stat out_stat;
1233  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1234  edm::LogWarning("EvFDaqDirector")
1235  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1236  << jsonDestPath;
1237  if (unlink(jsonDestPath.c_str()) == -1) {
1238  edm::LogWarning("EvFDaqDirector")
1239  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1240  }
1241  }
1242  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1243  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1244  << jsonDestPath << " : " << strerror(errno);
1245  return -1;
1246  }
1247  }
1248  //write JSON file (TODO: use jsoncpp)
1249  std::stringstream ss;
1250  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1251  std::string sstr = ss.str();
1252 
1253  // NOTE(review): this error path returns without close(outfile), leaking the descriptor; a close(outfile)
1253  // before the return would fix it. Only a negative result is treated as failure, so a short write
1253  // (fewer bytes than sstr.size()) goes unnoticed.
1253  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1254  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1255  << " : " << strerror(errno);
1256  return -1;
1257  }
1258  close(outfile);
1259  // A non-zero serverLS that differs from the header's lumisection is only reported, not treated as an error.
1259  if (serverLS && serverLS != lsFromRaw)
1260  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1261  << " and raw file header LS " << lsFromRaw;
1262 
1263  fileSizeFromHeader = fileSizeFromRaw;
1264  return nbEventsWrittenRaw;
1265  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 177 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_base_dirs_all_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, mps_fire::i, init_lock_, svgfig::load(), eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm::shutdown_flag, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, tryInitializeFuLockFile(), and useFileBroker_.

Referenced by preallocate().

// Prepares the directory layout and lock files for a run.
// Visible steps: create base_dir_ and run_dir_; on a non-BU director open two descriptors on
// "<run open dir>/fu-local.lock"; on a BU director create the BU run and open directories, the bu.lock
// stream, the FU lock file and (optionally) copy the HLT configuration; otherwise make sure the BU base
// directories are reachable and open the FU lock stream unless useFileBroker_ is set. Finally initialise
// init_lock_, compute the stop-file paths and, on a non-BU director, load the data point definition.
// Throws cms::Exception("DaqDirector") on the directory/lock-file failures marked below.
// NOTE(review): this listing omits source lines 195, 197, 205, 215, 245 and 315, so e.g. the assignments
// that receive the open() results at 198/206 and the setup of bu_run_dir_ are not visible here.
177  {
178  // check if base dir exists or create it accordingly
179  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
180  if (retval != 0 && errno != EEXIST) {
181  throw cms::Exception("DaqDirector")
182  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
183  }
184 
185  //create run dir in base dir
186  // umask(0) clears the process file-mode mask, so the modes passed from here on are applied as given.
186  umask(0);
187  // S_IRWXO already contains S_IROTH and S_IXOTH, so this mode is rwx for user, group and others.
187  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
188  if (retval != 0 && errno != EEXIST) {
189  throw cms::Exception("DaqDirector")
190  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
191  }
192 
193  //create fu-local.lock in run open dir
194  if (!directorBU_) {
196  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
198  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
199  if (fulocal_rwlock_fd_ == -1)
200  throw cms::Exception("DaqDirector")
201  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
202  chmod(fulocal_lock_.c_str(), 0777);
203  fsync(fulocal_rwlock_fd_);
204  //open second fd for another input source thread
206  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
207  if (fulocal_rwlock_fd2_ == -1)
208  throw cms::Exception("DaqDirector")
209  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
210  }
211 
212  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
213  //for BU, it is created at this point
214  if (directorBU_) {
216  std::string bulockfile = bu_run_dir_ + "/bu.lock";
217  fulockfile_ = bu_run_dir_ + "/fu.lock";
218 
219  //make or find bu run dir
220  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
221  if (retval != 0 && errno != EEXIST) {
222  throw cms::Exception("DaqDirector")
223  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
224  }
225  bu_run_open_dir_ = bu_run_dir_ + "/open";
226  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
227  if (retval != 0 && errno != EEXIST) {
228  throw cms::Exception("DaqDirector")
229  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
230  }
231 
232  // the BU director does not need to know about the fu lock
233  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
234  if (bu_writelock_fd_ == -1)
235  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
236  else
237  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
238  // NOTE(review): fdopen() is called even when the open above failed (bu_writelock_fd_ == -1); both
238  // failures are only logged as warnings and execution continues.
238  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
239  if (bu_w_lock_stream == nullptr)
240  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
241 
242  // BU INITIALIZES LOCK FILE
243  // FU LOCK FILE OPEN
244  openFULockfileStream(true);
246  fflush(fu_rw_lock_stream);
247  close(fu_readwritelock_fd_);
248 
249  if (!hltSourceDirectory_.empty()) {
250  struct stat buf;
251  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
252  // Files are staged in "<open dir>/hlt" and the directory is renamed to "<bu run dir>/hlt" at the end.
252  std::string hltdir = bu_run_dir_ + "/hlt";
253  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
254  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
255  // NOTE(review): the mkdir above acts on tmphltdir, but the message below prints hltdir and says "bu run dir".
255  if (retval != 0 && errno != EEXIST)
256  throw cms::Exception("DaqDirector")
257  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
258 
259  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
260  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
261 
262  // Best-effort copies: any exception from copying these three files is swallowed (presumably they are optional).
262  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
263  for (auto& optfile : optfiles) {
264  try {
265  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
266  } catch (...) {
267  }
268  }
269 
270  std::filesystem::rename(tmphltdir, hltdir);
271  } else
272  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
273  }
274  //else{}//no configuration specified
275  } else {
276  // for FU, check if bu base dir exists
277 
278  // checkExists: single mkdir attempt; anything other than success or EEXIST throws.
278  auto checkExists = [=](std::string const& bu_base_dir) -> void {
279  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
280  if (retval != 0 && errno != EEXIST) {
281  throw cms::Exception("DaqDirector")
282  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
283  }
284  };
285 
286  // waitForDir: retries stat+mkdir every 0.5 s, warns every 20 attempts and throws after more than 120
286  // failed attempts; the loop also ends (without throwing) once edm::shutdown_flag is set.
286  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
287  int cnt = 0;
288  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
289  //stat should trigger autofs mount (mkdir could fail with access denied first time)
290  struct stat statbuf;
291  stat(bu_base_dir.c_str(), &statbuf);
292  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
293  if (retval != 0 && errno != EEXIST) {
294  usleep(500000);
295  cnt++;
296  if (cnt % 20 == 0)
297  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
298  if (cnt > 120)
299  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
300  << " mkdir error:" << strerror(errno);
301  continue;
302  }
303  break;
304  }
305  };
306 
307  if (!bu_base_dirs_all_.empty()) {
308  std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
309  checkExists(check_dir);
310  bu_run_dir_ = check_dir + "/" + run_string_;
311  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
312  waitForDir(bu_base_dirs_all_[i]);
313  } else {
314  checkExists(bu_base_dir_);
316  }
317 
318  fulockfile_ = bu_run_dir_ + "/fu.lock";
319  if (!useFileBroker_)
320  openFULockfileStream(false);
321  }
322 
323  pthread_mutex_init(&init_lock_, nullptr);
324 
325  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
326  std::stringstream sstp;
327  sstp << stopFilePath_ << "_pid" << pid_;
328  stopFilePathPid_ = sstp.str();
329 
330  if (!directorBU_) {
331  // Definition file lookup order: "<bu run dir>/jsd/rawData.jsd", then budef.jsd under $CMSSW_BASE,
331  // then under $CMSSW_RELEASE_BASE, then the bare relative path.
331  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
332  struct stat statbuf;
333  if (!stat(defPath.c_str(), &statbuf))
334  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
335  else {
336  //look in source directory if not present in ramdisk
337  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
338  // NOTE(review): std::getenv returns a null pointer for an unset variable, and constructing a std::string
338  // from a null pointer is undefined behaviour; neither lookup below is guarded.
338  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
339  if (stat(defPath.c_str(), &statbuf)) {
340  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
341  if (stat(defPath.c_str(), &statbuf)) {
342  defPath = defPathSuffix;
343  }
344  }
345  }
346  // NOTE(review): raw owning pointer; the matching delete is not visible in this function.
346  dpd_ = new DataPointDefinition();
347  std::string defLabel = "data";
348  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
349  }
350  }
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
def load(fileName)
Definition: svgfig.py:547
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by DAQSource::getNextDataBlock(), and FedRawDataInputSource::getNextEvent().

// True only when both nStreams_ and nThreads_ equal 1.
122 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 962 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

// Takes a flock() lock on fulocal_rwlock_fd_; the call blocks until granted and its result is ignored.
// NOTE(review): the lock type here is LOCK_SH (shared), whereas lockFULocal2() takes LOCK_EX on the
// second descriptor - confirm that a shared lock is intended.
962  {
963  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
964  flock(fulocal_rwlock_fd_, LOCK_SH);
965  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 972 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

// Takes an exclusive flock() lock (LOCK_EX) on fulocal_rwlock_fd2_; blocks until granted.
// The return value of flock() is not checked.
972  {
973  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
974  flock(fulocal_rwlock_fd2_, LOCK_EX);
975  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 958 of file EvFDaqDirector.cc.

References init_lock_.

// Locks the init_lock_ pthread mutex; the result of pthread_mutex_lock() is not checked.
958 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2016 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and to_string().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Returns true when stat() succeeds on the path formed by discard_ls_filestem_ followed by the
// decimal lumisection number, i.e. when a file system entry with that name exists and is reachable.
// Any stat() failure (not only "does not exist") yields false.
2016  {
2017  struct stat buf;
2018  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2019  }
static std::string to_string(const XMLCh *ch)
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2003 of file EvFDaqDirector.cc.

References command_line::start.

// Builds a struct flock from its five fields using positional aggregate initialisation.
// The initialiser order differs per platform because the listed order must match the member order of
// struct flock there: {start, len, pid, type, whence} under __APPLE__, {type, whence, start, len, pid} otherwise.
2003  {
2004 #ifdef __APPLE__
2005  return {start, len, pid, type, whence};
2006 #else
2007  return {type, whence, start, len, pid};
2008 #endif
2009  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 123 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Plain accessor: returns the nConcurrentLumis_ member.
123 { return nConcurrentLumis_; }
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 939 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

// Opens the FU lock file named by fulockfile_ and wraps the descriptor in a "r+" stdio stream
// (fu_rw_lock_stream). With create == true the file is opened with O_CREAT and then chmod'ed to 0766;
// otherwise it must already exist. Failures are logged, not thrown.
// NOTE(review): source line 941 is missing from this listing; presumably it assigns the result of the
// open() on line 942 to fu_readwritelock_fd_.
939  {
940  if (create) {
942  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
943  chmod(fulockfile_.c_str(), 0766);
944  } else {
945  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
946  }
947  if (fu_readwritelock_fd_ == -1)
948  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
949  << " create:" << create << " error:" << strerror(errno);
950  else
951  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
952 
953  // NOTE(review): fdopen() is still called when the descriptor is -1; that case ends in the error log below.
953  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
954  if (fu_rw_lock_stream == nullptr)
955  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
956  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 108 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

// Plain accessor: returns the outputAdler32Recheck_ member.
108 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 73 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

Referenced by DAQSource::DAQSource().

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint16_t &  rawDataType,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 1014 of file EvFDaqDirector.cc.

References checkFileRead(), FRDFileHeaderContent_v2::dataType_, FRDFileHeaderContent_v1::eventCount_, FRDFileHeaderContent_v2::eventCount_, FRDFileHeaderContent_v1::fileSize_, FRDFileHeaderContent_v2::fileSize_, getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, FRDFileHeaderContent_v1::lumiSection_, FRDFileHeaderContent_v2::lumiSection_, and FRDFileHeaderIdentifier::version_.

Referenced by grabNextJsonFromRaw(), FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1023  {
1024  int infile;
1025 
1026  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1027  if (retry) {
1028  edm::LogWarning("EvFDaqDirector")
1029  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1030  return parseFRDFileHeader(rawSourcePath,
1031  rawFd,
1032  rawHeaderSize,
1033  rawDataType,
1034  lsFromHeader,
1035  eventsFromHeader,
1036  fileSizeFromHeader,
1037  requireHeader,
1038  false,
1039  closeFile);
1040  } else {
1041  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1042  edm::LogError("EvFDaqDirector")
1043  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1044  if (errno == ENOENT)
1045  return 1; // error && file not found
1046  else
1047  return -1;
1048  }
1049  }
1050  }
1051 
1052  //v2 is the largest possible read
1053  char hdr[sizeof(FRDFileHeader_v2)];
1054  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1055  return -1;
1056 
1057  FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1058  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1059 
1060  if (frd_version == 0) {
1061  //no header (specific sequence not detected)
1062  if (requireHeader) {
1063  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1064  close(infile);
1065  return -1;
1066  } else {
1067  //no header, but valid file
1068  lseek(infile, 0, SEEK_SET);
1069  rawHeaderSize = 0;
1070  lsFromHeader = 0;
1071  eventsFromHeader = -1;
1072  fileSizeFromHeader = -1;
1073  }
1074  } else if (frd_version == 1) {
1075  //version 1 header
1076  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1077  return -1;
1078  FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1079  uint32_t headerSizeRaw = fhContent->headerSize_;
1080  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1081  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1082  << " v:" << frd_version;
1083  close(infile);
1084  return -1;
1085  }
1086  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1087  rawDataType = 0;
1088  lsFromHeader = fhContent->lumiSection_;
1089  eventsFromHeader = (int32_t)fhContent->eventCount_;
1090  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1091  rawHeaderSize = fhContent->headerSize_;
1092 
1093  } else if (frd_version == 2) {
1094  //version 2 header
1095  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1096  return -1;
1097  FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1098  uint32_t headerSizeRaw = fhContent->headerSize_;
1099  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1100  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1101  << " v:" << frd_version;
1102  close(infile);
1103  return -1;
1104  }
1105  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1106  rawDataType = fhContent->dataType_;
1107  lsFromHeader = fhContent->lumiSection_;
1108  eventsFromHeader = (int32_t)fhContent->eventCount_;
1109  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1110  rawHeaderSize = fhContent->headerSize_;
1111  }
1112 
1113  if (closeFile) {
1114  close(infile);
1115  infile = -1;
1116  }
1117 
1118  rawFd = infile;
1119  return 0; //OK
1120  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 371 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

371  {
372  initRun();
373 
374  nThreads_ = bounds.maxNumberOfStreams();
375  nStreams_ = bounds.maxNumberOfThreads();
376  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
377  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 411 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

411  {
412  //assert(run_ == id.run());
413 
414  // check if the requested run is the latest one - issue a warning if it isn't
415  if (dirManager_.findHighestRunDir() != run_dir_) {
416  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
417  << ". This is not the highest run " << dirManager_.findHighestRunDir();
418  }
419  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 430 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

430  {
431  //delete all files belonging to just closed lumi
432  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
433  if (!fileDeleteLockPtr_ || !filesToDeletePtr_) {
434  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
435  return;
436  }
437 
438  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
439  auto it = filesToDeletePtr_->begin();
440  while (it != filesToDeletePtr_->end()) {
441  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
442  it = filesToDeletePtr_->erase(it);
443  } else
444  it++;
445  }
446  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1139 of file EvFDaqDirector.cc.

References checkFileRead(), getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, and FRDFileHeaderIdentifier::version_.

Referenced by bumpFile().

1139  {
1140  int infile;
1141  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1142  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1143  << strerror(errno);
1144  return false;
1145  }
1146  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1147  char hdr[sizeof(FRDFileHeader_v2)];
1148  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1149  return false;
1150  FRDFileHeaderIdentifier* fileId = (FRDFileHeaderIdentifier*)hdr;
1151  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1152 
1153  if (frd_version == 1) {
1154  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1155  return false;
1156  FRDFileHeaderContent_v1* fhContent = (FRDFileHeaderContent_v1*)hdr;
1157  rawHeaderSize = fhContent->headerSize_;
1158  close(infile);
1159  return true;
1160  } else if (frd_version == 2) {
1161  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1162  return false;
1163  FRDFileHeaderContent_v2* fhContent = (FRDFileHeaderContent_v2*)hdr;
1164  rawHeaderSize = fhContent->headerSize_;
1165  close(infile);
1166  return true;
1167  } else
1168  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1169 
1170  close(infile);
1171  rawHeaderSize = 0;
1172  return false;
1173  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1968 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1968  {
1969  std::ifstream ij(file);
1970  Json::Value deserializeRoot;
1971  Json::Reader reader;
1972 
1973  if (!reader.parse(ij, deserializeRoot)) {
1974  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1975  return -1;
1976  }
1977 
1978  int ret = deserializeRoot.get("lastLS", "").asInt();
1979  return ret;
1980  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile()

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 547 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

Referenced by postEndRun().

547  {
548  int retval = remove(filename.c_str());
549  if (retval != 0)
550  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
551  << ". error = " << strerror(errno);
552  }
Log< level::Error, false > LogError

◆ runString()

std::string const& evf::EvFDaqDirector::runString ( ) const
inline

Definition at line 74 of file EvFDaqDirector.h.

References run_string_.

Referenced by DAQSource::DAQSource().

74 { return run_string_; }
std::string run_string_

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex *  fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 181 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

182  {
183  fileDeleteLockPtr_ = fileDeleteLock;
184  filesToDeletePtr_ = filesToDelete;
185  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService *  fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

References fms_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

121 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 929 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

929  {
930  if (fu_rw_lock_stream == nullptr)
931  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
932  else {
933  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
934  unsigned int readLs = 1, readIndex = 0;
935  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
936  }
937  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 967 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

967  {
968  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
969  flock(fulocal_rwlock_fd_, LOCK_UN);
970  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 977 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

977  {
978  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
979  flock(fulocal_rwlock_fd2_, LOCK_UN);
980  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 960 of file EvFDaqDirector.cc.

References init_lock_.

960 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 554 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

559  {
560  EvFDaqDirector::FileStatus fileStatus = noFile;
561  rawHeaderSize = 0;
562 
563  int retval = -1;
564  int lock_attempts = 0;
565  long total_lock_attempts = 0;
566 
567  struct stat buf;
568  int stopFileLS = -1;
569  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
570  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
571  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
572  if (stopFileCheck == 0)
573  stopFileLS = readLastLSEntry(stopFilePath_);
574  else
575  stopFileLS = 1; //stop without drain if only pid is stopped
576  if (!stop_ls_override_) {
577  //if lumisection is higher than in stop file, should quit at next from current
578  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
579  stopFileLS = stop_ls_override_ = ls;
580  } else
581  stopFileLS = stop_ls_override_;
582  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
583  << stopFileLS;
584  //return runEnded;
585  } else //if file was removed before reaching stop condition, reset this
586  stop_ls_override_ = 0;
587 
588  timeval ts_lockbegin;
589  gettimeofday(&ts_lockbegin, nullptr);
590 
591  while (retval == -1) {
592  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
593  if (retval == -1)
594  usleep(fuLockPollInterval_);
595  else
596  continue;
597 
598  lock_attempts += fuLockPollInterval_;
599  total_lock_attempts += fuLockPollInterval_;
600  if (lock_attempts > 5000000 || errno == 116) {
601  if (errno == 116)
602  edm::LogWarning("EvFDaqDirector")
603  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
604  else
605  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
606  "fu.lock file are present -: errno "
607  << errno << ":" << strerror(errno) << std::endl;
608 
609  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
610  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
611  ls++;
612  return noFile;
613  }
614 
615  if (stat(bu_run_dir_.c_str(), &buf) != 0)
616  return runEnded;
617  if (stat(fulockfile_.c_str(), &buf) != 0)
618  return runEnded;
619 
620  lock_attempts = 0;
621  }
622  if (total_lock_attempts > 5 * 60000000) {
623  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
624  return runAbort;
625  }
626  }
627 
628  timeval ts_lockend;
629  gettimeofday(&ts_lockend, nullptr);
630  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
631  if (deltat > 0.)
632  lockWaitTime = deltat;
633 
634  if (retval != 0)
635  return fileStatus;
636 
637 #ifdef DEBUG
638  timeval ts_lockend;
639  gettimeofday(&ts_lockend, 0);
640 #endif
641 
642  //open another lock file FD after the lock using main fd has been acquired
643  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
644  if (fu_readwritelock_fd2 == -1)
645  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
646  << " create. error:" << strerror(errno);
647 
648  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
649 
650  // if the stream is readable
651  if (fu_rw_lock_stream2 != nullptr) {
652  unsigned int readLs, readIndex;
653  int check = 0;
654  // rewind the stream
655  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
656  // if rewound OK
657  if (check == 0) {
658  // read its values
659  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
660  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
661 
662  unsigned int currentLs = readLs;
663  bool bumpedOk = false;
664  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
665  //no lock file write in this case
666  if (ls && ls + 1 < currentLs)
667  ls++;
668  else {
669  // try to bump (look for new index or EoLS file)
670  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
671  //avoid 2 lumisections jump
672  if (ls && readLs > currentLs && currentLs > ls) {
673  ls++;
674  readLs = currentLs = ls;
675  readIndex = 0;
676  bumpedOk = false;
677  //no write to lock file
678  } else {
679  if (ls == 0 && readLs > currentLs) {
680  //make sure to always initialize with the LS found in the lock file, with the possibility of grabbing the index file immediately
681  //in this case there is no new file in the same LS
682  //this covers the case where the run has empty first lumisections and CMSSW processes are late to the lock file: one process will always start with LS 1,... and create empty files for them
683  readLs = currentLs;
684  readIndex = 0;
685  bumpedOk = false;
686  //no write to lock file
687  }
688  //update return LS value
689  ls = readLs;
690  }
691  }
692  if (bumpedOk) {
693  // there is a new index file to grab, lock file needs to be updated
694  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
695  if (check == 0) {
696  ftruncate(fu_readwritelock_fd2, 0);
697  // write next index in the file, which is the file the next process should take
698  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
699  fflush(fu_rw_lock_stream2);
700  fsync(fu_readwritelock_fd2);
701  fileStatus = newFile;
702  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
703  } else {
704  edm::LogError("EvFDaqDirector")
705  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
706  setExceptionState = true;
707  return noFile;
708  }
709  } else if (currentLs < readLs) {
710  //there is no new file in next LS (yet), but lock file can be updated to the next LS
711  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
712  if (check == 0) {
713  ftruncate(fu_readwritelock_fd2, 0);
714  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
715  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
716  fflush(fu_rw_lock_stream2);
717  fsync(fu_readwritelock_fd2);
718  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
719  } else {
720  edm::LogError("EvFDaqDirector")
721  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
722  setExceptionState = true;
723  return noFile;
724  }
725  }
726  } else {
727  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
728  << strerror(errno);
729  }
730  } else {
731  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
732  }
733  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
734 
735 #ifdef DEBUG
736  timeval ts_preunlock;
737  gettimeofday(&ts_preunlock, 0);
738  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
739  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
740 #endif
741 
742  //if new json is present, lock file which FedRawDataInputSource will later unlock
743  if (fileStatus == newFile)
744  lockFULocal();
745 
746  //release lock at this point
747  int retvalu = -1;
748  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
749  if (retvalu == -1)
750  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
751 
752 #ifdef DEBUG
753  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
754 #endif
755 
756  if (fileStatus == noFile) {
757  struct stat buf;
758  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
759  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
760  fileStatus = runEnded;
761  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
762  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
763  fileStatus = runEnded;
764  }
765  }
766  return fileStatus;
767  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 78 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

78 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dirs_all_

std::vector<std::string> evf::EvFDaqDirector::bu_base_dirs_all_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getBUBaseDirs(), and initRun().

◆ bu_base_dirs_nSources_

std::vector<int> evf::EvFDaqDirector::bu_base_dirs_nSources_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getBUBaseDirsNSources().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 259 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 261 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 249 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 242 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 252 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 258 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 260 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 251 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 295 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 291 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 244 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 263 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 250 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 294 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
staticprivate

Definition at line 283 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 272 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 256 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 290 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 276 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 289 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), initRun(), and runString().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 292 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 278 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), initRun(), and useFileBroker().