CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::vector< std::string > const & getBUBaseDirs () const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFileName () const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInitTempFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
unsigned int getStartLumisectionFromEnv () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
 
void initRun ()
 
bool inputThrottled ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool lumisectionDiscarded (unsigned int ls)
 
unsigned int numConcurrentLumis () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
std::string const & runString () const
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Static Private Member Functions

static bool checkFileRead (char *buf, int infile, std::size_t buf_sz, std::string const &path)
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
std::vector< std::string > bu_base_dirs_all_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
std::string discard_ls_filestem_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::string input_throttled_file_
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nConcurrentLumis_ = 0
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 39 of file EvFDaqDirector.cc.

References base_dir_, visDQMUpload::buf, discard_ls_filestem_, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, input_throttled_file_, io_service_, pid_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, run_, run_dir_, run_nstring_, run_string_, socket_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
43  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
44  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
45  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
50  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
51  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
52  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
53  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
54  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname, 32);
83  hostname_ = hostname;
84 
85  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
90  << " us";
91  } catch (const std::exception&) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  } catch (const std::exception&) {
103  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
104  }
105  }
106  if (useFileBroker_) {
108  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config", &buf) == 0) {
112  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
113  std::getline(busconfig, fileBrokerHost_);
114  }
115  if (fileBrokerHost_.empty())
116  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
117  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
118  throw cms::Exception("EvFDaqDirector")
119  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
120 
121  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
122  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
123  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
124  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
125  }
126 
127  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_ = std::stoul(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  } catch (const std::exception&) {
133  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
134  }
135  }
136 
137  //override file service parameter if specified by environment
138  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
139  if (fileBrokerUseLockParamPtr) {
140  try {
141  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
142  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
144  } catch (const std::exception&) {
145  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
146  }
147  }
148 
149  std::stringstream ss;
150  ss << "run" << std::setfill('0') << std::setw(6) << run_;
151  run_string_ = ss.str();
152  ss = std::stringstream();
153  ss << run_;
154  run_nstring_ = ss.str();
155  run_dir_ = base_dir_ + "/" + run_string_;
156  input_throttled_file_ = run_dir_ + "/input_throttle";
157  discard_ls_filestem_ = run_dir_ + "/discard_ls";
158  ss = std::stringstream();
159  ss << getpid();
160  pid_ = ss.str();
161  }
struct flock bu_w_fulk
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string selectedTransferMode_
std::string input_throttled_file_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Log< level::Info, false > LogInfo
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 333 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

333  {
334  //close server connection
335  if (socket_.get() && socket_->is_open()) {
336  boost::system::error_code ec;
337  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
338  socket_->close(ec);
339  }
340 
341  if (fulocal_rwlock_fd_ != -1) {
342  unlockFULocal();
343  close(fulocal_rwlock_fd_);
344  }
345 
346  if (fulocal_rwlock_fd2_ != -1) {
347  unlockFULocal2();
348  close(fulocal_rwlock_fd2_);
349  }
350  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

77 { return run_dir_; }

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

References bu_run_dir_.

Referenced by DAQSource::readSupervisor().

78 { return bu_run_dir_; }
std::string bu_run_dir_

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 79 of file EvFDaqDirector.h.

References bu_run_open_dir_.

79 { return bu_run_open_dir_; }
std::string bu_run_open_dir_

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 818 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

824  {
825  if (previousFileSize_ != 0) {
826  if (!fms_) {
828  }
829  if (fms_)
831  previousFileSize_ = 0;
832  }
833  nextFile = "";
834 
835  //reached limit
836  if (maxLS >= 0 && ls > (unsigned int)maxLS)
837  return false;
838 
839  struct stat buf;
840  std::stringstream ss;
841  unsigned int nextIndex = index;
842  nextIndex++;
843 
844  // 1. Check suggested file
845  std::string nextFileJson = getInputJsonFilePath(ls, index);
846  if (stat(nextFileJson.c_str(), &buf) == 0) {
847  fsize = previousFileSize_ = buf.st_size;
848  nextFile = nextFileJson;
849  return true;
850  }
851  // 2. No file -> lumi ended? (and how many?)
852  else {
853  // 3. No file -> check for standalone raw file
854  std::string nextFileRaw = getRawFilePath(ls, index);
855  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
856  fsize = previousFileSize_ = buf.st_size;
857  nextFile = nextFileRaw;
858  return true;
859  }
860 
861  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
862 
863  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
864  // recheck that no raw file appeared in the meantime
865  if (stat(nextFileJson.c_str(), &buf) == 0) {
866  fsize = previousFileSize_ = buf.st_size;
867  nextFile = nextFileJson;
868  return true;
869  }
870  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
871  fsize = previousFileSize_ = buf.st_size;
872  nextFile = nextFileRaw;
873  return true;
874  }
875 
876  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
877  if (indexFilesInLS < 0)
878  //parsing failed
879  return false;
880  else {
881  //check index
882  if ((int)index < indexFilesInLS) {
883  //we have 2 files, and check for 1 failed... retry (2 will never be here)
884  edm::LogError("EvFDaqDirector")
885  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
886  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
887  setExceptionState = true;
888  return false;
889  }
890  }
891  // this lumi ended, check for files
892  ++ls;
893  index = 0;
894 
895  //reached limit
896  if (maxLS >= 0 && ls > (unsigned int)maxLS)
897  return false;
898 
899  nextFileJson = getInputJsonFilePath(ls, 0);
900  nextFileRaw = getRawFilePath(ls, 0);
901  if (stat(nextFileJson.c_str(), &buf) == 0) {
902  // a new file was found at new lumisection, index 0
903  fsize = previousFileSize_ = buf.st_size;
904  nextFile = nextFileJson;
905  return true;
906  }
907  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
908  fsize = previousFileSize_ = buf.st_size;
909  nextFile = nextFileRaw;
910  return true;
911  }
912  return false;
913  }
914  }
915  // no new file found
916  return false;
917  }
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Log< level::Error, false > LogError
unsigned long previousFileSize_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_

◆ checkFileRead()

bool evf::EvFDaqDirector::checkFileRead ( char *  buf,
int  infile,
std::size_t  buf_sz,
std::string const &  path 
)
staticprivate

Definition at line 1112 of file EvFDaqDirector.cc.

References visDQMUpload::buf, timingPdfMaker::infile, castor_dqm_sourceclient_file_cfg::path, and fileinputsource_cfi::read.

Referenced by parseFRDFileHeader(), and rawFileHasHeader().

1112  {
1113  ssize_t sz_read = ::read(infile, buf, buf_sz);
1114  if (sz_read < 0) {
1115  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1116  if (infile != -1)
1117  close(infile);
1118  return false;
1119  }
1120  if ((size_t)sz_read < buf_sz) {
1121  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1122  if (infile != -1)
1123  close(infile);
1124  return false;
1125  }
1126  return true;
1127  }
Log< level::Error, false > LogError

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 2095 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

2095  {
2096  if (mergeTypePset_.empty())
2097  return;
2098  if (!mergeTypeMap_.empty())
2099  return;
2100  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2101  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2102  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2103  for (const std::string& pname : tsPset.getParameterNames()) {
2104  std::string streamType = tsPset.getParameter<std::string>(pname);
2105  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2106  mergeTypeMap_.insert(ac, pname);
2107  ac->second = streamType;
2108  ac.release();
2109  }
2110  }
2111  }
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
std::string mergeTypePset_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
ParameterSet const & getParameterSet(ParameterSetID const &id)

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1987 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

1987  {
1988  if (transferSystemJson_)
1989  return;
1990 
1991  transferSystemJson_.reset(new Json::Value);
1992  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1993  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1994  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1995 
1996  Json::Value destinationsVal(Json::arrayValue);
1997  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1998  for (auto& dest : destinations)
1999  destinationsVal.append(dest);
2000  (*transferSystemJson_)["destinations"] = destinationsVal;
2001 
2002  Json::Value modesVal(Json::arrayValue);
2003  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
2004  for (auto& mode : modes)
2005  modesVal.append(mode);
2006  (*transferSystemJson_)["transferModes"] = modesVal;
2007 
2008  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
2009  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
2010  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
2011  Json::Value streamVal;
2012  for (auto& mode : modes) {
2013  //validation
2014  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
2015  throw cms::Exception("EvFDaqDirector")
2016  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
2017  << ")";
2018  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
2019 
2020  Json::Value sDestsValue(Json::arrayValue);
2021 
2022  if (streamDestinations.empty())
2023  throw cms::Exception("EvFDaqDirector")
2024  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
2025 
2026  for (auto& sdest : streamDestinations) {
2027  bool sDestValid = false;
2028  sDestsValue.append(sdest);
2029  for (auto& dest : destinations) {
2030  if (dest == sdest)
2031  sDestValid = true;
2032  }
2033  if (!sDestValid)
2034  throw cms::Exception("EvFDaqDirector")
2035  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
2036  << ", dest:" << sdest;
2037  }
2038  streamVal[mode] = sDestsValue;
2039  }
2040  (*transferSystemJson_)[psKeyItr->first] = streamVal;
2041  }
2042  }
2043  } else {
2044  if (requireTSPSet_)
2045  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
2046  }
2047  }
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
Represents a JSON value.
Definition: value.h:99
ParameterSet const & getParameterSet(ParameterSetID const &id)
array value (ordered list)
Definition: value.h:30

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1532 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, GlobalPosition_Frontier_DevDB_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

// Sends one HTTP GET "/popfile?runnumber=<run>&pid=<pid>[&stopls=<maxLS>]" request to the
// file broker over socket_ and turns the reply into a FileStatus.
//
// The reply body is parsed as "key=value" lines into serverMap. The keys "version",
// "runnumber", "state" and "lasteols" are required (checked with assert); "lumisection",
// "file", "fileprefix" and "errormessage" are looked up only where needed.
//
// Outputs written by the visible code:
//   serverHttpStatus - numeric status parsed from the HTTP status line
//   serverError      - set to true on any socket/protocol/server error
//                      NOTE(review): the signature table of this page names this bool&
//                      parameter serverState; presumably the definition calls it serverError.
//   closedServerLS   - value of "lasteols", clamped to >= 0
//   serverLS         - value of "lumisection" clamped to >= 1, else closedServerLS + 1
//   nextFileJson / nextFileRaw / rawHeader - only written when state is READY and a file is given
// maxLS >= 0 appends "&stopls=<maxLS>" to the request.
//
// Returns newFile, runEnded or noFile. When serverError is set the socket is closed,
// noFile is returned and the call sleeps for one second as back-off.
//
// NOTE(review): listing lines 1548, 1576 and 1618 are absent from this extract; presumably
// 1548/1576 are the connect calls that set `ec`, and 1618 declares `header`.
1539  {
1540  EvFDaqDirector::FileStatus fileStatus = noFile;
1541  serverError = false;
1542 
1543  boost::system::error_code ec;
1544  try {
// The loop body runs once per request attempt; `continue` is used only to retry after a
// reconnect, every other path leaves via `break`.
1545  while (true) {
1546  //socket connect
1547  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1549 
1550  if (ec) {
1551  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1552  serverError = true;
1553  break;
1554  }
1555  }
1556 
1557  boost::asio::streambuf request;
1558  std::ostream request_stream(&request);
1559  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1560  if (maxLS >= 0) {
1561  std::stringstream spath;
1562  spath << path << "&stopls=" << maxLS;
1563  path = spath.str();
1564  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1565  }
// The request always carries "Connection: keep-alive", independent of fileBrokerKeepAlive_.
1566  request_stream << "GET " << path << " HTTP/1.1\r\n";
1567  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1568  request_stream << "Accept: */*\r\n";
1569  request_stream << "Connection: keep-alive\r\n\r\n";
1570 
1571  boost::asio::write(*socket_, request, ec);
1572  if (ec) {
1573  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1574  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1575  //we got disconnected, try to reconnect to the server before writing the request
1577  if (ec) {
1578  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1579  serverError = true;
1580  break;
1581  }
1582  continue;
1583  }
1584  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1585  serverError = true;
1586  break;
1587  }
1588 
// Read the HTTP status line first, then parse "<version> <status> <message>" from it.
1589  boost::asio::streambuf response;
1590  boost::asio::read_until(*socket_, response, "\r\n", ec);
1591  if (ec) {
1592  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1593  serverError = true;
1594  break;
1595  }
1596 
1597  std::istream response_stream(&response);
1598 
1599  std::string http_version;
1600  response_stream >> http_version;
1601 
1602  response_stream >> serverHttpStatus;
1603 
1604  std::string status_message;
1605  std::getline(response_stream, status_message);
1606  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1607  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1608  serverError = true;
1609  break;
1610  }
1611  if (serverHttpStatus != 200) {
1612  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1613  serverError = true;
1614  break;
1615  }
1616 
1617  // Process the response headers.
// Header lines are skipped, not interpreted; the loop stops at the empty "\r" line.
// NOTE(review): only data already buffered in `response` by the read_until above is
// consumed here; no further socket read is visible before the body is parsed.
1619  while (std::getline(response_stream, header) && header != "\r") {
1620  }
1621 
// Body: one "key=value" entry per line; lines without '=' are ignored.
1622  std::string fileInfo;
1623  std::map<std::string, std::string> serverMap;
1624  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1625  auto pos = fileInfo.find('=');
1626  if (pos == std::string::npos)
1627  continue;
1628  auto stitle = fileInfo.substr(0, pos);
1629  auto svalue = fileInfo.substr(pos + 1);
1630  serverMap[stitle] = svalue;
1631  }
1632 
// NOTE(review): the mandatory keys are enforced with assert(), i.e. a malformed reply
// aborts the process instead of taking the serverError path used elsewhere.
1633  //check that the response run number is correct
1634  auto server_version = serverMap.find("version");
1635  assert(server_version != serverMap.end());
1636 
1637  auto server_run = serverMap.find("runnumber");
1638  assert(server_run != serverMap.end());
1639  assert(run_nstring_ == server_run->second);
1640 
1641  auto server_state = serverMap.find("state");
1642  assert(server_state != serverMap.end());
1643 
1644  auto server_eols = serverMap.find("lasteols");
1645  assert(server_eols != serverMap.end());
1646 
1647  auto server_ls = serverMap.find("lumisection");
1648 
// Version defaults to 1.0.0; the string may start with a double quote, which is skipped.
// Parsing falls back from "a.b.c" to "a.b" to "a"; a total failure is only logged.
1649  int version_maj = 1;
1650  int version_min = 0;
1651  int version_rev = 0;
1652  {
1653  auto* s_ptr = server_version->second.c_str();
1654  if (!server_version->second.empty() && server_version->second[0] == '"')
1655  s_ptr++;
1656  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1657  if (res < 3) {
1658  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1659  if (res < 2) {
1660  res = sscanf(s_ptr, "%d", &version_maj);
1661  if (res < 1) {
1662  //expecting at least 1 number (major version)
1663  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1664  }
1665  }
1666  }
1667  }
1668 
// NOTE(review): the values are cast to uint64_t but stored into uint32_t references.
1669  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1670  if (server_ls != serverMap.end())
1671  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1672  else
1673  serverLS = closedServerLS + 1;
1674 
// Dispatch on the "state" value reported by the server.
1675  std::string s_state = server_state->second;
1676  if (s_state == "STARTING") //initial, always empty starting with LS 1
1677  {
1678  auto server_file = serverMap.find("file");
1679  assert(server_file == serverMap.end()); //no file with starting state
1680  fileStatus = noFile;
1681  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1682  } else if (s_state == "READY") {
1683  auto server_file = serverMap.find("file");
1684  if (server_file == serverMap.end()) {
1685  //can be returned by server if files from new LS already appeared but LS is not yet closed
1686  if (serverLS <= closedServerLS)
1687  serverLS = closedServerLS + 1;
1688  fileStatus = noFile;
1689  edm::LogInfo("EvFDaqDirector")
1690  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1691  } else {
1692  std::string filestem;
1693  std::string fileprefix;
1694  auto server_fileprefix = serverMap.find("fileprefix");
1695 
// "fileprefix" is optional; surrounding double quotes are stripped when both are present.
1696  if (server_fileprefix != serverMap.end()) {
1697  auto pssize = server_fileprefix->second.size();
1698  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1699  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1700  else
1701  fileprefix = server_fileprefix->second;
1702  }
1703 
1704  //remove string literals
1705  auto ssize = server_file->second.size();
1706  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1707  filestem = server_file->second.substr(1, ssize - 2);
1708  else
1709  filestem = server_file->second;
1710  assert(!filestem.empty());
// Server major version > 1: raw path includes the prefix, no JSON path, rawHeader = true.
// Otherwise: raw path without the prefix, JSON path "<prefix><stem>.jsn", rawHeader = false.
1711  if (version_maj > 1) {
1712  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1713  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1714  nextFileJson = "";
1715  rawHeader = true;
1716  } else {
1717  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1718  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1719  nextFileJson = filestem + ".jsn";
1720  rawHeader = false;
1721  }
1722  fileStatus = newFile;
1723  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1724  << serverLS << " file:" << filestem;
1725  }
1726  } else if (s_state == "EOLS") {
1727  serverLS = closedServerLS + 1;
1728  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1729  fileStatus = noFile;
1730  } else if (s_state == "EOR") {
1731  //server_eor = serverMap.find("iseor");
1732  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1733  fileStatus = runEnded;
1734  } else if (s_state == "NORUN") {
// NORUN is treated like end of run (runEnded) and does not set serverError.
1735  auto err_msg = serverMap.find("errormessage");
1736  if (err_msg != serverMap.end())
1737  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1738  else
1739  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1740  edm::LogWarning("EvFDaqDirector") << "executing run end";
1741  fileStatus = runEnded;
1742  } else if (s_state == "ERROR") {
1743  auto err_msg = serverMap.find("errormessage");
1744  if (err_msg != serverMap.end())
1745  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1746  else
1747  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1748  fileStatus = noFile;
1749  serverError = true;
1750  } else {
1751  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1752  fileStatus = noFile;
1753  serverError = true;
1754  }
1755 
1756  // Without keep-alive, drain the connection until EOF; the extra bytes land in `response` and are not parsed.
1757  if (!fileBrokerKeepAlive_) {
1758  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1759  }
// NOTE(review): the message below says read_until although the failing call is boost::asio::read.
1760  if (ec != boost::asio::error::eof) {
1761  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1762  serverError = true;
1763  }
1764  }
1765 
1766  break;
1767  }
1768 
// NOTE(review): the caught exception's what() text is not included in the log message.
1769  } catch (std::exception const& e) {
1770  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1771  serverError = true;
1772  }
1773 
// Without keep-alive the socket is shut down and closed after every request.
1774  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1775  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1776  if (ec) {
1777  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1778  }
1779  socket_->close(ec);
1780  if (ec) {
1781  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1782  }
1783  }
1784 
// Any error overrides the status with noFile and closes the socket, also in keep-alive mode.
// NOTE(review): `ec` is tested below even when close() was not called here, so a stale
// error code from an earlier operation can be reported as a "socket close error".
1785  if (serverError) {
1786  if (socket_->is_open())
1787  socket_->close(ec);
1788  if (ec) {
1789  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1790  }
1791  fileStatus = noFile;
1792  sleep(1); //back-off if error detected
1793  }
1794 
1795  return fileStatus;
1796  }
assert(be >=bs)
Definition: Electron.h:6
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
Log< level::Info, false > LogInfo
unsigned long long uint64_t
Definition: Time.h:13
std::string bu_run_dir_
Log< level::Warning, false > LogWarning
std::string fileBrokerHost_

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 972 of file EvFDaqDirector.cc.

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

972  {
973  //used for backpressure mechanisms and monitoring
974  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
975  struct stat buf;
976  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
977  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
978  close(bol_fd);
979  }
980  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS,
bool  doCreateEoLS 
)

Definition at line 982 of file EvFDaqDirector.cc.

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

985  {
986  if (currentLumiSection > 0) {
987  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
988  struct stat buf;
989  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
990  if (!found) {
991  if (doCreateEoLS) {
992  int eol_fd =
993  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
994  close(eol_fd);
995  }
996  if (doCreateBoLS)
997  createBoLSFile(lumiSection, false);
998  }
999  } else if (doCreateBoLS) {
1000  createBoLSFile(lumiSection, true); //needed for initial lumisection
1001  }
1002  }
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2127 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

2127  {
2128  std::string proc_flag = run_dir_ + "/processing";
2129  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2130  close(proc_flag_fd);
2131  }

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1948 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

// Ensures that the directory `openPath` exists, creating it (and any missing parents)
// when std::filesystem::is_directory() reports it is not there.
// NOTE(review): listing line 1951 is absent from this extract; presumably it declares
// openPath from getRunOpenDirPath(), which is in this member's References list.
1948  {
1949  // create open dir if not already there
1950 
1952  if (!std::filesystem::is_directory(openPath)) {
1953  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
// The throwing overload of create_directories is used, so a failure propagates as an exception.
1954  std::filesystem::create_directories(openPath);
1955  }
1956  }
std::string getRunOpenDirPath() const
#define LogDebug(id)

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 360 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

// Declares the untracked configuration parameters of this service, each with its
// default value and a help comment, and registers the description under the label
// "EvFDaqDirector".
// NOTE(review): listing line 361 is absent from this extract; presumably it declares `desc`.
360  {
362  desc.setComment(
363  "Service used for file locking arbitration and for propagating information between other EvF components");
364  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
365  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
366  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
367  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
368  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
// File broker connection settings.
369  desc.addUntracked<bool>("useFileBroker", false)
370  ->setComment("Use BU file service to grab input data instead of NFS file locking");
371  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
372  ->setComment("Allow service to discover BU address from hltd configuration");
373  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
// The port is declared as a string parameter, not as an integer.
374  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
375  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
376  ->setComment("Use keep alive to avoid using large number of sockets");
377  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
378  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
379  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
380  ->setComment("Lock polling interval in microseconds for the input directory file lock");
381  desc.addUntracked<bool>("outputAdler32Recheck", false)
382  ->setComment("Check Adler32 of per-process output files while micro-merging");
383  desc.addUntracked<bool>("requireTransfersPSet", false)
384  ->setComment("Require complete transferSystem PSet in the process configuration");
// NOTE(review): the help text below opens a parenthesis that is never closed.
385  desc.addUntracked<std::string>("selectedTransferMode", "")
386  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
387  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
388  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
389  desc.addUntracked<std::string>("mergingPset", "")
390  ->setComment("Name of merging PSet to look for merging type definitions for streams");
391  descriptions.add("EvFDaqDirector", desc);
392  }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 82 of file EvFDaqDirector.h.

References dirManager_, evf::DirManager::findRunDir(), and run_.

82 { return dirManager_.findRunDir(run_); }
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 523 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

523  {
524  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
525  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getBUBaseDirs()

std::vector<std::string> const& evf::EvFDaqDirector::getBUBaseDirs ( ) const
inline

Definition at line 196 of file EvFDaqDirector.h.

References bu_base_dirs_all_.

Referenced by DAQSource::DAQSource().

196 { return bu_base_dirs_all_; }
std::vector< std::string > bu_base_dirs_all_

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 452 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

452  {
454  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 515 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

515  {
516  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
517  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 519 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

519  {
520  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
521  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)

◆ getEoRFileName()

std::string evf::EvFDaqDirector::getEoRFileName ( ) const

Definition at line 529 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), and run_.

Referenced by DAQSource::readSupervisor().

529 { return fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 527 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

527 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 531 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext(), and DAQSource::checkNext().

531 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }
std::string eorFileName(const unsigned int run)

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 533 of file EvFDaqDirector.cc.

References bu_run_dir_.

533 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
std::string bu_run_dir_

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 480 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

480  {
482  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInitTempFilePath()

std::string evf::EvFDaqDirector::getInitTempFilePath ( std::string const &  stream) const

Definition at line 484 of file EvFDaqDirector.cc.

References fffnaming::initTempFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

484  {
486  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 436 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

436  {
438  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1972 of file EvFDaqDirector.cc.

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Probes "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" for NNNN = 1, 2, ... (zero padded
// to at least four digits) and returns the first number for which stat() finds no file.
// The result is 1 when no such EoLS file exists at all.
// NOTE(review): listing line 1974 is absent from this extract; presumably it declares `fullpath`.
1972  {
1973  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1975  struct stat buf;
1976  unsigned int lscount = 1;
1977  do {
1978  std::stringstream ss;
1979  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1980  fullpath = ss.str();
1981  lscount++;
1982  } while (stat(fullpath.c_str(), &buf) == 0);
// lscount was already incremented past the number whose file is missing, hence the -1.
1983  return lscount - 1;
1984  }
std::string run_string_

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 472 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataChecksumFileNameWithInstance().

472  {
474  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 468 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithInstance().

468  {
470  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 498 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

499  {
501  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 511 of file EvFDaqDirector.cc.

References hostname_, eostools::ls(), fffnaming::rootHistogramFileNameWithInstance(), run_, run_dir_, and cms::cuda::stream.

511  {
513  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs,
bool  requireHeader = true 
)

Definition at line 1798 of file EvFDaqDirector.cc.

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal(), lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, unlockFULocal(), and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

// Asks the file broker (contactFileBroker) for the next input file, creates the local
// BoLS/EoLS marker files for lumisections the server reports as closed, and updates `ls`.
//
// Visible steps:
//   1. check stopFilePath_ / stopFilePathPid_ and derive the stop lumisection, if any
//   2. contact the broker; on serverError return noFile without touching anything else
//   3. create BoLS files, then read the file metadata (raw header or JSON) for a new file
//   4. create EoLS files while holding lockFULocal2()
//   5. update `ls` from the server's lumisection according to the resulting status
//
// Returns the FileStatus from the broker, downgraded to noFile when the announced file
// was not found, or runEnded when in addition bu_run_dir_ can no longer be stat()ed.
//
// NOTE(review): the body uses nextFileRaw, serverEventsInNewFile and fileSizeFromMetadata,
// while the signature table of this page lists nextFile, serverEventsInNewFile_ and
// fileSize; presumably the definition's parameter names differ from the declaration's.
// NOTE(review): listing lines 1851, 1861 and 1896 are absent from this extract; presumably
// each is a condition guarding the lock/unlock call that follows it (fileBrokerUseLocalLock_
// is in this member's References list).
1806  {
1807  EvFDaqDirector::FileStatus fileStatus = noFile;
1808 
1809  //int retval = -1;
1810  //int lock_attempts = 0;
1811  //long total_lock_attempts = 0;
1812 
// Stop request handling: either stop file triggers it. The general stop file supplies
// the lumisection via readLastLSEntry(); the per-pid file alone means stop at LS 1.
1813  struct stat buf;
1814  int stopFileLS = -1;
1815  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1816  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1817  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1818  if (stopFileCheck == 0)
1819  stopFileLS = readLastLSEntry(stopFilePath_);
1820  else
1821  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ remembers the stop lumisection once the current LS has reached it,
// so later calls keep using the same value.
1822  if (!stop_ls_override_) {
1823  //if lumisection is higher than in stop file, should quit at next from current
1824  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1825  stopFileLS = stop_ls_override_ = ls;
1826  } else
1827  stopFileLS = stop_ls_override_;
1828  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1829  << stopFileLS;
1830  //return runEnded;
1831  } else //if file was removed before reaching stop condition, reset this
1832  stop_ls_override_ = 0;
1833 
1834  /* look for EoLS
1835  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1836  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1837  ls++;
1838  return noFile;
1839  }
1840  */
1841 
// NOTE(review): ts_lockbegin is filled here but not read again in the visible listing.
1842  timeval ts_lockbegin;
1843  gettimeofday(&ts_lockbegin, nullptr);
1844 
1845  std::string nextFileJson;
1846  uint32_t serverLS, closedServerLS;
1847  unsigned int serverHttpStatus;
1848  bool serverError;
1849 
1850  //local lock to force index json and EoLS files to appear in order
1852  lockFULocal();
1853 
// maxLS is -1 when no stop was requested, otherwise never below the current lumisection.
1854  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1855  bool rawHeader = false;
1856  fileStatus = contactFileBroker(
1857  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1858 
1859  if (serverError) {
1860  //do not update anything
1862  unlockFULocal();
1863  return noFile;
1864  }
1865 
1866  //handle creation of BoLS files if lumisection has changed
1867  if (currentLumiSection == 0) {
1868  if (fileStatus == runEnded)
1869  createLumiSectionFiles(closedServerLS, 0, true, false);
1870  else
1871  createLumiSectionFiles(serverLS, 0, true, false);
1872  } else {
1873  if (closedServerLS >= currentLumiSection) {
1874  //only BoLS files
1875  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1876  createLumiSectionFiles(i + 1, i, true, false);
1877  }
1878  }
1879 
1880  bool fileFound = true;
1881 
// NOTE(review): rawHeader is a bool; "> 0" is equivalent to testing it directly.
1882  if (fileStatus == newFile) {
1883  if (rawHeader > 0)
1884  serverEventsInNewFile = grabNextJsonFromRaw(
1885  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1886  else
1887  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1888  }
// NOTE(review): serverEventsInNewFile is tested here even when fileStatus != newFile, in
// which case no visible line of this call has assigned it.
1889  //closing file in case of any error
1890  if (serverEventsInNewFile < 0 && rawFd != -1) {
1891  close(rawFd);
1892  rawFd = -1;
1893  }
1894 
1895  //can unlock because all files have been created locally
1897  unlockFULocal();
1898 
1899  if (!fileFound) {
1900  //catch condition where directory got deleted
1901  fileStatus = noFile;
1902  struct stat buf;
1903  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1904  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1905  fileStatus = runEnded;
1906  }
1907  }
1908 
1909  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1910  //so that EoLS files can not appear locally before index files
1911  if (currentLumiSection == 0) {
1912  lockFULocal2();
1913  if (fileStatus == runEnded) {
1914  createLumiSectionFiles(closedServerLS, 0, false, true);
1915  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1916  } else {
1917  createLumiSectionFiles(serverLS, 0, false, true);
1918  }
1919  unlockFULocal2();
1920  } else {
1921  if (closedServerLS >= currentLumiSection) {
1922  //lock exclusive to create EoLS files
1923  lockFULocal2();
1924  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1925  createLumiSectionFiles(i + 1, i, false, true);
1926  unlockFULocal2();
1927  }
1928  }
1929 
// Update the caller's lumisection. A server LS below the currently open one is asserted
// against for newFile, and ignored (with a warning and a one second sleep) for noFile.
1930  if (fileStatus == runEnded)
1931  ls = std::max(currentLumiSection, serverLS);
1932  else if (fileStatus == newFile) {
1933  assert(serverLS >= ls);
1934  ls = serverLS;
1935  } else if (fileStatus == noFile) {
1936  if (serverLS >= ls)
1937  ls = serverLS;
1938  else {
1939  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1940  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1941  sleep(1);
1942  }
1943  }
1944 
1945  return fileStatus;
1946  }
assert(be >=bs)
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 757 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

757  {
758  std::ifstream ij(BUEoLSFile);
759  Json::Value deserializeRoot;
760  Json::Reader reader;
761 
762  if (!reader.parse(ij, deserializeRoot)) {
763  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
764  return -1;
765  }
766 
767  std::string data;
768  DataPoint dp;
769  dp.deserialize(deserializeRoot);
770 
771  //read definition
772  if (readEolsDefinition_) {
773  //std::string def = boost::algorithm::trim(dp.getDefinition());
774  std::string def = dp.getDefinition();
775  if (def.empty())
776  readEolsDefinition_ = false;
777  while (!def.empty()) {
778  std::string fullpath;
779  if (def.find('/') == 0)
780  fullpath = def;
781  else
782  fullpath = bu_run_dir_ + '/' + def;
783  struct stat buf;
784  if (stat(fullpath.c_str(), &buf) == 0) {
785  DataPointDefinition eolsDpd;
786  std::string defLabel = "legend";
787  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
788  if (eolsDpd.getNames().empty()) {
789  //try with "data" label if "legend" format is not used
790  eolsDpd = DataPointDefinition();
791  defLabel = "data";
792  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
793  }
794  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
795  if (eolsDpd.getNames().at(i) == "NFiles")
796  eolsNFilesIndex_ = i;
797  readEolsDefinition_ = false;
798  break;
799  }
800  //check if we can still find definition
801  if (def.size() <= 1 || def.find('/') == std::string::npos) {
802  readEolsDefinition_ = false;
803  break;
804  }
805  def = def.substr(def.find('/') + 1);
806  }
807  }
808 
809  if (dp.getData().size() > eolsNFilesIndex_)
810  data = dp.getData()[eolsNFilesIndex_];
811  else {
812  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
813  return -1;
814  }
815  return std::stoi(data);
816  }
int def(FILE *, FILE *, int)
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
unsigned int eolsNFilesIndex_
std::vector< std::string > const & getNames() const
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 456 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerDataFileNameWithPid().

456  {
457  return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
458  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 476 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

476  {
477  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
478  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 448 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

448  {
449  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
450  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 460 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

460  {
461  return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
462  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 488 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

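489  {
490  return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
491  }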
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 444 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

444  {
445  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
446  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 503 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

503  {
504  return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
505  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 464 of file EvFDaqDirector.cc.

References eostools::ls(), run_, run_dir_, cms::cuda::stream, and fffnaming::streamerJsonFileNameWithPid().

464  {
465  return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
466  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 493 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::protocolBufferHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

494  {
495  return run_dir_ + "/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
496  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 440 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

440  {
441  return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
442  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 507 of file EvFDaqDirector.cc.

References eostools::ls(), fffnaming::rootHistogramFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

507  {
508  return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
509  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 120 of file EvFDaqDirector.h.

References run_.

120 { return run_; }

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 109 of file EvFDaqDirector.h.

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

109 { return run_dir_ + "/open"; }

◆ getStartLumisectionFromEnv()

unsigned int evf::EvFDaqDirector::getStartLumisectionFromEnv ( ) const
inline

Definition at line 182 of file EvFDaqDirector.h.

References startFromLS_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

182 { return startFromLS_; }
unsigned int startFromLS_

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 2049 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

2049  {
2050  std::string streamRequestName;
2051  if (transferSystemJson_->isMember(stream.c_str()))
2052  streamRequestName = stream;
2053  else {
2054  std::stringstream msg;
2055  msg << "Transfer system mode definitions missing for -: " << stream;
2056  if (requireTSPSet_)
2057  throw cms::Exception("EvFDaqDirector") << msg.str();
2058  else {
2059  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2060  return std::string("Failsafe");
2061  }
2062  }
2063  //return empty if strict check parameter is not on
2064  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2065  edm::LogWarning("EvFDaqDirector")
2066  << "Selected mode string is not provided as DaqDirector parameter."
2067  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
2068  return std::string("Failsafe");
2069  }
2070  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2071  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2072  }
2073  //check if stream has properly listed transfer stream
2074  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2075  std::stringstream msg;
2076  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2077  if (requireTSPSet_)
2078  throw cms::Exception("EvFDaqDirector") << msg.str();
2079  else
2080  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2081  return std::string("Failsafe");
2082  }
2083  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2084 
2085  //flatten string json::Array into CSV std::string
2086  std::string ret;
2087  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2088  if (!ret.empty())
2089  ret += ",";
2090  ret += (*it).asString();
2091  }
2092  return ret;
2093  }
const_iterator end() const
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
ret
prodAgent to be discontinued
Represents a JSON value.
Definition: value.h:99
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:286
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 2113 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

2113  {
2114  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2115  if (mergeTypeMap_.find(search_ac, stream))
2116  return search_ac->second;
2117 
2118  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2119  std::string defaultName = MergeTypeNames_[defaultType];
2120  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2121  mergeTypeMap_.insert(ac, stream);
2122  ac->second = defaultName;
2123  ac.release();
2124  return defaultName;
2125  }
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1257 of file EvFDaqDirector.cc.

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, fileinputsource_cfi::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

1260  {
1261  fileFound = true;
1262 
1263  //should be ported to use fffnaming
1264  std::ostringstream fileNameWithPID;
1265  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1266  << std::setw(5) << pid_ << ".jsn";
1267 
1268  // assemble json destination path
1269  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1270 
1271  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1272 
1273  int infile = -1, outfile = -1;
1274 
1275  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1276  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1277  << strerror(errno);
1278  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1279  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1280  << jsonSourcePath << " : " << strerror(errno);
1281  if (errno == ENOENT)
1282  fileFound = false;
1283  return -1;
1284  }
1285  }
1286 
1287  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1288  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1289  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1290  if (errno == EEXIST) {
1291  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1292  << " : ";
1293  ::close(infile);
1294  return -1;
1295  }
1296  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1297  << strerror(errno);
1298  struct stat out_stat;
1299  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1300  edm::LogWarning("EvFDaqDirector")
1301  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1302  if (unlink(jsonDestPath.c_str()) == -1) {
1303  edm::LogWarning("EvFDaqDirector")
1304  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1305  }
1306  }
1307  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1308  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1309  << jsonDestPath << " : " << strerror(errno);
1310  ::close(infile);
1311  return -1;
1312  }
1313  }
1314  //copy contents
1315  const std::size_t buf_sz = 512;
1316  std::size_t tot_written = 0;
1317  std::unique_ptr<char[]> buf(new char[buf_sz]);
1318 
1319  ssize_t sz, sz_read = 1, sz_write;
1320  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1321  sz_write = 0;
1322  do {
1323  assert(sz_read - sz_write > 0);
1324  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1325  sz_read = sz; // cause read loop termination
1326  break;
1327  }
1328  assert(sz > 0);
1329  sz_write += sz;
1330  tot_written += sz;
1331  } while (sz_write < sz_read);
1332  }
1333  close(infile);
1334  close(outfile);
1335 
1336  if (tot_written > 0) {
1337  //leave file if it was empty for diagnosis
1338  if (unlink(jsonSourcePath.c_str()) == -1) {
1339  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1340  << strerror(errno);
1341  return -1;
1342  }
1343  } else {
1344  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1345  << jsonSourcePath;
1346  return -1;
1347  }
1348 
1349  Json::Value deserializeRoot;
1350  Json::Reader reader;
1351 
1352  std::string data;
1353  std::stringstream ss;
1354  bool result;
1355  try {
1356  if (tot_written <= buf_sz) {
1357  result = reader.parse(buf.get(), deserializeRoot);
1358  } else {
1359  //json will normally not be bigger than buf_sz bytes
1360  try {
1361  std::ifstream ij(jsonDestPath);
1362  ss << ij.rdbuf();
1363  } catch (std::filesystem::filesystem_error const& ex) {
1364  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1365  return -1;
1366  }
1367  result = reader.parse(ss.str(), deserializeRoot);
1368  }
1369  if (!result) {
1370  if (tot_written <= buf_sz)
1371  ss << buf.get();
1372  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1373  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1374  << ss.str() << ".";
1375  return -1;
1376  }
1377 
1378  //read BU JSON
1379  DataPoint dp;
1380  dp.deserialize(deserializeRoot);
1381  bool success = false;
1382  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1383  if (dpd_->getNames().at(i) == "NEvents")
1384  if (i < dp.getData().size()) {
1385  data = dp.getData()[i];
1386  success = true;
1387  break;
1388  }
1389  }
1390  if (!success) {
1391  if (!dp.getData().empty())
1392  data = dp.getData()[0];
1393  else {
1394  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1395  << "grabNextJsonFile - "
1396  << " error reading number of events from BU JSON; No input value. data -: " << data;
1397  return -1;
1398  }
1399  }
1400 
1401  //try to read raw file size
1402  fileSizeFromJson = -1;
1403  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1404  if (dpd_->getNames().at(i) == "NBytes") {
1405  if (i < dp.getData().size()) {
1406  std::string dataSize = dp.getData()[i];
1407  try {
1408  fileSizeFromJson = std::stol(dataSize);
1409  } catch (const std::exception&) {
1410  //non-fatal currently, processing can continue without this value
1411  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1412  << "Input value is -: " << dataSize;
1413  }
1414  break;
1415  }
1416  }
1417  }
1418  return std::stoi(data);
1419  } catch (const std::out_of_range& e) {
1420  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1421  << "Input value is -: " << data;
1422  } catch (const std::invalid_argument& e) {
1423  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1424  << "Input value is -: " << data;
1425  } catch (std::runtime_error const& e) {
1426  //Can be thrown by Json parser
1427  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1428  }
1429 
1430  catch (std::exception const& e) {
1431  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1432  } catch (...) {
1433  //unknown exception
1434  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1435  }
1436 
1437  return -1;
1438  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
assert(be >=bs)
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1440 of file EvFDaqDirector.cc.

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

1440  {
1441  std::string data;
1442  try {
1443  // assemble json destination path
1444  std::filesystem::path jsonDestPath(baseRunDir());
1445 
1446  //should be ported to use fffnaming
1447  std::ostringstream fileNameWithPID;
1448  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1449  << ".jsn";
1450  jsonDestPath /= fileNameWithPID.str();
1451 
1452  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1453  try {
1454  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1455  } catch (std::filesystem::filesystem_error const& ex) {
1456  // Input dir gone?
1457  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1458  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1459  sleep(1);
1460  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1461  }
1462  unlockFULocal();
1463 
1464  try {
1465  //sometimes this fails but file gets deleted
1466  std::filesystem::remove(jsonSourcePath);
1467  } catch (std::filesystem::filesystem_error const& ex) {
1468  // Input dir gone?
1469  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1470  } catch (std::exception const& ex) {
1471  // Input dir gone?
1472  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1473  }
1474 
1475  std::ifstream ij(jsonDestPath);
1476  Json::Value deserializeRoot;
1477  Json::Reader reader;
1478 
1479  std::stringstream ss;
1480  ss << ij.rdbuf();
1481  if (!reader.parse(ss.str(), deserializeRoot)) {
1482  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1483  << "\nERROR:\n"
1484  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1485  << ss.str() << ".";
1486  throw std::runtime_error("Cannot deserialize input JSON file");
1487  }
1488 
1489  //read BU JSON
1490  std::string data;
1491  DataPoint dp;
1492  dp.deserialize(deserializeRoot);
1493  bool success = false;
1494  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1495  if (dpd_->getNames().at(i) == "NEvents")
1496  if (i < dp.getData().size()) {
1497  data = dp.getData()[i];
1498  success = true;
1499  }
1500  }
1501  if (!success) {
1502  if (!dp.getData().empty())
1503  data = dp.getData()[0];
1504  else
1505  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1506  << " error reading number of events from BU JSON -: No input value " << data;
1507  }
1508  return std::stoi(data);
1509  } catch (std::filesystem::filesystem_error const& ex) {
1510  // Input dir gone?
1511  unlockFULocal();
1512  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1513  } catch (std::runtime_error const& e) {
1514  // Another process grabbed the file and NFS did not register this
1515  unlockFULocal();
1516  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1517  } catch (const std::out_of_range&) {
1518  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1519  << "Input value is -: " << data;
1520  } catch (const std::invalid_argument&) {
1521  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1522  << "Input value is -: " << data;
1523  } catch (std::exception const& e) {
1524  // BU run directory disappeared?
1525  unlockFULocal();
1526  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1527  }
1528 
1529  return -1;
1530  }
jsoncollector::DataPointDefinition * dpd_
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
std::string & baseRunDir()
std::vector< std::string > const & getNames() const
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
Unserialize a JSON document into a Value.
Definition: reader.h:16
#define LogDebug(id)

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile,
bool  requireHeader = true 
)

Definition at line 1165 of file EvFDaqDirector.cc.

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

1172  {
1173  fileFound = true;
1174 
1175  //take only first three tokens delimited by "_" in the renamed raw file name
1176  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1177  size_t pos = 0, n_tokens = 0;
1178  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1179  }
1180  std::string reducedJsonStem = jsonStem.substr(0, pos);
1181 
1182  std::ostringstream fileNameWithPID;
1183  //should be ported to use fffnaming
1184  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1185 
1186  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1187 
1188  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1189 
1190  //parse RAW file header if it exists
1191  uint32_t lsFromRaw;
1192  int32_t nbEventsWrittenRaw;
1193  int64_t fileSizeFromRaw;
1194  uint16_t rawDataType;
1195  auto ret = parseFRDFileHeader(rawSourcePath,
1196  rawFd,
1197  rawHeaderSize,
1198  rawDataType,
1199  lsFromRaw,
1200  nbEventsWrittenRaw,
1201  fileSizeFromRaw,
1202  requireHeader,
1203  true,
1204  closeFile);
1205  if (ret != 0) {
1206  if (ret == 1)
1207  fileFound = false;
1208  return -1;
1209  }
1210 
1211  int outfile;
1212  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1213  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1214  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1215  if (errno == EEXIST) {
1216  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1217  << " : ";
1218  return -1;
1219  }
1220  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1221  << strerror(errno);
1222  struct stat out_stat;
1223  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1224  edm::LogWarning("EvFDaqDirector")
1225  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1226  << jsonDestPath;
1227  if (unlink(jsonDestPath.c_str()) == -1) {
1228  edm::LogWarning("EvFDaqDirector")
1229  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1230  }
1231  }
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1234  << jsonDestPath << " : " << strerror(errno);
1235  return -1;
1236  }
1237  }
1238  //write JSON file (TODO: use jsoncpp)
1239  std::stringstream ss;
1240  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1241  std::string sstr = ss.str();
1242 
1243  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1244  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1245  << " : " << strerror(errno);
1246  return -1;
1247  }
1248  close(outfile);
1249  if (serverLS && serverLS != lsFromRaw)
1250  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1251  << " and raw file header LS " << lsFromRaw;
1252 
1253  fileSizeFromHeader = fileSizeFromRaw;
1254  return nbEventsWrittenRaw;
1255  }
ret
prodAgent to be discontinued
Log< level::Error, false > LogError
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string & baseRunDir()
Log< level::Warning, false > LogWarning
#define LogDebug(id)

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 163 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_base_dirs_all_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, mps_fire::i, init_lock_, svgfig::load(), eostools::mkdir(), openFULockfileStream(), pid_, run_dir_, run_string_, edm::shutdown_flag, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, tryInitializeFuLockFile(), and useFileBroker_.

Referenced by preallocate().

163  {
      // Prepares the directories and lock files for the run. The work splits on directorBU_:
      // the BU branch creates the BU run directory, its lock files and (optionally) the HLT
      // configuration copy; the FU branch checks the BU base directory and opens the FU lock stream.
      // NOTE(review): the gutter numbering of this listing skips source lines 181, 183, 191, 201,
      // 231, 291 and 296, so some statements (e.g. the left-hand sides of the open() calls) are
      // not visible here.
164  // check if base dir exists or create it accordingly
165  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
166  if (retval != 0 && errno != EEXIST) {
167  throw cms::Exception("DaqDirector")
168  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
169  }
170 
171  //create run dir in base dir
      // umask(0) clears the process file-mode mask, so the mode bits passed to the mkdir()/open()
      // calls below are applied unmasked. S_IRWXO already contains S_IROTH and S_IXOTH.
172  umask(0);
173  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
174  if (retval != 0 && errno != EEXIST) {
175  throw cms::Exception("DaqDirector")
176  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
177  }
178 
179  //create fu-local.lock in run open dir
180  if (!directorBU_) {
182  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
184  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
185  if (fulocal_rwlock_fd_ == -1)
186  throw cms::Exception("DaqDirector")
187  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
188  chmod(fulocal_lock_.c_str(), 0777);
189  fsync(fulocal_rwlock_fd_);
190  //open second fd for another input source thread
192  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
193  if (fulocal_rwlock_fd2_ == -1)
194  throw cms::Exception("DaqDirector")
195  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
196  }
197 
198  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
199  //for BU, it is created at this point
200  if (directorBU_) {
202  std::string bulockfile = bu_run_dir_ + "/bu.lock";
203  fulockfile_ = bu_run_dir_ + "/fu.lock";
204 
205  //make or find bu run dir
206  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
207  if (retval != 0 && errno != EEXIST) {
208  throw cms::Exception("DaqDirector")
209  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
210  }
211  bu_run_open_dir_ = bu_run_dir_ + "/open";
212  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
213  if (retval != 0 && errno != EEXIST) {
214  throw cms::Exception("DaqDirector")
215  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
216  }
217 
218  // the BU director does not need to know about the fu lock
      // A failed open() here is only logged, not thrown.
      // NOTE(review): fdopen() at line 224 is still called when bu_writelock_fd_ is -1.
219  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
220  if (bu_writelock_fd_ == -1)
221  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
222  else
223  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
224  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
225  if (bu_w_lock_stream == nullptr)
226  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
227 
228  // BU INITIALIZES LOCK FILE
229  // FU LOCK FILE OPEN
230  openFULockfileStream(true);
232  fflush(fu_rw_lock_stream);
233  close(fu_readwritelock_fd_);
234 
      // Optional copy of the HLT configuration: files are copied into <open>/hlt and the directory
      // is then renamed to <bu_run_dir_>/hlt in a single step.
235  if (!hltSourceDirectory_.empty()) {
236  struct stat buf;
237  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
238  std::string hltdir = bu_run_dir_ + "/hlt";
239  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
240  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
      // NOTE(review): the mkdir above targets tmphltdir, but the message below prints hltdir.
241  if (retval != 0 && errno != EEXIST)
242  throw cms::Exception("DaqDirector")
243  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
244 
      // These two copies are not wrapped in try/catch, so a std::filesystem failure propagates.
245  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
246  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
247 
      // The three files below are treated as optional: any exception from the copy is swallowed.
248  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
249  for (auto& optfile : optfiles) {
250  try {
251  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
252  } catch (...) {
253  }
254  }
255 
256  std::filesystem::rename(tmphltdir, hltdir);
257  } else
258  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
259  }
260  //else{}//no configuration specified
261  } else {
262  // for FU, check if bu base dir exists
263 
      // checkExists: one mkdir attempt; anything other than success or EEXIST throws.
264  auto checkExists = [=](std::string const& bu_base_dir) -> void {
265  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
266  if (retval != 0 && errno != EEXIST) {
267  throw cms::Exception("DaqDirector")
268  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
269  }
270  };
271 
      // waitForDir: intended to poll mkdir every 0.5 s while no shutdown has been requested.
      // NOTE(review): the `break` at line 285 is also reached after the failure branch, so the
      // loop body runs at most once and the `cnt > 120` throw can never fire. A `continue` at the
      // end of the failure branch looks intended; confirm against the current upstream source.
272  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
273  int cnt = 0;
274  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
275  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
276  if (retval != 0 && errno != EEXIST) {
277  usleep(500000);
278  cnt++;
279  if (cnt % 20 == 0)
280  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
281  if (cnt > 120)
282  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
283  << " mkdir error:" << strerror(errno);
284  }
285  break;
286  }
287  };
288 
      // With several BU base dirs, the first is checked strictly and the rest go through waitForDir.
289  if (!bu_base_dirs_all_.empty()) {
290  checkExists(bu_base_dirs_all_[0]);
292  for (unsigned int i = 1; i < bu_base_dirs_all_.size(); i++)
293  waitForDir(bu_base_dirs_all_[i]);
294  } else {
295  checkExists(bu_base_dir_);
297  }
298 
      // The FU lock stream is only opened when the file broker is not used.
299  fulockfile_ = bu_run_dir_ + "/fu.lock";
300  if (!useFileBroker_)
301  openFULockfileStream(false);
302  }
303 
304  pthread_mutex_init(&init_lock_, nullptr);
305 
      // Stop-request marker paths: a run-wide one and one suffixed with "_pid" + pid_.
306  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
307  std::stringstream sstp;
308  sstp << stopFilePath_ << "_pid" << pid_;
309  stopFilePathPid_ = sstp.str();
310 
      // FU only: locate the JSD definition file. Search order is bu_run_dir_/jsd/rawData.jsd,
      // then $CMSSW_BASE, then $CMSSW_RELEASE_BASE, then the bare relative suffix.
      // NOTE(review): std::getenv() can return nullptr; constructing std::string from a null
      // pointer is undefined behaviour. Confirm both variables are always set where this runs.
311  if (!directorBU_) {
312  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
313  struct stat statbuf;
314  if (!stat(defPath.c_str(), &statbuf))
315  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
316  else {
317  //look in source directory if not present in ramdisk
318  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
319  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
320  if (stat(defPath.c_str(), &statbuf)) {
321  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
322  if (stat(defPath.c_str(), &statbuf)) {
323  defPath = defPathSuffix;
324  }
325  }
326  }
      // dpd_ is allocated with a raw new; its release is not visible in this listing.
327  dpd_ = new DataPointDefinition();
328  std::string defLabel = "data";
329  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
330  }
331  }
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
def load(fileName)
Definition: svgfig.py:547
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
Log< level::Warning, false > LogWarning
std::string bu_run_open_dir_

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ inputThrottled()

bool evf::EvFDaqDirector::inputThrottled ( )

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 124 of file EvFDaqDirector.h.

References nStreams_, and nThreads_.

Referenced by DAQSource::getNextDataBlock(), and FedRawDataInputSource::getNextEvent().

124 { return nStreams_ == 1 && nThreads_ == 1; }  // true only when both cached counts (nStreams_ and nThreads_) equal 1
unsigned int nThreads_
unsigned int nStreams_

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 952 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), and updateFuLock().

952  {
      // Requests a shared (LOCK_SH) flock() lock on fulocal_rwlock_fd_. LOCK_NB is not passed,
      // and the return value of flock() is not checked.
953  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
954  flock(fulocal_rwlock_fd_, LOCK_SH);
955  }

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 962 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

962  {
      // Requests an exclusive (LOCK_EX) flock() lock on the second descriptor, fulocal_rwlock_fd2_.
      // LOCK_NB is not passed, and the return value of flock() is not checked.
963  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
964  flock(fulocal_rwlock_fd2_, LOCK_EX);
965  }

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 948 of file EvFDaqDirector.cc.

References init_lock_.

948 { pthread_mutex_lock(&init_lock_); }  // locks init_lock_; the pthread return code is discarded
pthread_mutex_t init_lock_

◆ lumisectionDiscarded()

bool evf::EvFDaqDirector::lumisectionDiscarded ( unsigned int  ls)

Definition at line 2146 of file EvFDaqDirector.cc.

References visDQMUpload::buf, discard_ls_filestem_, eostools::ls(), edm_modernize_messagelogger::stat, and to_string().

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

2146  {
      // Returns true when stat() succeeds on the path formed by discard_ls_filestem_ followed by
      // the decimal lumisection number; any stat() failure yields false.
2147  struct stat buf;
2148  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2149  }
static std::string to_string(const XMLCh *ch)
def ls(path, rec=False)
Definition: eostools.py:349
std::string discard_ls_filestem_

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2133 of file EvFDaqDirector.cc.

References command_line::start.

2133  {
      // Builds a struct flock by positional aggregate initialization. The initializer order is
      // different under __APPLE__, presumably because struct flock declares its fields in a
      // different order on that platform (verify against the platform headers).
2134 #ifdef __APPLE__
2135  return {start, len, pid, type, whence};
2136 #else
2137  return {type, whence, start, len, pid};
2138 #endif
2139  }

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ numConcurrentLumis()

unsigned int evf::EvFDaqDirector::numConcurrentLumis ( ) const
inline

Definition at line 125 of file EvFDaqDirector.h.

References nConcurrentLumis_.

Referenced by FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

125 { return nConcurrentLumis_; }  // accessor for the cached nConcurrentLumis_ value
unsigned int nConcurrentLumis_

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 929 of file EvFDaqDirector.cc.

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

929  {
      // Opens fulockfile_ and wraps the descriptor in a "r+" stdio stream (fu_rw_lock_stream).
      // With create == true the file is opened with O_CREAT and then chmod'ed to 0766;
      // otherwise an existing file is opened read/write.
      // NOTE(review): source line 931 is missing from this listing (gutter jumps 930 -> 932);
      // it presumably holds the `fu_readwritelock_fd_ =` left-hand side of the open() below.
930  if (create) {
932  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
933  chmod(fulockfile_.c_str(), 0766);
934  } else {
935  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
936  }
      // Failures are logged only; nothing is thrown or returned to the caller.
937  if (fu_readwritelock_fd_ == -1)
938  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
939  << " create:" << create << " error:" << strerror(errno);
940  else
941  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
942 
      // NOTE(review): fdopen() is called even when the descriptor is -1; in that case the
      // stream ends up null and a second error is logged.
943  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
944  if (fu_rw_lock_stream == nullptr)
945  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
946  }
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
Log< level::Error, false > LogError
def chmod(path, mode)
Definition: eostools.py:294
#define LogDebug(id)

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 110 of file EvFDaqDirector.h.

References outputAdler32Recheck_.

110 { return outputAdler32Recheck_; }  // accessor for the outputAdler32Recheck_ member

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

References writedatasetfile::run, and run_.

Referenced by DAQSource::DAQSource().

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint16_t &  rawDataType,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 1004 of file EvFDaqDirector.cc.

References checkFileRead(), FRDFileHeaderContent_v2::dataType_, FRDFileHeaderContent_v1::eventCount_, FRDFileHeaderContent_v2::eventCount_, FRDFileHeaderContent_v1::fileSize_, FRDFileHeaderContent_v2::fileSize_, getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, FRDFileHeaderContent_v1::lumiSection_, FRDFileHeaderContent_v2::lumiSection_, and FRDFileHeaderIdentifier::version_.

Referenced by grabNextJsonFromRaw(), FedRawDataInputSource::readSupervisor(), and DAQSource::readSupervisor().

1013  {
      // Opens rawSourcePath, reads the FRD file header identifier and, depending on the detected
      // version (0 = no header, 1, 2), fills the output parameters.
      // Return values visible here: 0 on success, 1 when the file cannot be opened and errno is
      // ENOENT, -1 on any other failure.
      // On success rawFd receives the open descriptor, or -1 when closeFile is true.
      // NOTE(review): source lines 1047, 1068 and 1087 are missing from this listing (gutter
      // gaps); they presumably declare fileId / fhContent as views into hdr.
1014  int infile;
1015 
      // Open handling: with retry == true a failed open logs a warning and recurses once with
      // retry == false (no delay in between). With retry == false the open is attempted a second
      // time before giving up.
      // NOTE(review): errno is inspected after the LogError statement has run; confirm the logger
      // cannot modify errno in between.
1016  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1017  if (retry) {
1018  edm::LogWarning("EvFDaqDirector")
1019  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1020  return parseFRDFileHeader(rawSourcePath,
1021  rawFd,
1022  rawHeaderSize,
1023  rawDataType,
1024  lsFromHeader,
1025  eventsFromHeader,
1026  fileSizeFromHeader,
1027  requireHeader,
1028  false,
1029  closeFile);
1030  } else {
1031  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1032  edm::LogError("EvFDaqDirector")
1033  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1034  if (errno == ENOENT)
1035  return 1; // error && file not found
1036  else
1037  return -1;
1038  }
1039  }
1040  }
1041 
1042  //v2 is the largest possible read
      // NOTE(review): the `return -1` paths after a failed checkFileRead() do not close infile
      // here; presumably checkFileRead() closes the descriptor on failure. Verify.
1043  char hdr[sizeof(FRDFileHeader_v2)];
1044  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1045  return -1;
1046 
1048  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1049 
1050  if (frd_version == 0) {
1051  //no header (specific sequence not detected)
1052  if (requireHeader) {
1053  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1054  close(infile);
1055  return -1;
1056  } else {
1057  //no header, but valid file
      // Rewind so the caller can read from the start; rawDataType is left untouched in this branch.
1058  lseek(infile, 0, SEEK_SET);
1059  rawHeaderSize = 0;
1060  lsFromHeader = 0;
1061  eventsFromHeader = -1;
1062  fileSizeFromHeader = -1;
1063  }
1064  } else if (frd_version == 1) {
1065  //version 1 header
1066  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1067  return -1;
1069  uint32_t headerSizeRaw = fhContent->headerSize_;
1070  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1071  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1072  << " v:" << frd_version;
1073  close(infile);
1074  return -1;
1075  }
1076  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
      // Version 1 carries no data type field, so rawDataType is reported as 0.
1077  rawDataType = 0;
1078  lsFromHeader = fhContent->lumiSection_;
1079  eventsFromHeader = (int32_t)fhContent->eventCount_;
1080  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1081  rawHeaderSize = fhContent->headerSize_;
1082 
1083  } else if (frd_version == 2) {
1084  //version 2 header
1085  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1086  return -1;
1088  uint32_t headerSizeRaw = fhContent->headerSize_;
1089  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1090  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1091  << " v:" << frd_version;
1092  close(infile);
1093  return -1;
1094  }
1095  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1096  rawDataType = fhContent->dataType_;
1097  lsFromHeader = fhContent->lumiSection_;
1098  eventsFromHeader = (int32_t)fhContent->eventCount_;
1099  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1100  rawHeaderSize = fhContent->headerSize_;
1101  }
      // Any other version value falls through: the output parameters stay as the caller passed
      // them and the function still returns 0.
1102 
1103  if (closeFile) {
1104  close(infile);
1105  infile = -1;
1106  }
1107 
1108  rawFd = infile;
1109  return 0; //OK
1110  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 352 of file EvFDaqDirector.cc.

References initRun(), nConcurrentLumis_, nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

352  {
      // Runs initRun() to prepare the run directories and lock files, then caches the
      // concurrency limits reported by the framework in `bounds`.
353  initRun();
354 
      // Each member now takes the bound of the same name; the thread and stream assignments were
      // previously crossed (nThreads_ received maxNumberOfStreams() and vice versa).
355  nThreads_ = bounds.maxNumberOfThreads();
356  nStreams_ = bounds.maxNumberOfStreams();
357  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
358  }
unsigned int nThreads_
unsigned int nConcurrentLumis_
unsigned int nStreams_

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 394 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

394  {
      // NOTE(review): source line 395 is missing from this listing (gutter jumps 394 -> 396);
      // given the "References" list it presumably calls checkTransferSystemPSet(pc). Confirm.
396  checkMergeTypePSet(pc);
397  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 399 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

399  {
      // Emits a warning naming run_dir_ and the highest run directory found by dirManager_.
      // NOTE(review): source line 403 is missing from this listing (gutter jumps 402 -> 404);
      // it presumably holds the `if` that opens the block closed at line 406.
400  //assert(run_ == id.run());
401 
402  // check if the requested run is the latest one - issue a warning if it isn't
404  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
405  << ". This is not the highest run " << dirManager_.findHighestRunDir();
406  }
407  }
std::string findHighestRunDir()
Definition: DirManager.cc:23
Log< level::Warning, false > LogWarning

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 418 of file EvFDaqDirector.cc.

References fileDeleteLockPtr_, filesToDeletePtr_, fms_, evf::FastMonitoringService::isExceptionOnData(), eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

418  {
      // At the end of a global lumisection, drops the tracked input-file entries of that lumi.
      // NOTE(review): source line 421 is missing from this listing (gutter jumps 420 -> 422);
      // it presumably tests fileDeleteLockPtr_ / filesToDeletePtr_ for null before the early return.
419  //delete all files belonging to just closed lumi
420  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
422  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
423  return;
424  }
425 
      // The list is walked under *fileDeleteLockPtr_. An entry is erased when its lumi_ equals ls,
      // unless fms_ is set and reports an exception on data for that lumi. Erasing destroys the
      // owning std::unique_ptr<InputFile> held in the entry.
426  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
427  auto it = filesToDeletePtr_->begin();
428  while (it != filesToDeletePtr_->end()) {
429  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
430  it = filesToDeletePtr_->erase(it);
431  } else
432  it++;
433  }
434  }
bool isExceptionOnData(unsigned int ls)
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Log< level::Warning, false > LogWarning

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1129 of file EvFDaqDirector.cc.

References checkFileRead(), getFRDFileHeaderVersion(), FRDFileHeaderContent_v1::headerSize_, FRDFileHeaderContent_v2::headerSize_, FRDFileHeaderIdentifier::id_, timingPdfMaker::infile, and FRDFileHeaderIdentifier::version_.

Referenced by bumpFile().

1129  {
      // Returns true and sets rawHeaderSize when the file starts with a version 1 or version 2
      // FRD header. Returns false when the file cannot be opened or read, or when the version is
      // anything else; rawHeaderSize is reset to 0 only on the unknown-version path.
      // NOTE(review): source lines 1140, 1146 and 1153 are missing from this listing (gutter
      // gaps); they presumably declare fileId / fhContent as views into hdr.
1130  int infile;
1131  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1132  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1133  << strerror(errno);
1134  return false;
1135  }
1136  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
      // NOTE(review): the `return false` paths after a failed checkFileRead() do not close infile
      // here; presumably checkFileRead() closes the descriptor on failure. Verify.
1137  char hdr[sizeof(FRDFileHeader_v2)];
1138  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1139  return false;
1141  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1142 
1143  if (frd_version == 1) {
1144  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1145  return false;
1147  rawHeaderSize = fhContent->headerSize_;
1148  close(infile);
1149  return true;
1150  } else if (frd_version == 2) {
1151  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1152  return false;
1154  rawHeaderSize = fhContent->headerSize_;
1155  close(infile);
1156  return true;
1157  } else
1158  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1159 
1160  close(infile);
1161  rawHeaderSize = 0;
1162  return false;
1163  }
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
Log< level::Error, false > LogError
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
Log< level::Warning, false > LogWarning

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1958 of file EvFDaqDirector.cc.

References Json::Value::asInt(), geometryDiff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

1958  {
      // Parses `file` as JSON and returns the integer stored under the "lastLS" key;
      // returns -1 (after logging) when the document cannot be parsed.
      // NOTE(review): source line 1961 is missing from this listing (gutter jumps 1960 -> 1962);
      // it presumably declares the `reader` object used below.
1959  std::ifstream ij(file);
1960  Json::Value deserializeRoot;
1962 
1963  if (!reader.parse(ij, deserializeRoot)) {
1964  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1965  return -1;
1966  }
1967 
      // NOTE(review): the fallback passed to get() is an empty string, on which asInt() is then
      // called when "lastLS" is absent; confirm how the JSON library handles that conversion.
1968  int ret = deserializeRoot.get("lastLS", "").asInt();
1969  return ret;
1970  }
Int asInt() const
ret
prodAgent to be discontinued
Value get(UInt index, const Value &defaultValue) const
reader
Definition: DQM.py:105
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
Unserialize a JSON document into a Value.
Definition: reader.h:16

◆ removeFile()

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 535 of file EvFDaqDirector.cc.

References corrVsCorr::filename.

Referenced by postEndRun().

535  {
      // Deletes `filename` with remove(). A failure is reported through LogError together with
      // strerror(errno); nothing is thrown or returned.
536  int retval = remove(filename.c_str());
537  if (retval != 0)
538  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
539  << ". error = " << strerror(errno);
540  }
Log< level::Error, false > LogError

◆ runString()

std::string const& evf::EvFDaqDirector::runString ( ) const
inline

Definition at line 76 of file EvFDaqDirector.h.

References run_string_.

Referenced by DAQSource::DAQSource().

76 { return run_string_; }  // accessor for the run_string_ member (returned by const reference per the signature)
std::string run_string_

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 183 of file EvFDaqDirector.h.

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

184  {
      // Stores the two raw pointers supplied by the caller (mutex and list of files to delete).
      // Nothing is copied or allocated here; presumably the caller keeps ownership of both
      // objects. Confirm lifetimes at the call sites.
185  fileDeleteLockPtr_ = fileDeleteLock;
186  filesToDeletePtr_ = filesToDelete;
187  }
std::mutex * fileDeleteLockPtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 123 of file EvFDaqDirector.h.

References fms_.

Referenced by DAQSource::DAQSource(), and FedRawDataInputSource::FedRawDataInputSource().

123 { fms_ = fms; }  // stores the FastMonitoringService pointer in fms_ without any check
evf::FastMonitoringService * fms_

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 919 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

919  {
      // Writes the initial "<ls> <index>" pair ("1 0") into fu_rw_lock_stream.
      // When the stream is null only an error is logged. The stream is not flushed here.
920  if (fu_rw_lock_stream == nullptr)
921  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
922  else {
923  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
924  unsigned int readLs = 1, readIndex = 0;
925  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
926  }
927  }
Log< level::Error, false > LogError
Log< level::Info, false > LogInfo

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 957 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by getNextFromFileBroker(), grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

957  {
      // Releases the flock() lock held through fulocal_rwlock_fd_ (LOCK_UN).
      // The return value of flock() is not checked.
958  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
959  flock(fulocal_rwlock_fd_, LOCK_UN);
960  }

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 967 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

967  {
      // Releases the flock() lock held through the second descriptor, fulocal_rwlock_fd2_ (LOCK_UN).
      // The return value of flock() is not checked.
968  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
969  flock(fulocal_rwlock_fd2_, LOCK_UN);
970  }

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 950 of file EvFDaqDirector.cc.

References init_lock_.

950 { pthread_mutex_unlock(&init_lock_); }  // unlocks init_lock_; the pthread return code is discarded
pthread_mutex_t init_lock_

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 542 of file EvFDaqDirector.cc.

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

547  {
548  EvFDaqDirector::FileStatus fileStatus = noFile;
549  rawHeaderSize = 0;
550 
551  int retval = -1;
552  int lock_attempts = 0;
553  long total_lock_attempts = 0;
554 
555  struct stat buf;
556  int stopFileLS = -1;
557  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
558  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
559  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
560  if (stopFileCheck == 0)
561  stopFileLS = readLastLSEntry(stopFilePath_);
562  else
563  stopFileLS = 1; //stop without drain if only pid is stopped
564  if (!stop_ls_override_) {
565  //if lumisection is higher than in stop file, should quit at next from current
566  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
567  stopFileLS = stop_ls_override_ = ls;
568  } else
569  stopFileLS = stop_ls_override_;
570  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
571  << stopFileLS;
572  //return runEnded;
573  } else //if file was removed before reaching stop condition, reset this
574  stop_ls_override_ = 0;
575 
576  timeval ts_lockbegin;
577  gettimeofday(&ts_lockbegin, nullptr);
578 
579  while (retval == -1) {
580  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
581  if (retval == -1)
582  usleep(fuLockPollInterval_);
583  else
584  continue;
585 
586  lock_attempts += fuLockPollInterval_;
587  total_lock_attempts += fuLockPollInterval_;
588  if (lock_attempts > 5000000 || errno == 116) {
589  if (errno == 116)
590  edm::LogWarning("EvFDaqDirector")
591  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
592  else
593  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
594  "fu.lock file are present -: errno "
595  << errno << ":" << strerror(errno) << std::endl;
596 
597  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
598  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
599  ls++;
600  return noFile;
601  }
602 
603  if (stat(bu_run_dir_.c_str(), &buf) != 0)
604  return runEnded;
605  if (stat(fulockfile_.c_str(), &buf) != 0)
606  return runEnded;
607 
608  lock_attempts = 0;
609  }
610  if (total_lock_attempts > 5 * 60000000) {
611  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
612  return runAbort;
613  }
614  }
615 
616  timeval ts_lockend;
617  gettimeofday(&ts_lockend, nullptr);
618  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
619  if (deltat > 0.)
620  lockWaitTime = deltat;
621 
622  if (retval != 0)
623  return fileStatus;
624 
625 #ifdef DEBUG
626  timeval ts_lockend;
627  gettimeofday(&ts_lockend, 0);
628 #endif
629 
630  //open another lock file FD after the lock using main fd has been acquired
631  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
632  if (fu_readwritelock_fd2 == -1)
633  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
634  << " create. error:" << strerror(errno);
635 
636  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
637 
638  // if the stream is readable
639  if (fu_rw_lock_stream2 != nullptr) {
640  unsigned int readLs, readIndex;
641  int check = 0;
642  // rewind the stream
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  // if rewinded ok
645  if (check == 0) {
646  // read its' values
647  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
648  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
649 
650  unsigned int currentLs = readLs;
651  bool bumpedOk = false;
652  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
653  //no lock file write in this case
654  if (ls && ls + 1 < currentLs)
655  ls++;
656  else {
657  // try to bump (look for new index or EoLS file)
658  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
659  //avoid 2 lumisections jump
660  if (ls && readLs > currentLs && currentLs > ls) {
661  ls++;
662  readLs = currentLs = ls;
663  readIndex = 0;
664  bumpedOk = false;
665  //no write to lock file
666  } else {
667  if (ls == 0 && readLs > currentLs) {
668  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
669  //in this case there is no new file in the same LS
670  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
671  readLs = currentLs;
672  readIndex = 0;
673  bumpedOk = false;
674  //no write to lock file
675  }
676  //update return LS value
677  ls = readLs;
678  }
679  }
680  if (bumpedOk) {
681  // there is a new index file to grab, lock file needs to be updated
682  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
683  if (check == 0) {
684  ftruncate(fu_readwritelock_fd2, 0);
685  // write next index in the file, which is the file the next process should take
686  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
687  fflush(fu_rw_lock_stream2);
688  fsync(fu_readwritelock_fd2);
689  fileStatus = newFile;
690  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
691  } else {
692  edm::LogError("EvFDaqDirector")
693  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
694  setExceptionState = true;
695  return noFile;
696  }
697  } else if (currentLs < readLs) {
698  //there is no new file in next LS (yet), but lock file can be updated to the next LS
699  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
700  if (check == 0) {
701  ftruncate(fu_readwritelock_fd2, 0);
702  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
703  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
704  fflush(fu_rw_lock_stream2);
705  fsync(fu_readwritelock_fd2);
706  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
707  } else {
708  edm::LogError("EvFDaqDirector")
709  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
710  setExceptionState = true;
711  return noFile;
712  }
713  }
714  } else {
715  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
716  << strerror(errno);
717  }
718  } else {
719  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
720  }
721  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
722 
723 #ifdef DEBUG
724  timeval ts_preunlock;
725  gettimeofday(&ts_preunlock, 0);
726  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
727  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
728 #endif
729 
730  //if new json is present, lock file which FedRawDataInputSource will later unlock
731  if (fileStatus == newFile)
732  lockFULocal();
733 
734  //release lock at this point
735  int retvalu = -1;
736  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
737  if (retvalu == -1)
738  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
739 
740 #ifdef DEBUG
741  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
742 #endif
743 
744  if (fileStatus == noFile) {
745  struct stat buf;
746  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
747  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
748  fileStatus = runEnded;
749  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
750  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
751  fileStatus = runEnded;
752  }
753  }
754  return fileStatus;
755  }
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Log< level::Error, false > LogError
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
Log< level::Info, false > LogInfo
def ls(path, rec=False)
Definition: eostools.py:349
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string bu_run_dir_
std::string getEoRFilePath() const
Log< level::Warning, false > LogWarning
unsigned int fuLockPollInterval_
#define LogDebug(id)

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 80 of file EvFDaqDirector.h.

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

80 { return useFileBroker_; }

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dirs_all_

std::vector<std::string> evf::EvFDaqDirector::bu_base_dirs_all_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by getBUBaseDirs(), and initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 262 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 264 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 252 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 242 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 255 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 261 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 263 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 254 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ discard_ls_filestem_

std::string evf::EvFDaqDirector::discard_ls_filestem_
private

Definition at line 301 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and lumisectionDiscarded().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 292 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 297 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 271 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by bumpFile(), preGlobalEndLumi(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 247 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 266 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 253 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 249 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 273 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ input_throttled_file_

std::string evf::EvFDaqDirector::input_throttled_file_
private

Definition at line 300 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and inputThrottled().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 294 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 286 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
static private

Definition at line 289 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 230 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nConcurrentLumis_

unsigned int evf::EvFDaqDirector::nConcurrentLumis_ = 0
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by numConcurrentLumis(), and preallocate().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 275 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 276 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 227 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 259 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 296 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 295 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 238 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), getLumisectionToStart(), initRun(), and runString().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 298 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getStartLumisectionFromEnv().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 283 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 281 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 282 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 285 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), initRun(), and useFileBroker().