CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::vector< std::string > const & getBUBaseDirs () const
 
std::vector< int > const & getBUBaseDirsNSources () const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFileName () const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
std::string const & runString () const
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Static Private Member Functions

static bool checkFileRead (char *buf, int infile, std::size_t buf_sz, std::string const &path)
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
std::vector< std::string > bu_base_dirs_all_
 
std::vector< int > bu_base_dirs_nSources_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 39 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dirs_all_, bu_base_dirs_nSources_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, Exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, mps_fire::i, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
43  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
44  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
45  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
46  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
47  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
48  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
49  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
50  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
51  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
52  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
53  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
54  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
55  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
56  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
57  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
58  hostname_(""),
59  bu_readlock_fd_(-1),
60  bu_writelock_fd_(-1),
64  bu_w_lock_stream(nullptr),
65  bu_r_lock_stream(nullptr),
66  fu_rw_lock_stream(nullptr),
69  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
71  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
73  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
74  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
80 
81  //save hostname for later
82  char hostname[33];
83  gethostname(hostname, 32);
84  hostname_ = hostname;
85 
86  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
87  if (fuLockPollIntervalPtr) {
88  try {
89  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
90  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
91  << " us";
92  } catch (const std::exception&) {
93  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
94  }
95  }
96 
97  //override file service parameter if specified by environment
98  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
99  if (fileBrokerParamPtr) {
100  try {
101  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
102  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
103  } catch (const std::exception&) {
104  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
105  }
106  }
107  if (useFileBroker_) {
109  //find BU data address from hltd configuration
111  struct stat buf;
112  if (stat("/etc/appliance/bus.config", &buf) == 0) {
113  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
114  std::getline(busconfig, fileBrokerHost_);
115  }
116  if (fileBrokerHost_.empty())
117  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
118  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
119  throw cms::Exception("EvFDaqDirector")
120  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
121 
122  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
123  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
124  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
125  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
126  }
127 
128  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
129  if (startFromLSPtr) {
130  try {
131  startFromLS_ = std::stoul(std::string(startFromLSPtr));
132  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
133  } catch (const std::exception&) {
134  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
135  }
136  }
137 
138  //override file service parameter if specified by environment
139  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
140  if (fileBrokerUseLockParamPtr) {
141  try {
142  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
143  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
145  } catch (const std::exception&) {
146  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
147  }
148  }
149 
150  // set number of streams in each BU's ramdisk
151  if (bu_base_dirs_nSources_.empty()) {
152  // default is 1 stream per ramdisk
153  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
154  bu_base_dirs_nSources_.push_back(1);
155  }
156  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
157  throw cms::Exception("DaqDirector")
158  << " Error while setting number of sources: size mismatch with BU base directory vector";
159  } else {
160  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
162  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
163  << " for ramdisk " << bu_base_dirs_all_[i];
164  }
165  }
166 
167  std::stringstream ss;
168  ss << "run" << std::setfill('0') << std::setw(6) << run_;
169  run_string_ = ss.str();
170  ss = std::stringstream();
171  ss << run_;
172  run_nstring_ = ss.str();
173  run_dir_ = base_dir_ + "/" + run_string_;
174  input_throttled_file_ = run_dir_ + "/input_throttle";
175  discard_ls_filestem_ = run_dir_ + "/discard_ls";
176  ss = std::stringstream();
177  ss << getpid();
178  pid_ = ss.str();
179  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string selectedTransferMode_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::vector< int > bu_base_dirs_nSources_
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 351 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

351  {
352  //close server connection
353  if (socket_.get() && socket_->is_open()) {
354  boost::system::error_code ec;
355  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
356  socket_->close(ec);
357  }
358 
359  if (fulocal_rwlock_fd_ != -1) {
360  unlockFULocal();
361  close(fulocal_rwlock_fd_);
362  }
363 
364  if (fulocal_rwlock_fd2_ != -1) {
365  unlockFULocal2();
366  close(fulocal_rwlock_fd2_);
367  }
368  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

77 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

References bu_run_dir_.

Referenced by DAQSource::readSupervisor().

78 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 79 of file EvFDaqDirector.h.

References bu_run_open_dir_.

79 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 838 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

844  {
845  if (previousFileSize_ != 0) {
846  if (!fms_) {
848  }
849  if (fms_)
851  previousFileSize_ = 0;
852  }
853  nextFile = "";
854 
855  //reached limit
856  if (maxLS >= 0 && ls > (unsigned int)maxLS)
857  return false;
858 
859  struct stat buf;
860  std::stringstream ss;
861 
862  // 1. Check suggested file
863  std::string nextFileJson = getInputJsonFilePath(ls, index);
864  if (stat(nextFileJson.c_str(), &buf) == 0) {
865  fsize = previousFileSize_ = buf.st_size;
866  nextFile = nextFileJson;
867  return true;
868  }
869  // 2. No file -> lumi ended? (and how many?)
870  else {
871  // 3. No file -> check for standalone raw file
872  std::string nextFileRaw = getRawFilePath(ls, index);
873  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
874  fsize = previousFileSize_ = buf.st_size;
875  nextFile = nextFileRaw;
876  return true;
877  }
878 
879  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
880 
881  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
882  // recheck that no raw file appeared in the meantime
883  if (stat(nextFileJson.c_str(), &buf) == 0) {
884  fsize = previousFileSize_ = buf.st_size;
885  nextFile = nextFileJson;
886  return true;
887  }
888  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
889  fsize = previousFileSize_ = buf.st_size;
890  nextFile = nextFileRaw;
891  return true;
892  }
893 
894  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
895  if (indexFilesInLS < 0)
896  //parsing failed
897  return false;
898  else {
899  //check index
900  if ((int)index < indexFilesInLS) {
901  //we have 2 files, and check for 1 failed... retry (2 will never be here)
902  edm::LogError("EvFDaqDirector")
903  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
904  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
905  setExceptionState = true;
906  return false;
907  }
908  }
909  // this lumi ended, check for files
910  ++ls;
911  index = 0;
912 
913  //reached limit
914  if (maxLS >= 0 && ls > (unsigned int)maxLS)
915  return false;
916 
917  nextFileJson = getInputJsonFilePath(ls, 0);
918  nextFileRaw = getRawFilePath(ls, 0);
919  if (stat(nextFileJson.c_str(), &buf) == 0) {
920  // a new file was found at new lumisection, index 0
921  fsize = previousFileSize_ = buf.st_size;
922  nextFile = nextFileJson;
923  return true;
924  }
925  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
926  fsize = previousFileSize_ = buf.st_size;
927  nextFile = nextFileRaw;
928  return true;
929  }
930  return false;
931  }
932  }
933  // no new file found
934  return false;
935  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkFileRead()

bool evf::EvFDaqDirector::checkFileRead ( char *  buf,
int  infile,
std::size_t  buf_sz,
std::string const &  path 
)
static private

Definition at line 1130 of file EvFDaqDirector.cc.

References visDQMUpload::buf, timingPdfMaker::infile, castor_dqm_sourceclient_file_cfg::path, and fileinputsource_cfi::read.

Referenced by parseFRDFileHeader(), and rawFileHasHeader().

1130  {
1131  ssize_t sz_read = ::read(infile, buf, buf_sz);
1132  if (sz_read < 0) {
1133  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1134  if (infile != -1)
1135  close(infile);
1136  return false;
1137  }
1138  if ((size_t)sz_read < buf_sz) {
1139  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1140  if (infile != -1)
1141  close(infile);
1142  return false;
1143  }
1144  return true;
1145  }
Log< level::Error, false > LogError

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 2113 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

2113  {
2114  if (mergeTypePset_.empty())
2115  return;
2116  if (!mergeTypeMap_.empty())
2117  return;
2118  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2119  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2120  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2121  for (const std::string& pname : tsPset.getParameterNames()) {
2122  std::string streamType = tsPset.getParameter<std::string>(pname);
2123  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2124  mergeTypeMap_.insert(ac, pname);
2125  ac->second = streamType;
2126  ac.release();
2127  }
2128  }
2129  }
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:172
std::string mergeTypePset_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
ParameterSet const & getParameterSet(ParameterSetID const &id)

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 2005 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

2005  {
2006  if (transferSystemJson_)
2007  return;
2008 
2009  transferSystemJson_.reset(new Json::Value);
2010  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2011  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
2012  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
2013 
2014  Json::Value destinationsVal(Json::arrayValue);
2015  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
2016  for (auto& dest : destinations)
2017  destinationsVal.append(dest);
2018  (*transferSystemJson_)["destinations"] = destinationsVal;
2019 
2020  Json::Value modesVal(Json::arrayValue);
2021  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
2022  for (auto& mode : modes)
2023  modesVal.append(mode);
2024  (*transferSystemJson_)["transferModes"] = modesVal;
2025 
2026  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
2027  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
2028  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
2029  Json::Value streamVal;
2030  for (auto& mode : modes) {
2031  //validation
2032  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
2033  throw cms::Exception("EvFDaqDirector")
2034  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
2035  << ")";
2036  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
2037 
2038  Json::Value sDestsValue(Json::arrayValue);
2039 
2040  if (streamDestinations.empty())
2041  throw cms::Exception("EvFDaqDirector")
2042  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
2043 
2044  for (auto& sdest : streamDestinations) {
2045  bool sDestValid = false;
2046  sDestsValue.append(sdest);
2047  for (auto& dest : destinations) {
2048  if (dest == sdest)
2049  sDestValid = true;
2050  }
2051  if (!sDestValid)
2052  throw cms::Exception("EvFDaqDirector")
2053  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
2054  << ", dest:" << sdest;
2055  }
2056  streamVal[mode] = sDestsValue;
2057  }
2058  (*transferSystemJson_)[psKeyItr->first] = streamVal;
2059  }
2060  }
2061  } else {
2062  if (requireTSPSet_)
2063  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
2064  }
2065  }
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:172
Represents a JSON value.
Definition: value.h:99
ParameterSet const & getParameterSet(ParameterSetID const &id)
array value (ordered list)
Definition: value.h:30

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1550 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1557  {
1558  EvFDaqDirector::FileStatus fileStatus = noFile;
1559  serverError = false;
1560 
1561  boost::system::error_code ec;
1562  try {
1563  while (true) {
1564  //socket connect
1565  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1567 
1568  if (ec) {
1569  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1570  serverError = true;
1571  break;
1572  }
1573  }
1574 
1575  boost::asio::streambuf request;
1576  std::ostream request_stream(&request);
1577  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1578  if (maxLS >= 0) {
1579  std::stringstream spath;
1580  spath << path << "&stopls=" << maxLS;
1581  path = spath.str();
1582  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1583  }
1584  request_stream << "GET " << path << " HTTP/1.1\r\n";
1585  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1586  request_stream << "Accept: */*\r\n";
1587  request_stream << "Connection: keep-alive\r\n\r\n";
1588 
1589  boost::asio::write(*socket_, request, ec);
1590  if (ec) {
1591  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1592  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1593  //we got disconnected, try to reconnect to the server before writing the request
1595  if (ec) {
1596  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1597  serverError = true;
1598  break;
1599  }
1600  continue;
1601  }
1602  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1603  serverError = true;
1604  break;
1605  }
1606 
1607  boost::asio::streambuf response;
1608  boost::asio::read_until(*socket_, response, "\r\n", ec);
1609  if (ec) {
1610  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1611  serverError = true;
1612  break;
1613  }
1614 
1615  std::istream response_stream(&response);
1616 
1617  std::string http_version;
1618  response_stream >> http_version;
1619 
1620  response_stream >> serverHttpStatus;
1621 
1622  std::string status_message;
1623  std::getline(response_stream, status_message);
1624  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1625  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1626  serverError = true;
1627  break;
1628  }
1629  if (serverHttpStatus != 200) {
1630  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1631  serverError = true;
1632  break;
1633  }
1634 
1635  // Skip the response headers (read lines up to the blank line that ends them).
1637  while (std::getline(response_stream, header) && header != "\r") {
1638  }
1639 
1640  std::string fileInfo;
1641  std::map<std::string, std::string> serverMap;
1642  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1643  auto pos = fileInfo.find('=');
1644  if (pos == std::string::npos)
1645  continue;
1646  auto stitle = fileInfo.substr(0, pos);
1647  auto svalue = fileInfo.substr(pos + 1);
1648  serverMap[stitle] = svalue;
1649  }
1650 
1651  //check that the response run number is correct
1652  auto server_version = serverMap.find("version");
1653  assert(server_version != serverMap.end());
1654 
1655  auto server_run = serverMap.find("runnumber");
1656  assert(server_run != serverMap.end());
1657  assert(run_nstring_ == server_run->second);
1658 
1659  auto server_state = serverMap.find("state");
1660  assert(server_state != serverMap.end());
1661 
1662  auto server_eols = serverMap.find("lasteols");
1663  assert(server_eols != serverMap.end());
1664 
1665  auto server_ls = serverMap.find("lumisection");
1666 
1667  int version_maj = 1;
1668  int version_min = 0;
1669  int version_rev = 0;
1670  {
1671  auto* s_ptr = server_version->second.c_str();
1672  if (!server_version->second.empty() && server_version->second[0] == '"')
1673  s_ptr++;
1674  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1675  if (res < 3) {
1676  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1677  if (res < 2) {
1678  res = sscanf(s_ptr, "%d", &version_maj);
1679  if (res < 1) {
1680  //expecting at least 1 number (major version)
1681  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1682  }
1683  }
1684  }
1685  }
1686 
1687  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1688  if (server_ls != serverMap.end())
1689  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1690  else
1691  serverLS = closedServerLS + 1;
1692 
1693  std::string s_state = server_state->second;
1694  if (s_state == "STARTING") //initial, always empty starting with LS 1
1695  {
1696  auto server_file = serverMap.find("file");
1697  assert(server_file == serverMap.end()); //no file with starting state
1698  fileStatus = noFile;
1699  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1700  } else if (s_state == "READY") {
1701  auto server_file = serverMap.find("file");
1702  if (server_file == serverMap.end()) {
1703  //can be returned by server if files from new LS already appeared but LS is not yet closed
1704  if (serverLS <= closedServerLS)
1705  serverLS = closedServerLS + 1;
1706  fileStatus = noFile;
1707  edm::LogInfo("EvFDaqDirector")
1708  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1709  } else {
1710  std::string filestem;
1711  std::string fileprefix;
1712  auto server_fileprefix = serverMap.find("fileprefix");
1713 
1714  if (server_fileprefix != serverMap.end()) {
1715  auto pssize = server_fileprefix->second.size();
1716  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1717  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1718  else
1719  fileprefix = server_fileprefix->second;
1720  }
1721 
1722  //strip the enclosing double quotes, if present
1723  auto ssize = server_file->second.size();
1724  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1725  filestem = server_file->second.substr(1, ssize - 2);
1726  else
1727  filestem = server_file->second;
1728  assert(!filestem.empty());
1729  if (version_maj > 1) {
1730  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1731  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732  nextFileJson = "";
1733  rawHeader = true;
1734  } else {
1735  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1736  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1737  nextFileJson = filestem + ".jsn";
1738  rawHeader = false;
1739  }
1740  fileStatus = newFile;
1741  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1742  << serverLS << " file:" << filestem;
1743  }
1744  } else if (s_state == "EOLS") {
1745  serverLS = closedServerLS + 1;
1746  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1747  fileStatus = noFile;
1748  } else if (s_state == "EOR") {
1749  //server_eor = serverMap.find("iseor");
1750  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1751  fileStatus = runEnded;
1752  } else if (s_state == "NORUN") {
1753  auto err_msg = serverMap.find("errormessage");
1754  if (err_msg != serverMap.end())
1755  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1756  else
1757  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1758  edm::LogWarning("EvFDaqDirector") << "executing run end";
1759  fileStatus = runEnded;
1760  } else if (s_state == "ERROR") {
1761  auto err_msg = serverMap.find("errormessage");
1762  if (err_msg != serverMap.end())
1763  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1764  else
1765  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1766  fileStatus = noFile;
1767  serverError = true;
1768  } else {
1769  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1770  fileStatus = noFile;
1771  serverError = true;
1772  }
1773 
1774  // Without keep-alive, read until EOF and discard any remaining data.
1775  if (!fileBrokerKeepAlive_) {
1776  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1777  }
1778  if (ec != boost::asio::error::eof) {
1779  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1780  serverError = true;
1781  }
1782  }
1783 
1784  break;
1785  }
1786 
1787  } catch (std::exception const& e) {
1788  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1789  serverError = true;
1790  }
1791 
1792  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1793  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1794  if (ec) {
1795  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1796  }
1797  socket_->close(ec);
1798  if (ec) {
1799  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1800  }
1801  }
1802 
1803  if (serverError) {
1804  if (socket_->is_open())
1805  socket_->close(ec);
1806  if (ec) {
1807  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1808  }
1809  fileStatus = noFile;
1810  sleep(1); //back-off if error detected
1811  }
1812 
1813  return fileStatus;
1814  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 990 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

990  {
991  //used for backpressure mechanisms and monitoring
992  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
993  struct stat buf;
994  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
995  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
996  close(bol_fd);
997  }
998  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 1000 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

1003  {
1004  if (currentLumiSection > 0) {
1005  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1006  struct stat buf;
1007  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1008  if (!found) {
1009  if (doCreateEoLS) {
1010  int eol_fd =
1011  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1012  close(eol_fd);
1013  }
1014  if (doCreateBoLS)
1015  createBoLSFile(lumiSection, false);
1016  }
1017  } else if (doCreateBoLS) {
1018  createBoLSFile(lumiSection, true); //needed for initial lumisection
1019  }
1020  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2145 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

2145  {
2146  std::string proc_flag = run_dir_ + "/processing";
2147  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2148  close(proc_flag_fd);
2149  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1966 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

1966  {
1967  // create open dir if not already there
1968 
1970  if (!std::filesystem::is_directory(openPath)) {
1971  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1972  std::filesystem::create_directories(openPath);
1973  }
1974  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 378 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

378  {
380  desc.setComment(
381  "Service used for file locking arbitration and for propagating information between other EvF components");
382  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
383  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
384  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
385  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
386  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
387  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
388  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
389  desc.addUntracked<bool>("useFileBroker", false)
390  ->setComment("Use BU file service to grab input data instead of NFS file locking");
391  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
392  ->setComment("Allow service to discover BU address from hltd configuration");
393  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
394  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
395  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
396  ->setComment("Use keep alive to avoid using large number of sockets");
397  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
398  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
399  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
400  ->setComment("Lock polling interval in microseconds for the input directory file lock");
401  desc.addUntracked<bool>("outputAdler32Recheck", false)
402  ->setComment("Check Adler32 of per-process output files while micro-merging");
403  desc.addUntracked<bool>("requireTransfersPSet", false)
404  ->setComment("Require complete transferSystem PSet in the process configuration");
405  desc.addUntracked<std::string>("selectedTransferMode", "")
406  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
407  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
408  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
409  desc.addUntracked<std::string>("mergingPset", "")
410  ->setComment("Name of merging PSet to look for merging type definitions for streams");
411  descriptions.add("EvFDaqDirector", desc);
412  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 82 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

82 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 543 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

543  {
544  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
545  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getBUBaseDirs()

std::vector<std::string> const& evf::EvFDaqDirector::getBUBaseDirs ( ) const
inline

Definition at line 196 of file EvFDaqDirector.h.

References bu_base_dirs_all_.

Referenced by DAQSource::DAQSource().

196 { return bu_base_dirs_all_; }
std::vector< std::string > bu_base_dirs_all_

◆ getBUBaseDirsNSources()

std::vector<int> const& evf::EvFDaqDirector::getBUBaseDirsNSources ( ) const
inline

Definition at line 197 of file EvFDaqDirector.h.

References bu_base_dirs_nSources_.

Referenced by DAQSource::DAQSource().

197 { return bu_base_dirs_nSources_; }
std::vector< int > bu_base_dirs_nSources_

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 472 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

472  {
474  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 535 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

535  {
536  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
537  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 539 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

539  {
540  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
541  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFileName()

std::string evf::EvFDaqDirector::getEoRFileName ( ) const

Definition at line 549 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), and run_.

Referenced by DAQSource::readSupervisor().

549 { return fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 547 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

547 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 551 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

551 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 553 of file EvFDaqDirector.cc.

References bu_run_dir_.

553 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 500 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

500  {
502  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 504 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

504  {
506  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 456 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

456  {
458  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1990 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1990  {
1991  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1993  struct stat buf;
1994  unsigned int lscount = 1;
1995  do {
1996  std::stringstream ss;
1997  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1998  fullpath = ss.str();
1999  lscount++;
2000  } while (stat(fullpath.c_str(), &buf) == 0);
2001  return lscount - 1;
2002  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 492 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

492  {
494  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 488 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

488  {
490  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 518 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

519  {
521  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 531 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

531  {
533  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs,
bool  requireHeader = true 
)

Definition at line 1816 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1824  {
1825  EvFDaqDirector::FileStatus fileStatus = noFile;
1826 
1827  //int retval = -1;
1828  //int lock_attempts = 0;
1829  //long total_lock_attempts = 0;
1830 
1831  struct stat buf;
1832  int stopFileLS = -1;
1833  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1834  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1835  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1836  if (stopFileCheck == 0)
1837  stopFileLS = readLastLSEntry(stopFilePath_);
1838  else
1839  stopFileLS = 1; //stop without drain if only pid is stopped
1840  if (!stop_ls_override_) {
1841  //if the current lumisection has already reached the one in the stop file, stop after the current lumisection
1842  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1843  stopFileLS = stop_ls_override_ = ls;
1844  } else
1845  stopFileLS = stop_ls_override_;
1846  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1847  << stopFileLS;
1848  //return runEnded;
1849  } else //if file was removed before reaching stop condition, reset this
1850  stop_ls_override_ = 0;
1851 
1852  /* look for EoLS
1853  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1854  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1855  ls++;
1856  return noFile;
1857  }
1858  */
1859 
1860  timeval ts_lockbegin;
1861  gettimeofday(&ts_lockbegin, nullptr);
1862 
1863  std::string nextFileJson;
1864  uint32_t serverLS, closedServerLS;
1865  unsigned int serverHttpStatus;
1866  bool serverError;
1867 
1868  //local lock to force index json and EoLS files to appear in order
1870  lockFULocal();
1871 
1872  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1873  bool rawHeader = false;
1874  fileStatus = contactFileBroker(
1875  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1876 
1877  if (serverError) {
1878  //do not update anything
1880  unlockFULocal();
1881  return noFile;
1882  }
1883 
1884  //handle creation of BoLS files if lumisection has changed
1885  if (currentLumiSection == 0) {
1886  if (fileStatus == runEnded)
1887  createLumiSectionFiles(closedServerLS, 0, true, false);
1888  else
1889  createLumiSectionFiles(serverLS, 0, true, false);
1890  } else {
1891  if (closedServerLS >= currentLumiSection) {
1892  //only BoLS files
1893  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1894  createLumiSectionFiles(i + 1, i, true, false);
1895  }
1896  }
1897 
1898  bool fileFound = true;
1899 
1900  if (fileStatus == newFile) {
1901  if (rawHeader > 0)
1902  serverEventsInNewFile = grabNextJsonFromRaw(
1903  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1904  else
1905  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1906  }
1907  //closing file in case of any error
1908  if (serverEventsInNewFile < 0 && rawFd != -1) {
1909  close(rawFd);
1910  rawFd = -1;
1911  }
1912 
1913  //can unlock because all files have been created locally
1915  unlockFULocal();
1916 
1917  if (!fileFound) {
1918  //catch condition where directory got deleted
1919  fileStatus = noFile;
1920  struct stat buf;
1921  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1922  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1923  fileStatus = runEnded;
1924  }
1925  }
1926 
1927  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1928  //so that EoLS files can not appear locally before index files
1929  if (currentLumiSection == 0) {
1930  lockFULocal2();
1931  if (fileStatus == runEnded) {
1932  createLumiSectionFiles(closedServerLS, 0, false, true);
1933  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1934  } else {
1935  createLumiSectionFiles(serverLS, 0, false, true);
1936  }
1937  unlockFULocal2();
1938  } else {
1939  if (closedServerLS >= currentLumiSection) {
1940  //lock exclusive to create EoLS files
1941  lockFULocal2();
1942  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1943  createLumiSectionFiles(i + 1, i, false, true);
1944  unlockFULocal2();
1945  }
1946  }
1947 
1948  if (fileStatus == runEnded)
1949  ls = std::max(currentLumiSection, serverLS);
1950  else if (fileStatus == newFile) {
1951  assert(serverLS >= ls);
1952  ls = serverLS;
1953  } else if (fileStatus == noFile) {
1954  if (serverLS >= ls)
1955  ls = serverLS;
1956  else {
1957  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1958  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1959  sleep(1);
1960  }
1961  }
1962 
1963  return fileStatus;
1964  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 777 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

777  {
778  std::ifstream ij(BUEoLSFile);
779  Json::Value deserializeRoot;
781 
782  if (!reader.parse(ij, deserializeRoot)) {
783  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
784  return -1;
785  }
786 
788  DataPoint dp;
789  dp.deserialize(deserializeRoot);
790 
791  //read definition
792  if (readEolsDefinition_) {
793  //std::string def = boost::algorithm::trim(dp.getDefinition());
794  std::string def = dp.getDefinition();
795  if (def.empty())
796  readEolsDefinition_ = false;
797  while (!def.empty()) {
799  if (def.find('/') == 0)
800  fullpath = def;
801  else
802  fullpath = bu_run_dir_ + '/' + def;
803  struct stat buf;
804  if (stat(fullpath.c_str(), &buf) == 0) {
805  DataPointDefinition eolsDpd;
806  std::string defLabel = "legend";
807  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
808  if (eolsDpd.getNames().empty()) {
809  //try with "data" label if "legend" format is not used
810  eolsDpd = DataPointDefinition();
811  defLabel = "data";
812  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
813  }
814  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
815  if (eolsDpd.getNames().at(i) == "NFiles")
817  readEolsDefinition_ = false;
818  break;
819  }
820  //check if we can still find definition
821  if (def.size() <= 1 || def.find('/') == std::string::npos) {
822  readEolsDefinition_ = false;
823  break;
824  }
825  def = def.substr(def.find('/') + 1);
826  }
827  }
828 
829  if (dp.getData().size() > eolsNFilesIndex_)
830  data = dp.getData()[eolsNFilesIndex_];
831  else {
832  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
833  return -1;
834  }
835  return std::stoi(data);
836  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 476 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

476  {
478  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 496 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

496  {
497  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
498  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 468 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

468  {
469  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
470  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 480 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

480  {
482  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 508 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

509  {
511  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 464 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

464  {
465  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
466  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 523 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

523  {
525  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 484 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

484  {
486  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 513 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

514  {
516  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 460 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

460  {
462  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 527 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

527  {
529  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 120 of file EvFDaqDirector.h.

References run_.

// Accessor: returns the current value of the run_ member.
120 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 109 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

// Returns run_dir_ with "/open" appended (no trailing slash).
109 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 182 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Accessor: returns the current value of the startFromLS_ member.
182 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 2067 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

2067  {
2068  std::string streamRequestName;
2069  if (transferSystemJson_->isMember(stream.c_str()))
2070  streamRequestName = stream;
2071  else {
2072  std::stringstream msg;
2073  msg << "Transfer system mode definitions missing for -: " << stream;
2074  if (requireTSPSet_)
2075  throw cms::Exception("EvFDaqDirector") << msg.str();
2076  else {
2077  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2078  return std::string("Failsafe");
2079  }
2080  }
2081  //return empty if strict check parameter is not on
2082  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2083  edm::LogWarning("EvFDaqDirector")
2084  << "Selected mode string is not provided as DaqDirector parameter."
2085  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
2086  return std::string("Failsafe");
2087  }
2088  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2089  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2090  }
2091  //check if stream has properly listed transfer stream
2092  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2093  std::stringstream msg;
2094  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2095  if (requireTSPSet_)
2096  throw cms::Exception("EvFDaqDirector") << msg.str();
2097  else
2098  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2099  return std::string("Failsafe");
2100  }
2101  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2102 
2103  //flatten string json::Array into CSV std::string
2104  std::string ret;
2105  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2106  if (!ret.empty())
2107  ret += ",";
2108  ret += (*it).asString();
2109  }
2110  return ret;
2111  }
const_iterator end() const
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
ret
prodAgent to be discontinued
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:286
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 2131 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

2131  {
2132  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2133  if (mergeTypeMap_.find(search_ac, stream))
2134  return search_ac->second;
2135 
2136  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2137  std::string defaultName = MergeTypeNames_[defaultType];
2138  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2139  mergeTypeMap_.insert(ac, stream);
2140  ac->second = defaultName;
2141  ac.release();
2142  return defaultName;
2143  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1275 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1278  {
1279  fileFound = true;
1280 
1281  //should be ported to use fffnaming
1282  std::ostringstream fileNameWithPID;
1283  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1284  << std::setw(5) << pid_ << ".jsn";
1285 
1286  // assemble json destination path
1287  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1288 
1289  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1290 
1291  int infile = -1, outfile = -1;
1292 
1293  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1294  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1295  << strerror(errno);
1296  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1297  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1298  << jsonSourcePath << " : " << strerror(errno);
1299  if (errno == ENOENT)
1300  fileFound = false;
1301  return -1;
1302  }
1303  }
1304 
1305  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1306  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1307  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1308  if (errno == EEXIST) {
1309  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1310  << " : ";
1311  ::close(infile);
1312  return -1;
1313  }
1314  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1315  << strerror(errno);
1316  struct stat out_stat;
1317  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1318  edm::LogWarning("EvFDaqDirector")
1319  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1320  if (unlink(jsonDestPath.c_str()) == -1) {
1321  edm::LogWarning("EvFDaqDirector")
1322  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1323  }
1324  }
1325  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1326  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1327  << jsonDestPath << " : " << strerror(errno);
1328  ::close(infile);
1329  return -1;
1330  }
1331  }
1332  //copy contents
1333  const std::size_t buf_sz = 512;
1334  std::size_t tot_written = 0;
1335  std::unique_ptr<char[]> buf(new char[buf_sz]);
1336 
1337  ssize_t sz, sz_read = 1, sz_write;
1338  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1339  sz_write = 0;
1340  do {
1341  assert(sz_read - sz_write > 0);
1342  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1343  sz_read = sz; // cause read loop termination
1344  break;
1345  }
1346  assert(sz > 0);
1347  sz_write += sz;
1348  tot_written += sz;
1349  } while (sz_write < sz_read);
1350  }
1351  close(infile);
1352  close(outfile);
1353 
1354  if (tot_written > 0) {
1355  //leave file if it was empty for diagnosis
1356  if (unlink(jsonSourcePath.c_str()) == -1) {
1357  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1358  << strerror(errno);
1359  return -1;
1360  }
1361  } else {
1362  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1363  << jsonSourcePath;
1364  return -1;
1365  }
1366 
1367  Json::Value deserializeRoot;
1369 
1370  std::string data;
1371  std::stringstream ss;
1372  bool result;
1373  try {
1374  if (tot_written <= buf_sz) {
1375  result = reader.parse(buf.get(), deserializeRoot);
1376  } else {
1377  //json will normally not be bigger than buf_sz bytes
1378  try {
1379  std::ifstream ij(jsonDestPath);
1380  ss << ij.rdbuf();
1381  } catch (std::filesystem::filesystem_error const& ex) {
1382  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1383  return -1;
1384  }
1385  result = reader.parse(ss.str(), deserializeRoot);
1386  }
1387  if (!result) {
1388  if (tot_written <= buf_sz)
1389  ss << buf.get();
1390  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1391  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1392  << ss.str() << ".";
1393  return -1;
1394  }
1395 
1396  //read BU JSON
1397  DataPoint dp;
1398  dp.deserialize(deserializeRoot);
1399  bool success = false;
1400  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1401  if (dpd_->getNames().at(i) == "NEvents")
1402  if (i < dp.getData().size()) {
1403  data = dp.getData()[i];
1404  success = true;
1405  break;
1406  }
1407  }
1408  if (!success) {
1409  if (!dp.getData().empty())
1410  data = dp.getData()[0];
1411  else {
1412  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1413  << "grabNextJsonFile - "
1414  << " error reading number of events from BU JSON; No input value. data -: " << data;
1415  return -1;
1416  }
1417  }
1418 
1419  //try to read raw file size
1420  fileSizeFromJson = -1;
1421  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1422  if (dpd_->getNames().at(i) == "NBytes") {
1423  if (i < dp.getData().size()) {
1424  std::string dataSize = dp.getData()[i];
1425  try {
1426  fileSizeFromJson = std::stol(dataSize);
1427  } catch (const std::exception&) {
1428  //non-fatal currently, processing can continue without this value
1429  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1430  << "Input value is -: " << dataSize;
1431  }
1432  break;
1433  }
1434  }
1435  }
1436  return std::stoi(data);
1437  } catch (const std::out_of_range& e) {
1438  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1439  << "Input value is -: " << data;
1440  } catch (const std::invalid_argument& e) {
1441  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1442  << "Input value is -: " << data;
1443  } catch (std::runtime_error const& e) {
1444  //Can be thrown by Json parser
1445  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1446  }
1447 
1448  catch (std::exception const& e) {
1449  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1450  } catch (...) {
1451  //unknown exception
1452  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1453  }
1454 
1455  return -1;
1456  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1458 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

1458  {
1459  std::string data;
1460  try {
1461  // assemble json destination path
1462  std::filesystem::path jsonDestPath(baseRunDir());
1463 
1464  //should be ported to use fffnaming
1465  std::ostringstream fileNameWithPID;
1466  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1467  << ".jsn";
1468  jsonDestPath /= fileNameWithPID.str();
1469 
1470  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1471  try {
1472  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473  } catch (std::filesystem::filesystem_error const& ex) {
1474  // Input dir gone?
1475  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1476  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1477  sleep(1);
1478  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1479  }
1480  unlockFULocal();
1481 
1482  try {
1483  //sometimes this fails but file gets deleted
1484  std::filesystem::remove(jsonSourcePath);
1485  } catch (std::filesystem::filesystem_error const& ex) {
1486  // Input dir gone?
1487  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1488  } catch (std::exception const& ex) {
1489  // Input dir gone?
1490  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1491  }
1492 
1493  std::ifstream ij(jsonDestPath);
1494  Json::Value deserializeRoot;
1496 
1497  std::stringstream ss;
1498  ss << ij.rdbuf();
1499  if (!reader.parse(ss.str(), deserializeRoot)) {
1500  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1501  << "\nERROR:\n"
1502  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1503  << ss.str() << ".";
1504  throw std::runtime_error("Cannot deserialize input JSON file");
1505  }
1506 
1507  //read BU JSON
1508  std::string data;
1509  DataPoint dp;
1510  dp.deserialize(deserializeRoot);
1511  bool success = false;
1512  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1513  if (dpd_->getNames().at(i) == "NEvents")
1514  if (i < dp.getData().size()) {
1515  data = dp.getData()[i];
1516  success = true;
1517  }
1518  }
1519  if (!success) {
1520  if (!dp.getData().empty())
1521  data = dp.getData()[0];
1522  else
1523  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1524  << " error reading number of events from BU JSON -: No input value " << data;
1525  }
1526  return std::stoi(data);
1527  } catch (std::filesystem::filesystem_error const& ex) {
1528  // Input dir gone?
1529  unlockFULocal();
1530  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1531  } catch (std::runtime_error const& e) {
1532  // Another process grabbed the file and NFS did not register this
1533  unlockFULocal();
1534  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1535  } catch (const std::out_of_range&) {
1536  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1537  << "Input value is -: " << data;
1538  } catch (const std::invalid_argument&) {
1539  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1540  << "Input value is -: " << data;
1541  } catch (std::exception const& e) {
1542  // BU run directory disappeared?
1543  unlockFULocal();
1544  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1545  }
1546 
1547  return -1;
1548  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile,
bool  requireHeader = true 
)

Definition at line 1183 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

1190  {
1191  fileFound = true;
1192 
1193  //take only first three tokens delimited by "_" in the renamed raw file name
1194  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1195  size_t pos = 0, n_tokens = 0;
1196  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1197  }
1198  std::string reducedJsonStem = jsonStem.substr(0, pos);
1199 
1200  std::ostringstream fileNameWithPID;
1201  //should be ported to use fffnaming
1202  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1203 
1204  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1205 
1206  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1207 
1208  //parse RAW file header if it exists
1209  uint32_t lsFromRaw;
1210  int32_t nbEventsWrittenRaw;
1211  int64_t fileSizeFromRaw;
1212  uint16_t rawDataType;
1213  auto ret = parseFRDFileHeader(rawSourcePath,
1214  rawFd,
1215  rawHeaderSize,
1216  rawDataType,
1217  lsFromRaw,
1218  nbEventsWrittenRaw,
1219  fileSizeFromRaw,
1220  requireHeader,
1221  true,
1222  closeFile);
1223  if (ret != 0) {
1224  if (ret == 1)
1225  fileFound = false;
1226  return -1;
1227  }
1228 
1229  int outfile;
1230  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1231  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  if (errno == EEXIST) {
1234  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1235  << " : ";
1236  return -1;
1237  }
1238  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1239  << strerror(errno);
1240  struct stat out_stat;
1241  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1242  edm::LogWarning("EvFDaqDirector")
1243  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1244  << jsonDestPath;
1245  if (unlink(jsonDestPath.c_str()) == -1) {
1246  edm::LogWarning("EvFDaqDirector")
1247  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1248  }
1249  }
1250  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1251  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1252  << jsonDestPath << " : " << strerror(errno);
1253  return -1;
1254  }
1255  }
1256  //write JSON file (TODO: use jsoncpp)
1257  std::stringstream ss;
1258  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1259  std::string sstr = ss.str();
1260 
1261  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1262  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1263  << " : " << strerror(errno);
1264  return -1;
1265  }
1266  close(outfile);
1267  if (serverLS && serverLS != lsFromRaw)
1268  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1269  << " and raw file header LS " << lsFromRaw;
1270 
1271  fileSizeFromHeader = fileSizeFromRaw;
1272  return nbEventsWrittenRaw;
1273  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 181 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_base_dirs_all_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, mps_fire::i, init_lock_, svgfig::load(), eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm::shutdown_flag, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, tryInitializeFuLockFile(), and useFileBroker_.

Referenced by preallocate().

181  {
182  // check if base dir exists or create it accordingly
183  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
184  if (retval != 0 && errno != EEXIST) {
185  throw cms::Exception("DaqDirector")
186  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
187  }
188 
189  //create run dir in base dir
190  umask(0);
191  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
192  if (retval != 0 && errno != EEXIST) {
193  throw cms::Exception("DaqDirector")
194  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
195  }
196 
197  //create fu-local.lock in run open dir
198  if (!directorBU_) {
200  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
202  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
203  if (fulocal_rwlock_fd_ == -1)
204  throw cms::Exception("DaqDirector")
205  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
206  chmod(fulocal_lock_.c_str(), 0777);
207  fsync(fulocal_rwlock_fd_);
208  //open second fd for another input source thread
210  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
211  if (fulocal_rwlock_fd2_ == -1)
212  throw cms::Exception("DaqDirector")
213  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
214  }
215 
216  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
217  //for BU, it is created at this point
218  if (directorBU_) {
220  std::string bulockfile = bu_run_dir_ + "/bu.lock";
221  fulockfile_ = bu_run_dir_ + "/fu.lock";
222 
223  //make or find bu run dir
224  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
225  if (retval != 0 && errno != EEXIST) {
226  throw cms::Exception("DaqDirector")
227  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
228  }
229  bu_run_open_dir_ = bu_run_dir_ + "/open";
230  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
231  if (retval != 0 && errno != EEXIST) {
232  throw cms::Exception("DaqDirector")
233  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
234  }
235 
236  // the BU director does not need to know about the fu lock
237  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
238  if (bu_writelock_fd_ == -1)
239  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
240  else
241  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
242  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
243  if (bu_w_lock_stream == nullptr)
244  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
245 
246  // BU INITIALIZES LOCK FILE
247  // FU LOCK FILE OPEN
248  openFULockfileStream(true);
250  fflush(fu_rw_lock_stream);
251  close(fu_readwritelock_fd_);
252 
253  if (!hltSourceDirectory_.empty()) {
254  struct stat buf;
255  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
256  std::string hltdir = bu_run_dir_ + "/hlt";
257  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
258  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
259  if (retval != 0 && errno != EEXIST)
260  throw cms::Exception("DaqDirector")
261  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
262 
263  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
264  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
265 
266  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
267  for (auto& optfile : optfiles) {
268  try {
269  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
270  } catch (...) {
271  }
272  }
273 
274  std::filesystem::rename(tmphltdir, hltdir);
275  } else
276  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
277  }
278  //else{}//no configuration specified
279  } else {
280  // for FU, check if bu base dir exists
281 
// Helper lambda: makes sure bu_base_dir exists by attempting mkdir() on it.
// A failure with errno == EEXIST is accepted; any other mkdir() failure throws
// cms::Exception carrying the path and strerror(errno). The local `retval`
// declared here is separate from the one used by the enclosing function.
282  auto checkExists = [=](std::string const& bu_base_dir) -> void {
283  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
284  if (retval != 0 && errno != EEXIST) {
285  throw cms::Exception("DaqDirector")
286  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
287  }
288  };
289 
// Helper lambda: waits for bu_base_dir to become available. mkdir() is retried
// every 0.5 s for as long as edm::shutdown_flag is clear. Success or EEXIST ends
// the wait; a warning is logged on every 20th failed attempt, and once more than
// 120 attempts (about one minute) have failed a cms::Exception is thrown.
290  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
291  int cnt = 0;
292  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
293  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
294  if (retval != 0 && errno != EEXIST) {
295  usleep(500000);
296  cnt++;
297  if (cnt % 20 == 0)
298  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
299  if (cnt > 120)
300  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
301  << " mkdir error:" << strerror(errno);
     // Retry: without this the unconditional break below ended the wait after the
     // first failed attempt, so the periodic warning and the timeout never fired.
     continue;
302  }
303  break;
304  }
305  };
306 
307  if (!bu_base_dirs_all_.empty()) {
308  checkExists(bu_base_dirs_all_[0]);
310  for (unsigned int i = 1; i < bu_base_dirs_all_.size(); i++)
311  waitForDir(bu_base_dirs_all_[i]);
312  } else {
313  checkExists(bu_base_dir_);
315  }
316 
317  fulockfile_ = bu_run_dir_ + "/fu.lock";
318  if (!useFileBroker_)
319  openFULockfileStream(false);
320  }
321 
322  pthread_mutex_init(&init_lock_, nullptr);
323 
324  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
325  std::stringstream sstp;
326  sstp << stopFilePath_ << "_pid" << pid_;
327  stopFilePathPid_ = sstp.str();
328 
329  if (!directorBU_) {
330  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
331  struct stat statbuf;
332  if (!stat(defPath.c_str(), &statbuf))
333  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
334  else {
335  //look in source directory if not present in ramdisk
336  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
337  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
338  if (stat(defPath.c_str(), &statbuf)) {
339  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
340  if (stat(defPath.c_str(), &statbuf)) {
341  defPath = defPathSuffix;
342  }
343  }
344  }
345  dpd_ = new DataPointDefinition();
346  std::string defLabel = "data";
347  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
348  }
349  }
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
def load(fileName)
Definition: svgfig.py:547
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 124 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by DAQSource::getNextDataBlock(), and FedRawDataInputSource::getNextEvent().

// True only when both nStreams_ and nThreads_ are exactly 1.
124 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 970 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

// Requests a shared (LOCK_SH) flock() on fulocal_rwlock_fd_. LOCK_NB is not
// passed, and the return value of flock() is not checked.
// NOTE(review): lockFULocal2() requests LOCK_EX on the second descriptor while
// this one requests LOCK_SH — confirm the asymmetry is intended.
970  {
971  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
972  flock(fulocal_rwlock_fd_, LOCK_SH);
973  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 980 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

// Requests an exclusive (LOCK_EX) flock() on fulocal_rwlock_fd2_. LOCK_NB is
// not passed, and the return value of flock() is not checked.
980  {
981  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
982  flock(fulocal_rwlock_fd2_, LOCK_EX);
983  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 966 of file EvFDaqDirector.cc.

References init_lock_.

// Locks the init_lock_ pthread mutex; the pthread_mutex_lock() return code is ignored.
966 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2164 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and to_string().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Returns true when stat() succeeds on the path formed by appending the decimal
// lumisection number to discard_ls_filestem_, i.e. when something exists at that
// path. Any stat() failure, whatever the errno, yields false.
2164  {
2165  struct stat buf;
2166  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2167  }
static std::string to_string(const XMLCh *ch)
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2151 of file EvFDaqDirector.cc.

References command_line::start.

// Builds a struct flock from the five values by positional brace initialisation.
// The initialiser order therefore has to follow the member order of struct flock,
// which is why the __APPLE__ branch lists start/len/pid before type/whence while
// the default branch lists type/whence first.
2151  {
2152 #ifdef __APPLE__
2153  return {start, len, pid, type, whence};
2154 #else
2155  return {type, whence, start, len, pid};
2156 #endif
2157  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 125 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Returns the stored nConcurrentLumis_ value (assigned in preallocate()).
125 { return nConcurrentLumis_; }
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 947 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

947  {
948  if (create) {
950  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
951  chmod(fulockfile_.c_str(), 0766);
952  } else {
953  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
954  }
955  if (fu_readwritelock_fd_ == -1)
956  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
957  << " create:" << create << " error:" << strerror(errno);
958  else
959  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
960 
961  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
962  if (fu_rw_lock_stream == nullptr)
963  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
964  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 110 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

// Returns the stored outputAdler32Recheck_ flag unchanged.
110 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

Referenced by DAQSource::DAQSource().

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint16_t &  rawDataType,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 1022 of file EvFDaqDirector.cc.

References checkFileRead(), FRDFileHeaderContent_v2::dataType_, FRDFileHeaderContent_v1::eventCount_, FRDFileHeaderContent_v2::eventCount_, FRDFileHeaderContent_v1::fileSize_, FRDFileHeaderContent_v2::fileSize_, getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, FRDFileHeaderContent_v1::lumiSection_, FRDFileHeaderContent_v2::lumiSection_, and FRDFileHeaderIdentifier::version_.

Referenced by grabNextJsonFromRaw(), FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1031  {
1032  int infile;
1033 
1034  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1035  if (retry) {
1036  edm::LogWarning("EvFDaqDirector")
1037  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1038  return parseFRDFileHeader(rawSourcePath,
1039  rawFd,
1040  rawHeaderSize,
1041  rawDataType,
1042  lsFromHeader,
1043  eventsFromHeader,
1044  fileSizeFromHeader,
1045  requireHeader,
1046  false,
1047  closeFile);
1048  } else {
1049  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1050  edm::LogError("EvFDaqDirector")
1051  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1052  if (errno == ENOENT)
1053  return 1; // error && file not found
1054  else
1055  return -1;
1056  }
1057  }
1058  }
1059 
1060  //v2 is the largest possible read
1061  char hdr[sizeof(FRDFileHeader_v2)];
1062  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1063  return -1;
1064 
1066  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1067 
1068  if (frd_version == 0) {
1069  //no header (specific sequence not detected)
1070  if (requireHeader) {
1071  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1072  close(infile);
1073  return -1;
1074  } else {
1075  //no header, but valid file
1076  lseek(infile, 0, SEEK_SET);
1077  rawHeaderSize = 0;
1078  lsFromHeader = 0;
1079  eventsFromHeader = -1;
1080  fileSizeFromHeader = -1;
1081  }
1082  } else if (frd_version == 1) {
1083  //version 1 header
1084  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1085  return -1;
1087  uint32_t headerSizeRaw = fhContent->headerSize_;
1088  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1089  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1090  << " v:" << frd_version;
1091  close(infile);
1092  return -1;
1093  }
1094  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1095  rawDataType = 0;
1096  lsFromHeader = fhContent->lumiSection_;
1097  eventsFromHeader = (int32_t)fhContent->eventCount_;
1098  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1099  rawHeaderSize = fhContent->headerSize_;
1100 
1101  } else if (frd_version == 2) {
1102  //version 2 header
1103  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1104  return -1;
1106  uint32_t headerSizeRaw = fhContent->headerSize_;
1107  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1108  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1109  << " v:" << frd_version;
1110  close(infile);
1111  return -1;
1112  }
1113  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1114  rawDataType = fhContent->dataType_;
1115  lsFromHeader = fhContent->lumiSection_;
1116  eventsFromHeader = (int32_t)fhContent->eventCount_;
1117  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1118  rawHeaderSize = fhContent->headerSize_;
1119  }
1120 
1121  if (closeFile) {
1122  close(infile);
1123  infile = -1;
1124  }
1125 
1126  rawFd = infile;
1127  return 0; //OK
1128  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 370 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

// Calls initRun() and then records the concurrency limits reported by `bounds`:
// thread count, stream count and number of concurrent luminosity blocks.
// The thread and stream assignments used to be crossed (nThreads_ received the
// stream count and nStreams_ the thread count); each member now receives the
// value its name describes.
370  {
371  initRun();
372 
373  nThreads_ = bounds.maxNumberOfThreads();
374  nStreams_ = bounds.maxNumberOfStreams();
375  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
376  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 414 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

414  {
416  checkMergeTypePSet(pc);
417  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 419 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

419  {
420  //assert(run_ == id.run());
421 
422  // check if the requested run is the latest one - issue a warning if it isn't
424  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
425  << ". This is not the highest run " << dirManager_.findHighestRunDir();
426  }
427  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 438 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

438  {
439  //delete all files belonging to just closed lumi
440  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
442  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
443  return;
444  }
445 
446  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
447  auto it = filesToDeletePtr_->begin();
448  while (it != filesToDeletePtr_->end()) {
449  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
450  it = filesToDeletePtr_->erase(it);
451  } else
452  it++;
453  }
454  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1147 of file EvFDaqDirector.cc.

References checkFileRead(), getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, and FRDFileHeaderIdentifier::version_.

Referenced by bumpFile().

1147  {
1148  int infile;
1149  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1150  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1151  << strerror(errno);
1152  return false;
1153  }
1154  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1155  char hdr[sizeof(FRDFileHeader_v2)];
1156  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1157  return false;
1159  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1160 
1161  if (frd_version == 1) {
1162  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1163  return false;
1165  rawHeaderSize = fhContent->headerSize_;
1166  close(infile);
1167  return true;
1168  } else if (frd_version == 2) {
1169  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1170  return false;
1172  rawHeaderSize = fhContent->headerSize_;
1173  close(infile);
1174  return true;
1175  } else
1176  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1177 
1178  close(infile);
1179  rawHeaderSize = 0;
1180  return false;
1181  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1976 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1976  {
1977  std::ifstream ij(file);
1978  Json::Value deserializeRoot;
1980 
1981  if (!reader.parse(ij, deserializeRoot)) {
1982  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1983  return -1;
1984  }
1985 
1986  int ret = deserializeRoot.get("lastLS", "").asInt();
1987  return ret;
1988  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile()

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 555 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

Referenced by postEndRun().

// Deletes `filename` via remove(). A non-zero result is only reported through
// edm::LogError (with strerror(errno)); nothing is thrown or returned.
555  {
556  int retval = remove(filename.c_str());
557  if (retval != 0)
558  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
559  << ". error = " << strerror(errno);
560  }
Log< level::Error, false > LogError

◆ runString()

std::string const& evf::EvFDaqDirector::runString ( ) const
inline

Definition at line 76 of file EvFDaqDirector.h.

References run_string_.

Referenced by DAQSource::DAQSource().

// Read-only accessor for run_string_.
76 { return run_string_; }
std::string run_string_

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 183 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

// Stores the two caller-supplied pointers as-is; nothing is copied or checked
// here. preGlobalEndLumi() later locks *fileDeleteLockPtr_ and erases entries
// from *filesToDeletePtr_.
184  {
185  fileDeleteLockPtr_ = fileDeleteLock;
186  filesToDeletePtr_ = filesToDelete;
187  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 123 of file EvFDaqDirector.h.

References fms_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

// Stores the FastMonitoringService pointer in fms_ (null-checked before use in preGlobalEndLumi()).
123 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 937 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

// Writes the initial lock-file content "1 0" (readLs, readIndex) to
// fu_rw_lock_stream with fprintf(). If the stream is null an error is logged
// instead and nothing is written. The stream is not flushed here.
937  {
938  if (fu_rw_lock_stream == nullptr)
939  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
940  else {
941  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
942  unsigned int readLs = 1, readIndex = 0;
943  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
944  }
945  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 975 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

// Releases (LOCK_UN) the flock() on fulocal_rwlock_fd_; the return value of
// flock() is not checked.
975  {
976  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
977  flock(fulocal_rwlock_fd_, LOCK_UN);
978  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 985 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

// Releases (LOCK_UN) the flock() on fulocal_rwlock_fd2_; the return value of
// flock() is not checked.
985  {
986  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
987  flock(fulocal_rwlock_fd2_, LOCK_UN);
988  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 968 of file EvFDaqDirector.cc.

References init_lock_.

// Unlocks the init_lock_ pthread mutex; the pthread_mutex_unlock() return code is ignored.
968 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 562 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

567  {
568  EvFDaqDirector::FileStatus fileStatus = noFile;
569  rawHeaderSize = 0;
570 
571  int retval = -1;
572  int lock_attempts = 0;
573  long total_lock_attempts = 0;
574 
575  struct stat buf;
576  int stopFileLS = -1;
577  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
578  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
579  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
580  if (stopFileCheck == 0)
581  stopFileLS = readLastLSEntry(stopFilePath_);
582  else
583  stopFileLS = 1; //stop without drain if only pid is stopped
584  if (!stop_ls_override_) {
585  //if lumisection is higher than in stop file, should quit at next from current
586  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
587  stopFileLS = stop_ls_override_ = ls;
588  } else
589  stopFileLS = stop_ls_override_;
590  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
591  << stopFileLS;
592  //return runEnded;
593  } else //if file was removed before reaching stop condition, reset this
594  stop_ls_override_ = 0;
595 
596  timeval ts_lockbegin;
597  gettimeofday(&ts_lockbegin, nullptr);
598 
599  while (retval == -1) {
600  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
601  if (retval == -1)
602  usleep(fuLockPollInterval_);
603  else
604  continue;
605 
606  lock_attempts += fuLockPollInterval_;
607  total_lock_attempts += fuLockPollInterval_;
608  if (lock_attempts > 5000000 || errno == 116) {
609  if (errno == 116)
610  edm::LogWarning("EvFDaqDirector")
611  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
612  else
613  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
614  "fu.lock file are present -: errno "
615  << errno << ":" << strerror(errno) << std::endl;
616 
617  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
618  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
619  ls++;
620  return noFile;
621  }
622 
623  if (stat(bu_run_dir_.c_str(), &buf) != 0)
624  return runEnded;
625  if (stat(fulockfile_.c_str(), &buf) != 0)
626  return runEnded;
627 
628  lock_attempts = 0;
629  }
630  if (total_lock_attempts > 5 * 60000000) {
631  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
632  return runAbort;
633  }
634  }
635 
636  timeval ts_lockend;
637  gettimeofday(&ts_lockend, nullptr);
638  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
639  if (deltat > 0.)
640  lockWaitTime = deltat;
641 
642  if (retval != 0)
643  return fileStatus;
644 
645 #ifdef DEBUG
646  timeval ts_lockend;
647  gettimeofday(&ts_lockend, 0);
648 #endif
649 
650  //open another lock file FD after the lock using main fd has been acquired
651  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
652  if (fu_readwritelock_fd2 == -1)
653  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
654  << " create. error:" << strerror(errno);
655 
656  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
657 
658  // if the stream is readable
659  if (fu_rw_lock_stream2 != nullptr) {
660  unsigned int readLs, readIndex;
661  int check = 0;
662  // rewind the stream
663  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
664  // if rewound ok
665  if (check == 0) {
666  // read its values
667  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
668  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
669 
670  unsigned int currentLs = readLs;
671  bool bumpedOk = false;
672  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
673  //no lock file write in this case
674  if (ls && ls + 1 < currentLs)
675  ls++;
676  else {
677  // try to bump (look for new index or EoLS file)
678  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
679  //avoid 2 lumisections jump
680  if (ls && readLs > currentLs && currentLs > ls) {
681  ls++;
682  readLs = currentLs = ls;
683  readIndex = 0;
684  bumpedOk = false;
685  //no write to lock file
686  } else {
687  if (ls == 0 && readLs > currentLs) {
688  //make sure to always initialize with the LS found in the lock file, with the possibility of grabbing the index file immediately
689  //in this case there is no new file in the same LS
690  //this covers the case where the run has empty first lumisections and CMSSW processes are late to the lock file. One process will always start with LS 1,... and create empty files for them
691  readLs = currentLs;
692  readIndex = 0;
693  bumpedOk = false;
694  //no write to lock file
695  }
696  //update return LS value
697  ls = readLs;
698  }
699  }
700  if (bumpedOk) {
701  // there is a new index file to grab, lock file needs to be updated
702  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
703  if (check == 0) {
704  ftruncate(fu_readwritelock_fd2, 0);
705  // write next index in the file, which is the file the next process should take
706  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
707  fflush(fu_rw_lock_stream2);
708  fsync(fu_readwritelock_fd2);
709  fileStatus = newFile;
710  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
711  } else {
712  edm::LogError("EvFDaqDirector")
713  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
714  setExceptionState = true;
715  return noFile;
716  }
717  } else if (currentLs < readLs) {
718  //there is no new file in next LS (yet), but lock file can be updated to the next LS
719  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
720  if (check == 0) {
721  ftruncate(fu_readwritelock_fd2, 0);
722  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
723  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
724  fflush(fu_rw_lock_stream2);
725  fsync(fu_readwritelock_fd2);
726  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
727  } else {
728  edm::LogError("EvFDaqDirector")
729  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
730  setExceptionState = true;
731  return noFile;
732  }
733  }
734  } else {
735  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
736  << strerror(errno);
737  }
738  } else {
739  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
740  }
741  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
742 
743 #ifdef DEBUG
744  timeval ts_preunlock;
745  gettimeofday(&ts_preunlock, 0);
746  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
747  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
748 #endif
749 
750  //if new json is present, lock file which FedRawDataInputSource will later unlock
751  if (fileStatus == newFile)
752  lockFULocal();
753 
754  //release lock at this point
755  int retvalu = -1;
756  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
757  if (retvalu == -1)
758  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
759 
760 #ifdef DEBUG
761  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
762 #endif
763 
764  if (fileStatus == noFile) {
765  struct stat buf;
766  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
767  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
768  fileStatus = runEnded;
769  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
770  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
771  fileStatus = runEnded;
772  }
773  }
774  return fileStatus;
775  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 80 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

80 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dirs_all_

std::vector<std::string> evf::EvFDaqDirector::bu_base_dirs_all_
private

Definition at line 219 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getBUBaseDirs(), and initRun().

◆ bu_base_dirs_nSources_

std::vector<int> evf::EvFDaqDirector::bu_base_dirs_nSources_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getBUBaseDirsNSources().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 264 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 266 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 254 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 247 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 244 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 257 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 263 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 265 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 253 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 256 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 233 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 259 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 303 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 294 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 299 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 282 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 272 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 249 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 255 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 250 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 275 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 302 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 296 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
static private

Definition at line 291 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 278 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 261 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 298 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 281 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 230 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 297 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), initRun(), and runString().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 300 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 236 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 285 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 283 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 284 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 287 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), initRun(), and useFileBroker().