CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::vector< std::string > const & getBUBaseDirs () const
 
std::vector< int > const & getBUBaseDirsNSources () const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFileName () const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &) const
 
std::string getStreamMergeType (std::string const &, MergeType defaultType) const
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
std::string const & runString () const
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Static Private Member Functions

static bool checkFileRead (char *buf, int infile, std::size_t buf_sz, std::string const &path)
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
std::vector< std::string > bu_base_dirs_all_
 
std::vector< int > bu_base_dirs_nSources_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutexfileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringServicefms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 61 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet pset,
edm::ActivityRegistry reg 
)
explicit

Definition at line 40 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dirs_all_, bu_base_dirs_nSources_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, Exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, mps_fire::i, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

41  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
43  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
44  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
45  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
46  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
47  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
48  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
49  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
50  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
51  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
52  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
53  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
54  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
55  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
67  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
72  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
77 
78  //save hostname for later
79  char hostname[33];
80  gethostname(hostname, 32);
81  hostname_ = hostname;
82 
83  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
84  if (fuLockPollIntervalPtr) {
85  try {
86  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
87  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
88  << " us";
89  } catch (const std::exception&) {
90  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
91  }
92  }
93 
94  //override file service parameter if specified by environment
95  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
96  if (fileBrokerParamPtr) {
97  try {
98  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
99  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
100  } catch (const std::exception&) {
101  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
102  }
103  }
104  if (useFileBroker_) {
106  //find BU data address from hltd configuration
108  struct stat buf;
109  if (stat("/etc/appliance/bus.config", &buf) == 0) {
110  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
111  std::getline(busconfig, fileBrokerHost_);
112  }
113  if (fileBrokerHost_.empty())
114  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
115  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
116  throw cms::Exception("EvFDaqDirector")
117  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
118 
119  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
120  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
121  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
122  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
123  }
124 
125  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
126  if (startFromLSPtr) {
127  try {
128  startFromLS_ = std::stoul(std::string(startFromLSPtr));
129  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
130  } catch (const std::exception&) {
131  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
132  }
133  }
134 
135  //override file service parameter if specified by environment
136  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
137  if (fileBrokerUseLockParamPtr) {
138  try {
139  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
140  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
142  } catch (const std::exception&) {
143  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
144  }
145  }
146 
147  // set number of streams in each BU's ramdisk
148  if (bu_base_dirs_nSources_.empty()) {
149  // default is 1 stream per ramdisk
150  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
151  bu_base_dirs_nSources_.push_back(1);
152  }
153  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
154  throw cms::Exception("DaqDirector")
155  << " Error while setting number of sources: size mismatch with BU base directory vector";
156  } else {
157  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
159  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
160  << " for ramdisk " << bu_base_dirs_all_[i];
161  }
162  }
163 
164  std::stringstream ss;
165  ss << "run" << std::setfill('0') << std::setw(6) << run_;
166  run_string_ = ss.str();
167  ss = std::stringstream();
168  ss << run_;
169  run_nstring_ = ss.str();
170  run_dir_ = base_dir_ + "/" + run_string_;
171  input_throttled_file_ = run_dir_ + "/input_throttle";
172  discard_ls_filestem_ = run_dir_ + "/discard_ls";
173  ss = std::stringstream();
174  ss << getpid();
175  pid_ = ss.str();
176  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::vector< int > bu_base_dirs_nSources_
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 353 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

353  {
354  //close server connection
355  if (socket_.get() && socket_->is_open()) {
356  boost::system::error_code ec;
357  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
358  socket_->close(ec);
359  }
360 
361  if (fulocal_rwlock_fd_ != -1) {
362  unlockFULocal();
363  close(fulocal_rwlock_fd_);
364  }
365 
366  if (fulocal_rwlock_fd2_ != -1) {
367  unlockFULocal2();
368  close(fulocal_rwlock_fd2_);
369  }
370  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 75 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

75 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

References bu_run_dir_.

Referenced by DAQSource::readSupervisor().

76 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References bu_run_open_dir_.

77 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 831 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

837  {
838  if (previousFileSize_ != 0) {
839  if (!fms_) {
841  }
842  if (fms_)
844  previousFileSize_ = 0;
845  }
846  nextFile = "";
847 
848  //reached limit
849  if (maxLS >= 0 && ls > (unsigned int)maxLS)
850  return false;
851 
852  struct stat buf;
853  std::stringstream ss;
854 
855  // 1. Check suggested file
856  std::string nextFileJson = getInputJsonFilePath(ls, index);
857  if (stat(nextFileJson.c_str(), &buf) == 0) {
858  fsize = previousFileSize_ = buf.st_size;
859  nextFile = nextFileJson;
860  return true;
861  }
862  // 2. No file -> lumi ended? (and how many?)
863  else {
864  // 3. No file -> check for standalone raw file
865  std::string nextFileRaw = getRawFilePath(ls, index);
866  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
867  fsize = previousFileSize_ = buf.st_size;
868  nextFile = nextFileRaw;
869  return true;
870  }
871 
872  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
873 
874  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
875  // recheck that no raw file appeared in the meantime
876  if (stat(nextFileJson.c_str(), &buf) == 0) {
877  fsize = previousFileSize_ = buf.st_size;
878  nextFile = nextFileJson;
879  return true;
880  }
881  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
882  fsize = previousFileSize_ = buf.st_size;
883  nextFile = nextFileRaw;
884  return true;
885  }
886 
887  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
888  if (indexFilesInLS < 0)
889  //parsing failed
890  return false;
891  else {
892  //check index
893  if ((int)index < indexFilesInLS) {
894  //we have 2 files, and check for 1 failed... retry (2 will never be here)
895  edm::LogError("EvFDaqDirector")
896  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
897  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
898  setExceptionState = true;
899  return false;
900  }
901  }
902  // this lumi ended, check for files
903  ++ls;
904  index = 0;
905 
906  //reached limit
907  if (maxLS >= 0 && ls > (unsigned int)maxLS)
908  return false;
909 
910  nextFileJson = getInputJsonFilePath(ls, 0);
911  nextFileRaw = getRawFilePath(ls, 0);
912  if (stat(nextFileJson.c_str(), &buf) == 0) {
913  // a new file was found at new lumisection, index 0
914  fsize = previousFileSize_ = buf.st_size;
915  nextFile = nextFileJson;
916  return true;
917  }
918  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
919  fsize = previousFileSize_ = buf.st_size;
920  nextFile = nextFileRaw;
921  return true;
922  }
923  return false;
924  }
925  }
926  // no new file found
927  return false;
928  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkFileRead()

bool evf::EvFDaqDirector::checkFileRead ( char *  buf,
int  infile,
std::size_t  buf_sz,
std::string const &  path 
)
staticprivate

Definition at line 1123 of file EvFDaqDirector.cc.

References visDQMUpload::buf, timingPdfMaker::infile, castor_dqm_sourceclient_file_cfg::path, and fileinputsource_cfi::read.

Referenced by parseFRDFileHeader(), and rawFileHasHeader().

1123  {
1124  ssize_t sz_read = ::read(infile, buf, buf_sz);
1125  if (sz_read < 0) {
1126  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1127  if (infile != -1)
1128  close(infile);
1129  return false;
1130  }
1131  if ((size_t)sz_read < buf_sz) {
1132  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1133  if (infile != -1)
1134  close(infile);
1135  return false;
1136  }
1137  return true;
1138  }
Log< level::Error, false > LogError

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1543 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, mps_fire::dest, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, fileBrokerPort_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1550  {
1551  EvFDaqDirector::FileStatus fileStatus = noFile;
1552  serverError = false;
1553  std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1554 
1555  boost::system::error_code ec;
1556  try {
1557  while (true) {
1558  //socket connect
1559  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1561 
1562  if (ec) {
1563  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1564  serverError = true;
1565  break;
1566  }
1567  }
1568 
1569  boost::asio::streambuf request;
1570  std::ostream request_stream(&request);
1571  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1572  if (maxLS >= 0) {
1573  std::stringstream spath;
1574  spath << path << "&stopls=" << maxLS;
1575  path = spath.str();
1576  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1577  }
1578  request_stream << "GET " << path << " HTTP/1.1\r\n";
1579  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1580  request_stream << "Accept: */*\r\n";
1581  request_stream << "Connection: keep-alive\r\n\r\n";
1582 
1583  boost::asio::write(*socket_, request, ec);
1584  if (ec) {
1585  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1586  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1587  //we got disconnected, try to reconnect to the server before writing the request
1589  if (ec) {
1590  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1591  serverError = true;
1592  break;
1593  }
1594  continue;
1595  }
1596  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1597  serverError = true;
1598  break;
1599  }
1600 
1601  boost::asio::streambuf response;
1602  boost::asio::read_until(*socket_, response, "\r\n", ec);
1603  if (ec) {
1604  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1605  serverError = true;
1606  break;
1607  }
1608 
1609  std::istream response_stream(&response);
1610 
1611  std::string http_version;
1612  response_stream >> http_version;
1613 
1614  response_stream >> serverHttpStatus;
1615 
1616  std::string status_message;
1617  std::getline(response_stream, status_message);
1618  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1619  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1620  serverError = true;
1621  break;
1622  }
1623  if (serverHttpStatus != 200) {
1624  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1625  serverError = true;
1626  break;
1627  }
1628 
1629  // Process the response headers.
1631  while (std::getline(response_stream, header) && header != "\r") {
1632  }
1633 
1634  std::string fileInfo;
1635  std::map<std::string, std::string> serverMap;
1636  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1637  auto pos = fileInfo.find('=');
1638  if (pos == std::string::npos)
1639  continue;
1640  auto stitle = fileInfo.substr(0, pos);
1641  auto svalue = fileInfo.substr(pos + 1);
1642  serverMap[stitle] = svalue;
1643  }
1644 
1645  //check that response run number if correct
1646  auto server_version = serverMap.find("version");
1647  assert(server_version != serverMap.end());
1648 
1649  auto server_run = serverMap.find("runnumber");
1650  assert(server_run != serverMap.end());
1651  assert(run_nstring_ == server_run->second);
1652 
1653  auto server_state = serverMap.find("state");
1654  assert(server_state != serverMap.end());
1655 
1656  auto server_eols = serverMap.find("lasteols");
1657  assert(server_eols != serverMap.end());
1658 
1659  auto server_ls = serverMap.find("lumisection");
1660 
1661  int version_maj = 1;
1662  int version_min = 0;
1663  int version_rev = 0;
1664  {
1665  auto* s_ptr = server_version->second.c_str();
1666  if (!server_version->second.empty() && server_version->second[0] == '"')
1667  s_ptr++;
1668  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1669  if (res < 3) {
1670  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1671  if (res < 2) {
1672  res = sscanf(s_ptr, "%d", &version_maj);
1673  if (res < 1) {
1674  //expecting at least 1 number (major version)
1675  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1676  }
1677  }
1678  }
1679  }
1680 
1681  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1682  if (server_ls != serverMap.end())
1683  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1684  else
1685  serverLS = closedServerLS + 1;
1686 
1687  std::string s_state = server_state->second;
1688  if (s_state == "STARTING") //initial, always empty starting with LS 1
1689  {
1690  auto server_file = serverMap.find("file");
1691  assert(server_file == serverMap.end()); //no file with starting state
1692  fileStatus = noFile;
1693  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1694  } else if (s_state == "READY") {
1695  auto server_file = serverMap.find("file");
1696  if (server_file == serverMap.end()) {
1697  //can be returned by server if files from new LS already appeared but LS is not yet closed
1698  if (serverLS <= closedServerLS)
1699  serverLS = closedServerLS + 1;
1700  fileStatus = noFile;
1701  edm::LogInfo("EvFDaqDirector")
1702  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1703  } else {
1704  std::string filestem;
1705  std::string fileprefix;
1706  auto server_fileprefix = serverMap.find("fileprefix");
1707 
1708  if (server_fileprefix != serverMap.end()) {
1709  auto pssize = server_fileprefix->second.size();
1710  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1711  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1712  else
1713  fileprefix = server_fileprefix->second;
1714  }
1715 
1716  //remove string literals
1717  auto ssize = server_file->second.size();
1718  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1719  filestem = server_file->second.substr(1, ssize - 2);
1720  else
1721  filestem = server_file->second;
1722  assert(!filestem.empty());
1723  if (version_maj > 1) {
1724  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1725  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1726  nextFileJson = "";
1727  rawHeader = true;
1728  } else {
1729  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1730  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1731  nextFileJson = filestem + ".jsn";
1732  rawHeader = false;
1733  }
1734  fileStatus = newFile;
1735  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1736  << serverLS << " file:" << filestem;
1737  }
1738  } else if (s_state == "EOLS") {
1739  serverLS = closedServerLS + 1;
1740  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1741  fileStatus = noFile;
1742  } else if (s_state == "EOR") {
1743  //server_eor = serverMap.find("iseor");
1744  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1745  fileStatus = runEnded;
1746  } else if (s_state == "NORUN") {
1747  auto err_msg = serverMap.find("errormessage");
1748  if (err_msg != serverMap.end())
1749  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1750  else
1751  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1752  edm::LogWarning("EvFDaqDirector") << "executing run end";
1753  fileStatus = runEnded;
1754  } else if (s_state == "ERROR") {
1755  auto err_msg = serverMap.find("errormessage");
1756  if (err_msg != serverMap.end())
1757  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1758  else
1759  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1760  fileStatus = noFile;
1761  serverError = true;
1762  } else {
1763  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1764  fileStatus = noFile;
1765  serverError = true;
1766  }
1767 
1768  // Read until EOF, writing data to output as we go.
1769  if (!fileBrokerKeepAlive_) {
1770  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1771  }
1772  if (ec != boost::asio::error::eof) {
1773  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1774  serverError = true;
1775  }
1776  }
1777 
1778  break;
1779  }
1780 
1781  } catch (std::exception const& e) {
1782  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1783  serverError = true;
1784  }
1785 
1786  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1787  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1788  if (ec) {
1789  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1790  }
1791  socket_->close(ec);
1792  if (ec) {
1793  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1794  }
1795  }
1796 
1797  if (serverError) {
1798  if (socket_->is_open())
1799  socket_->close(ec);
1800  if (ec) {
1801  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1802  }
1803  fileStatus = noFile;
1804  sleep(1); //back-off if error detected
1805  }
1806 
1807  return fileStatus;
1808  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
std::string fileBrokerPort_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 983 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

983  {
984  //used for backpressure mechanisms and monitoring
985  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
986  struct stat buf;
987  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
988  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
989  close(bol_fd);
990  }
991  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 993 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

996  {
997  if (currentLumiSection > 0) {
998  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
999  struct stat buf;
1000  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1001  if (!found) {
1002  if (doCreateEoLS) {
1003  int eol_fd =
1004  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1005  close(eol_fd);
1006  }
1007  if (doCreateBoLS)
1008  createBoLSFile(lumiSection, false);
1009  }
1010  } else if (doCreateBoLS) {
1011  createBoLSFile(lumiSection, true); //needed for initial lumisection
1012  }
1013  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 1999 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

1999  {
2000  std::string proc_flag = run_dir_ + "/processing";
2001  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2002  close(proc_flag_fd);
2003  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1960 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

1960  {
1961  // create open dir if not already there
1962 
1964  if (!std::filesystem::is_directory(openPath)) {
1965  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1966  std::filesystem::create_directories(openPath);
1967  }
1968  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 380 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

380  {
382  desc.setComment(
383  "Service used for file locking arbitration and for propagating information between other EvF components");
384  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
385  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
386  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
387  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
388  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
389  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
390  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
391  desc.addUntracked<bool>("useFileBroker", false)
392  ->setComment("Use BU file service to grab input data instead of NFS file locking");
393  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
394  ->setComment("Allow service to discover BU address from hltd configuration");
395  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
396  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
397  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
398  ->setComment("Use keep alive to avoid using large number of sockets");
399  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
400  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
401  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
402  ->setComment("Lock polling interval in microseconds for the input directory file lock");
403  desc.addUntracked<bool>("outputAdler32Recheck", false)
404  ->setComment("Check Adler32 of per-process output files while micro-merging");
405  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
406  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
407  desc.addUntracked<std::string>("mergingPset", "")
408  ->setComment("Name of merging PSet to look for merging type definitions for streams");
409  descriptions.add("EvFDaqDirector", desc);
410  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 80 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

80 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 536 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

536  {
537  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
538  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getBUBaseDirs()

std::vector<std::string> const& evf::EvFDaqDirector::getBUBaseDirs ( ) const
inline

Definition at line 194 of file EvFDaqDirector.h.

References bu_base_dirs_all_.

Referenced by DAQSource::DAQSource().

194 { return bu_base_dirs_all_; }
std::vector< std::string > bu_base_dirs_all_

◆ getBUBaseDirsNSources()

std::vector<int> const& evf::EvFDaqDirector::getBUBaseDirsNSources ( ) const
inline

Definition at line 195 of file EvFDaqDirector.h.

References bu_base_dirs_nSources_.

Referenced by DAQSource::DAQSource().

195 { return bu_base_dirs_nSources_; }
std::vector< int > bu_base_dirs_nSources_

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 465 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

465  {
467  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 528 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

528  {
529  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
530  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 532 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

532  {
533  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
534  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFileName()

std::string evf::EvFDaqDirector::getEoRFileName ( ) const

Definition at line 542 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), and run_.

Referenced by DAQSource::readSupervisor().

542 { return fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 540 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

540 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 544 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

544 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 546 of file EvFDaqDirector.cc.

References bu_run_dir_.

546 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 493 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

493  {
495  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 497 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

497  {
499  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 449 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

449  {
451  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1984 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1984  {
1985  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1987  struct stat buf;
1988  unsigned int lscount = 1;
1989  do {
1990  std::stringstream ss;
1991  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1992  fullpath = ss.str();
1993  lscount++;
1994  } while (stat(fullpath.c_str(), &buf) == 0);
1995  return lscount - 1;
1996  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 485 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

485  {
487  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 481 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

481  {
483  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 511 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

512  {
514  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 524 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

524  {
526  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs,
bool  requireHeader = true 
)

Definition at line 1810 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1818  {
1819  EvFDaqDirector::FileStatus fileStatus = noFile;
1820 
1821  //int retval = -1;
1822  //int lock_attempts = 0;
1823  //long total_lock_attempts = 0;
1824 
1825  struct stat buf;
1826  int stopFileLS = -1;
1827  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1828  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1829  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1830  if (stopFileCheck == 0)
1831  stopFileLS = readLastLSEntry(stopFilePath_);
1832  else
1833  stopFileLS = 1; //stop without drain if only pid is stopped
1834  if (!stop_ls_override_) {
1835  //if lumisection is higher than in stop file, should quit at next from current
1836  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1837  stopFileLS = stop_ls_override_ = ls;
1838  } else
1839  stopFileLS = stop_ls_override_;
1840  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1841  << stopFileLS;
1842  //return runEnded;
1843  } else //if file was removed before reaching stop condition, reset this
1844  stop_ls_override_ = 0;
1845 
1846  /* look for EoLS
1847  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1848  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1849  ls++;
1850  return noFile;
1851  }
1852  */
1853 
1854  timeval ts_lockbegin;
1855  gettimeofday(&ts_lockbegin, nullptr);
1856 
1857  std::string nextFileJson;
1858  uint32_t serverLS, closedServerLS;
1859  unsigned int serverHttpStatus;
1860  bool serverError;
1861 
1862  //local lock to force index json and EoLS files to appear in order
1864  lockFULocal();
1865 
1866  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1867  bool rawHeader = false;
1868  fileStatus = contactFileBroker(
1869  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1870 
1871  if (serverError) {
1872  //do not update anything
1874  unlockFULocal();
1875  return noFile;
1876  }
1877 
1878  //handle creation of BoLS files if lumisection has changed
1879  if (currentLumiSection == 0) {
1880  if (fileStatus == runEnded)
1881  createLumiSectionFiles(closedServerLS, 0, true, false);
1882  else
1883  createLumiSectionFiles(serverLS, 0, true, false);
1884  } else {
1885  if (closedServerLS >= currentLumiSection) {
1886  //only BoLS files
1887  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1888  createLumiSectionFiles(i + 1, i, true, false);
1889  }
1890  }
1891 
1892  bool fileFound = true;
1893 
1894  if (fileStatus == newFile) {
1895  if (rawHeader > 0)
1896  serverEventsInNewFile = grabNextJsonFromRaw(
1897  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1898  else
1899  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1900  }
1901  //closing file in case of any error
1902  if (serverEventsInNewFile < 0 && rawFd != -1) {
1903  close(rawFd);
1904  rawFd = -1;
1905  }
1906 
1907  //can unlock because all files have been created locally
1909  unlockFULocal();
1910 
1911  if (!fileFound) {
1912  //catch condition where directory got deleted
1913  fileStatus = noFile;
1914  struct stat buf;
1915  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1916  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1917  fileStatus = runEnded;
1918  }
1919  }
1920 
1921  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1922  //so that EoLS files can not appear locally before index files
1923  if (currentLumiSection == 0) {
1924  lockFULocal2();
1925  if (fileStatus == runEnded) {
1926  createLumiSectionFiles(closedServerLS, 0, false, true);
1927  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1928  } else {
1929  createLumiSectionFiles(serverLS, 0, false, true);
1930  }
1931  unlockFULocal2();
1932  } else {
1933  if (closedServerLS >= currentLumiSection) {
1934  //lock exclusive to create EoLS files
1935  lockFULocal2();
1936  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1937  createLumiSectionFiles(i + 1, i, false, true);
1938  unlockFULocal2();
1939  }
1940  }
1941 
1942  if (fileStatus == runEnded)
1943  ls = std::max(currentLumiSection, serverLS);
1944  else if (fileStatus == newFile) {
1945  assert(serverLS >= ls);
1946  ls = serverLS;
1947  } else if (fileStatus == noFile) {
1948  if (serverLS >= ls)
1949  ls = serverLS;
1950  else {
1951  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1952  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1953  sleep(1);
1954  }
1955  }
1956 
1957  return fileStatus;
1958  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 770 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

770  {
771  std::ifstream ij(BUEoLSFile);
772  Json::Value deserializeRoot;
774 
775  if (!reader.parse(ij, deserializeRoot)) {
776  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
777  return -1;
778  }
779 
781  DataPoint dp;
782  dp.deserialize(deserializeRoot);
783 
784  //read definition
785  if (readEolsDefinition_) {
786  //std::string def = boost::algorithm::trim(dp.getDefinition());
787  std::string def = dp.getDefinition();
788  if (def.empty())
789  readEolsDefinition_ = false;
790  while (!def.empty()) {
792  if (def.find('/') == 0)
793  fullpath = def;
794  else
795  fullpath = bu_run_dir_ + '/' + def;
796  struct stat buf;
797  if (stat(fullpath.c_str(), &buf) == 0) {
798  DataPointDefinition eolsDpd;
799  std::string defLabel = "legend";
800  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
801  if (eolsDpd.getNames().empty()) {
802  //try with "data" label if "legend" format is not used
803  eolsDpd = DataPointDefinition();
804  defLabel = "data";
805  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
806  }
807  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
808  if (eolsDpd.getNames().at(i) == "NFiles")
810  readEolsDefinition_ = false;
811  break;
812  }
813  //check if we can still find definition
814  if (def.size() <= 1 || def.find('/') == std::string::npos) {
815  readEolsDefinition_ = false;
816  break;
817  }
818  def = def.substr(def.find('/') + 1);
819  }
820  }
821 
822  if (dp.getData().size() > eolsNFilesIndex_)
823  data = dp.getData()[eolsNFilesIndex_];
824  else {
825  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
826  return -1;
827  }
828  return std::stoi(data);
829  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 469 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

469  {
471  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 489 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

489  {
490  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
491  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 461 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

461  {
462  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
463  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 473 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

473  {
475  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 501 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

502  {
504  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 457 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

457  {
458  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
459  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 516 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

516  {
518  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 477 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

477  {
479  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 506 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

507  {
509  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 453 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

453  {
455  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 520 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

520  {
522  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

References run_.

118 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

107 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 180 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

180 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  ) const
inline

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  ,
MergeType  defaultType 
) const
inline

Definition at line 188 of file EvFDaqDirector.h.

References MergeTypeNames_.

188  {
189  return MergeTypeNames_[defaultType];
190  }
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1268 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1271  {
1272  fileFound = true;
1273 
1274  //should be ported to use fffnaming
1275  std::ostringstream fileNameWithPID;
1276  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1277  << std::setw(5) << pid_ << ".jsn";
1278 
1279  // assemble json destination path
1280  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1281 
1282  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1283 
1284  int infile = -1, outfile = -1;
1285 
1286  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1287  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1288  << strerror(errno);
1289  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1290  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1291  << jsonSourcePath << " : " << strerror(errno);
1292  if (errno == ENOENT)
1293  fileFound = false;
1294  return -1;
1295  }
1296  }
1297 
1298  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1299  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1300  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1301  if (errno == EEXIST) {
1302  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1303  << " : ";
1304  ::close(infile);
1305  return -1;
1306  }
1307  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1308  << strerror(errno);
1309  struct stat out_stat;
1310  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1311  edm::LogWarning("EvFDaqDirector")
1312  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1313  if (unlink(jsonDestPath.c_str()) == -1) {
1314  edm::LogWarning("EvFDaqDirector")
1315  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1316  }
1317  }
1318  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1319  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1320  << jsonDestPath << " : " << strerror(errno);
1321  ::close(infile);
1322  return -1;
1323  }
1324  }
1325  //copy contents
1326  const std::size_t buf_sz = 512;
1327  std::size_t tot_written = 0;
1328  std::unique_ptr<char[]> buf(new char[buf_sz]);
1329 
1330  ssize_t sz, sz_read = 1, sz_write;
1331  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1332  sz_write = 0;
1333  do {
1334  assert(sz_read - sz_write > 0);
1335  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1336  sz_read = sz; // cause read loop termination
1337  break;
1338  }
1339  assert(sz > 0);
1340  sz_write += sz;
1341  tot_written += sz;
1342  } while (sz_write < sz_read);
1343  }
1344  close(infile);
1345  close(outfile);
1346 
1347  if (tot_written > 0) {
1348  //leave file if it was empty for diagnosis
1349  if (unlink(jsonSourcePath.c_str()) == -1) {
1350  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1351  << strerror(errno);
1352  return -1;
1353  }
1354  } else {
1355  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1356  << jsonSourcePath;
1357  return -1;
1358  }
1359 
1360  Json::Value deserializeRoot;
1362 
1363  std::string data;
1364  std::stringstream ss;
1365  bool result;
1366  try {
1367  if (tot_written <= buf_sz) {
1368  result = reader.parse(buf.get(), deserializeRoot);
1369  } else {
1370  //json will normally not be bigger than buf_sz bytes
1371  try {
1372  std::ifstream ij(jsonDestPath);
1373  ss << ij.rdbuf();
1374  } catch (std::filesystem::filesystem_error const& ex) {
1375  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1376  return -1;
1377  }
1378  result = reader.parse(ss.str(), deserializeRoot);
1379  }
1380  if (!result) {
1381  if (tot_written <= buf_sz)
1382  ss << buf.get();
1383  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1384  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1385  << ss.str() << ".";
1386  return -1;
1387  }
1388 
1389  //read BU JSON
1390  DataPoint dp;
1391  dp.deserialize(deserializeRoot);
1392  bool success = false;
1393  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1394  if (dpd_->getNames().at(i) == "NEvents")
1395  if (i < dp.getData().size()) {
1396  data = dp.getData()[i];
1397  success = true;
1398  break;
1399  }
1400  }
1401  if (!success) {
1402  if (!dp.getData().empty())
1403  data = dp.getData()[0];
1404  else {
1405  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1406  << "grabNextJsonFile - "
1407  << " error reading number of events from BU JSON; No input value. data -: " << data;
1408  return -1;
1409  }
1410  }
1411 
1412  //try to read raw file size
1413  fileSizeFromJson = -1;
1414  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1415  if (dpd_->getNames().at(i) == "NBytes") {
1416  if (i < dp.getData().size()) {
1417  std::string dataSize = dp.getData()[i];
1418  try {
1419  fileSizeFromJson = std::stol(dataSize);
1420  } catch (const std::exception&) {
1421  //non-fatal currently, processing can continue without this value
1422  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1423  << "Input value is -: " << dataSize;
1424  }
1425  break;
1426  }
1427  }
1428  }
1429  return std::stoi(data);
1430  } catch (const std::out_of_range& e) {
1431  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1432  << "Input value is -: " << data;
1433  } catch (const std::invalid_argument& e) {
1434  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1435  << "Input value is -: " << data;
1436  } catch (std::runtime_error const& e) {
1437  //Can be thrown by Json parser
1438  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1439  }
1440 
1441  catch (std::exception const& e) {
1442  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1443  } catch (...) {
1444  //unknown exception
1445  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1446  }
1447 
1448  return -1;
1449  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1451 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

1451  {
1452  std::string data;
1453  try {
1454  // assemble json destination path
1455  std::filesystem::path jsonDestPath(baseRunDir());
1456 
1457  //should be ported to use fffnaming
1458  std::ostringstream fileNameWithPID;
1459  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1460  << ".jsn";
1461  jsonDestPath /= fileNameWithPID.str();
1462 
1463  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1464  try {
1465  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1466  } catch (std::filesystem::filesystem_error const& ex) {
1467  // Input dir gone?
1468  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1469  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1470  sleep(1);
1471  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1472  }
1473  unlockFULocal();
1474 
1475  try {
1476  //sometimes this fails but file gets deleted
1477  std::filesystem::remove(jsonSourcePath);
1478  } catch (std::filesystem::filesystem_error const& ex) {
1479  // Input dir gone?
1480  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1481  } catch (std::exception const& ex) {
1482  // Input dir gone?
1483  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1484  }
1485 
1486  std::ifstream ij(jsonDestPath);
1487  Json::Value deserializeRoot;
1489 
1490  std::stringstream ss;
1491  ss << ij.rdbuf();
1492  if (!reader.parse(ss.str(), deserializeRoot)) {
1493  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1494  << "\nERROR:\n"
1495  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1496  << ss.str() << ".";
1497  throw std::runtime_error("Cannot deserialize input JSON file");
1498  }
1499 
1500  //read BU JSON
1501  std::string data;
1502  DataPoint dp;
1503  dp.deserialize(deserializeRoot);
1504  bool success = false;
1505  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1506  if (dpd_->getNames().at(i) == "NEvents")
1507  if (i < dp.getData().size()) {
1508  data = dp.getData()[i];
1509  success = true;
1510  }
1511  }
1512  if (!success) {
1513  if (!dp.getData().empty())
1514  data = dp.getData()[0];
1515  else
1516  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1517  << " error reading number of events from BU JSON -: No input value " << data;
1518  }
1519  return std::stoi(data);
1520  } catch (std::filesystem::filesystem_error const& ex) {
1521  // Input dir gone?
1522  unlockFULocal();
1523  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1524  } catch (std::runtime_error const& e) {
1525  // Another process grabbed the file and NFS did not register this
1526  unlockFULocal();
1527  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1528  } catch (const std::out_of_range&) {
1529  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1530  << "Input value is -: " << data;
1531  } catch (const std::invalid_argument&) {
1532  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1533  << "Input value is -: " << data;
1534  } catch (std::exception const& e) {
1535  // BU run directory disappeared?
1536  unlockFULocal();
1537  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1538  }
1539 
1540  return -1;
1541  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile,
bool  requireHeader = true 
)

Definition at line 1176 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

1183  {
1184  fileFound = true;
1185 
1186  //take only first three tokens delimited by "_" in the renamed raw file name
1187  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1188  size_t pos = 0, n_tokens = 0;
1189  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1190  }
1191  std::string reducedJsonStem = jsonStem.substr(0, pos);
1192 
1193  std::ostringstream fileNameWithPID;
1194  //should be ported to use fffnaming
1195  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1196 
1197  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1198 
1199  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1200 
1201  //parse RAW file header if it exists
1202  uint32_t lsFromRaw;
1203  int32_t nbEventsWrittenRaw;
1204  int64_t fileSizeFromRaw;
1205  uint16_t rawDataType;
1206  auto ret = parseFRDFileHeader(rawSourcePath,
1207  rawFd,
1208  rawHeaderSize,
1209  rawDataType,
1210  lsFromRaw,
1211  nbEventsWrittenRaw,
1212  fileSizeFromRaw,
1213  requireHeader,
1214  true,
1215  closeFile);
1216  if (ret != 0) {
1217  if (ret == 1)
1218  fileFound = false;
1219  return -1;
1220  }
1221 
1222  int outfile;
1223  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1224  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1225  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1226  if (errno == EEXIST) {
1227  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1228  << " : ";
1229  return -1;
1230  }
1231  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1232  << strerror(errno);
1233  struct stat out_stat;
1234  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1235  edm::LogWarning("EvFDaqDirector")
1236  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1237  << jsonDestPath;
1238  if (unlink(jsonDestPath.c_str()) == -1) {
1239  edm::LogWarning("EvFDaqDirector")
1240  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1241  }
1242  }
1243  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1244  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1245  << jsonDestPath << " : " << strerror(errno);
1246  return -1;
1247  }
1248  }
1249  //write JSON file (TODO: use jsoncpp)
1250  std::stringstream ss;
1251  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1252  std::string sstr = ss.str();
1253 
1254  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1255  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1256  << " : " << strerror(errno);
1257  return -1;
1258  }
1259  close(outfile);
1260  if (serverLS && serverLS != lsFromRaw)
1261  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1262  << " and raw file header LS " << lsFromRaw;
1263 
1264  fileSizeFromHeader = fileSizeFromRaw;
1265  return nbEventsWrittenRaw;
1266  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 178 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_base_dirs_all_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, mps_fire::i, init_lock_, svgfig::load(), eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm::shutdown_flag, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, tryInitializeFuLockFile(), and useFileBroker_.

Referenced by preallocate().

178  {
179  // check if base dir exists or create it accordingly
180  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
181  if (retval != 0 && errno != EEXIST) {
182  throw cms::Exception("DaqDirector")
183  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
184  }
185 
186  //create run dir in base dir
187  umask(0);
188  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
189  if (retval != 0 && errno != EEXIST) {
190  throw cms::Exception("DaqDirector")
191  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
192  }
193 
194  //create fu-local.lock in run open dir
195  if (!directorBU_) {
197  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
199  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
200  if (fulocal_rwlock_fd_ == -1)
201  throw cms::Exception("DaqDirector")
202  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
203  chmod(fulocal_lock_.c_str(), 0777);
204  fsync(fulocal_rwlock_fd_);
205  //open second fd for another input source thread
207  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
208  if (fulocal_rwlock_fd2_ == -1)
209  throw cms::Exception("DaqDirector")
210  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
211  }
212 
213  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
214  //for BU, it is created at this point
215  if (directorBU_) {
217  std::string bulockfile = bu_run_dir_ + "/bu.lock";
218  fulockfile_ = bu_run_dir_ + "/fu.lock";
219 
220  //make or find bu run dir
221  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
222  if (retval != 0 && errno != EEXIST) {
223  throw cms::Exception("DaqDirector")
224  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
225  }
226  bu_run_open_dir_ = bu_run_dir_ + "/open";
227  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
228  if (retval != 0 && errno != EEXIST) {
229  throw cms::Exception("DaqDirector")
230  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
231  }
232 
233  // the BU director does not need to know about the fu lock
234  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
235  if (bu_writelock_fd_ == -1)
236  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
237  else
238  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
239  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
240  if (bu_w_lock_stream == nullptr)
241  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
242 
243  // BU INITIALIZES LOCK FILE
244  // FU LOCK FILE OPEN
245  openFULockfileStream(true);
247  fflush(fu_rw_lock_stream);
248  close(fu_readwritelock_fd_);
249 
250  if (!hltSourceDirectory_.empty()) {
251  struct stat buf;
252  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
253  std::string hltdir = bu_run_dir_ + "/hlt";
254  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
255  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
256  if (retval != 0 && errno != EEXIST)
257  throw cms::Exception("DaqDirector")
258  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
259 
260  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
261  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
262 
263  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
264  for (auto& optfile : optfiles) {
265  try {
266  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
267  } catch (...) {
268  }
269  }
270 
271  std::filesystem::rename(tmphltdir, hltdir);
272  } else
273  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
274  }
275  //else{}//no configuration specified
276  } else {
277  // for FU, check if bu base dir exists
278 
279  auto checkExists = [=](std::string const& bu_base_dir) -> void {
280  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
281  if (retval != 0 && errno != EEXIST) {
282  throw cms::Exception("DaqDirector")
283  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
284  }
285  };
286 
287  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
288  int cnt = 0;
289  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
290  //stat should trigger autofs mount (mkdir could fail with access denied first time)
291  struct stat statbuf;
292  stat(bu_base_dir.c_str(), &statbuf);
293  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
294  if (retval != 0 && errno != EEXIST) {
295  usleep(500000);
296  cnt++;
297  if (cnt % 20 == 0)
298  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
299  if (cnt > 120)
300  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
301  << " mkdir error:" << strerror(errno);
302  continue;
303  }
304  break;
305  }
306  };
307 
308  if (!bu_base_dirs_all_.empty()) {
309  std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
310  checkExists(check_dir);
311  bu_run_dir_ = check_dir + "/" + run_string_;
312  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
313  waitForDir(bu_base_dirs_all_[i]);
314  } else {
315  checkExists(bu_base_dir_);
317  }
318 
319  fulockfile_ = bu_run_dir_ + "/fu.lock";
320  if (!useFileBroker_)
321  openFULockfileStream(false);
322  }
323 
324  pthread_mutex_init(&init_lock_, nullptr);
325 
326  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
327  std::stringstream sstp;
328  sstp << stopFilePath_ << "_pid" << pid_;
329  stopFilePathPid_ = sstp.str();
330 
331  if (!directorBU_) {
332  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
333  struct stat statbuf;
334  if (!stat(defPath.c_str(), &statbuf))
335  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
336  else {
337  //look in source directory if not present in ramdisk
338  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
339  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
340  if (stat(defPath.c_str(), &statbuf)) {
341  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
342  if (stat(defPath.c_str(), &statbuf)) {
343  defPath = defPathSuffix;
344  }
345  }
346  }
347  dpd_ = new DataPointDefinition();
348  std::string defLabel = "data";
349  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
350  }
351  }
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
def load(fileName)
Definition: svgfig.py:547
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by DAQSource::getNextDataBlock(), and FedRawDataInputSource::getNextEvent().

122 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 963 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

963  {
964  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
965  flock(fulocal_rwlock_fd_, LOCK_SH);
966  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 973 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

973  {
974  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
975  flock(fulocal_rwlock_fd2_, LOCK_EX);
976  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 959 of file EvFDaqDirector.cc.

References init_lock_.

959 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2018 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and to_string().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

2018  {
2019  struct stat buf;
2020  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2021  }
static std::string to_string(const XMLCh *ch)
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2005 of file EvFDaqDirector.cc.

References command_line::start.

2005  {
2006 #ifdef __APPLE__
2007  return {start, len, pid, type, whence};
2008 #else
2009  return {type, whence, start, len, pid};
2010 #endif
2011  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 123 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

123 { return nConcurrentLumis_; }
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 940 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

940  {
941  if (create) {
943  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
944  chmod(fulockfile_.c_str(), 0766);
945  } else {
946  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
947  }
948  if (fu_readwritelock_fd_ == -1)
949  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
950  << " create:" << create << " error:" << strerror(errno);
951  else
952  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
953 
954  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
955  if (fu_rw_lock_stream == nullptr)
956  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
957  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 108 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

108 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 73 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

Referenced by DAQSource::DAQSource().

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint16_t &  rawDataType,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 1015 of file EvFDaqDirector.cc.

References checkFileRead(), FRDFileHeaderContent_v2::dataType_, FRDFileHeaderContent_v1::eventCount_, FRDFileHeaderContent_v2::eventCount_, FRDFileHeaderContent_v1::fileSize_, FRDFileHeaderContent_v2::fileSize_, getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, FRDFileHeaderContent_v1::lumiSection_, FRDFileHeaderContent_v2::lumiSection_, and FRDFileHeaderIdentifier::version_.

Referenced by grabNextJsonFromRaw(), FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1024  {
1025  int infile;
1026 
1027  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1028  if (retry) {
1029  edm::LogWarning("EvFDaqDirector")
1030  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1031  return parseFRDFileHeader(rawSourcePath,
1032  rawFd,
1033  rawHeaderSize,
1034  rawDataType,
1035  lsFromHeader,
1036  eventsFromHeader,
1037  fileSizeFromHeader,
1038  requireHeader,
1039  false,
1040  closeFile);
1041  } else {
1042  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1043  edm::LogError("EvFDaqDirector")
1044  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1045  if (errno == ENOENT)
1046  return 1; // error && file not found
1047  else
1048  return -1;
1049  }
1050  }
1051  }
1052 
1053  //v2 is the largest possible read
1054  char hdr[sizeof(FRDFileHeader_v2)];
1055  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1056  return -1;
1057 
1059  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1060 
1061  if (frd_version == 0) {
1062  //no header (specific sequence not detected)
1063  if (requireHeader) {
1064  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1065  close(infile);
1066  return -1;
1067  } else {
1068  //no header, but valid file
1069  lseek(infile, 0, SEEK_SET);
1070  rawHeaderSize = 0;
1071  lsFromHeader = 0;
1072  eventsFromHeader = -1;
1073  fileSizeFromHeader = -1;
1074  }
1075  } else if (frd_version == 1) {
1076  //version 1 header
1077  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1078  return -1;
1080  uint32_t headerSizeRaw = fhContent->headerSize_;
1081  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1082  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1083  << " v:" << frd_version;
1084  close(infile);
1085  return -1;
1086  }
1087  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1088  rawDataType = 0;
1089  lsFromHeader = fhContent->lumiSection_;
1090  eventsFromHeader = (int32_t)fhContent->eventCount_;
1091  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1092  rawHeaderSize = fhContent->headerSize_;
1093 
1094  } else if (frd_version == 2) {
1095  //version 2 heade
1096  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1097  return -1;
1099  uint32_t headerSizeRaw = fhContent->headerSize_;
1100  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1101  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1102  << " v:" << frd_version;
1103  close(infile);
1104  return -1;
1105  }
1106  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1107  rawDataType = fhContent->dataType_;
1108  lsFromHeader = fhContent->lumiSection_;
1109  eventsFromHeader = (int32_t)fhContent->eventCount_;
1110  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1111  rawHeaderSize = fhContent->headerSize_;
1112  }
1113 
1114  if (closeFile) {
1115  close(infile);
1116  infile = -1;
1117  }
1118 
1119  rawFd = infile;
1120  return 0; //OK
1121  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 372 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

372  {
373  initRun();
374 
375  nThreads_ = bounds.maxNumberOfStreams();
376  nStreams_ = bounds.maxNumberOfThreads();
377  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
378  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 412 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

412  {
413  //assert(run_ == id.run());
414 
415  // check if the requested run is the latest one - issue a warning if it isn't
417  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
418  << ". This is not the highest run " << dirManager_.findHighestRunDir();
419  }
420  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 431 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

431  {
432  //delete all files belonging to just closed lumi
433  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
435  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
436  return;
437  }
438 
439  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
440  auto it = filesToDeletePtr_->begin();
441  while (it != filesToDeletePtr_->end()) {
442  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
443  it = filesToDeletePtr_->erase(it);
444  } else
445  it++;
446  }
447  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1140 of file EvFDaqDirector.cc.

References checkFileRead(), getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, and FRDFileHeaderIdentifier::version_.

Referenced by bumpFile().

1140  {
1141  int infile;
1142  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1143  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1144  << strerror(errno);
1145  return false;
1146  }
1147  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1148  char hdr[sizeof(FRDFileHeader_v2)];
1149  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1150  return false;
1152  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1153 
1154  if (frd_version == 1) {
1155  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1156  return false;
1158  rawHeaderSize = fhContent->headerSize_;
1159  close(infile);
1160  return true;
1161  } else if (frd_version == 2) {
1162  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1163  return false;
1165  rawHeaderSize = fhContent->headerSize_;
1166  close(infile);
1167  return true;
1168  } else
1169  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1170 
1171  close(infile);
1172  rawHeaderSize = 0;
1173  return false;
1174  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1970 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1970  {
1971  std::ifstream ij(file);
1972  Json::Value deserializeRoot;
1974 
1975  if (!reader.parse(ij, deserializeRoot)) {
1976  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1977  return -1;
1978  }
1979 
1980  int ret = deserializeRoot.get("lastLS", "").asInt();
1981  return ret;
1982  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile()

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 548 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

Referenced by postEndRun().

548  {
549  int retval = remove(filename.c_str());
550  if (retval != 0)
551  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
552  << ". error = " << strerror(errno);
553  }
Log< level::Error, false > LogError

◆ runString()

std::string const& evf::EvFDaqDirector::runString ( ) const
inline

Definition at line 74 of file EvFDaqDirector.h.

References run_string_.

Referenced by DAQSource::DAQSource().

74 { return run_string_; }
std::string run_string_

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 181 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

182  {
183  fileDeleteLockPtr_ = fileDeleteLock;
184  filesToDeletePtr_ = filesToDelete;
185  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

References fms_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

121 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 930 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

930  {
931  if (fu_rw_lock_stream == nullptr)
932  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
933  else {
934  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
935  unsigned int readLs = 1, readIndex = 0;
936  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
937  }
938  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 968 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

968  {
969  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
970  flock(fulocal_rwlock_fd_, LOCK_UN);
971  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 978 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

978  {
979  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
980  flock(fulocal_rwlock_fd2_, LOCK_UN);
981  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 961 of file EvFDaqDirector.cc.

References init_lock_.

961 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 555 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

560  {
561  EvFDaqDirector::FileStatus fileStatus = noFile;
562  rawHeaderSize = 0;
563 
564  int retval = -1;
565  int lock_attempts = 0;
566  long total_lock_attempts = 0;
567 
568  struct stat buf;
569  int stopFileLS = -1;
570  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
571  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
572  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
573  if (stopFileCheck == 0)
574  stopFileLS = readLastLSEntry(stopFilePath_);
575  else
576  stopFileLS = 1; //stop without drain if only pid is stopped
577  if (!stop_ls_override_) {
578  //if lumisection is higher than in stop file, should quit at next from current
579  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
580  stopFileLS = stop_ls_override_ = ls;
581  } else
582  stopFileLS = stop_ls_override_;
583  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
584  << stopFileLS;
585  //return runEnded;
586  } else //if file was removed before reaching stop condition, reset this
587  stop_ls_override_ = 0;
588 
589  timeval ts_lockbegin;
590  gettimeofday(&ts_lockbegin, nullptr);
591 
592  while (retval == -1) {
593  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
594  if (retval == -1)
595  usleep(fuLockPollInterval_);
596  else
597  continue;
598 
599  lock_attempts += fuLockPollInterval_;
600  total_lock_attempts += fuLockPollInterval_;
601  if (lock_attempts > 5000000 || errno == 116) {
602  if (errno == 116)
603  edm::LogWarning("EvFDaqDirector")
604  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
605  else
606  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
607  "fu.lock file are present -: errno "
608  << errno << ":" << strerror(errno) << std::endl;
609 
610  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
611  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
612  ls++;
613  return noFile;
614  }
615 
616  if (stat(bu_run_dir_.c_str(), &buf) != 0)
617  return runEnded;
618  if (stat(fulockfile_.c_str(), &buf) != 0)
619  return runEnded;
620 
621  lock_attempts = 0;
622  }
623  if (total_lock_attempts > 5 * 60000000) {
624  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
625  return runAbort;
626  }
627  }
628 
629  timeval ts_lockend;
630  gettimeofday(&ts_lockend, nullptr);
631  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
632  if (deltat > 0.)
633  lockWaitTime = deltat;
634 
635  if (retval != 0)
636  return fileStatus;
637 
638 #ifdef DEBUG
639  timeval ts_lockend;
640  gettimeofday(&ts_lockend, 0);
641 #endif
642 
643  //open another lock file FD after the lock using main fd has been acquired
644  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
645  if (fu_readwritelock_fd2 == -1)
646  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
647  << " create. error:" << strerror(errno);
648 
649  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
650 
651  // if the stream is readable
652  if (fu_rw_lock_stream2 != nullptr) {
653  unsigned int readLs, readIndex;
654  int check = 0;
655  // rewind the stream
656  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
657  // if rewinded ok
658  if (check == 0) {
659  // read its' values
660  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
661  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
662 
663  unsigned int currentLs = readLs;
664  bool bumpedOk = false;
665  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
666  //no lock file write in this case
667  if (ls && ls + 1 < currentLs)
668  ls++;
669  else {
670  // try to bump (look for new index or EoLS file)
671  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
672  //avoid 2 lumisections jump
673  if (ls && readLs > currentLs && currentLs > ls) {
674  ls++;
675  readLs = currentLs = ls;
676  readIndex = 0;
677  bumpedOk = false;
678  //no write to lock file
679  } else {
680  if (ls == 0 && readLs > currentLs) {
681  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
682  //in this case there is no new file in the same LS
683  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
684  readLs = currentLs;
685  readIndex = 0;
686  bumpedOk = false;
687  //no write to lock file
688  }
689  //update return LS value
690  ls = readLs;
691  }
692  }
693  if (bumpedOk) {
694  // there is a new index file to grab, lock file needs to be updated
695  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
696  if (check == 0) {
697  ftruncate(fu_readwritelock_fd2, 0);
698  // write next index in the file, which is the file the next process should take
699  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
700  fflush(fu_rw_lock_stream2);
701  fsync(fu_readwritelock_fd2);
702  fileStatus = newFile;
703  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
704  } else {
705  edm::LogError("EvFDaqDirector")
706  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
707  setExceptionState = true;
708  return noFile;
709  }
710  } else if (currentLs < readLs) {
711  //there is no new file in next LS (yet), but lock file can be updated to the next LS
712  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
713  if (check == 0) {
714  ftruncate(fu_readwritelock_fd2, 0);
715  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
716  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
717  fflush(fu_rw_lock_stream2);
718  fsync(fu_readwritelock_fd2);
719  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
720  } else {
721  edm::LogError("EvFDaqDirector")
722  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
723  setExceptionState = true;
724  return noFile;
725  }
726  }
727  } else {
728  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
729  << strerror(errno);
730  }
731  } else {
732  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
733  }
734  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
735 
736 #ifdef DEBUG
737  timeval ts_preunlock;
738  gettimeofday(&ts_preunlock, 0);
739  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
740  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
741 #endif
742 
743  //if new json is present, lock file which FedRawDataInputSource will later unlock
744  if (fileStatus == newFile)
745  lockFULocal();
746 
747  //release lock at this point
748  int retvalu = -1;
749  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
750  if (retvalu == -1)
751  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
752 
753 #ifdef DEBUG
754  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
755 #endif
756 
757  if (fileStatus == noFile) {
758  struct stat buf;
759  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
760  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
761  fileStatus = runEnded;
762  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
763  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
764  fileStatus = runEnded;
765  }
766  }
767  return fileStatus;
768  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 78 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

78 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dirs_all_

std::vector<std::string> evf::EvFDaqDirector::bu_base_dirs_all_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getBUBaseDirs(), and initRun().

◆ bu_base_dirs_nSources_

std::vector<int> evf::EvFDaqDirector::bu_base_dirs_nSources_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getBUBaseDirsNSources().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 259 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 261 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 249 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 242 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 252 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 258 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 260 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 251 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 295 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 291 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 244 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 263 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 250 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 294 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
staticprivate

Definition at line 283 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 272 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 256 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 290 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 276 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 289 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), initRun(), and runString().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 292 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 278 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), initRun(), and useFileBroker().