CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::vector< std::string > const & getBUBaseDirs () const
 
std::vector< int > const & getBUBaseDirsNSources () const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFileName () const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &) const
 
std::string getStreamMergeType (std::string const &, MergeType defaultType) const
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
std::string const & runString () const
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Static Private Member Functions

static bool checkFileRead (char *buf, int infile, std::size_t buf_sz, std::string const &path)
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
std::vector< std::string > bu_base_dirs_all_
 
std::vector< int > bu_base_dirs_nSources_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutexfileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringServicefms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 61 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet pset,
edm::ActivityRegistry reg 
)
explicit

Definition at line 41 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dirs_all_, bu_base_dirs_nSources_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, Exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, mps_fire::i, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

42  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
43  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
44  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
45  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
46  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
47  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
48  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
49  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
50  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
51  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
52  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
53  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
54  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (const std::exception&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (const std::exception&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = std::stoul(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (const std::exception&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file service parameter if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  } catch (const std::exception&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147 
148  // set number of streams in each BU's ramdisk
149  if (bu_base_dirs_nSources_.empty()) {
150  // default is 1 stream per ramdisk
151  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
152  bu_base_dirs_nSources_.push_back(1);
153  }
154  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
155  throw cms::Exception("DaqDirector")
156  << " Error while setting number of sources: size mismatch with BU base directory vector";
157  } else {
158  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
160  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
161  << " for ramdisk " << bu_base_dirs_all_[i];
162  }
163  }
164 
165  std::stringstream ss;
166  ss << "run" << std::setfill('0') << std::setw(6) << run_;
167  run_string_ = ss.str();
168  ss = std::stringstream();
169  ss << run_;
170  run_nstring_ = ss.str();
171  run_dir_ = base_dir_ + "/" + run_string_;
172  input_throttled_file_ = run_dir_ + "/input_throttle";
173  discard_ls_filestem_ = run_dir_ + "/discard_ls";
174  ss = std::stringstream();
175  ss << getpid();
176  pid_ = ss.str();
177  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::vector< int > bu_base_dirs_nSources_
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 354 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

354  {
355  //close server connection
356  if (socket_.get() && socket_->is_open()) {
357  boost::system::error_code ec;
358  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
359  socket_->close(ec);
360  }
361 
362  if (fulocal_rwlock_fd_ != -1) {
363  unlockFULocal();
364  close(fulocal_rwlock_fd_);
365  }
366 
367  if (fulocal_rwlock_fd2_ != -1) {
368  unlockFULocal2();
369  close(fulocal_rwlock_fd2_);
370  }
371  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 75 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

75 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

References bu_run_dir_.

Referenced by DAQSource::readSupervisor().

76 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References bu_run_open_dir_.

77 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 832 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

838  {
839  if (previousFileSize_ != 0) {
840  if (!fms_) {
842  }
843  if (fms_)
845  previousFileSize_ = 0;
846  }
847  nextFile = "";
848 
849  //reached limit
850  if (maxLS >= 0 && ls > (unsigned int)maxLS)
851  return false;
852 
853  struct stat buf;
854  std::stringstream ss;
855 
856  // 1. Check suggested file
857  std::string nextFileJson = getInputJsonFilePath(ls, index);
858  if (stat(nextFileJson.c_str(), &buf) == 0) {
859  fsize = previousFileSize_ = buf.st_size;
860  nextFile = nextFileJson;
861  return true;
862  }
863  // 2. No file -> lumi ended? (and how many?)
864  else {
865  // 3. No file -> check for standalone raw file
866  std::string nextFileRaw = getRawFilePath(ls, index);
867  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
868  fsize = previousFileSize_ = buf.st_size;
869  nextFile = nextFileRaw;
870  return true;
871  }
872 
873  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
874 
875  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
876  // recheck that no raw file appeared in the meantime
877  if (stat(nextFileJson.c_str(), &buf) == 0) {
878  fsize = previousFileSize_ = buf.st_size;
879  nextFile = nextFileJson;
880  return true;
881  }
882  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
883  fsize = previousFileSize_ = buf.st_size;
884  nextFile = nextFileRaw;
885  return true;
886  }
887 
888  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
889  if (indexFilesInLS < 0)
890  //parsing failed
891  return false;
892  else {
893  //check index
894  if ((int)index < indexFilesInLS) {
895  //we have 2 files, and check for 1 failed... retry (2 will never be here)
896  edm::LogError("EvFDaqDirector")
897  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
898  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
899  setExceptionState = true;
900  return false;
901  }
902  }
903  // this lumi ended, check for files
904  ++ls;
905  index = 0;
906 
907  //reached limit
908  if (maxLS >= 0 && ls > (unsigned int)maxLS)
909  return false;
910 
911  nextFileJson = getInputJsonFilePath(ls, 0);
912  nextFileRaw = getRawFilePath(ls, 0);
913  if (stat(nextFileJson.c_str(), &buf) == 0) {
914  // a new file was found at new lumisection, index 0
915  fsize = previousFileSize_ = buf.st_size;
916  nextFile = nextFileJson;
917  return true;
918  }
919  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
920  fsize = previousFileSize_ = buf.st_size;
921  nextFile = nextFileRaw;
922  return true;
923  }
924  return false;
925  }
926  }
927  // no new file found
928  return false;
929  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkFileRead()

bool evf::EvFDaqDirector::checkFileRead ( char *  buf,
int  infile,
std::size_t  buf_sz,
std::string const &  path 
)
staticprivate

Definition at line 1124 of file EvFDaqDirector.cc.

References visDQMUpload::buf, timingPdfMaker::infile, castor_dqm_sourceclient_file_cfg::path, and fileinputsource_cfi::read.

Referenced by parseFRDFileHeader(), and rawFileHasHeader().

1124  {
1125  ssize_t sz_read = ::read(infile, buf, buf_sz);
1126  if (sz_read < 0) {
1127  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1128  if (infile != -1)
1129  close(infile);
1130  return false;
1131  }
1132  if ((size_t)sz_read < buf_sz) {
1133  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1134  if (infile != -1)
1135  close(infile);
1136  return false;
1137  }
1138  return true;
1139  }
Log< level::Error, false > LogError

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1544 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, mps_fire::dest, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, fileBrokerPort_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1551  {
1552  EvFDaqDirector::FileStatus fileStatus = noFile;
1553  serverError = false;
1554  std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1555 
1556  boost::system::error_code ec;
1557  try {
1558  while (true) {
1559  //socket connect
1560  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1562 
1563  if (ec) {
1564  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1565  serverError = true;
1566  break;
1567  }
1568  }
1569 
1570  boost::asio::streambuf request;
1571  std::ostream request_stream(&request);
1572  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1573  if (maxLS >= 0) {
1574  std::stringstream spath;
1575  spath << path << "&stopls=" << maxLS;
1576  path = spath.str();
1577  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1578  }
1579  request_stream << "GET " << path << " HTTP/1.1\r\n";
1580  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1581  request_stream << "Accept: */*\r\n";
1582  request_stream << "Connection: keep-alive\r\n\r\n";
1583 
1584  boost::asio::write(*socket_, request, ec);
1585  if (ec) {
1586  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1587  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1588  //we got disconnected, try to reconnect to the server before writing the request
1590  if (ec) {
1591  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1592  serverError = true;
1593  break;
1594  }
1595  continue;
1596  }
1597  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1598  serverError = true;
1599  break;
1600  }
1601 
1602  boost::asio::streambuf response;
1603  boost::asio::read_until(*socket_, response, "\r\n", ec);
1604  if (ec) {
1605  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1606  serverError = true;
1607  break;
1608  }
1609 
1610  std::istream response_stream(&response);
1611 
1612  std::string http_version;
1613  response_stream >> http_version;
1614 
1615  response_stream >> serverHttpStatus;
1616 
1617  std::string status_message;
1618  std::getline(response_stream, status_message);
1619  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1620  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1621  serverError = true;
1622  break;
1623  }
1624  if (serverHttpStatus != 200) {
1625  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1626  serverError = true;
1627  break;
1628  }
1629 
1630  // Process the response headers.
1632  while (std::getline(response_stream, header) && header != "\r") {
1633  }
1634 
1635  std::string fileInfo;
1636  std::map<std::string, std::string> serverMap;
1637  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1638  auto pos = fileInfo.find('=');
1639  if (pos == std::string::npos)
1640  continue;
1641  auto stitle = fileInfo.substr(0, pos);
1642  auto svalue = fileInfo.substr(pos + 1);
1643  serverMap[stitle] = svalue;
1644  }
1645 
1646  //check that response run number if correct
1647  auto server_version = serverMap.find("version");
1648  assert(server_version != serverMap.end());
1649 
1650  auto server_run = serverMap.find("runnumber");
1651  assert(server_run != serverMap.end());
1652  assert(run_nstring_ == server_run->second);
1653 
1654  auto server_state = serverMap.find("state");
1655  assert(server_state != serverMap.end());
1656 
1657  auto server_eols = serverMap.find("lasteols");
1658  assert(server_eols != serverMap.end());
1659 
1660  auto server_ls = serverMap.find("lumisection");
1661 
1662  int version_maj = 1;
1663  int version_min = 0;
1664  int version_rev = 0;
1665  {
1666  auto* s_ptr = server_version->second.c_str();
1667  if (!server_version->second.empty() && server_version->second[0] == '"')
1668  s_ptr++;
1669  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1670  if (res < 3) {
1671  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1672  if (res < 2) {
1673  res = sscanf(s_ptr, "%d", &version_maj);
1674  if (res < 1) {
1675  //expecting at least 1 number (major version)
1676  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1677  }
1678  }
1679  }
1680  }
1681 
1682  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1683  if (server_ls != serverMap.end())
1684  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1685  else
1686  serverLS = closedServerLS + 1;
1687 
1688  std::string s_state = server_state->second;
1689  if (s_state == "STARTING") //initial, always empty starting with LS 1
1690  {
1691  auto server_file = serverMap.find("file");
1692  assert(server_file == serverMap.end()); //no file with starting state
1693  fileStatus = noFile;
1694  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1695  } else if (s_state == "READY") {
1696  auto server_file = serverMap.find("file");
1697  if (server_file == serverMap.end()) {
1698  //can be returned by server if files from new LS already appeared but LS is not yet closed
1699  if (serverLS <= closedServerLS)
1700  serverLS = closedServerLS + 1;
1701  fileStatus = noFile;
1702  edm::LogInfo("EvFDaqDirector")
1703  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1704  } else {
1705  std::string filestem;
1706  std::string fileprefix;
1707  auto server_fileprefix = serverMap.find("fileprefix");
1708 
1709  if (server_fileprefix != serverMap.end()) {
1710  auto pssize = server_fileprefix->second.size();
1711  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1712  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1713  else
1714  fileprefix = server_fileprefix->second;
1715  }
1716 
1717  //remove string literals
1718  auto ssize = server_file->second.size();
1719  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1720  filestem = server_file->second.substr(1, ssize - 2);
1721  else
1722  filestem = server_file->second;
1723  assert(!filestem.empty());
1724  if (version_maj > 1) {
1725  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1726  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1727  nextFileJson = "";
1728  rawHeader = true;
1729  } else {
1730  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1731  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732  nextFileJson = filestem + ".jsn";
1733  rawHeader = false;
1734  }
1735  fileStatus = newFile;
1736  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1737  << serverLS << " file:" << filestem;
1738  }
1739  } else if (s_state == "EOLS") {
1740  serverLS = closedServerLS + 1;
1741  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1742  fileStatus = noFile;
1743  } else if (s_state == "EOR") {
1744  //server_eor = serverMap.find("iseor");
1745  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1746  fileStatus = runEnded;
1747  } else if (s_state == "NORUN") {
1748  auto err_msg = serverMap.find("errormessage");
1749  if (err_msg != serverMap.end())
1750  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1751  else
1752  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1753  edm::LogWarning("EvFDaqDirector") << "executing run end";
1754  fileStatus = runEnded;
1755  } else if (s_state == "ERROR") {
1756  auto err_msg = serverMap.find("errormessage");
1757  if (err_msg != serverMap.end())
1758  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1759  else
1760  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1761  fileStatus = noFile;
1762  serverError = true;
1763  } else {
1764  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1765  fileStatus = noFile;
1766  serverError = true;
1767  }
1768 
1769  // Read until EOF, writing data to output as we go.
1770  if (!fileBrokerKeepAlive_) {
1771  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1772  }
1773  if (ec != boost::asio::error::eof) {
1774  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1775  serverError = true;
1776  }
1777  }
1778 
1779  break;
1780  }
1781 
1782  } catch (std::exception const& e) {
1783  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1784  serverError = true;
1785  }
1786 
1787  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1788  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1789  if (ec) {
1790  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1791  }
1792  socket_->close(ec);
1793  if (ec) {
1794  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1795  }
1796  }
1797 
1798  if (serverError) {
1799  if (socket_->is_open())
1800  socket_->close(ec);
1801  if (ec) {
1802  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1803  }
1804  fileStatus = noFile;
1805  sleep(1); //back-off if error detected
1806  }
1807 
1808  return fileStatus;
1809  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
std::string fileBrokerPort_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 984 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

984  {
985  //used for backpressure mechanisms and monitoring
986  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
987  struct stat buf;
988  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
989  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
990  close(bol_fd);
991  }
992  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 994 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

997  {
998  if (currentLumiSection > 0) {
999  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1000  struct stat buf;
1001  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1002  if (!found) {
1003  if (doCreateEoLS) {
1004  int eol_fd =
1005  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1006  close(eol_fd);
1007  }
1008  if (doCreateBoLS)
1009  createBoLSFile(lumiSection, false);
1010  }
1011  } else if (doCreateBoLS) {
1012  createBoLSFile(lumiSection, true); //needed for initial lumisection
1013  }
1014  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2000 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

2000  {
2001  std::string proc_flag = run_dir_ + "/processing";
2002  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2003  close(proc_flag_fd);
2004  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1961 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

1961  {
1962  // create open dir if not already there
1963 
1965  if (!std::filesystem::is_directory(openPath)) {
1966  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1967  std::filesystem::create_directories(openPath);
1968  }
1969  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 381 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

381  {
383  desc.setComment(
384  "Service used for file locking arbitration and for propagating information between other EvF components");
385  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
386  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
387  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
388  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
389  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
390  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
391  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
392  desc.addUntracked<bool>("useFileBroker", false)
393  ->setComment("Use BU file service to grab input data instead of NFS file locking");
394  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
395  ->setComment("Allow service to discover BU address from hltd configuration");
396  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
397  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
398  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
399  ->setComment("Use keep alive to avoid using large number of sockets");
400  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
401  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
402  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
403  ->setComment("Lock polling interval in microseconds for the input directory file lock");
404  desc.addUntracked<bool>("outputAdler32Recheck", false)
405  ->setComment("Check Adler32 of per-process output files while micro-merging");
406  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
407  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
408  desc.addUntracked<std::string>("mergingPset", "")
409  ->setComment("Name of merging PSet to look for merging type definitions for streams");
410  descriptions.add("EvFDaqDirector", desc);
411  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 80 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

80 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 537 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

537  {
538  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
539  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getBUBaseDirs()

std::vector<std::string> const& evf::EvFDaqDirector::getBUBaseDirs ( ) const
inline

Definition at line 194 of file EvFDaqDirector.h.

References bu_base_dirs_all_.

Referenced by DAQSource::DAQSource().

194 { return bu_base_dirs_all_; }
std::vector< std::string > bu_base_dirs_all_

◆ getBUBaseDirsNSources()

std::vector<int> const& evf::EvFDaqDirector::getBUBaseDirsNSources ( ) const
inline

Definition at line 195 of file EvFDaqDirector.h.

References bu_base_dirs_nSources_.

Referenced by DAQSource::DAQSource().

195 { return bu_base_dirs_nSources_; }
std::vector< int > bu_base_dirs_nSources_

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 466 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

466  {
468  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 529 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

529  {
530  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
531  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 533 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

533  {
534  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
535  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFileName()

std::string evf::EvFDaqDirector::getEoRFileName ( ) const

Definition at line 543 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), and run_.

Referenced by DAQSource::readSupervisor().

543 { return fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 541 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

541 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 545 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

545 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 547 of file EvFDaqDirector.cc.

References bu_run_dir_.

547 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 494 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

494  {
496  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 498 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

498  {
500  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 450 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

450  {
452  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1985 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1985  {
1986  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1988  struct stat buf;
1989  unsigned int lscount = 1;
1990  do {
1991  std::stringstream ss;
1992  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1993  fullpath = ss.str();
1994  lscount++;
1995  } while (stat(fullpath.c_str(), &buf) == 0);
1996  return lscount - 1;
1997  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 486 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

486  {
488  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 482 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

482  {
484  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 512 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

513  {
515  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 525 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

525  {
527  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs,
bool  requireHeader = true 
)

Definition at line 1811 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1819  {
1820  EvFDaqDirector::FileStatus fileStatus = noFile;
1821 
1822  //int retval = -1;
1823  //int lock_attempts = 0;
1824  //long total_lock_attempts = 0;
1825 
1826  struct stat buf;
1827  int stopFileLS = -1;
1828  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1829  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1830  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1831  if (stopFileCheck == 0)
1832  stopFileLS = readLastLSEntry(stopFilePath_);
1833  else
1834  stopFileLS = 1; //stop without drain if only pid is stopped
1835  if (!stop_ls_override_) {
1836  //if lumisection is higher than in stop file, should quit at next from current
1837  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1838  stopFileLS = stop_ls_override_ = ls;
1839  } else
1840  stopFileLS = stop_ls_override_;
1841  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1842  << stopFileLS;
1843  //return runEnded;
1844  } else //if file was removed before reaching stop condition, reset this
1845  stop_ls_override_ = 0;
1846 
1847  /* look for EoLS
1848  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1849  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1850  ls++;
1851  return noFile;
1852  }
1853  */
1854 
1855  timeval ts_lockbegin;
1856  gettimeofday(&ts_lockbegin, nullptr);
1857 
1858  std::string nextFileJson;
1859  uint32_t serverLS, closedServerLS;
1860  unsigned int serverHttpStatus;
1861  bool serverError;
1862 
1863  //local lock to force index json and EoLS files to appear in order
1865  lockFULocal();
1866 
1867  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1868  bool rawHeader = false;
1869  fileStatus = contactFileBroker(
1870  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1871 
1872  if (serverError) {
1873  //do not update anything
1875  unlockFULocal();
1876  return noFile;
1877  }
1878 
1879  //handle creation of BoLS files if lumisection has changed
1880  if (currentLumiSection == 0) {
1881  if (fileStatus == runEnded)
1882  createLumiSectionFiles(closedServerLS, 0, true, false);
1883  else
1884  createLumiSectionFiles(serverLS, 0, true, false);
1885  } else {
1886  if (closedServerLS >= currentLumiSection) {
1887  //only BoLS files
1888  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1889  createLumiSectionFiles(i + 1, i, true, false);
1890  }
1891  }
1892 
1893  bool fileFound = true;
1894 
1895  if (fileStatus == newFile) {
1896  if (rawHeader > 0)
1897  serverEventsInNewFile = grabNextJsonFromRaw(
1898  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1899  else
1900  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1901  }
1902  //closing file in case of any error
1903  if (serverEventsInNewFile < 0 && rawFd != -1) {
1904  close(rawFd);
1905  rawFd = -1;
1906  }
1907 
1908  //can unlock because all files have been created locally
1910  unlockFULocal();
1911 
1912  if (!fileFound) {
1913  //catch condition where directory got deleted
1914  fileStatus = noFile;
1915  struct stat buf;
1916  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1917  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1918  fileStatus = runEnded;
1919  }
1920  }
1921 
1922  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1923  //so that EoLS files can not appear locally before index files
1924  if (currentLumiSection == 0) {
1925  lockFULocal2();
1926  if (fileStatus == runEnded) {
1927  createLumiSectionFiles(closedServerLS, 0, false, true);
1928  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1929  } else {
1930  createLumiSectionFiles(serverLS, 0, false, true);
1931  }
1932  unlockFULocal2();
1933  } else {
1934  if (closedServerLS >= currentLumiSection) {
1935  //lock exclusive to create EoLS files
1936  lockFULocal2();
1937  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1938  createLumiSectionFiles(i + 1, i, false, true);
1939  unlockFULocal2();
1940  }
1941  }
1942 
1943  if (fileStatus == runEnded)
1944  ls = std::max(currentLumiSection, serverLS);
1945  else if (fileStatus == newFile) {
1946  assert(serverLS >= ls);
1947  ls = serverLS;
1948  } else if (fileStatus == noFile) {
1949  if (serverLS >= ls)
1950  ls = serverLS;
1951  else {
1952  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1953  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1954  sleep(1);
1955  }
1956  }
1957 
1958  return fileStatus;
1959  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 771 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

771  {
772  std::ifstream ij(BUEoLSFile);
773  Json::Value deserializeRoot;
775 
776  if (!reader.parse(ij, deserializeRoot)) {
777  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
778  return -1;
779  }
780 
782  DataPoint dp;
783  dp.deserialize(deserializeRoot);
784 
785  //read definition
786  if (readEolsDefinition_) {
787  //std::string def = boost::algorithm::trim(dp.getDefinition());
788  std::string def = dp.getDefinition();
789  if (def.empty())
790  readEolsDefinition_ = false;
791  while (!def.empty()) {
793  if (def.find('/') == 0)
794  fullpath = def;
795  else
796  fullpath = bu_run_dir_ + '/' + def;
797  struct stat buf;
798  if (stat(fullpath.c_str(), &buf) == 0) {
799  DataPointDefinition eolsDpd;
800  std::string defLabel = "legend";
801  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
802  if (eolsDpd.getNames().empty()) {
803  //try with "data" label if "legend" format is not used
804  eolsDpd = DataPointDefinition();
805  defLabel = "data";
806  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
807  }
808  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
809  if (eolsDpd.getNames().at(i) == "NFiles")
811  readEolsDefinition_ = false;
812  break;
813  }
814  //check if we can still find definition
815  if (def.size() <= 1 || def.find('/') == std::string::npos) {
816  readEolsDefinition_ = false;
817  break;
818  }
819  def = def.substr(def.find('/') + 1);
820  }
821  }
822 
823  if (dp.getData().size() > eolsNFilesIndex_)
824  data = dp.getData()[eolsNFilesIndex_];
825  else {
826  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
827  return -1;
828  }
829  return std::stoi(data);
830  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Unserialize a JSON document into a Value.
Definition: reader.h:17
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Represents a JSON value.
Definition: value.h:101

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 470 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

470  {
472  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 490 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

490  {
491  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
492  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 462 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

462  {
463  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
464  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 474 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

474  {
476  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 502 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

503  {
505  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 458 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

458  {
459  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
460  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 517 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

517  {
519  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 478 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

478  {
480  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 507 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

508  {
510  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 454 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

454  {
456  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 521 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

521  {
523  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

References run_.

118 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

107 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 180 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

180 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  ) const
inline

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  ,
MergeType  defaultType 
) const
inline

Definition at line 188 of file EvFDaqDirector.h.

References MergeTypeNames_.

188  {
189  return MergeTypeNames_[defaultType];
190  }
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1269 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, das-up-to-nevents::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1272  {
1273  fileFound = true;
1274 
1275  //should be ported to use fffnaming
1276  std::ostringstream fileNameWithPID;
1277  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1278  << std::setw(5) << pid_ << ".jsn";
1279 
1280  // assemble json destination path
1281  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1282 
1283  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1284 
1285  int infile = -1, outfile = -1;
1286 
1287  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1288  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1289  << strerror(errno);
1290  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1291  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1292  << jsonSourcePath << " : " << strerror(errno);
1293  if (errno == ENOENT)
1294  fileFound = false;
1295  return -1;
1296  }
1297  }
1298 
1299  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1300  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1301  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1302  if (errno == EEXIST) {
1303  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1304  << " : ";
1305  ::close(infile);
1306  return -1;
1307  }
1308  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1309  << strerror(errno);
1310  struct stat out_stat;
1311  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1312  edm::LogWarning("EvFDaqDirector")
1313  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1314  if (unlink(jsonDestPath.c_str()) == -1) {
1315  edm::LogWarning("EvFDaqDirector")
1316  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1317  }
1318  }
1319  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1320  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1321  << jsonDestPath << " : " << strerror(errno);
1322  ::close(infile);
1323  return -1;
1324  }
1325  }
1326  //copy contents
1327  const std::size_t buf_sz = 512;
1328  std::size_t tot_written = 0;
1329  std::unique_ptr<char[]> buf(new char[buf_sz]);
1330 
1331  ssize_t sz, sz_read = 1, sz_write;
1332  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1333  sz_write = 0;
1334  do {
1335  assert(sz_read - sz_write > 0);
1336  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1337  sz_read = sz; // cause read loop termination
1338  break;
1339  }
1340  assert(sz > 0);
1341  sz_write += sz;
1342  tot_written += sz;
1343  } while (sz_write < sz_read);
1344  }
1345  close(infile);
1346  close(outfile);
1347 
1348  if (tot_written > 0) {
1349  //leave file if it was empty for diagnosis
1350  if (unlink(jsonSourcePath.c_str()) == -1) {
1351  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1352  << strerror(errno);
1353  return -1;
1354  }
1355  } else {
1356  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1357  << jsonSourcePath;
1358  return -1;
1359  }
1360 
1361  Json::Value deserializeRoot;
1363 
1364  std::string data;
1365  std::stringstream ss;
1366  bool result;
1367  try {
1368  if (tot_written <= buf_sz) {
1369  result = reader.parse(buf.get(), deserializeRoot);
1370  } else {
1371  //json will normally not be bigger than buf_sz bytes
1372  try {
1373  std::ifstream ij(jsonDestPath);
1374  ss << ij.rdbuf();
1375  } catch (std::filesystem::filesystem_error const& ex) {
1376  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1377  return -1;
1378  }
1379  result = reader.parse(ss.str(), deserializeRoot);
1380  }
1381  if (!result) {
1382  if (tot_written <= buf_sz)
1383  ss << buf.get();
1384  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1385  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1386  << ss.str() << ".";
1387  return -1;
1388  }
1389 
1390  //read BU JSON
1391  DataPoint dp;
1392  dp.deserialize(deserializeRoot);
1393  bool success = false;
1394  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1395  if (dpd_->getNames().at(i) == "NEvents")
1396  if (i < dp.getData().size()) {
1397  data = dp.getData()[i];
1398  success = true;
1399  break;
1400  }
1401  }
1402  if (!success) {
1403  if (!dp.getData().empty())
1404  data = dp.getData()[0];
1405  else {
1406  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1407  << "grabNextJsonFile - "
1408  << " error reading number of events from BU JSON; No input value. data -: " << data;
1409  return -1;
1410  }
1411  }
1412 
1413  //try to read raw file size
1414  fileSizeFromJson = -1;
1415  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1416  if (dpd_->getNames().at(i) == "NBytes") {
1417  if (i < dp.getData().size()) {
1418  std::string dataSize = dp.getData()[i];
1419  try {
1420  fileSizeFromJson = std::stol(dataSize);
1421  } catch (const std::exception&) {
1422  //non-fatal currently, processing can continue without this value
1423  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1424  << "Input value is -: " << dataSize;
1425  }
1426  break;
1427  }
1428  }
1429  }
1430  return std::stoi(data);
1431  } catch (const std::out_of_range& e) {
1432  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1433  << "Input value is -: " << data;
1434  } catch (const std::invalid_argument& e) {
1435  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1436  << "Input value is -: " << data;
1437  } catch (std::runtime_error const& e) {
1438  //Can be thrown by Json parser
1439  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1440  }
1441 
1442  catch (std::exception const& e) {
1443  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1444  } catch (...) {
1445  //unknown exception
1446  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1447  }
1448 
1449  return -1;
1450  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Unserialize a JSON document into a Value.
Definition: reader.h:17
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Log< level::Warning, false > LogWarning
Represents a JSON value.
Definition: value.h:101
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1452 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

1452  {
1453  std::string data;
1454  try {
1455  // assemble json destination path
1456  std::filesystem::path jsonDestPath(baseRunDir());
1457 
1458  //should be ported to use fffnaming
1459  std::ostringstream fileNameWithPID;
1460  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1461  << ".jsn";
1462  jsonDestPath /= fileNameWithPID.str();
1463 
1464  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1465  try {
1466  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1467  } catch (std::filesystem::filesystem_error const& ex) {
1468  // Input dir gone?
1469  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1470  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1471  sleep(1);
1472  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473  }
1474  unlockFULocal();
1475 
1476  try {
1477  //sometimes this fails but file gets deleted
1478  std::filesystem::remove(jsonSourcePath);
1479  } catch (std::filesystem::filesystem_error const& ex) {
1480  // Input dir gone?
1481  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1482  } catch (std::exception const& ex) {
1483  // Input dir gone?
1484  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1485  }
1486 
1487  std::ifstream ij(jsonDestPath);
1488  Json::Value deserializeRoot;
1490 
1491  std::stringstream ss;
1492  ss << ij.rdbuf();
1493  if (!reader.parse(ss.str(), deserializeRoot)) {
1494  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1495  << "\nERROR:\n"
1496  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1497  << ss.str() << ".";
1498  throw std::runtime_error("Cannot deserialize input JSON file");
1499  }
1500 
1501  //read BU JSON
1502  std::string data;
1503  DataPoint dp;
1504  dp.deserialize(deserializeRoot);
1505  bool success = false;
1506  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1507  if (dpd_->getNames().at(i) == "NEvents")
1508  if (i < dp.getData().size()) {
1509  data = dp.getData()[i];
1510  success = true;
1511  }
1512  }
1513  if (!success) {
1514  if (!dp.getData().empty())
1515  data = dp.getData()[0];
1516  else
1517  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1518  << " error reading number of events from BU JSON -: No input value " << data;
1519  }
1520  return std::stoi(data);
1521  } catch (std::filesystem::filesystem_error const& ex) {
1522  // Input dir gone?
1523  unlockFULocal();
1524  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1525  } catch (std::runtime_error const& e) {
1526  // Another process grabbed the file and NFS did not register this
1527  unlockFULocal();
1528  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1529  } catch (const std::out_of_range&) {
1530  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1531  << "Input value is -: " << data;
1532  } catch (const std::invalid_argument&) {
1533  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1534  << "Input value is -: " << data;
1535  } catch (std::exception const& e) {
1536  // BU run directory disappeared?
1537  unlockFULocal();
1538  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1539  }
1540 
1541  return -1;
1542  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Unserialize a JSON document into a Value.
Definition: reader.h:17
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:233
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Represents a JSON value.
Definition: value.h:101
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile,
bool  requireHeader = true 
)

Definition at line 1177 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, das-up-to-nevents::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

1184  {
1185  fileFound = true;
1186 
1187  //take only first three tokens delimited by "_" in the renamed raw file name
1188  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1189  size_t pos = 0, n_tokens = 0;
1190  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1191  }
1192  std::string reducedJsonStem = jsonStem.substr(0, pos);
1193 
1194  std::ostringstream fileNameWithPID;
1195  //should be ported to use fffnaming
1196  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1197 
1198  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1199 
1200  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1201 
1202  //parse RAW file header if it exists
1203  uint32_t lsFromRaw;
1204  int32_t nbEventsWrittenRaw;
1205  int64_t fileSizeFromRaw;
1206  uint16_t rawDataType;
1207  auto ret = parseFRDFileHeader(rawSourcePath,
1208  rawFd,
1209  rawHeaderSize,
1210  rawDataType,
1211  lsFromRaw,
1212  nbEventsWrittenRaw,
1213  fileSizeFromRaw,
1214  requireHeader,
1215  true,
1216  closeFile);
1217  if (ret != 0) {
1218  if (ret == 1)
1219  fileFound = false;
1220  return -1;
1221  }
1222 
1223  int outfile;
1224  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1225  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1226  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1227  if (errno == EEXIST) {
1228  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1229  << " : ";
1230  return -1;
1231  }
1232  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1233  << strerror(errno);
1234  struct stat out_stat;
1235  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1236  edm::LogWarning("EvFDaqDirector")
1237  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1238  << jsonDestPath;
1239  if (unlink(jsonDestPath.c_str()) == -1) {
1240  edm::LogWarning("EvFDaqDirector")
1241  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1242  }
1243  }
1244  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1245  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1246  << jsonDestPath << " : " << strerror(errno);
1247  return -1;
1248  }
1249  }
1250  //write JSON file (TODO: use jsoncpp)
1251  std::stringstream ss;
1252  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1253  std::string sstr = ss.str();
1254 
1255  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1256  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1257  << " : " << strerror(errno);
1258  return -1;
1259  }
1260  close(outfile);
1261  if (serverLS && serverLS != lsFromRaw)
1262  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1263  << " and raw file header LS " << lsFromRaw;
1264 
1265  fileSizeFromHeader = fileSizeFromRaw;
1266  return nbEventsWrittenRaw;
1267  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 179 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_base_dirs_all_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, mps_fire::i, init_lock_, svgfig::load(), eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm::shutdown_flag, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, tryInitializeFuLockFile(), and useFileBroker_.

Referenced by preallocate().

179  {
180  // check if base dir exists or create it accordingly
181  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
182  if (retval != 0 && errno != EEXIST) {
183  throw cms::Exception("DaqDirector")
184  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
185  }
186 
187  //create run dir in base dir
188  umask(0);
189  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
190  if (retval != 0 && errno != EEXIST) {
191  throw cms::Exception("DaqDirector")
192  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
193  }
194 
195  //create fu-local.lock in run open dir
196  if (!directorBU_) {
198  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
200  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
201  if (fulocal_rwlock_fd_ == -1)
202  throw cms::Exception("DaqDirector")
203  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
204  chmod(fulocal_lock_.c_str(), 0777);
205  fsync(fulocal_rwlock_fd_);
206  //open second fd for another input source thread
208  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
209  if (fulocal_rwlock_fd2_ == -1)
210  throw cms::Exception("DaqDirector")
211  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
212  }
213 
214  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
215  //for BU, it is created at this point
216  if (directorBU_) {
218  std::string bulockfile = bu_run_dir_ + "/bu.lock";
219  fulockfile_ = bu_run_dir_ + "/fu.lock";
220 
221  //make or find bu run dir
222  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
223  if (retval != 0 && errno != EEXIST) {
224  throw cms::Exception("DaqDirector")
225  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
226  }
227  bu_run_open_dir_ = bu_run_dir_ + "/open";
228  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
229  if (retval != 0 && errno != EEXIST) {
230  throw cms::Exception("DaqDirector")
231  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
232  }
233 
234  // the BU director does not need to know about the fu lock
235  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
236  if (bu_writelock_fd_ == -1)
237  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
238  else
239  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
240  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
241  if (bu_w_lock_stream == nullptr)
242  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
243 
244  // BU INITIALIZES LOCK FILE
245  // FU LOCK FILE OPEN
246  openFULockfileStream(true);
248  fflush(fu_rw_lock_stream);
249  close(fu_readwritelock_fd_);
250 
251  if (!hltSourceDirectory_.empty()) {
252  struct stat buf;
253  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
254  std::string hltdir = bu_run_dir_ + "/hlt";
255  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
256  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
257  if (retval != 0 && errno != EEXIST)
258  throw cms::Exception("DaqDirector")
259  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
260 
261  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
262  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
263 
264  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
265  for (auto& optfile : optfiles) {
266  try {
267  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
268  } catch (...) {
269  }
270  }
271 
272  std::filesystem::rename(tmphltdir, hltdir);
273  } else
274  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
275  }
276  //else{}//no configuration specified
277  } else {
278  // for FU, check if bu base dir exists
279 
280  auto checkExists = [=](std::string const& bu_base_dir) -> void {
281  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
282  if (retval != 0 && errno != EEXIST) {
283  throw cms::Exception("DaqDirector")
284  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
285  }
286  };
287 
288  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
289  int cnt = 0;
290  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
291  //stat should trigger autofs mount (mkdir could fail with access denied first time)
292  struct stat statbuf;
293  stat(bu_base_dir.c_str(), &statbuf);
294  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
295  if (retval != 0 && errno != EEXIST) {
296  usleep(500000);
297  cnt++;
298  if (cnt % 20 == 0)
299  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
300  if (cnt > 120)
301  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
302  << " mkdir error:" << strerror(errno);
303  continue;
304  }
305  break;
306  }
307  };
308 
309  if (!bu_base_dirs_all_.empty()) {
310  std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
311  checkExists(check_dir);
312  bu_run_dir_ = check_dir + "/" + run_string_;
313  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
314  waitForDir(bu_base_dirs_all_[i]);
315  } else {
316  checkExists(bu_base_dir_);
318  }
319 
320  fulockfile_ = bu_run_dir_ + "/fu.lock";
321  if (!useFileBroker_)
322  openFULockfileStream(false);
323  }
324 
325  pthread_mutex_init(&init_lock_, nullptr);
326 
327  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
328  std::stringstream sstp;
329  sstp << stopFilePath_ << "_pid" << pid_;
330  stopFilePathPid_ = sstp.str();
331 
332  if (!directorBU_) {
333  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
334  struct stat statbuf;
335  if (!stat(defPath.c_str(), &statbuf))
336  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
337  else {
338  //look in source directory if not present in ramdisk
339  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
340  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
341  if (stat(defPath.c_str(), &statbuf)) {
342  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
343  if (stat(defPath.c_str(), &statbuf)) {
344  defPath = defPathSuffix;
345  }
346  }
347  }
348  dpd_ = new DataPointDefinition();
349  std::string defLabel = "data";
350  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
351  }
352  }
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
def load(fileName)
Definition: svgfig.py:547
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by DAQSource::getNextDataBlock(), and FedRawDataInputSource::getNextEvent().

122 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 964 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

964  {
965  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
966  flock(fulocal_rwlock_fd_, LOCK_SH);
967  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 974 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

974  {
975  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
976  flock(fulocal_rwlock_fd2_, LOCK_EX);
977  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 960 of file EvFDaqDirector.cc.

References init_lock_.

960 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2019 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and to_string().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

2019  {
2020  struct stat buf;
2021  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2022  }
static std::string to_string(const XMLCh *ch)
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2006 of file EvFDaqDirector.cc.

References command_line::start.

2006  {
2007 #ifdef __APPLE__
2008  return {start, len, pid, type, whence};
2009 #else
2010  return {type, whence, start, len, pid};
2011 #endif
2012  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 123 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

123 { return nConcurrentLumis_; }
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 941 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

941  {
942  if (create) {
944  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
945  chmod(fulockfile_.c_str(), 0766);
946  } else {
947  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
948  }
949  if (fu_readwritelock_fd_ == -1)
950  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
951  << " create:" << create << " error:" << strerror(errno);
952  else
953  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
954 
955  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
956  if (fu_rw_lock_stream == nullptr)
957  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
958  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 108 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

108 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 73 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

Referenced by DAQSource::DAQSource().

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint16_t &  rawDataType,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 1016 of file EvFDaqDirector.cc.

References checkFileRead(), edm::streamer::FRDFileHeaderContent_v2::dataType_, edm::streamer::FRDFileHeaderContent_v1::eventCount_, edm::streamer::FRDFileHeaderContent_v2::eventCount_, edm::streamer::FRDFileHeaderContent_v1::fileSize_, edm::streamer::FRDFileHeaderContent_v2::fileSize_, edm::streamer::getFRDFileHeaderVersion(), edm::streamer::FRDFileHeaderContent_v1::headerSize_, edm::streamer::FRDFileHeaderContent_v2::headerSize_, edm::streamer::FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, edm::streamer::FRDFileHeaderContent_v1::lumiSection_, edm::streamer::FRDFileHeaderContent_v2::lumiSection_, and edm::streamer::FRDFileHeaderIdentifier::version_.

Referenced by grabNextJsonFromRaw(), FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1025  {
1026  int infile;
1027 
1028  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1029  if (retry) {
1030  edm::LogWarning("EvFDaqDirector")
1031  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1032  return parseFRDFileHeader(rawSourcePath,
1033  rawFd,
1034  rawHeaderSize,
1035  rawDataType,
1036  lsFromHeader,
1037  eventsFromHeader,
1038  fileSizeFromHeader,
1039  requireHeader,
1040  false,
1041  closeFile);
1042  } else {
1043  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1044  edm::LogError("EvFDaqDirector")
1045  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1046  if (errno == ENOENT)
1047  return 1; // error && file not found
1048  else
1049  return -1;
1050  }
1051  }
1052  }
1053 
1054  //v2 is the largest possible read
1055  char hdr[sizeof(FRDFileHeader_v2)];
1056  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1057  return -1;
1058 
1060  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1061 
1062  if (frd_version == 0) {
1063  //no header (specific sequence not detected)
1064  if (requireHeader) {
1065  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1066  close(infile);
1067  return -1;
1068  } else {
1069  //no header, but valid file
1070  lseek(infile, 0, SEEK_SET);
1071  rawHeaderSize = 0;
1072  lsFromHeader = 0;
1073  eventsFromHeader = -1;
1074  fileSizeFromHeader = -1;
1075  }
1076  } else if (frd_version == 1) {
1077  //version 1 header
1078  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1079  return -1;
1081  uint32_t headerSizeRaw = fhContent->headerSize_;
1082  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1083  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084  << " v:" << frd_version;
1085  close(infile);
1086  return -1;
1087  }
1088  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1089  rawDataType = 0;
1090  lsFromHeader = fhContent->lumiSection_;
1091  eventsFromHeader = (int32_t)fhContent->eventCount_;
1092  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093  rawHeaderSize = fhContent->headerSize_;
1094 
1095  } else if (frd_version == 2) {
1096  //version 2 heade
1097  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1098  return -1;
1100  uint32_t headerSizeRaw = fhContent->headerSize_;
1101  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1102  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1103  << " v:" << frd_version;
1104  close(infile);
1105  return -1;
1106  }
1107  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1108  rawDataType = fhContent->dataType_;
1109  lsFromHeader = fhContent->lumiSection_;
1110  eventsFromHeader = (int32_t)fhContent->eventCount_;
1111  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1112  rawHeaderSize = fhContent->headerSize_;
1113  }
1114 
1115  if (closeFile) {
1116  close(infile);
1117  infile = -1;
1118  }
1119 
1120  rawFd = infile;
1121  return 0; //OK
1122  }
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:31
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:81
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 373 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

373  {
374  initRun();
375 
376  nThreads_ = bounds.maxNumberOfStreams();
377  nStreams_ = bounds.maxNumberOfThreads();
378  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
379  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 413 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

413  {
414  //assert(run_ == id.run());
415 
416  // check if the requested run is the latest one - issue a warning if it isn't
418  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
419  << ". This is not the highest run " << dirManager_.findHighestRunDir();
420  }
421  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 432 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

432  {
433  //delete all files belonging to just closed lumi
434  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
436  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
437  return;
438  }
439 
440  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
441  auto it = filesToDeletePtr_->begin();
442  while (it != filesToDeletePtr_->end()) {
443  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
444  it = filesToDeletePtr_->erase(it);
445  } else
446  it++;
447  }
448  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1141 of file EvFDaqDirector.cc.

References checkFileRead(), edm::streamer::getFRDFileHeaderVersion(), edm::streamer::FRDFileHeaderContent_v1::headerSize_, edm::streamer::FRDFileHeaderContent_v2::headerSize_, edm::streamer::FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, and edm::streamer::FRDFileHeaderIdentifier::version_.

Referenced by bumpFile().

1141  {
1142  int infile;
1143  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1144  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1145  << strerror(errno);
1146  return false;
1147  }
1148  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1149  char hdr[sizeof(FRDFileHeader_v2)];
1150  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1151  return false;
1153  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1154 
1155  if (frd_version == 1) {
1156  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1157  return false;
1159  rawHeaderSize = fhContent->headerSize_;
1160  close(infile);
1161  return true;
1162  } else if (frd_version == 2) {
1163  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1164  return false;
1166  rawHeaderSize = fhContent->headerSize_;
1167  close(infile);
1168  return true;
1169  } else
1170  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1171 
1172  close(infile);
1173  rawHeaderSize = 0;
1174  return false;
1175  }
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:31
Log< level::Error, false > LogError
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:81
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1971 of file EvFDaqDirector.cc.

References jsoncollector::Json::Value::asInt(), geometryDiff::file, jsoncollector::Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1971  {
1972  std::ifstream ij(file);
1973  Json::Value deserializeRoot;
1975 
1976  if (!reader.parse(ij, deserializeRoot)) {
1977  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1978  return -1;
1979  }
1980 
1981  int ret = deserializeRoot.get("lastLS", "").asInt();
1982  return ret;
1983  }
ret
prodAgent to be discontinued
reader
Definition: DQM.py:105
Unserialize a JSON document into a Value.
Definition: reader.h:17
Log< level::Error, false > LogError
Value get(UInt index, const Value &defaultValue) const
Represents a JSON value.
Definition: value.h:101

◆ removeFile()

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 549 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

Referenced by postEndRun().

549  {
550  int retval = remove(filename.c_str());
551  if (retval != 0)
552  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
553  << ". error = " << strerror(errno);
554  }
Log< level::Error, false > LogError

◆ runString()

std::string const& evf::EvFDaqDirector::runString ( ) const
inline

Definition at line 74 of file EvFDaqDirector.h.

References run_string_.

Referenced by DAQSource::DAQSource().

74 { return run_string_; }
std::string run_string_

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 181 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

182  {
183  fileDeleteLockPtr_ = fileDeleteLock;
184  filesToDeletePtr_ = filesToDelete;
185  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

References fms_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

121 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 931 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

931  {
932  if (fu_rw_lock_stream == nullptr)
933  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
934  else {
935  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
936  unsigned int readLs = 1, readIndex = 0;
937  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
938  }
939  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 969 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

969  {
970  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
971  flock(fulocal_rwlock_fd_, LOCK_UN);
972  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 979 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

979  {
980  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
981  flock(fulocal_rwlock_fd2_, LOCK_UN);
982  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 962 of file EvFDaqDirector.cc.

References init_lock_.

962 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 556 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

561  {
562  EvFDaqDirector::FileStatus fileStatus = noFile;
563  rawHeaderSize = 0;
564 
565  int retval = -1;
566  int lock_attempts = 0;
567  long total_lock_attempts = 0;
568 
569  struct stat buf;
570  int stopFileLS = -1;
571  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
572  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
573  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
574  if (stopFileCheck == 0)
575  stopFileLS = readLastLSEntry(stopFilePath_);
576  else
577  stopFileLS = 1; //stop without drain if only pid is stopped
578  if (!stop_ls_override_) {
579  //if lumisection is higher than in stop file, should quit at next from current
580  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
581  stopFileLS = stop_ls_override_ = ls;
582  } else
583  stopFileLS = stop_ls_override_;
584  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
585  << stopFileLS;
586  //return runEnded;
587  } else //if file was removed before reaching stop condition, reset this
588  stop_ls_override_ = 0;
589 
590  timeval ts_lockbegin;
591  gettimeofday(&ts_lockbegin, nullptr);
592 
593  while (retval == -1) {
594  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
595  if (retval == -1)
596  usleep(fuLockPollInterval_);
597  else
598  continue;
599 
600  lock_attempts += fuLockPollInterval_;
601  total_lock_attempts += fuLockPollInterval_;
602  if (lock_attempts > 5000000 || errno == 116) {
603  if (errno == 116)
604  edm::LogWarning("EvFDaqDirector")
605  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
606  else
607  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
608  "fu.lock file are present -: errno "
609  << errno << ":" << strerror(errno) << std::endl;
610 
611  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
612  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
613  ls++;
614  return noFile;
615  }
616 
617  if (stat(bu_run_dir_.c_str(), &buf) != 0)
618  return runEnded;
619  if (stat(fulockfile_.c_str(), &buf) != 0)
620  return runEnded;
621 
622  lock_attempts = 0;
623  }
624  if (total_lock_attempts > 5 * 60000000) {
625  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
626  return runAbort;
627  }
628  }
629 
630  timeval ts_lockend;
631  gettimeofday(&ts_lockend, nullptr);
632  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
633  if (deltat > 0.)
634  lockWaitTime = deltat;
635 
636  if (retval != 0)
637  return fileStatus;
638 
639 #ifdef DEBUG
640  timeval ts_lockend;
641  gettimeofday(&ts_lockend, 0);
642 #endif
643 
644  //open another lock file FD after the lock using main fd has been acquired
645  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
646  if (fu_readwritelock_fd2 == -1)
647  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
648  << " create. error:" << strerror(errno);
649 
650  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
651 
652  // if the stream is readable
653  if (fu_rw_lock_stream2 != nullptr) {
654  unsigned int readLs, readIndex;
655  int check = 0;
656  // rewind the stream
657  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
658  // if rewinded ok
659  if (check == 0) {
660  // read its' values
661  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
662  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
663 
664  unsigned int currentLs = readLs;
665  bool bumpedOk = false;
666  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
667  //no lock file write in this case
668  if (ls && ls + 1 < currentLs)
669  ls++;
670  else {
671  // try to bump (look for new index or EoLS file)
672  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
673  //avoid 2 lumisections jump
674  if (ls && readLs > currentLs && currentLs > ls) {
675  ls++;
676  readLs = currentLs = ls;
677  readIndex = 0;
678  bumpedOk = false;
679  //no write to lock file
680  } else {
681  if (ls == 0 && readLs > currentLs) {
682  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
683  //in this case there is no new file in the same LS
684  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
685  readLs = currentLs;
686  readIndex = 0;
687  bumpedOk = false;
688  //no write to lock file
689  }
690  //update return LS value
691  ls = readLs;
692  }
693  }
694  if (bumpedOk) {
695  // there is a new index file to grab, lock file needs to be updated
696  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
697  if (check == 0) {
698  ftruncate(fu_readwritelock_fd2, 0);
699  // write next index in the file, which is the file the next process should take
700  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
701  fflush(fu_rw_lock_stream2);
702  fsync(fu_readwritelock_fd2);
703  fileStatus = newFile;
704  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
705  } else {
706  edm::LogError("EvFDaqDirector")
707  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
708  setExceptionState = true;
709  return noFile;
710  }
711  } else if (currentLs < readLs) {
712  //there is no new file in next LS (yet), but lock file can be updated to the next LS
713  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
714  if (check == 0) {
715  ftruncate(fu_readwritelock_fd2, 0);
716  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
717  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
718  fflush(fu_rw_lock_stream2);
719  fsync(fu_readwritelock_fd2);
720  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
721  } else {
722  edm::LogError("EvFDaqDirector")
723  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
724  setExceptionState = true;
725  return noFile;
726  }
727  }
728  } else {
729  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
730  << strerror(errno);
731  }
732  } else {
733  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
734  }
735  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
736 
737 #ifdef DEBUG
738  timeval ts_preunlock;
739  gettimeofday(&ts_preunlock, 0);
740  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
741  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
742 #endif
743 
744  //if new json is present, lock file which FedRawDataInputSource will later unlock
745  if (fileStatus == newFile)
746  lockFULocal();
747 
748  //release lock at this point
749  int retvalu = -1;
750  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
751  if (retvalu == -1)
752  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
753 
754 #ifdef DEBUG
755  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
756 #endif
757 
758  if (fileStatus == noFile) {
759  struct stat buf;
760  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
761  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
762  fileStatus = runEnded;
763  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
764  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
765  fileStatus = runEnded;
766  }
767  }
768  return fileStatus;
769  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 78 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

78 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dirs_all_

std::vector<std::string> evf::EvFDaqDirector::bu_base_dirs_all_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getBUBaseDirs(), and initRun().

◆ bu_base_dirs_nSources_

std::vector<int> evf::EvFDaqDirector::bu_base_dirs_nSources_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getBUBaseDirsNSources().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 259 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 261 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 249 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 242 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 252 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 258 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 260 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 251 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 295 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 291 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 244 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 263 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 250 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 294 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
staticprivate

Definition at line 283 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 272 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 256 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 290 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 276 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 289 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), initRun(), and runString().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 292 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 278 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), initRun(), and useFileBroker().