CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
 
void initRun ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
void removeFile (unsigned int ls, unsigned int index)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Enumerator
noFile 
sameFile 
newFile 
newLumi 
runEnded 
runAbort 

Definition at line 64 of file EvFDaqDirector.h.

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 39 of file EvFDaqDirector.cc.

40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
43  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
44  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
45  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
46  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
47  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
48  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
49  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
53  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
54  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
55  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
67  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
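72  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
73  reg.watchPreallocate(this, &EvFDaqDirector::preallocate);
74  reg.watchPreBeginJob(this, &EvFDaqDirector::preBeginJob);
75  reg.watchPreGlobalBeginRun(this, &EvFDaqDirector::preBeginRun);
76  reg.watchPostGlobalEndRun(this, &EvFDaqDirector::postEndRun);
77  reg.watchPreGlobalEndLumi(this, &EvFDaqDirector::preGlobalEndLumi);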
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (boost::bad_lexical_cast const&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (boost::bad_lexical_cast const&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
106  if (fileBrokerHostFromCfg_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (boost::bad_lexical_cast const&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file service parameter if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
142  << fileBrokerUseLocalLock_;
143  } catch (boost::bad_lexical_cast const&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147  }

References visDQMUpload::buf, endpoint_iterator_, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, io_service_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, socket_, startFromLS_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 301 of file EvFDaqDirector.cc.

301  {
302  //close server connection
303  if (socket_.get() && socket_->is_open()) {
304  boost::system::error_code ec;
305  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
306  socket_->close(ec);
307  }
308 
309  if (fulocal_rwlock_fd_ != -1) {
310  unlockFULocal();
311  close(fulocal_rwlock_fd_);
312  }
313 
314  if (fulocal_rwlock_fd2_ != -1) {
315  unlockFULocal2();
316  close(fulocal_rwlock_fd2_);
317  }
318  }

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

76 { return run_dir_; }

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

77 { return bu_run_dir_; }

References bu_run_dir_.

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

78 { return bu_run_open_dir_; }

References bu_run_open_dir_.

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 779 of file EvFDaqDirector.cc.

785  {
786  if (previousFileSize_ != 0) {
787  if (!fms_) {
789  }
790  if (fms_)
791  fms_->accumulateFileSize(ls, previousFileSize_);
792  previousFileSize_ = 0;
793  }
794  nextFile = "";
795 
796  //reached limit
797  if (maxLS >= 0 && ls > (unsigned int)maxLS)
798  return false;
799 
800  struct stat buf;
801  std::stringstream ss;
802  unsigned int nextIndex = index;
803  nextIndex++;
804 
805  // 1. Check suggested file
806  std::string nextFileJson = getInputJsonFilePath(ls, index);
807  if (stat(nextFileJson.c_str(), &buf) == 0) {
808  fsize = previousFileSize_ = buf.st_size;
809  nextFile = nextFileJson;
810  return true;
811  }
812  // 2. No file -> lumi ended? (and how many?)
813  else {
814  // 3. No file -> check for standalone raw file
815  std::string nextFileRaw = getRawFilePath(ls, index);
816  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
817  fsize = previousFileSize_ = buf.st_size;
818  nextFile = nextFileRaw;
819  return true;
820  }
821 
822  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
823 
824  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
825  // recheck that no raw file appeared in the meantime
826  if (stat(nextFileJson.c_str(), &buf) == 0) {
827  fsize = previousFileSize_ = buf.st_size;
828  nextFile = nextFileJson;
829  return true;
830  }
831  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
832  fsize = previousFileSize_ = buf.st_size;
833  nextFile = nextFileRaw;
834  return true;
835  }
836 
837  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
838  if (indexFilesInLS < 0)
839  //parsing failed
840  return false;
841  else {
842  //check index
843  if ((int)index < indexFilesInLS) {
844  //we have 2 files, and check for 1 failed... retry (2 will never be here)
845  edm::LogError("EvFDaqDirector")
846  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
847  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
848  setExceptionState = true;
849  return false;
850  }
851  }
852  // this lumi ended, check for files
853  ++ls;
854  index = 0;
855 
856  //reached limit
857  if (maxLS >= 0 && ls > (unsigned int)maxLS)
858  return false;
859 
860  nextFileJson = getInputJsonFilePath(ls, 0);
861  nextFileRaw = getRawFilePath(ls, 0);
862  if (stat(nextFileJson.c_str(), &buf) == 0) {
863  // a new file was found at new lumisection, index 0
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873  return false;
874  }
875  }
876  // no new file found
877  return false;
878  }

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 1985 of file EvFDaqDirector.cc.

1985  {
1986  if (mergeTypePset_.empty())
1987  return;
1988  if (!mergeTypeMap_.empty())
1989  return;
1990  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1991  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
1992  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1993  for (const std::string& pname : tsPset.getParameterNames()) {
1994  std::string streamType = tsPset.getParameter<std::string>(pname);
1995  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1996  mergeTypeMap_.insert(ac, pname);
1997  ac->second = streamType;
1998  ac.release();
1999  }
2000  }
2001  }

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1877 of file EvFDaqDirector.cc.

1877  {
1878  if (transferSystemJson_)
1879  return;
1880 
1881  transferSystemJson_.reset(new Json::Value);
1882  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1883  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1884  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1885 
1886  Json::Value destinationsVal(Json::arrayValue);
1887  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1888  for (auto& dest : destinations)
1889  destinationsVal.append(dest);
1890  (*transferSystemJson_)["destinations"] = destinationsVal;
1891 
1892  Json::Value modesVal(Json::arrayValue);
1893  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1894  for (auto& mode : modes)
1895  modesVal.append(mode);
1896  (*transferSystemJson_)["transferModes"] = modesVal;
1897 
1898  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1899  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1900  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1901  Json::Value streamVal;
1902  for (auto& mode : modes) {
1903  //validation
1904  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1905  throw cms::Exception("EvFDaqDirector")
1906  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1907  << ")";
1908  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1909 
1910  Json::Value sDestsValue(Json::arrayValue);
1911 
1912  if (streamDestinations.empty())
1913  throw cms::Exception("EvFDaqDirector")
1914  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1915 
1916  for (auto& sdest : streamDestinations) {
1917  bool sDestValid = false;
1918  sDestsValue.append(sdest);
1919  for (auto& dest : destinations) {
1920  if (dest == sdest)
1921  sDestValid = true;
1922  }
1923  if (!sDestValid)
1924  throw cms::Exception("EvFDaqDirector")
1925  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1926  << ", dest:" << sdest;
1927  }
1928  streamVal[mode] = sDestsValue;
1929  }
1930  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1931  }
1932  }
1933  } else {
1934  if (requireTSPSet_)
1935  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1936  }
1937  }

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1447 of file EvFDaqDirector.cc.

1454  {
1455  EvFDaqDirector::FileStatus fileStatus = noFile;
1456  serverError = false;
1457 
1458  boost::system::error_code ec;
1459  try {
1460  while (true) {
1461  //socket connect
1462  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1463  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1464 
1465  if (ec) {
1466  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1467  serverError = true;
1468  break;
1469  }
1470  }
1471 
1472  boost::asio::streambuf request;
1473  std::ostream request_stream(&request);
1474  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1475  if (maxLS >= 0) {
1476  std::stringstream spath;
1477  spath << path << "&stopls=" << maxLS;
1478  path = spath.str();
1479  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1480  }
1481  request_stream << "GET " << path << " HTTP/1.1\r\n";
1482  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1483  request_stream << "Accept: */*\r\n";
1484  request_stream << "Connection: keep-alive\r\n\r\n";
1485 
1486  boost::asio::write(*socket_, request, ec);
1487  if (ec) {
1488  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1489  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1490  //we got disconnected, try to reconnect to the server before writing the request
1491  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1492  if (ec) {
1493  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1494  serverError = true;
1495  break;
1496  }
1497  continue;
1498  }
1499  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1500  serverError = true;
1501  break;
1502  }
1503 
1504  boost::asio::streambuf response;
1505  boost::asio::read_until(*socket_, response, "\r\n", ec);
1506  if (ec) {
1507  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1508  serverError = true;
1509  break;
1510  }
1511 
1512  std::istream response_stream(&response);
1513 
1514  std::string http_version;
1515  response_stream >> http_version;
1516 
1517  response_stream >> serverHttpStatus;
1518 
1519  std::string status_message;
1520  std::getline(response_stream, status_message);
1521  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1522  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1523  serverError = true;
1524  break;
1525  }
1526  if (serverHttpStatus != 200) {
1527  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1528  serverError = true;
1529  break;
1530  }
1531 
1532  // Process the response headers.
1533  std::string header;
1534  while (std::getline(response_stream, header) && header != "\r") {
1535  }
1536 
1537  std::string fileInfo;
1538  std::map<std::string, std::string> serverMap;
1539  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1540  auto pos = fileInfo.find('=');
1541  if (pos == std::string::npos)
1542  continue;
1543  auto stitle = fileInfo.substr(0, pos);
1544  auto svalue = fileInfo.substr(pos + 1);
1545  serverMap[stitle] = svalue;
1546  }
1547 
1548  //check that response run number is correct
1549  auto server_version = serverMap.find("version");
1550  assert(server_version != serverMap.end());
1551 
1552  auto server_run = serverMap.find("runnumber");
1553  assert(server_run != serverMap.end());
1554  assert(run_nstring_ == server_run->second);
1555 
1556  auto server_state = serverMap.find("state");
1557  assert(server_state != serverMap.end());
1558 
1559  auto server_eols = serverMap.find("lasteols");
1560  assert(server_eols != serverMap.end());
1561 
1562  auto server_ls = serverMap.find("lumisection");
1563 
1564  int version_maj = 1;
1565  int version_min = 0;
1566  int version_rev = 0;
1567  {
1568  auto* s_ptr = server_version->second.c_str();
1569  if (!server_version->second.empty() && server_version->second[0] == '"')
1570  s_ptr++;
1571  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1572  if (res < 3) {
1573  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1574  if (res < 2) {
1575  res = sscanf(s_ptr, "%d", &version_maj);
1576  if (res < 1) {
1577  //expecting at least 1 number (major version)
1578  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1579  }
1580  }
1581  }
1582  }
1583 
1584  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1585  if (server_ls != serverMap.end())
1586  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1587  else
1588  serverLS = closedServerLS + 1;
1589 
1590  std::string s_state = server_state->second;
1591  if (s_state == "STARTING") //initial, always empty starting with LS 1
1592  {
1593  auto server_file = serverMap.find("file");
1594  assert(server_file == serverMap.end()); //no file with starting state
1595  fileStatus = noFile;
1596  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1597  } else if (s_state == "READY") {
1598  auto server_file = serverMap.find("file");
1599  if (server_file == serverMap.end()) {
1600  //can be returned by server if files from new LS already appeared but LS is not yet closed
1601  if (serverLS <= closedServerLS)
1602  serverLS = closedServerLS + 1;
1603  fileStatus = noFile;
1604  edm::LogInfo("EvFDaqDirector")
1605  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1606  } else {
1607  std::string filestem;
1608  std::string fileprefix;
1609  auto server_fileprefix = serverMap.find("fileprefix");
1610 
1611  if (server_fileprefix != serverMap.end()) {
1612  auto pssize = server_fileprefix->second.size();
1613  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1614  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1615  else
1616  fileprefix = server_fileprefix->second;
1617  }
1618 
1619  //remove string literals
1620  auto ssize = server_file->second.size();
1621  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1622  filestem = server_file->second.substr(1, ssize - 2);
1623  else
1624  filestem = server_file->second;
1625  assert(!filestem.empty());
1626  if (version_maj > 1) {
1627  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1628  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1629  nextFileJson = "";
1630  rawHeader = true;
1631  } else {
1632  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1633  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1634  nextFileJson = filestem + ".jsn";
1635  rawHeader = false;
1636  }
1637  fileStatus = newFile;
1638  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1639  << serverLS << " file:" << filestem;
1640  }
1641  } else if (s_state == "EOLS") {
1642  serverLS = closedServerLS + 1;
1643  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1644  fileStatus = noFile;
1645  } else if (s_state == "EOR") {
1646  //server_eor = serverMap.find("iseor");
1647  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1648  fileStatus = runEnded;
1649  } else if (s_state == "NORUN") {
1650  auto err_msg = serverMap.find("errormessage");
1651  if (err_msg != serverMap.end())
1652  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1653  else
1654  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1655  edm::LogWarning("EvFDaqDirector") << "executing run end";
1656  fileStatus = runEnded;
1657  } else if (s_state == "ERROR") {
1658  auto err_msg = serverMap.find("errormessage");
1659  if (err_msg != serverMap.end())
1660  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1661  else
1662  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1663  fileStatus = noFile;
1664  serverError = true;
1665  } else {
1666  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1667  fileStatus = noFile;
1668  serverError = true;
1669  }
1670 
1671  // Read until EOF, writing data to output as we go.
1672  if (!fileBrokerKeepAlive_) {
1673  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1674  }
1675  if (ec != boost::asio::error::eof) {
1676  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1677  serverError = true;
1678  }
1679  }
1680  break;
1681  }
1682  } catch (std::exception const& e) {
1683  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1684  serverError = true;
1685  }
1686 
1687  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1688  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1689  if (ec) {
1690  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1691  }
1692  socket_->close(ec);
1693  if (ec) {
1694  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1695  }
1696  }
1697 
1698  if (serverError) {
1699  if (socket_->is_open())
1700  socket_->close(ec);
1701  if (ec) {
1702  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1703  }
1704  fileStatus = noFile;
1705  sleep(1); //back-off if error detected
1706  }
1707  return fileStatus;
1708  }

References cms::cuda::assert(), bu_run_dir_, DBConfiguration_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 933 of file EvFDaqDirector.cc.

// Creates (if needed) the file whose path is returned by getBoLSFilePathOnFU(lumiSection).
// checkIfExists == true : the file is only created when stat() reports that it is missing.
// checkIfExists == false: open(O_CREAT) is attempted unconditionally.
// The descriptor is closed immediately; only the existence of the file matters here.
933  {
934  //used for backpressure mechanisms and monitoring
935  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
936  struct stat buf;
937  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
// NOTE(review): the result of open() is not checked, so a failed open hands -1 to close().
938  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
939  close(bol_fd);
940  }
941  }

References visDQMUpload::buf, getBoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS = true 
)

Definition at line 943 of file EvFDaqDirector.cc.

// Creates the EoLS file for currentLumiSection and/or the BoLS file for lumiSection.
//  - currentLumiSection > 0: if the EoLS file (getEoLSFilePathOnFU) does not exist yet it is
//    created, and only in that case the BoLS file for lumiSection is created as well
//    (unconditionally, when doCreateBoLS is set). If the EoLS file already exists nothing is done.
//  - currentLumiSection == 0: only the BoLS file is handled, and it is created only if missing.
945  {
946  if (currentLumiSection > 0) {
947  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
948  struct stat buf;
949  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
950  if (!found) {
// NOTE(review): the result of open() is not checked before close().
951  int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
952  close(eol_fd);
953  if (doCreateBoLS)
954  createBoLSFile(lumiSection, false);
955  }
956  } else if (doCreateBoLS) {
957  createBoLSFile(lumiSection, true); //needed for initial lumisection
958  }
959  }

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2017 of file EvFDaqDirector.cc.

// Ensures that the file "<run_dir_>/processing" exists: it is opened with O_CREAT and the
// descriptor is closed right away. Nothing is written to the file.
2017  {
2018  std::string proc_flag = run_dir_ + "/processing";
// NOTE(review): the result of open() is not checked before close().
2019  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2020  close(proc_flag_fd);
2021  }

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext().

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1838 of file EvFDaqDirector.cc.

// Creates the directory named by openPath (including missing parents) when it is not
// already a directory.
// NOTE(review): listing line 1841 is absent from this extract; openPath is presumably
// declared there as a std::filesystem::path built from getRunOpenDirPath() - confirm
// against EvFDaqDirector.cc.
1838  {
1839  // create open dir if not already there
1840 
1842  if (!std::filesystem::is_directory(openPath)) {
1843  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1844  std::filesystem::create_directories(openPath);
1845  }
1846  }

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 327 of file EvFDaqDirector.cc.

// Declares the untracked configuration parameters of the service together with their
// default values and help comments, and registers the description under "EvFDaqDirector".
// NOTE(review): listing line 328 is absent from this extract; `desc` is presumably
// declared there (an edm::ParameterSetDescription, judging by its use) - confirm against
// EvFDaqDirector.cc.
327  {
329  desc.setComment(
330  "Service used for file locking arbitration and for propagating information between other EvF components");
// Directories and run selection.
331  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
332  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
333  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
// File broker settings.
334  desc.addUntracked<bool>("useFileBroker", false)
335  ->setComment("Use BU file service to grab input data instead of NFS file locking");
336  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
337  ->setComment("Allow service to discover BU address from hltd configuration");
338  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
339  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
340  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
341  ->setComment("Use keep alive to avoid using large number of sockets");
342  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
343  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
344  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
345  ->setComment("Lock polling interval in microseconds for the input directory file lock");
// Output checking, transfer system and merging settings.
346  desc.addUntracked<bool>("outputAdler32Recheck", false)
347  ->setComment("Check Adler32 of per-process output files while micro-merging");
348  desc.addUntracked<bool>("requireTransfersPSet", false)
349  ->setComment("Require complete transferSystem PSet in the process configuration");
// NOTE(review): the help text below has an unbalanced opening parenthesis.
350  desc.addUntracked<std::string>("selectedTransferMode", "")
351  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
352  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
353  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
354  desc.addUntracked<std::string>("mergingPset", "")
355  ->setComment("Name of merging PSet to look for merging type definitions for streams");
356  descriptions.add("EvFDaqDirector", desc);
357  }

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 81 of file EvFDaqDirector.h.

// Returns whatever dirManager_.findRunDir() yields for the run number stored in run_.
81 { return dirManager_.findRunDir(run_); }

References dirManager_, evf::DirManager::findRunDir(), and run_.

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 484 of file EvFDaqDirector.cc.

// Builds "<run_dir_>/<name>", where <name> is produced by fffnaming::bolsFileName(run_, ls).
484  {
485  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
486  }

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 476 of file EvFDaqDirector.cc.

// Builds "<bu_run_dir_>/<name>", where <name> is produced by fffnaming::eolsFileName(run_, ls).
476  {
477  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
478  }

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 488 of file EvFDaqDirector.cc.

// Builds "<bu_run_dir_>/<name>", where <name> is produced by fffnaming::eorFileName(run_).
488 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 490 of file EvFDaqDirector.cc.

// Same file name as getEoRFilePath(), but rooted at run_dir_ instead of bu_run_dir_.
490 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext().

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 492 of file EvFDaqDirector.cc.

// Returns the fixed path "<bu_run_dir_>/hlt/fffParameters.jsn".
492 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }

References bu_run_dir_.

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 445 of file EvFDaqDirector.cc.

// NOTE(review): the only body line (listing line 446) is absent from this extract.
// Judging by getOpenInitFilePath(), it presumably returns run_dir_ + "/" +
// fffnaming::initFileNameWithPid(run_, 0, stream) - confirm against EvFDaqDirector.cc.
445  {
447  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

Referenced by dqm::DQMFileSaverPB::initRun().

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 401 of file EvFDaqDirector.cc.

// NOTE(review): the only body line (listing line 402) is absent from this extract.
// Judging by getOpenInputJsonFilePath(), it presumably returns bu_run_dir_ + "/" +
// fffnaming::inputJsonFileName(run_, ls, index) - confirm against EvFDaqDirector.cc.
401  {
403  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1862 of file EvFDaqDirector.cc.

// Returns the first lumisection number >= startFromLS_ for which the file
// "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" does not exist (NNNN is zero-padded to at
// least four digits).
// NOTE(review): listing line 1864 is absent from this extract; `fullpath` is presumably
// declared there as a std::string - confirm against EvFDaqDirector.cc.
1862  {
1863  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1865  struct stat buf;
1866  unsigned int lscount = startFromLS_;
1867  do {
1868  std::stringstream ss;
1869  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1870  fullpath = ss.str();
// lscount is advanced before the existence test, hence the "- 1" in the return below.
1871  lscount++;
1872  } while (stat(fullpath.c_str(), &buf) == 0);
1873  return lscount - 1;
1874  }

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, startFromLS_, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1710 of file EvFDaqDirector.cc.

// Asks the file broker for the next input file and updates the local lumisection markers.
// Steps, in order:
//   1. check for a stop request (stopFilePath_ / stopFilePathPid_) and derive maxLS from it;
//   2. call contactFileBroker(); on a server error return noFile without touching anything;
//   3. create missing EoLS/BoLS files via createLumiSectionFiles();
//   4. for newFile, fetch the event count with grabNextJsonFromRaw() or grabNextJsonFile();
//   5. update `ls` from the server-reported lumisection and return the resulting status.
// NOTE(review): the signature lines (1710-1716) and lines 1762, 1772 and 1817 are absent
// from this extract. nextFileRaw, serverEventsInNewFile and fileSizeFromMetadata are not
// declared in the visible body, so they are presumably parameters. Each of the three missing
// body lines is presumably an `if (fileBrokerUseLocalLock_)` guard on the lock/unlock call
// that follows it - confirm against EvFDaqDirector.cc.
1717  {
1718  EvFDaqDirector::FileStatus fileStatus = noFile;
1719 
1720  //int retval = -1;
1721  //int lock_attempts = 0;
1722  //long total_lock_attempts = 0;
1723 
// Stop request handling: stopFileLS stays -1 when neither stop file exists.
1724  struct stat buf;
1725  int stopFileLS = -1;
1726  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1727  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1728  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1729  if (stopFileCheck == 0)
1730  stopFileLS = readLastLSEntry(stopFilePath_);
1731  else
1732  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ remembers the LS chosen on an earlier call so later calls reuse it.
1733  if (!stop_ls_override_) {
1734  //if lumisection is higher than in stop file, should quit at next from current
1735  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1736  stopFileLS = stop_ls_override_ = ls;
1737  } else
1738  stopFileLS = stop_ls_override_;
1739  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1740  << stopFileLS;
1741  //return runEnded;
1742  } else //if file was removed before reaching stop condition, reset this
1743  stop_ls_override_ = 0;
1744 
1745  /* look for EoLS
1746  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1747  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1748  ls++;
1749  return noFile;
1750  }
1751  */
1752 
// NOTE(review): ts_lockbegin is filled here but never read in the visible listing.
1753  timeval ts_lockbegin;
1754  gettimeofday(&ts_lockbegin, nullptr);
1755 
1756  std::string nextFileJson;
1757  uint32_t serverLS, closedServerLS;
1758  unsigned int serverHttpStatus;
1759  bool serverError;
1760 
1761  //local lock to force index json and EoLS files to appear in order
1763  lockFULocal2();
1764 
// maxLS is -1 when there is no stop request, otherwise the larger of the stop LS and the current LS.
1765  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1766  bool rawHeader = false;
1767  fileStatus = contactFileBroker(
1768  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1769 
1770  if (serverError) {
1771  //do not update anything
1773  unlockFULocal2();
1774  return noFile;
1775  }
1776 
1777  //handle creation of EoLS and BoLS files if lumisection has changed
1778  if (currentLumiSection == 0) {
1779  if (fileStatus == runEnded) {
1780  createLumiSectionFiles(closedServerLS, 0);
1781  createLumiSectionFiles(serverLS, closedServerLS, false); // +1
1782  } else
1783  createLumiSectionFiles(serverLS, 0);
1784  } else {
1785  //loop over and create any EoLS files missing
1786  if (closedServerLS >= currentLumiSection) {
1787  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1788  createLumiSectionFiles(i + 1, i);
1789  }
1790  }
1791 
1792  bool fileFound = true;
1793 
1794  if (fileStatus == newFile) {
// NOTE(review): rawHeader is a bool, so "rawHeader > 0" is simply "rawHeader".
1795  if (rawHeader > 0)
1796  serverEventsInNewFile =
1797  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1798  else
1799  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1800  }
1801  //closing file in case of any error
1802  if (serverEventsInNewFile < 0 && rawFd != -1) {
1803  close(rawFd);
1804  rawFd = -1;
1805  }
1806  if (!fileFound) {
1807  //catch condition where directory got deleted
1808  fileStatus = noFile;
1809  struct stat buf;
1810  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1811  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1812  fileStatus = runEnded;
1813  }
1814  }
1815 
1816  //can unlock because all files have been created locally
1818  unlockFULocal2();
1819 
// Propagate the server lumisection to the caller; a noFile answer never moves ls backwards.
1820  if (fileStatus == runEnded)
1821  ls = std::max(currentLumiSection, serverLS);
1822  else if (fileStatus == newFile) {
1823  assert(serverLS >= ls);
1824  ls = serverLS;
1825  } else if (fileStatus == noFile) {
1826  if (serverLS >= ls)
1827  ls = serverLS;
1828  else {
1829  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1830  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1831  sleep(1);
1832  }
1833  }
1834 
1835  return fileStatus;
1836  }

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, edm_modernize_messagelogger::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 718 of file EvFDaqDirector.cc.

// Parses the JSON file BUEoLSFile and returns the entry at index eolsNFilesIndex_ of its
// data array, converted to int. Returns -1 when the JSON cannot be parsed or the data array
// is too short.
// While readEolsDefinition_ is set, the function first tries to locate the definition file
// named in the JSON and looks for a field called "NFiles" in it.
// NOTE(review): listing lines 721, 728, 739 and 757 are absent from this extract. They
// presumably declare `reader`, `data` and `fullpath` and assign eolsNFilesIndex_ = i inside
// the "NFiles" test - confirm against EvFDaqDirector.cc.
718  {
719  std::ifstream ij(BUEoLSFile);
720  Json::Value deserializeRoot;
722 
723  if (!reader.parse(ij, deserializeRoot)) {
724  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
725  return -1;
726  }
727 
729  DataPoint dp;
730  dp.deserialize(deserializeRoot);
731 
732  //read definition
733  if (readEolsDefinition_) {
734  //std::string def = boost::algorithm::trim(dp.getDefinition());
735  std::string def = dp.getDefinition();
736  if (def.empty())
737  readEolsDefinition_ = false;
// Try the definition path as given, then with leading path components stripped one at a time.
738  while (!def.empty()) {
740  if (def.find('/') == 0)
741  fullpath = def;
742  else
743  fullpath = bu_run_dir_ + '/' + def;
744  struct stat buf;
745  if (stat(fullpath.c_str(), &buf) == 0) {
746  DataPointDefinition eolsDpd;
747  std::string defLabel = "legend";
748  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
749  if (eolsDpd.getNames().empty()) {
750  //try with "data" label if "legend" format is not used
751  eolsDpd = DataPointDefinition();
752  defLabel = "data";
753  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
754  }
// NOTE(review): with line 757 missing, the statement controlled by this `if` is not visible.
755  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
756  if (eolsDpd.getNames().at(i) == "NFiles")
758  readEolsDefinition_ = false;
759  break;
760  }
761  //check if we can still find definition
762  if (def.size() <= 1 || def.find('/') == std::string::npos) {
763  readEolsDefinition_ = false;
764  break;
765  }
766  def = def.substr(def.find('/') + 1);
767  }
768  }
769 
770  if (dp.getData().size() > eolsNFilesIndex_)
771  data = dp.getData()[eolsNFilesIndex_];
772  else {
773  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
774  return -1;
775  }
// NOTE(review): a boost::bad_lexical_cast thrown here is not caught in this function.
776  return boost::lexical_cast<int>(data);
777  }

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, edm_modernize_messagelogger::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 441 of file EvFDaqDirector.cc.

// Builds "<run_dir_>/open/<name>", where <name> is produced by
// fffnaming::initFileNameWithPid(run_, 0, stream).
441  {
442  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
443  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 413 of file EvFDaqDirector.cc.

// Builds "<bu_run_dir_>/open/<name>", where <name> is produced by
// fffnaming::inputJsonFileName(run_, ls, index).
413  {
414  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
415  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 409 of file EvFDaqDirector.cc.

// Builds "<bu_run_dir_>/open/<name>", where <name> is produced by
// fffnaming::inputRawFileName(run_, ls, index).
409  {
410  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
411  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 405 of file EvFDaqDirector.cc.

// NOTE(review): the only body line (listing line 406) is absent from this extract.
// Judging by getOpenRawFilePath(), it presumably returns bu_run_dir_ + "/" +
// fffnaming::inputRawFileName(run_, ls, index) - confirm against EvFDaqDirector.cc.
405  {
407  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and removeFile().

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

// Accessor for the run number stored in run_.
118 { return run_; }

References run_.

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 106 of file EvFDaqDirector.h.

// Returns the path "<run_dir_>/open".
106 { return run_dir_ + "/open"; }

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1939 of file EvFDaqDirector.cc.

// Returns the destinations configured in transferSystemJson_ for `stream` under the mode
// selectedTransferMode_, joined into one comma-separated string.
// Whenever the stream or the mode cannot be resolved the outcome depends on requireTSPSet_:
//   - set    : a cms::Exception is thrown;
//   - not set: a warning is logged (permissive mode) and the string "Failsafe" is returned.
1939  {
1940  std::string streamRequestName;
1941  if (transferSystemJson_->isMember(stream.c_str()))
1942  streamRequestName = stream;
1943  else {
1944  std::stringstream msg;
1945  msg << "Transfer system mode definitions missing for -: " << stream;
1946  if (requireTSPSet_)
1947  throw cms::Exception("EvFDaqDirector") << msg.str();
1948  else {
1949  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1950  return std::string("Failsafe");
1951  }
1952  }
// NOTE(review): the comment below and the log text ("Setting mode to empty string") do not
// match the code, which returns "Failsafe" rather than an empty string.
1953  //return empty if strict check parameter is not on
1954  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1955  edm::LogWarning("EvFDaqDirector")
1956  << "Selected mode string is not provided as DaqDirector parameter."
1957  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1958  return std::string("Failsafe");
1959  }
1960  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1961  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1962  }
1963  //check if stream has properly listed transfer stream
1964  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1965  std::stringstream msg;
1966  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1967  if (requireTSPSet_)
1968  throw cms::Exception("EvFDaqDirector") << msg.str();
1969  else
1970  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// Only reached in permissive mode, because the strict branch above throws.
1971  return std::string("Failsafe");
1972  }
1973  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
1974 
1975  //flatten string json::Array into CSV std::string
1976  std::string ret;
1977  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
1978  if (!ret.empty())
1979  ret += ",";
1980  ret += (*it).asString();
1981  }
1982  return ret;
1983  }

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 2003 of file EvFDaqDirector.cc.

// Returns the merge type name stored for `stream` in mergeTypeMap_. When no entry exists,
// the name MergeTypeNames_[defaultType] is logged as the default, stored in the map for
// this stream and returned.
2003  {
// Read-only lookup first.
2004  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2005  if (mergeTypeMap_.find(search_ac, stream))
2006  return search_ac->second;
2007 
2008  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2009  std::string defaultName = MergeTypeNames_[defaultType];
// Write access: insert the key and assign the default name to it.
2010  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2011  mergeTypeMap_.insert(ac, stream);
2012  ac->second = defaultName;
2013  ac.release();
2014  return defaultName;
2015  }

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1178 of file EvFDaqDirector.cc.

// Copies the JSON file jsonSourcePath to "<baseRunDir()>/<stem of rawSourcePath>_pid<PID>.jsn",
// removes the source, parses the copy and returns the value of its "NEvents" field as int.
// Outputs:
//   fileFound        - set to false only when the source cannot be opened (after one retry)
//                      and errno is ENOENT; true otherwise.
//   fileSizeFromJson - value of the "NBytes" field, or -1 when it is absent or unparsable.
// Returns -1 on any failure (open/copy/remove error, empty file, JSON or number parsing error).
// NOTE(review): the signature lines (1178-1180) and line 1271 are absent from this extract;
// `reader` is presumably declared on line 1271 - confirm against EvFDaqDirector.cc.
1181  {
1182  fileFound = true;
1183 
1184  //should be ported to use fffnaming
1185  std::ostringstream fileNameWithPID;
1186  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1187  << std::setw(5) << pid_ << ".jsn";
1188 
1189  // assemble json destination path
1190  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1191 
1192  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1193 
1194  int infile = -1, outfile = -1;
1195 
// Open the source; a single immediate retry is made before giving up.
1196  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1197  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1198  << strerror(errno);
1199  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1200  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1201  << jsonSourcePath << " : " << strerror(errno);
1202  if (errno == ENOENT)
1203  fileFound = false;
1204  return -1;
1205  }
1206  }
1207 
// Open the destination exclusively. EEXIST is fatal; any other failure triggers one retry
// after removing a possibly half-created file.
1208  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1209  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1210  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1211  if (errno == EEXIST) {
1212  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1213  << " : ";
1214  ::close(infile);
1215  return -1;
1216  }
1217  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1218  << strerror(errno);
1219  struct stat out_stat;
1220  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1221  edm::LogWarning("EvFDaqDirector")
1222  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1223  if (unlink(jsonDestPath.c_str()) == -1) {
1224  edm::LogWarning("EvFDaqDirector")
1225  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1226  }
1227  }
1228  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1229  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1230  << jsonDestPath << " : " << strerror(errno);
1231  ::close(infile);
1232  return -1;
1233  }
1234  }
1235  //copy contents
1236  const std::size_t buf_sz = 512;
1237  std::size_t tot_written = 0;
// NOTE(review): the buffer is allocated with new[] but owned by std::unique_ptr<char>, whose
// default deleter uses delete rather than delete[]; std::unique_ptr<char[]> would match.
1238  std::unique_ptr<char> buf(new char[buf_sz]);
1239 
// Copy loop: the inner do/while handles short writes; a write error ends both loops.
1240  ssize_t sz, sz_read = 1, sz_write;
1241  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1242  sz_write = 0;
1243  do {
1244  assert(sz_read - sz_write > 0);
1245  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1246  sz_read = sz; // cause read loop termination
1247  break;
1248  }
1249  assert(sz > 0);
1250  sz_write += sz;
1251  tot_written += sz;
1252  } while (sz_write < sz_read);
1253  }
1254  close(infile);
1255  close(outfile);
1256 
// NOTE(review): a read or write error after some bytes were copied still leaves
// tot_written > 0, so the source is removed even though the copy may be incomplete.
1257  if (tot_written > 0) {
1258  //leave file if it was empty for diagnosis
1259  if (unlink(jsonSourcePath.c_str()) == -1) {
1260  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1261  << strerror(errno);
1262  return -1;
1263  }
1264  } else {
1265  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1266  << jsonSourcePath;
1267  return -1;
1268  }
1269 
1270  Json::Value deserializeRoot;
1272 
1273  std::string data;
1274  std::stringstream ss;
1275  bool result;
1276  try {
// Small files are parsed straight from the copy buffer; larger ones are re-read from disk.
// NOTE(review): nothing writes a terminating '\0' into buf, and new char[] does not
// zero-initialize. If parse() (and the `ss << buf.get()` below) treats buf.get() as a C
// string, the read can run past the copied bytes - verify against the reader's interface.
1277  if (tot_written <= buf_sz) {
1278  result = reader.parse(buf.get(), deserializeRoot);
1279  } else {
1280  //json will normally not be bigger than buf_sz bytes
1281  try {
1282  std::ifstream ij(jsonDestPath);
1283  ss << ij.rdbuf();
1284  } catch (std::filesystem::filesystem_error const& ex) {
1285  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1286  return -1;
1287  }
1288  result = reader.parse(ss.str(), deserializeRoot);
1289  }
1290  if (!result) {
1291  if (tot_written <= buf_sz)
1292  ss << buf.get();
1293  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1294  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1295  << ss.str() << ".";
1296  return -1;
1297  }
1298 
1299  //read BU JSON
1300  DataPoint dp;
1301  dp.deserialize(deserializeRoot);
// Locate "NEvents" through the field names in dpd_; fall back to the first data entry.
1302  bool success = false;
1303  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1304  if (dpd_->getNames().at(i) == "NEvents")
1305  if (i < dp.getData().size()) {
1306  data = dp.getData()[i];
1307  success = true;
1308  break;
1309  }
1310  }
1311  if (!success) {
1312  if (!dp.getData().empty())
1313  data = dp.getData()[0];
1314  else {
1315  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1316  << "grabNextJsonFile - "
1317  << " error reading number of events from BU JSON; No input value. data -: " << data;
1318  return -1;
1319  }
1320  }
1321 
1322  //try to read raw file size
1323  fileSizeFromJson = -1;
1324  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1325  if (dpd_->getNames().at(i) == "NBytes") {
1326  if (i < dp.getData().size()) {
1327  std::string dataSize = dp.getData()[i];
1328  try {
1329  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1330  } catch (boost::bad_lexical_cast const&) {
1331  //non-fatal currently, processing can continue without this value
1332  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1333  << "Input value is -: " << dataSize;
1334  }
1335  break;
1336  }
1337  }
1338  }
1339  return boost::lexical_cast<int>(data);
1340  } catch (boost::bad_lexical_cast const& e) {
1341  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1342  << "Input value is -: " << data;
1343  } catch (std::runtime_error const& e) {
1344  //Can be thrown by Json parser
1345  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1346  }
1347 
1348  catch (std::exception const& e) {
1349  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1350  } catch (...) {
1351  //unknown exception
1352  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1353  }
1354 
// Every exception path above falls through to this error return.
1355  return -1;
1356  }

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1358 of file EvFDaqDirector.cc.

// Copies the BU JSON file at jsonSourcePath into baseRunDir() under the name
// "<source stem>_pid<zero-padded getpid()>.jsn", releases the FU-local lock,
// removes the source file, then parses the copy and returns the event count
// converted to int. The count is the entry whose name in dpd_ is "NEvents",
// or the first data entry when no such name is matched.
// Returns -1 when any of the exceptions caught at the bottom is raised.
// NOTE(review): this listing has no line 1395; `reader`, used at lines 1399
// and 1402, is presumably declared there - confirm against EvFDaqDirector.cc.
1358  {
1359  std::string data;
1360  try {
1361  // assemble json destination path
1362  std::filesystem::path jsonDestPath(baseRunDir());
1363 
1364  //should be ported to use fffnaming
1365  std::ostringstream fileNameWithPID;
1366  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1367  << ".jsn";
1368  jsonDestPath /= fileNameWithPID.str();
1369 
1370  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1371  try {
1372  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1373  } catch (std::filesystem::filesystem_error const& ex) {
1374  // Input dir gone?
1375  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1376  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1377  sleep(1);
1378  std::filesystem::copy(jsonSourcePath, jsonDestPath);  // single retry; a second failure propagates to the outer handlers
1379  }
1380  unlockFULocal();  // lock is released as soon as the copy exists, before the source is removed or parsed
1381 
1382  try {
1383  //sometimes this fails but file gets deleted
1384  std::filesystem::remove(jsonSourcePath);
1385  } catch (std::filesystem::filesystem_error const& ex) {
1386  // Input dir gone?
1387  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1388  } catch (std::exception const& ex) {
1389  // Input dir gone?
1390  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1391  }
1392 
1393  std::ifstream ij(jsonDestPath);
1394  Json::Value deserializeRoot;
1396 
1397  std::stringstream ss;
1398  ss << ij.rdbuf();
1399  if (!reader.parse(ss.str(), deserializeRoot)) {
1400  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1401  << "\nERROR:\n"
1402  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1403  << ss.str() << ".";
1404  throw std::runtime_error("Cannot deserialize input JSON file");
1405  }
1406 
1407  //read BU JSON
1408  std::string data;  // NOTE(review): shadows `data` from line 1359; the handler at line 1437 therefore prints the outer, never-assigned string
1409  DataPoint dp;
1410  dp.deserialize(deserializeRoot);
1411  bool success = false;
1412  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1413  if (dpd_->getNames().at(i) == "NEvents")
1414  if (i < dp.getData().size()) {
1415  data = dp.getData()[i];
1416  success = true;
1417  }
1418  }
1419  if (!success) {
1420  if (!dp.getData().empty())
1421  data = dp.getData()[0];  // fallback: use the first data entry when "NEvents" was not matched
1422  else
1423  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1424  << " error reading number of events from BU JSON -: No input value " << data;
1425  }
1426  return boost::lexical_cast<int>(data);
1427  } catch (std::filesystem::filesystem_error const& ex) {
1428  // Input dir gone?
1429  unlockFULocal();  // reached when the retry copy at line 1378 throws, i.e. before the unlock at line 1380
1430  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1431  } catch (std::runtime_error const& e) {
1432  // Another process grabbed the file and NFS did not register this
1433  unlockFULocal();  // the throw at line 1404 comes after line 1380, so this can be a second unlock
1434  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1435  } catch (boost::bad_lexical_cast const&) {
1436  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1437  << "Input value is -: " << data;
1438  } catch (std::exception const& e) {
1439  // BU run directory disappeared?
1440  unlockFULocal();
1441  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1442  }
1443 
1444  return -1;
1445  }

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile 
)

Definition at line 1096 of file EvFDaqDirector.cc.

// Reads the FRD header of rawSourcePath through parseFRDFileHeader() and writes
// a one-line JSON summary {"data":[<events>,<file size>,"<rawSourcePath>"]} to
// baseRunDir()/<reduced stem>_pid<pid_>.jsn.
// Returns the event count taken from the header, or -1 on any failure.
// fileFound is set to true on entry and cleared only when parseFRDFileHeader()
// returns 1; fileSizeFromHeader is assigned only on the success path.
// serverLS is used solely for the mismatch warning near the end (0 disables it).
1102  {
1103  fileFound = true;
1104 
1105  //take only first three tokens delimited by "_" in the renamed raw file name
1106  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1107  size_t pos = 0, n_tokens = 0;
1108  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1109  }
1110  std::string reducedJsonStem = jsonStem.substr(0, pos);  // pos is npos when fewer than three '_' were found, which keeps the whole stem
1111 
1112  std::ostringstream fileNameWithPID;
1113  //should be ported to use fffnaming
1114  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1115 
1116  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1117 
1118  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1119 
1120  //parse RAW file header if it exists
1121  uint32_t lsFromRaw;
1122  int32_t nbEventsWrittenRaw;
1123  int64_t fileSizeFromRaw;
1124  auto ret = parseFRDFileHeader(
1125  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1126  if (ret != 0) {
1127  if (ret == 1)
1128  fileFound = false;
1129  return -1;
1130  }
1131 
1132  int outfile;
1133  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1134  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1135  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1136  if (errno == EEXIST) {
1137  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1138  << " : ";  // NOTE(review): nothing follows the trailing " : " in this message
1139  return -1;
1140  }
1141  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1142  << strerror(errno);
1143  struct stat out_stat;
1144  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1145  edm::LogWarning("EvFDaqDirector")
1146  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1147  << jsonDestPath;
1148  if (unlink(jsonDestPath.c_str()) == -1) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1151  }
1152  }
1153  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1154  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1155  << jsonDestPath << " : " << strerror(errno);
1156  return -1;
1157  }
1158  }
1159  //write JSON file (TODO: use jsoncpp)
1160  std::stringstream ss;
1161  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1162  std::string sstr = ss.str();
1163 
1164  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {  // only a negative result is treated as failure; a short write is not detected
1165  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1166  << " : " << strerror(errno);
1167  return -1;  // NOTE(review): returns without close(outfile); the descriptor stays open on this path
1168  }
1169  close(outfile);
1170  if (serverLS && serverLS != lsFromRaw)
1171  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1172  << " and raw file header LS " << lsFromRaw;
1173 
1174  fileSizeFromHeader = fileSizeFromRaw;
1175  return nbEventsWrittenRaw;
1176  }

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 149 of file EvFDaqDirector.cc.

// Per-run initialisation. Builds run_string_ ("run" + run_ zero-padded to six
// digits), run_nstring_, run_dir_ and pid_, creates the base and run
// directories, then sets up lock files: the FU-local lock when !directorBU_,
// and the BU/FU lock files in bu_run_dir_ depending on directorBU_.
// Finally initialises init_lock_, derives the stop-file paths and, when
// !directorBU_, loads the data point definition into dpd_.
// Throws cms::Exception("DaqDirector") when a required directory or lock file
// cannot be created or opened.
// NOTE(review): listing lines 178, 180, 188, 198, 228 and 267 are absent from
// this page. The "References" list names createRunOpendirMaybe() and
// tryInitializeFuLockFile(), which do not appear below, so they are presumably
// called on some of those lines - confirm against EvFDaqDirector.cc.
149  {
150  std::stringstream ss;
151  ss << "run" << std::setfill('0') << std::setw(6) << run_;
152  run_string_ = ss.str();
153  ss = std::stringstream();
154  ss << run_;
155  run_nstring_ = ss.str();
156  run_dir_ = base_dir_ + "/" + run_string_;
157  ss = std::stringstream();
158  ss << getpid();
159  pid_ = ss.str();
160 
161  // check if base dir exists or create it accordingly
162  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);  // runs before umask(0) below, so the process umask still applies here
163  if (retval != 0 && errno != EEXIST) {
164  throw cms::Exception("DaqDirector")
165  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
166  }
167 
168  //create run dir in base dir
169  umask(0);  // process-wide: affects every file/directory created after this point
170  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);  // S_IRWXO already contains S_IROTH and S_IXOTH
171  if (retval != 0 && errno != EEXIST) {
172  throw cms::Exception("DaqDirector")
173  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
174  }
175 
176  //create fu-local.lock in run open dir
177  if (!directorBU_) {
179  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";  // local variable despite the member-style trailing underscore
// NOTE(review): line 180 is missing; presumably "fulocal_rwlock_fd_ =" precedes the open() call below - confirm.
181  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
182  if (fulocal_rwlock_fd_ == -1)
183  throw cms::Exception("DaqDirector")
184  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
185  chmod(fulocal_lock_.c_str(), 0777);
186  fsync(fulocal_rwlock_fd_);
187  //open second fd for another input source thread
// NOTE(review): line 188 is missing; presumably "fulocal_rwlock_fd2_ =" precedes the open() call below - confirm.
189  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
190  if (fulocal_rwlock_fd2_ == -1)
191  throw cms::Exception("DaqDirector")
192  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
193  }
194 
195  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
196  //for BU, it is created at this point
197  if (directorBU_) {
199  std::string bulockfile = bu_run_dir_ + "/bu.lock";
200  fulockfile_ = bu_run_dir_ + "/fu.lock";
201 
202  //make or find bu run dir
203  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
204  if (retval != 0 && errno != EEXIST) {
205  throw cms::Exception("DaqDirector")
206  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
207  }
208  bu_run_open_dir_ = bu_run_dir_ + "/open";
209  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
210  if (retval != 0 && errno != EEXIST) {
211  throw cms::Exception("DaqDirector")
212  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
213  }
214 
215  // the BU director does not need to know about the fu lock
216  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
217  if (bu_writelock_fd_ == -1)
218  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
219  else
220  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
221  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");  // also reached when the open() above failed; only a warning is logged
222  if (bu_w_lock_stream == nullptr)
223  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
224 
225  // BU INITIALIZES LOCK FILE
226  // FU LOCK FILE OPEN
227  openFULockfileStream(true);
229  fflush(fu_rw_lock_stream);
230  close(fu_readwritelock_fd_);
231 
232  if (!hltSourceDirectory_.empty()) {
233  struct stat buf;
234  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
235  std::string hltdir = bu_run_dir_ + "/hlt";
236  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
237  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
238  if (retval != 0 && errno != EEXIST)
239  throw cms::Exception("DaqDirector")
240  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";  // NOTE(review): mkdir was on tmphltdir but the message prints hltdir
241 
242  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
243  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
244 
245  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
246  for (auto& optfile : optfiles) {
247  try {
248  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
249  } catch (...) {
250  }  // any failure copying these three files is swallowed, unlike the two copies above
251  }
252 
253  std::filesystem::rename(tmphltdir, hltdir);  // files are staged under the open dir and moved into place in one rename
254  } else
255  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
256  }
257  //else{}//no configuration specified
258  } else {
259  // for FU, check if bu base dir exists
260 
261  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262  if (retval != 0 && errno != EEXIST) {
263  throw cms::Exception("DaqDirector")
264  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
265  }
266 
268  fulockfile_ = bu_run_dir_ + "/fu.lock";
269  openFULockfileStream(false);
270  }
271 
272  pthread_mutex_init(&init_lock_, nullptr);
273 
274  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
275  std::stringstream sstp;
276  sstp << stopFilePath_ << "_pid" << pid_;
277  stopFilePathPid_ = sstp.str();
278 
279  if (!directorBU_) {
280  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
281  struct stat statbuf;
282  if (!stat(defPath.c_str(), &statbuf))
283  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
284  else {
285  //look in source directory if not present in ramdisk
286  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
287  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;  // NOTE(review): std::getenv can return nullptr; no null check here - confirm the variable is always set
288  if (stat(defPath.c_str(), &statbuf)) {
289  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;  // NOTE(review): same unchecked getenv result
290  if (stat(defPath.c_str(), &statbuf)) {
291  defPath = defPathSuffix;  // last resort: path relative to the current working directory
292  }
293  }
294  }
295  dpd_ = new DataPointDefinition();  // NOTE(review): raw new; where dpd_ is released is not visible in this listing
296  std::string defLabel = "data";
297  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
298  }
299  }

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), pid_, run_, run_dir_, run_nstring_, run_string_, contentValuesCheck::ss, edm_modernize_messagelogger::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

// True only when both nStreams_ and nThreads_ equal 1.
122 { return nStreams_ == 1 && nThreads_ == 1; }

References nStreams_, and nThreads_.

Referenced by FedRawDataInputSource::getNextEvent().

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 913 of file EvFDaqDirector.cc.

// Requests a shared (LOCK_SH) flock on fulocal_rwlock_fd_; the flock() result is ignored.
// NOTE(review): lockFULocal2() requests LOCK_EX on the second descriptor - confirm that
// a shared lock is what is intended here.
913  {
914  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
915  flock(fulocal_rwlock_fd_, LOCK_SH);
916  }

References fulocal_rwlock_fd_.

Referenced by updateFuLock().

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 923 of file EvFDaqDirector.cc.

// Requests an exclusive (LOCK_EX) flock on fulocal_rwlock_fd2_; the flock() result is ignored.
923  {
924  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
925  flock(fulocal_rwlock_fd2_, LOCK_EX);
926  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 909 of file EvFDaqDirector.cc.

// Locks init_lock_; the pthread_mutex_lock() result is not checked.
909 { pthread_mutex_lock(&init_lock_); }

References init_lock_.

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2023 of file EvFDaqDirector.cc.

// Builds a struct flock from its five fields by positional aggregate initialisation.
// The initialiser order differs under __APPLE__, presumably because struct flock
// declares its members in a different order on that platform - confirm.
2023  {
2024 #ifdef __APPLE__
2025  return {start, len, pid, type, whence};
2026 #else
2027  return {type, whence, start, len, pid};
2028 #endif
2029  }

References command_line::start.

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 890 of file EvFDaqDirector.cc.

// Opens fulockfile_ read/write (creating it and chmod-ing it to 0766 when `create`
// is true) and wraps the descriptor in the stdio stream fu_rw_lock_stream ("r+").
// Failures are only logged; nothing is thrown or returned.
// NOTE(review): listing line 892 is missing; presumably "fu_readwritelock_fd_ ="
// precedes the open() call at line 893 - confirm against EvFDaqDirector.cc.
890  {
891  if (create) {
893  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
894  chmod(fulockfile_.c_str(), 0766);
895  } else {
896  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);  // the mode argument has no effect without O_CREAT
897  }
898  if (fu_readwritelock_fd_ == -1)
899  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
900  << " create:" << create << " error:" << strerror(errno);
901  else
902  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
903 
904  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");  // still called when the descriptor is -1, so a failed open is logged twice
905  if (fu_rw_lock_stream == nullptr)
906  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
907  }

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

// Accessor: returns the stored outputAdler32Recheck_ flag.
107 { return outputAdler32Recheck_; }

References outputAdler32Recheck_.

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

// Overwrites run_ with the given value; strings derived from run_ in initRun() are not refreshed here.
75 { run_ = run; }

References writedatasetfile::run, and run_.

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 961 of file EvFDaqDirector.cc.

// Opens rawSourcePath, reads sizeof(FRDFileHeader_v1) bytes and fills the output
// parameters from the header.
// Return value: 0 on success, 1 when the file could not be opened and errno is
// ENOENT, -1 on every other error. Output parameters are written only on success.
//   rawFd         - receives the open descriptor, or -1 when closeFile is true
//   requireHeader - when true, a file without a recognised header is an error;
//                   when false, such a file yields rawHeaderSize = 0,
//                   lsFromHeader = 0, eventsFromHeader = -1, fileSizeFromHeader = -1
//   retry         - when true, a failed open is retried once through a recursive
//                   call that passes retry = false
//   closeFile     - close the descriptor right after the header read
969  {
970  int infile;
971 
972  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
973  if (retry) {
974  edm::LogWarning("EvFDaqDirector")
975  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
976  return parseFRDFileHeader(rawSourcePath,
977  rawFd,
978  rawHeaderSize,
979  lsFromHeader,
980  eventsFromHeader,
981  fileSizeFromHeader,
982  requireHeader,
983  false,
984  closeFile);
985  } else {
986  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {  // second open attempt within the same call
987  edm::LogError("EvFDaqDirector")
988  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
989  if (errno == ENOENT)  // NOTE(review): errno is inspected after the logging statement - confirm logging cannot change it
990  return 1; // error && file not found
991  else
992  return -1;
993  }
994  }
995  }
996 
997  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
998  FRDFileHeader_v1 fileHead;
999 
1000  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1001  if (closeFile) {
1002  close(infile);
1003  infile = -1;  // makes the later "infile != -1" guards skip a second close and sets rawFd to -1 on success
1004  }
1005 
1006  if (sz_read < 0) {
1007  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1008  << strerror(errno);  // NOTE(review): with closeFile set, close() has already run and may have changed errno
1009  if (infile != -1)
1010  close(infile);
1011  return -1;
1012  }
1013  if ((size_t)sz_read < buf_sz) {
1014  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1015  if (infile != -1)
1016  close(infile);
1017  return -1;
1018  }
1019 
1020  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1021 
1022  if (frd_version == 0) {
1023  //no header (specific sequence not detected)
1024  if (requireHeader) {
1025  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1026  if (infile != -1)
1027  close(infile);
1028  return -1;
1029  } else {
1030  //no header, but valid file
1031  lseek(infile, 0, SEEK_SET);  // NOTE(review): infile is -1 here when closeFile is true; the call is not guarded
1032  rawHeaderSize = 0;
1033  lsFromHeader = 0;
1034  eventsFromHeader = -1;
1035  fileSizeFromHeader = -1;
1036  }
1037  } else {
1038  //version 1 header
1039  uint32_t headerSizeRaw = fileHead.headerSize_;
1040  if (headerSizeRaw < buf_sz) {
1041  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1042  << " v:" << frd_version;
1043  if (infile != -1)
1044  close(infile);
1045  return -1;
1046  }
1047  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1048  lsFromHeader = fileHead.lumiSection_;
1049  eventsFromHeader = (int32_t)fileHead.eventCount_;
1050  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1051  rawHeaderSize = fileHead.headerSize_;
1052  }
1053  rawFd = infile;
1054  return 0; //OK
1055  }

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and readEcalDQMStatus::read.

Referenced by grabNextJsonFromRaw(), and FedRawDataInputSource::readSupervisor().

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 374 of file EvFDaqDirector.cc.

// Closes bu_readlock_fd_ and bu_writelock_fd_ at the end of the run.
// NOTE(review): listing line 379 is missing; since `filename` is otherwise unused and
// the "References" list names removeFile(), that line presumably removes the bu.lock
// file - confirm against EvFDaqDirector.cc.
374  {
375  close(bu_readlock_fd_);
376  close(bu_writelock_fd_);
377  if (directorBU_) {
378  std::string filename = bu_run_dir_ + "/bu.lock";
380  }
381  }

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBU_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 320 of file EvFDaqDirector.cc.

// Runs initRun() and records the stream/thread limits from `bounds`.
// NOTE(review): nThreads_ receives maxNumberOfStreams() and nStreams_ receives
// maxNumberOfThreads(); the names look crossed. isSingleStreamThread() requires both
// to be 1, so that use is symmetric - confirm whether any other reader depends on
// which is which.
320  {
321  initRun();
322 
323  nThreads_ = bounds.maxNumberOfStreams();
324  nStreams_ = bounds.maxNumberOfThreads();
325  }

References initRun(), edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 359 of file EvFDaqDirector.cc.

// Calls checkMergeTypePSet(pc) before the job begins.
// NOTE(review): listing line 360 is missing; the "References" list names
// checkTransferSystemPSet(), so it is presumably called there - confirm.
359  {
361  checkMergeTypePSet(pc);
362  }

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 364 of file EvFDaqDirector.cc.

// Logs a warning naming run_dir_ and the highest run directory reported by dirManager_.
// NOTE(review): listing line 368 is missing; it presumably holds the `if (...) {` that
// guards the warning and matches the closing brace at line 371 - confirm.
364  {
365  //assert(run_ == id.run());
366 
367  // check if the requested run is the latest one - issue a warning if it isn't
369  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
370  << ". This is not the highest run " << dirManager_.findHighestRunDir();
371  }
372  }

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 383 of file EvFDaqDirector.cc.

// Erases from *filesToDeletePtr_ every entry whose file reports lumi_ equal to the
// lumisection of globalContext, while holding *fileDeleteLockPtr_.
// NOTE(review): listing line 386 is missing; it presumably holds the `if (...) {` that
// checks the two pointers before the warning and early return - confirm.
383  {
384  //delete all files belonging to just closed lumi
385  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
387  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
388  return;
389  }
390 
391  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
392  auto it = filesToDeletePtr_->begin();
393  while (it != filesToDeletePtr_->end()) {
394  if (it->second->lumi_ == ls) {
395  it = filesToDeletePtr_->erase(it);  // erase() hands back the next valid iterator, so no increment on this branch
396  } else
397  it++;
398  }
399  }

References fileDeleteLockPtr_, filesToDeletePtr_, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1057 of file EvFDaqDirector.cc.

// Reports whether rawSourcePath starts with a recognised FRD file header.
// Returns true and sets rawHeaderSize to the header's headerSize_ when
// getFRDFileHeaderVersion() yields a non-zero version. Returns false with
// rawHeaderSize = 0 when no header is recognised. Returns false leaving
// rawHeaderSize untouched when the file cannot be opened, cannot be read, or is
// shorter than sizeof(FRDFileHeader_v1).
1057  {
1058  int infile;
1059  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1060  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1061  << strerror(errno);
1062  return false;
1063  }
1064  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1065  FRDFileHeader_v1 fileHead;
1066 
1067  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1068 
1069  if (sz_read < 0) {
1070  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1071  << strerror(errno);
1072  if (infile != -1)  // always true here: a failed open already returned at line 1062
1073  close(infile);
1074  return false;
1075  }
1076  if ((size_t)sz_read < buf_sz) {
1077  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1078  if (infile != -1)
1079  close(infile);
1080  return false;
1081  }
1082 
1083  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1084 
1085  close(infile);
1086 
1087  if (frd_version > 0) {
1088  rawHeaderSize = fileHead.headerSize_;
1089  return true;
1090  }
1091 
1092  rawHeaderSize = 0;
1093  return false;
1094  }

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and readEcalDQMStatus::read.

Referenced by bumpFile().

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1848 of file EvFDaqDirector.cc.

// Parses `file` as JSON and returns the value stored under "lastLS" as an int.
// Returns -1 when the parser rejects the file.
// NOTE(review): listing line 1851 is missing; `reader`, used at line 1853, is
// presumably declared there - confirm against EvFDaqDirector.cc.
1848  {
1849  std::ifstream ij(file);
1850  Json::Value deserializeRoot;
1852 
1853  if (!reader.parse(ij, deserializeRoot)) {
1854  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1855  return -1;
1856  }
1857 
1858  int ret = deserializeRoot.get("lastLS", "").asInt();  // NOTE(review): the fallback is an empty string; confirm what asInt() does with it when "lastLS" is absent
1859  return ret;
1860  }

References Json::Value::asInt(), FrontierConditions_GlobalTag_cff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ removeFile() [1/2]

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 494 of file EvFDaqDirector.cc.

// Deletes `filename` with remove(); a failure is logged with strerror(errno) and not propagated.
494  {
495  int retval = remove(filename.c_str());
496  if (retval != 0)
497  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
498  << ". error = " << strerror(errno);
499  }

References corrVsCorr::filename, and MatrixUtil::remove().

◆ removeFile() [2/2]

void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 501 of file EvFDaqDirector.cc.

References getRawFilePath(), and eostools::ls().

Referenced by postEndRun().

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 175 of file EvFDaqDirector.h.

// Stores the caller-supplied lock and file-list handles in fileDeleteLockPtr_ and
// filesToDeletePtr_; preGlobalEndLumi() dereferences both.
176  {
177  fileDeleteLockPtr_ = fileDeleteLock;
178  filesToDeletePtr_ = filesToDelete;
179  }

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

// Stores the given FastMonitoringService handle in fms_.
121 { fms_ = fms; }

References fms_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 880 of file EvFDaqDirector.cc.

// Writes the initial "1 0" pair (readLs, readIndex) to fu_rw_lock_stream.
// When the stream is null only an error is logged.
880  {
881  if (fu_rw_lock_stream == nullptr)
882  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
883  else {
884  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
885  unsigned int readLs = 1, readIndex = 0;
886  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);  // no flush here; initRun() calls fflush(fu_rw_lock_stream) itself
887  }
888  }

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 918 of file EvFDaqDirector.cc.

// Releases the flock held on fulocal_rwlock_fd_ (LOCK_UN); the flock() result is ignored.
918  {
919  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
920  flock(fulocal_rwlock_fd_, LOCK_UN);
921  }

References fulocal_rwlock_fd_.

Referenced by grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 928 of file EvFDaqDirector.cc.

// Releases the flock held on fulocal_rwlock_fd2_ (LOCK_UN); the flock() result is ignored.
928  {
929  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
930  flock(fulocal_rwlock_fd2_, LOCK_UN);
931  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 911 of file EvFDaqDirector.cc.

// Unlocks init_lock_; the pthread_mutex_unlock() result is not checked.
911 { pthread_mutex_unlock(&init_lock_); }

References init_lock_.

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 503 of file EvFDaqDirector.cc.

508  {
509  EvFDaqDirector::FileStatus fileStatus = noFile;
510  rawHeaderSize = 0;
511 
512  int retval = -1;
513  int lock_attempts = 0;
514  long total_lock_attempts = 0;
515 
516  struct stat buf;
517  int stopFileLS = -1;
518  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
519  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
520  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
521  if (stopFileCheck == 0)
522  stopFileLS = readLastLSEntry(stopFilePath_);
523  else
524  stopFileLS = 1; //stop without drain if only pid is stopped
525  if (!stop_ls_override_) {
526  //if lumisection is higher than in stop file, should quit at next from current
527  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
528  stopFileLS = stop_ls_override_ = ls;
529  } else
530  stopFileLS = stop_ls_override_;
531  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
532  << stopFileLS;
533  //return runEnded;
534  } else //if file was removed before reaching stop condition, reset this
535  stop_ls_override_ = 0;
536 
537  timeval ts_lockbegin;
538  gettimeofday(&ts_lockbegin, nullptr);
539 
540  while (retval == -1) {
541  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
542  if (retval == -1)
543  usleep(fuLockPollInterval_);
544  else
545  continue;
546 
547  lock_attempts += fuLockPollInterval_;
548  total_lock_attempts += fuLockPollInterval_;
549  if (lock_attempts > 5000000 || errno == 116) {
550  if (errno == 116)
551  edm::LogWarning("EvFDaqDirector")
552  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
553  else
554  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
555  "fu.lock file are present -: errno "
556  << errno << ":" << strerror(errno) << std::endl;
557 
558  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
559  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
560  ls++;
561  return noFile;
562  }
563 
564  if (stat(bu_run_dir_.c_str(), &buf) != 0)
565  return runEnded;
566  if (stat(fulockfile_.c_str(), &buf) != 0)
567  return runEnded;
568 
569  lock_attempts = 0;
570  }
571  if (total_lock_attempts > 5 * 60000000) {
572  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
573  return runAbort;
574  }
575  }
576 
577  timeval ts_lockend;
578  gettimeofday(&ts_lockend, nullptr);
579  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
580  if (deltat > 0.)
581  lockWaitTime = deltat;
582 
583  if (retval != 0)
584  return fileStatus;
585 
586 #ifdef DEBUG
587  timeval ts_lockend;
588  gettimeofday(&ts_lockend, 0);
589 #endif
590 
591  //open another lock file FD after the lock using main fd has been acquired
592  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
593  if (fu_readwritelock_fd2 == -1)
594  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
595  << " create. error:" << strerror(errno);
596 
597  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
598 
599  // if the stream is readable
600  if (fu_rw_lock_stream2 != nullptr) {
601  unsigned int readLs, readIndex;
602  int check = 0;
603  // rewind the stream
604  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
605  // if rewinded ok
606  if (check == 0) {
607  // read its' values
608  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
609  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
610 
611  unsigned int currentLs = readLs;
612  bool bumpedOk = false;
613  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
614  //no lock file write in this case
615  if (ls && ls + 1 < currentLs)
616  ls++;
617  else {
618  // try to bump (look for new index or EoLS file)
619  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
620  //avoid 2 lumisections jump
621  if (ls && readLs > currentLs && currentLs > ls) {
622  ls++;
623  readLs = currentLs = ls;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  } else {
628  if (ls == 0 && readLs > currentLs) {
629  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
630  //in this case there is no new file in the same LS
631  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
632  readLs = currentLs;
633  readIndex = 0;
634  bumpedOk = false;
635  //no write to lock file
636  }
637  //update return LS value
638  ls = readLs;
639  }
640  }
641  if (bumpedOk) {
642  // there is a new index file to grab, lock file needs to be updated
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  if (check == 0) {
645  ftruncate(fu_readwritelock_fd2, 0);
646  // write next index in the file, which is the file the next process should take
647  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
648  fflush(fu_rw_lock_stream2);
649  fsync(fu_readwritelock_fd2);
650  fileStatus = newFile;
651  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
652  } else {
653  edm::LogError("EvFDaqDirector")
654  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
655  setExceptionState = true;
656  return noFile;
657  }
658  } else if (currentLs < readLs) {
659  //there is no new file in next LS (yet), but lock file can be updated to the next LS
660  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
661  if (check == 0) {
662  ftruncate(fu_readwritelock_fd2, 0);
663  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
664  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
665  fflush(fu_rw_lock_stream2);
666  fsync(fu_readwritelock_fd2);
667  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
668  } else {
669  edm::LogError("EvFDaqDirector")
670  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
671  setExceptionState = true;
672  return noFile;
673  }
674  }
675  } else {
676  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
677  << strerror(errno);
678  }
679  } else {
680  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
681  }
682  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
683 
684 #ifdef DEBUG
685  timeval ts_preunlock;
686  gettimeofday(&ts_preunlock, 0);
687  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
688  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
689 #endif
690 
691  //if new json is present, lock file which FedRawDataInputSource will later unlock
692  if (fileStatus == newFile)
693  lockFULocal();
694 
695  //release lock at this point
696  int retvalu = -1;
697  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
698  if (retvalu == -1)
699  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
700 
701 #ifdef DEBUG
702  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
703 #endif
704 
705  if (fileStatus == noFile) {
706  struct stat buf;
707  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
708  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
709  fileStatus = runEnded;
710  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
711  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
712  fileStatus = runEnded;
713  }
714  }
715  return fileStatus;
716  }

References bu_run_dir_, bumpFile(), fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, newFile, noFile, readLastLSEntry(), runAbort, runEnded, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 79 of file EvFDaqDirector.h.

79 { return useFileBroker_; }

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 203 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 204 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 248 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 250 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 238 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 241 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 247 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 249 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 240 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 282 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 208 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 207 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 210 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 209 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 211 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 256 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by bumpFile(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 233 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 252 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 212 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 259 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 271 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
static private

Definition at line 274 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 261 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 213 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

Definition at line 225 of file EvFDaqDirector.h.

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 281 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 264 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 214 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

Definition at line 205 of file EvFDaqDirector.h.

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

Definition at line 226 of file EvFDaqDirector.h.

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and initRun().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by getLumisectionToStart(), and initRun().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 283 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getLumisectionToStart().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 266 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 206 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and useFileBroker().

runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:542
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:320
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:217
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:245
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
Definition: EvFDaqDirector.cc:779
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:214
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:928
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:282
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:243
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
FRDFileHeader_v1::lumiSection_
uint32_t lumiSection_
Definition: FRDFileHeader.h:37
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:933
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:210
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:405
mps_fire.i
i
Definition: mps_fire.py:428
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:234
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:41
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:206
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:259
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:476
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:257
evf::EvFDaqDirector::bu_r_fulk
struct flock bu_r_fulk
Definition: EvFDaqDirector.h:250
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:488
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:264
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:216
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
FRDFileHeader_v1::headerSize_
uint16_t headerSize_
Definition: FRDFileHeader.h:35
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:334
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:228
FRDFileHeader_v1::id_
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:33
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames()
Definition: DataPointDefinition.h:44
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_w_fulk
struct flock bu_w_fulk
Definition: EvFDaqDirector.h:249
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:231
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:262
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1877
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
evf::EvFDaqDirector::make_flock
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
Definition: EvFDaqDirector.cc:2023
pos
Definition: PixelAliasList.h:18
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:232
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1447
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:880
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:279
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:268
mps_check.msg
tuple msg
Definition: mps_check.py:285
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:208
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::bu_r_flk
struct flock bu_r_flk
Definition: EvFDaqDirector.h:248
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:281
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:359
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:668
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:890
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:480
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:266
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:158
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
DQM.reader
reader
Definition: DQM.py:105
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:961
FRDFileHeader_v1::eventCount_
uint16_t eventCount_
Definition: FRDFileHeader.h:36
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:215
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:254
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:115
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:205
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:265
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:225
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:211
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:274
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:235
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:47
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:364
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:209
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
evf::EvFDaqDirector::outputAdler32Recheck_
bool outputAdler32Recheck_
Definition: EvFDaqDirector.h:213
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:271
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:918
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
FRDFileHeader_v1::version_
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:34
recoMuon::in
Definition: RecoMuonEnumerators.h:6
gainCalibHelper::gainCalibPI::type
type
Definition: SiPixelGainCalibHelper.h:40
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:226
edm::Service
Definition: Service.h:30
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:23
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:212
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:913
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:204
jsoncollector::DataPoint
Definition: DataPoint.h:36
DBConfiguration_cff.connect
connect
Definition: DBConfiguration_cff.py:18
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1838
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:252
res
Definition: Electron.h:6
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
Definition: EvFDaqDirector.cc:943
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:218
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:220
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1848
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:405
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
FRDFileHeader_v1
Definition: FRDFileHeader.h:22
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:251
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:223
evf::DirManager::findRunDir
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:862
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:283
writedatasetfile.run
run
Definition: writedatasetfile.py:27
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:149
evf::EvFDaqDirector::rawFileHasHeader
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
Definition: EvFDaqDirector.cc:1057
evf::EvFDaqDirector::bu_w_flk
struct flock bu_w_flk
Definition: EvFDaqDirector.h:247
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:245
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:229
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:383
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:270
FRDFileHeader_v1::fileSize_
uint64_t fileSize_
Definition: FRDFileHeader.h:38
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:401
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:222
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:277
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:280
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:501
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:261
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:350
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:233
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:227
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:484
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:311
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:203
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:351
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:317
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:239
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:267
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:224
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:374
edm::Log
Definition: MessageLogger.h:70
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:923
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1178
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:718
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2128
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:237
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:207
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::bu_r_lock_stream
FILE * bu_r_lock_stream
Definition: EvFDaqDirector.h:238
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1096
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1985