CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
EvFDaqDirector::FileStatus contactFileBroker (unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, int maxLS)
 
void createBoLSFile (const uint32_t lumiSection, bool checkIfExists) const
 
void createLumiSectionFiles (const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getFFFParamsFilePathOnBU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
unsigned int getLumisectionToStart () const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
FileStatus getNextFromFileBroker (const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
int grabNextJsonFile (std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
 
int grabNextJsonFileAndUnlock (boost::filesystem::path const &jsonSourcePath)
 
void initRun ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (unsigned int ls, unsigned int index)
 
void removeFile (std::string)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
bool useFileBroker () const
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBu_
 
DirManager dirManager_
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
 
unsigned int eolsNFilesIndex_ = 1
 
std::string fileBrokerHost_
 
bool fileBrokerKeepAlive_
 
std::string fileBrokerPort_
 
bool fileBrokerUseLocalLock_
 
std::mutexfileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringServicefms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
std::string fulockfile_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
boost::asio::io_service io_service_
 
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
unsigned int nStreams_ =0
 
unsigned int nThreads_ =0
 
bool outputAdler32Recheck_
 
std::string pid_
 
unsigned long previousFileSize_
 
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_nstring_
 
std::string run_string_
 
std::string selectedTransferMode_
 
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
 
unsigned int startFromLS_ = 1
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::ValuetransferSystemJson_
 
bool useFileBroker_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"","DAT","PB","JSNDATA"}
 

Detailed Description

Definition at line 62 of file EvFDaqDirector.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet pset,
edm::ActivityRegistry reg 
)
explicit

Definition at line 39 of file EvFDaqDirector.cc.

References endpoint_iterator_, Exception, fileBrokerHost_, fileBrokerPort_, fileBrokerUseLocalLock_, fuLockPollInterval_, hostname_, recoMuon::in, io_service_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), query_, resolver_, socket_, startFromLS_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, useFileBroker_, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

40  :
41  base_dir_(pset.getUntrackedParameter<std::string> ("baseDir", ".")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string> ("buBaseDir", ".")),
43  directorBu_(pset.getUntrackedParameter<bool> ("directorIsBu", false)),
44  run_(pset.getUntrackedParameter<unsigned int> ("runNumber",0)),
45  useFileBroker_(pset.getUntrackedParameter<bool> ("useFileBroker", false)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost","")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort","8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock",true)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck",false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet",false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode","")),
53  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory","")),
54  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval",2000)),
55  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergeTypePset","")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
67  bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
68  bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
69  bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
70  bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
71  fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
72  fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
73  {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname,32);
83  hostname_ = hostname;
84 
85  char * fuLockPollIntervalPtr = getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_=boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_ << " us";
90  }
91  catch( boost::bad_lexical_cast const& ) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char * fileBrokerParamPtr = getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_=(boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr)))>0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  }
103  catch( boost::bad_lexical_cast const& ) {
104  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
105  }
106  }
107  if (useFileBroker_) {
108  if (fileBrokerHost_.empty()) {
109  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config",&buf)==0) {
112  std::ifstream busconfig("/etc/appliance/bus.config",std::ifstream::in);
113  std::getline(busconfig,fileBrokerHost_);
114  }
115  }
116  if (!fileBrokerHost_.empty()) {
117  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
118  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_); //default port
119  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
120  socket_=std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
121  }
122  else {
123  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
124  }
125  }
126 
127  char * startFromLSPtr = getenv("FFF_STARTFROMLS");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_=boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  }
133  catch( boost::bad_lexical_cast const& ) {
134  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
135  }
136  }
137 
138  //override file service parameter if specified by environment
139  char * fileBrokerUseLockParamPtr = getenv("FFF_FILEBROKERUSELOCALLOCK");
140  if (fileBrokerUseLockParamPtr) {
141  try {
142  fileBrokerUseLocalLock_=(boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr)))>0;
143  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: " << fileBrokerUseLocalLock_;
144  }
145  catch( boost::bad_lexical_cast const& ) {
146  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
147  }
148  }
149 
150  }
struct flock bu_w_fulk
T getUntrackedParameter(std::string const &, T const &) const
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string bu_base_dir_
std::string selectedTransferMode_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
std::string fileBrokerPort_
void preallocate(edm::service::SystemBounds const &bounds)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
struct flock bu_r_flk
evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 311 of file EvFDaqDirector.cc.

References RecoEcal_EventContent_cff::ec, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, socket_, unlockFULocal(), and unlockFULocal2().

312  {
313 
314  //close server connection
315  if (socket_.get() && socket_->is_open()) {
316  boost::system::error_code ec;
317  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
318  socket_->close(ec);
319  }
320 
321  if (fulocal_rwlock_fd_!=-1) {
322  unlockFULocal();
323  close(fulocal_rwlock_fd_);
324  }
325 
326  if (fulocal_rwlock_fd2_!=-1) {
327  unlockFULocal2();
328  close(fulocal_rwlock_fd2_);
329  }
330 
331  }
std::unique_ptr< boost::asio::ip::tcp::socket > socket_

Member Function Documentation

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 78 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), and grabNextJsonFileAndUnlock().

78 {return run_dir_;}
std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 79 of file EvFDaqDirector.h.

79 {return bu_run_dir_;}
std::string bu_run_dir_
std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 80 of file EvFDaqDirector.h.

80 {return bu_run_open_dir_;}
std::string bu_run_open_dir_
bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
int  maxLS 
)
private

Definition at line 797 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), eostools::ls(), previousFileSize_, trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

797  {
798 
799  if (previousFileSize_ != 0) {
800  if (!fms_) {
801  fms_ = (FastMonitoringService *) (edm::Service<evf::MicroStateService>().operator->());
802  }
804  previousFileSize_ = 0;
805  }
806 
807  //reached limit
808  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
809 
810  struct stat buf;
811  std::stringstream ss;
812  unsigned int nextIndex = index;
813  nextIndex++;
814 
815  // 1. Check suggested file
816  nextFile = getInputJsonFilePath(ls,index);
817  if (stat(nextFile.c_str(), &buf) == 0) {
818 
819  previousFileSize_ = buf.st_size;
820  fsize = buf.st_size;
821  return true;
822  }
823  // 2. No file -> lumi ended? (and how many?)
824  else {
825  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
826  bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
827  while (eolFound) {
828 
829  // recheck that no raw file appeared in the meantime
830  if (stat(nextFile.c_str(), &buf) == 0) {
831  previousFileSize_ = buf.st_size;
832  fsize = buf.st_size;
833  return true;
834  }
835 
836  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
837  if (indexFilesInLS < 0)
838  //parsing failed
839  return false;
840  else {
841  //check index
842  if ((int)index<indexFilesInLS) {
843  //we have 2 files, and check for 1 failed... retry (2 will never be here)
844  edm::LogError("EvFDaqDirector") << "Potential miss of index file in LS -: " << ls << ". Missing "
845  << nextFile << " because " << indexFilesInLS-1 << " is the highest index expected. Will not update fu.lock file";
846  return false;
847  }
848  }
849  // this lumi ended, check for files
850  ++ls;
851  index = 0;
852 
853  //reached limit
854  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
855 
856  nextFile = getInputJsonFilePath(ls,0);
857  if (stat(nextFile.c_str(), &buf) == 0) {
858  // a new file was found at new lumisection, index 0
859  previousFileSize_ = buf.st_size;
860  fsize = buf.st_size;
861  return true;
862  }
863  else {
864  //change of policy: we need to cycle through each LS
865  return false;
866  }
867  BUEoLSFile = getEoLSFilePathOnBU(ls);
868  eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
869  }
870  }
871  // no new file found
872  return false;
873  }
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
unsigned long previousFileSize_
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
evf::FastMonitoringService * fms_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 1750 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), BPhysicsValidation_cfi::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

1751  {
1752  if (mergeTypePset_.empty()) return;
1753  if(!mergeTypeMap_.empty()) return;
1754  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1755  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_,true))
1756  {
1757  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1758  for (std::string pname : tsPset.getParameterNames()) {
1759  std::string streamType = tsPset.getParameter<std::string>(pname);
1760  tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1761  mergeTypeMap_.insert(ac,pname);
1762  ac->second = streamType;
1763  ac.release();
1764  }
1765  }
1766  }
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:185
ParameterSet const & getParameterSet(ParameterSetID const &id)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string mergeTypePset_
ParameterSet const & getParameterSet(std::string const &) const
void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 1647 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

1648  {
1649  if(transferSystemJson_) return;
1650 
1651  transferSystemJson_.reset(new Json::Value);
1652  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1653  if (topPset.existsAs<edm::ParameterSet>("transferSystem",true))
1654  {
1655  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1656 
1657  Json::Value destinationsVal(Json::arrayValue);
1658  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1659  for (auto & dest: destinations) destinationsVal.append(dest);
1660  (*transferSystemJson_)["destinations"]=destinationsVal;
1661 
1662  Json::Value modesVal(Json::arrayValue);
1663  std::vector<std::string> modes = tsPset.getParameter< std::vector<std::string> >("transferModes");
1664  for (auto & mode: modes) modesVal.append(mode);
1665  (*transferSystemJson_)["transferModes"]=modesVal;
1666 
1667  for (auto psKeyItr =tsPset.psetTable().begin();psKeyItr!=tsPset.psetTable().end(); ++ psKeyItr) {
1668  if (psKeyItr->first!="destinations" && psKeyItr->first!="transferModes") {
1669  const edm::ParameterSet & streamDef = tsPset.getParameterSet(psKeyItr->first);
1670  Json::Value streamVal;
1671  for (auto & mode : modes) {
1672  //validation
1673  if (!streamDef.existsAs<std::vector<std::string>>(mode,true))
1674  throw cms::Exception("EvFDaqDirector") << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode << ")";
1675  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1676 
1677  Json::Value sDestsValue(Json::arrayValue);
1678 
1679  if (streamDestinations.empty())
1680  throw cms::Exception("EvFDaqDirector") << " Missing transter system destination(s) for -: "<< psKeyItr->first << ", mode:" << mode;
1681 
1682  for (auto & sdest:streamDestinations) {
1683  bool sDestValid=false;
1684  sDestsValue.append(sdest);
1685  for (auto & dest: destinations) {
1686  if (dest==sdest) sDestValid=true;
1687  }
1688  if (!sDestValid)
1689  throw cms::Exception("EvFDaqDirector") << " Invalid transter system destination specified for -: "<< psKeyItr->first << ", mode:" << mode << ", dest:"<<sdest;
1690  }
1691  streamVal[mode]=sDestsValue;
1692  }
1693  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1694  }
1695  }
1696  }
1697  else {
1698  if (requireTSPSet_)
1699  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1700  }
1701  }
T getParameter(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:185
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(ParameterSetID const &id)
Represents a JSON value.
Definition: value.h:111
ParameterSet const & getParameterSet(std::string const &) const
array value (ordered list)
Definition: value.h:31
EvFDaqDirector::FileStatus evf::EvFDaqDirector::contactFileBroker ( unsigned int &  serverHttpStatus,
bool &  serverState,
uint32_t &  serverLS,
uint32_t &  closedServerLS,
std::string &  nextFileJson,
std::string &  nextFileRaw,
int  maxLS 
)

Definition at line 1248 of file EvFDaqDirector.cc.

References bu_run_dir_, DBConfiguration_cff::connect, MillePedeFileConverter_cfg::e, RecoEcal_EventContent_cff::ec, endpoint_iterator_, cppFunctionSkipper::exception, fileBrokerHost_, fileBrokerKeepAlive_, RecoTauValidation_cfi::header, SiStripPI::max, newFile, noFile, callgraph::path, pid_, run_nstring_, runEnded, socket_, AlCaHLTBitMon_QueryRunRegistry::string, and TriggerAnalyzer::write().

Referenced by getNextFromFileBroker().

1250  {
1251 
1252  EvFDaqDirector::FileStatus fileStatus = noFile;
1253  serverError = false;
1254 
1255  boost::system::error_code ec;
1256  try {
1257  while (true) {
1258 
1259  //socket connect
1260  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1262 
1263  if (ec) {
1264  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1265  serverError = true;
1266  break;
1267  }
1268  }
1269 
1270  boost::asio::streambuf request;
1271  std::ostream request_stream(&request);
1272  std::string path = "/popfile?runnumber="+run_nstring_+"&pid="+pid_;
1273  if (maxLS>=0) {
1274  std::stringstream spath;
1275  spath << path << "&stopls=" << maxLS;
1276  path = spath.str();
1277  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1278  }
1279  request_stream << "GET " << path << " HTTP/1.1\r\n";
1280  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1281  request_stream << "Accept: */*\r\n";
1282  request_stream << "Connection: keep-alive\r\n\r\n";
1283 
1284  boost::asio::write(*socket_, request,ec);
1285  if (ec) {
1286  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1287  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1288  //we got disconnected, try to reconnect to the server before writing the request
1290  if (ec) {
1291  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1292  serverError = true;
1293  break;
1294  }
1295  continue;
1296  }
1297  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1298  serverError = true;
1299  break;
1300  }
1301 
1302  boost::asio::streambuf response;
1303  boost::asio::read_until(*socket_, response, "\r\n",ec);
1304  if (ec) {
1305  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1306  serverError = true;
1307  break;
1308  }
1309 
1310  std::istream response_stream(&response);
1311 
1312  std::string http_version;
1313  response_stream >> http_version;
1314 
1315  response_stream >> serverHttpStatus;
1316 
1317  std::string status_message;
1318  std::getline(response_stream, status_message);
1319  if (!response_stream || http_version.substr(0, 5) != "HTTP/")
1320  {
1321  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1322  serverError = true;
1323  break;
1324  }
1325  if (serverHttpStatus != 200)
1326  {
1327  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1328  serverError = true;
1329  break;
1330  }
1331 
1332  // Process the response headers.
1334  while (std::getline(response_stream, header) && header != "\r") {
1335  }
1336 
1337  std::string fileInfo;
1338  std::map<std::string,std::string> serverMap;
1339  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1340  auto pos=fileInfo.find("=");
1341  if (pos==std::string::npos) continue;
1342  auto stitle = fileInfo.substr(0,pos);
1343  auto svalue = fileInfo.substr(pos+1);
1344  serverMap[stitle]=svalue;
1345  }
1346 
1347  //check that response run number if correct
1348  auto server_version = serverMap.find("version");
1349  assert(server_version!=serverMap.end());
1350 
1351  auto server_run = serverMap.find("runnumber");
1352  assert(server_run!=serverMap.end());
1353  assert(run_nstring_==server_run->second);
1354 
1355  auto server_state = serverMap.find("state");
1356  assert(server_state!=serverMap.end());
1357 
1358  auto server_eols = serverMap.find("lasteols");
1359  assert(server_eols!=serverMap.end());
1360 
1361  auto server_ls = serverMap.find("lumisection");
1362 
1363 
1364  closedServerLS = (uint64_t)std::max(0,atoi(server_eols->second.c_str()));
1365  if (server_ls!=serverMap.end())
1366  serverLS = (uint64_t)std::max(1,atoi(server_ls->second.c_str()));
1367  else
1368  serverLS = closedServerLS+1;
1369 
1370  std::string s_state = server_state->second;
1371  if (s_state == "STARTING") //initial, always empty starting with LS 1
1372  {
1373  auto server_file = serverMap.find("file");
1374  assert(server_file==serverMap.end());//no file with starting state
1375  fileStatus=noFile;
1376  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1377  }
1378  else if (s_state=="READY")
1379  {
1380  auto server_file = serverMap.find("file");
1381  if (server_file==serverMap.end()) {
1382  //can be returned by server if files from new LS already appeared but LS is not yet closed
1383  if (serverLS<=closedServerLS) serverLS=closedServerLS+1;
1384  fileStatus=noFile;
1385  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1386  }
1387  else {
1388  std::string filestem;
1389  std::string fileprefix;
1390  auto server_fileprefix = serverMap.find("fileprefix");
1391 
1392  if (server_fileprefix!=serverMap.end()) {
1393  auto pssize = server_fileprefix->second.size();
1394  if (pssize>1 && server_fileprefix->second[0]=='"' && server_fileprefix->second[pssize-1]=='"')
1395  fileprefix = server_fileprefix->second.substr(1,pssize-2);
1396  else
1397  fileprefix = server_fileprefix->second;
1398  }
1399 
1400  //remove string literals
1401  auto ssize = server_file->second.size();
1402  if (ssize>1 && server_file->second[0]=='"' && server_file->second[ssize-1]=='"')
1403  filestem = server_file->second.substr(1,ssize-2);
1404  else
1405  filestem = server_file->second;
1406  assert(!filestem.empty());
1407  nextFileRaw = bu_run_dir_ +"/" + filestem+".raw";//raw files are not moved
1408  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1409  nextFileJson = filestem + ".jsn";
1410  fileStatus=newFile;
1411  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS " << serverLS << " file:" << filestem;
1412  }
1413  }
1414  else if (s_state== "EOLS") {
1415  serverLS=closedServerLS+1;
1416  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1417  fileStatus=noFile;
1418  }
1419  else if (s_state=="EOR") {
1420  //server_eor = serverMap.find("iseor");
1421  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1422  fileStatus=runEnded;
1423  }
1424  else if (s_state=="NORUN") {
1425  auto err_msg = serverMap.find("errormessage");
1426  if (err_msg!=serverMap.end())
1427  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1428  else
1429  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1430  edm::LogWarning("EvFDaqDirector") << "executing run end";
1431  fileStatus=runEnded;
1432  }
1433  else if (s_state=="ERROR") {
1434  auto err_msg = serverMap.find("errormessage");
1435  if (err_msg!=serverMap.end())
1436  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1437  else
1438  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1439  fileStatus=noFile;
1440  serverError=true;
1441  }
1442  else {
1443  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1444  fileStatus=noFile;
1445  serverError=true;
1446  }
1447 
1448  // Read until EOF, writing data to output as we go.
1449  if (!fileBrokerKeepAlive_) {
1450  while (boost::asio::read(*socket_, response,
1451  boost::asio::transfer_at_least(1), ec)) {
1452  }
1453  if (ec != boost::asio::error::eof) {
1454  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1455  serverError = true;
1456  }
1457  }
1458  break;
1459  }
1460  }
1461  catch (std::exception const& e)
1462  {
1463  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1464  serverError= true;
1465  }
1466 
1467  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1468  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1469  if (ec) {
1470  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1471  }
1472  socket_->close(ec);
1473  if (ec) {
1474  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1475  }
1476  }
1477 
1478  if (serverError) {
1479  if (socket_->is_open()) socket_->close(ec);
1480  if (ec) {
1481  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1482  }
1483  fileStatus = noFile;
1484  sleep(1);//back-off if error detected
1485  }
1486  return fileStatus;
1487  }
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string run_nstring_
unsigned long long uint64_t
Definition: Time.h:15
std::string bu_run_dir_
def write(self, setup)
std::string fileBrokerHost_
void evf::EvFDaqDirector::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
) const

Definition at line 936 of file EvFDaqDirector.cc.

References getBoLSFilePathOnFU(), trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by createLumiSectionFiles(), and FedRawDataInputSource::maybeOpenNewLumiSection().

937  {
938  //used for backpressure mechanisms and monitoring
939  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
940  struct stat buf;
941  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
942  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
943  close(bol_fd);
944  }
945  }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
void evf::EvFDaqDirector::createLumiSectionFiles ( const uint32_t  lumiSection,
const uint32_t  currentLumiSection,
bool  doCreateBoLS = true 
)

Definition at line 947 of file EvFDaqDirector.cc.

References createBoLSFile(), runEdmFileComparison::found, getEoLSFilePathOnFU(), trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by getNextFromFileBroker().

947  {
948  if ( currentLumiSection > 0) {
949  const std::string fuEoLS =
950  getEoLSFilePathOnFU(currentLumiSection);
951  struct stat buf;
952  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
953  if ( !found ) {
954  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
955  close(eol_fd);
956  if (doCreateBoLS) createBoLSFile(lumiSection,false);
957  }
958  }
959  else if (doCreateBoLS) {
960  createBoLSFile(lumiSection,true);//needed for initial lumisection
961  }
962  }
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 1783 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNextEvent().

1783  {
1784  std::string proc_flag = run_dir_ + "/processing";
1785  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1786  close(proc_flag_fd);
1787  }
void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 1604 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and callgraph::path.

Referenced by initRun().

1604  {
1605  // create open dir if not already there
1606 
1608  if (!boost::filesystem::is_directory(openPath)) {
1609  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1610  boost::filesystem::create_directories(openPath);
1611  }
1612  }
#define LogDebug(id)
std::string getRunOpenDirPath() const
std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private
std::string evf::EvFDaqDirector::eorFileName ( ) const
private
void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 342 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and AlCaHLTBitMon_QueryRunRegistry::string.

343  {
345  desc.setComment("Service used for file locking arbitration and for propagating information between other EvF components");
346  desc.addUntracked<std::string> ("baseDir", ".")->setComment("Local base directory for run output");
347  desc.addUntracked<std::string> ("buBaseDir", ".")->setComment("BU base ramdisk directory ");
348  desc.addUntracked<unsigned int> ("runNumber",0)->setComment("Run Number in ramdisk to open");
349  desc.addUntracked<bool> ("useFileBroker", false)->setComment("Use BU file service to grab input data instead of NFS file locking");
350  desc.addUntracked<std::string> ("fileBrokerHost", "")->setComment("BU file service host");
351  desc.addUntracked<std::string> ("fileBrokerPort", "8080")->setComment("BU file service port");
352  desc.addUntracked<bool> ("fileBrokerKeepAlive", true)->setComment("Use keep alive to avoid using large number of sockets");
353  desc.addUntracked<bool> ("fileBrokerUseLocalLock", true)->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
354  desc.addUntracked<bool>("outputAdler32Recheck",false)->setComment("Check Adler32 of per-process output files while micro-merging");
355  desc.addUntracked<bool>("requireTransfersPSet",false)->setComment("Require complete transferSystem PSet in the process configuration");
356  desc.addUntracked<std::string>("selectedTransferMode","")->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
357  desc.addUntracked<unsigned int>("fuLockPollInterval",2000)->setComment("Lock polling interval in microseconds for the input directory file lock");
358  desc.addUntracked<std::string>("mergingPset","")->setComment("Name of merging PSet to look for merging type definitions for streams");
359  desc.setAllowAnything();
360  descriptions.add("EvFDaqDirector", desc);
361  }
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 83 of file EvFDaqDirector.h.

References eostools::ls(), and AlCaHLTBitMon_QueryRunRegistry::string.

83 { return dirManager_.findRunDir(run_);}
std::string findRunDir(unsigned int)
Definition: DirManager.cc:37
std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 514 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), run_, and run_dir_.

Referenced by createBoLSFile().

514  {
515  return run_dir_ + "/" + fffnaming::bolsFileName(run_,ls);
516  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 450 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerDataFileNameWithPid().

450  {
452  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 506 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNextEvent().

506  {
507  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
508  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 510 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNextEvent(), createLumiSectionFiles(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

510  {
511  return run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
512  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 518 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

518  {
519  return bu_run_dir_ + "/" + fffnaming::eorFileName(run_);
520  }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 522 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNextEvent().

522  {
523  return run_dir_ + "/" + fffnaming::eorFileName(run_);
524  }
std::string eorFileName(const unsigned int run)
std::string evf::EvFDaqDirector::getFFFParamsFilePathOnBU ( ) const

Definition at line 526 of file EvFDaqDirector.cc.

References bu_run_dir_.

526  {
527  return bu_run_dir_ + "/hlt/fffParameters.jsn";
528  }
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 478 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, and run_dir_.

Referenced by DQMFileSaver::globalBeginRun().

478  {
479  return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_,0,stream);
480  }
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 434 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), and run_.

Referenced by bumpFile().

434  {
436  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
unsigned int evf::EvFDaqDirector::getLumisectionToStart ( ) const

Definition at line 1631 of file EvFDaqDirector.cc.

References run_dir_, run_string_, startFromLS_, trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::readSupervisor().

1631  {
1632  std::string fileprefix = run_dir_ +"/"+run_string_+"_ls";
1633  std::string fullpath;
1634  struct stat buf;
1635  unsigned int lscount=startFromLS_;
1636  do {
1637  std::stringstream ss;
1638  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1639  fullpath = ss.str();
1640  lscount++;
1641  }
1642  while(stat(fullpath.c_str(),&buf)==0);
1643  return lscount-1;
1644  }
std::string run_string_
unsigned int startFromLS_
std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 470 of file EvFDaqDirector.cc.

References hostname_, run_, run_dir_, and fffnaming::streamerDataChecksumFileNameWithInstance().

470  {
472  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 466 of file EvFDaqDirector.cc.

References hostname_, run_, run_dir_, and fffnaming::streamerDataFileNameWithInstance().

466  {
468  }
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 490 of file EvFDaqDirector.cc.

References hostname_, fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, and run_dir_.

490  {
492  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 502 of file EvFDaqDirector.cc.

References hostname_, fffnaming::rootHistogramFileNameWithInstance(), run_, and run_dir_.

502  {
504  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
EvFDaqDirector::FileStatus evf::EvFDaqDirector::getNextFromFileBroker ( const unsigned int  currentLumiSection,
unsigned int &  ls,
std::string &  nextFile,
int &  serverEventsInNewFile_,
int64_t &  fileSize,
uint64_t &  thisLockWaitTimeUs 
)

Definition at line 1490 of file EvFDaqDirector.cc.

References bu_run_dir_, contactFileBroker(), createLumiSectionFiles(), fileBrokerUseLocalLock_, grabNextJsonFile(), mps_fire::i, lockFULocal2(), eostools::ls(), SiStripPI::max, newFile, noFile, readLastLSEntry(), runEnded, trackingPlots::stat, stop_ls_override_, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, mitigatedMETSequence_cff::U, and unlockFULocal2().

Referenced by FedRawDataInputSource::readSupervisor().

1491  {
1492 
1493  EvFDaqDirector::FileStatus fileStatus = noFile;
1494 
1495  //int retval = -1;
1496  //int lock_attempts = 0;
1497  //long total_lock_attempts = 0;
1498 
1499  struct stat buf;
1500  int stopFileLS = -1;
1501  int stopFileCheck = stat(stopFilePath_.c_str(),&buf);
1502  int stopFilePidCheck = stat(stopFilePathPid_.c_str(),&buf);
1503  if (stopFileCheck==0 || stopFilePidCheck==0) {
1504  if (stopFileCheck==0)
1505  stopFileLS = readLastLSEntry(stopFilePath_);
1506  else
1507  stopFileLS = 1;//stop without drain if only pid is stopped
1508  if (!stop_ls_override_) {
1509  //if lumisection is higher than in stop file, should quit at next from current
1510  if (stopFileLS>=0 && (int)ls>=stopFileLS) stopFileLS = stop_ls_override_ = ls;
1511  }
1512  else stopFileLS = stop_ls_override_;
1513  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
1514  //return runEnded;
1515  }
1516  else //if file was removed before reaching stop condition, reset this
1517  stop_ls_override_ = 0;
1518 
1519  /* look for EoLS
1520  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1521  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1522  ls++;
1523  return noFile;
1524  }
1525  */
1526 
1527 
1528  timeval ts_lockbegin;
1529  gettimeofday(&ts_lockbegin,nullptr);
1530 
1531  std::string nextFileJson;
1532  uint32_t serverLS, closedServerLS;
1533  unsigned int serverHttpStatus;
1534  bool serverError;
1535 
1536  //local lock to force index json and EoLS files to appear in order
1538  lockFULocal2();
1539 
1540  int maxLS = stopFileLS < 0 ? -1: std::max(stopFileLS,(int)currentLumiSection);
1541  fileStatus = contactFileBroker(serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, maxLS);
1542 
1543  if (serverError) {
1544  //do not update anything
1546  unlockFULocal2();
1547  return noFile;
1548  }
1549 
1550  //handle creation of EoLS and BoLS files if lumisection has changed
1551  if (currentLumiSection==0) {
1552  if (fileStatus == runEnded) {
1553  createLumiSectionFiles(closedServerLS,0);
1554  createLumiSectionFiles(serverLS,closedServerLS,false); // +1
1555  }
1556  else
1557  createLumiSectionFiles(serverLS,0);
1558  } else {
1559  //loop over and create any EoLS files missing
1560  if (closedServerLS>=currentLumiSection) {
1561  for (uint32_t i=std::max(currentLumiSection,1U);i<=closedServerLS;i++)
1563  }
1564  }
1565 
1566  bool fileFound=true;
1567 
1568  if (fileStatus == newFile)
1569  serverEventsInNewFile = grabNextJsonFile(nextFileJson,nextFileRaw,fileSizeFromJson,fileFound);
1570 
1571  if (!fileFound) {
1572  //catch condition where directory got deleted
1573  fileStatus = noFile;
1574  struct stat buf;
1575  if (stat(bu_run_dir_.c_str(),&buf)!=0) {
1576  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1577  fileStatus=runEnded;
1578  }
1579  }
1580 
1581  //can unlock because all files have been created locally
1583  unlockFULocal2();
1584 
1585 
1586  if (fileStatus==runEnded)
1587  ls = std::max(currentLumiSection,serverLS);
1588  else if (fileStatus==newFile) {
1589  assert(serverLS>=ls);
1590  ls = serverLS;
1591  }
1592  else if (fileStatus==noFile) {
1593  if (serverLS>=ls)
1594  ls = serverLS;
1595  else {
1596  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS << " which is smaller than currently open LS " << ls << ". Ignoring response";
1597  sleep(1);
1598  }
1599  }
1600 
1601  return fileStatus;
1602  }
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, int maxLS)
std::string stopFilePath_
unsigned int stop_ls_override_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 736 of file EvFDaqDirector.cc.

References bu_run_dir_, data, def, jsoncollector::DataPoint::deserialize(), eolsNFilesIndex_, jsoncollector::DataPoint::getData(), jsoncollector::DataPoint::getDefinition(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, Json::Reader::parse(), readEolsDefinition_, matplotRender::reader, trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

736  {
737 
738  boost::filesystem::ifstream ij(BUEoLSFile);
739  Json::Value deserializeRoot;
741 
742  if (!reader.parse(ij, deserializeRoot)) {
743  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
744  return -1;
745  }
746 
748  DataPoint dp;
749  dp.deserialize(deserializeRoot);
750 
751  //read definition
752  if (readEolsDefinition_) {
753  //std::string def = boost::algorithm::trim(dp.getDefinition());
755  if (def.empty()) readEolsDefinition_=false;
756  while (!def.empty()) {
757  std::string fullpath;
758  if (def.find('/')==0)
759  fullpath = def;
760  else
761  fullpath = bu_run_dir_+'/'+def;
762  struct stat buf;
763  if (stat(fullpath.c_str(), &buf) == 0) {
764  DataPointDefinition eolsDpd;
765  std::string defLabel = "legend";
766  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
767  if (eolsDpd.getNames().empty()) {
768  //try with "data" label if "legend" format is not used
769  eolsDpd = DataPointDefinition();
770  defLabel="data";
771  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
772  }
773  for (unsigned int i=0;i<eolsDpd.getNames().size();i++)
774  if (eolsDpd.getNames().at(i)=="NFiles")
776  readEolsDefinition_=false;
777  break;
778  }
779  //check if we can still find definition
780  if (def.size()<=1 || def.find('/')==std::string::npos) {
781  readEolsDefinition_=false;
782  break;
783  }
784  def = def.substr(def.find('/')+1);
785  }
786  }
787 
788  if (dp.getData().size()>eolsNFilesIndex_)
789  data = dp.getData()[eolsNFilesIndex_];
790  else {
791  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
792  return -1;
793  }
794  return boost::lexical_cast<int>(data);
795  }
std::vector< std::string > & getData()
Definition: DataPoint.h:58
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
unsigned int eolsNFilesIndex_
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
std::string & getDefinition()
Definition: DataPoint.h:59
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
JetCorrectorParameters::Definitions def
Definition: classes.h:6
std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 454 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerDataFileNameWithPid().

454  {
455  return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_,ls,stream);
456  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 474 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, and run_dir_.

474  {
475  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_,0,stream);
476  }
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 446 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), and run_.

446  {
448  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 458 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerJsonFileNameWithPid().

458  {
459  return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_,ls,stream);
460  }
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 482 of file EvFDaqDirector.cc.

References fffnaming::protocolBufferHistogramFileNameWithPid(), run_, and run_dir_.

482  {
484  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 442 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), and run_.

442  {
443  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_,ls,index);
444  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 494 of file EvFDaqDirector.cc.

References fffnaming::rootHistogramFileNameWithPid(), run_, and run_dir_.

494  {
495  return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_,ls,stream);
496  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 462 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerJsonFileNameWithPid().

462  {
464  }
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:349
std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 486 of file EvFDaqDirector.cc.

References fffnaming::protocolBufferHistogramFileNameWithPid(), run_, and run_dir_.

486  {
488  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 438 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), and run_.

Referenced by removeFile().

438  {
440  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 498 of file EvFDaqDirector.cc.

References fffnaming::rootHistogramFileNameWithPid(), run_, and run_dir_.

498  {
500  }
def ls(path, rec=False)
Definition: eostools.py:349
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 115 of file EvFDaqDirector.h.

115 { return run_; }
std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 108 of file EvFDaqDirector.h.

Referenced by createRunOpendirMaybe(), and initRun().

108 {return run_dir_ +"/open";}
std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 1703 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, selectedTransferMode_, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

1704  {
1705  std::string streamRequestName;
1706  if (transferSystemJson_->isMember(stream.c_str()))
1707  streamRequestName = stream;
1708  else {
1709  std::stringstream msg;
1710  msg << "Transfer system mode definitions missing for -: " << stream;
1711  if (requireTSPSet_)
1712  throw cms::Exception("EvFDaqDirector") << msg.str();
1713  else {
1714  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1715  return std::string("Failsafe");
1716  }
1717  }
1718  //return empty if strict check parameter is not on
1719  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_=="null")) {
1720  edm::LogWarning("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter."
1721  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1722  return std::string("Failsafe");
1723  }
1724  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_=="null")) {
1725  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1726  }
1727  //check if stream has properly listed transfer stream
1728  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str()))
1729  {
1730  std::stringstream msg;
1731  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1732  if (requireTSPSet_)
1733  throw cms::Exception("EvFDaqDirector") << msg.str();
1734  else
1735  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1736  return std::string("Failsafe");
1737  }
1738  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_,"");
1739 
1740  //flatten string json::Array into CSV std::string
1741  std::string ret;
1742  for (Json::Value::iterator it = destsVec.begin(); it!=destsVec.end(); it++)
1743  {
1744  if (!ret.empty()) ret +=",";
1745  ret+=(*it).asString();
1746  }
1747  return ret;
1748  }
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
Represents a JSON value.
Definition: value.h:111
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:278
const_iterator end() const
Iterator for object and array value.
Definition: value.h:1007
std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 1768 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, and AlCaHLTBitMon_QueryRunRegistry::string.

1769  {
1770  tbb::concurrent_hash_map<std::string,std::string>::const_accessor search_ac;
1771  if (mergeTypeMap_.find(search_ac,stream))
1772  return search_ac->second;
1773 
1774  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
1775  std::string defaultName = MergeTypeNames_[defaultType];
1776  tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1777  mergeTypeMap_.insert(ac,stream);
1778  ac->second = defaultName;
1779  ac.release();
1780  return defaultName;
1781  }
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_
int evf::EvFDaqDirector::grabNextJsonFile ( std::string const &  jsonSourcePath,
std::string const &  rawSourcePath,
int64_t &  fileSizeFromJson,
bool &  fileFound 
)

Definition at line 964 of file EvFDaqDirector.cc.

References baseRunDir(), data, jsoncollector::DataPoint::deserialize(), dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, lumiCalc2::outfile, Json::Reader::parse(), callgraph::path, pid_, matplotRender::reader, mps_fire::result, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and TriggerAnalyzer::write().

Referenced by getNextFromFileBroker().

965  {
966  fileFound=true;
967 
968  //should be ported to use fffnaming
969  std::ostringstream fileNameWithPID;
970  fileNameWithPID << boost::filesystem::path(rawSourcePath).stem().string() << "_pid"
971  << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
972 
973  // assemble json destination path
974  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
975 
976  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
977 
978  int infile=-1, outfile=-1;
979 
980  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
981  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : " << strerror(errno);
982  usleep(100000);
983  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
984  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: " << jsonSourcePath << " : " << strerror(errno);
985  if (errno==ENOENT) fileFound=false;
986  return -1;
987  }
988  }
989 
990  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
991  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
992  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0)
993  {
994  if (errno==EEXIST) {
995  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath << " : ";
996  ::close(infile);
997  return -1;
998  }
999  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : " << strerror(errno);
1000  usleep(100000);
1001  struct stat out_stat;
1002  if (stat(jsonDestPath.c_str(),&out_stat)==0) {
1003  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1004  if (unlink(jsonDestPath.c_str())==-1) {
1005  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : "<< strerror(errno);
1006  }
1007  }
1008  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0) {
1009  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: " << jsonDestPath << " : " << strerror(errno);
1010  ::close(infile);
1011  return -1;
1012  }
1013  }
1014  //copy contents
1015  const std::size_t buf_sz = 512;
1016  std::size_t tot_written = 0;
1017  std::unique_ptr<char> buf(new char [buf_sz]);
1018 
1019  ssize_t sz, sz_read=1, sz_write;
1020  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0)
1021  {
1022  sz_write = 0;
1023  do {
1024  assert(sz_read - sz_write > 0);
1025  if ((sz = ::write(outfile, buf.get() + sz_write,sz_read - sz_write)) < 0)
1026  {
1027  sz_read = sz; // cause read loop termination
1028  break;
1029  }
1030  assert(sz > 0);
1031  sz_write += sz;
1032  tot_written+=sz;
1033  } while (sz_write < sz_read);
1034  }
1035  close(infile);
1036  close(outfile);
1037 
1038  if (tot_written>0) {
1039  //leave file if it was empty for diagnosis
1040  if (unlink(jsonSourcePath.c_str()) == -1) {
1041  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "<< strerror(errno);
1042  return -1;
1043  }
1044  } else {
1045  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: " << jsonSourcePath;
1046  return -1;
1047  }
1048 
1049  Json::Value deserializeRoot;
1051 
1052  std::string data;
1053  std::stringstream ss;
1054  bool result;
1055  try {
1056  if (tot_written<=buf_sz) {
1057  result = reader.parse(buf.get(), deserializeRoot);
1058  }
1059  else {
1060  //json will normally not be bigger than buf_sz bytes
1061  try {
1062  boost::filesystem::ifstream ij(jsonDestPath);
1063  ss << ij.rdbuf();
1064  }
1065  catch (boost::filesystem::filesystem_error const& ex) {
1066  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1067  return -1;
1068  }
1069  result = reader.parse(ss.str(), deserializeRoot);
1070  }
1071  if (!result) {
1072  if (tot_written<=buf_sz) ss << buf.get();
1073  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath
1074  << "\nERROR:\n" << reader.getFormatedErrorMessages()
1075  << "CONTENT:\n" << ss.str()<<".";
1076  return -1;
1077  }
1078 
1079  //read BU JSON
1080  DataPoint dp;
1081  dp.deserialize(deserializeRoot);
1082  bool success = false;
1083  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1084  if (dpd_->getNames().at(i)=="NEvents")
1085  if (i<dp.getData().size()) {
1086  data = dp.getData()[i];
1087  success=true;
1088  break;
1089  }
1090  }
1091  if (!success) {
1092  if (!dp.getData().empty())
1093  data = dp.getData()[0];
1094  else {
1095  edm::LogError("EvFDaqDirector::grabNextJsonFile") << "grabNextJsonFile - " <<
1096  " error reading number of events from BU JSON; No input value. data -: " << data;
1097  return -1;
1098  }
1099  }
1100 
1101  //try to read raw file size
1102  fileSizeFromJson=-1;
1103  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1104  if (dpd_->getNames().at(i)=="NBytes") {
1105  if (i<dp.getData().size()) {
1106  data = dp.getData()[i];
1107  try {
1108  fileSizeFromJson = boost::lexical_cast<long>(data);
1109  }
1110  catch( boost::bad_lexical_cast const& ) {
1111  //non-fatal currently, processing can continue without this value
1112  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1113  << "Input value is -: " << data;
1114  }
1115  break;
1116  }
1117  }
1118  }
1119  return boost::lexical_cast<int>(data);
1120  }
1121  catch( boost::bad_lexical_cast const& e) {
1122  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1123  << "Input value is -: " << data;
1124  }
1125  catch (std::runtime_error const& e)
1126  {
1127  //Can be thrown by Json parser
1128  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1129  }
1130 
1131  catch (std::exception const& e)
1132  {
1133  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1134  }
1135  catch (...)
1136  {
1137  //unknown exception
1138  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1139  }
1140 
1141  return -1;
1142  }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
std::string & baseRunDir()
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
def write(self, setup)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
int evf::EvFDaqDirector::grabNextJsonFileAndUnlock ( boost::filesystem::path const &  jsonSourcePath)

Definition at line 1144 of file EvFDaqDirector.cc.

References baseRunDir(), popcon2dropbox::copy(), data, jsoncollector::DataPoint::deserialize(), dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, Json::Reader::parse(), callgraph::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and unlockFULocal().

Referenced by FedRawDataInputSource::readSupervisor().

1145  {
1146  std::string data;
1147  try {
1148  // assemble json destination path
1149  boost::filesystem::path jsonDestPath(baseRunDir());
1150 
1151  //should be ported to use fffnaming
1152  std::ostringstream fileNameWithPID;
1153  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
1154  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
1155  jsonDestPath /= fileNameWithPID.str();
1156 
1157  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to "
1158  << jsonDestPath;
1159  try {
1160  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
1161  }
1162  catch (boost::filesystem::filesystem_error const& ex)
1163  {
1164  // Input dir gone?
1165  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1166  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1167  sleep(1);
1168  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
1169  }
1170  unlockFULocal();
1171 
1172  try {
1173  //sometimes this fails but file gets deleted
1174  boost::filesystem::remove(jsonSourcePath);
1175  }
1176  catch (boost::filesystem::filesystem_error const& ex)
1177  {
1178  // Input dir gone?
1179  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1180  }
1181  catch (std::exception const& ex)
1182  {
1183  // Input dir gone?
1184  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1185  }
1186 
1187  boost::filesystem::ifstream ij(jsonDestPath);
1188  Json::Value deserializeRoot;
1190 
1191  std::stringstream ss;
1192  ss << ij.rdbuf();
1193  if (!reader.parse(ss.str(), deserializeRoot)) {
1194 
1195  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1196  << "\nERROR:\n" << reader.getFormatedErrorMessages()
1197  << "CONTENT:\n" << ss.str()<<".";
1198  throw std::runtime_error("Cannot deserialize input JSON file");
1199  }
1200 
1201  //read BU JSON
1202  std::string data;
1203  DataPoint dp;
1204  dp.deserialize(deserializeRoot);
1205  bool success = false;
1206  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1207  if (dpd_->getNames().at(i)=="NEvents")
1208  if (i<dp.getData().size()) {
1209  data = dp.getData()[i];
1210  success=true;
1211  }
1212  }
1213  if (!success) {
1214  if (!dp.getData().empty())
1215  data = dp.getData()[0];
1216  else
1217  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock") <<
1218  " error reading number of events from BU JSON -: No input value " << data;
1219  }
1220  return boost::lexical_cast<int>(data);
1221  }
1222  catch (boost::filesystem::filesystem_error const& ex)
1223  {
1224  // Input dir gone?
1225  unlockFULocal();
1226  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1227  }
1228  catch (std::runtime_error const& e)
1229  {
1230  // Another process grabbed the file and NFS did not register this
1231  unlockFULocal();
1232  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1233  }
1234  catch( boost::bad_lexical_cast const& ) {
1235  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1236  << "Input value is -: " << data;
1237  }
1238  catch (std::exception const& e)
1239  {
1240  // BU run directory disappeared?
1241  unlockFULocal();
1242  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1243  }
1244 
1245  return -1;
1246  }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
def copy(args, dbName)
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
std::string & baseRunDir()
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private
void evf::EvFDaqDirector::initRun ( )

Definition at line 152 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, eostools::chmod(), createRunOpendirMaybe(), directorBu_, dpd_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, fulockfile_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), pid_, run_, run_dir_, run_nstring_, run_string_, trackingPlots::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

153  {
154  std::stringstream ss;
155  ss << "run" << std::setfill('0') << std::setw(6) << run_;
156  run_string_ = ss.str();
157  ss = std::stringstream();
158  ss << run_;
159  run_nstring_ = ss.str();
161  ss = std::stringstream();
162  ss << getpid();
163  pid_ = ss.str();
164 
165  // check if base dir exists or create it accordingly
166  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
167  if (retval != 0 && errno != EEXIST) {
168  throw cms::Exception("DaqDirector") << " Error checking for base dir -: "
169  << base_dir_ << " mkdir error:" << strerror(errno);
170  }
171 
172  //create run dir in base dir
173  umask(0);
174  retval = mkdir(run_dir_.c_str(),
175  S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
176  if (retval != 0 && errno != EEXIST) {
177  throw cms::Exception("DaqDirector") << " Error creating run dir -: "
178  << run_dir_ << " mkdir error:" << strerror(errno);
179  }
180 
181  //create fu-local.lock in run open dir
182  if (!directorBu_) {
183 
185  std::string fulocal_lock_ = getRunOpenDirPath() +"/fu-local.lock";
186  fulocal_rwlock_fd_ = open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
187  if (fulocal_rwlock_fd_==-1)
188  throw cms::Exception("DaqDirector") << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
189  chmod(fulocal_lock_.c_str(),0777);
190  fsync(fulocal_rwlock_fd_);
191  //open second fd for another input source thread
192  fulocal_rwlock_fd2_ = open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
193  if (fulocal_rwlock_fd2_==-1)
194  throw cms::Exception("DaqDirector") << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
195  }
196 
197  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
198  //for BU, it is created at this point
199  if (directorBu_)
200  {
202  std::string bulockfile = bu_run_dir_ + "/bu.lock";
203  fulockfile_ = bu_run_dir_ + "/fu.lock";
204 
205  //make or find bu run dir
206  retval = mkdir(bu_run_dir_.c_str(),
207  S_IRWXU | S_IRWXG | S_IRWXO);
208  if (retval != 0 && errno != EEXIST) {
209  throw cms::Exception("DaqDirector")
210  << " Error creating bu run dir -: " << bu_run_dir_
211  << " mkdir error:" << strerror(errno) << "\n";
212  }
213  bu_run_open_dir_ = bu_run_dir_ + "/open";
214  retval = mkdir(bu_run_open_dir_.c_str(),
215  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
216  if (retval != 0 && errno != EEXIST) {
217  throw cms::Exception("DaqDirector") << " Error creating bu run open dir -: "
218  << bu_run_open_dir_ << " mkdir error:" << strerror(errno)
219  << "\n";
220  }
221 
222  // the BU director does not need to know about the fu lock
223  bu_writelock_fd_ = open(bulockfile.c_str(),
224  O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
225  if (bu_writelock_fd_ == -1)
226  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: "
227  << strerror(errno);
228  else
229  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: "
230  << bu_writelock_fd_;
231  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
232  if (bu_w_lock_stream == nullptr)
233  edm::LogWarning("EvFDaqDirector")<< "Error creating write lock stream -: " << strerror(errno);
234 
235  // BU INITIALIZES LOCK FILE
236  // FU LOCK FILE OPEN
237  openFULockfileStream(true);
239  fflush(fu_rw_lock_stream);
240  close(fu_readwritelock_fd_);
241 
242  if (!hltSourceDirectory_.empty())
243  {
244  struct stat buf;
245  if (stat(hltSourceDirectory_.c_str(),&buf)==0) {
246  std::string hltdir=bu_run_dir_+"/hlt";
247  std::string tmphltdir=bu_run_open_dir_+"/hlt";
248  retval = mkdir(tmphltdir.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
249  if (retval != 0 && errno != EEXIST)
250  throw cms::Exception("DaqDirector")
251  << " Error creating bu run dir -: " << hltdir
252  << " mkdir error:" << strerror(errno) << "\n";
253 
254  boost::filesystem::copy_file(hltSourceDirectory_+"/HltConfig.py",tmphltdir+"/HltConfig.py");
255 
256  boost::filesystem::copy_file(hltSourceDirectory_+"/fffParameters.jsn",tmphltdir+"/fffParameters.jsn");
257 
258  boost::filesystem::rename(tmphltdir,hltdir);
259  }
260  else
261  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
262  }
263  //else{}//no configuration specified
264  }
265  else
266  {
267  // for FU, check if bu base dir exists
268 
269  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
270  if (retval != 0 && errno != EEXIST) {
271  throw cms::Exception("DaqDirector") << " Error checking for bu base dir -: "
272  << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
273  }
274 
276  fulockfile_ = bu_run_dir_ + "/fu.lock";
277  openFULockfileStream(false);
278  }
279 
280  pthread_mutex_init(&init_lock_,nullptr);
281 
282  stopFilePath_ = run_dir_+"/CMSSW_STOP";
283  std::stringstream sstp;
284  sstp << stopFilePath_ << "_pid" << pid_;
285  stopFilePathPid_ = sstp.str();
286 
287 
288  if (!directorBu_) {
289  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
290  struct stat statbuf;
291  if (!stat(defPath.c_str(), &statbuf))
292  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
293  else {
294  //look in source directory if not present in ramdisk
295  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
296  defPath = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
297  if (stat(defPath.c_str(), &statbuf)) {
298  defPath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
299  if (stat(defPath.c_str(), &statbuf)) {
300  defPath = defPathSuffix;
301  }
302  }
303  }
304  dpd_ = new DataPointDefinition();
305  std::string defLabel = "data";
306  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_,&defLabel);
307  }
308 
309  }
std::string run_string_
std::string fulockfile_
jsoncollector::DataPointDefinition * dpd_
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:294
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
std::string run_nstring_
void openFULockfileStream(bool create)
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:251
std::string getRunOpenDirPath() const
std::string bu_run_open_dir_
std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private
bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline
void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 915 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by updateFuLock().

915  {
916  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
917  flock(fulocal_rwlock_fd_,LOCK_SH);
918  }
void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 926 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNextEvent(), getNextFromFileBroker(), and FedRawDataInputSource::maybeOpenNewLumiSection().

926  {
927  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
928  flock(fulocal_rwlock_fd2_,LOCK_EX);
929  }
void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 907 of file EvFDaqDirector.cc.

References init_lock_.

907  {
908  pthread_mutex_lock(&init_lock_);
909  }
pthread_mutex_t init_lock_
struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 1789 of file EvFDaqDirector.cc.

References sysUtil::pid, and command_line::start.

1790  {
1791 #ifdef __APPLE__
1792  return {start, len, pid, type, whence};
1793 #else
1794  return {type, whence, start, len, pid};
1795 #endif
1796  }
type
Definition: HCALResponse.h:21
std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private
void evf::EvFDaqDirector::openFULockfileStream ( bool  create)
private

Definition at line 886 of file EvFDaqDirector.cc.

References eostools::chmod(), fu_readwritelock_fd_, fu_rw_lock_stream, fulockfile_, and LogDebug.

Referenced by initRun().

886  {
887  if (create) {
888  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR | O_CREAT,
889  S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
890  chmod(fulockfile_.c_str(),0766);
891  } else {
892  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
893  }
894  if (fu_readwritelock_fd_ == -1)
895  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
896  << " create:" << create << " error:" << strerror(errno);
897  else
898  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: "
900 
901  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
902  if (fu_rw_lock_stream == nullptr)
903  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
904 
905  }
#define LogDebug(id)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
def chmod(path, mode)
Definition: eostools.py:294
bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 109 of file EvFDaqDirector.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private
void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 77 of file EvFDaqDirector.h.

References findQualityFiles::run.

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 381 of file EvFDaqDirector.cc.

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBu_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

381  {
382  close(bu_readlock_fd_);
383  close(bu_writelock_fd_);
384  if (directorBu_) {
385  std::string filename = bu_run_dir_ + "/bu.lock";
386  removeFile(filename);
387  }
388  }
void removeFile(unsigned int ls, unsigned int index)
std::string bu_run_dir_
void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 334 of file EvFDaqDirector.cc.

References initRun(), edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

334  {
335 
336  initRun();
337 
338  nThreads_=bounds.maxNumberOfStreams();
339  nStreams_=bounds.maxNumberOfThreads();
340  }
unsigned int nThreads_
unsigned int nStreams_
void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 363 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

364  {
366  checkMergeTypePSet(pc);
367  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 369 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

369  {
370 
371  //assert(run_ == id.run());
372 
373  // check if the requested run is the latest one - issue a warning if it isn't
375  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: "
376  << run_dir_ << ". This is not the highest run "
378  }
379  }
std::string findHighestRunDir()
Definition: DirManager.cc:20
void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 390 of file EvFDaqDirector.cc.

References cppFunctionSkipper::exception, fileDeleteLockPtr_, filesToDeletePtr_, LogDebug, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), callgraph::path, and MatrixUtil::remove().

Referenced by EvFDaqDirector().

391  {
392  //delete all files belonging to just closed lumi
393  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
395  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
396  return;
397  }
398 
399  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
400  auto it = filesToDeletePtr_->begin();
401  while (it!=filesToDeletePtr_->end()) {
402  if (it->second->lumi_ == ls) {
403  const boost::filesystem::path filePath(it->second->fileName_);
404  LogDebug("EvFDaqDirector") << "Deleting input file -:" << it->second->fileName_;
405  try {
406  //rarely this fails but file gets deleted
407  boost::filesystem::remove(filePath);
408  }
409  catch (boost::filesystem::filesystem_error const& ex)
410  {
411  edm::LogError("EvFDaqDirector") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
412  usleep(10000);
413  try {
414  boost::filesystem::remove(filePath);
415  }
416  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
417  }
418  catch (std::exception const& ex)
419  {
420  edm::LogError("EvFDaqDirector") << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
421  usleep(10000);
422  try {
423  boost::filesystem::remove(filePath);
424  } catch (std::exception const&) {/*file gets deleted first time but exception is still thrown*/}
425  }
426 
427  delete it->second;
428  it = filesToDeletePtr_->erase(it);
429  }
430  else it++;
431  }
432  }
#define LogDebug(id)
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:349
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 1615 of file EvFDaqDirector.cc.

References Json::Value::asInt(), FrontierConditions_GlobalTag_cff::file, Json::Value::get(), Json::Reader::parse(), and matplotRender::reader.

Referenced by getNextFromFileBroker(), and updateFuLock().

1615  {
1616 
1617  boost::filesystem::ifstream ij(file);
1618  Json::Value deserializeRoot;
1620 
1621  if (!reader.parse(ij, deserializeRoot)) {
1622  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1623  return -1;
1624  }
1625 
1626  int ret = deserializeRoot.get("lastLS","").asInt();
1627  return ret;
1628 
1629  }
Value get(UInt index, const Value &defaultValue) const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
Int asInt() const
Unserialize a JSON document into a Value.
Definition: reader.h:16
void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 537 of file EvFDaqDirector.cc.

References getRawFilePath().

Referenced by postEndRun().

537  {
539  }
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
void removeFile(unsigned int ls, unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:349
void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 530 of file EvFDaqDirector.cc.

530  {
531  int retval = remove(filename.c_str());
532  if (retval != 0)
533  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename << ". error = "
534  << strerror(errno);
535  }
void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, InputFile * >> *  filesToDelete 
)
inline

Definition at line 139 of file EvFDaqDirector.h.

References beamerCreator::create(), fffnaming::eolsFileName(), fffnaming::eorFileName(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

139  {
140  fileDeleteLockPtr_=fileDeleteLock;
141  filesToDeletePtr_ = filesToDelete;
142  }
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
std::mutex * fileDeleteLockPtr_
void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 118 of file EvFDaqDirector.h.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

118 {fms_=fms;}
evf::FastMonitoringService * fms_
void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 875 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

875  {
876  if (fu_rw_lock_stream == nullptr)
877  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream "
878  << strerror(errno);
879  else {
880  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
881  unsigned int readLs = 1, readIndex = 0;
882  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
883  }
884  }
void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 920 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by grabNextJsonFileAndUnlock(), and ~EvFDaqDirector().

920  {
921  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
922  flock(fulocal_rwlock_fd_,LOCK_UN);
923  }
void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 931 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNextEvent(), getNextFromFileBroker(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

931  {
932  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
933  flock(fulocal_rwlock_fd2_,LOCK_UN);
934  }
void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 911 of file EvFDaqDirector.cc.

References init_lock_.

911  {
912  pthread_mutex_unlock(&init_lock_);
913  }
pthread_mutex_t init_lock_
EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)

Definition at line 541 of file EvFDaqDirector.cc.

References bu_run_dir_, bumpFile(), trackerTree::check(), Exception, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fulockfile_, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runAbort, runEnded, trackingPlots::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

541  {
542  EvFDaqDirector::FileStatus fileStatus = noFile;
543 
544  int retval = -1;
545  int lock_attempts = 0;
546  long total_lock_attempts = 0;
547 
548  struct stat buf;
549  int stopFileLS = -1;
550  int stopFileCheck = stat(stopFilePath_.c_str(),&buf);
551  int stopFilePidCheck = stat(stopFilePathPid_.c_str(),&buf);
552  if (stopFileCheck==0 || stopFilePidCheck==0) {
553  if (stopFileCheck==0)
554  stopFileLS = readLastLSEntry(stopFilePath_);
555  else
556  stopFileLS = 1;//stop without drain if only pid is stopped
557  if (!stop_ls_override_) {
558  //if lumisection is higher than in stop file, should quit at next from current
559  if (stopFileLS>=0 && (int)ls>=stopFileLS) stopFileLS = stop_ls_override_ = ls;
560  }
561  else stopFileLS = stop_ls_override_;
562  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
563  //return runEnded;
564  }
565  else //if file was removed before reaching stop condition, reset this
566  stop_ls_override_ = 0;
567 
568  timeval ts_lockbegin;
569  gettimeofday(&ts_lockbegin,nullptr);
570 
571  while (retval==-1) {
572  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
573  if (retval==-1) usleep(fuLockPollInterval_);
574  else continue;
575 
576  lock_attempts+=fuLockPollInterval_;
577  total_lock_attempts+=fuLockPollInterval_;
578  if (lock_attempts>5000000 || errno==116) {
579  if (errno==116)
580  edm::LogWarning("EvFDaqDirector") << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
581  else
582  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and fu.lock file are present -: errno "
583  << errno <<":"<< strerror(errno) << std::endl;
584 
585 
586  if (stat(getEoLSFilePathOnFU(ls).c_str(),&buf)==0) {
587  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< ls ;
588  ls++;
589  return noFile;
590  }
591 
592  if (stat(bu_run_dir_.c_str(), &buf)!=0) return runEnded;
593  if (stat(fulockfile_.c_str(), &buf)!=0) return runEnded;
594 
595  lock_attempts=0;
596  }
597  if (total_lock_attempts>5*60000000) {
598  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
599  return runAbort;
600  }
601  }
602 
603  timeval ts_lockend;
604  gettimeofday(&ts_lockend,nullptr);
605  long deltat = (ts_lockend.tv_usec-ts_lockbegin.tv_usec) + (ts_lockend.tv_sec-ts_lockbegin.tv_sec)*1000000;
606  if (deltat>0.) lockWaitTime=deltat;
607 
608  if(retval!=0) return fileStatus;
609 
610 #ifdef DEBUG
611  timeval ts_lockend;
612  gettimeofday(&ts_lockend,0);
613 #endif
614 
615  //open another lock file FD after the lock using main fd has been acquired
616  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
617  if (fu_readwritelock_fd2 == -1)
618  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
619  << " create. error:" << strerror(errno);
620 
621  FILE * fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
622 
623  // if the stream is readable
624  if (fu_rw_lock_stream2 != nullptr) {
625  unsigned int readLs, readIndex;
626  int check = 0;
627  // rewind the stream
628  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
629  // if rewinded ok
630  if (check == 0) {
631  // read its' values
632  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
633  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
634 
635  unsigned int currentLs = readLs;
636  bool bumpedOk = false;
637  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
638  //no lock file write in this case
639  if (ls && ls+1 < currentLs) ls++;
640  else {
641  // try to bump (look for new index or EoLS file)
642  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
643  //avoid 2 lumisections jump
644  if (ls && readLs>currentLs && currentLs > ls) {
645  ls++;
646  readLs=currentLs=ls;
647  readIndex=0;
648  bumpedOk=false;
649  //no write to lock file
650  }
651  else {
652  if (ls==0 && readLs>currentLs) {
653  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
654  //in this case there is no new file in the same LS
655  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
656  readLs=currentLs;
657  readIndex=0;
658  bumpedOk=false;
659  //no write to lock file
660  }
661  //update return LS value
662  ls = readLs;
663  }
664  }
665  if (bumpedOk) {
666  // there is a new index file to grab, lock file needs to be updated
667  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
668  if (check == 0) {
669  ftruncate(fu_readwritelock_fd2, 0);
670  // write next index in the file, which is the file the next process should take
671  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
672  fflush(fu_rw_lock_stream2);
673  fsync(fu_readwritelock_fd2);
674  fileStatus = newFile;
675  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
676  }
677  else {
678  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
679  }
680  }
681  else if (currentLs < readLs) {
682  //there is no new file in next LS (yet), but lock file can be updated to the next LS
683  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
684  if (check == 0) {
685  ftruncate(fu_readwritelock_fd2, 0);
686  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
687  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
688  fflush(fu_rw_lock_stream2);
689  fsync(fu_readwritelock_fd2);
690  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
691  }
692  else {
693  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
694  }
695  }
696  } else {
697  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error " << strerror(errno);
698  }
699  } else {
700  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
701  }
702  fclose(fu_rw_lock_stream2);// = fdopen(fu_readwritelock_fd2, "r+");
703 
704 #ifdef DEBUG
705  timeval ts_preunlock;
706  gettimeofday(&ts_preunlock,0);
707  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
708  double locked_period=locked_period_int+double(ts_preunlock.tv_usec - ts_lockend.tv_usec)/1000000;
709 #endif
710 
711  //if new json is present, lock file which FedRawDataInputSource will later unlock
712  if (fileStatus==newFile) lockFULocal();
713 
714  //release lock at this point
715  int retvalu=-1;
716  retvalu=fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
717  if (retvalu==-1) edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
718 
719 #ifdef DEBUG
720  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
721 #endif
722 
723  if ( fileStatus == noFile ) {
724  struct stat buf;
725  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
726  if ( stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf)!=0)
727  fileStatus = runEnded;
728  if (stopFileLS>=0 && (int)ls > stopFileLS) {
729  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
730  fileStatus = runEnded;
731  }
732  }
733  return fileStatus;
734  }
#define LogDebug(id)
struct flock fu_rw_flk
std::string fulockfile_
std::string getEoRFilePath() const
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:349
std::string bu_run_dir_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
def check(config)
Definition: trackerTree.py:14
unsigned int fuLockPollInterval_
bool evf::EvFDaqDirector::useFileBroker ( ) const
inline

Definition at line 81 of file EvFDaqDirector.h.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

81 {return useFileBroker_;}

Member Data Documentation

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 160 of file EvFDaqDirector.h.

Referenced by initRun().

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 161 of file EvFDaqDirector.h.

Referenced by initRun().

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 203 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 205 of file EvFDaqDirector.h.

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 193 of file EvFDaqDirector.h.

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 186 of file EvFDaqDirector.h.

Referenced by postEndRun().

std::string evf::EvFDaqDirector::bu_run_dir_
private
std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 183 of file EvFDaqDirector.h.

Referenced by initRun().

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 196 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 202 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 204 of file EvFDaqDirector.h.

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 192 of file EvFDaqDirector.h.

Referenced by initRun().

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 195 of file EvFDaqDirector.h.

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 187 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

bool evf::EvFDaqDirector::directorBu_
private

Definition at line 162 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 198 of file EvFDaqDirector.h.

Referenced by preBeginRun().

jsoncollector::DataPointDefinition* evf::EvFDaqDirector::dpd_
private

Definition at line 233 of file EvFDaqDirector.h.

Referenced by grabNextJsonFile(), grabNextJsonFileAndUnlock(), and initRun().

std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> evf::EvFDaqDirector::endpoint_iterator_
private

Definition at line 238 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 220 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

std::string evf::EvFDaqDirector::fileBrokerHost_
private

Definition at line 165 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and EvFDaqDirector().

bool evf::EvFDaqDirector::fileBrokerKeepAlive_
private

Definition at line 167 of file EvFDaqDirector.h.

Referenced by contactFileBroker().

std::string evf::EvFDaqDirector::fileBrokerPort_
private

Definition at line 166 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

bool evf::EvFDaqDirector::fileBrokerUseLocalLock_
private

Definition at line 168 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getNextFromFileBroker().

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 211 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi().

std::list<std::pair<int,InputFile*> >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 212 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi().

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 209 of file EvFDaqDirector.h.

Referenced by bumpFile().

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 188 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 206 of file EvFDaqDirector.h.

Referenced by updateFuLock().

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 207 of file EvFDaqDirector.h.

Referenced by updateFuLock().

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private

Definition at line 194 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and tryInitializeFuLockFile().

int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 190 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 189 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

std::string evf::EvFDaqDirector::fulockfile_
private

Definition at line 184 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 174 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 173 of file EvFDaqDirector.h.

Referenced by initRun().

std::string evf::EvFDaqDirector::hostname_
private
pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 214 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

boost::asio::io_service evf::EvFDaqDirector::io_service_
private

Definition at line 235 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

tbb::concurrent_hash_map<std::string,std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 226 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"","DAT","PB","JSNDATA"}
staticprivate

Definition at line 229 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 175 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

unsigned int evf::EvFDaqDirector::nStreams_ =0
private

Definition at line 216 of file EvFDaqDirector.h.

Referenced by preallocate().

unsigned int evf::EvFDaqDirector::nThreads_ =0
private

Definition at line 217 of file EvFDaqDirector.h.

Referenced by preallocate().

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 170 of file EvFDaqDirector.h.

std::string evf::EvFDaqDirector::pid_
private

Definition at line 180 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), grabNextJsonFile(), and initRun().

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 200 of file EvFDaqDirector.h.

Referenced by bumpFile().

std::unique_ptr<boost::asio::ip::tcp::resolver::query> evf::EvFDaqDirector::query_
private

Definition at line 237 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 219 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 171 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

std::unique_ptr<boost::asio::ip::tcp::resolver> evf::EvFDaqDirector::resolver_
private

Definition at line 236 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

unsigned int evf::EvFDaqDirector::run_
private
std::string evf::EvFDaqDirector::run_dir_
private
std::string evf::EvFDaqDirector::run_nstring_
private

Definition at line 179 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), and initRun().

std::string evf::EvFDaqDirector::run_string_
private

Definition at line 178 of file EvFDaqDirector.h.

Referenced by getLumisectionToStart(), and initRun().

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 172 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

std::unique_ptr<boost::asio::ip::tcp::socket> evf::EvFDaqDirector::socket_
private

Definition at line 239 of file EvFDaqDirector.h.

Referenced by contactFileBroker(), EvFDaqDirector(), and ~EvFDaqDirector().

unsigned int evf::EvFDaqDirector::startFromLS_ = 1
private

Definition at line 169 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and getLumisectionToStart().

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 223 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), and updateFuLock().

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 221 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 222 of file EvFDaqDirector.h.

Referenced by getNextFromFileBroker(), initRun(), and updateFuLock().

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 225 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

bool evf::EvFDaqDirector::useFileBroker_
private

Definition at line 164 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().