CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (unsigned int ls, unsigned int index)
 
void removeFile (std::string)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 38 of file EvFDaqDirector.cc.

References visDQMUpload::buf, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, io_service_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, socket_, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

39  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
40  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
41  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
42  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
43  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
44  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
45  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
46  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
47  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
48  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
49  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
50  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
51  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
52  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
53  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
54  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
55  hostname_(""),
56  bu_readlock_fd_(-1),
57  bu_writelock_fd_(-1),
61  bu_w_lock_stream(nullptr),
62  bu_r_lock_stream(nullptr),
63  fu_rw_lock_stream(nullptr),
66  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
67  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
68  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
71  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
77 
78  //save hostname for later
79  char hostname[33];
80  gethostname(hostname, 32);
81  hostname_ = hostname;
82 
83  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
84  if (fuLockPollIntervalPtr) {
85  try {
86  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
87  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
88  << " us";
89  } catch (const std::exception&) {
90  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
91  }
92  }
93 
94  //override file service parameter if specified by environment
95  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
96  if (fileBrokerParamPtr) {
97  try {
98  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
99  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
100  } catch (const std::exception&) {
101  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
102  }
103  }
104  if (useFileBroker_) {
106  //find BU data address from hltd configuration
108  struct stat buf;
109  if (stat("/etc/appliance/bus.config", &buf) == 0) {
110  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
111  std::getline(busconfig, fileBrokerHost_);
112  }
113  if (fileBrokerHost_.empty())
114  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
115  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
116  throw cms::Exception("EvFDaqDirector")
117  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
118 
119  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
120  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
121  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
122  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
123  }
124 
125  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
126  if (startFromLSPtr) {
127  try {
128  startFromLS_ = std::stoul(std::string(startFromLSPtr));
129  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
130  } catch (const std::exception&) {
131  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
132  }
133  }
134 
135  //override file service parameter if specified by environment
136  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
137  if (fileBrokerUseLockParamPtr) {
138  try {
139  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
140  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
142  } catch (const std::exception&) {
143  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
144  }
145  }
146  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string selectedTransferMode_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 301 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

301  {
302  //close server connection
303  if (socket_.get() && socket_->is_open()) {
304  boost::system::error_code ec;
305  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
306  socket_->close(ec);
307  }
308 
309  if (fulocal_rwlock_fd_ != -1) {
310  unlockFULocal();
311  close(fulocal_rwlock_fd_);
312  }
313 
314  if (fulocal_rwlock_fd2_ != -1) {
315  unlockFULocal2();
316  close(fulocal_rwlock_fd2_);
317  }
318  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

76 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References bu_run_dir_.

77 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

References bu_run_open_dir_.

78 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 779 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

785  {
786  if (previousFileSize_ != 0) {
787  if (!fms_) {
789  }
790  if (fms_)
792  previousFileSize_ = 0;
793  }
794  nextFile = "";
795 
796  //reached limit
797  if (maxLS >= 0 && ls > (unsigned int)maxLS)
798  return false;
799 
800  struct stat buf;
801  std::stringstream ss;
802  unsigned int nextIndex = index;
803  nextIndex++;
804 
805  // 1. Check suggested file
806  std::string nextFileJson = getInputJsonFilePath(ls, index);
807  if (stat(nextFileJson.c_str(), &buf) == 0) {
808  fsize = previousFileSize_ = buf.st_size;
809  nextFile = nextFileJson;
810  return true;
811  }
812  // 2. No file -> lumi ended? (and how many?)
813  else {
814  // 3. No file -> check for standalone raw file
815  std::string nextFileRaw = getRawFilePath(ls, index);
816  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
817  fsize = previousFileSize_ = buf.st_size;
818  nextFile = nextFileRaw;
819  return true;
820  }
821 
822  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
823 
824  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
825  // recheck that no raw file appeared in the meantime
826  if (stat(nextFileJson.c_str(), &buf) == 0) {
827  fsize = previousFileSize_ = buf.st_size;
828  nextFile = nextFileJson;
829  return true;
830  }
831  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
832  fsize = previousFileSize_ = buf.st_size;
833  nextFile = nextFileRaw;
834  return true;
835  }
836 
837  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
838  if (indexFilesInLS < 0)
839  //parsing failed
840  return false;
841  else {
842  //check index
843  if ((int)index < indexFilesInLS) {
844  //we have 2 files, and check for 1 failed... retry (2 will never be here)
845  edm::LogError("EvFDaqDirector")
846  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
847  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
848  setExceptionState = true;
849  return false;
850  }
851  }
852  // this lumi ended, check for files
853  ++ls;
854  index = 0;
855 
856  //reached limit
857  if (maxLS >= 0 && ls > (unsigned int)maxLS)
858  return false;
859 
860  nextFileJson = getInputJsonFilePath(ls, 0);
861  nextFileRaw = getRawFilePath(ls, 0);
862  if (stat(nextFileJson.c_str(), &buf) == 0) {
863  // a new file was found at new lumisection, index 0
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873  return false;
874  }
875  }
876  // no new file found
877  return false;
878  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 2019 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

2019  {
2020  if (mergeTypePset_.empty())
2021  return;
2022  if (!mergeTypeMap_.empty())
2023  return;
2024  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2025  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2026  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2027  for (const std::string& pname : tsPset.getParameterNames()) {
2028  std::string streamType = tsPset.getParameter<std::string>(pname);
2029  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2030  mergeTypeMap_.insert(ac, pname);
2031  ac->second = streamType;
2032  ac.release();
2033  }
2034  }
2035  }
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
std::string mergeTypePset_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
ParameterSet const & getParameterSet(ParameterSetID const &id)

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1911 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

1911  {
1912  if (transferSystemJson_)
1913  return;
1914 
1915  transferSystemJson_.reset(new Json::Value);
1916  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1917  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1918  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1919 
1920  Json::Value destinationsVal(Json::arrayValue);
1921  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1922  for (auto& dest : destinations)
1923  destinationsVal.append(dest);
1924  (*transferSystemJson_)["destinations"] = destinationsVal;
1925 
1926  Json::Value modesVal(Json::arrayValue);
1927  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1928  for (auto& mode : modes)
1929  modesVal.append(mode);
1930  (*transferSystemJson_)["transferModes"] = modesVal;
1931 
1932  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1933  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1934  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1935  Json::Value streamVal;
1936  for (auto& mode : modes) {
1937  //validation
1938  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1939  throw cms::Exception("EvFDaqDirector")
1940  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1941  << ")";
1942  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1943 
1944  Json::Value sDestsValue(Json::arrayValue);
1945 
1946  if (streamDestinations.empty())
1947  throw cms::Exception("EvFDaqDirector")
1948  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1949 
1950  for (auto& sdest : streamDestinations) {
1951  bool sDestValid = false;
1952  sDestsValue.append(sdest);
1953  for (auto& dest : destinations) {
1954  if (dest == sdest)
1955  sDestValid = true;
1956  }
1957  if (!sDestValid)
1958  throw cms::Exception("EvFDaqDirector")
1959  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1960  << ", dest:" << sdest;
1961  }
1962  streamVal[mode] = sDestsValue;
1963  }
1964  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1965  }
1966  }
1967  } else {
1968  if (requireTSPSet_)
1969  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1970  }
1971  }
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
Represents a JSON value.
Definition: value.h:99
ParameterSet const & getParameterSet(ParameterSetID const &id)
array value (ordered list)
Definition: value.h:30

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1457 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1464  {
1465  EvFDaqDirector::FileStatus fileStatus = noFile;
1466  serverError = false;
1467 
1468  boost::system::error_code ec;
1469  try {
1470  while (true) {
1471  //socket connect
1472  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1474 
1475  if (ec) {
1476  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1477  serverError = true;
1478  break;
1479  }
1480  }
1481 
1482  boost::asio::streambuf request;
1483  std::ostream request_stream(&request);
1484  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1485  if (maxLS >= 0) {
1486  std::stringstream spath;
1487  spath << path << "&stopls=" << maxLS;
1488  path = spath.str();
1489  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1490  }
1491  request_stream << "GET " << path << " HTTP/1.1\r\n";
1492  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1493  request_stream << "Accept: */*\r\n";
1494  request_stream << "Connection: keep-alive\r\n\r\n";
1495 
1496  boost::asio::write(*socket_, request, ec);
1497  if (ec) {
1498  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1499  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1500  //we got disconnected, try to reconnect to the server before writing the request
1502  if (ec) {
1503  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1504  serverError = true;
1505  break;
1506  }
1507  continue;
1508  }
1509  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1510  serverError = true;
1511  break;
1512  }
1513 
1514  boost::asio::streambuf response;
1515  boost::asio::read_until(*socket_, response, "\r\n", ec);
1516  if (ec) {
1517  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1518  serverError = true;
1519  break;
1520  }
1521 
1522  std::istream response_stream(&response);
1523 
1524  std::string http_version;
1525  response_stream >> http_version;
1526 
1527  response_stream >> serverHttpStatus;
1528 
1529  std::string status_message;
1530  std::getline(response_stream, status_message);
1531  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1532  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1533  serverError = true;
1534  break;
1535  }
1536  if (serverHttpStatus != 200) {
1537  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1538  serverError = true;
1539  break;
1540  }
1541 
1542  // Process the response headers.
1544  while (std::getline(response_stream, header) && header != "\r") {
1545  }
1546 
1547  std::string fileInfo;
1548  std::map<std::string, std::string> serverMap;
1549  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1550  auto pos = fileInfo.find('=');
1551  if (pos == std::string::npos)
1552  continue;
1553  auto stitle = fileInfo.substr(0, pos);
1554  auto svalue = fileInfo.substr(pos + 1);
1555  serverMap[stitle] = svalue;
1556  }
1557 
1558  //check that response run number is correct
1559  auto server_version = serverMap.find("version");
1560  assert(server_version != serverMap.end());
1561 
1562  auto server_run = serverMap.find("runnumber");
1563  assert(server_run != serverMap.end());
1564  assert(run_nstring_ == server_run->second);
1565 
1566  auto server_state = serverMap.find("state");
1567  assert(server_state != serverMap.end());
1568 
1569  auto server_eols = serverMap.find("lasteols");
1570  assert(server_eols != serverMap.end());
1571 
1572  auto server_ls = serverMap.find("lumisection");
1573 
1574  int version_maj = 1;
1575  int version_min = 0;
1576  int version_rev = 0;
1577  {
1578  auto* s_ptr = server_version->second.c_str();
1579  if (!server_version->second.empty() && server_version->second[0] == '"')
1580  s_ptr++;
1581  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1582  if (res < 3) {
1583  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1584  if (res < 2) {
1585  res = sscanf(s_ptr, "%d", &version_maj);
1586  if (res < 1) {
1587  //expecting at least 1 number (major version)
1588  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1589  }
1590  }
1591  }
1592  }
1593 
1594  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1595  if (server_ls != serverMap.end())
1596  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1597  else
1598  serverLS = closedServerLS + 1;
1599 
1600  std::string s_state = server_state->second;
1601  if (s_state == "STARTING") //initial, always empty starting with LS 1
1602  {
1603  auto server_file = serverMap.find("file");
1604  assert(server_file == serverMap.end()); //no file with starting state
1605  fileStatus = noFile;
1606  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1607  } else if (s_state == "READY") {
1608  auto server_file = serverMap.find("file");
1609  if (server_file == serverMap.end()) {
1610  //can be returned by server if files from new LS already appeared but LS is not yet closed
1611  if (serverLS <= closedServerLS)
1612  serverLS = closedServerLS + 1;
1613  fileStatus = noFile;
1614  edm::LogInfo("EvFDaqDirector")
1615  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1616  } else {
1617  std::string filestem;
1618  std::string fileprefix;
1619  auto server_fileprefix = serverMap.find("fileprefix");
1620 
1621  if (server_fileprefix != serverMap.end()) {
1622  auto pssize = server_fileprefix->second.size();
1623  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1624  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1625  else
1626  fileprefix = server_fileprefix->second;
1627  }
1628 
1629  //strip surrounding double quotes from the file name
1630  auto ssize = server_file->second.size();
1631  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1632  filestem = server_file->second.substr(1, ssize - 2);
1633  else
1634  filestem = server_file->second;
1635  assert(!filestem.empty());
1636  if (version_maj > 1) {
1637  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1638  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1639  nextFileJson = "";
1640  rawHeader = true;
1641  } else {
1642  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1643  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1644  nextFileJson = filestem + ".jsn";
1645  rawHeader = false;
1646  }
1647  fileStatus = newFile;
1648  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1649  << serverLS << " file:" << filestem;
1650  }
1651  } else if (s_state == "EOLS") {
1652  serverLS = closedServerLS + 1;
1653  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1654  fileStatus = noFile;
1655  } else if (s_state == "EOR") {
1656  //server_eor = serverMap.find("iseor");
1657  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1658  fileStatus = runEnded;
1659  } else if (s_state == "NORUN") {
1660  auto err_msg = serverMap.find("errormessage");
1661  if (err_msg != serverMap.end())
1662  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1663  else
1664  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1665  edm::LogWarning("EvFDaqDirector") << "executing run end";
1666  fileStatus = runEnded;
1667  } else if (s_state == "ERROR") {
1668  auto err_msg = serverMap.find("errormessage");
1669  if (err_msg != serverMap.end())
1670  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1671  else
1672  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1673  fileStatus = noFile;
1674  serverError = true;
1675  } else {
1676  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1677  fileStatus = noFile;
1678  serverError = true;
1679  }
1680 
1681  // Without keep-alive, read until EOF; the remaining data is not processed.
1682  if (!fileBrokerKeepAlive_) {
1683  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1684  }
1685  if (ec != boost::asio::error::eof) {
1686  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1687  serverError = true;
1688  }
1689  }
1690 
1691  break;
1692  }
1693 
1694  } catch (std::exception const& e) {
1695  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1696  serverError = true;
1697  }
1698 
1699  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1700  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1701  if (ec) {
1702  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1703  }
1704  socket_->close(ec);
1705  if (ec) {
1706  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1707  }
1708  }
1709 
1710  if (serverError) {
1711  if (socket_->is_open())
1712  socket_->close(ec);
1713  if (ec) {
1714  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1715  }
1716  fileStatus = noFile;
1717  sleep(1); //back-off if error detected
1718  }
1719 
1720  return fileStatus;
1721  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 933 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

// Creates the BoLS marker file for lumiSection at the path returned by getBoLSFilePathOnFU().
// With checkIfExists == true the file is created only when stat() cannot find it; with
// checkIfExists == false open() is called unconditionally (O_CREAT without O_TRUNC, so an
// existing file is not truncated). Nothing is written to the file.
933  {
934  //used for backpressure mechanisms and monitoring
935  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
936  struct stat buf;
937  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
// Requested mode is read/write for user, group and others.
// NOTE(review): the descriptor returned by open() is passed to close() without a failure check.
938  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
939  close(bol_fd);
940  }
941  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 943 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

// Creates the EoLS file for currentLumiSection and/or the BoLS file for lumiSection.
//  - currentLumiSection > 0: acts only if the EoLS file of currentLumiSection is not yet present
//    (stat() fails). In that case the EoLS file is created when doCreateEoLS is set, and the BoLS
//    file of lumiSection is created without an existence check when doCreateBoLS is set.
//    If the EoLS file already exists, nothing is created.
//  - currentLumiSection == 0: only the BoLS file of lumiSection is created (if missing), and only
//    when doCreateBoLS is set; doCreateEoLS is ignored on this path.
946  {
947  if (currentLumiSection > 0) {
948  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
949  struct stat buf;
950  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
951  if (!found) {
952  if (doCreateEoLS) {
// Create the EoLS file (read/write for user, group and others) and close it right away.
// NOTE(review): the open() result is not checked before close().
953  int eol_fd =
954  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
955  close(eol_fd);
956  }
957  if (doCreateBoLS)
958  createBoLSFile(lumiSection, false);
959  }
960  } else if (doCreateBoLS) {
961  createBoLSFile(lumiSection, true); //needed for initial lumisection
962  }
963  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2051 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext().

// Creates the file "<run_dir_>/processing" if it does not exist (open() with O_CREAT) and closes it
// immediately; nothing is written to it. Requested mode: rwx for the user, rw for group and others.
// NOTE(review): the open() result is not checked before close().
2051  {
2052  std::string proc_flag = run_dir_ + "/processing";
2053  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2054  close(proc_flag_fd);
2055  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1872 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

1872  {
1873  // create open dir if not already there
1874 
1876  if (!std::filesystem::is_directory(openPath)) {
1877  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1878  std::filesystem::create_directories(openPath);
1879  }
1880  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 327 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

327  {
329  desc.setComment(
330  "Service used for file locking arbitration and for propagating information between other EvF components");
331  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
332  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
333  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
334  desc.addUntracked<bool>("useFileBroker", false)
335  ->setComment("Use BU file service to grab input data instead of NFS file locking");
336  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
337  ->setComment("Allow service to discover BU address from hltd configuration");
338  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
339  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
340  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
341  ->setComment("Use keep alive to avoid using large number of sockets");
342  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
343  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
344  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
345  ->setComment("Lock polling interval in microseconds for the input directory file lock");
346  desc.addUntracked<bool>("outputAdler32Recheck", false)
347  ->setComment("Check Adler32 of per-process output files while micro-merging");
348  desc.addUntracked<bool>("requireTransfersPSet", false)
349  ->setComment("Require complete transferSystem PSet in the process configuration");
350  desc.addUntracked<std::string>("selectedTransferMode", "")
351  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
352  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
353  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
354  desc.addUntracked<std::string>("mergingPset", "")
355  ->setComment("Name of merging PSet to look for merging type definitions for streams");
356  descriptions.add("EvFDaqDirector", desc);
357  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 81 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

// Delegates to dirManager_.findRunDir() using the run number stored in run_.
81 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 484 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

// Returns run_dir_ + "/" + the BoLS file name that fffnaming::bolsFileName() builds for (run_, ls).
484  {
485  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
486  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 417 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

417  {
419  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 476 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

// Returns bu_run_dir_ + "/" + the EoLS file name that fffnaming::eolsFileName() builds for (run_, ls).
476  {
477  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
478  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 480 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

// Returns run_dir_ + "/" + the EoLS file name that fffnaming::eolsFileName() builds for (run_, ls).
// Same file name as getEoLSFilePathOnBU(), but rooted at run_dir_ instead of bu_run_dir_.
480  {
481  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
482  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 488 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

// Returns bu_run_dir_ + "/" + the EoR file name that fffnaming::eorFileName() builds for run_.
488 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 490 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext().

// Returns run_dir_ + "/" + the EoR file name that fffnaming::eorFileName() builds for run_.
490 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 492 of file EvFDaqDirector.cc.

References bu_run_dir_.

// Returns the fixed path "hlt/fffParameters.jsn" below bu_run_dir_.
492 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 445 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

Referenced by dqm::DQMFileSaverPB::initRun().

445  {
447  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 401 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

401  {
403  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1896 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

1896  {
1897  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1899  struct stat buf;
1900  unsigned int lscount = 1;
1901  do {
1902  std::stringstream ss;
1903  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1904  fullpath = ss.str();
1905  lscount++;
1906  } while (stat(fullpath.c_str(), &buf) == 0);
1907  return lscount - 1;
1908  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 437 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

437  {
439  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 433 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

433  {
435  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 459 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

460  {
462  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 472 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

472  {
474  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1723 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

1730  {
1731  EvFDaqDirector::FileStatus fileStatus = noFile;
1732 
1733  //int retval = -1;
1734  //int lock_attempts = 0;
1735  //long total_lock_attempts = 0;
1736 
1737  struct stat buf;
1738  int stopFileLS = -1;
1739  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1740  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1741  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1742  if (stopFileCheck == 0)
1743  stopFileLS = readLastLSEntry(stopFilePath_);
1744  else
1745  stopFileLS = 1; //stop without drain if only pid is stopped
1746  if (!stop_ls_override_) {
1747  //if lumisection is higher than in stop file, should quit at next from current
1748  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1749  stopFileLS = stop_ls_override_ = ls;
1750  } else
1751  stopFileLS = stop_ls_override_;
1752  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1753  << stopFileLS;
1754  //return runEnded;
1755  } else //if file was removed before reaching stop condition, reset this
1756  stop_ls_override_ = 0;
1757 
1758  /* look for EoLS
1759  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1760  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1761  ls++;
1762  return noFile;
1763  }
1764  */
1765 
1766  timeval ts_lockbegin;
1767  gettimeofday(&ts_lockbegin, nullptr);
1768 
1769  std::string nextFileJson;
1770  uint32_t serverLS, closedServerLS;
1771  unsigned int serverHttpStatus;
1772  bool serverError;
1773 
1774  //local lock to force index json and EoLS files to appear in order
1776  lockFULocal();
1777 
1778  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1779  bool rawHeader = false;
1780  fileStatus = contactFileBroker(
1781  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1782 
1783  if (serverError) {
1784  //do not update anything
1786  unlockFULocal();
1787  return noFile;
1788  }
1789 
1790  //handle creation of BoLS files if lumisection has changed
1791  if (currentLumiSection == 0) {
1792  if (fileStatus == runEnded)
1793  createLumiSectionFiles(closedServerLS, 0, true, false);
1794  else
1795  createLumiSectionFiles(serverLS, 0, true, false);
1796  } else {
1797  if (closedServerLS >= currentLumiSection) {
1798  //only BoLS files
1799  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1800  createLumiSectionFiles(i + 1, i, true, false);
1801  }
1802  }
1803 
1804  bool fileFound = true;
1805 
1806  if (fileStatus == newFile) {
1807  if (rawHeader > 0)
1808  serverEventsInNewFile =
1809  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1810  else
1811  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1812  }
1813  //closing file in case of any error
1814  if (serverEventsInNewFile < 0 && rawFd != -1) {
1815  close(rawFd);
1816  rawFd = -1;
1817  }
1818 
1819  //can unlock because all files have been created locally
1821  unlockFULocal();
1822 
1823  if (!fileFound) {
1824  //catch condition where directory got deleted
1825  fileStatus = noFile;
1826  struct stat buf;
1827  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1828  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1829  fileStatus = runEnded;
1830  }
1831  }
1832 
1833  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1834  //so that EoLS files can not appear locally before index files
1835  if (currentLumiSection == 0) {
1836  lockFULocal2();
1837  if (fileStatus == runEnded) {
1838  createLumiSectionFiles(closedServerLS, 0, false, true);
1839  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1840  } else {
1841  createLumiSectionFiles(serverLS, 0, false, true);
1842  }
1843  unlockFULocal2();
1844  } else {
1845  if (closedServerLS >= currentLumiSection) {
1846  //lock exclusive to create EoLS files
1847  lockFULocal2();
1848  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1849  createLumiSectionFiles(i + 1, i, false, true);
1850  unlockFULocal2();
1851  }
1852  }
1853 
1854  if (fileStatus == runEnded)
1855  ls = std::max(currentLumiSection, serverLS);
1856  else if (fileStatus == newFile) {
1857  assert(serverLS >= ls);
1858  ls = serverLS;
1859  } else if (fileStatus == noFile) {
1860  if (serverLS >= ls)
1861  ls = serverLS;
1862  else {
1863  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1864  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1865  sleep(1);
1866  }
1867  }
1868 
1869  return fileStatus;
1870  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 718 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

// Body of getNFilesFromEoLS(BUEoLSFile): parses the JSON file at BUEoLSFile and returns
// the entry at index eolsNFilesIndex_ of its data array, converted with std::stoi.
// Returns -1 when the file cannot be parsed or the data array has no such index.
// NOTE(review): source lines 721, 728, 739 and 757 are absent from this listing
// (presumably the declarations of `reader`, `data`, `fullpath` and the statement guarded
// by the "NFiles" comparison) -- confirm against EvFDaqDirector.cc before editing.
718  {
719  std::ifstream ij(BUEoLSFile);
720  Json::Value deserializeRoot;
722 
723  if (!reader.parse(ij, deserializeRoot)) {
724  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
725  return -1;
726  }
727 
729  DataPoint dp;
730  dp.deserialize(deserializeRoot);
731 
732  //read definition
// This section runs only while readEolsDefinition_ is set; every exit path visible below
// clears the flag.
733  if (readEolsDefinition_) {
734  //std::string def = boost::algorithm::trim(dp.getDefinition());
735  std::string def = dp.getDefinition();
736  if (def.empty())
737  readEolsDefinition_ = false;
// Try the definition path as given (used verbatim when it starts with '/', otherwise
// prefixed with bu_run_dir_). If stat() fails, drop everything up to and including the
// first '/' and try again with the shorter path.
738  while (!def.empty()) {
740  if (def.find('/') == 0)
741  fullpath = def;
742  else
743  fullpath = bu_run_dir_ + '/' + def;
744  struct stat buf;
745  if (stat(fullpath.c_str(), &buf) == 0) {
746  DataPointDefinition eolsDpd;
747  std::string defLabel = "legend";
748  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
749  if (eolsDpd.getNames().empty()) {
750  //try with "data" label if "legend" format is not used
751  eolsDpd = DataPointDefinition();
752  defLabel = "data";
753  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
754  }
755  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
756  if (eolsDpd.getNames().at(i) == "NFiles")
// NOTE(review): line 757 is missing here, so the statement controlled by the `if` above
// is not visible; do not assume line 758 is that statement.
758  readEolsDefinition_ = false;
759  break;
760  }
761  //check if we can still find definition
762  if (def.size() <= 1 || def.find('/') == std::string::npos) {
763  readEolsDefinition_ = false;
764  break;
765  }
766  def = def.substr(def.find('/') + 1);
767  }
768  }
769 
770  if (dp.getData().size() > eolsNFilesIndex_)
771  data = dp.getData()[eolsNFilesIndex_];
772  else {
773  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
774  return -1;
775  }
// NOTE(review): std::stoi throws std::invalid_argument / std::out_of_range for
// non-numeric or out-of-range text; no handler is visible in this listing.
776  return std::stoi(data);
777  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 421 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

421  {
423  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 441 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

// Builds the path of the init file for `stream` inside run_dir_ + "/open/".
// The file name comes from fffnaming::initFileNameWithPid with the lumisection argument fixed to 0.
441  {
442  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
443  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 413 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

// Builds the path of the input JSON file for (ls, index) inside bu_run_dir_ + "/open/",
// using fffnaming::inputJsonFileName for the file name.
413  {
414  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
415  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 425 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

425  {
427  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 449 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

450  {
452  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 409 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

// Builds the path of the raw input file for (ls, index) inside bu_run_dir_ + "/open/",
// using fffnaming::inputRawFileName for the file name.
409  {
410  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
411  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 464 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

464  {
466  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 429 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

429  {
431  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 454 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

455  {
457  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 405 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and removeFile().

405  {
407  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 468 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

468  {
470  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

References run_.

// Returns the stored run number (run_).
118 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 106 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

// Returns run_dir_ with "/open" appended; the directory itself is not created here.
106 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 176 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor().

// Returns the stored startFromLS_ value; where it is populated is not visible in this excerpt.
176 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1973 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

// Body of getStreamDestinations(stream): looks up `stream` in transferSystemJson_ and
// returns the destinations listed under selectedTransferMode_ as a comma-separated string.
// Fallbacks: when requireTSPSet_ is false, a missing stream entry, an empty/"null" mode or
// a mode not listed for the stream all yield the literal "Failsafe" (after a warning).
// When requireTSPSet_ is true the same conditions throw cms::Exception instead.
1973  {
1974  std::string streamRequestName;
1975  if (transferSystemJson_->isMember(stream.c_str()))
1976  streamRequestName = stream;
1977  else {
1978  std::stringstream msg;
1979  msg << "Transfer system mode definitions missing for -: " << stream;
1980  if (requireTSPSet_)
1981  throw cms::Exception("EvFDaqDirector") << msg.str();
1982  else {
1983  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1984  return std::string("Failsafe");
1985  }
1986  }
1987  //return empty if strict check parameter is not on
// NOTE(review): the comment above and the log text below both say "empty string", but
// this branch returns "Failsafe". The two adjacent literals are also streamed without a
// separating space, so the message reads "...parameter.Switch on...".
1988  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1989  edm::LogWarning("EvFDaqDirector")
1990  << "Selected mode string is not provided as DaqDirector parameter."
1991  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1992  return std::string("Failsafe");
1993  }
1994  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1996  }
1997  //check if stream has properly listed transfer stream
1998  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1999  std::stringstream msg;
2000  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2001  if (requireTSPSet_)
2002  throw cms::Exception("EvFDaqDirector") << msg.str();
2003  else
2004  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// Reached only in permissive mode, because the strict branch above throws.
2005  return std::string("Failsafe");
2006  }
2007  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2008 
2009  //flatten string json::Array into CSV std::string
2010  std::string ret;
2011  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2012  if (!ret.empty())
2013  ret += ",";
2014  ret += (*it).asString();
2015  }
2016  return ret;
2017  }
const_iterator end() const
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
ret
prodAgent to be discontinued
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:285
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 2037 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

// Body of getStreamMergeType(stream, defaultType): returns the merge-type name stored for
// `stream` in mergeTypeMap_. When no entry exists it logs the fact, stores
// MergeTypeNames_[defaultType] under `stream` and returns that default name.
// NOTE(review): MergeTypeNames_ is indexed with defaultType without a visible bounds check.
2037  {
2038  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2039  if (mergeTypeMap_.find(search_ac, stream))
2040  return search_ac->second;
2041 
2042  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2043  std::string defaultName = MergeTypeNames_[defaultType];
2044  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
// NOTE(review): the result of insert() is not checked, so if the key already exists at
// this point its value is overwritten with the default name -- confirm this is intended.
2045  mergeTypeMap_.insert(ac, stream);
2046  ac->second = defaultName;
2047  ac.release();
2048  return defaultName;
2049  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1182 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

// Body of grabNextJsonFile(jsonSourcePath, rawSourcePath, fileSizeFromJson, fileFound):
//  1. copies the JSON at jsonSourcePath to baseRunDir()/<stem of rawSourcePath>_pid<pid_>.jsn,
//  2. unlinks the source once at least one byte has been written,
//  3. parses the JSON and returns the "NEvents" entry (or data[0] as fallback) via std::stoi.
// fileSizeFromJson is set to the "NBytes" entry when present and parsable, otherwise -1.
// fileFound is set to true on entry and to false only when the second open() attempt on the
// source is followed by errno == ENOENT. Every failure path returns -1.
// NOTE(review): source line 1275 is absent from this listing (presumably the declaration
// of `reader`) -- confirm against EvFDaqDirector.cc.
1185  {
1186  fileFound = true;
1187 
1188  //should be ported to use fffnaming
1189  std::ostringstream fileNameWithPID;
1190  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1191  << std::setw(5) << pid_ << ".jsn";
1192 
1193  // assemble json destination path
1194  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1195 
1196  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1197 
1198  int infile = -1, outfile = -1;
1199 
// The source is opened at most twice; the retry is immediate, with no delay in between.
1200  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1201  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1202  << strerror(errno);
1203  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1204  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1205  << jsonSourcePath << " : " << strerror(errno);
// NOTE(review): errno is read after the logging statement above; if logging can modify
// errno this test may not reflect the open() failure -- confirm.
1206  if (errno == ENOENT)
1207  fileFound = false;
1208  return -1;
1209  }
1210  }
1211 
1212  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1213  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1214  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1215  if (errno == EEXIST) {
1216  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1217  << " : ";
1218  ::close(infile);
1219  return -1;
1220  }
1221  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1222  << strerror(errno);
// For failures other than EEXIST: remove a possibly half-created destination, then retry once.
1223  struct stat out_stat;
1224  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1225  edm::LogWarning("EvFDaqDirector")
1226  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1227  if (unlink(jsonDestPath.c_str()) == -1) {
1228  edm::LogWarning("EvFDaqDirector")
1229  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1230  }
1231  }
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1234  << jsonDestPath << " : " << strerror(errno);
1235  ::close(infile);
1236  return -1;
1237  }
1238  }
1239  //copy contents
1240  const std::size_t buf_sz = 512;
1241  std::size_t tot_written = 0;
// NOTE(review): the array comes from new[] but std::unique_ptr<char> releases it with
// `delete`, not `delete[]`; std::unique_ptr<char[]> would match the allocation.
1242  std::unique_ptr<char> buf(new char[buf_sz]);
1243 
1244  ssize_t sz, sz_read = 1, sz_write;
// Outer loop reads up to buf_sz bytes per pass; inner loop repeats write() until the whole
// chunk is written. A failed write() stores its negative result in sz_read, ending both loops.
1245  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1246  sz_write = 0;
1247  do {
1248  assert(sz_read - sz_write > 0);
1249  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1250  sz_read = sz; // cause read loop termination
1251  break;
1252  }
1253  assert(sz > 0);
1254  sz_write += sz;
1255  tot_written += sz;
1256  } while (sz_write < sz_read);
1257  }
1258  close(infile);
1259  close(outfile);
1260 
// NOTE(review): tot_written counts every successful write, so a read/write error after a
// partial copy still leaves tot_written > 0 and the source file is unlinked below.
1261  if (tot_written > 0) {
1262  //leave file if it was empty for diagnosis
1263  if (unlink(jsonSourcePath.c_str()) == -1) {
1264  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1265  << strerror(errno);
1266  return -1;
1267  }
1268  } else {
1269  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1270  << jsonSourcePath;
1271  return -1;
1272  }
1273 
1274  Json::Value deserializeRoot;
1276 
1277  std::string data;
1278  std::stringstream ss;
1279  bool result;
1280  try {
// When the whole file fit into one buffer, the in-memory copy is parsed directly;
// otherwise the destination file is read back through a stringstream.
1281  if (tot_written <= buf_sz) {
// NOTE(review): buf is filled by read() only and no NUL terminator is written in this
// listing (the buffer is completely full when tot_written == buf_sz). If this parse
// overload, like the `ss << buf.get()` below, treats the pointer as a C string, it may
// read past the copied data -- confirm.
1282  result = reader.parse(buf.get(), deserializeRoot);
1283  } else {
1284  //json will normally not be bigger than buf_sz bytes
1285  try {
1286  std::ifstream ij(jsonDestPath);
1287  ss << ij.rdbuf();
// NOTE(review): std::ifstream does not throw std::filesystem::filesystem_error by default,
// so this handler looks unreachable -- confirm.
1288  } catch (std::filesystem::filesystem_error const& ex) {
1289  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1290  return -1;
1291  }
1292  result = reader.parse(ss.str(), deserializeRoot);
1293  }
1294  if (!result) {
1295  if (tot_written <= buf_sz)
1296  ss << buf.get();
1297  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1298  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1299  << ss.str() << ".";
1300  return -1;
1301  }
1302 
1303  //read BU JSON
1304  DataPoint dp;
1305  dp.deserialize(deserializeRoot);
// Use the data entry at the position where dpd_ lists "NEvents"; fall back to the
// first data entry when no such position yields a value.
1306  bool success = false;
1307  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1308  if (dpd_->getNames().at(i) == "NEvents")
1309  if (i < dp.getData().size()) {
1310  data = dp.getData()[i];
1311  success = true;
1312  break;
1313  }
1314  }
1315  if (!success) {
1316  if (!dp.getData().empty())
1317  data = dp.getData()[0];
1318  else {
1319  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1320  << "grabNextJsonFile - "
1321  << " error reading number of events from BU JSON; No input value. data -: " << data;
1322  return -1;
1323  }
1324  }
1325 
1326  //try to read raw file size
1327  fileSizeFromJson = -1;
1328  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1329  if (dpd_->getNames().at(i) == "NBytes") {
1330  if (i < dp.getData().size()) {
1331  std::string dataSize = dp.getData()[i];
1332  try {
1333  fileSizeFromJson = std::stol(dataSize);
1334  } catch (const std::exception&) {
1335  //non-fatal currently, processing can continue without this value
1336  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1337  << "Input value is -: " << dataSize;
1338  }
1339  break;
1340  }
1341  }
1342  }
// std::stoi failures are handled by the out_of_range / invalid_argument handlers below,
// which log and fall through to the final `return -1`.
1343  return std::stoi(data);
1344  } catch (const std::out_of_range& e) {
1345  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1346  << "Input value is -: " << data;
1347  } catch (const std::invalid_argument& e) {
1348  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1349  << "Input value is -: " << data;
1350  } catch (std::runtime_error const& e) {
1351  //Can be thrown by Json parser
1352  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1353  }
1354 
1355  catch (std::exception const& e) {
1356  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1357  } catch (...) {
1358  //unknown exception
1359  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1360  }
1361 
1362  return -1;
1363  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1365 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

// Body of grabNextJsonFileAndUnlock(jsonSourcePath): copies the JSON file to
// baseRunDir()/<source stem>_pid<getpid()>.jsn, calls unlockFULocal(), removes the source,
// then parses the copy and returns its "NEvents" entry (or data[0] as fallback) via std::stoi.
// Returns -1 when any handled exception occurs.
// NOTE(review): source line 1402 is absent from this listing (presumably the declaration
// of `reader`) -- confirm against EvFDaqDirector.cc.
1365  {
1366  std::string data;
1367  try {
1368  // assemble json destination path
1369  std::filesystem::path jsonDestPath(baseRunDir());
1370 
1371  //should be ported to use fffnaming
1372  std::ostringstream fileNameWithPID;
1373  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1374  << ".jsn";
1375  jsonDestPath /= fileNameWithPID.str();
1376 
1377  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
// First copy attempt; on filesystem_error wait one second and retry once. A failure of the
// retry propagates to the outer filesystem_error handler at the end of the function.
1378  try {
1379  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1380  } catch (std::filesystem::filesystem_error const& ex) {
1381  // Input dir gone?
// NOTE(review): the log text says "BOOST FILESYSTEM" although std::filesystem is used here.
1382  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1383  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1384  sleep(1);
1385  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386  }
// NOTE(review): an exception thrown after this point (e.g. the runtime_error at line 1411)
// reaches handlers below that call unlockFULocal() again; whether a repeated unlock is
// harmless depends on unlockFULocal(), which is not shown here -- confirm.
1387  unlockFULocal();
1388 
1389  try {
1390  //sometimes this fails but file gets deleted
1391  std::filesystem::remove(jsonSourcePath);
1392  } catch (std::filesystem::filesystem_error const& ex) {
1393  // Input dir gone?
1394  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1395  } catch (std::exception const& ex) {
1396  // Input dir gone?
1397  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1398  }
1399 
1400  std::ifstream ij(jsonDestPath);
1401  Json::Value deserializeRoot;
1403 
1404  std::stringstream ss;
1405  ss << ij.rdbuf();
1406  if (!reader.parse(ss.str(), deserializeRoot)) {
1407  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1408  << "\nERROR:\n"
1409  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1410  << ss.str() << ".";
1411  throw std::runtime_error("Cannot deserialize input JSON file");
1412  }
1413 
1414  //read BU JSON
// NOTE(review): this declaration shadows the `data` declared at line 1366. The
// out_of_range / invalid_argument handlers below can only see the outer variable, which
// is never assigned, so their "Input value is" output is always empty.
1415  std::string data;
1416  DataPoint dp;
1417  dp.deserialize(deserializeRoot);
// No `break` here: if dpd_ lists "NEvents" more than once, the last matching index wins.
1418  bool success = false;
1419  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1420  if (dpd_->getNames().at(i) == "NEvents")
1421  if (i < dp.getData().size()) {
1422  data = dp.getData()[i];
1423  success = true;
1424  }
1425  }
1426  if (!success) {
1427  if (!dp.getData().empty())
1428  data = dp.getData()[0];
1429  else
1430  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1431  << " error reading number of events from BU JSON -: No input value " << data;
1432  }
1433  return std::stoi(data);
1434  } catch (std::filesystem::filesystem_error const& ex) {
1435  // Input dir gone?
1436  unlockFULocal();
1437  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1438  } catch (std::runtime_error const& e) {
1439  // Another process grabbed the file and NFS did not register this
1440  unlockFULocal();
1441  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
// The two std::stoi handlers below do not call unlockFULocal(); std::stoi runs only after
// the unlock at line 1387.
1442  } catch (const std::out_of_range&) {
1443  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1444  << "Input value is -: " << data;
1445  } catch (const std::invalid_argument&) {
1446  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1447  << "Input value is -: " << data;
1448  } catch (std::exception const& e) {
1449  // BU run directory disappeared?
1450  unlockFULocal();
1451  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1452  }
1453 
1454  return -1;
1455  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile 
)

Definition at line 1100 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

// Body of grabNextJsonFromRaw(rawSourcePath, rawFd, rawHeaderSize, fileSizeFromHeader,
// fileFound, serverLS, closeFile): reads the raw file header through parseFRDFileHeader and
// writes a small JSON file {"data":[<events>,<file size>,"<rawSourcePath>"]} to
// baseRunDir()/<reduced stem>_pid<pid_>.jsn.
// Returns the event count from the header, or -1 on any failure. fileFound is set to true
// on entry and to false only when parseFRDFileHeader returns 1. fileSizeFromHeader is
// assigned only on success.
1106  {
1107  fileFound = true;
1108 
1109  //take only first three tokens delimited by "_" in the renamed raw file name
1110  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
// The loop advances `pos` to the third '_' (each search starts at pos + 1, so a '_' at
// index 0 is never matched). With fewer than three matches pos ends as npos and
// substr(0, pos) below yields the whole stem.
1111  size_t pos = 0, n_tokens = 0;
1112  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1113  }
1114  std::string reducedJsonStem = jsonStem.substr(0, pos);
1115 
1116  std::ostringstream fileNameWithPID;
1117  //should be ported to use fffnaming
1118  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1119 
1120  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1121 
1122  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1123 
1124  //parse RAW file header if it exists
1125  uint32_t lsFromRaw;
1126  int32_t nbEventsWrittenRaw;
1127  int64_t fileSizeFromRaw;
1128  auto ret = parseFRDFileHeader(
1129  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1130  if (ret != 0) {
1131  if (ret == 1)
1132  fileFound = false;
1133  return -1;
1134  }
1135 
1136  int outfile;
1137  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1138  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1139  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1140  if (errno == EEXIST) {
1141  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1142  << " : ";
1143  return -1;
1144  }
1145  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1146  << strerror(errno);
// For failures other than EEXIST: remove a possibly half-created destination, then retry once.
1147  struct stat out_stat;
1148  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1151  << jsonDestPath;
1152  if (unlink(jsonDestPath.c_str()) == -1) {
1153  edm::LogWarning("EvFDaqDirector")
1154  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1155  }
1156  }
1157  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1158  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1159  << jsonDestPath << " : " << strerror(errno);
1160  return -1;
1161  }
1162  }
1163  //write JSON file (TODO: use jsoncpp)
// NOTE(review): rawSourcePath is inserted into the JSON text without escaping, so a path
// containing '"' or '\' would produce malformed JSON.
1164  std::stringstream ss;
1165  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1166  std::string sstr = ss.str();
1167 
// NOTE(review): only a negative write() result is treated as an error, so a short write
// goes undetected. The error path below also returns without close(outfile), leaking the
// descriptor.
1168  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1169  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1170  << " : " << strerror(errno);
1171  return -1;
1172  }
1173  close(outfile);
// A serverLS of 0 disables the consistency check; a mismatch is only logged, not treated as an error.
1174  if (serverLS && serverLS != lsFromRaw)
1175  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1176  << " and raw file header LS " << lsFromRaw;
1177 
1178  fileSizeFromHeader = fileSizeFromRaw;
1179  return nbEventsWrittenRaw;
1180  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 148 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, input_throttled_file_, eostools::mkdir(), openFULockfileStream(), pid_, run_, run_dir_, run_nstring_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

148  {
// Prepares the run: derives the run/pid strings and paths, makes sure the base and
// run directories exist, opens the lock files for the BU or FU role (directorBU_),
// sets the stop-file paths and, when not a BU director, loads the data point definition.
// Throws cms::Exception("DaqDirector") when a directory or the FU-local lock file
// cannot be created/opened.
// NOTE(review): the listing numbers skip 178, 180, 188, 198, 228 and 267, so some
// statements are not visible in this extract — check the original source before editing.
149  std::stringstream ss;
150  ss << "run" << std::setfill('0') << std::setw(6) << run_;
151  run_string_ = ss.str();
152  ss = std::stringstream();
153  ss << run_;
154  run_nstring_ = ss.str();
155  run_dir_ = base_dir_ + "/" + run_string_;
156  input_throttled_file_ = run_dir_ + "/input_throttle";
157  ss = std::stringstream();
158  ss << getpid();
159  pid_ = ss.str();
160 
161  // check if base dir exists or create it accordingly
162  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
163  if (retval != 0 && errno != EEXIST) {
164  throw cms::Exception("DaqDirector")
165  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
166  }
167 
168  //create run dir in base dir
// umask(0) clears the process file-mode creation mask, so the mode bits passed to
// the mkdir()/open() calls below are not masked.
169  umask(0);
170  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
171  if (retval != 0 && errno != EEXIST) {
172  throw cms::Exception("DaqDirector")
173  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
174  }
175 
176  //create fu-local.lock in run open dir
177  if (!directorBU_) {
// NOTE(review): listing line 178 is not visible here.
179  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
// NOTE(review): listing line 180 is not visible; presumably it assigns the open()
// result below to fulocal_rwlock_fd_, which is what the following check tests — confirm.
181  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
182  if (fulocal_rwlock_fd_ == -1)
183  throw cms::Exception("DaqDirector")
184  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
185  chmod(fulocal_lock_.c_str(), 0777);
186  fsync(fulocal_rwlock_fd_);
187  //open second fd for another input source thread
// NOTE(review): listing line 188 is not visible; presumably it assigns the open()
// result below to fulocal_rwlock_fd2_ — confirm.
189  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
190  if (fulocal_rwlock_fd2_ == -1)
191  throw cms::Exception("DaqDirector")
192  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
193  }
194 
195  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
196  //for BU, it is created at this point
197  if (directorBU_) {
// NOTE(review): listing line 198 is not visible here.
199  std::string bulockfile = bu_run_dir_ + "/bu.lock";
200  fulockfile_ = bu_run_dir_ + "/fu.lock";
201 
202  //make or find bu run dir
203  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
204  if (retval != 0 && errno != EEXIST) {
205  throw cms::Exception("DaqDirector")
206  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
207  }
208  bu_run_open_dir_ = bu_run_dir_ + "/open";
209  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
210  if (retval != 0 && errno != EEXIST) {
211  throw cms::Exception("DaqDirector")
212  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
213  }
214 
215  // the BU director does not need to know about the fu lock
// A failure to open bu.lock is only logged; fdopen() is still called with the
// (possibly -1) descriptor and its failure is logged as well.
216  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
217  if (bu_writelock_fd_ == -1)
218  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
219  else
220  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
221  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
222  if (bu_w_lock_stream == nullptr)
223  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
224 
225  // BU INITIALIZES LOCK FILE
226  // FU LOCK FILE OPEN
227  openFULockfileStream(true);
// NOTE(review): listing line 228 is not visible here.
229  fflush(fu_rw_lock_stream);
230  close(fu_readwritelock_fd_);
231 
232  if (!hltSourceDirectory_.empty()) {
233  struct stat buf;
234  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
// The configuration is copied into a temporary "hlt" directory under the open
// dir and then renamed to <bu_run_dir_>/hlt once all copies are done.
235  std::string hltdir = bu_run_dir_ + "/hlt";
236  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
237  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
238  if (retval != 0 && errno != EEXIST)
239  throw cms::Exception("DaqDirector")
240  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
241 
242  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
243  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
244 
// These three files are copied on a best-effort basis: any exception from
// copy_file is swallowed by the empty catch below.
245  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
246  for (auto& optfile : optfiles) {
247  try {
248  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
249  } catch (...) {
250  }
251  }
252 
253  std::filesystem::rename(tmphltdir, hltdir);
254  } else
255  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
256  }
257  //else{}//no configuration specified
258  } else {
259  // for FU, check if bu base dir exists
260 
261  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262  if (retval != 0 && errno != EEXIST) {
263  throw cms::Exception("DaqDirector")
264  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
265  }
266 
// NOTE(review): listing line 267 is not visible here.
268  fulockfile_ = bu_run_dir_ + "/fu.lock";
269  openFULockfileStream(false);
270  }
271 
272  pthread_mutex_init(&init_lock_, nullptr);
273 
274  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
275  std::stringstream sstp;
276  sstp << stopFilePath_ << "_pid" << pid_;
277  stopFilePathPid_ = sstp.str();
278 
279  if (!directorBU_) {
// Locate the JSD file: first under bu_run_dir_, then under $CMSSW_BASE, then
// under $CMSSW_RELEASE_BASE, finally falling back to the bare relative suffix.
280  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
281  struct stat statbuf;
282  if (!stat(defPath.c_str(), &statbuf))
283  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
284  else {
285  //look in source directory if not present in ramdisk
286  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
// NOTE(review): std::getenv returns nullptr for an unset variable, and building a
// std::string from nullptr is undefined behaviour — confirm both variables are always set.
287  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
288  if (stat(defPath.c_str(), &statbuf)) {
289  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
290  if (stat(defPath.c_str(), &statbuf)) {
291  defPath = defPathSuffix;
292  }
293  }
294  }
295  dpd_ = new DataPointDefinition();
296  std::string defLabel = "data";
297  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
298  }
299  }
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string input_throttled_file_
std::string stopFilePathPid_
std::string run_nstring_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by FedRawDataInputSource::getNextEvent().

122 { return nStreams_ == 1 && nThreads_ == 1; }
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 913 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

913  {
914  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
915  flock(fulocal_rwlock_fd_, LOCK_SH);
916  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 923 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

923  {
924  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
925  flock(fulocal_rwlock_fd2_, LOCK_EX);
926  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 909 of file EvFDaqDirector.cc.

References init_lock_.

909 { pthread_mutex_lock(&init_lock_); }
pthread_mutex_t init_lock_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2057 of file EvFDaqDirector.cc.

References command_line::start.

2057  {
2058 #ifdef __APPLE__
2059  return {start, len, pid, type, whence};
2060 #else
2061  return {type, whence, start, len, pid};
2062 #endif
2063  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 890 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

890  {
891  if (create) {
893  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
894  chmod(fulockfile_.c_str(), 0766);
895  } else {
896  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
897  }
898  if (fu_readwritelock_fd_ == -1)
899  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
900  << " create:" << create << " error:" << strerror(errno);
901  else
902  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
903 
904  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
905  if (fu_rw_lock_stream == nullptr)
906  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
907  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

107 { return outputAdler32Recheck_; }

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 965 of file EvFDaqDirector.cc.

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and fileinputsource_cfi::read.

Referenced by grabNextJsonFromRaw(), and FedRawDataInputSource::readSupervisor().

973  {
974  int infile;
975 
976  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
977  if (retry) {
978  edm::LogWarning("EvFDaqDirector")
979  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
980  return parseFRDFileHeader(rawSourcePath,
981  rawFd,
982  rawHeaderSize,
983  lsFromHeader,
984  eventsFromHeader,
985  fileSizeFromHeader,
986  requireHeader,
987  false,
988  closeFile);
989  } else {
990  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
991  edm::LogError("EvFDaqDirector")
992  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
993  if (errno == ENOENT)
994  return 1; // error && file not found
995  else
996  return -1;
997  }
998  }
999  }
1000 
1001  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1002  FRDFileHeader_v1 fileHead;
1003 
1004  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1005  if (closeFile) {
1006  close(infile);
1007  infile = -1;
1008  }
1009 
1010  if (sz_read < 0) {
1011  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1012  << strerror(errno);
1013  if (infile != -1)
1014  close(infile);
1015  return -1;
1016  }
1017  if ((size_t)sz_read < buf_sz) {
1018  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1019  if (infile != -1)
1020  close(infile);
1021  return -1;
1022  }
1023 
1024  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1025 
1026  if (frd_version == 0) {
1027  //no header (specific sequence not detected)
1028  if (requireHeader) {
1029  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1030  if (infile != -1)
1031  close(infile);
1032  return -1;
1033  } else {
1034  //no header, but valid file
1035  lseek(infile, 0, SEEK_SET);
1036  rawHeaderSize = 0;
1037  lsFromHeader = 0;
1038  eventsFromHeader = -1;
1039  fileSizeFromHeader = -1;
1040  }
1041  } else {
1042  //version 1 header
1043  uint32_t headerSizeRaw = fileHead.headerSize_;
1044  if (headerSizeRaw < buf_sz) {
1045  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1046  << " v:" << frd_version;
1047  if (infile != -1)
1048  close(infile);
1049  return -1;
1050  }
1051  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1052  lsFromHeader = fileHead.lumiSection_;
1053  eventsFromHeader = (int32_t)fileHead.eventCount_;
1054  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1055  rawHeaderSize = fileHead.headerSize_;
1056  }
1057  rawFd = infile;
1058  return 0; //OK
1059  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
uint16_t eventCount_
Definition: FRDFileHeader.h:38
uint16_t headerSize_
Definition: FRDFileHeader.h:37
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:35
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:36
uint64_t fileSize_
Definition: FRDFileHeader.h:40
uint32_t lumiSection_
Definition: FRDFileHeader.h:39
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 374 of file EvFDaqDirector.cc.

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBU_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

374  {
// Closes the BU read and write lock descriptors; for a BU director the path of
// <bu_run_dir_>/bu.lock is then built.
375  close(bu_readlock_fd_);
376  close(bu_writelock_fd_);
377  if (directorBU_) {
378  std::string filename = bu_run_dir_ + "/bu.lock";
// NOTE(review): listing line 379 is not visible in this extract; the reference list
// names removeFile(), so presumably the file is removed here — confirm.
380  }
381  }
void removeFile(unsigned int ls, unsigned int index)
std::string bu_run_dir_

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 320 of file EvFDaqDirector.cc.

References initRun(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

320  {
321  initRun();
322 
323  nThreads_ = bounds.maxNumberOfStreams();
324  nStreams_ = bounds.maxNumberOfThreads();
325  }
unsigned int nThreads_
unsigned int nStreams_

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 359 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

359  {
// Runs the parameter-set checks on the process context before the job begins.
// NOTE(review): listing line 360 is not visible in this extract; the reference list
// names checkTransferSystemPSet(), so presumably it is called here — confirm.
361  checkMergeTypePSet(pc);
362  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 364 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

364  {
// Logs a warning that run_dir_ is not the highest run directory found by dirManager_.
365  //assert(run_ == id.run());
366 
367  // check if the requested run is the latest one - issue a warning if it isn't
// NOTE(review): listing line 368 is not visible in this extract; it presumably holds
// the `if (...) {` whose block is closed on listing line 371 — confirm.
369  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
370  << ". This is not the highest run " << dirManager_.findHighestRunDir();
371  }
372  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 383 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

383  {
// Erases from *filesToDeletePtr_ every entry whose file belongs to the lumisection
// in globalContext, unless fms_ reports an exception on data for that lumisection.
384  //delete all files belonging to just closed lumi
385  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
// NOTE(review): listing line 386 is not visible in this extract; it presumably holds
// the `if (...) {` guarding the warning and early return below — confirm.
387  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
388  return;
389  }
390 
// The list is modified while *fileDeleteLockPtr_ is held.
391  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
392  auto it = filesToDeletePtr_->begin();
393  while (it != filesToDeletePtr_->end()) {
394  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
// erase() returns the iterator following the removed element
395  it = filesToDeletePtr_->erase(it);
396  } else
397  it++;
398  }
399  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1061 of file EvFDaqDirector.cc.

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and fileinputsource_cfi::read.

Referenced by bumpFile().

1061  {
1062  int infile;
1063  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1064  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1065  << strerror(errno);
1066  return false;
1067  }
1068  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1069  FRDFileHeader_v1 fileHead;
1070 
1071  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1072 
1073  if (sz_read < 0) {
1074  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1075  << strerror(errno);
1076  if (infile != -1)
1077  close(infile);
1078  return false;
1079  }
1080  if ((size_t)sz_read < buf_sz) {
1081  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1082  if (infile != -1)
1083  close(infile);
1084  return false;
1085  }
1086 
1087  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1088 
1089  close(infile);
1090 
1091  if (frd_version > 0) {
1092  rawHeaderSize = fileHead.headerSize_;
1093  return true;
1094  }
1095 
1096  rawHeaderSize = 0;
1097  return false;
1098  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
uint16_t headerSize_
Definition: FRDFileHeader.h:37
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:35
Log< level::Error, false > LogError
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:36
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1882 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1882  {
// Parses the JSON document in `file` and returns the integer stored under "lastLS".
// Returns -1 (after logging an error) when the document cannot be parsed.
1883  std::ifstream ij(file);
1884  Json::Value deserializeRoot;
// NOTE(review): listing line 1885 is not visible in this extract; it presumably
// declares the `reader` object used below — confirm.
1886 
1887  if (!reader.parse(ij, deserializeRoot)) {
1888  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1889  return -1;
1890  }
1891 
// NOTE(review): the default passed to get() is an empty string; how asInt() treats
// that when "lastLS" is absent is not visible here — confirm.
1892  int ret = deserializeRoot.get("lastLS", "").asInt();
1893  return ret;
1894  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile() [1/2]

void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 501 of file EvFDaqDirector.cc.

References getRawFilePath(), and eostools::ls().

Referenced by postEndRun().

std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
void removeFile(unsigned int ls, unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349

◆ removeFile() [2/2]

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 494 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

494  {
495  int retval = remove(filename.c_str());
496  if (retval != 0)
497  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
498  << ". error = " << strerror(errno);
499  }
Log< level::Error, false > LogError

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 177 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

178  {
179  fileDeleteLockPtr_ = fileDeleteLock;
180  filesToDeletePtr_ = filesToDelete;
181  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

References fms_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

121 { fms_ = fms; }
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 880 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

880  {
881  if (fu_rw_lock_stream == nullptr)
882  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
883  else {
884  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
885  unsigned int readLs = 1, readIndex = 0;
886  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
887  }
888  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 918 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

918  {
919  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
920  flock(fulocal_rwlock_fd_, LOCK_UN);
921  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 928 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

928  {
929  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
930  flock(fulocal_rwlock_fd2_, LOCK_UN);
931  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 911 of file EvFDaqDirector.cc.

References init_lock_.

911 { pthread_mutex_unlock(&init_lock_); }
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 503 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

508  {
509  EvFDaqDirector::FileStatus fileStatus = noFile;
510  rawHeaderSize = 0;
511 
512  int retval = -1;
513  int lock_attempts = 0;
514  long total_lock_attempts = 0;
515 
516  struct stat buf;
517  int stopFileLS = -1;
518  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
519  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
520  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
521  if (stopFileCheck == 0)
522  stopFileLS = readLastLSEntry(stopFilePath_);
523  else
524  stopFileLS = 1; //stop without drain if only pid is stopped
525  if (!stop_ls_override_) {
526  //if lumisection is higher than in stop file, should quit at next from current
527  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
528  stopFileLS = stop_ls_override_ = ls;
529  } else
530  stopFileLS = stop_ls_override_;
531  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
532  << stopFileLS;
533  //return runEnded;
534  } else //if file was removed before reaching stop condition, reset this
535  stop_ls_override_ = 0;
536 
537  timeval ts_lockbegin;
538  gettimeofday(&ts_lockbegin, nullptr);
539 
540  while (retval == -1) {
541  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
542  if (retval == -1)
543  usleep(fuLockPollInterval_);
544  else
545  continue;
546 
547  lock_attempts += fuLockPollInterval_;
548  total_lock_attempts += fuLockPollInterval_;
549  if (lock_attempts > 5000000 || errno == 116) {
550  if (errno == 116)
551  edm::LogWarning("EvFDaqDirector")
552  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
553  else
554  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
555  "fu.lock file are present -: errno "
556  << errno << ":" << strerror(errno) << std::endl;
557 
558  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
559  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
560  ls++;
561  return noFile;
562  }
563 
564  if (stat(bu_run_dir_.c_str(), &buf) != 0)
565  return runEnded;
566  if (stat(fulockfile_.c_str(), &buf) != 0)
567  return runEnded;
568 
569  lock_attempts = 0;
570  }
571  if (total_lock_attempts > 5 * 60000000) {
572  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
573  return runAbort;
574  }
575  }
576 
577  timeval ts_lockend;
578  gettimeofday(&ts_lockend, nullptr);
579  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
580  if (deltat > 0.)
581  lockWaitTime = deltat;
582 
583  if (retval != 0)
584  return fileStatus;
585 
586 #ifdef DEBUG
587  timeval ts_lockend;
588  gettimeofday(&ts_lockend, 0);
589 #endif
590 
591  //open another lock file FD after the lock using main fd has been acquired
592  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
593  if (fu_readwritelock_fd2 == -1)
594  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
595  << " create. error:" << strerror(errno);
596 
597  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
598 
599  // if the stream is readable
600  if (fu_rw_lock_stream2 != nullptr) {
601  unsigned int readLs, readIndex;
602  int check = 0;
603  // rewind the stream
604  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
605  // if rewinded ok
606  if (check == 0) {
607  // read its' values
608  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
609  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
610 
611  unsigned int currentLs = readLs;
612  bool bumpedOk = false;
613  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
614  //no lock file write in this case
615  if (ls && ls + 1 < currentLs)
616  ls++;
617  else {
618  // try to bump (look for new index or EoLS file)
619  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
620  //avoid 2 lumisections jump
621  if (ls && readLs > currentLs && currentLs > ls) {
622  ls++;
623  readLs = currentLs = ls;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  } else {
628  if (ls == 0 && readLs > currentLs) {
629  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
630  //in this case there is no new file in the same LS
631  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
632  readLs = currentLs;
633  readIndex = 0;
634  bumpedOk = false;
635  //no write to lock file
636  }
637  //update return LS value
638  ls = readLs;
639  }
640  }
641  if (bumpedOk) {
642  // there is a new index file to grab, lock file needs to be updated
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  if (check == 0) {
645  ftruncate(fu_readwritelock_fd2, 0);
646  // write next index in the file, which is the file the next process should take
647  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
648  fflush(fu_rw_lock_stream2);
649  fsync(fu_readwritelock_fd2);
650  fileStatus = newFile;
651  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
652  } else {
653  edm::LogError("EvFDaqDirector")
654  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
655  setExceptionState = true;
656  return noFile;
657  }
658  } else if (currentLs < readLs) {
659  //there is no new file in next LS (yet), but lock file can be updated to the next LS
660  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
661  if (check == 0) {
662  ftruncate(fu_readwritelock_fd2, 0);
663  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
664  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
665  fflush(fu_rw_lock_stream2);
666  fsync(fu_readwritelock_fd2);
667  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
668  } else {
669  edm::LogError("EvFDaqDirector")
670  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
671  setExceptionState = true;
672  return noFile;
673  }
674  }
675  } else {
676  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
677  << strerror(errno);
678  }
679  } else {
680  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
681  }
682  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
683 
684 #ifdef DEBUG
685  timeval ts_preunlock;
686  gettimeofday(&ts_preunlock, 0);
687  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
688  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
689 #endif
690 
691  //if new json is present, lock file which FedRawDataInputSource will later unlock
692  if (fileStatus == newFile)
693  lockFULocal();
694 
695  //release lock at this point
696  int retvalu = -1;
697  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
698  if (retvalu == -1)
699  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
700 
701 #ifdef DEBUG
702  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
703 #endif
704 
705  if (fileStatus == noFile) {
706  struct stat buf;
707  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
708  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
709  fileStatus = runEnded;
710  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
711  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
712  fileStatus = runEnded;
713  }
714  }
715  return fileStatus;
716  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 79 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

79 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 206 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 207 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 251 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 253 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 241 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 244 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 250 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 252 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 240 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 243 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 285 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 211 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 210 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 213 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 212 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 214 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 259 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 260 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 236 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 255 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 242 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 238 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 288 of file EvFDaqDirector.h.

Referenced by initRun(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 282 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
static private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 219 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 264 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 284 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 283 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and initRun().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by getLumisectionToStart(), and initRun().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 271 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 269 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 209 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and useFileBroker().