CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (unsigned int ls, unsigned int index)
 
void removeFile (std::string)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 38 of file EvFDaqDirector.cc.

References base_dir_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

39  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
40  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
41  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
42  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
43  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
44  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
45  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
46  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
47  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
48  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
49  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
50  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
51  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
52  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
53  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
54  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
55  hostname_(""),
56  bu_readlock_fd_(-1),
57  bu_writelock_fd_(-1),
61  bu_w_lock_stream(nullptr),
62  bu_r_lock_stream(nullptr),
63  fu_rw_lock_stream(nullptr),
66  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
67  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
68  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
71  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
77 
78  //save hostname for later
79  char hostname[33];
80  gethostname(hostname, 32);
81  hostname_ = hostname;
82 
83  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
84  if (fuLockPollIntervalPtr) {
85  try {
86  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
87  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
88  << " us";
89  } catch (const std::exception&) {
90  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
91  }
92  }
93 
94  //override file service parameter if specified by environment
95  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
96  if (fileBrokerParamPtr) {
97  try {
98  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
99  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
100  } catch (const std::exception&) {
101  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
102  }
103  }
104  if (useFileBroker_) {
106  //find BU data address from hltd configuration
108  struct stat buf;
109  if (stat("/etc/appliance/bus.config", &buf) == 0) {
110  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
111  std::getline(busconfig, fileBrokerHost_);
112  }
113  if (fileBrokerHost_.empty())
114  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
115  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
116  throw cms::Exception("EvFDaqDirector")
117  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
118 
119  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
120  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
121  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
122  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
123  }
124 
125  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
126  if (startFromLSPtr) {
127  try {
128  startFromLS_ = std::stoul(std::string(startFromLSPtr));
129  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
130  } catch (const std::exception&) {
131  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
132  }
133  }
134 
135  //override file service parameter if specified by environment
136  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
137  if (fileBrokerUseLockParamPtr) {
138  try {
139  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
140  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
141  << fileBrokerUseLocalLock_;
142  } catch (const std::exception&) {
143  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
144  }
145  }
146 
147  std::stringstream ss;
148  ss << "run" << std::setfill('0') << std::setw(6) << run_;
149  run_string_ = ss.str();
150  ss = std::stringstream();
151  ss << run_;
152  run_nstring_ = ss.str();
153  run_dir_ = base_dir_ + "/" + run_string_;
154  input_throttled_file_ = run_dir_ + "/input_throttle";
155  discard_ls_filestem_ = run_dir_ + "/discard_ls";
156  ss = std::stringstream();
157  ss << getpid();
158  pid_ = ss.str();
159  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string selectedTransferMode_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 302 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

302  {
303  //close server connection
304  if (socket_.get() && socket_->is_open()) {
305  boost::system::error_code ec;
306  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
307  socket_->close(ec);
308  }
309 
310  if (fulocal_rwlock_fd_ != -1) {
311  unlockFULocal();
312  close(fulocal_rwlock_fd_);
313  }
314 
315  if (fulocal_rwlock_fd2_ != -1) {
316  unlockFULocal2();
317  close(fulocal_rwlock_fd2_);
318  }
319  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

76 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References bu_run_dir_.

77 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

References bu_run_open_dir_.

78 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 785 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

791  {
792  if (previousFileSize_ != 0) {
793  if (!fms_) {
795  }
796  if (fms_)
798  previousFileSize_ = 0;
799  }
800  nextFile = "";
801 
802  //reached limit
803  if (maxLS >= 0 && ls > (unsigned int)maxLS)
804  return false;
805 
806  struct stat buf;
807  std::stringstream ss;
808  unsigned int nextIndex = index;
809  nextIndex++;
810 
811  // 1. Check suggested file
812  std::string nextFileJson = getInputJsonFilePath(ls, index);
813  if (stat(nextFileJson.c_str(), &buf) == 0) {
814  fsize = previousFileSize_ = buf.st_size;
815  nextFile = nextFileJson;
816  return true;
817  }
818  // 2. No file -> lumi ended? (and how many?)
819  else {
820  // 3. No file -> check for standalone raw file
821  std::string nextFileRaw = getRawFilePath(ls, index);
822  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
823  fsize = previousFileSize_ = buf.st_size;
824  nextFile = nextFileRaw;
825  return true;
826  }
827 
828  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
829 
830  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
831  // recheck that no raw file appeared in the meantime
832  if (stat(nextFileJson.c_str(), &buf) == 0) {
833  fsize = previousFileSize_ = buf.st_size;
834  nextFile = nextFileJson;
835  return true;
836  }
837  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
838  fsize = previousFileSize_ = buf.st_size;
839  nextFile = nextFileRaw;
840  return true;
841  }
842 
843  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
844  if (indexFilesInLS < 0)
845  //parsing failed
846  return false;
847  else {
848  //check index
849  if ((int)index < indexFilesInLS) {
850  //we have 2 files, and check for 1 failed... retry (2 will never be here)
851  edm::LogError("EvFDaqDirector")
852  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
853  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
854  setExceptionState = true;
855  return false;
856  }
857  }
858  // this lumi ended, check for files
859  ++ls;
860  index = 0;
861 
862  //reached limit
863  if (maxLS >= 0 && ls > (unsigned int)maxLS)
864  return false;
865 
866  nextFileJson = getInputJsonFilePath(ls, 0);
867  nextFileRaw = getRawFilePath(ls, 0);
868  if (stat(nextFileJson.c_str(), &buf) == 0) {
869  // a new file was found at new lumisection, index 0
870  fsize = previousFileSize_ = buf.st_size;
871  nextFile = nextFileJson;
872  return true;
873  }
874  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
875  fsize = previousFileSize_ = buf.st_size;
876  nextFile = nextFileRaw;
877  return true;
878  }
879  return false;
880  }
881  }
882  // no new file found
883  return false;
884  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 2025 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

2025  {
2026  if (mergeTypePset_.empty())
2027  return;
2028  if (!mergeTypeMap_.empty())
2029  return;
2030  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2031  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2032  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2033  for (const std::string& pname : tsPset.getParameterNames()) {
2034  std::string streamType = tsPset.getParameter<std::string>(pname);
2035  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2036  mergeTypeMap_.insert(ac, pname);
2037  ac->second = streamType;
2038  ac.release();
2039  }
2040  }
2041  }
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
std::string mergeTypePset_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
ParameterSet const & getParameterSet(ParameterSetID const &id)

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1917 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

1917  {
1918  if (transferSystemJson_)
1919  return;
1920 
1921  transferSystemJson_.reset(new Json::Value);
1922  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1923  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1924  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1925 
1926  Json::Value destinationsVal(Json::arrayValue);
1927  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1928  for (auto& dest : destinations)
1929  destinationsVal.append(dest);
1930  (*transferSystemJson_)["destinations"] = destinationsVal;
1931 
1932  Json::Value modesVal(Json::arrayValue);
1933  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1934  for (auto& mode : modes)
1935  modesVal.append(mode);
1936  (*transferSystemJson_)["transferModes"] = modesVal;
1937 
1938  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1939  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1940  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1941  Json::Value streamVal;
1942  for (auto& mode : modes) {
1943  //validation
1944  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1945  throw cms::Exception("EvFDaqDirector")
1946  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1947  << ")";
1948  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1949 
1950  Json::Value sDestsValue(Json::arrayValue);
1951 
1952  if (streamDestinations.empty())
1953  throw cms::Exception("EvFDaqDirector")
1954  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1955 
1956  for (auto& sdest : streamDestinations) {
1957  bool sDestValid = false;
1958  sDestsValue.append(sdest);
1959  for (auto& dest : destinations) {
1960  if (dest == sdest)
1961  sDestValid = true;
1962  }
1963  if (!sDestValid)
1964  throw cms::Exception("EvFDaqDirector")
1965  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1966  << ", dest:" << sdest;
1967  }
1968  streamVal[mode] = sDestsValue;
1969  }
1970  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1971  }
1972  }
1973  } else {
1974  if (requireTSPSet_)
1975  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1976  }
1977  }
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
Represents a JSON value.
Definition: value.h:99
ParameterSet const & getParameterSet(ParameterSetID const &id)
array value (ordered list)
Definition: value.h:30

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1463 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1470  {
      // Sends one HTTP GET "/popfile" request to the file broker over socket_ and
      // converts the reply into a FileStatus.
      //
      // Outputs written by the visible code:
      //   serverHttpStatus - numeric status parsed from the HTTP status line
      //   serverError      - set to true on any socket, HTTP or protocol problem
      //   serverLS         - from the "lumisection" key, or closedServerLS + 1
      //   closedServerLS   - from the "lasteols" key
      //   nextFileJson, nextFileRaw, rawHeader - only set in the READY-with-file branch
      // maxLS >= 0 is forwarded to the server as "&stopls=<maxLS>".
      //
      // When serverError ends up true the socket is closed, the result is forced to
      // noFile and the call sleeps for one second before returning.
      //
      // NOTE(review): the documented signature names the bool out-parameter
      // `serverState`, while this body writes `serverError` - presumably the
      // definition uses a different parameter name than the declaration; confirm.
      // NOTE(review): listing lines 1479, 1507 and 1549 are not shown here (dropped
      // from the rendered listing); the comments below hedge about their content.
1471  EvFDaqDirector::FileStatus fileStatus = noFile;
1472  serverError = false;
1473 
1474  boost::system::error_code ec;
1475  try {
      // The loop body runs once per attempt; the only `continue` is the reconnect
      // path after a connection_reset on write, every other path ends in `break`.
1476  while (true) {
1477  //socket connect
1478  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
      // Listing line 1479 is missing; presumably the boost::asio::connect call that
      // sets `ec` (the warning below names boost::asio::connect) - TODO confirm.
1480 
1481  if (ec) {
1482  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1483  serverError = true;
1484  break;
1485  }
1486  }
1487 
      // Build the request: path carries the run number string and pid_, plus the
      // optional stop lumisection.
1488  boost::asio::streambuf request;
1489  std::ostream request_stream(&request);
1490  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1491  if (maxLS >= 0) {
1492  std::stringstream spath;
1493  spath << path << "&stopls=" << maxLS;
1494  path = spath.str();
1495  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1496  }
1497  request_stream << "GET " << path << " HTTP/1.1\r\n";
1498  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1499  request_stream << "Accept: */*\r\n";
      // "Connection: keep-alive" is sent regardless of fileBrokerKeepAlive_.
1500  request_stream << "Connection: keep-alive\r\n\r\n";
1501 
1502  boost::asio::write(*socket_, request, ec);
1503  if (ec) {
1504  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1505  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1506  //we got disconnected, try to reconnect to the server before writing the request
      // Listing line 1507 is missing; presumably the reconnect call that refreshes
      // `ec` - TODO confirm. On success the loop restarts and rebuilds the request.
1508  if (ec) {
1509  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1510  serverError = true;
1511  break;
1512  }
1513  continue;
1514  }
1515  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1516  serverError = true;
1517  break;
1518  }
1519 
      // Read up to the end of the status line, then parse
      // "<http_version> <status code> <status message>".
1520  boost::asio::streambuf response;
1521  boost::asio::read_until(*socket_, response, "\r\n", ec);
1522  if (ec) {
1523  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1524  serverError = true;
1525  break;
1526  }
1527 
1528  std::istream response_stream(&response);
1529 
1530  std::string http_version;
1531  response_stream >> http_version;
1532 
1533  response_stream >> serverHttpStatus;
1534 
1535  std::string status_message;
1536  std::getline(response_stream, status_message);
1537  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1538  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1539  serverError = true;
1540  break;
1541  }
      // Any status other than 200 is treated as a server error.
1542  if (serverHttpStatus != 200) {
1543  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1544  serverError = true;
1545  break;
1546  }
1547 
1548  // Process the response headers.
      // Listing line 1549 is missing; presumably the declaration of `header` - TODO
      // confirm. The loop discards header lines up to the first bare "\r" line.
      // NOTE(review): only the status line was requested via read_until above; the
      // header and body lines are taken from whatever is already buffered in
      // `response` - verify that the full reply is always available at this point.
1550  while (std::getline(response_stream, header) && header != "\r") {
1551  }
1552 
      // Body: one "key=value" pair per line, split at the first '='. Lines without
      // '=' are skipped; a bare "\r" line ends the body.
1553  std::string fileInfo;
1554  std::map<std::string, std::string> serverMap;
1555  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1556  auto pos = fileInfo.find('=');
1557  if (pos == std::string::npos)
1558  continue;
1559  auto stitle = fileInfo.substr(0, pos);
1560  auto svalue = fileInfo.substr(pos + 1);
1561  serverMap[stitle] = svalue;
1562  }
1563 
      // Required keys: "version", "runnumber" (must equal run_nstring_), "state" and
      // "lasteols". "lumisection" is optional.
      // NOTE(review): missing keys are handled with assert() on server-supplied data;
      // if assertions are compiled out the end() iterators would be dereferenced
      // below - consider whether this should set serverError instead.
1564  //check that response run number is correct
1565  auto server_version = serverMap.find("version");
1566  assert(server_version != serverMap.end());
1567 
1568  auto server_run = serverMap.find("runnumber");
1569  assert(server_run != serverMap.end());
1570  assert(run_nstring_ == server_run->second);
1571 
1572  auto server_state = serverMap.find("state");
1573  assert(server_state != serverMap.end());
1574 
1575  auto server_eols = serverMap.find("lasteols");
1576  assert(server_eols != serverMap.end());
1577 
1578  auto server_ls = serverMap.find("lumisection");
1579 
      // Parse "maj.min.rev", falling back to "maj.min" and then "maj". A leading
      // double quote is skipped. If nothing parses, the defaults 1.0.0 are kept and
      // a warning is logged.
1580  int version_maj = 1;
1581  int version_min = 0;
1582  int version_rev = 0;
1583  {
1584  auto* s_ptr = server_version->second.c_str();
1585  if (!server_version->second.empty() && server_version->second[0] == '"')
1586  s_ptr++;
1587  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1588  if (res < 3) {
1589  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1590  if (res < 2) {
1591  res = sscanf(s_ptr, "%d", &version_maj);
1592  if (res < 1) {
1593  //expecting at least 1 number (major version)
1594  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1595  }
1596  }
1597  }
1598  }
1599 
      // closedServerLS is clamped to >= 0 and serverLS to >= 1. Without a
      // "lumisection" key, serverLS defaults to the LS after the last closed one.
      // NOTE(review): the values are cast to uint64_t but stored in uint32_t
      // references - the cast looks unintended; confirm.
1600  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1601  if (server_ls != serverMap.end())
1602  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1603  else
1604  serverLS = closedServerLS + 1;
1605 
      // Map the server state string onto the returned FileStatus.
1606  std::string s_state = server_state->second;
1607  if (s_state == "STARTING") //initial, always empty starting with LS 1
1608  {
1609  auto server_file = serverMap.find("file");
1610  assert(server_file == serverMap.end()); //no file with starting state
1611  fileStatus = noFile;
1612  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1613  } else if (s_state == "READY") {
1614  auto server_file = serverMap.find("file");
1615  if (server_file == serverMap.end()) {
1616  //can be returned by server if files from new LS already appeared but LS is not yet closed
1617  if (serverLS <= closedServerLS)
1618  serverLS = closedServerLS + 1;
1619  fileStatus = noFile;
1620  edm::LogInfo("EvFDaqDirector")
1621  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1622  } else {
1623  std::string filestem;
1624  std::string fileprefix;
1625  auto server_fileprefix = serverMap.find("fileprefix");
1626 
      // "fileprefix" is optional; surrounding double quotes are removed if present.
1627  if (server_fileprefix != serverMap.end()) {
1628  auto pssize = server_fileprefix->second.size();
1629  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1630  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1631  else
1632  fileprefix = server_fileprefix->second;
1633  }
1634 
1635  //remove string literals
1636  auto ssize = server_file->second.size();
1637  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1638  filestem = server_file->second.substr(1, ssize - 2);
1639  else
1640  filestem = server_file->second;
1641  assert(!filestem.empty());
      // Server major version > 1: the raw path includes fileprefix, no JSON path is
      // returned and rawHeader is set. Otherwise the raw path omits fileprefix and
      // the JSON path is "<bu_run_dir_>/<fileprefix><filestem>.jsn".
1642  if (version_maj > 1) {
1643  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1644  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1645  nextFileJson = "";
1646  rawHeader = true;
1647  } else {
1648  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1649  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1650  nextFileJson = filestem + ".jsn";
1651  rawHeader = false;
1652  }
1653  fileStatus = newFile;
1654  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1655  << serverLS << " file:" << filestem;
1656  }
1657  } else if (s_state == "EOLS") {
1658  serverLS = closedServerLS + 1;
1659  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1660  fileStatus = noFile;
1661  } else if (s_state == "EOR") {
1662  //server_eor = serverMap.find("iseor");
1663  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1664  fileStatus = runEnded;
1665  } else if (s_state == "NORUN") {
      // NORUN is handled like a run end (runEnded) and does not set serverError.
1666  auto err_msg = serverMap.find("errormessage");
1667  if (err_msg != serverMap.end())
1668  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1669  else
1670  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1671  edm::LogWarning("EvFDaqDirector") << "executing run end";
1672  fileStatus = runEnded;
1673  } else if (s_state == "ERROR") {
1674  auto err_msg = serverMap.find("errormessage");
1675  if (err_msg != serverMap.end())
1676  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1677  else
1678  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1679  fileStatus = noFile;
1680  serverError = true;
1681  } else {
1682  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1683  fileStatus = noFile;
1684  serverError = true;
1685  }
1686 
      // Without keep-alive the rest of the reply is drained until the peer closes;
      // any final error other than eof counts as a server error.
      // NOTE(review): the warning text says "read_until" although the failing call
      // here is boost::asio::read.
1687  // Read until EOF, writing data to output as we go.
1688  if (!fileBrokerKeepAlive_) {
1689  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1690  }
1691  if (ec != boost::asio::error::eof) {
1692  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1693  serverError = true;
1694  }
1695  }
1696 
1697  break;
1698  }
1699 
      // Exceptions derived from std::exception are reduced to serverError; the
      // exception text itself is not logged.
1700  } catch (std::exception const& e) {
1701  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1702  serverError = true;
1703  }
1704 
      // Without keep-alive the connection is shut down and closed after every call.
1705  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1706  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1707  if (ec) {
1708  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1709  }
1710  socket_->close(ec);
1711  if (ec) {
1712  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1713  }
1714  }
1715 
      // On error the socket is closed even in keep-alive mode, and any file status
      // decided above is discarded.
      // NOTE(review): `ec` is tested here even when close() was not called, so a
      // stale error code from earlier in the function can be logged as a
      // "socket close error".
1716  if (serverError) {
1717  if (socket_->is_open())
1718  socket_->close(ec);
1719  if (ec) {
1720  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1721  }
1722  fileStatus = noFile;
1723  sleep(1); //back-off if error detected
1724  }
1725 
1726  return fileStatus;
1727  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 939 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

939  {
     // Creates the BoLS marker file for `lumiSection` at the path returned by
     // getBoLSFilePathOnFU(). With checkIfExists == true the file is only created
     // when stat() fails for that path; with checkIfExists == false open() with
     // O_CREAT is always called (stat() is short-circuited away).
940  //used for backpressure mechanisms and monitoring
941  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
942  struct stat buf;
943  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
     // Requested mode is read/write for user, group and others. The descriptor is
     // closed immediately; only the existence of the file matters here.
     // NOTE(review): the result of open() is not checked, so a failure passes -1
     // to close() and goes unreported.
944  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
945  close(bol_fd);
946  }
947  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 949 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

952  {
     // Creates the EoLS marker of `currentLumiSection` and/or the BoLS marker of
     // `lumiSection`, depending on the two flags.
     //
     // currentLumiSection > 0:
     //   nothing happens if the EoLS file of currentLumiSection already exists;
     //   otherwise the EoLS file is created when doCreateEoLS is set and the BoLS
     //   file of lumiSection is created (without an existence check) when
     //   doCreateBoLS is set.
     // currentLumiSection == 0:
     //   only the BoLS file of lumiSection is considered, and it is created only
     //   if it does not exist yet.
953  if (currentLumiSection > 0) {
954  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
955  struct stat buf;
956  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
957  if (!found) {
958  if (doCreateEoLS) {
     // NOTE(review): the result of open() is not checked before close().
959  int eol_fd =
960  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
961  close(eol_fd);
962  }
     // The BoLS creation is also gated by the missing EoLS file, including in calls
     // that pass doCreateEoLS == false.
963  if (doCreateBoLS)
964  createBoLSFile(lumiSection, false);
965  }
966  } else if (doCreateBoLS) {
967  createBoLSFile(lumiSection, true); //needed for initial lumisection
968  }
969  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2057 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext().

2057  {
      // Makes sure the flag file "<run_dir_>/processing" exists. open() is called
      // with O_CREAT and without O_TRUNC, so an existing file is left as it is; the
      // descriptor is closed right away.
      // NOTE(review): the result of open() is not checked before close().
2058  std::string proc_flag = run_dir_ + "/processing";
2059  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2060  close(proc_flag_fd);
2061  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1878 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

1878  {
      // Creates the directory `openPath` (including missing parents) if it is not
      // already a directory.
1879  // create open dir if not already there
1880 
      // Listing line 1881 is not shown here; presumably it declares `openPath` from
      // getRunOpenDirPath(), which the References list names - TODO confirm.
1882  if (!std::filesystem::is_directory(openPath)) {
1883  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
      // The throwing overload is used, so a failure surfaces as an exception.
1884  std::filesystem::create_directories(openPath);
1885  }
1886  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 329 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

329  {
     // Declares the untracked configuration parameters of this service, each with
     // its default value and a help comment, and registers the description under
     // the label "EvFDaqDirector".
     // Listing line 330 is not shown here; presumably it declares `desc` - TODO
     // confirm.
331  desc.setComment(
332  "Service used for file locking arbitration and for propagating information between other EvF components");
     // Directories and run selection.
333  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
334  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
335  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
     // File broker options.
336  desc.addUntracked<bool>("useFileBroker", false)
337  ->setComment("Use BU file service to grab input data instead of NFS file locking");
338  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
339  ->setComment("Allow service to discover BU address from hltd configuration");
340  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
341  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
342  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
343  ->setComment("Use keep alive to avoid using large number of sockets");
344  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
345  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
     // Locking, output checks and transfer-system options.
346  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
347  ->setComment("Lock polling interval in microseconds for the input directory file lock");
348  desc.addUntracked<bool>("outputAdler32Recheck", false)
349  ->setComment("Check Adler32 of per-process output files while micro-merging");
350  desc.addUntracked<bool>("requireTransfersPSet", false)
351  ->setComment("Require complete transferSystem PSet in the process configuration");
     // NOTE(review): the help text below has an opening parenthesis that is never
     // closed.
352  desc.addUntracked<std::string>("selectedTransferMode", "")
353  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
     // BU director mode and merging options.
354  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
355  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
356  desc.addUntracked<std::string>("mergingPset", "")
357  ->setComment("Name of merging PSet to look for merging type definitions for streams");
358  descriptions.add("EvFDaqDirector", desc);
359  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 81 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

// Returns whatever dirManager_.findRunDir() reports for the run number run_.
81 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 490 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

490  {
     // Path of the BoLS file for lumisection `ls`: the name produced by
     // fffnaming::bolsFileName(run_, ls), placed under run_dir_.
491  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
492  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 419 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

419  {
421  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 482 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

482  {
     // Path of the EoLS file for lumisection `ls` under bu_run_dir_; the file name
     // comes from fffnaming::eolsFileName(run_, ls).
483  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
484  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 486 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

486  {
     // Same file name as getEoLSFilePathOnBU() (fffnaming::eolsFileName(run_, ls)),
     // but placed under run_dir_ instead of bu_run_dir_.
487  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
488  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 494 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

// Path of the EoR file under bu_run_dir_; the name comes from
// fffnaming::eorFileName(run_). The run_dir_ variant is getEoRFilePathOnFU().
494 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 496 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext().

// Path of the EoR file under run_dir_; the name comes from
// fffnaming::eorFileName(run_).
496 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 498 of file EvFDaqDirector.cc.

References bu_run_dir_.

// Fixed location "hlt/fffParameters.jsn" below bu_run_dir_.
498 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 447 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

447  {
449  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 451 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

451  {
453  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 403 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

403  {
405  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1902 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

1902  {
      // Probes "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" for NNNN = 1, 2, ...
      // (zero-padded to at least four digits) and returns the first lumisection
      // number for which no such file exists. With no EoLS files present the
      // result is 1.
1903  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
      // Listing line 1904 is not shown here; presumably it declares `fullpath` -
      // TODO confirm.
1905  struct stat buf;
1906  unsigned int lscount = 1;
1907  do {
1908  std::stringstream ss;
1909  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1910  fullpath = ss.str();
      // lscount is advanced before the existence test, hence the "- 1" on return.
1911  lscount++;
1912  } while (stat(fullpath.c_str(), &buf) == 0);
1913  return lscount - 1;
1914  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 439 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

439  {
441  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 435 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

435  {
437  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 465 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

466  {
468  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 478 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

478  {
480  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1729 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

1736  {
      // Performs one poll of the file broker and the local bookkeeping around it:
      //   1. checks the stop files and derives the stop lumisection to send (maxLS);
      //   2. calls contactFileBroker();
      //   3. creates BoLS marker files for newly opened lumisections;
      //   4. for a new file, reads its event count / size via grabNextJsonFromRaw()
      //      or grabNextJsonFile();
      //   5. creates EoLS marker files between lockFULocal2() / unlockFULocal2();
      //   6. updates `ls` from the lumisection reported by the server.
      // Returns noFile right after step 2 if the broker call reported an error.
      //
      // NOTE(review): this body uses nextFileRaw, serverEventsInNewFile and
      // fileSizeFromMetadata, while the documented signature lists nextFile,
      // serverEventsInNewFile_ and fileSize - presumably the definition names its
      // parameters differently from the declaration; confirm.
      // NOTE(review): thisLockWaitTimeUs is not written in any line visible in this
      // listing, and ts_lockbegin is only filled, never read, in the visible lines.
1737  EvFDaqDirector::FileStatus fileStatus = noFile;
1738 
1739  //int retval = -1;
1740  //int lock_attempts = 0;
1741  //long total_lock_attempts = 0;
1742 
      // Stop request detection: either stop file existing counts as a request.
      // stop_ls_override_ remembers the LS chosen on an earlier call so that later
      // calls keep reporting the same stop LS while the stop file is present.
1743  struct stat buf;
1744  int stopFileLS = -1;
1745  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1746  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1747  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1748  if (stopFileCheck == 0)
1749  stopFileLS = readLastLSEntry(stopFilePath_);
1750  else
1751  stopFileLS = 1; //stop without drain if only pid is stopped
1752  if (!stop_ls_override_) {
1753  //if lumisection is higher than in stop file, should quit at next from current
1754  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1755  stopFileLS = stop_ls_override_ = ls;
1756  } else
1757  stopFileLS = stop_ls_override_;
1758  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1759  << stopFileLS;
1760  //return runEnded;
1761  } else //if file was removed before reaching stop condition, reset this
1762  stop_ls_override_ = 0;
1763 
1764  /* look for EoLS
1765  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1766  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1767  ls++;
1768  return noFile;
1769  }
1770  */
1771 
1772  timeval ts_lockbegin;
1773  gettimeofday(&ts_lockbegin, nullptr);
1774 
1775  std::string nextFileJson;
1776  uint32_t serverLS, closedServerLS;
1777  unsigned int serverHttpStatus;
1778  bool serverError;
1779 
1780  //local lock to force index json and EoLS files to appear in order
      // Listing line 1781 is not shown here; presumably a guard on
      // fileBrokerUseLocalLock_ (the member is in the References list) - TODO
      // confirm. The same applies to lines 1791 and 1826 before the unlockFULocal()
      // calls below.
1782  lockFULocal();
1783 
      // maxLS stays -1 without a stop request; otherwise it is the larger of the
      // stop LS and the current LS.
1784  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1785  bool rawHeader = false;
1786  fileStatus = contactFileBroker(
1787  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1788 
1789  if (serverError) {
1790  //do not update anything
1792  unlockFULocal();
1793  return noFile;
1794  }
1795 
      // First pass over the lumisections: BoLS files only (doCreateBoLS = true,
      // doCreateEoLS = false).
1796  //handle creation of BoLS files if lumisection has changed
1797  if (currentLumiSection == 0) {
1798  if (fileStatus == runEnded)
1799  createLumiSectionFiles(closedServerLS, 0, true, false);
1800  else
1801  createLumiSectionFiles(serverLS, 0, true, false);
1802  } else {
1803  if (closedServerLS >= currentLumiSection) {
1804  //only BoLS files
1805  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1806  createLumiSectionFiles(i + 1, i, true, false);
1807  }
1808  }
1809 
1810  bool fileFound = true;
1811 
      // NOTE(review): rawHeader is a bool, so `rawHeader > 0` is just `rawHeader`.
1812  if (fileStatus == newFile) {
1813  if (rawHeader > 0)
1814  serverEventsInNewFile =
1815  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1816  else
1817  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1818  }
      // NOTE(review): in the visible lines serverEventsInNewFile is assigned only in
      // the newFile branch above, so for other statuses this test reads the value
      // supplied by the caller - verify that callers initialise it.
1819  //closing file in case of any error
1820  if (serverEventsInNewFile < 0 && rawFd != -1) {
1821  close(rawFd);
1822  rawFd = -1;
1823  }
1824 
1825  //can unlock because all files have been created locally
1827  unlockFULocal();
1828 
      // A missing input file downgrades the result to noFile, or to runEnded when
      // the BU run directory itself can no longer be stat()ed.
1829  if (!fileFound) {
1830  //catch condition where directory got deleted
1831  fileStatus = noFile;
1832  struct stat buf;
1833  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1834  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1835  fileStatus = runEnded;
1836  }
1837  }
1838 
      // Second pass over the same lumisections: EoLS files only (doCreateBoLS =
      // false, doCreateEoLS = true). With currentLumiSection == 0 the calls whose
      // second argument is 0 create nothing, because createLumiSectionFiles() then
      // only acts on doCreateBoLS.
1839  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1840  //so that EoLS files can not appear locally before index files
1841  if (currentLumiSection == 0) {
1842  lockFULocal2();
1843  if (fileStatus == runEnded) {
1844  createLumiSectionFiles(closedServerLS, 0, false, true);
1845  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1846  } else {
1847  createLumiSectionFiles(serverLS, 0, false, true);
1848  }
1849  unlockFULocal2();
1850  } else {
1851  if (closedServerLS >= currentLumiSection) {
1852  //lock exclusive to create EoLS files
1853  lockFULocal2();
1854  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1855  createLumiSectionFiles(i + 1, i, false, true);
1856  unlockFULocal2();
1857  }
1858  }
1859 
      // Update the caller's lumisection. A server LS below the current one is a hard
      // assert for newFile, but only a warning plus a one-second sleep for noFile.
1860  if (fileStatus == runEnded)
1861  ls = std::max(currentLumiSection, serverLS);
1862  else if (fileStatus == newFile) {
1863  assert(serverLS >= ls);
1864  ls = serverLS;
1865  } else if (fileStatus == noFile) {
1866  if (serverLS >= ls)
1867  ls = serverLS;
1868  else {
1869  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1870  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1871  sleep(1);
1872  }
1873  }
1874 
1875  return fileStatus;
1876  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 724 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

// Reads one numeric field from the BU end-of-lumisection JSON file BUEoLSFile.
// The file is parsed as JSON, deserialized into a DataPoint, and the element at
// index eolsNFilesIndex_ of dp.getData() is converted with std::stoi and returned.
// Returns -1 when the JSON cannot be parsed or the data array is too short.
// NOTE(review): source lines 727, 734, 745 and 763 are missing from this listing,
// so the declarations of `reader`, `data` and `fullpath` and the statement guarded
// by the "NFiles" test are not visible here.
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range on bad
// input and no handler is visible in this function - confirm callers cope.
724  {
725  std::ifstream ij(BUEoLSFile);
726  Json::Value deserializeRoot;
728 
729  if (!reader.parse(ij, deserializeRoot)) {
730  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
731  return -1;
732  }
733 
735  DataPoint dp;
736  dp.deserialize(deserializeRoot);
737 
738  //read definition
// Only attempted while readEolsDefinition_ is set; the exit paths below clear it.
739  if (readEolsDefinition_) {
740  //std::string def = boost::algorithm::trim(dp.getDefinition());
741  std::string def = dp.getDefinition();
742  if (def.empty())
743  readEolsDefinition_ = false;
// Try the definition path as given; if it cannot be stat'ed, drop the leading
// path component (see line 772) and try again.
744  while (!def.empty()) {
// A leading '/' is used as-is, anything else is taken relative to bu_run_dir_.
746  if (def.find('/') == 0)
747  fullpath = def;
748  else
749  fullpath = bu_run_dir_ + '/' + def;
750  struct stat buf;
751  if (stat(fullpath.c_str(), &buf) == 0) {
752  DataPointDefinition eolsDpd;
753  std::string defLabel = "legend";
754  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
755  if (eolsDpd.getNames().empty()) {
756  //try with "data" label if "legend" format is not used
757  eolsDpd = DataPointDefinition();
758  defLabel = "data";
759  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
760  }
// NOTE(review): line 763 (the statement controlled by this if) is absent from the
// listing; presumably it records the index of "NFiles" - confirm in the .cc file.
761  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
762  if (eolsDpd.getNames().at(i) == "NFiles")
764  readEolsDefinition_ = false;
765  break;
766  }
767  //check if we can still find definition
768  if (def.size() <= 1 || def.find('/') == std::string::npos) {
769  readEolsDefinition_ = false;
770  break;
771  }
772  def = def.substr(def.find('/') + 1);
773  }
774  }
775 
// Bounds check before indexing the data array.
776  if (dp.getData().size() > eolsNFilesIndex_)
777  data = dp.getData()[eolsNFilesIndex_];
778  else {
779  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
780  return -1;
781  }
782  return std::stoi(data);
783  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 423 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

423  {
425  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 443 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

// Returns run_dir_ + "/open/" followed by the file name that
// fffnaming::initFileNameWithPid produces for (run_, 0, stream).
443  {
444  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
445  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 415 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

// Returns bu_run_dir_ + "/open/" followed by the file name that
// fffnaming::inputJsonFileName produces for (run_, ls, index).
415  {
416  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
417  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 427 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

427  {
429  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 455 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

456  {
458  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 411 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

// Returns bu_run_dir_ + "/open/" followed by the file name that
// fffnaming::inputRawFileName produces for (run_, ls, index).
411  {
412  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
413  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 470 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

470  {
472  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 431 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

431  {
433  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 460 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

461  {
463  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 407 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and removeFile().

407  {
409  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 474 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

474  {
476  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 119 of file EvFDaqDirector.h.

References run_.

// Accessor: returns the stored run_ value.
119 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

// Returns the path of the "open" subdirectory of run_dir_ (no trailing slash).
107 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 178 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor().

// Accessor: returns the stored startFromLS_ value.
// NOTE(review): the name suggests the value originates from the environment;
// where startFromLS_ is filled is not visible in this listing.
178 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1979 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

// Returns the transfer destinations configured for `stream` as a comma-separated
// string. The stream is looked up in transferSystemJson_, then the entry named
// selectedTransferMode_ below it; the elements of that entry are joined with ','.
// When requireTSPSet_ is false ("permissive mode") every lookup failure is logged
// as a warning and "Failsafe" is returned; when it is true the same failures throw
// cms::Exception.
// NOTE(review): transferSystemJson_ is dereferenced without a null check - confirm
// it is always set before this function can be called.
1979  {
1980  std::string streamRequestName;
1981  if (transferSystemJson_->isMember(stream.c_str()))
1982  streamRequestName = stream;
1983  else {
1984  std::stringstream msg;
1985  msg << "Transfer system mode definitions missing for -: " << stream;
1986  if (requireTSPSet_)
1987  throw cms::Exception("EvFDaqDirector") << msg.str();
1988  else {
1989  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1990  return std::string("Failsafe");
1991  }
1992  }
1993  //permissive mode with no transfer mode selected: warn and fall back to "Failsafe"
// NOTE(review): the warning text below says the mode is set to an empty string,
// but the value returned is "Failsafe"; the two streamed literals are also joined
// without a separating space ("...parameter.Switch on...").
1994  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995  edm::LogWarning("EvFDaqDirector")
1996  << "Selected mode string is not provided as DaqDirector parameter."
1997  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1998  return std::string("Failsafe");
1999  }
2000  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2001  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2002  }
2003  //check that the selected transfer mode is listed for this stream
2004  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2005  std::stringstream msg;
2006  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2007  if (requireTSPSet_)
2008  throw cms::Exception("EvFDaqDirector") << msg.str();
2009  else
2010  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// Reached only in permissive mode: the strict branch above has already thrown.
2011  return std::string("Failsafe");
2012  }
2013  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2014 
2015  //flatten string json::Array into CSV std::string
2016  std::string ret;
2017  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2018  if (!ret.empty())
2019  ret += ",";
2020  ret += (*it).asString();
2021  }
2022  return ret;
2023  }
const_iterator end() const
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
ret
prodAgent to be discontinued
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:286
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 2043 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

// Returns the merge type name stored for `stream` in mergeTypeMap_. When the
// stream has no entry, an info message is logged, MergeTypeNames_[defaultType]
// is stored for the stream and that name is returned.
// NOTE(review): MergeTypeNames_ is indexed with defaultType without a range check.
2043  {
2044  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2045  if (mergeTypeMap_.find(search_ac, stream))
2046  return search_ac->second;
2047 
2048  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2049  std::string defaultName = MergeTypeNames_[defaultType];
2050  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
// NOTE(review): the result of insert() is not checked, so the assignment below
// also runs when the key already exists (e.g. added by another thread after the
// find above) - confirm that overwriting with the default is intended.
2051  mergeTypeMap_.insert(ac, stream);
2052  ac->second = defaultName;
2053  ac.release();
2054  return defaultName;
2055  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1188 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

// Copies the JSON file at jsonSourcePath into baseRunDir() under the name
// "<stem of rawSourcePath>_pid<pid_, zero-padded to 5>.jsn", unlinks the source
// and parses the copied content.
// Returns the event count taken from the "NEvents" entry (or from the first data
// entry when no such entry is found), converted with std::stoi; returns -1 on any
// failure. fileFound is set to true on entry and to false only when the input
// file cannot be opened on the retry with errno == ENOENT. fileSizeFromJson is
// written only after a successful parse: the "NBytes" value when present and
// parsable, -1 otherwise.
// NOTE(review): source line 1281 is missing from this listing; the declaration
// of `reader` is therefore not visible here.
1191  {
1192  fileFound = true;
1193 
1194  //should be ported to use fffnaming
1195  std::ostringstream fileNameWithPID;
1196  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1197  << std::setw(5) << pid_ << ".jsn";
1198 
1199  // assemble json destination path
1200  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1201 
1202  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1203 
1204  int infile = -1, outfile = -1;
1205 
// The input open is attempted twice back to back before giving up.
1206  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1207  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1208  << strerror(errno);
1209  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1210  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1211  << jsonSourcePath << " : " << strerror(errno);
1212  if (errno == ENOENT)
1213  fileFound = false;
1214  return -1;
1215  }
1216  }
1217 
1218  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1219  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1220  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
// An already existing destination is treated as a hard error, not retried.
1221  if (errno == EEXIST) {
1222  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1223  << " : ";
1224  ::close(infile);
1225  return -1;
1226  }
1227  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1228  << strerror(errno);
// For other errors: remove a possibly half-created file, then retry once.
1229  struct stat out_stat;
1230  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1231  edm::LogWarning("EvFDaqDirector")
1232  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1233  if (unlink(jsonDestPath.c_str()) == -1) {
1234  edm::LogWarning("EvFDaqDirector")
1235  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1236  }
1237  }
1238  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1239  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1240  << jsonDestPath << " : " << strerror(errno);
1241  ::close(infile);
1242  return -1;
1243  }
1244  }
1245  //copy contents
1246  const std::size_t buf_sz = 512;
1247  std::size_t tot_written = 0;
1248  std::unique_ptr<char[]> buf(new char[buf_sz]);
1249 
// Outer loop reads chunks of up to buf_sz bytes; inner loop repeats write()
// until the whole chunk is written (handles short writes).
1250  ssize_t sz, sz_read = 1, sz_write;
1251  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1252  sz_write = 0;
1253  do {
1254  assert(sz_read - sz_write > 0);
1255  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1256  sz_read = sz; // cause read loop termination
1257  break;
1258  }
1259  assert(sz > 0);
1260  sz_write += sz;
1261  tot_written += sz;
1262  } while (sz_write < sz_read);
1263  }
1264  close(infile);
1265  close(outfile);
1266 
// NOTE(review): a read or write error after some bytes were already copied leaves
// tot_written > 0, so the source is still unlinked although the copy is
// incomplete - confirm this is acceptable.
1267  if (tot_written > 0) {
1268  //leave file if it was empty for diagnosis
1269  if (unlink(jsonSourcePath.c_str()) == -1) {
1270  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1271  << strerror(errno);
1272  return -1;
1273  }
1274  } else {
1275  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1276  << jsonSourcePath;
1277  return -1;
1278  }
1279 
1280  Json::Value deserializeRoot;
1282 
1283  std::string data;
1284  std::stringstream ss;
1285  bool result;
1286  try {
// Small files are parsed straight from the copy buffer; larger ones are re-read
// from the destination file.
// NOTE(review): buf is allocated without initialisation and nothing above writes
// a terminating '\0' after the bytes read; presumably parse() and the `ss << buf.get()`
// below treat it as a NUL-terminated string - confirm against the parse overloads.
1287  if (tot_written <= buf_sz) {
1288  result = reader.parse(buf.get(), deserializeRoot);
1289  } else {
1290  //json will normally not be bigger than buf_sz bytes
1291  try {
1292  std::ifstream ij(jsonDestPath);
1293  ss << ij.rdbuf();
1294  } catch (std::filesystem::filesystem_error const& ex) {
1295  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1296  return -1;
1297  }
1298  result = reader.parse(ss.str(), deserializeRoot);
1299  }
1300  if (!result) {
1301  if (tot_written <= buf_sz)
1302  ss << buf.get();
1303  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1304  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1305  << ss.str() << ".";
1306  return -1;
1307  }
1308 
1309  //read BU JSON
1310  DataPoint dp;
1311  dp.deserialize(deserializeRoot);
// Locate the "NEvents" column by name in dpd_ and take the matching data entry.
1312  bool success = false;
1313  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1314  if (dpd_->getNames().at(i) == "NEvents")
1315  if (i < dp.getData().size()) {
1316  data = dp.getData()[i];
1317  success = true;
1318  break;
1319  }
1320  }
// Fallback: use the first data entry when "NEvents" was not found.
1321  if (!success) {
1322  if (!dp.getData().empty())
1323  data = dp.getData()[0];
1324  else {
1325  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1326  << "grabNextJsonFile - "
1327  << " error reading number of events from BU JSON; No input value. data -: " << data;
1328  return -1;
1329  }
1330  }
1331 
1332  //try to read raw file size
1333  fileSizeFromJson = -1;
1334  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1335  if (dpd_->getNames().at(i) == "NBytes") {
1336  if (i < dp.getData().size()) {
1337  std::string dataSize = dp.getData()[i];
1338  try {
1339  fileSizeFromJson = std::stol(dataSize);
1340  } catch (const std::exception&) {
1341  //non-fatal currently, processing can continue without this value
1342  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1343  << "Input value is -: " << dataSize;
1344  }
1345  break;
1346  }
1347  }
1348  }
// std::stoi failures are reported by the out_of_range / invalid_argument handlers.
1349  return std::stoi(data);
1350  } catch (const std::out_of_range& e) {
1351  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1352  << "Input value is -: " << data;
1353  } catch (const std::invalid_argument& e) {
1354  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1355  << "Input value is -: " << data;
1356  } catch (std::runtime_error const& e) {
1357  //Can be thrown by Json parser
1358  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1359  }
1360 
1361  catch (std::exception const& e) {
1362  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1363  } catch (...) {
1364  //unknown exception
1365  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1366  }
1367 
// Every handler above falls through to this common failure return.
1368  return -1;
1369  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1371 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

// Copies jsonSourcePath into baseRunDir() as "<stem>_pid<getpid(), zero-padded
// to 5>.jsn", calls unlockFULocal(), removes the source file and parses the copy.
// Returns the event count taken from the "NEvents" entry (or from the first data
// entry when none is found), converted with std::stoi; returns -1 when any of the
// exceptions handled at the end of the function is caught.
// NOTE(review): source line 1408 is missing from this listing; the declaration
// of `reader` is therefore not visible here.
// NOTE(review): unlockFULocal() runs at line 1393 and again in three of the
// handlers below, so an exception raised after line 1393 (e.g. the runtime_error
// thrown at line 1417) leads to a second call - confirm that is harmless.
1371  {
1372  std::string data;
1373  try {
1374  // assemble json destination path
1375  std::filesystem::path jsonDestPath(baseRunDir());
1376 
1377  //should be ported to use fffnaming
1378  std::ostringstream fileNameWithPID;
1379  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1380  << ".jsn";
1381  jsonDestPath /= fileNameWithPID.str();
1382 
1383  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
// One retry after a one-second pause; a second failure propagates to the outer
// filesystem_error handler.
1384  try {
1385  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386  } catch (std::filesystem::filesystem_error const& ex) {
1387  // Input dir gone?
1388  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1389  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1390  sleep(1);
1391  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1392  }
1393  unlockFULocal();
1394 
// Failure to remove the source is logged but does not abort the function.
1395  try {
1396  //sometimes this fails but file gets deleted
1397  std::filesystem::remove(jsonSourcePath);
1398  } catch (std::filesystem::filesystem_error const& ex) {
1399  // Input dir gone?
1400  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1401  } catch (std::exception const& ex) {
1402  // Input dir gone?
1403  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1404  }
1405 
1406  std::ifstream ij(jsonDestPath);
1407  Json::Value deserializeRoot;
1409 
1410  std::stringstream ss;
1411  ss << ij.rdbuf();
1412  if (!reader.parse(ss.str(), deserializeRoot)) {
1413  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1414  << "\nERROR:\n"
1415  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1416  << ss.str() << ".";
1417  throw std::runtime_error("Cannot deserialize input JSON file");
1418  }
1419 
1420  //read BU JSON
// NOTE(review): this declaration shadows the `data` declared at line 1372, so the
// out_of_range / invalid_argument handlers below print the outer, never-assigned
// string rather than the value that failed to parse.
1421  std::string data;
1422  DataPoint dp;
1423  dp.deserialize(deserializeRoot);
// Locate the "NEvents" column by name in dpd_ and take the matching data entry.
1424  bool success = false;
1425  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1426  if (dpd_->getNames().at(i) == "NEvents")
1427  if (i < dp.getData().size()) {
1428  data = dp.getData()[i];
1429  success = true;
1430  }
1431  }
// Fallback: use the first data entry when "NEvents" was not found.
1432  if (!success) {
1433  if (!dp.getData().empty())
1434  data = dp.getData()[0];
1435  else
1436  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1437  << " error reading number of events from BU JSON -: No input value " << data;
1438  }
1439  return std::stoi(data);
1440  } catch (std::filesystem::filesystem_error const& ex) {
1441  // Input dir gone?
1442  unlockFULocal();
1443  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1444  } catch (std::runtime_error const& e) {
1445  // Another process grabbed the file and NFS did not register this
1446  unlockFULocal();
1447  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1448  } catch (const std::out_of_range&) {
1449  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1450  << "Input value is -: " << data;
1451  } catch (const std::invalid_argument&) {
1452  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1453  << "Input value is -: " << data;
1454  } catch (std::exception const& e) {
1455  // BU run directory disappeared?
1456  unlockFULocal();
1457  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1458  }
1459 
// Every handler above falls through to this common failure return.
1460  return -1;
1461  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile 
)

Definition at line 1106 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

// Creates a JSON file in baseRunDir() from the values that parseFRDFileHeader
// extracts from the raw file at rawSourcePath. The JSON file name is the part of
// the raw file stem before its third '_' (the whole stem when it has fewer than
// three) followed by "_pid<pid_, zero-padded to 5>.jsn".
// Returns the event count reported by parseFRDFileHeader, or -1 on failure.
// fileFound is set to true on entry and to false when parseFRDFileHeader returns 1.
// fileSizeFromHeader is written only on the success path. A non-zero serverLS that
// differs from the LS found in the header only produces a warning.
1112  {
1113  fileFound = true;
1114 
1115  //take only first three tokens delimited by "_" in the renamed raw file name
1116  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1117  size_t pos = 0, n_tokens = 0;
// Empty-bodied loop: advances pos to the third '_' (searching from index 1), or
// to npos when there are fewer, in which case substr() keeps the whole stem.
1118  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1119  }
1120  std::string reducedJsonStem = jsonStem.substr(0, pos);
1121 
1122  std::ostringstream fileNameWithPID;
1123  //should be ported to use fffnaming
1124  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1125 
1126  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1127 
1128  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1129 
1130  //parse RAW file header if it exists
1131  uint32_t lsFromRaw;
1132  int32_t nbEventsWrittenRaw;
1133  int64_t fileSizeFromRaw;
1134  auto ret = parseFRDFileHeader(
1135  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1136  if (ret != 0) {
1137  if (ret == 1)
1138  fileFound = false;
1139  return -1;
1140  }
1141 
1142  int outfile;
1143  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1144  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1145  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
// An already existing destination is treated as a hard error, not retried.
1146  if (errno == EEXIST) {
1147  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1148  << " : ";
1149  return -1;
1150  }
1151  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1152  << strerror(errno);
// For other errors: remove a possibly half-created file, then retry once.
1153  struct stat out_stat;
1154  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1155  edm::LogWarning("EvFDaqDirector")
1156  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1157  << jsonDestPath;
1158  if (unlink(jsonDestPath.c_str()) == -1) {
1159  edm::LogWarning("EvFDaqDirector")
1160  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1161  }
1162  }
1163  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1164  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1165  << jsonDestPath << " : " << strerror(errno);
1166  return -1;
1167  }
1168  }
1169  //write JSON file (TODO: use jsoncpp)
// The document is {"data":[<events>,<file size>,"<raw path>"]}; rawSourcePath is
// inserted verbatim, without JSON escaping.
1170  std::stringstream ss;
1171  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1172  std::string sstr = ss.str();
1173 
// NOTE(review): the failure return below leaves `outfile` open (close() is only
// reached on the success path), and a short write (0 <= result < sstr.size()) is
// not detected.
1174  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1175  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1176  << " : " << strerror(errno);
1177  return -1;
1178  }
1179  close(outfile);
1180  if (serverLS && serverLS != lsFromRaw)
1181  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1182  << " and raw file header LS " << lsFromRaw;
1183 
1184  fileSizeFromHeader = fileSizeFromRaw;
1185  return nbEventsWrittenRaw;
1186  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 161 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

161  {
// Prepares the directory and lock-file layout for the run:
//  - creates base_dir_ and run_dir_ (an already existing directory is accepted),
//  - FU role (!directorBU_): opens two descriptors on <run open dir>/fu-local.lock,
//  - BU role (directorBU_): creates the BU run and "open" directories, bu.lock, and stages the HLT configuration,
//  - opens the fu.lock stream through openFULockfileStream(),
//  - initialises init_lock_, builds the CMSSW_STOP paths and, on an FU, loads the raw-data JSD definition.
// Throws cms::Exception("DaqDirector") when a required directory or the FU-local lock file cannot be set up.
// NOTE(review): this listing skips source lines 179, 181, 189, 199, 229 and 268 (the numbering jumps);
// the statements on those lines are not visible here - confirm against EvFDaqDirector.cc.
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
// the process umask is cleared first, so the mode bits passed to mkdir()/open() from here on are not masked
170  umask(0);
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
// NOTE(review): line 181 is missing; the result of this open() is presumably assigned to
// fulocal_rwlock_fd_, which is the value tested on line 183 - confirm.
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
// NOTE(review): line 189 is missing; presumably assigns this second open() to fulocal_rwlock_fd2_ (tested on line 191) - confirm.
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
// NOTE(review): line 199 is missing; bu_run_dir_ is used right below, so it is presumably set there - confirm.
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
// NOTE(review): fdopen() is called even when the open() above failed (bu_writelock_fd_ == -1);
// both failures only produce warnings and the function carries on.
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
// NOTE(review): line 229 is missing; given the comment on line 226 it presumably calls
// tryInitializeFuLockFile() - confirm. The stream is then flushed and its descriptor closed.
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
// the HLT files are staged in <bu run open dir>/hlt and only renamed to <bu run dir>/hlt once all copies are done
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
// these two copies are outside any try block: a std::filesystem error propagates to the caller
243  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
245 
// optional files: any exception from the copy is swallowed by the empty catch(...) below
246  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
247  for (auto& optfile : optfiles) {
248  try {
249  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
250  } catch (...) {
251  }
252  }
253 
254  std::filesystem::rename(tmphltdir, hltdir);
255  } else
256  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
257  }
258  //else{}//no configuration specified
259  } else {
260  // for FU, check if bu base dir exists
261 
262  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
263  if (retval != 0 && errno != EEXIST) {
264  throw cms::Exception("DaqDirector")
265  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
266  }
267 
// NOTE(review): line 268 is missing; bu_run_dir_ is used on the next line, so it is presumably set there - confirm.
269  fulockfile_ = bu_run_dir_ + "/fu.lock";
270  openFULockfileStream(false);
271  }
272 
273  pthread_mutex_init(&init_lock_, nullptr);
274 
// two stop markers: a run-wide one and one carrying this process id as suffix
275  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
276  std::stringstream sstp;
277  sstp << stopFilePath_ << "_pid" << pid_;
278  stopFilePathPid_ = sstp.str();
279 
280  if (!directorBU_) {
// JSD lookup order: <bu run dir>/jsd/rawData.jsd, then budef.jsd under $CMSSW_BASE,
// then under $CMSSW_RELEASE_BASE, and finally the bare relative path
281  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
282  struct stat statbuf;
283  if (!stat(defPath.c_str(), &statbuf))
284  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
285  else {
286  //look in source directory if not present in ramdisk
287  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
// NOTE(review): std::getenv() returns nullptr for an unset variable, and building a std::string
// from nullptr is undefined behaviour; neither call here (lines 288 and 290) is guarded.
288  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
289  if (stat(defPath.c_str(), &statbuf)) {
290  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
291  if (stat(defPath.c_str(), &statbuf)) {
292  defPath = defPathSuffix;
293  }
294  }
295  }
// dpd_ is allocated with new; where it is released is not visible in this listing
296  dpd_ = new DataPointDefinition();
297  std::string defLabel = "data";
298  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
299  }
300  }
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 123 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by FedRawDataInputSource::getNextEvent().

123 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 919 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

919  {
920  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
921  flock(fulocal_rwlock_fd_, LOCK_SH);
922  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 929 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

929  {
930  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
931  flock(fulocal_rwlock_fd2_, LOCK_EX);
932  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 915 of file EvFDaqDirector.cc.

References init_lock_.

915 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2076 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and cond::impl::to_string().

Referenced by FedRawDataInputSource::readSupervisor().

2076  {
2077  struct stat buf;
2078  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2079  }
std::string to_string(const V &value)
Definition: OMSAccess.h:71
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2063 of file EvFDaqDirector.cc.

References command_line::start.

2063  {
2064 #ifdef __APPLE__
2065  return {start, len, pid, type, whence};
2066 #else
2067  return {type, whence, start, len, pid};
2068 #endif
2069  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 124 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor().

124 { return nConcurrentLumis_; }
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 896 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

896  {
// Opens the fu.lock file named by fulockfile_ and wraps the descriptor in a "r+" stdio stream (fu_rw_lock_stream).
// create == true : the file is opened with O_CREAT and afterwards chmod'ed to 0766.
// create == false: the file is opened without O_CREAT, so it has to exist already.
// Failures are reported through edm::LogError only; nothing is thrown and nothing is returned.
897  if (create) {
// NOTE(review): source line 898 is missing from this listing; the open() result below is presumably
// assigned to fu_readwritelock_fd_, as in the else branch - confirm against EvFDaqDirector.cc.
899  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
900  chmod(fulockfile_.c_str(), 0766);
901  } else {
902  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
903  }
904  if (fu_readwritelock_fd_ == -1)
905  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
906  << " create:" << create << " error:" << strerror(errno);
907  else
908  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
909 
// NOTE(review): fdopen() is attempted even when the descriptor is -1; that case ends in the second error log below.
910  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
911  if (fu_rw_lock_stream == nullptr)
912  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
913  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 108 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

108 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 971 of file EvFDaqDirector.cc.

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and fileinputsource_cfi::read.

Referenced by grabNextJsonFromRaw(), and FedRawDataInputSource::readSupervisor().

979  {
980  int infile;
981 
982  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
983  if (retry) {
984  edm::LogWarning("EvFDaqDirector")
985  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
986  return parseFRDFileHeader(rawSourcePath,
987  rawFd,
988  rawHeaderSize,
989  lsFromHeader,
990  eventsFromHeader,
991  fileSizeFromHeader,
992  requireHeader,
993  false,
994  closeFile);
995  } else {
996  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
997  edm::LogError("EvFDaqDirector")
998  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
999  if (errno == ENOENT)
1000  return 1; // error && file not found
1001  else
1002  return -1;
1003  }
1004  }
1005  }
1006 
1007  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1008  FRDFileHeader_v1 fileHead;
1009 
1010  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1011  if (closeFile) {
1012  close(infile);
1013  infile = -1;
1014  }
1015 
1016  if (sz_read < 0) {
1017  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1018  << strerror(errno);
1019  if (infile != -1)
1020  close(infile);
1021  return -1;
1022  }
1023  if ((size_t)sz_read < buf_sz) {
1024  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1025  if (infile != -1)
1026  close(infile);
1027  return -1;
1028  }
1029 
1030  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1031 
1032  if (frd_version == 0) {
1033  //no header (specific sequence not detected)
1034  if (requireHeader) {
1035  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1036  if (infile != -1)
1037  close(infile);
1038  return -1;
1039  } else {
1040  //no header, but valid file
1041  lseek(infile, 0, SEEK_SET);
1042  rawHeaderSize = 0;
1043  lsFromHeader = 0;
1044  eventsFromHeader = -1;
1045  fileSizeFromHeader = -1;
1046  }
1047  } else {
1048  //version 1 header
1049  uint32_t headerSizeRaw = fileHead.headerSize_;
1050  if (headerSizeRaw < buf_sz) {
1051  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1052  << " v:" << frd_version;
1053  if (infile != -1)
1054  close(infile);
1055  return -1;
1056  }
1057  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1058  lsFromHeader = fileHead.lumiSection_;
1059  eventsFromHeader = (int32_t)fileHead.eventCount_;
1060  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1061  rawHeaderSize = fileHead.headerSize_;
1062  }
1063  rawFd = infile;
1064  return 0; //OK
1065  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
uint16_t eventCount_
Definition: FRDFileHeader.h:38
uint16_t headerSize_
Definition: FRDFileHeader.h:37
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:35
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:36
uint64_t fileSize_
Definition: FRDFileHeader.h:40
uint32_t lumiSection_
Definition: FRDFileHeader.h:39
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 376 of file EvFDaqDirector.cc.

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBU_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

376  {
// Closes the BU read-lock and write-lock descriptors. Neither close() result is checked.
377  close(bu_readlock_fd_);
378  close(bu_writelock_fd_);
379  if (directorBU_) {
// BU role: path of the bu.lock file inside the BU run directory
380  std::string filename = bu_run_dir_ + "/bu.lock";
// NOTE(review): source line 381 is missing from this listing; since this member references removeFile(),
// it presumably removes `filename` - confirm against EvFDaqDirector.cc.
382  }
383  }
void removeFile(unsigned int ls, unsigned int index)
std::string bu_run_dir_

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 321 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

321  {
322  initRun();
323 
324  nThreads_ = bounds.maxNumberOfStreams();
325  nStreams_ = bounds.maxNumberOfThreads();
326  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
327  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 361 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

361  {
// Runs the parameter-set checks against the process context pc.
// NOTE(review): source line 362 is missing from this listing; since this member references
// checkTransferSystemPSet(), that call presumably sits there - confirm against EvFDaqDirector.cc.
363  checkMergeTypePSet(pc);
364  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 366 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

366  {
// Logs a warning that run_dir_ is not the highest run directory; nothing else is done here.
367  //assert(run_ == id.run());
368 
369  // check if the requested run is the latest one - issue a warning if it isn't
// NOTE(review): source line 370 (the condition opening the block closed on line 373) is missing from this
// listing; presumably it compares dirManager_.findHighestRunDir() with run_dir_ - confirm.
371  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
372  << ". This is not the highest run " << dirManager_.findHighestRunDir();
373  }
374  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 385 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

385  {
// Drops the tracked input files of the lumisection that is ending from *filesToDeletePtr_.
386  //delete all files belonging to just closed lumi
387  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
// NOTE(review): source line 388 (the condition guarding this early return) is missing from this listing;
// judging by the message it presumably tests fileDeleteLockPtr_ / filesToDeletePtr_ for null - confirm.
389  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
390  return;
391  }
392 
// the list is only walked and modified while holding the mutex behind fileDeleteLockPtr_
393  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
394  auto it = filesToDeletePtr_->begin();
395  while (it != filesToDeletePtr_->end()) {
// an entry is erased when it belongs to this lumisection, unless a monitoring service is set and
// reports an exception on data for that lumisection; erasing destroys the entry's unique_ptr<InputFile>
396  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
397  it = filesToDeletePtr_->erase(it);
398  } else
399  it++;
400  }
401  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1067 of file EvFDaqDirector.cc.

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and fileinputsource_cfi::read.

Referenced by bumpFile().

1067  {
1068  int infile;
1069  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1070  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1071  << strerror(errno);
1072  return false;
1073  }
1074  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1075  FRDFileHeader_v1 fileHead;
1076 
1077  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1078 
1079  if (sz_read < 0) {
1080  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1081  << strerror(errno);
1082  if (infile != -1)
1083  close(infile);
1084  return false;
1085  }
1086  if ((size_t)sz_read < buf_sz) {
1087  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1088  if (infile != -1)
1089  close(infile);
1090  return false;
1091  }
1092 
1093  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1094 
1095  close(infile);
1096 
1097  if (frd_version > 0) {
1098  rawHeaderSize = fileHead.headerSize_;
1099  return true;
1100  }
1101 
1102  rawHeaderSize = 0;
1103  return false;
1104  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
uint16_t headerSize_
Definition: FRDFileHeader.h:37
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:35
Log< level::Error, false > LogError
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:36
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1888 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1888  {
// Parses the JSON document in `file` and returns its "lastLS" entry as an int.
// Returns -1 (after logging an error) when the document cannot be parsed.
1889  std::ifstream ij(file);
1890  Json::Value deserializeRoot;
// NOTE(review): source line 1891 is missing from this listing; it presumably declares the
// Json::Reader named `reader` that is used on line 1893 - confirm against EvFDaqDirector.cc.
1892 
1893  if (!reader.parse(ij, deserializeRoot)) {
1894  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1895  return -1;
1896  }
1897 
// NOTE(review): the fallback passed to get() is an empty string, which is then converted with asInt();
// what that yields when "lastLS" is absent depends on the JSON library - confirm.
1898  int ret = deserializeRoot.get("lastLS", "").asInt();
1899  return ret;
1900  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile() [1/2]

void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 507 of file EvFDaqDirector.cc.

References getRawFilePath(), and eostools::ls().

Referenced by postEndRun().

std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
void removeFile(unsigned int ls, unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349

◆ removeFile() [2/2]

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 500 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

500  {
501  int retval = remove(filename.c_str());
502  if (retval != 0)
503  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
504  << ". error = " << strerror(errno);
505  }
Log< level::Error, false > LogError

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex *  fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 179 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

180  {
181  fileDeleteLockPtr_ = fileDeleteLock;
182  filesToDeletePtr_ = filesToDelete;
183  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService *  fms)
inline

Definition at line 122 of file EvFDaqDirector.h.

References fms_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

122 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 886 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

886  {
887  if (fu_rw_lock_stream == nullptr)
888  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
889  else {
890  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
891  unsigned int readLs = 1, readIndex = 0;
892  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
893  }
894  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 924 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

924  {
925  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
926  flock(fulocal_rwlock_fd_, LOCK_UN);
927  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 934 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

934  {
935  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
936  flock(fulocal_rwlock_fd2_, LOCK_UN);
937  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 917 of file EvFDaqDirector.cc.

References init_lock_.

917 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 509 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

514  {
515  EvFDaqDirector::FileStatus fileStatus = noFile;
516  rawHeaderSize = 0;
517 
518  int retval = -1;
519  int lock_attempts = 0;
520  long total_lock_attempts = 0;
521 
522  struct stat buf;
523  int stopFileLS = -1;
524  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
525  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
526  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
527  if (stopFileCheck == 0)
528  stopFileLS = readLastLSEntry(stopFilePath_);
529  else
530  stopFileLS = 1; //stop without drain if only pid is stopped
531  if (!stop_ls_override_) {
532  //if lumisection is higher than in stop file, should quit at next from current
533  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
534  stopFileLS = stop_ls_override_ = ls;
535  } else
536  stopFileLS = stop_ls_override_;
537  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
538  << stopFileLS;
539  //return runEnded;
540  } else //if file was removed before reaching stop condition, reset this
541  stop_ls_override_ = 0;
542 
543  timeval ts_lockbegin;
544  gettimeofday(&ts_lockbegin, nullptr);
545 
546  while (retval == -1) {
547  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
548  if (retval == -1)
549  usleep(fuLockPollInterval_);
550  else
551  continue;
552 
553  lock_attempts += fuLockPollInterval_;
554  total_lock_attempts += fuLockPollInterval_;
555  if (lock_attempts > 5000000 || errno == 116) {
556  if (errno == 116)
557  edm::LogWarning("EvFDaqDirector")
558  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
559  else
560  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
561  "fu.lock file are present -: errno "
562  << errno << ":" << strerror(errno) << std::endl;
563 
564  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
565  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
566  ls++;
567  return noFile;
568  }
569 
570  if (stat(bu_run_dir_.c_str(), &buf) != 0)
571  return runEnded;
572  if (stat(fulockfile_.c_str(), &buf) != 0)
573  return runEnded;
574 
575  lock_attempts = 0;
576  }
577  if (total_lock_attempts > 5 * 60000000) {
578  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
579  return runAbort;
580  }
581  }
582 
583  timeval ts_lockend;
584  gettimeofday(&ts_lockend, nullptr);
585  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
586  if (deltat > 0.)
587  lockWaitTime = deltat;
588 
589  if (retval != 0)
590  return fileStatus;
591 
592 #ifdef DEBUG
593  timeval ts_lockend;
594  gettimeofday(&ts_lockend, 0);
595 #endif
596 
597  //open another lock file FD after the lock using main fd has been acquired
598  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
599  if (fu_readwritelock_fd2 == -1)
600  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
601  << " create. error:" << strerror(errno);
602 
603  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
604 
605  // if the stream is readable
606  if (fu_rw_lock_stream2 != nullptr) {
607  unsigned int readLs, readIndex;
608  int check = 0;
609  // rewind the stream
610  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
611  // if rewinded ok
612  if (check == 0) {
613  // read its' values
614  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
615  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
616 
617  unsigned int currentLs = readLs;
618  bool bumpedOk = false;
619  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
620  //no lock file write in this case
621  if (ls && ls + 1 < currentLs)
622  ls++;
623  else {
624  // try to bump (look for new index or EoLS file)
625  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
626  //avoid 2 lumisections jump
627  if (ls && readLs > currentLs && currentLs > ls) {
628  ls++;
629  readLs = currentLs = ls;
630  readIndex = 0;
631  bumpedOk = false;
632  //no write to lock file
633  } else {
634  if (ls == 0 && readLs > currentLs) {
635  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
636  //in this case there is no new file in the same LS
637  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
638  readLs = currentLs;
639  readIndex = 0;
640  bumpedOk = false;
641  //no write to lock file
642  }
643  //update return LS value
644  ls = readLs;
645  }
646  }
647  if (bumpedOk) {
648  // there is a new index file to grab, lock file needs to be updated
649  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
650  if (check == 0) {
651  ftruncate(fu_readwritelock_fd2, 0);
652  // write next index in the file, which is the file the next process should take
653  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
654  fflush(fu_rw_lock_stream2);
655  fsync(fu_readwritelock_fd2);
656  fileStatus = newFile;
657  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
658  } else {
659  edm::LogError("EvFDaqDirector")
660  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
661  setExceptionState = true;
662  return noFile;
663  }
664  } else if (currentLs < readLs) {
665  //there is no new file in next LS (yet), but lock file can be updated to the next LS
666  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
667  if (check == 0) {
668  ftruncate(fu_readwritelock_fd2, 0);
669  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
670  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
671  fflush(fu_rw_lock_stream2);
672  fsync(fu_readwritelock_fd2);
673  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
674  } else {
675  edm::LogError("EvFDaqDirector")
676  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
677  setExceptionState = true;
678  return noFile;
679  }
680  }
681  } else {
682  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
683  << strerror(errno);
684  }
685  } else {
686  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
687  }
688  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
689 
690 #ifdef DEBUG
691  timeval ts_preunlock;
692  gettimeofday(&ts_preunlock, 0);
693  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
694  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
695 #endif
696 
697  //if new json is present, lock file which FedRawDataInputSource will later unlock
698  if (fileStatus == newFile)
699  lockFULocal();
700 
701  //release lock at this point
702  int retvalu = -1;
703  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
704  if (retvalu == -1)
705  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
706 
707 #ifdef DEBUG
708  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
709 #endif
710 
711  if (fileStatus == noFile) {
712  struct stat buf;
713  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
714  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
715  fileStatus = runEnded;
716  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
717  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
718  fileStatus = runEnded;
719  }
720  }
721  return fileStatus;
722  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 79 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

79 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 209 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 210 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 254 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 256 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 244 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 247 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 253 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 255 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 246 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 238 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 249 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 293 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 284 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 289 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 272 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 214 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 213 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 263 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 260 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 258 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 241 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 292 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 278 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
static private

Definition at line 281 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 269 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 219 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 271 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 287 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 230 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), and initRun().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 290 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 275 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 212 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and useFileBroker().