CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (std::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
 
void initRun ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
bool rawFileHasHeader (std::string const &rawSourcePath, uint16_t &rawHeaderSize)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
void removeFile (unsigned int ls, unsigned int index)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Enumerator
noFile 
sameFile 
newFile 
newLumi 
runEnded 
runAbort 

Definition at line 64 of file EvFDaqDirector.h.

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 40 of file EvFDaqDirector.cc.

41  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
43  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
44  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
45  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
50  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
51  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
52  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
53  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
54  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname, 32);
83  hostname_ = hostname;
84 
85  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
90  << " us";
91  } catch (boost::bad_lexical_cast const&) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  } catch (boost::bad_lexical_cast const&) {
103  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
104  }
105  }
106  if (useFileBroker_) {
108  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config", &buf) == 0) {
112  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
113  std::getline(busconfig, fileBrokerHost_);
114  }
115  if (fileBrokerHost_.empty())
116  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
117  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
118  throw cms::Exception("EvFDaqDirector")
119  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
120 
121  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
122  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
123  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
124  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
125  }
126 
127  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  } catch (boost::bad_lexical_cast const&) {
133  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
134  }
135  }
136 
137  //override file service parameter if specified by environment
138  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
139  if (fileBrokerUseLockParamPtr) {
140  try {
141  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
142  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
144  } catch (boost::bad_lexical_cast const&) {
145  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
146  }
147  }
148  }

References visDQMUpload::buf, endpoint_iterator_, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, io_service_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, socket_, startFromLS_, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 295 of file EvFDaqDirector.cc.

295  {
296  //close server connection
297  if (socket_.get() && socket_->is_open()) {
298  boost::system::error_code ec;
299  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
300  socket_->close(ec);
301  }
302 
303  if (fulocal_rwlock_fd_ != -1) {
304  unlockFULocal();
305  close(fulocal_rwlock_fd_);
306  }
307 
308  if (fulocal_rwlock_fd2_ != -1) {
309  unlockFULocal2();
310  close(fulocal_rwlock_fd2_);
311  }
312  }

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

76 { return run_dir_; }

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

77 { return bu_run_dir_; }

References bu_run_dir_.

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

78 { return bu_run_open_dir_; }

References bu_run_open_dir_.

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
int  maxLS,
bool &  setExceptionState 
)
private

Definition at line 773 of file EvFDaqDirector.cc.

779  {
780  if (previousFileSize_ != 0) {
781  if (!fms_) {
783  }
784  if (fms_)
786  previousFileSize_ = 0;
787  }
788  nextFile = "";
789 
790  //reached limit
791  if (maxLS >= 0 && ls > (unsigned int)maxLS)
792  return false;
793 
794  struct stat buf;
795  std::stringstream ss;
796  unsigned int nextIndex = index;
797  nextIndex++;
798 
799  // 1. Check suggested file
800  std::string nextFileJson = getInputJsonFilePath(ls, index);
801  if (stat(nextFileJson.c_str(), &buf) == 0) {
802  fsize = previousFileSize_ = buf.st_size;
803  nextFile = nextFileJson;
804  return true;
805  }
806  // 2. No file -> lumi ended? (and how many?)
807  else {
808  // 3. No file -> check for standalone raw file
809  std::string nextFileRaw = getRawFilePath(ls, index);
810  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
811  fsize = previousFileSize_ = buf.st_size;
812  nextFile = nextFileRaw;
813  return true;
814  }
815 
816  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
817 
818  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
819  // recheck that no raw file appeared in the meantime
820  if (stat(nextFileJson.c_str(), &buf) == 0) {
821  fsize = previousFileSize_ = buf.st_size;
822  nextFile = nextFileJson;
823  return true;
824  }
825  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
826  fsize = previousFileSize_ = buf.st_size;
827  nextFile = nextFileRaw;
828  return true;
829  }
830 
831  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
832  if (indexFilesInLS < 0)
833  //parsing failed
834  return false;
835  else {
836  //check index
837  if ((int)index < indexFilesInLS) {
838  //we have 2 files, and check for 1 failed... retry (2 will never be here)
839  edm::LogError("EvFDaqDirector")
840  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
841  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
842  setExceptionState = true;
843  return false;
844  }
845  }
846  // this lumi ended, check for files
847  ++ls;
848  index = 0;
849 
850  //reached limit
851  if (maxLS >= 0 && ls > (unsigned int)maxLS)
852  return false;
853 
854  nextFileJson = getInputJsonFilePath(ls, 0);
855  nextFileRaw = getRawFilePath(ls, 0);
856  if (stat(nextFileJson.c_str(), &buf) == 0) {
857  // a new file was found at new lumisection, index 0
858  fsize = previousFileSize_ = buf.st_size;
859  nextFile = nextFileJson;
860  return true;
861  }
862  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
863  fsize = previousFileSize_ = buf.st_size;
864  nextFile = nextFileRaw;
865  return true;
866  }
867  return false;
868  }
869  }
870  // no new file found
871  return false;
872  }

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), getRawFilePath(), eostools::ls(), previousFileSize_, rawFileHasHeader(), contentValuesCheck::ss, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 1979 of file EvFDaqDirector.cc.

1979  {
1980  if (mergeTypePset_.empty())
1981  return;
1982  if (!mergeTypeMap_.empty())
1983  return;
1984  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1985  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
1986  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1987  for (const std::string& pname : tsPset.getParameterNames()) {
1988  std::string streamType = tsPset.getParameter<std::string>(pname);
1989  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1990  mergeTypeMap_.insert(ac, pname);
1991  ac->second = streamType;
1992  ac.release();
1993  }
1994  }
1995  }

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1871 of file EvFDaqDirector.cc.

1871  {
1872  if (transferSystemJson_)
1873  return;
1874 
1875  transferSystemJson_.reset(new Json::Value);
1876  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1877  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1878  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1879 
1880  Json::Value destinationsVal(Json::arrayValue);
1881  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1882  for (auto& dest : destinations)
1883  destinationsVal.append(dest);
1884  (*transferSystemJson_)["destinations"] = destinationsVal;
1885 
1886  Json::Value modesVal(Json::arrayValue);
1887  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1888  for (auto& mode : modes)
1889  modesVal.append(mode);
1890  (*transferSystemJson_)["transferModes"] = modesVal;
1891 
1892  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1893  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1894  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1895  Json::Value streamVal;
1896  for (auto& mode : modes) {
1897  //validation
1898  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1899  throw cms::Exception("EvFDaqDirector")
1900  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1901  << ")";
1902  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1903 
1904  Json::Value sDestsValue(Json::arrayValue);
1905 
1906  if (streamDestinations.empty())
1907  throw cms::Exception("EvFDaqDirector")
1908  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1909 
1910  for (auto& sdest : streamDestinations) {
1911  bool sDestValid = false;
1912  sDestsValue.append(sdest);
1913  for (auto& dest : destinations) {
1914  if (dest == sdest)
1915  sDestValid = true;
1916  }
1917  if (!sDestValid)
1918  throw cms::Exception("EvFDaqDirector")
1919  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1920  << ", dest:" << sdest;
1921  }
1922  streamVal[mode] = sDestsValue;
1923  }
1924  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1925  }
1926  }
1927  } else {
1928  if (requireTSPSet_)
1929  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1930  }
1931  }

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1441 of file EvFDaqDirector.cc.

1448  {
1449  EvFDaqDirector::FileStatus fileStatus = noFile;
1450  serverError = false;
1451 
1452  boost::system::error_code ec;
1453  try {
1454  while (true) {
1455  //socket connect
1456  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1458 
1459  if (ec) {
1460  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1461  serverError = true;
1462  break;
1463  }
1464  }
1465 
1466  boost::asio::streambuf request;
1467  std::ostream request_stream(&request);
1468  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1469  if (maxLS >= 0) {
1470  std::stringstream spath;
1471  spath << path << "&stopls=" << maxLS;
1472  path = spath.str();
1473  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1474  }
1475  request_stream << "GET " << path << " HTTP/1.1\r\n";
1476  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1477  request_stream << "Accept: */*\r\n";
1478  request_stream << "Connection: keep-alive\r\n\r\n";
1479 
1480  boost::asio::write(*socket_, request, ec);
1481  if (ec) {
1482  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1483  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1484  //we got disconnected, try to reconnect to the server before writing the request
1486  if (ec) {
1487  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1488  serverError = true;
1489  break;
1490  }
1491  continue;
1492  }
1493  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1494  serverError = true;
1495  break;
1496  }
1497 
1498  boost::asio::streambuf response;
1499  boost::asio::read_until(*socket_, response, "\r\n", ec);
1500  if (ec) {
1501  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1502  serverError = true;
1503  break;
1504  }
1505 
1506  std::istream response_stream(&response);
1507 
1508  std::string http_version;
1509  response_stream >> http_version;
1510 
1511  response_stream >> serverHttpStatus;
1512 
1513  std::string status_message;
1514  std::getline(response_stream, status_message);
1515  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1516  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1517  serverError = true;
1518  break;
1519  }
1520  if (serverHttpStatus != 200) {
1521  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1522  serverError = true;
1523  break;
1524  }
1525 
1526  // Process the response headers.
1528  while (std::getline(response_stream, header) && header != "\r") {
1529  }
1530 
1531  std::string fileInfo;
1532  std::map<std::string, std::string> serverMap;
1533  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1534  auto pos = fileInfo.find('=');
1535  if (pos == std::string::npos)
1536  continue;
1537  auto stitle = fileInfo.substr(0, pos);
1538  auto svalue = fileInfo.substr(pos + 1);
1539  serverMap[stitle] = svalue;
1540  }
1541 
1542  //check that response run number if correct
1543  auto server_version = serverMap.find("version");
1544  assert(server_version != serverMap.end());
1545 
1546  auto server_run = serverMap.find("runnumber");
1547  assert(server_run != serverMap.end());
1548  assert(run_nstring_ == server_run->second);
1549 
1550  auto server_state = serverMap.find("state");
1551  assert(server_state != serverMap.end());
1552 
1553  auto server_eols = serverMap.find("lasteols");
1554  assert(server_eols != serverMap.end());
1555 
1556  auto server_ls = serverMap.find("lumisection");
1557 
1558  int version_maj = 1;
1559  int version_min = 0;
1560  int version_rev = 0;
1561  {
1562  auto* s_ptr = server_version->second.c_str();
1563  if (!server_version->second.empty() && server_version->second[0] == '"')
1564  s_ptr++;
1565  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1566  if (res < 3) {
1567  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1568  if (res < 2) {
1569  res = sscanf(s_ptr, "%d", &version_maj);
1570  if (res < 1) {
1571  //expecting at least 1 number (major version)
1572  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1573  }
1574  }
1575  }
1576  }
1577 
1578  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1579  if (server_ls != serverMap.end())
1580  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1581  else
1582  serverLS = closedServerLS + 1;
1583 
1584  std::string s_state = server_state->second;
1585  if (s_state == "STARTING") //initial, always empty starting with LS 1
1586  {
1587  auto server_file = serverMap.find("file");
1588  assert(server_file == serverMap.end()); //no file with starting state
1589  fileStatus = noFile;
1590  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1591  } else if (s_state == "READY") {
1592  auto server_file = serverMap.find("file");
1593  if (server_file == serverMap.end()) {
1594  //can be returned by server if files from new LS already appeared but LS is not yet closed
1595  if (serverLS <= closedServerLS)
1596  serverLS = closedServerLS + 1;
1597  fileStatus = noFile;
1598  edm::LogInfo("EvFDaqDirector")
1599  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1600  } else {
1601  std::string filestem;
1602  std::string fileprefix;
1603  auto server_fileprefix = serverMap.find("fileprefix");
1604 
1605  if (server_fileprefix != serverMap.end()) {
1606  auto pssize = server_fileprefix->second.size();
1607  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1608  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1609  else
1610  fileprefix = server_fileprefix->second;
1611  }
1612 
1613  //remove string literals
1614  auto ssize = server_file->second.size();
1615  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1616  filestem = server_file->second.substr(1, ssize - 2);
1617  else
1618  filestem = server_file->second;
1619  assert(!filestem.empty());
1620  if (version_maj > 1) {
1621  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1622  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1623  nextFileJson = "";
1624  rawHeader = true;
1625  } else {
1626  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1627  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1628  nextFileJson = filestem + ".jsn";
1629  rawHeader = false;
1630  }
1631  fileStatus = newFile;
1632  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1633  << serverLS << " file:" << filestem;
1634  }
1635  } else if (s_state == "EOLS") {
1636  serverLS = closedServerLS + 1;
1637  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1638  fileStatus = noFile;
1639  } else if (s_state == "EOR") {
1640  //server_eor = serverMap.find("iseor");
1641  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1642  fileStatus = runEnded;
1643  } else if (s_state == "NORUN") {
1644  auto err_msg = serverMap.find("errormessage");
1645  if (err_msg != serverMap.end())
1646  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1647  else
1648  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1649  edm::LogWarning("EvFDaqDirector") << "executing run end";
1650  fileStatus = runEnded;
1651  } else if (s_state == "ERROR") {
1652  auto err_msg = serverMap.find("errormessage");
1653  if (err_msg != serverMap.end())
1654  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1655  else
1656  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1657  fileStatus = noFile;
1658  serverError = true;
1659  } else {
1660  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1661  fileStatus = noFile;
1662  serverError = true;
1663  }
1664 
1665  // Read until EOF, writing data to output as we go.
1666  if (!fileBrokerKeepAlive_) {
1667  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1668  }
1669  if (ec != boost::asio::error::eof) {
1670  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1671  serverError = true;
1672  }
1673  }
1674  break;
1675  }
1676  } catch (std::exception const& e) {
1677  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1678  serverError = true;
1679  }
1680 
1681  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1682  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1683  if (ec) {
1684  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1685  }
1686  socket_->close(ec);
1687  if (ec) {
1688  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1689  }
1690  }
1691 
1692  if (serverError) {
1693  if (socket_->is_open())
1694  socket_->close(ec);
1695  if (ec) {
1696  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1697  }
1698  fileStatus = noFile;
1699  sleep(1); //back-off if error detected
1700  }
1701  return fileStatus;
1702  }

References cms::cuda::assert(), bu_run_dir_, DBConfiguration_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 927 of file EvFDaqDirector.cc.

// Creates the BoLS marker file for lumiSection at the path given by getBoLSFilePathOnFU().
//   checkIfExists == true : the file is created only when stat() on that path fails.
//   checkIfExists == false: open() with O_CREAT is always called (no O_TRUNC / O_EXCL is passed).
927  {
928  //used for backpressure mechanisms and monitoring
929  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
930  struct stat buf;
931  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
// Requested mode is read/write for user, group and others.
// NOTE(review): the descriptor returned by open() is not checked before it is passed to close().
932  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
933  close(bol_fd);
934  }
935  }

References visDQMUpload::buf, getBoLSFilePathOnFU(), hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS = true 
)

Definition at line 937 of file EvFDaqDirector.cc.

// Creates the EoLS marker for currentLumiSection and/or the BoLS marker for lumiSection.
//   currentLumiSection > 0 : if the EoLS file for currentLumiSection does not exist yet it is created,
//                            and (when doCreateBoLS is set) the BoLS file for lumiSection is created
//                            without an existence check. If the EoLS file already exists nothing is done.
//   currentLumiSection == 0: only the BoLS file for lumiSection is created (with existence check),
//                            and only when doCreateBoLS is set.
939  {
940  if (currentLumiSection > 0) {
941  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
942  struct stat buf;
943  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
944  if (!found) {
// NOTE(review): as in createBoLSFile(), the open() result is not checked before close().
945  int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
946  close(eol_fd);
947  if (doCreateBoLS)
948  createBoLSFile(lumiSection, false);
949  }
950  } else if (doCreateBoLS) {
951  createBoLSFile(lumiSection, true); //needed for initial lumisection
952  }
953  }

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 2011 of file EvFDaqDirector.cc.

// Ensures a file named "processing" exists in run_dir_ by opening it with O_CREAT and closing it again.
// NOTE(review): the open() result is not checked before close().
2011  {
2012  std::string proc_flag = run_dir_ + "/processing";
// Requested mode: rwx for the user, read/write for group and others.
2013  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2014  close(proc_flag_fd);
2015  }

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext().

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1832 of file EvFDaqDirector.cc.

// Creates the directory openPath (including missing parents) when it is not already a directory.
// NOTE(review): listing line 1835, where openPath is declared, is missing from this extract;
// getRunOpenDirPath() is listed under References, so openPath is presumably built from it - confirm.
1832  {
1833  // create open dir if not already there
1834 
1836  if (!std::filesystem::is_directory(openPath)) {
1837  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1838  std::filesystem::create_directories(openPath);
1839  }
1840  }

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 321 of file EvFDaqDirector.cc.

// Declares the untracked configuration parameters of the service, with their defaults and
// descriptive comments, and registers the description under the label "EvFDaqDirector".
// NOTE(review): listing line 322, where desc is declared, is missing from this extract.
321  {
323  desc.setComment(
324  "Service used for file locking arbitration and for propagating information between other EvF components");
325  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
326  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
327  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
328  desc.addUntracked<bool>("useFileBroker", false)
329  ->setComment("Use BU file service to grab input data instead of NFS file locking");
330  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
331  ->setComment("Allow service to discover BU address from hltd configuration");
332  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
// The port is declared as a string parameter, not an integer.
333  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
334  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
335  ->setComment("Use keep alive to avoid using large number of sockets");
336  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
337  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
338  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
339  ->setComment("Lock polling interval in microseconds for the input directory file lock");
340  desc.addUntracked<bool>("outputAdler32Recheck", false)
341  ->setComment("Check Adler32 of per-process output files while micro-merging");
342  desc.addUntracked<bool>("requireTransfersPSet", false)
343  ->setComment("Require complete transferSystem PSet in the process configuration");
// NOTE(review): the description string on line 345 opens a parenthesis that is never closed.
344  desc.addUntracked<std::string>("selectedTransferMode", "")
345  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
346  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
347  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
348  desc.addUntracked<std::string>("mergingPset", "")
349  ->setComment("Name of merging PSet to look for merging type definitions for streams");
350  descriptions.add("EvFDaqDirector", desc);
351  }

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 81 of file EvFDaqDirector.h.

// Returns whatever dirManager_.findRunDir() yields for the run number stored in run_.
81 { return dirManager_.findRunDir(run_); }

References dirManager_, evf::DirManager::findRunDir(), and run_.

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 478 of file EvFDaqDirector.cc.

// Returns run_dir_ + "/" + the name produced by fffnaming::bolsFileName(run_, ls).
478  {
479  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
480  }

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 470 of file EvFDaqDirector.cc.

// Returns bu_run_dir_ + "/" + the name produced by fffnaming::eolsFileName(run_, ls).
470  {
471  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
472  }

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 482 of file EvFDaqDirector.cc.

// Returns the path of the end-of-run file under bu_run_dir_ (name from fffnaming::eorFileName(run_)).
482 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 484 of file EvFDaqDirector.cc.

// Same file name as getEoRFilePath(), but located under run_dir_ instead of bu_run_dir_.
484 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext().

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 486 of file EvFDaqDirector.cc.

// Returns the fixed path "hlt/fffParameters.jsn" below bu_run_dir_.
486 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }

References bu_run_dir_.

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 439 of file EvFDaqDirector.cc.

// NOTE(review): listing line 440 (the only body line) is absent from this extract, so the returned
// expression is not visible here. The References list names fffnaming::initFileNameWithPid(), run_
// and run_dir_; check EvFDaqDirector.cc before relying on the exact path layout.
439  {
441  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

Referenced by dqm::DQMFileSaverPB::initRun().

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 395 of file EvFDaqDirector.cc.

// NOTE(review): listing line 396 (the only body line) is absent from this extract, so the returned
// expression is not visible here. The References list names fffnaming::inputJsonFileName(), run_
// and bu_run_dir_; check EvFDaqDirector.cc before relying on the exact path layout.
395  {
397  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1856 of file EvFDaqDirector.cc.

// Returns the first lumisection number, counting up from startFromLS_, for which no file
// "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" exists (existence is tested with stat()).
// NOTE(review): listing line 1858, where fullpath is declared, is missing from this extract.
1856  {
1857  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1859  struct stat buf;
1860  unsigned int lscount = startFromLS_;
1861  do {
1862  std::stringstream ss;
// The lumisection number is zero-padded to a minimum width of 4 digits.
1863  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1864  fullpath = ss.str();
1865  lscount++;
1866  } while (stat(fullpath.c_str(), &buf) == 0);
// lscount was incremented once past the lumisection whose file was not found.
1867  return lscount - 1;
1868  }

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, startFromLS_, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1704 of file EvFDaqDirector.cc.

// Asks the file broker (via contactFileBroker) for the next input file, creates missing local
// EoLS/BoLS marker files, fetches the event count for a new file and updates ls.
// Returns the resulting FileStatus; on a server error it returns noFile without updating ls.
// NOTE(review): the body uses nextFileRaw, serverEventsInNewFile and fileSizeFromMetadata, which do
// not match the parameter names in the rendered signature above (nextFile, serverEventsInNewFile_,
// fileSize). Listing lines 1705-1710 are not shown, so presumably the definition in the .cc file
// names the parameters differently - confirm.
1711  {
1712  EvFDaqDirector::FileStatus fileStatus = noFile;
1713 
1714  //int retval = -1;
1715  //int lock_attempts = 0;
1716  //long total_lock_attempts = 0;
1717 
// Stop-request detection: either the stop file or the per-pid stop file exists on disk.
1718  struct stat buf;
1719  int stopFileLS = -1;
1720  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1721  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1722  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1723  if (stopFileCheck == 0)
1724  stopFileLS = readLastLSEntry(stopFilePath_);
1725  else
1726  stopFileLS = 1; //stop without drain if only pid is stopped
// Once stop_ls_override_ is non-zero it takes precedence over the value derived from the stop file.
1727  if (!stop_ls_override_) {
1728  //if lumisection is higher than in stop file, should quit at next from current
1729  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1730  stopFileLS = stop_ls_override_ = ls;
1731  } else
1732  stopFileLS = stop_ls_override_;
1733  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1734  << stopFileLS;
1735  //return runEnded;
1736  } else //if file was removed before reaching stop condition, reset this
1737  stop_ls_override_ = 0;
1738 
1739  /* look for EoLS
1740  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1741  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1742  ls++;
1743  return noFile;
1744  }
1745  */
1746 
// NOTE(review): ts_lockbegin is filled here but not read again in the lines shown in this listing.
1747  timeval ts_lockbegin;
1748  gettimeofday(&ts_lockbegin, nullptr);
1749 
1750  std::string nextFileJson;
1751  uint32_t serverLS, closedServerLS;
1752  unsigned int serverHttpStatus;
1753  bool serverError;
1754 
1755  //local lock to force index json and EoLS files to appear in order
// NOTE(review): listing line 1756 is missing here (likewise 1766 and 1811 before the unlock calls).
// fileBrokerUseLocalLock_ appears in the References list, so the lock/unlock calls are presumably
// conditional on it - confirm.
1757  lockFULocal2();
1758 
// maxLS is -1 when no stop LS is known, otherwise the larger of the stop LS and currentLumiSection.
1759  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1760  bool rawHeader = false;
1761  fileStatus = contactFileBroker(
1762  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1763 
1764  if (serverError) {
1765  //do not update anything
1767  unlockFULocal2();
1768  return noFile;
1769  }
1770 
1771  //handle creation of EoLS and BoLS files if lumisection has changed
1772  if (currentLumiSection == 0) {
1773  if (fileStatus == runEnded) {
// First call: BoLS for closedServerLS. Second call: EoLS for closedServerLS only (doCreateBoLS=false).
1774  createLumiSectionFiles(closedServerLS, 0);
1775  createLumiSectionFiles(serverLS, closedServerLS, false); // +1
1776  } else
1777  createLumiSectionFiles(serverLS, 0);
1778  } else {
1779  //loop over and create any EoLS files missing
1780  if (closedServerLS >= currentLumiSection) {
1781  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1782  createLumiSectionFiles(i + 1, i);
1783  }
1784  }
1785 
1786  bool fileFound = true;
1787 
1788  if (fileStatus == newFile) {
// rawHeader is a bool, so "> 0" is equivalent to testing it for true.
1789  if (rawHeader > 0)
1790  serverEventsInNewFile =
1791  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1792  else
1793  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1794  }
// NOTE(review): serverEventsInNewFile is only assigned above when fileStatus == newFile; for any
// other status the test below sees the value it had on entry.
1795  //closing file in case of any error
1796  if (serverEventsInNewFile < 0 && rawFd != -1) {
1797  close(rawFd);
1798  rawFd = -1;
1799  }
1800  if (!fileFound) {
1801  //catch condition where directory got deleted
1802  fileStatus = noFile;
// This declaration shadows the buf declared on line 1718.
1803  struct stat buf;
1804  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1805  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1806  fileStatus = runEnded;
1807  }
1808  }
1809 
1810  //can unlock because all files have been created locally
1812  unlockFULocal2();
1813 
// Update ls: never move it backwards; for newFile a smaller serverLS trips the assert.
1814  if (fileStatus == runEnded)
1815  ls = std::max(currentLumiSection, serverLS);
1816  else if (fileStatus == newFile) {
1817  assert(serverLS >= ls);
1818  ls = serverLS;
1819  } else if (fileStatus == noFile) {
1820  if (serverLS >= ls)
1821  ls = serverLS;
1822  else {
1823  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1824  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1825  sleep(1);
1826  }
1827  }
1828 
1829  return fileStatus;
1830  }

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, hgcalPlots::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 712 of file EvFDaqDirector.cc.

// Parses the JSON file BUEoLSFile and returns the entry at index eolsNFilesIndex_ of its data
// array converted to int, or -1 when the file cannot be parsed or has too few entries.
// While readEolsDefinition_ is set, the definition file named in the JSON is searched for first.
// NOTE(review): listing lines 715, 722, 733 and 751 are missing from this extract; the
// declarations of reader, data and fullpath are presumably among them - confirm.
712  {
713  std::ifstream ij(BUEoLSFile);
714  Json::Value deserializeRoot;
716 
717  if (!reader.parse(ij, deserializeRoot)) {
718  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
719  return -1;
720  }
721 
723  DataPoint dp;
724  dp.deserialize(deserializeRoot);
725 
726  //read definition
727  if (readEolsDefinition_) {
728  //std::string def = boost::algorithm::trim(dp.getDefinition());
729  std::string def = dp.getDefinition();
730  if (def.empty())
731  readEolsDefinition_ = false;
// Each pass tries def as given (absolute if it starts with '/', otherwise relative to
// bu_run_dir_); if that path does not exist, everything up to the first '/' is stripped and the
// shorter path is tried.
732  while (!def.empty()) {
734  if (def.find('/') == 0)
735  fullpath = def;
736  else
737  fullpath = bu_run_dir_ + '/' + def;
738  struct stat buf;
739  if (stat(fullpath.c_str(), &buf) == 0) {
740  DataPointDefinition eolsDpd;
741  std::string defLabel = "legend";
742  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
743  if (eolsDpd.getNames().empty()) {
744  //try with "data" label if "legend" format is not used
745  eolsDpd = DataPointDefinition();
746  defLabel = "data";
747  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
748  }
// NOTE(review): listing line 751 is missing. As extracted, lines 749-750 appear to govern line
// 752, but the omitted line is presumably the statement executed when "NFiles" is found
// (eolsNFilesIndex_ is in the References list) - confirm against the .cc file.
749  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
750  if (eolsDpd.getNames().at(i) == "NFiles")
752  readEolsDefinition_ = false;
753  break;
754  }
755  //check if we can still find definition
756  if (def.size() <= 1 || def.find('/') == std::string::npos) {
757  readEolsDefinition_ = false;
758  break;
759  }
760  def = def.substr(def.find('/') + 1);
761  }
762  }
763 
764  if (dp.getData().size() > eolsNFilesIndex_)
765  data = dp.getData()[eolsNFilesIndex_];
766  else {
767  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
768  return -1;
769  }
// NOTE(review): boost::lexical_cast throws boost::bad_lexical_cast on non-numeric input and no
// handler is visible in this function.
770  return boost::lexical_cast<int>(data);
771  }

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 435 of file EvFDaqDirector.cc.

// Returns the path below run_dir_ + "/open/" with the name from
// fffnaming::initFileNameWithPid(run_, 0, stream).
435  {
436  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
437  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 407 of file EvFDaqDirector.cc.

// Returns the path below bu_run_dir_ + "/open/" with the name from
// fffnaming::inputJsonFileName(run_, ls, index).
407  {
408  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
409  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 403 of file EvFDaqDirector.cc.

// Returns the path below bu_run_dir_ + "/open/" with the name from
// fffnaming::inputRawFileName(run_, ls, index).
403  {
404  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
405  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 399 of file EvFDaqDirector.cc.

// NOTE(review): listing line 400 (the only body line) is absent from this extract, so the returned
// expression is not visible here. The References list names fffnaming::inputRawFileName(), run_
// and bu_run_dir_; check EvFDaqDirector.cc before relying on the exact path layout.
399  {
401  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and removeFile().

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 118 of file EvFDaqDirector.h.

// Returns the run number stored in run_.
118 { return run_; }

References run_.

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 106 of file EvFDaqDirector.h.

// Returns the "open" subdirectory path below run_dir_.
106 { return run_dir_ + "/open"; }

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1933 of file EvFDaqDirector.cc.

// Returns the destinations configured in transferSystemJson_ for the given stream and the
// selected transfer mode, flattened into a comma-separated string.
// With requireTSPSet_ set, a missing stream entry, an unset mode ("" or "null") or a mode not
// listed for the stream throws cms::Exception; otherwise a warning is logged and the string
// "Failsafe" is returned.
1933  {
1934  std::string streamRequestName;
1935  if (transferSystemJson_->isMember(stream.c_str()))
1936  streamRequestName = stream;
1937  else {
1938  std::stringstream msg;
1939  msg << "Transfer system mode definitions missing for -: " << stream;
1940  if (requireTSPSet_)
1941  throw cms::Exception("EvFDaqDirector") << msg.str();
1942  else {
1943  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1944  return std::string("Failsafe");
1945  }
1946  }
1947  //return "Failsafe" if the mode is unset and the strict check parameter is not on
// NOTE(review): the warning text below says the mode is set to an empty string, but the value
// returned is "Failsafe".
1948  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1949  edm::LogWarning("EvFDaqDirector")
1950  << "Selected mode string is not provided as DaqDirector parameter."
1951  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1952  return std::string("Failsafe");
1953  }
1954  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1955  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1956  }
1957  //check if stream has properly listed transfer stream
1958  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1959  std::stringstream msg;
1960  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1961  if (requireTSPSet_)
1962  throw cms::Exception("EvFDaqDirector") << msg.str();
1963  else
1964  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// The return below is not part of the else branch; it is reached whenever no exception was thrown.
1965  return std::string("Failsafe");
1966  }
1967  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
1968 
1969  //flatten string json::Array into CSV std::string
1970  std::string ret;
1971  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
1972  if (!ret.empty())
1973  ret += ",";
1974  ret += (*it).asString();
1975  }
1976  return ret;
1977  }

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 1997 of file EvFDaqDirector.cc.

// Returns the merge type name stored in mergeTypeMap_ for the stream. When there is no entry,
// MergeTypeNames_[defaultType] is stored for the stream and returned.
1997  {
1998  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
1999  if (mergeTypeMap_.find(search_ac, stream))
2000  return search_ac->second;
2001 
2002  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2003  std::string defaultName = MergeTypeNames_[defaultType];
2004  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
// NOTE(review): the result of insert() is not checked. If the key was added by another thread
// between find() and insert(), the assignment below would presumably overwrite that value with
// the default - confirm whether this can happen in practice.
2005  mergeTypeMap_.insert(ac, stream);
2006  ac->second = defaultName;
2007  ac.release();
2008  return defaultName;
2009  }

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1172 of file EvFDaqDirector.cc.

// Copies the JSON file jsonSourcePath into baseRunDir() under a name derived from the stem of
// rawSourcePath plus "_pid<pid_>.jsn", removes the source, then parses the JSON.
// Returns the value found for "NEvents" (or, failing that, the first data entry) converted to int,
// or -1 on any error. fileSizeFromJson receives the "NBytes" value when it is present and
// parseable, otherwise -1.
// fileFound is set to false only when the source cannot be opened on retry and errno is ENOENT.
1175  {
1176  fileFound = true;
1177 
1178  //should be ported to use fffnaming
1179  std::ostringstream fileNameWithPID;
1180  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1181  << std::setw(5) << pid_ << ".jsn";
1182 
1183  // assemble json destination path
1184  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1185 
1186  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1187 
1188  int infile = -1, outfile = -1;
1189 
// The source is opened at most twice; the second attempt follows immediately, without a delay.
1190  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1191  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1192  << strerror(errno);
1193  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1194  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1195  << jsonSourcePath << " : " << strerror(errno);
// NOTE(review): errno is read after the logging statement above; confirm that logging cannot
// modify errno.
1196  if (errno == ENOENT)
1197  fileFound = false;
1198  return -1;
1199  }
1200  }
1201 
1202  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1203  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1204  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
// An already existing destination is treated as an error; it is neither overwritten nor retried.
1205  if (errno == EEXIST) {
1206  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1207  << " : ";
1208  ::close(infile);
1209  return -1;
1210  }
1211  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1212  << strerror(errno);
// For any other open() failure: remove a destination that may have been left behind, then retry once.
1213  struct stat out_stat;
1214  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1215  edm::LogWarning("EvFDaqDirector")
1216  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1217  if (unlink(jsonDestPath.c_str()) == -1) {
1218  edm::LogWarning("EvFDaqDirector")
1219  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1220  }
1221  }
1222  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1223  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1224  << jsonDestPath << " : " << strerror(errno);
1225  ::close(infile);
1226  return -1;
1227  }
1228  }
1229  //copy contents
1230  const std::size_t buf_sz = 512;
1231  std::size_t tot_written = 0;
// NOTE(review): the buffer is allocated with new[] but owned by std::unique_ptr<char>, whose
// default deleter uses delete rather than delete[]; std::unique_ptr<char[]> is the matching type.
1232  std::unique_ptr<char> buf(new char[buf_sz]);
1233 
// Copy loop: read up to buf_sz bytes, then write until the whole chunk is written. A failing
// write() stores its negative result in sz_read, which ends the outer loop as well.
1234  ssize_t sz, sz_read = 1, sz_write;
1235  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1236  sz_write = 0;
1237  do {
1238  assert(sz_read - sz_write > 0);
1239  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1240  sz_read = sz; // cause read loop termination
1241  break;
1242  }
1243  assert(sz > 0);
1244  sz_write += sz;
1245  tot_written += sz;
1246  } while (sz_write < sz_read);
1247  }
1248  close(infile);
1249  close(outfile);
1250 
// NOTE(review): a read or write error after some bytes were already written leaves
// tot_written > 0, so a partial copy takes the same path as a complete one and the source is removed.
1251  if (tot_written > 0) {
1252  //leave file if it was empty for diagnosis
1253  if (unlink(jsonSourcePath.c_str()) == -1) {
1254  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1255  << strerror(errno);
1256  return -1;
1257  }
1258  } else {
1259  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1260  << jsonSourcePath;
1261  return -1;
1262  }
1263 
// NOTE(review): listing line 1265 is missing from this extract; reader is presumably declared there.
1264  Json::Value deserializeRoot;
1266 
1267  std::string data;
1268  std::stringstream ss;
1269  bool result;
1270  try {
// If everything fitted into one buffer, parse straight from it; otherwise re-read the copy from disk.
// NOTE(review): no terminating NUL is written to buf in the lines shown, and it is passed as a
// bare char pointer here and streamed on line 1286 - confirm that this is safe.
1271  if (tot_written <= buf_sz) {
1272  result = reader.parse(buf.get(), deserializeRoot);
1273  } else {
1274  //json will normally not be bigger than buf_sz bytes
1275  try {
1276  std::ifstream ij(jsonDestPath);
1277  ss << ij.rdbuf();
1278  } catch (std::filesystem::filesystem_error const& ex) {
1279  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1280  return -1;
1281  }
1282  result = reader.parse(ss.str(), deserializeRoot);
1283  }
1284  if (!result) {
1285  if (tot_written <= buf_sz)
1286  ss << buf.get();
1287  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1288  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1289  << ss.str() << ".";
1290  return -1;
1291  }
1292 
1293  //read BU JSON
1294  DataPoint dp;
1295  dp.deserialize(deserializeRoot);
// Find the data entry whose index matches the position of "NEvents" in dpd_'s name list.
1296  bool success = false;
1297  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1298  if (dpd_->getNames().at(i) == "NEvents")
1299  if (i < dp.getData().size()) {
1300  data = dp.getData()[i];
1301  success = true;
1302  break;
1303  }
1304  }
// Fallback: use the first data entry when no "NEvents" entry was found.
1305  if (!success) {
1306  if (!dp.getData().empty())
1307  data = dp.getData()[0];
1308  else {
1309  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1310  << "grabNextJsonFile - "
1311  << " error reading number of events from BU JSON; No input value. data -: " << data;
1312  return -1;
1313  }
1314  }
1315 
1316  //try to read raw file size
1317  fileSizeFromJson = -1;
1318  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1319  if (dpd_->getNames().at(i) == "NBytes") {
1320  if (i < dp.getData().size()) {
1321  std::string dataSize = dp.getData()[i];
1322  try {
1323  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1324  } catch (boost::bad_lexical_cast const&) {
1325  //non-fatal currently, processing can continue without this value
1326  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1327  << "Input value is -: " << dataSize;
1328  }
1329  break;
1330  }
1331  }
1332  }
// A conversion failure here is handled by the bad_lexical_cast handler below, which logs and
// falls through to the final return -1.
1333  return boost::lexical_cast<int>(data);
1334  } catch (boost::bad_lexical_cast const& e) {
1335  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1336  << "Input value is -: " << data;
1337  } catch (std::runtime_error const& e) {
1338  //Can be thrown by Json parser
1339  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1340  }
1341 
1342  catch (std::exception const& e) {
1343  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1344  } catch (...) {
1345  //unknown exception
1346  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1347  }
1348 
// Reached only after one of the handlers above has logged an exception.
1349  return -1;
1350  }

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( std::filesystem::path const &  jsonSourcePath)

Definition at line 1352 of file EvFDaqDirector.cc.

1352  {
1353  std::string data;
1354  try {
1355  // assemble json destination path
1356  std::filesystem::path jsonDestPath(baseRunDir());
1357 
1358  //should be ported to use fffnaming
1359  std::ostringstream fileNameWithPID;
1360  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1361  << ".jsn";
1362  jsonDestPath /= fileNameWithPID.str();
1363 
1364  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1365  try {
1366  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1367  } catch (std::filesystem::filesystem_error const& ex) {
1368  // Input dir gone?
1369  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1370  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1371  sleep(1);
1372  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1373  }
1374  unlockFULocal();
1375 
1376  try {
1377  //sometimes this fails but file gets deleted
1378  std::filesystem::remove(jsonSourcePath);
1379  } catch (std::filesystem::filesystem_error const& ex) {
1380  // Input dir gone?
1381  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1382  } catch (std::exception const& ex) {
1383  // Input dir gone?
1384  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1385  }
1386 
1387  std::ifstream ij(jsonDestPath);
1388  Json::Value deserializeRoot;
1390 
1391  std::stringstream ss;
1392  ss << ij.rdbuf();
1393  if (!reader.parse(ss.str(), deserializeRoot)) {
1394  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1395  << "\nERROR:\n"
1396  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1397  << ss.str() << ".";
1398  throw std::runtime_error("Cannot deserialize input JSON file");
1399  }
1400 
1401  //read BU JSON
1402  std::string data;
1403  DataPoint dp;
1404  dp.deserialize(deserializeRoot);
1405  bool success = false;
1406  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1407  if (dpd_->getNames().at(i) == "NEvents")
1408  if (i < dp.getData().size()) {
1409  data = dp.getData()[i];
1410  success = true;
1411  }
1412  }
1413  if (!success) {
1414  if (!dp.getData().empty())
1415  data = dp.getData()[0];
1416  else
1417  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1418  << " error reading number of events from BU JSON -: No input value " << data;
1419  }
1420  return boost::lexical_cast<int>(data);
1421  } catch (std::filesystem::filesystem_error const& ex) {
1422  // Input dir gone?
1423  unlockFULocal();
1424  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1425  } catch (std::runtime_error const& e) {
1426  // Another process grabbed the file and NFS did not register this
1427  unlockFULocal();
1428  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1429  } catch (boost::bad_lexical_cast const&) {
1430  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1431  << "Input value is -: " << data;
1432  } catch (std::exception const& e) {
1433  // BU run directory disappeared?
1434  unlockFULocal();
1435  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1436  }
1437 
1438  return -1;
1439  }

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS,
bool  closeFile 
)

Definition at line 1090 of file EvFDaqDirector.cc.

1096  {
1097  fileFound = true;
1098 
1099  //take only first three tokens delimited by "_" in the renamed raw file name
1100  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1101  size_t pos = 0, n_tokens = 0;
1102  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1103  }
1104  std::string reducedJsonStem = jsonStem.substr(0, pos);
1105 
1106  std::ostringstream fileNameWithPID;
1107  //should be ported to use fffnaming
1108  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1109 
1110  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1111 
1112  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1113 
1114  //parse RAW file header if it exists
1115  uint32_t lsFromRaw;
1116  int32_t nbEventsWrittenRaw;
1117  int64_t fileSizeFromRaw;
1118  auto ret = parseFRDFileHeader(
1119  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1120  if (ret != 0) {
1121  if (ret == 1)
1122  fileFound = false;
1123  return -1;
1124  }
1125 
1126  int outfile;
1127  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1128  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1129  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1130  if (errno == EEXIST) {
1131  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1132  << " : ";
1133  return -1;
1134  }
1135  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1136  << strerror(errno);
1137  struct stat out_stat;
1138  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1139  edm::LogWarning("EvFDaqDirector")
1140  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1141  << jsonDestPath;
1142  if (unlink(jsonDestPath.c_str()) == -1) {
1143  edm::LogWarning("EvFDaqDirector")
1144  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1145  }
1146  }
1147  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1148  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1149  << jsonDestPath << " : " << strerror(errno);
1150  return -1;
1151  }
1152  }
1153  //write JSON file (TODO: use jsoncpp)
1154  std::stringstream ss;
1155  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1156  std::string sstr = ss.str();
1157 
1158  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1159  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1160  << " : " << strerror(errno);
1161  return -1;
1162  }
1163  close(outfile);
1164  if (serverLS && serverLS != lsFromRaw)
1165  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1166  << " and raw file header LS " << lsFromRaw;
1167 
1168  fileSizeFromHeader = fileSizeFromRaw;
1169  return nbEventsWrittenRaw;
1170  }

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker(), and FedRawDataInputSource::readSupervisor().

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 150 of file EvFDaqDirector.cc.

// Body of EvFDaqDirector::initRun() as listed on this page.
// It derives the run strings and pid_, creates the base/run directories, opens the lock files
// (FU-local lock when !directorBU_, BU/FU lock files otherwise), initialises init_lock_, sets the
// stop-file paths and, when !directorBU_, loads the data point definition into dpd_.
// Failures of the mkdir/open calls guarded below are reported by throwing cms::Exception.
// NOTE(review): the embedded line numbering skips 179, 181, 189, 199, 229 and 261, so this
// listing is missing those source lines; the gaps are marked below.
150  {
151  std::stringstream ss;
152  ss << "run" << std::setfill('0') << std::setw(6) << run_;
153  run_string_ = ss.str();
154  ss = std::stringstream();
155  ss << run_;
156  run_nstring_ = ss.str();
157  run_dir_ = base_dir_ + "/" + run_string_;
158  ss = std::stringstream();
159  ss << getpid();
160  pid_ = ss.str();
161 
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
// umask(0) clears the process file-mode creation mask, so the modes requested below (and by any
// later file creation in this process) are applied unmodified. In the mode on the next mkdir,
// S_IRWXO already contains S_IROTH and S_IXOTH.
170  umask(0);
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
// NOTE(review): source line 179 is missing here; presumably the createRunOpendirMaybe() call
// named in this function's References list - confirm against EvFDaqDirector.cc.
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
// NOTE(review): source line 181 is missing; the open() below presumably assigns
// fulocal_rwlock_fd_, which is the variable tested right after it.
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
// NOTE(review): source line 189 is missing; the open() below presumably assigns
// fulocal_rwlock_fd2_, which is the variable tested right after it.
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
// NOTE(review): source line 199 is missing; bu_run_dir_ is used immediately below, so it is
// presumably assigned on that line - confirm the exact expression against EvFDaqDirector.cc.
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
// fdopen() is called even when the open() above failed (bu_writelock_fd_ == -1); that case is
// only reported through the nullptr warning below, execution continues.
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
// NOTE(review): source line 229 is missing; presumably the tryInitializeFuLockFile() call
// named in this function's References list.
// NOTE(review): fu_rw_lock_stream is flushed without a null check, and the descriptor
// underneath the stream is closed directly rather than through the stream - confirm intent.
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
// The HLT files are first copied into a directory under bu_run_open_dir_ and that directory is
// then renamed to bu_run_dir_ + "/hlt".
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
243  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244 
245  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
246 
247  std::filesystem::rename(tmphltdir, hltdir);
248  } else
249  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
250  }
251  //else{}//no configuration specified
252  } else {
253  // for FU, check if bu base dir exists
254 
255  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
256  if (retval != 0 && errno != EEXIST) {
257  throw cms::Exception("DaqDirector")
258  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
259  }
260 
// NOTE(review): source line 261 is missing; bu_run_dir_ is used on the next line, so it is
// presumably assigned there - confirm the exact expression against EvFDaqDirector.cc.
262  fulockfile_ = bu_run_dir_ + "/fu.lock";
263  openFULockfileStream(false);
264  }
265 
266  pthread_mutex_init(&init_lock_, nullptr);
267 
268  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
269  std::stringstream sstp;
270  sstp << stopFilePath_ << "_pid" << pid_;
271  stopFilePathPid_ = sstp.str();
272 
273  if (!directorBU_) {
// Search order for the definition file: bu_run_dir_/jsd/rawData.jsd, then budef.jsd under
// $CMSSW_BASE, then under $CMSSW_RELEASE_BASE, finally the bare relative suffix.
274  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
275  struct stat statbuf;
276  if (!stat(defPath.c_str(), &statbuf))
277  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
278  else {
279  //look in source directory if not present in ramdisk
280  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
// NOTE(review): std::getenv() returns a null pointer when the variable is unset, and
// constructing std::string from a null pointer is undefined behaviour; both getenv calls
// below rely on the variables being set.
281  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
282  if (stat(defPath.c_str(), &statbuf)) {
283  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
284  if (stat(defPath.c_str(), &statbuf)) {
285  defPath = defPathSuffix;
286  }
287  }
288  }
// dpd_ is allocated with a raw new here; its release is not visible in this listing.
289  dpd_ = new DataPointDefinition();
290  std::string defLabel = "data";
291  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
292  }
293  }

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), pid_, run_, run_dir_, run_nstring_, run_string_, contentValuesCheck::ss, hgcalPlots::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 122 of file EvFDaqDirector.h.

122 { return nStreams_ == 1 && nThreads_ == 1; }

References nStreams_, and nThreads_.

Referenced by FedRawDataInputSource::getNextEvent().

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 907 of file EvFDaqDirector.cc.

907  {
908  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
909  flock(fulocal_rwlock_fd_, LOCK_SH);
910  }

References fulocal_rwlock_fd_.

Referenced by updateFuLock().

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 917 of file EvFDaqDirector.cc.

917  {
918  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
919  flock(fulocal_rwlock_fd2_, LOCK_EX);
920  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 903 of file EvFDaqDirector.cc.

903 { pthread_mutex_lock(&init_lock_); }

References init_lock_.

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 2017 of file EvFDaqDirector.cc.

2017  {
2018 #ifdef __APPLE__
2019  return {start, len, pid, type, whence};
2020 #else
2021  return {type, whence, start, len, pid};
2022 #endif
2023  }

References command_line::start.

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 884 of file EvFDaqDirector.cc.

884  {
885  if (create) {
887  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
888  chmod(fulockfile_.c_str(), 0766);
889  } else {
890  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
891  }
892  if (fu_readwritelock_fd_ == -1)
893  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
894  << " create:" << create << " error:" << strerror(errno);
895  else
896  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
897 
898  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
899  if (fu_rw_lock_stream == nullptr)
900  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
901  }

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

107 { return outputAdler32Recheck_; }

References outputAdler32Recheck_.

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

75 { run_ = run; }

References writedatasetfile::run, and run_.

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 955 of file EvFDaqDirector.cc.

963  {
964  int infile;
965 
966  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
967  if (retry) {
968  edm::LogWarning("EvFDaqDirector")
969  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
970  return parseFRDFileHeader(rawSourcePath,
971  rawFd,
972  rawHeaderSize,
973  lsFromHeader,
974  eventsFromHeader,
975  fileSizeFromHeader,
976  requireHeader,
977  false,
978  closeFile);
979  } else {
980  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
981  edm::LogError("EvFDaqDirector")
982  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
983  if (errno == ENOENT)
984  return 1; // error && file not found
985  else
986  return -1;
987  }
988  }
989  }
990 
991  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
992  FRDFileHeader_v1 fileHead;
993 
994  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
995  if (closeFile) {
996  close(infile);
997  infile = -1;
998  }
999 
1000  if (sz_read < 0) {
1001  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1002  << strerror(errno);
1003  if (infile != -1)
1004  close(infile);
1005  return -1;
1006  }
1007  if ((size_t)sz_read < buf_sz) {
1008  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1009  if (infile != -1)
1010  close(infile);
1011  return -1;
1012  }
1013 
1014  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1015 
1016  if (frd_version == 0) {
1017  //no header (specific sequence not detected)
1018  if (requireHeader) {
1019  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1020  if (infile != -1)
1021  close(infile);
1022  return -1;
1023  } else {
1024  //no header, but valid file
1025  lseek(infile, 0, SEEK_SET);
1026  rawHeaderSize = 0;
1027  lsFromHeader = 0;
1028  eventsFromHeader = -1;
1029  fileSizeFromHeader = -1;
1030  }
1031  } else {
1032  //version 1 header
1033  uint32_t headerSizeRaw = fileHead.headerSize_;
1034  if (headerSizeRaw < buf_sz) {
1035  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1036  << " v:" << frd_version;
1037  if (infile != -1)
1038  close(infile);
1039  return -1;
1040  }
1041  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1042  lsFromHeader = fileHead.lumiSection_;
1043  eventsFromHeader = (int32_t)fileHead.eventCount_;
1044  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1045  rawHeaderSize = fileHead.headerSize_;
1046  }
1047  rawFd = infile;
1048  return 0; //OK
1049  }

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and readEcalDQMStatus::read.

Referenced by grabNextJsonFromRaw(), and FedRawDataInputSource::readSupervisor().

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 368 of file EvFDaqDirector.cc.

368  {
369  close(bu_readlock_fd_);
370  close(bu_writelock_fd_);
371  if (directorBU_) {
372  std::string filename = bu_run_dir_ + "/bu.lock";
374  }
375  }

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBU_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 314 of file EvFDaqDirector.cc.

314  {
315  initRun();
316 
317  nThreads_ = bounds.maxNumberOfStreams();
318  nStreams_ = bounds.maxNumberOfThreads();
319  }

References initRun(), edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 353 of file EvFDaqDirector.cc.

353  {
355  checkMergeTypePSet(pc);
356  }

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 358 of file EvFDaqDirector.cc.

// Body of EvFDaqDirector::preBeginRun(): logs a warning naming run_dir_ and the directory
// returned by dirManager_.findHighestRunDir().
// NOTE(review): source line 362 (the header of the if-block closed at line 365) is missing from
// this listing; presumably it compares dirManager_.findHighestRunDir() with run_dir_ - confirm
// against EvFDaqDirector.cc.
358  {
359  //assert(run_ == id.run());
360 
361  // check if the requested run is the latest one - issue a warning if it isn't
363  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
364  << ". This is not the highest run " << dirManager_.findHighestRunDir();
365  }
366  }

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 377 of file EvFDaqDirector.cc.

// Body of EvFDaqDirector::preGlobalEndLumi(): erases from *filesToDeletePtr_ every entry whose
// file belongs to the lumisection that is ending, while holding *fileDeleteLockPtr_.
// NOTE(review): source line 380 (the header of the guard closed at line 383) is missing from
// this listing; presumably it tests whether fileDeleteLockPtr_ / filesToDeletePtr_ were set -
// confirm against EvFDaqDirector.cc.
377  {
378  //delete all files belonging to just closed lumi
379  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
381  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
382  return;
383  }
384 
// The mutex is held for the whole scan and released when lkw goes out of scope.
385  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
386  auto it = filesToDeletePtr_->begin();
387  while (it != filesToDeletePtr_->end()) {
// erase() returns the iterator following the removed element, so the scan continues from there.
388  if (it->second->lumi_ == ls) {
389  it = filesToDeletePtr_->erase(it);
390  } else
391  it++;
392  }
393  }

References fileDeleteLockPtr_, filesToDeletePtr_, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

◆ rawFileHasHeader()

bool evf::EvFDaqDirector::rawFileHasHeader ( std::string const &  rawSourcePath,
uint16_t &  rawHeaderSize 
)

Definition at line 1051 of file EvFDaqDirector.cc.

1051  {
1052  int infile;
1053  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1054  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1055  << strerror(errno);
1056  return false;
1057  }
1058  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1059  FRDFileHeader_v1 fileHead;
1060 
1061  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1062 
1063  if (sz_read < 0) {
1064  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1065  << strerror(errno);
1066  if (infile != -1)
1067  close(infile);
1068  return false;
1069  }
1070  if ((size_t)sz_read < buf_sz) {
1071  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1072  if (infile != -1)
1073  close(infile);
1074  return false;
1075  }
1076 
1077  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1078 
1079  close(infile);
1080 
1081  if (frd_version > 0) {
1082  rawHeaderSize = fileHead.headerSize_;
1083  return true;
1084  }
1085 
1086  rawHeaderSize = 0;
1087  return false;
1088  }

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and readEcalDQMStatus::read.

Referenced by bumpFile().

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1842 of file EvFDaqDirector.cc.

1842  {
1843  std::ifstream ij(file);
1844  Json::Value deserializeRoot;
1846 
1847  if (!reader.parse(ij, deserializeRoot)) {
1848  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1849  return -1;
1850  }
1851 
1852  int ret = deserializeRoot.get("lastLS", "").asInt();
1853  return ret;
1854  }

References Json::Value::asInt(), FrontierConditions_GlobalTag_cff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ removeFile() [1/2]

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 488 of file EvFDaqDirector.cc.

488  {
489  int retval = remove(filename.c_str());
490  if (retval != 0)
491  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
492  << ". error = " << strerror(errno);
493  }

References corrVsCorr::filename, and MatrixUtil::remove().

◆ removeFile() [2/2]

void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 495 of file EvFDaqDirector.cc.

References getRawFilePath(), and eostools::ls().

Referenced by postEndRun().

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 175 of file EvFDaqDirector.h.

// Inline body of setDeleteTracking(): stores the two arguments in fileDeleteLockPtr_ and
// filesToDeletePtr_ without copying what they refer to.
// NOTE(review): presumably both arguments must stay valid for as long as this object uses
// them - confirm against the caller.
176  {
177  fileDeleteLockPtr_ = fileDeleteLock;
178  filesToDeletePtr_ = filesToDelete;
179  }

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 121 of file EvFDaqDirector.h.

// Inline body of setFMS(): stores the fms argument in the fms_ member.
121 { fms_ = fms; }

References fms_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 874 of file EvFDaqDirector.cc.

874  {
875  if (fu_rw_lock_stream == nullptr)
876  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
877  else {
878  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
879  unsigned int readLs = 1, readIndex = 0;
880  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
881  }
882  }

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 912 of file EvFDaqDirector.cc.

912  {
913  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
914  flock(fulocal_rwlock_fd_, LOCK_UN);
915  }

References fulocal_rwlock_fd_.

Referenced by grabNextJsonFileAndUnlock(), FedRawDataInputSource::readSupervisor(), and ~EvFDaqDirector().

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 922 of file EvFDaqDirector.cc.

922  {
923  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
924  flock(fulocal_rwlock_fd2_, LOCK_UN);
925  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 905 of file EvFDaqDirector.cc.

905 { pthread_mutex_unlock(&init_lock_); }

References init_lock_.

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint16_t &  rawHeaderSize,
uint64_t &  lockWaitTime,
bool &  setExceptionState 
)

Definition at line 497 of file EvFDaqDirector.cc.

502  {
503  EvFDaqDirector::FileStatus fileStatus = noFile;
504  rawHeaderSize = 0;
505 
506  int retval = -1;
507  int lock_attempts = 0;
508  long total_lock_attempts = 0;
509 
510  struct stat buf;
511  int stopFileLS = -1;
512  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
513  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
514  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
515  if (stopFileCheck == 0)
516  stopFileLS = readLastLSEntry(stopFilePath_);
517  else
518  stopFileLS = 1; //stop without drain if only pid is stopped
519  if (!stop_ls_override_) {
520  //if lumisection is higher than in stop file, should quit at next from current
521  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
522  stopFileLS = stop_ls_override_ = ls;
523  } else
524  stopFileLS = stop_ls_override_;
525  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
526  << stopFileLS;
527  //return runEnded;
528  } else //if file was removed before reaching stop condition, reset this
529  stop_ls_override_ = 0;
530 
531  timeval ts_lockbegin;
532  gettimeofday(&ts_lockbegin, nullptr);
533 
534  while (retval == -1) {
535  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
536  if (retval == -1)
537  usleep(fuLockPollInterval_);
538  else
539  continue;
540 
541  lock_attempts += fuLockPollInterval_;
542  total_lock_attempts += fuLockPollInterval_;
543  if (lock_attempts > 5000000 || errno == 116) {
544  if (errno == 116)
545  edm::LogWarning("EvFDaqDirector")
546  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
547  else
548  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
549  "fu.lock file are present -: errno "
550  << errno << ":" << strerror(errno) << std::endl;
551 
552  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
553  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
554  ls++;
555  return noFile;
556  }
557 
558  if (stat(bu_run_dir_.c_str(), &buf) != 0)
559  return runEnded;
560  if (stat(fulockfile_.c_str(), &buf) != 0)
561  return runEnded;
562 
563  lock_attempts = 0;
564  }
565  if (total_lock_attempts > 5 * 60000000) {
566  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
567  return runAbort;
568  }
569  }
570 
571  timeval ts_lockend;
572  gettimeofday(&ts_lockend, nullptr);
573  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
574  if (deltat > 0.)
575  lockWaitTime = deltat;
576 
577  if (retval != 0)
578  return fileStatus;
579 
580 #ifdef DEBUG
581  timeval ts_lockend;
582  gettimeofday(&ts_lockend, 0);
583 #endif
584 
585  //open another lock file FD after the lock using main fd has been acquired
586  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
587  if (fu_readwritelock_fd2 == -1)
588  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
589  << " create. error:" << strerror(errno);
590 
591  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
592 
593  // if the stream is readable
594  if (fu_rw_lock_stream2 != nullptr) {
595  unsigned int readLs, readIndex;
596  int check = 0;
597  // rewind the stream
598  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
599  // if rewound ok
600  if (check == 0) {
601  // read its values
602  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
603  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
604 
605  unsigned int currentLs = readLs;
606  bool bumpedOk = false;
607  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
608  //no lock file write in this case
609  if (ls && ls + 1 < currentLs)
610  ls++;
611  else {
612  // try to bump (look for new index or EoLS file)
613  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
614  //avoid 2 lumisections jump
615  if (ls && readLs > currentLs && currentLs > ls) {
616  ls++;
617  readLs = currentLs = ls;
618  readIndex = 0;
619  bumpedOk = false;
620  //no write to lock file
621  } else {
622  if (ls == 0 && readLs > currentLs) {
623  //make sure to initialize always with LS found in the lock file, with possibility of grabbing index file immediately
624  //in this case there is no new file in the same LS
625  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
626  readLs = currentLs;
627  readIndex = 0;
628  bumpedOk = false;
629  //no write to lock file
630  }
631  //update return LS value
632  ls = readLs;
633  }
634  }
635  if (bumpedOk) {
636  // there is a new index file to grab, lock file needs to be updated
637  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
638  if (check == 0) {
639  ftruncate(fu_readwritelock_fd2, 0);
640  // write next index in the file, which is the file the next process should take
641  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
642  fflush(fu_rw_lock_stream2);
643  fsync(fu_readwritelock_fd2);
644  fileStatus = newFile;
645  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
646  } else {
647  edm::LogError("EvFDaqDirector")
648  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
649  setExceptionState = true;
650  return noFile;
651  }
652  } else if (currentLs < readLs) {
653  //there is no new file in next LS (yet), but lock file can be updated to the next LS
654  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
655  if (check == 0) {
656  ftruncate(fu_readwritelock_fd2, 0);
657  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
658  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
659  fflush(fu_rw_lock_stream2);
660  fsync(fu_readwritelock_fd2);
661  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
662  } else {
663  edm::LogError("EvFDaqDirector")
664  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
665  setExceptionState = true;
666  return noFile;
667  }
668  }
669  } else {
670  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
671  << strerror(errno);
672  }
673  } else {
674  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
675  }
676  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
677 
678 #ifdef DEBUG
679  timeval ts_preunlock;
680  gettimeofday(&ts_preunlock, 0);
681  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
682  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
683 #endif
684 
685  //if new json is present, lock file which FedRawDataInputSource will later unlock
686  if (fileStatus == newFile)
687  lockFULocal();
688 
689  //release lock at this point
690  int retvalu = -1;
691  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
692  if (retvalu == -1)
693  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
694 
695 #ifdef DEBUG
696  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
697 #endif
698 
699  if (fileStatus == noFile) {
700  struct stat buf;
701  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
702  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
703  fileStatus = runEnded;
704  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
705  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
706  fileStatus = runEnded;
707  }
708  }
709  return fileStatus;
710  }

References bu_run_dir_, bumpFile(), fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, newFile, noFile, readLastLSEntry(), runAbort, runEnded, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 79 of file EvFDaqDirector.h.

79 { return useFileBroker_; }

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 203 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 204 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 248 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 250 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 238 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 231 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 228 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 241 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 247 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 249 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 240 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 277 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 282 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 265 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 208 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 207 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 210 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 209 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 211 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 256 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by bumpFile(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 233 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 252 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 234 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 229 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 212 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 259 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 279 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 271 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
staticprivate

Definition at line 274 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 261 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 262 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 213 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 245 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 281 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 264 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 214 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 280 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and initRun().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by getLumisectionToStart(), and initRun().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 283 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getLumisectionToStart().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 266 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 206 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and useFileBroker().

runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:367
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:314
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:217
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:245
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
Definition: EvFDaqDirector.cc:773
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:214
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:922
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:282
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:243
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
FRDFileHeader_v1::lumiSection_
uint32_t lumiSection_
Definition: FRDFileHeader.h:37
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:927
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:210
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:399
mps_fire.i
i
Definition: mps_fire.py:428
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:234
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:41
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:206
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:259
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:470
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:257
evf::EvFDaqDirector::bu_r_fulk
struct flock bu_r_fulk
Definition: EvFDaqDirector.h:250
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:482
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:264
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:216
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
FRDFileHeader_v1::headerSize_
uint16_t headerSize_
Definition: FRDFileHeader.h:35
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:333
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:228
FRDFileHeader_v1::id_
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:33
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames()
Definition: DataPointDefinition.h:44
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_w_fulk
struct flock bu_w_fulk
Definition: EvFDaqDirector.h:249
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:231
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:262
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1871
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
evf::EvFDaqDirector::make_flock
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
Definition: EvFDaqDirector.cc:2017
pos
Definition: PixelAliasList.h:18
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:232
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1441
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:874
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:279
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:268
mps_check.msg
tuple msg
Definition: mps_check.py:285
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:208
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::bu_r_flk
struct flock bu_r_flk
Definition: EvFDaqDirector.h:248
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:281
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:353
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:652
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:884
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:474
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:266
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1119
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:157
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:150
DQM.reader
reader
Definition: DQM.py:105
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:955
FRDFileHeader_v1::eventCount_
uint16_t eventCount_
Definition: FRDFileHeader.h:36
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:215
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:254
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:115
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:205
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:265
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:225
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:211
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:274
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:256
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:235
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:223
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:47
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:358
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:209
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
evf::EvFDaqDirector::outputAdler32Recheck_
bool outputAdler32Recheck_
Definition: EvFDaqDirector.h:213
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:271
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:912
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
FRDFileHeader_v1::version_
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:34
recoMuon::in
Definition: RecoMuonEnumerators.h:6
gainCalibHelper::gainCalibPI::type
type
Definition: SiPixelGainCalibHelper.h:39
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:226
edm::Service
Definition: Service.h:30
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:23
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:212
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:907
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:204
jsoncollector::DataPoint
Definition: DataPoint.h:36
DBConfiguration_cff.connect
connect
Definition: DBConfiguration_cff.py:18
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1832
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:252
res
Definition: Electron.h:6
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
Definition: EvFDaqDirector.cc:937
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:218
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:220
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1842
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:404
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:143
FRDFileHeader_v1
Definition: FRDFileHeader.h:22
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:251
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:223
evf::DirManager::findRunDir
std::string findRunDir(unsigned int)
Definition: DirManager.cc:43
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:862
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:283
writedatasetfile.run
run
Definition: writedatasetfile.py:27
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:150
evf::EvFDaqDirector::rawFileHasHeader
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
Definition: EvFDaqDirector.cc:1051
evf::EvFDaqDirector::bu_w_flk
struct flock bu_w_flk
Definition: EvFDaqDirector.h:247
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:246
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:229
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:377
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:270
FRDFileHeader_v1::fileSize_
uint64_t fileSize_
Definition: FRDFileHeader.h:38
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:395
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:222
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:277
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:280
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:495
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:261
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:350
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:233
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:227
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:478
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:311
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:203
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:351
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:316
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:239
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:267
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:224
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:368
edm::Log
Definition: MessageLogger.h:70
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:917
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1172
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:712
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2128
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:237
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:207
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::bu_r_lock_stream
FILE * bu_r_lock_stream
Definition: EvFDaqDirector.h:238
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1090
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1979