CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (boost::filesystem::path const &jsonSourcePath)
 
int grabNextJsonFromRaw (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS)
 
void initRun ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (std::string)
 
void removeFile (unsigned int ls, unsigned int index)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 
static int parseFRDFileHeader (std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBU_
 
DirManager dirManager_
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerHostFromCfg_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutex * fileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringService * fms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nStreams_ = 0
 
unsigned int nThreads_ = 0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::Value > transferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

◆ FileStatus

Enumerator
noFile 
sameFile 
newFile 
newLumi 
runEnded 
runAbort 

Definition at line 64 of file EvFDaqDirector.h.

Constructor & Destructor Documentation

◆ EvFDaqDirector()

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet &  pset,
edm::ActivityRegistry &  reg 
)
explicit

Definition at line 40 of file EvFDaqDirector.cc.

41  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
43  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
44  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
45  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
50  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
51  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
52  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
53  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
54  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname, 32);
83  hostname_ = hostname;
84 
85  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
90  << " us";
91  } catch (boost::bad_lexical_cast const&) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  } catch (boost::bad_lexical_cast const&) {
103  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
104  }
105  }
106  if (useFileBroker_) {
107  if (fileBrokerHostFromCfg_) {
108  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config", &buf) == 0) {
112  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
113  std::getline(busconfig, fileBrokerHost_);
114  }
115  if (fileBrokerHost_.empty())
116  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
117  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
118  throw cms::Exception("EvFDaqDirector")
119  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
120 
121  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
122  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
123  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
124  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
125  }
126 
127  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  } catch (boost::bad_lexical_cast const&) {
133  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
134  }
135  }
136 
137  //override file service parameter if specified by environment
138  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
139  if (fileBrokerUseLockParamPtr) {
140  try {
141  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
142  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  << fileBrokerUseLocalLock_;
144  } catch (boost::bad_lexical_cast const&) {
145  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
146  }
147  }
148  }

References visDQMUpload::buf, endpoint_iterator_, fileBrokerHost_, fileBrokerHostFromCfg_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, io_service_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, socket_, startFromLS_, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

◆ ~EvFDaqDirector()

evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 295 of file EvFDaqDirector.cc.

295  {
296  //close server connection
297  if (socket_.get() && socket_->is_open()) {
298  boost::system::error_code ec;
299  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
300  socket_->close(ec);
301  }
302 
303  if (fulocal_rwlock_fd_ != -1) {
304  unlockFULocal();
305  close(fulocal_rwlock_fd_);
306  }
307 
308  if (fulocal_rwlock_fd2_ != -1) {
309  unlockFULocal2();
310  close(fulocal_rwlock_fd2_);
311  }
312  }

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

Member Function Documentation

◆ baseRunDir()

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 76 of file EvFDaqDirector.h.

76 { return run_dir_; }

References run_dir_.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and grabNextJsonFromRaw().

◆ buBaseRunDir()

std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 77 of file EvFDaqDirector.h.

77 { return bu_run_dir_; }

References bu_run_dir_.

◆ buBaseRunOpenDir()

std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

78 { return bu_run_open_dir_; }

References bu_run_open_dir_.

◆ bumpFile()

bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
int  maxLS 
)
private

Definition at line 766 of file EvFDaqDirector.cc.

767  {
768  if (previousFileSize_ != 0) {
769  if (!fms_) {
771  }
772  if (fms_)
774  previousFileSize_ = 0;
775  }
776 
777  //reached limit
778  if (maxLS >= 0 && ls > (unsigned int)maxLS)
779  return false;
780 
781  struct stat buf;
782  std::stringstream ss;
783  unsigned int nextIndex = index;
784  nextIndex++;
785 
786  // 1. Check suggested file
787  nextFile = getInputJsonFilePath(ls, index);
788  if (stat(nextFile.c_str(), &buf) == 0) {
789  previousFileSize_ = buf.st_size;
790  fsize = buf.st_size;
791  return true;
792  }
793  // 2. No file -> lumi ended? (and how many?)
794  else {
795  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
796  bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
797  while (eolFound) {
798  // recheck that no raw file appeared in the meantime
799  if (stat(nextFile.c_str(), &buf) == 0) {
800  previousFileSize_ = buf.st_size;
801  fsize = buf.st_size;
802  return true;
803  }
804 
805  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
806  if (indexFilesInLS < 0)
807  //parsing failed
808  return false;
809  else {
810  //check index
811  if ((int)index < indexFilesInLS) {
812  //we have 2 files, and check for 1 failed... retry (2 will never be here)
813  edm::LogError("EvFDaqDirector")
814  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
815  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
816  return false;
817  }
818  }
819  // this lumi ended, check for files
820  ++ls;
821  index = 0;
822 
823  //reached limit
824  if (maxLS >= 0 && ls > (unsigned int)maxLS)
825  return false;
826 
827  nextFile = getInputJsonFilePath(ls, 0);
828  if (stat(nextFile.c_str(), &buf) == 0) {
829  // a new file was found at new lumisection, index 0
830  previousFileSize_ = buf.st_size;
831  fsize = buf.st_size;
832  return true;
833  } else {
834  //change of policy: we need to cycle through each LS
835  return false;
836  }
837  BUEoLSFile = getEoLSFilePathOnBU(ls);
838  eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
839  }
840  }
841  // no new file found
842  return false;
843  }

References evf::FastMonitoringService::accumulateFileSize(), visDQMUpload::buf, fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), eostools::ls(), previousFileSize_, contentValuesCheck::ss, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

◆ checkMergeTypePSet()

void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 1910 of file EvFDaqDirector.cc.

1910  {
1911  if (mergeTypePset_.empty())
1912  return;
1913  if (!mergeTypeMap_.empty())
1914  return;
1915  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1916  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
1917  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1918  for (std::string pname : tsPset.getParameterNames()) {
1919  std::string streamType = tsPset.getParameter<std::string>(pname);
1920  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1921  mergeTypeMap_.insert(ac, pname);
1922  ac->second = streamType;
1923  ac.release();
1924  }
1925  }
1926  }

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), unpackData-CaloStage2::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

◆ checkTransferSystemPSet()

void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1802 of file EvFDaqDirector.cc.

1802  {
1803  if (transferSystemJson_)
1804  return;
1805 
1806  transferSystemJson_.reset(new Json::Value);
1807  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1808  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1809  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1810 
1811  Json::Value destinationsVal(Json::arrayValue);
1812  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1813  for (auto& dest : destinations)
1814  destinationsVal.append(dest);
1815  (*transferSystemJson_)["destinations"] = destinationsVal;
1816 
1817  Json::Value modesVal(Json::arrayValue);
1818  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1819  for (auto& mode : modes)
1820  modesVal.append(mode);
1821  (*transferSystemJson_)["transferModes"] = modesVal;
1822 
1823  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1824  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1825  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1826  Json::Value streamVal;
1827  for (auto& mode : modes) {
1828  //validation
1829  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1830  throw cms::Exception("EvFDaqDirector")
1831  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1832  << ")";
1833  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1834 
1835  Json::Value sDestsValue(Json::arrayValue);
1836 
1837  if (streamDestinations.empty())
1838  throw cms::Exception("EvFDaqDirector")
1839  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1840 
1841  for (auto& sdest : streamDestinations) {
1842  bool sDestValid = false;
1843  sDestsValue.append(sdest);
1844  for (auto& dest : destinations) {
1845  if (dest == sdest)
1846  sDestValid = true;
1847  }
1848  if (!sDestValid)
1849  throw cms::Exception("EvFDaqDirector")
1850  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1851  << ", dest:" << sdest;
1852  }
1853  streamVal[mode] = sDestsValue;
1854  }
1855  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1856  }
1857  }
1858  } else {
1859  if (requireTSPSet_)
1860  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1861  }
1862  }

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

◆ contactFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
bool &  rawHeader,
int  maxLS 
)

Definition at line 1372 of file EvFDaqDirector.cc.

1379  {
1380  EvFDaqDirector::FileStatus fileStatus = noFile;
1381  serverError = false;
1382 
1383  boost::system::error_code ec;
1384  try {
1385  while (true) {
1386  //socket connect
1387  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1388  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1389 
1390  if (ec) {
1391  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1392  serverError = true;
1393  break;
1394  }
1395  }
1396 
1397  boost::asio::streambuf request;
1398  std::ostream request_stream(&request);
1399  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1400  if (maxLS >= 0) {
1401  std::stringstream spath;
1402  spath << path << "&stopls=" << maxLS;
1403  path = spath.str();
1404  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1405  }
1406  request_stream << "GET " << path << " HTTP/1.1\r\n";
1407  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1408  request_stream << "Accept: */*\r\n";
1409  request_stream << "Connection: keep-alive\r\n\r\n";
1410 
1411  boost::asio::write(*socket_, request, ec);
1412  if (ec) {
1413  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1414  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1415  //we got disconnected, try to reconnect to the server before writing the request
1416  boost::asio::connect(*socket_, *endpoint_iterator_, ec);
1417  if (ec) {
1418  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1419  serverError = true;
1420  break;
1421  }
1422  continue;
1423  }
1424  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1425  serverError = true;
1426  break;
1427  }
1428 
1429  boost::asio::streambuf response;
1430  boost::asio::read_until(*socket_, response, "\r\n", ec);
1431  if (ec) {
1432  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1433  serverError = true;
1434  break;
1435  }
1436 
1437  std::istream response_stream(&response);
1438 
1439  std::string http_version;
1440  response_stream >> http_version;
1441 
1442  response_stream >> serverHttpStatus;
1443 
1444  std::string status_message;
1445  std::getline(response_stream, status_message);
1446  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1447  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1448  serverError = true;
1449  break;
1450  }
1451  if (serverHttpStatus != 200) {
1452  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1453  serverError = true;
1454  break;
1455  }
1456 
1457  // Process the response headers.
1458  std::string header;
1459  while (std::getline(response_stream, header) && header != "\r") {
1460  }
1461 
1462  std::string fileInfo;
1463  std::map<std::string, std::string> serverMap;
1464  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1465  auto pos = fileInfo.find("=");
1466  if (pos == std::string::npos)
1467  continue;
1468  auto stitle = fileInfo.substr(0, pos);
1469  auto svalue = fileInfo.substr(pos + 1);
1470  serverMap[stitle] = svalue;
1471  }
1472 
1473  //check that response run number if correct
1474  auto server_version = serverMap.find("version");
1475  assert(server_version != serverMap.end());
1476 
1477  auto server_run = serverMap.find("runnumber");
1478  assert(server_run != serverMap.end());
1479  assert(run_nstring_ == server_run->second);
1480 
1481  auto server_state = serverMap.find("state");
1482  assert(server_state != serverMap.end());
1483 
1484  auto server_eols = serverMap.find("lasteols");
1485  assert(server_eols != serverMap.end());
1486 
1487  auto server_ls = serverMap.find("lumisection");
1488 
1489  int version_maj = 1;
1490  int version_min = 0;
1491  int version_rev = 0;
1492  {
1493  auto* s_ptr = server_version->second.c_str();
1494  if (!server_version->second.empty() && server_version->second[0] == '"')
1495  s_ptr++;
1496  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1497  if (res < 3) {
1498  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1499  if (res < 2) {
1500  res = sscanf(s_ptr, "%d", &version_maj);
1501  if (res < 1) {
1502  //expecting at least 1 number (major version)
1503  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1504  }
1505  }
1506  }
1507  }
1508 
1509  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1510  if (server_ls != serverMap.end())
1511  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1512  else
1513  serverLS = closedServerLS + 1;
1514 
1515  std::string s_state = server_state->second;
1516  if (s_state == "STARTING") //initial, always empty starting with LS 1
1517  {
1518  auto server_file = serverMap.find("file");
1519  assert(server_file == serverMap.end()); //no file with starting state
1520  fileStatus = noFile;
1521  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1522  } else if (s_state == "READY") {
1523  auto server_file = serverMap.find("file");
1524  if (server_file == serverMap.end()) {
1525  //can be returned by server if files from new LS already appeared but LS is not yet closed
1526  if (serverLS <= closedServerLS)
1527  serverLS = closedServerLS + 1;
1528  fileStatus = noFile;
1529  edm::LogInfo("EvFDaqDirector")
1530  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1531  } else {
1532  std::string filestem;
1533  std::string fileprefix;
1534  auto server_fileprefix = serverMap.find("fileprefix");
1535 
1536  if (server_fileprefix != serverMap.end()) {
1537  auto pssize = server_fileprefix->second.size();
1538  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1539  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1540  else
1541  fileprefix = server_fileprefix->second;
1542  }
1543 
1544  //remove string literals
1545  auto ssize = server_file->second.size();
1546  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1547  filestem = server_file->second.substr(1, ssize - 2);
1548  else
1549  filestem = server_file->second;
1550  assert(!filestem.empty());
1551  if (version_maj > 1) {
1552  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1553  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1554  nextFileJson = "";
1555  rawHeader = true;
1556  } else {
1557  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1558  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1559  nextFileJson = filestem + ".jsn";
1560  rawHeader = false;
1561  }
1562  fileStatus = newFile;
1563  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1564  << serverLS << " file:" << filestem;
1565  }
1566  } else if (s_state == "EOLS") {
1567  serverLS = closedServerLS + 1;
1568  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1569  fileStatus = noFile;
1570  } else if (s_state == "EOR") {
1571  //server_eor = serverMap.find("iseor");
1572  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1573  fileStatus = runEnded;
1574  } else if (s_state == "NORUN") {
1575  auto err_msg = serverMap.find("errormessage");
1576  if (err_msg != serverMap.end())
1577  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1578  else
1579  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1580  edm::LogWarning("EvFDaqDirector") << "executing run end";
1581  fileStatus = runEnded;
1582  } else if (s_state == "ERROR") {
1583  auto err_msg = serverMap.find("errormessage");
1584  if (err_msg != serverMap.end())
1585  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1586  else
1587  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1588  fileStatus = noFile;
1589  serverError = true;
1590  } else {
1591  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1592  fileStatus = noFile;
1593  serverError = true;
1594  }
1595 
1596  // Read until EOF, writing data to output as we go.
1597  if (!fileBrokerKeepAlive_) {
1598  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1599  }
1600  if (ec != boost::asio::error::eof) {
1601  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1602  serverError = true;
1603  }
1604  }
1605  break;
1606  }
1607  } catch (std::exception const& e) {
1608  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1609  serverError = true;
1610  }
1611 
1612  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1613  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1614  if (ec) {
1615  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1616  }
1617  socket_->close(ec);
1618  if (ec) {
1619  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1620  }
1621  }
1622 
1623  if (serverError) {
1624  if (socket_->is_open())
1625  socket_->close(ec);
1626  if (ec) {
1627  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1628  }
1629  fileStatus = noFile;
1630  sleep(1); //back-off if error detected
1631  }
1632  return fileStatus;
1633  }

References cms::cuda::assert(), bu_run_dir_, DBConfiguration_cff::connect, MillePedeFileConverter_cfg::e, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ createBoLSFile()

void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 898 of file EvFDaqDirector.cc.

// Creates the file at getBoLSFilePathOnFU(lumiSection) by opening it with O_CREAT and
// closing it straight away. With checkIfExists == true the file is only created when
// stat() on that path fails; with checkIfExists == false open() is always called.
898  {
899  //used for backpressure mechanisms and monitoring
900  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
901  struct stat buf;
902  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
// Permission bits request read/write for user, group and others.
// NOTE(review): the descriptor returned by open() is passed to close() without a check.
903  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
904  close(bol_fd);
905  }
906  }

References visDQMUpload::buf, getBoLSFilePathOnFU(), hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ createLumiSectionFiles()

void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS = true 
)

Definition at line 908 of file EvFDaqDirector.cc.

// Creates the marker files for a lumisection transition.
//  - currentLumiSection > 0: if stat() does not find getEoLSFilePathOnFU(currentLumiSection),
//    that file is created and, when doCreateBoLS is set, createBoLSFile(lumiSection, false)
//    is called. If the EoLS file is already there nothing is done (no BoLS call either).
//  - currentLumiSection == 0: when doCreateBoLS is set, createBoLSFile(lumiSection, true).
910  {
911  if (currentLumiSection > 0) {
912  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
913  struct stat buf;
914  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
915  if (!found) {
// NOTE(review): the open() result is handed to close() without being checked.
916  int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
917  close(eol_fd);
918  if (doCreateBoLS)
919  createBoLSFile(lumiSection, false);
920  }
921  } else if (doCreateBoLS) {
922  createBoLSFile(lumiSection, true); //needed for initial lumisection
923  }
924  }

References visDQMUpload::buf, createBoLSFile(), newFWLiteAna::found, getEoLSFilePathOnFU(), hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

◆ createProcessingNotificationMaybe()

void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 1942 of file EvFDaqDirector.cc.

// Opens run_dir_ + "/processing" with O_CREAT and closes it immediately; no data is written.
// NOTE(review): the open() result is not checked before close().
1942  {
1943  std::string proc_flag = run_dir_ + "/processing";
1944  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1945  close(proc_flag_fd);
1946  }

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNext().

◆ createRunOpendirMaybe()

void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1763 of file EvFDaqDirector.cc.

// Creates the directory `openPath` (including parents) when it is not already a directory.
// NOTE(review): listing line 1766, which must declare `openPath`, is absent from this
// extract. The References list names getRunOpenDirPath() and a path type, so it is
// presumably a boost::filesystem::path built from getRunOpenDirPath() - confirm against
// EvFDaqDirector.cc.
1763  {
1764  // create open dir if not already there
1765 
1767  if (!boost::filesystem::is_directory(openPath)) {
1768  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1769  boost::filesystem::create_directories(openPath);
1770  }
1771  }

References getRunOpenDirPath(), LogDebug, and castor_dqm_sourceclient_file_cfg::path.

Referenced by initRun().

◆ eolsFileName()

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private

◆ eorFileName()

std::string evf::EvFDaqDirector::eorFileName ( ) const
private

◆ fillDescriptions()

void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 321 of file EvFDaqDirector.cc.

// Declares the service's configuration: a description comment plus one untracked parameter
// per addUntracked<>() call (name, default value, per-parameter comment), registered under
// the label "EvFDaqDirector".
// NOTE(review): listing line 322, which must declare `desc`, is absent from this extract.
// The References list names edm::ParameterSetDescription::addUntracked()/setComment(), so
// `desc` is presumably an edm::ParameterSetDescription - confirm against EvFDaqDirector.cc.
321  {
323  desc.setComment(
324  "Service used for file locking arbitration and for propagating information between other EvF components");
325  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
326  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
327  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
328  desc.addUntracked<bool>("useFileBroker", false)
329  ->setComment("Use BU file service to grab input data instead of NFS file locking");
330  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
331  ->setComment("Allow service to discover BU address from hltd configuration");
332  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
333  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
334  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
335  ->setComment("Use keep alive to avoid using large number of sockets");
336  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
337  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
338  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
339  ->setComment("Lock polling interval in microseconds for the input directory file lock");
340  desc.addUntracked<bool>("outputAdler32Recheck", false)
341  ->setComment("Check Adler32 of per-process output files while micro-merging");
342  desc.addUntracked<bool>("requireTransfersPSet", false)
343  ->setComment("Require complete transferSystem PSet in the process configuration");
// NOTE(review): the comment string below has an unbalanced "(" - it is runtime text, left as is.
344  desc.addUntracked<std::string>("selectedTransferMode", "")
345  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
346  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
347  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
348  desc.addUntracked<std::string>("mergingPset", "")
349  ->setComment("Name of merging PSet to look for merging type definitions for streams");
350  descriptions.add("EvFDaqDirector", desc);
351  }

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setComment(), and AlCaHLTBitMon_QueryRunRegistry::string.

◆ findCurrentRunDir()

std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 81 of file EvFDaqDirector.h.

// Returns whatever dirManager_.findRunDir() yields for this director's run_ value.
81 { return dirManager_.findRunDir(run_); }

References dirManager_, evf::DirManager::findRunDir(), and run_.

◆ getBoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 478 of file EvFDaqDirector.cc.

// Returns run_dir_ + "/" + the name produced by fffnaming::bolsFileName(run_, ls).
478  {
479  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
480  }

References fffnaming::bolsFileName(), eostools::ls(), run_, and run_dir_.

Referenced by createBoLSFile().

◆ getDatFilePath()

std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getEoLSFilePathOnBU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 470 of file EvFDaqDirector.cc.

// Returns bu_run_dir_ + "/" + the name produced by fffnaming::eolsFileName(run_, ls).
470  {
471  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
472  }

References bu_run_dir_, fffnaming::eolsFileName(), eostools::ls(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNext().

◆ getEoLSFilePathOnFU()

std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

◆ getEoRFilePath()

std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 482 of file EvFDaqDirector.cc.

// Returns the path under bu_run_dir_ whose file name comes from fffnaming::eorFileName(run_).
482 { return bu_run_dir_ + "/" + fffnaming::eorFileName(run_); }

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

◆ getEoRFilePathOnFU()

std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 484 of file EvFDaqDirector.cc.

// Same file name as getEoRFilePath() (fffnaming::eorFileName(run_)) but rooted at run_dir_.
484 { return run_dir_ + "/" + fffnaming::eorFileName(run_); }

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNext().

◆ getFFFParamsFilePathOnBU()

std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 486 of file EvFDaqDirector.cc.

// Returns the fixed path "hlt/fffParameters.jsn" below bu_run_dir_.
486 { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }

References bu_run_dir_.

◆ getInitFilePath()

std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 439 of file EvFDaqDirector.cc.

// NOTE(review): listing line 440 - the only statement of this function - is absent from
// this extract. The References list names fffnaming::initFileNameWithPid(), run_, run_dir_
// and stream, so the body presumably mirrors getOpenInitFilePath() without the "/open"
// component - confirm against EvFDaqDirector.cc.
439  {
441  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

Referenced by dqm::DQMFileSaverPB::initRun().

◆ getInputJsonFilePath()

std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 395 of file EvFDaqDirector.cc.

// NOTE(review): listing line 396 - the only statement of this function - is absent from
// this extract. The References list names bu_run_dir_, fffnaming::inputJsonFileName(), ls
// and run_, so the body presumably mirrors getOpenInputJsonFilePath() without the "/open"
// component - confirm against EvFDaqDirector.cc.
395  {
397  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

Referenced by bumpFile().

◆ getLumisectionToStart()

unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1787 of file EvFDaqDirector.cc.

// Starting at startFromLS_, probes for files named
//   run_dir_ + "/" + run_string_ + "_ls" + <lumisection, zero-padded to width 4> + "_EoLS.jsn"
// and returns the first lumisection number for which stat() fails.
// NOTE(review): listing line 1789, which must declare `fullpath`, is absent from this
// extract; it is presumably a std::string - confirm against EvFDaqDirector.cc.
1787  {
1788  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1790  struct stat buf;
1791  unsigned int lscount = startFromLS_;
1792  do {
1793  std::stringstream ss;
1794  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1795  fullpath = ss.str();
1796  lscount++;
1797  } while (stat(fullpath.c_str(), &buf) == 0);
// lscount was incremented after the last (missing) path was built, hence the -1.
1798  return lscount - 1;
1799  }

References visDQMUpload::buf, reco_skim_cfg_mod::fullpath, run_dir_, run_string_, contentValuesCheck::ss, startFromLS_, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

◆ getMergedDatChecksumFilePath()

std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedDatFilePath()

std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getMergedRootHistogramFilePath()

std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getNextFromFileBroker()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  rawFd,
uint16_t &  rawHeaderSize,
int32_t &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1635 of file EvFDaqDirector.cc.

// Asks the file broker (contactFileBroker()) for the next input file and updates local
// lumisection marker files accordingly. Visible flow:
//   1. look for stop-request files and derive stopFileLS / maxLS,
//   2. call contactFileBroker(); on serverError return noFile without touching anything,
//   3. create missing EoLS/BoLS files via createLumiSectionFiles(),
//   4. for newFile, fetch the event count via grabNextJsonFromRaw() or grabNextJsonFile(),
//   5. update `ls` from the server-reported lumisection and return the file status.
// NOTE(review): `nextFileRaw`, `serverEventsInNewFile` and `fileSizeFromMetadata` are not
// declared anywhere in this listing and do not match the parameter names in the signature
// shown on this page (nextFile, serverEventsInNewFile_, fileSize); presumably they are the
// parameter names used in the .cc definition - confirm.
// NOTE(review): listing lines 1687, 1697 and 1742 are absent from this extract. Each one
// directly precedes a lockFULocal2()/unlockFULocal2() call and fileBrokerUseLocalLock_ is in
// the References list, so they are presumably `if (fileBrokerUseLocalLock_)` guards - confirm.
1642  {
1643  EvFDaqDirector::FileStatus fileStatus = noFile;
1644 
1645  //int retval = -1;
1646  //int lock_attempts = 0;
1647  //long total_lock_attempts = 0;
1648 
1649  struct stat buf;
1650  int stopFileLS = -1;
1651  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1652  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1653  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1654  if (stopFileCheck == 0)
1655  stopFileLS = readLastLSEntry(stopFilePath_);
1656  else
1657  stopFileLS = 1; //stop without drain if only pid is stopped
1658  if (!stop_ls_override_) {
1659  //if lumisection is higher than in stop file, should quit at next from current
1660  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1661  stopFileLS = stop_ls_override_ = ls;
1662  } else
1663  stopFileLS = stop_ls_override_;
1664  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1665  << stopFileLS;
1666  //return runEnded;
1667  } else //if file was removed before reaching stop condition, reset this
1668  stop_ls_override_ = 0;
1669 
1670  /* look for EoLS
1671  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1672  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1673  ls++;
1674  return noFile;
1675  }
1676  */
1677 
// NOTE(review): ts_lockbegin is filled here but not read again in the visible listing, and
// thisLockWaitTimeUs is never assigned in the visible listing.
1678  timeval ts_lockbegin;
1679  gettimeofday(&ts_lockbegin, nullptr);
1680 
1681  std::string nextFileJson;
1682  uint32_t serverLS, closedServerLS;
1683  unsigned int serverHttpStatus;
1684  bool serverError;
1685 
1686  //local lock to force index json and EoLS files to appear in order
1688  lockFULocal2();
1689 
// maxLS is -1 when no stop request was seen, otherwise the larger of stopFileLS and the
// current lumisection.
1690  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1691  bool rawHeader = false;
1692  fileStatus = contactFileBroker(
1693  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1694 
1695  if (serverError) {
1696  //do not update anything
1698  unlockFULocal2();
1699  return noFile;
1700  }
1701 
1702  //handle creation of EoLS and BoLS files if lumisection has changed
1703  if (currentLumiSection == 0) {
1704  if (fileStatus == runEnded) {
1705  createLumiSectionFiles(closedServerLS, 0);
1706  createLumiSectionFiles(serverLS, closedServerLS, false); // +1
1707  } else
1708  createLumiSectionFiles(serverLS, 0);
1709  } else {
1710  //loop over and create any EoLS files missing
1711  if (closedServerLS >= currentLumiSection) {
1712  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1713  createLumiSectionFiles(i + 1, i);
1714  }
1715  }
1716 
1717  bool fileFound = true;
1718 
1719  if (fileStatus == newFile) {
// NOTE(review): rawHeader is a bool, so `> 0` is just a truth test.
1720  if (rawHeader > 0)
1721  serverEventsInNewFile =
1722  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS);
1723  else
1724  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1725  }
1726  //closing file in case of any error
1727  if (serverEventsInNewFile < 0 && rawFd != -1) {
1728  close(rawFd);
1729  rawFd = -1;
1730  }
1731  if (!fileFound) {
1732  //catch condition where directory got deleted
1733  fileStatus = noFile;
1734  struct stat buf;
1735  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1736  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1737  fileStatus = runEnded;
1738  }
1739  }
1740 
1741  //can unlock because all files have been created locally
1743  unlockFULocal2();
1744 
// Propagate the server's lumisection to the caller; for noFile a server value below the
// caller's current `ls` is ignored (with a warning and a one second sleep).
1745  if (fileStatus == runEnded)
1746  ls = std::max(currentLumiSection, serverLS);
1747  else if (fileStatus == newFile) {
1748  assert(serverLS >= ls);
1749  ls = serverLS;
1750  } else if (fileStatus == noFile) {
1751  if (serverLS >= ls)
1752  ls = serverLS;
1753  else {
1754  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1755  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1756  sleep(1);
1757  }
1758  }
1759 
1760  return fileStatus;
1761  }

References cms::cuda::assert(), bu_run_dir_, visDQMUpload::buf, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), grabNextJsonFromRaw(), mps_fire::i, lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, hgcalPlots::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

◆ getNFilesFromEoLS()

int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 705 of file EvFDaqDirector.cc.

// Parses the JSON file BUEoLSFile and returns, converted to int, the data element at index
// eolsNFilesIndex_. Returns -1 when the JSON cannot be parsed or has too few data elements.
// While readEolsDefinition_ is set, the definition file named by the data point is searched
// for (as an absolute path, or below bu_run_dir_, stripping leading path components one at
// a time) and scanned for a field called "NFiles".
// NOTE(review): listing lines 708, 715, 726 and 744 are absent from this extract. From the
// surrounding uses they presumably declare `reader`, `data` and `fullpath` and, at 744,
// store the matching index `i` into eolsNFilesIndex_ - confirm against EvFDaqDirector.cc.
705  {
706  boost::filesystem::ifstream ij(BUEoLSFile);
707  Json::Value deserializeRoot;
709 
710  if (!reader.parse(ij, deserializeRoot)) {
711  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
712  return -1;
713  }
714 
716  DataPoint dp;
717  dp.deserialize(deserializeRoot);
718 
719  //read definition
720  if (readEolsDefinition_) {
721  //std::string def = boost::algorithm::trim(dp.getDefinition());
722  std::string def = dp.getDefinition();
723  if (def.empty())
724  readEolsDefinition_ = false;
725  while (!def.empty()) {
727  if (def.find('/') == 0)
728  fullpath = def;
729  else
730  fullpath = bu_run_dir_ + '/' + def;
731  struct stat buf;
732  if (stat(fullpath.c_str(), &buf) == 0) {
733  DataPointDefinition eolsDpd;
734  std::string defLabel = "legend";
735  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
736  if (eolsDpd.getNames().empty()) {
737  //try with "data" label if "legend" format is not used
738  eolsDpd = DataPointDefinition();
739  defLabel = "data";
740  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
741  }
742  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
743  if (eolsDpd.getNames().at(i) == "NFiles")
// Once a definition file has been found the search is switched off for later calls.
745  readEolsDefinition_ = false;
746  break;
747  }
748  //check if we can still find definition
749  if (def.size() <= 1 || def.find('/') == std::string::npos) {
750  readEolsDefinition_ = false;
751  break;
752  }
// Drop everything up to and including the first '/' and retry with the shorter path.
753  def = def.substr(def.find('/') + 1);
754  }
755  }
756 
757  if (dp.getData().size() > eolsNFilesIndex_)
758  data = dp.getData()[eolsNFilesIndex_];
759  else {
760  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
761  return -1;
762  }
// NOTE(review): no try/catch around lexical_cast is visible here, so a non-numeric value
// would presumably propagate boost::bad_lexical_cast to the caller - confirm.
763  return boost::lexical_cast<int>(data);
764  }

References bu_run_dir_, visDQMUpload::buf, data, spu::def(), Calorimetry_cff::dp, eolsNFilesIndex_, reco_skim_cfg_mod::fullpath, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, readEolsDefinition_, DQM::reader, hgcalPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

◆ getOpenDatFilePath()

std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenInitFilePath()

std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 435 of file EvFDaqDirector.cc.

// Returns run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream).
435  {
436  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
437  }

References fffnaming::initFileNameWithPid(), run_, run_dir_, and cms::cuda::stream.

◆ getOpenInputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 407 of file EvFDaqDirector.cc.

// Returns bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index).
407  {
408  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
409  }

References bu_run_dir_, fffnaming::inputJsonFileName(), eostools::ls(), and run_.

◆ getOpenOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOpenRawFilePath()

std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 403 of file EvFDaqDirector.cc.

// Returns bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index).
403  {
404  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
405  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

◆ getOpenRootHistogramFilePath()

std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getOutputJsonFilePath()

std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getProtocolBufferHistogramFilePath()

std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRawFilePath()

std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 399 of file EvFDaqDirector.cc.

// NOTE(review): listing line 400 - the only statement of this function - is absent from
// this extract. The References list names bu_run_dir_, fffnaming::inputRawFileName(), ls
// and run_, so the body presumably mirrors getOpenRawFilePath() without the "/open"
// component - confirm against EvFDaqDirector.cc.
399  {
401  }

References bu_run_dir_, fffnaming::inputRawFileName(), eostools::ls(), and run_.

Referenced by removeFile().

◆ getRootHistogramFilePath()

std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

◆ getRunNumber()

unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 113 of file EvFDaqDirector.h.

// Accessor for the run_ member.
113 { return run_; }

References run_.

◆ getRunOpenDirPath()

std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 106 of file EvFDaqDirector.h.

// Returns the "open" subdirectory path below run_dir_.
106 { return run_dir_ + "/open"; }

References run_dir_.

Referenced by createRunOpendirMaybe(), and initRun().

◆ getStreamDestinations()

std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1864 of file EvFDaqDirector.cc.

// Looks up the transfer destinations of `stream` for the mode selectedTransferMode_ in
// transferSystemJson_ and returns them joined with ",".
// Failure handling depends on requireTSPSet_:
//   - set:   a missing stream entry, an empty/"null" selectedTransferMode_, or a mode not
//            listed for the stream throws cms::Exception;
//   - unset: the same conditions log a warning and return the string "Failsafe".
1864  {
1865  std::string streamRequestName;
1866  if (transferSystemJson_->isMember(stream.c_str()))
1867  streamRequestName = stream;
1868  else {
1869  std::stringstream msg;
1870  msg << "Transfer system mode definitions missing for -: " << stream;
1871  if (requireTSPSet_)
1872  throw cms::Exception("EvFDaqDirector") << msg.str();
1873  else {
1874  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1875  return std::string("Failsafe");
1876  }
1877  }
// NOTE(review): the comment below and the warning text speak of an empty string, but the
// value actually returned is "Failsafe".
1878  //return empty if strict check parameter is not on
1879  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1880  edm::LogWarning("EvFDaqDirector")
1881  << "Selected mode string is not provided as DaqDirector parameter."
1882  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1883  return std::string("Failsafe");
1884  }
1885  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1886  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1887  }
1888  //check if stream has properly listed transfer stream
1889  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1890  std::stringstream msg;
1891  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1892  if (requireTSPSet_)
1893  throw cms::Exception("EvFDaqDirector") << msg.str();
1894  else
1895  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1896  return std::string("Failsafe");
1897  }
1898  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
1899 
1900  //flatten string json::Array into CSV std::string
1901  std::string ret;
1902  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
1903  if (!ret.empty())
1904  ret += ",";
1905  ret += (*it).asString();
1906  }
1907  return ret;
1908  }

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, runTheMatrix::ret, selectedTransferMode_, cms::cuda::stream, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

◆ getStreamMergeType()

std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 1928 of file EvFDaqDirector.cc.

// Returns the merge type name stored for `stream` in mergeTypeMap_. When there is no entry,
// logs an info message, stores MergeTypeNames_[defaultType] for that stream and returns it.
1928  {
1929  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
1930  if (mergeTypeMap_.find(search_ac, stream))
1931  return search_ac->second;
1932 
1933  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
1934  std::string defaultName = MergeTypeNames_[defaultType];
1935  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
// insert() result is not inspected: the default is written to the entry either way.
1936  mergeTypeMap_.insert(ac, stream);
1937  ac->second = defaultName;
1938  ac.release();
1939  return defaultName;
1940  }

References mergeTypeMap_, MergeTypeNames_, cms::cuda::stream, and AlCaHLTBitMon_QueryRunRegistry::string.

◆ grabNextJsonFile()

int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 1103 of file EvFDaqDirector.cc.

// Copies the JSON file jsonSourcePath to
//   baseRunDir() + "/" + <stem of rawSourcePath> + "_pid" + <pid_, zero-padded to 5> + ".jsn",
// removes the source, parses the JSON and returns the "NEvents" value as int.
// Outputs: fileFound is set to false only when the source cannot be opened (after one
// retry) with errno == ENOENT; fileSizeFromJson receives the "NBytes" value, or -1 when it
// is missing or unparsable.
// Returns -1 on any failure (open/copy/unlink/parse/conversion).
// NOTE(review): listing line 1196, which must declare `reader`, is absent from this
// extract; given the parse()/getFormatedErrorMessages() calls it is presumably a
// Json::Reader - confirm against EvFDaqDirector.cc.
1106  {
1107  fileFound = true;
1108 
1109  //should be ported to use fffnaming
1110  std::ostringstream fileNameWithPID;
1111  fileNameWithPID << boost::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1112  << std::setw(5) << pid_ << ".jsn";
1113 
1114  // assemble json destination path
1115  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1116 
1117  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1118 
1119  int infile = -1, outfile = -1;
1120 
// Open the source; a failed first attempt is logged as a warning and retried once.
1121  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1122  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1123  << strerror(errno);
1124  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1125  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1126  << jsonSourcePath << " : " << strerror(errno);
1127  if (errno == ENOENT)
1128  fileFound = false;
1129  return -1;
1130  }
1131  }
1132 
1133  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1134  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
// Open the destination exclusively. EEXIST is fatal; for any other error an existing
// destination file is unlinked and the open is retried once.
1135  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1136  if (errno == EEXIST) {
1137  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1138  << " : ";
1139  ::close(infile);
1140  return -1;
1141  }
1142  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1143  << strerror(errno);
1144  struct stat out_stat;
1145  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1146  edm::LogWarning("EvFDaqDirector")
1147  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1148  if (unlink(jsonDestPath.c_str()) == -1) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1151  }
1152  }
1153  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1154  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1155  << jsonDestPath << " : " << strerror(errno);
1156  ::close(infile);
1157  return -1;
1158  }
1159  }
1160  //copy contents
1161  const std::size_t buf_sz = 512;
1162  std::size_t tot_written = 0;
// NOTE(review): the buffer is allocated with new char[] but owned by std::unique_ptr<char>,
// whose default deleter uses delete rather than delete[]; std::unique_ptr<char[]> looks
// like what was intended - confirm.
1163  std::unique_ptr<char> buf(new char[buf_sz]);
1164 
// Read up to buf_sz bytes at a time and write each chunk out completely (handling short
// writes). A write error copies the negative result into sz_read to stop the outer loop.
1165  ssize_t sz, sz_read = 1, sz_write;
1166  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1167  sz_write = 0;
1168  do {
1169  assert(sz_read - sz_write > 0);
1170  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1171  sz_read = sz; // cause read loop termination
1172  break;
1173  }
1174  assert(sz > 0);
1175  sz_write += sz;
1176  tot_written += sz;
1177  } while (sz_write < sz_read);
1178  }
1179  close(infile);
1180  close(outfile);
1181 
1182  if (tot_written > 0) {
1183  //leave file if it was empty for diagnosis
1184  if (unlink(jsonSourcePath.c_str()) == -1) {
1185  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1186  << strerror(errno);
1187  return -1;
1188  }
1189  } else {
1190  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1191  << jsonSourcePath;
1192  return -1;
1193  }
1194 
1195  Json::Value deserializeRoot;
1197 
1198  std::string data;
1199  std::stringstream ss;
1200  bool result;
1201  try {
// Small files are parsed straight from the copy buffer; larger ones are re-read from the
// destination file.
// NOTE(review): nothing visible here null-terminates buf before it is passed as a char*
// (below and at line 1217), and with tot_written == buf_sz there is no room for a
// terminator - confirm how parse() bounds the input.
1202  if (tot_written <= buf_sz) {
1203  result = reader.parse(buf.get(), deserializeRoot);
1204  } else {
1205  //json will normally not be bigger than buf_sz bytes
1206  try {
1207  boost::filesystem::ifstream ij(jsonDestPath);
1208  ss << ij.rdbuf();
1209  } catch (boost::filesystem::filesystem_error const& ex) {
1210  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1211  return -1;
1212  }
1213  result = reader.parse(ss.str(), deserializeRoot);
1214  }
1215  if (!result) {
1216  if (tot_written <= buf_sz)
1217  ss << buf.get();
1218  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1219  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1220  << ss.str() << ".";
1221  return -1;
1222  }
1223 
1224  //read BU JSON
1225  DataPoint dp;
1226  dp.deserialize(deserializeRoot);
// Find the data element whose name in dpd_ is "NEvents"; if none is found, fall back to
// the first data element.
1227  bool success = false;
1228  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1229  if (dpd_->getNames().at(i) == "NEvents")
1230  if (i < dp.getData().size()) {
1231  data = dp.getData()[i];
1232  success = true;
1233  break;
1234  }
1235  }
1236  if (!success) {
1237  if (!dp.getData().empty())
1238  data = dp.getData()[0];
1239  else {
1240  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1241  << "grabNextJsonFile - "
1242  << " error reading number of events from BU JSON; No input value. data -: " << data;
1243  return -1;
1244  }
1245  }
1246 
1247  //try to read raw file size
1248  fileSizeFromJson = -1;
1249  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1250  if (dpd_->getNames().at(i) == "NBytes") {
1251  if (i < dp.getData().size()) {
1252  std::string dataSize = dp.getData()[i];
1253  try {
1254  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1255  } catch (boost::bad_lexical_cast const&) {
1256  //non-fatal currently, processing can continue without this value
1257  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1258  << "Input value is -: " << dataSize;
1259  }
1260  break;
1261  }
1262  }
1263  }
1264  return boost::lexical_cast<int>(data);
// Every exception below is logged and swallowed; control then falls through to the final
// `return -1`.
1265  } catch (boost::bad_lexical_cast const& e) {
1266  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1267  << "Input value is -: " << data;
1268  } catch (std::runtime_error const& e) {
1269  //Can be thrown by Json parser
1270  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1271  }
1272 
1273  catch (std::exception const& e) {
1274  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1275  } catch (...) {
1276  //unknown exception
1277  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1278  }
1279 
1280  return -1;
1281  }

References cms::cuda::assert(), baseRunDir(), visDQMUpload::buf, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, timingPdfMaker::infile, LogDebug, timingPdfMaker::outfile, castor_dqm_sourceclient_file_cfg::path, pid_, readEcalDQMStatus::read, DQM::reader, mps_fire::result, contentValuesCheck::ss, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ grabNextJsonFileAndUnlock()

int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( boost::filesystem::path const &  jsonSourcePath)

Definition at line 1283 of file EvFDaqDirector.cc.

1283  {
1284  std::string data;
1285  try {
1286  // assemble json destination path
1287  boost::filesystem::path jsonDestPath(baseRunDir());
1288 
1289  //should be ported to use fffnaming
1290  std::ostringstream fileNameWithPID;
1291  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1292  << ".jsn";
1293  jsonDestPath /= fileNameWithPID.str();
1294 
1295  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1296  try {
1297  boost::filesystem::copy(jsonSourcePath, jsonDestPath);
1298  } catch (boost::filesystem::filesystem_error const& ex) {
1299  // Input dir gone?
1300  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1301  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1302  sleep(1);
1303  boost::filesystem::copy(jsonSourcePath, jsonDestPath);
1304  }
1305  unlockFULocal();
1306 
1307  try {
1308  //sometimes this fails but file gets deleted
1309  boost::filesystem::remove(jsonSourcePath);
1310  } catch (boost::filesystem::filesystem_error const& ex) {
1311  // Input dir gone?
1312  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1313  } catch (std::exception const& ex) {
1314  // Input dir gone?
1315  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1316  }
1317 
1318  boost::filesystem::ifstream ij(jsonDestPath);
1319  Json::Value deserializeRoot;
1321 
1322  std::stringstream ss;
1323  ss << ij.rdbuf();
1324  if (!reader.parse(ss.str(), deserializeRoot)) {
1325  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1326  << "\nERROR:\n"
1327  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1328  << ss.str() << ".";
1329  throw std::runtime_error("Cannot deserialize input JSON file");
1330  }
1331 
1332  //read BU JSON
1333  std::string data;
1334  DataPoint dp;
1335  dp.deserialize(deserializeRoot);
1336  bool success = false;
1337  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1338  if (dpd_->getNames().at(i) == "NEvents")
1339  if (i < dp.getData().size()) {
1340  data = dp.getData()[i];
1341  success = true;
1342  }
1343  }
1344  if (!success) {
1345  if (!dp.getData().empty())
1346  data = dp.getData()[0];
1347  else
1348  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1349  << " error reading number of events from BU JSON -: No input value " << data;
1350  }
1351  return boost::lexical_cast<int>(data);
1352  } catch (boost::filesystem::filesystem_error const& ex) {
1353  // Input dir gone?
1354  unlockFULocal();
1355  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1356  } catch (std::runtime_error const& e) {
1357  // Another process grabbed the file and NFS did not register this
1358  unlockFULocal();
1359  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1360  } catch (boost::bad_lexical_cast const&) {
1361  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1362  << "Input value is -: " << data;
1363  } catch (std::exception const& e) {
1364  // BU run directory disappeared?
1365  unlockFULocal();
1366  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1367  }
1368 
1369  return -1;
1370  }

References baseRunDir(), filterCSVwithJSON::copy, data, Calorimetry_cff::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, castor_dqm_sourceclient_file_cfg::path, DQM::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

◆ grabNextJsonFromRaw()

int evf::EvFDaqDirector::grabNextJsonFromRaw ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
int64_t &  fileSizeFromHeader,
bool &  fileFound,
uint32_t  serverLS 
)

Definition at line 1022 of file EvFDaqDirector.cc.

1027  {
1028  fileFound = true;
1029 
1030  //take only first three tokens delimited by "_" in the renamed raw file name
1031  std::string jsonStem = boost::filesystem::path(rawSourcePath).stem().string();
1032  size_t pos = 0, n_tokens = 0;
1033  while (n_tokens++ < 3 && (pos = jsonStem.find("_", pos + 1)) != std::string::npos) {
1034  }
1035  std::string reducedJsonStem = jsonStem.substr(0, pos);
1036 
1037  std::ostringstream fileNameWithPID;
1038  //should be ported to use fffnaming
1039  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1040 
1041  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1042 
1043  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1044 
1045  //parse RAW file header if it exists
1046  uint32_t lsFromRaw;
1047  int32_t nbEventsWrittenRaw;
1048  int64_t fileSizeFromRaw;
1049  auto ret = parseFRDFileHeader(
1050  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, false);
1051  if (ret != 0) {
1052  if (ret == 1)
1053  fileFound = false;
1054  return -1;
1055  }
1056 
1057  int outfile;
1058  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1059  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1060  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1061  if (errno == EEXIST) {
1062  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1063  << " : ";
1064  return -1;
1065  }
1066  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1067  << strerror(errno);
1068  struct stat out_stat;
1069  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1070  edm::LogWarning("EvFDaqDirector")
1071  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1072  << jsonDestPath;
1073  if (unlink(jsonDestPath.c_str()) == -1) {
1074  edm::LogWarning("EvFDaqDirector")
1075  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1076  }
1077  }
1078  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1079  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1080  << jsonDestPath << " : " << strerror(errno);
1081  return -1;
1082  }
1083  }
1084  //write JSON file (TODO: use jsoncpp)
1085  std::stringstream ss;
1086  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1087  std::string sstr = ss.str();
1088 
1089  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1090  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1091  << " : " << strerror(errno);
1092  return -1;
1093  }
1094  close(outfile);
1095  if (serverLS && serverLS != lsFromRaw)
1096  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1097  << " and raw file header LS " << lsFromRaw;
1098 
1099  fileSizeFromHeader = fileSizeFromRaw;
1100  return nbEventsWrittenRaw;
1101  }

References baseRunDir(), LogDebug, timingPdfMaker::outfile, parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, pid_, runTheMatrix::ret, contentValuesCheck::ss, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and writeEcalDQMStatus::write.

Referenced by getNextFromFileBroker().

◆ initFileName()

std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private

◆ initRun()

void evf::EvFDaqDirector::initRun ( )

Definition at line 150 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::initRun().
// Derives the run/pid strings and the run directory path, makes sure the base and run
// directories exist, opens the lock files that match the BU or FU role (directorBU_),
// and on the FU side loads the JSON data point definition into dpd_.
// Throws cms::Exception when a required directory or local lock file cannot be created/opened,
// or when the configured HLT source directory cannot be stat'ed.
// NOTE(review): this listing skips source lines 179, 181, 189, 199, 229 and 261, so some
// statements (e.g. the targets of the open() results at 182 and 190) are not visible here;
// check EvFDaqDirector.cc before relying on the exact statement sequence.
150  {
151  std::stringstream ss;
// "run" followed by the run number zero-padded to six digits
152  ss << "run" << std::setfill('0') << std::setw(6) << run_;
153  run_string_ = ss.str();
// start again from a fresh stream for the plain (unpadded) run number
154  ss = std::stringstream();
155  ss << run_;
156  run_nstring_ = ss.str();
157  run_dir_ = base_dir_ + "/" + run_string_;
158  ss = std::stringstream();
// pid_ is kept as text; it is appended to file names further down
159  ss << getpid();
160  pid_ = ss.str();
161 
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
// an already existing directory (EEXIST) is not an error
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
// clear the umask so the permission bits requested from here on are not masked off
170  umask(0);
// NOTE(review): S_IRWXO already contains S_IROTH and S_IXOTH, so those two flags are redundant here
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
// despite the trailing underscore this is a local variable holding the lock file path
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
// presumably the result is assigned to fulocal_rwlock_fd_ on the missing line 181 (it is tested at 183) - confirm
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
// presumably the result is assigned to fulocal_rwlock_fd2_ on the missing line 189 (it is tested at 191) - confirm
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
// a failed open is only logged; execution continues
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
// NOTE(review): fdopen is attempted even when bu_writelock_fd_ is -1; that case ends in the warning below
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
// flush pending stream output, then close the underlying descriptor directly (the FILE* is not fclose'd here)
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
// files are staged under <run>/open/hlt and the directory is renamed to <run>/hlt once both copies are done
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
// NOTE(review): the directory being created is tmphltdir, but the message below prints hltdir
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
243  boost::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244 
245  boost::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
246 
247  boost::filesystem::rename(tmphltdir, hltdir);
248  } else
249  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
250  }
251  //else{}//no configuration specified
252  } else {
253  // for FU, check if bu base dir exists
254 
255  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
256  if (retval != 0 && errno != EEXIST) {
257  throw cms::Exception("DaqDirector")
258  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
259  }
260 
262  fulockfile_ = bu_run_dir_ + "/fu.lock";
// FU side opens the existing lock file without creating it
263  openFULockfileStream(false);
264  }
265 
266  pthread_mutex_init(&init_lock_, nullptr);
267 
// stop-request marker paths: one for the whole run directory, one specific to this process id
268  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
269  std::stringstream sstp;
270  sstp << stopFilePath_ << "_pid" << pid_;
271  stopFilePathPid_ = sstp.str();
272 
273  if (!directorBU_) {
// JSD lookup order: BU run dir, then $CMSSW_BASE, then $CMSSW_RELEASE_BASE, then the bare relative path
274  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
275  struct stat statbuf;
// stat() returns 0 on success, so !stat(...) means the file exists
276  if (!stat(defPath.c_str(), &statbuf))
277  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
278  else {
279  //look in source directory if not present in ramdisk
280  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
// NOTE(review): std::getenv returns nullptr for an unset variable and std::string(nullptr) is undefined
// behaviour; both getenv calls below assume the variable is set - consider guarding
281  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
282  if (stat(defPath.c_str(), &statbuf)) {
283  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
284  if (stat(defPath.c_str(), &statbuf)) {
285  defPath = defPathSuffix;
286  }
287  }
288  }
// NOTE(review): raw allocation; where dpd_ is released is not visible in this listing
289  dpd_ = new DataPointDefinition();
290  std::string defLabel = "data";
291  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
292  }
293  }

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, visDQMUpload::buf, eostools::chmod(), createRunOpendirMaybe(), directorBU_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), pid_, run_, run_dir_, run_nstring_, run_string_, contentValuesCheck::ss, hgcalPlots::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

◆ inputFileNameStem()

std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private

◆ isSingleStreamThread()

bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline

Definition at line 117 of file EvFDaqDirector.h.

117 { return nStreams_ == 1 && nThreads_ == 1; }

References nStreams_, and nThreads_.

Referenced by FedRawDataInputSource::getNextEvent().

◆ lockFULocal()

void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 878 of file EvFDaqDirector.cc.

878  {
879  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
880  flock(fulocal_rwlock_fd_, LOCK_SH);
881  }

References fulocal_rwlock_fd_.

Referenced by updateFuLock().

◆ lockFULocal2()

void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 888 of file EvFDaqDirector.cc.

888  {
889  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
890  flock(fulocal_rwlock_fd2_, LOCK_EX);
891  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

◆ lockInitLock()

void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 874 of file EvFDaqDirector.cc.

874 { pthread_mutex_lock(&init_lock_); }

References init_lock_.

◆ make_flock()

struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 1948 of file EvFDaqDirector.cc.

1948  {
1949 #ifdef __APPLE__
1950  return {start, len, pid, type, whence};
1951 #else
1952  return {type, whence, start, len, pid};
1953 #endif
1954  }

References command_line::start.

◆ mergedFileNameStem()

std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ openFULockfileStream()

void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 855 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::openFULockfileStream(bool create).
// Opens the FU lock file named by fulockfile_ (creating it when `create` is true) and wraps
// the descriptor in the read/write stream fu_rw_lock_stream. Failures are logged, not thrown.
// NOTE(review): source line 857 is missing from this listing; presumably it assigns the open()
// result at 858 to fu_readwritelock_fd_, mirroring line 861 - confirm against EvFDaqDirector.cc.
855  {
856  if (create) {
858  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
// explicit chmod after creation sets the file mode to 0766
859  chmod(fulockfile_.c_str(), 0766);
860  } else {
// no O_CREAT here, so the mode argument has no effect on this open
861  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
862  }
863  if (fu_readwritelock_fd_ == -1)
864  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
865  << " create:" << create << " error:" << strerror(errno);
866  else
867  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
868 
// NOTE(review): fdopen is attempted even when the descriptor is -1; that case is reported by the check below
869  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
870  if (fu_rw_lock_stream == nullptr)
871  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
872  }

References eostools::chmod(), beamerCreator::create(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

◆ outputAdler32Recheck()

bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 107 of file EvFDaqDirector.h.

107 { return outputAdler32Recheck_; }

References outputAdler32Recheck_.

◆ outputFileNameStem()

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private

◆ overrideRunNumber()

void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 75 of file EvFDaqDirector.h.

75 { run_ = run; }

References writedatasetfile::run, and run_.

◆ parseFRDFileHeader()

int evf::EvFDaqDirector::parseFRDFileHeader ( std::string const &  rawSourcePath,
int &  rawFd,
uint16_t &  rawHeaderSize,
uint32_t &  lsFromHeader,
int32_t &  eventsFromHeader,
int64_t &  fileSizeFromHeader,
bool  requireHeader,
bool  retry,
bool  closeFile 
)
static

Definition at line 926 of file EvFDaqDirector.cc.

934  {
935  int infile;
936 
937  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
938  if (retry) {
939  edm::LogWarning("EvFDaqDirector")
940  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
941  return parseFRDFileHeader(rawSourcePath,
942  rawFd,
943  rawHeaderSize,
944  lsFromHeader,
945  eventsFromHeader,
946  fileSizeFromHeader,
947  requireHeader,
948  false,
949  closeFile);
950  } else {
951  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
952  edm::LogError("EvFDaqDirector")
953  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
954  if (errno == ENOENT)
955  return 1; // error && file not found
956  else
957  return -1;
958  }
959  }
960  }
961 
962  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
963  FRDFileHeader_v1 fileHead;
964 
965  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
966  if (closeFile) {
967  close(infile);
968  infile = -1;
969  }
970 
971  if (sz_read < 0) {
972  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
973  << strerror(errno);
974  if (infile != -1)
975  close(infile);
976  return -1;
977  }
978  if ((size_t)sz_read < buf_sz) {
979  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
980  if (infile != -1)
981  close(infile);
982  return -1;
983  }
984 
985  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
986 
987  if (frd_version == 0) {
988  //no header (specific sequence not detected)
989  if (requireHeader) {
990  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
991  if (infile != -1)
992  close(infile);
993  return -1;
994  } else {
995  //no header, but valid file
996  lseek(infile, 0, SEEK_SET);
997  rawHeaderSize = 0;
998  lsFromHeader = 0;
999  eventsFromHeader = -1;
1000  fileSizeFromHeader = -1;
1001  }
1002  } else {
1003  //version 1 header
1004  uint32_t headerSizeRaw = fileHead.headerSize_;
1005  if (headerSizeRaw < buf_sz) {
1006  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1007  << " v:" << frd_version;
1008  if (infile != -1)
1009  close(infile);
1010  return -1;
1011  }
1012  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1013  lsFromHeader = fileHead.lumiSection_;
1014  eventsFromHeader = (int32_t)fileHead.eventCount_;
1015  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1016  rawHeaderSize = fileHead.headerSize_;
1017  }
1018  rawFd = infile;
1019  return 0; //OK
1020  }

References getFRDFileHeaderVersion(), timingPdfMaker::infile, and readEcalDQMStatus::read.

Referenced by grabNextJsonFromRaw(), and FedRawDataInputSource::readSupervisor().

◆ postEndRun()

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 368 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::postEndRun(edm::GlobalContext const&).
// Closes the BU read and write lock descriptors at the end of the run.
// NOTE(review): source line 373 is missing from this listing; the page's reference list for this
// function names removeFile(), so presumably the bu.lock path built at 372 is removed there - confirm.
368  {
369  close(bu_readlock_fd_);
370  close(bu_writelock_fd_);
371  if (directorBU_) {
372  std::string filename = bu_run_dir_ + "/bu.lock";
374  }
375  }

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBU_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

◆ preallocate()

void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 314 of file EvFDaqDirector.cc.

314  {
315  initRun();
316 
317  nThreads_ = bounds.maxNumberOfStreams();
318  nStreams_ = bounds.maxNumberOfThreads();
319  }

References initRun(), edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

◆ preBeginJob()

void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 353 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const& pc).
// NOTE(review): source line 354 is missing from this listing; the page's reference list for this
// function names checkTransferSystemPSet(), so presumably it is called there - confirm.
353  {
355  checkMergeTypePSet(pc);
356  }

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

◆ preBeginRun()

void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 358 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::preBeginRun(edm::GlobalContext const&).
// Emits a warning that run_dir_ is not the highest run directory known to dirManager_.
// NOTE(review): source line 362, which must hold the `if` condition guarding the warning (see the
// closing brace at 365), is missing from this listing - take the condition from EvFDaqDirector.cc.
358  {
359  //assert(run_ == id.run());
360 
361  // check if the requested run is the latest one - issue a warning if it isn't
363  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
364  << ". This is not the highest run " << dirManager_.findHighestRunDir();
365  }
366  }

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

◆ preGlobalEndLumi()

void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 377 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::preGlobalEndLumi(edm::GlobalContext const& globalContext).
// Drops every entry of the tracked file list whose lumi_ equals the lumisection that is ending.
// NOTE(review): source line 380 is missing from this listing; judging by the message at 381 it
// presumably tests whether fileDeleteLockPtr_ / filesToDeletePtr_ were set - confirm.
377  {
378  //delete all files belonging to just closed lumi
379  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
381  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
382  return;
383  }
384 
// the list is modified only while holding the mutex supplied through setDeleteTracking()
385  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
386  auto it = filesToDeletePtr_->begin();
387  while (it != filesToDeletePtr_->end()) {
388  if (it->second->lumi_ == ls) {
// erase() returns the iterator following the removed element, so the loop continues from there
389  it = filesToDeletePtr_->erase(it);
390  } else
391  it++;
392  }
393  }

References fileDeleteLockPtr_, filesToDeletePtr_, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), and edm::GlobalContext::luminosityBlockID().

Referenced by EvFDaqDirector().

◆ readLastLSEntry()

int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1773 of file EvFDaqDirector.cc.

// Body of evf::EvFDaqDirector::readLastLSEntry(std::string const& file).
// Parses `file` as JSON and returns the integer stored under "lastLS"; returns -1 (after logging)
// when the file cannot be parsed.
// NOTE(review): source line 1776 is missing from this listing; it presumably declares the
// `reader` object used at 1778 - confirm against EvFDaqDirector.cc.
1773  {
1774  boost::filesystem::ifstream ij(file);
1775  Json::Value deserializeRoot;
1777 
1778  if (!reader.parse(ij, deserializeRoot)) {
1779  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1780  return -1;
1781  }
1782 
// NOTE(review): the fallback passed to get() is an empty string; what asInt() does with it when
// "lastLS" is absent depends on the Json implementation in use - confirm
1783  int ret = deserializeRoot.get("lastLS", "").asInt();
1784  return ret;
1785  }

References Json::Value::asInt(), FrontierConditions_GlobalTag_cff::file, Json::Value::get(), DQM::reader, and runTheMatrix::ret.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ removeFile() [1/2]

void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 488 of file EvFDaqDirector.cc.

488  {
489  int retval = remove(filename.c_str());
490  if (retval != 0)
491  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
492  << ". error = " << strerror(errno);
493  }

References corrVsCorr::filename, and MatrixUtil::remove().

◆ removeFile() [2/2]

void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 495 of file EvFDaqDirector.cc.

References getRawFilePath(), and eostools::ls().

Referenced by postEndRun().

◆ setDeleteTracking()

void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, std::unique_ptr< InputFile >>> *  filesToDelete 
)
inline

Definition at line 168 of file EvFDaqDirector.h.

169  {
170  fileDeleteLockPtr_ = fileDeleteLock;
171  filesToDeletePtr_ = filesToDelete;
172  }

References fileDeleteLockPtr_, and filesToDeletePtr_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ setFMS()

void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 116 of file EvFDaqDirector.h.

116 { fms_ = fms; }

References fms_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

◆ tryInitializeFuLockFile()

void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 845 of file EvFDaqDirector.cc.

845  {
846  if (fu_rw_lock_stream == nullptr)
847  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
848  else {
849  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
850  unsigned int readLs = 1, readIndex = 0;
851  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
852  }
853  }

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

◆ unlockFULocal()

void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 883 of file EvFDaqDirector.cc.

883  {
884  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
885  flock(fulocal_rwlock_fd_, LOCK_UN);
886  }

References fulocal_rwlock_fd_.

Referenced by grabNextJsonFileAndUnlock(), and ~EvFDaqDirector().

◆ unlockFULocal2()

void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 893 of file EvFDaqDirector.cc.

893  {
894  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
895  flock(fulocal_rwlock_fd2_, LOCK_UN);
896  }

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNext(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

◆ unlockInitLock()

void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 876 of file EvFDaqDirector.cc.

876 { pthread_mutex_unlock(&init_lock_); }

References init_lock_.

◆ updateFuLock()

EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)

Definition at line 497 of file EvFDaqDirector.cc.

500  {
501  EvFDaqDirector::FileStatus fileStatus = noFile;
502 
503  int retval = -1;
504  int lock_attempts = 0;
505  long total_lock_attempts = 0;
506 
507  struct stat buf;
508  int stopFileLS = -1;
509  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
510  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
511  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
512  if (stopFileCheck == 0)
513  stopFileLS = readLastLSEntry(stopFilePath_);
514  else
515  stopFileLS = 1; //stop without drain if only pid is stopped
516  if (!stop_ls_override_) {
517  //if lumisection is higher than in stop file, should quit at next from current
518  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
519  stopFileLS = stop_ls_override_ = ls;
520  } else
521  stopFileLS = stop_ls_override_;
522  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
523  << stopFileLS;
524  //return runEnded;
525  } else //if file was removed before reaching stop condition, reset this
526  stop_ls_override_ = 0;
527 
528  timeval ts_lockbegin;
529  gettimeofday(&ts_lockbegin, nullptr);
530 
531  while (retval == -1) {
532  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
533  if (retval == -1)
534  usleep(fuLockPollInterval_);
535  else
536  continue;
537 
538  lock_attempts += fuLockPollInterval_;
539  total_lock_attempts += fuLockPollInterval_;
540  if (lock_attempts > 5000000 || errno == 116) {
541  if (errno == 116)
542  edm::LogWarning("EvFDaqDirector")
543  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
544  else
545  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
546  "fu.lock file are present -: errno "
547  << errno << ":" << strerror(errno) << std::endl;
548 
549  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
550  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
551  ls++;
552  return noFile;
553  }
554 
555  if (stat(bu_run_dir_.c_str(), &buf) != 0)
556  return runEnded;
557  if (stat(fulockfile_.c_str(), &buf) != 0)
558  return runEnded;
559 
560  lock_attempts = 0;
561  }
562  if (total_lock_attempts > 5 * 60000000) {
563  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
564  return runAbort;
565  }
566  }
567 
568  timeval ts_lockend;
569  gettimeofday(&ts_lockend, nullptr);
570  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
571  if (deltat > 0.)
572  lockWaitTime = deltat;
573 
574  if (retval != 0)
575  return fileStatus;
576 
577 #ifdef DEBUG
578  timeval ts_lockend;
579  gettimeofday(&ts_lockend, 0);
580 #endif
581 
582  //open another lock file FD after the lock using main fd has been acquired
583  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
584  if (fu_readwritelock_fd2 == -1)
585  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
586  << " create. error:" << strerror(errno);
587 
588  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
589 
590  // if the stream is readable
591  if (fu_rw_lock_stream2 != nullptr) {
592  unsigned int readLs, readIndex;
593  int check = 0;
594  // rewind the stream
595  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
596  // if rewinded ok
597  if (check == 0) {
598  // read its' values
599  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
600  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
601 
602  unsigned int currentLs = readLs;
603  bool bumpedOk = false;
604  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
605  //no lock file write in this case
606  if (ls && ls + 1 < currentLs)
607  ls++;
608  else {
609  // try to bump (look for new index or EoLS file)
610  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
611  //avoid 2 lumisections jump
612  if (ls && readLs > currentLs && currentLs > ls) {
613  ls++;
614  readLs = currentLs = ls;
615  readIndex = 0;
616  bumpedOk = false;
617  //no write to lock file
618  } else {
619  if (ls == 0 && readLs > currentLs) {
620  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
621  //in this case there is no new file in the same LS
622  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
623  readLs = currentLs;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  }
628  //update return LS value
629  ls = readLs;
630  }
631  }
632  if (bumpedOk) {
633  // there is a new index file to grab, lock file needs to be updated
634  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
635  if (check == 0) {
636  ftruncate(fu_readwritelock_fd2, 0);
637  // write next index in the file, which is the file the next process should take
638  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
639  fflush(fu_rw_lock_stream2);
640  fsync(fu_readwritelock_fd2);
641  fileStatus = newFile;
642  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
643  } else {
644  throw cms::Exception("EvFDaqDirector")
645  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
646  }
647  } else if (currentLs < readLs) {
648  //there is no new file in next LS (yet), but lock file can be updated to the next LS
649  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
650  if (check == 0) {
651  ftruncate(fu_readwritelock_fd2, 0);
652  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
653  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
654  fflush(fu_rw_lock_stream2);
655  fsync(fu_readwritelock_fd2);
656  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
657  } else {
658  throw cms::Exception("EvFDaqDirector")
659  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
660  }
661  }
662  } else {
663  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
664  << strerror(errno);
665  }
666  } else {
667  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
668  }
669  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
670 
671 #ifdef DEBUG
672  timeval ts_preunlock;
673  gettimeofday(&ts_preunlock, 0);
674  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
675  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
676 #endif
677 
678  //if new json is present, lock file which FedRawDataInputSource will later unlock
679  if (fileStatus == newFile)
680  lockFULocal();
681 
682  //release lock at this point
683  int retvalu = -1;
684  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
685  if (retvalu == -1)
686  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
687 
688 #ifdef DEBUG
689  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
690 #endif
691 
692  if (fileStatus == noFile) {
693  struct stat buf;
694  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
695  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
696  fileStatus = runEnded;
697  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
698  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
699  fileStatus = runEnded;
700  }
701  }
702  return fileStatus;
703  }

References bu_run_dir_, visDQMUpload::buf, bumpFile(), RPCNoise_example::check, Exception, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, hgcalPlots::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

◆ useFileBroker()

bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 79 of file EvFDaqDirector.h.

79 { return useFileBroker_; }

References useFileBroker_.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

Member Data Documentation

◆ base_dir_

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 190 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_base_dir_

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 191 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_r_flk

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 235 of file EvFDaqDirector.h.

◆ bu_r_fulk

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 237 of file EvFDaqDirector.h.

◆ bu_r_lock_stream

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 225 of file EvFDaqDirector.h.

◆ bu_readlock_fd_

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 218 of file EvFDaqDirector.h.

Referenced by postEndRun().

◆ bu_run_dir_

std::string evf::EvFDaqDirector::bu_run_dir_
private

Definition at line 214 of file EvFDaqDirector.h.

◆ bu_run_open_dir_

std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 215 of file EvFDaqDirector.h.

Referenced by buBaseRunOpenDir(), and initRun().

◆ bu_t_monitor_stream

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 228 of file EvFDaqDirector.h.

◆ bu_w_flk

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 234 of file EvFDaqDirector.h.

◆ bu_w_fulk

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 236 of file EvFDaqDirector.h.

◆ bu_w_lock_stream

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 224 of file EvFDaqDirector.h.

Referenced by initRun().

◆ bu_w_monitor_stream

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 227 of file EvFDaqDirector.h.

◆ bu_writelock_fd_

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 219 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ directorBU_

bool evf::EvFDaqDirector::directorBU_
private

Definition at line 204 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

◆ dirManager_

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 230 of file EvFDaqDirector.h.

Referenced by findCurrentRunDir(), and preBeginRun().

◆ dpd_

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 264 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

◆ endpoint_iterator_

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 269 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ eolsNFilesIndex_

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 252 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ fileBrokerHost_

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 195 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

◆ fileBrokerHostFromCfg_

bool evf::EvFDaqDirector::fileBrokerHostFromCfg_
private

Definition at line 194 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerKeepAlive_

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 197 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

◆ fileBrokerPort_

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 196 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ fileBrokerUseLocalLock_

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 198 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

◆ fileDeleteLockPtr_

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 243 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ filesToDeletePtr_

std::list<std::pair<int, std::unique_ptr<InputFile> > >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 244 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi(), and setDeleteTracking().

◆ fms_

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 241 of file EvFDaqDirector.h.

Referenced by bumpFile(), and setFMS().

◆ fu_readwritelock_fd_

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fu_rw_flk

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 238 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_fulk

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by updateFuLock().

◆ fu_rw_lock_stream

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

◆ fulocal_rwlock_fd2_

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

◆ fulocal_rwlock_fd_

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

◆ fulockfile_

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

◆ fuLockPollInterval_

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 199 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

◆ hltSourceDirectory_

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 205 of file EvFDaqDirector.h.

Referenced by initRun().

◆ hostname_

std::string evf::EvFDaqDirector::hostname_
private

Definition at line 209 of file EvFDaqDirector.h.

◆ init_lock_

pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 246 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

◆ io_service_

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 266 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ mergeTypeMap_

tbb::concurrent_hash_map<std::string, std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 258 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

◆ MergeTypeNames_

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"}
staticprivate

Definition at line 261 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

◆ mergeTypePset_

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 203 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

◆ nStreams_

unsigned int evf::EvFDaqDirector::nStreams_ = 0
private

Definition at line 248 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ nThreads_

unsigned int evf::EvFDaqDirector::nThreads_ = 0
private

Definition at line 249 of file EvFDaqDirector.h.

Referenced by isSingleStreamThread(), and preallocate().

◆ outputAdler32Recheck_

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 200 of file EvFDaqDirector.h.

Referenced by outputAdler32Recheck().

◆ pid_

std::string evf::EvFDaqDirector::pid_
private

Definition at line 212 of file EvFDaqDirector.h.

◆ previousFileSize_

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 232 of file EvFDaqDirector.h.

Referenced by bumpFile().

◆ query_

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 268 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ readEolsDefinition_

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 251 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

◆ requireTSPSet_

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 201 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ resolver_

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 267 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

◆ run_

unsigned int evf::EvFDaqDirector::run_
private

Definition at line 192 of file EvFDaqDirector.h.

◆ run_dir_

std::string evf::EvFDaqDirector::run_dir_
private

Definition at line 213 of file EvFDaqDirector.h.

◆ run_nstring_

std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 211 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and initRun().

◆ run_string_

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 210 of file EvFDaqDirector.h.

Referenced by getLumisectionToStart(), and initRun().

◆ selectedTransferMode_

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 202 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

◆ socket_

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 270 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

◆ startFromLS_

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 207 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getLumisectionToStart().

◆ stop_ls_override_

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 255 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

◆ stopFilePath_

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 253 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ stopFilePathPid_

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 254 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

◆ transferSystemJson_

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 257 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

◆ useFileBroker_

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 193 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and useFileBroker().

runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:355
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:314
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:204
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:232
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:201
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:893
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:269
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:230
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
FRDFileHeader_v1::lumiSection_
uint32_t lumiSection_
Definition: FRDFileHeader.h:37
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:898
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:197
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:399
mps_fire.i
i
Definition: mps_fire.py:355
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:221
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:41
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:193
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:246
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:470
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:244
evf::EvFDaqDirector::bu_r_fulk
struct flock bu_r_fulk
Definition: EvFDaqDirector.h:237
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:482
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:251
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:203
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
FRDFileHeader_v1::headerSize_
uint16_t headerSize_
Definition: FRDFileHeader.h:35
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:290
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:215
FRDFileHeader_v1::id_
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:33
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames()
Definition: DataPointDefinition.h:44
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_w_fulk
struct flock bu_w_fulk
Definition: EvFDaqDirector.h:236
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:218
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:249
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1802
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
evf::EvFDaqDirector::make_flock
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
Definition: EvFDaqDirector.cc:1948
pos
Definition: PixelAliasList.h:18
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::LogInfo
Definition: MessageLogger.h:254
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:219
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1372
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:845
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:266
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:255
mps_check.msg
tuple msg
Definition: mps_check.py:285
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:195
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:160
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::bu_r_flk
struct flock bu_r_flk
Definition: EvFDaqDirector.h:235
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:268
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:353
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:652
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:855
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:474
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:253
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1111
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:157
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:149
DQM.reader
reader
Definition: DQM.py:105
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:926
FRDFileHeader_v1::eventCount_
uint16_t eventCount_
Definition: FRDFileHeader.h:36
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:202
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:241
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:115
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:192
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:252
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:212
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:198
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::LogWarning
Definition: MessageLogger.h:141
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:261
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:243
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:222
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:670
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:36
edm::ParameterSetDescription::setComment
void setComment(std::string const &value)
Definition: ParameterSetDescription.cc:33
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:358
edm::LogError
Definition: MessageLogger.h:183
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:196
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
Definition: EvFDaqDirector.cc:766
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
evf::EvFDaqDirector::outputAdler32Recheck_
bool outputAdler32Recheck_
Definition: EvFDaqDirector.h:200
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:258
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:883
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
FRDFileHeader_v1::version_
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:34
recoMuon::in
Definition: RecoMuonEnumerators.h:6
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:213
edm::Service
Definition: Service.h:30
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:22
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:199
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:878
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:191
jsoncollector::DataPoint
Definition: DataPoint.h:36
DBConfiguration_cff.connect
connect
Definition: DBConfiguration_cff.py:18
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1763
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:239
res
Definition: Electron.h:6
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
Definition: EvFDaqDirector.cc:908
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:205
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:207
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1773
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:347
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:142
FRDFileHeader_v1
Definition: FRDFileHeader.h:22
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:238
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:210
evf::DirManager::findRunDir
std::string findRunDir(unsigned int)
Definition: DirManager.cc:42
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:855
type
type
Definition: HCALResponse.h:21
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:270
writedatasetfile.run
run
Definition: writedatasetfile.py:27
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:150
evf::EvFDaqDirector::bu_w_flk
struct flock bu_w_flk
Definition: EvFDaqDirector.h:234
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:246
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:216
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:377
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:257
FRDFileHeader_v1::fileSize_
uint64_t fileSize_
Definition: FRDFileHeader.h:38
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:395
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:209
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:264
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:267
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:495
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:248
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:350
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:220
SiStripCommissioningSource_FromEDM_cfg.FastMonitoringService
FastMonitoringService
Definition: SiStripCommissioningSource_FromEDM_cfg.py:49
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:214
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:478
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:303
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:190
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:351
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:273
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:226
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:254
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:211
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:368
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS)
Definition: EvFDaqDirector.cc:1022
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:888
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1103
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:705
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2121
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:224
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:194
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::bu_r_lock_stream
FILE * bu_r_lock_stream
Definition: EvFDaqDirector.h:225
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1910