CMS 3D CMS Logo

List of all members | Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes
evf::EvFDaqDirector Class Reference

#include <EvFDaqDirector.h>

Public Types

enum  FileStatus {
  noFile, sameFile, newFile, newLumi,
  runEnded, runAbort
}
 

Public Member Functions

std::string & baseRunDir ()
 
std::string & buBaseRunDir ()
 
std::string & buBaseRunOpenDir ()
 
void checkMergeTypePSet (edm::ProcessContext const &pc)
 
void checkTransferSystemPSet (edm::ProcessContext const &pc)
 
void createProcessingNotificationMaybe () const
 
void createRunOpendirMaybe ()
 
bool emptyLumisectionMode () const
 
 EvFDaqDirector (const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
 
std::string findCurrentRunDir ()
 
std::string getBoLSFilePathOnFU (const unsigned int ls) const
 
std::string getDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getEoLSFilePathOnBU (const unsigned int ls) const
 
std::string getEoLSFilePathOnFU (const unsigned int ls) const
 
std::string getEoRFilePath () const
 
std::string getEoRFilePathOnFU () const
 
std::string getInitFilePath (std::string const &stream) const
 
std::string getInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getMergedDatChecksumFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getMergedRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenDatFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenInitFilePath (std::string const &stream) const
 
std::string getOpenInputJsonFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOpenRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getOpenRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getOutputJsonFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getProtocolBufferHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
std::string getRawFilePath (const unsigned int ls, const unsigned int index) const
 
std::string getRootHistogramFilePath (const unsigned int ls, std::string const &stream) const
 
unsigned int getRunNumber () const
 
std::string getRunOpenDirPath () const
 
std::string getStreamDestinations (std::string const &stream) const
 
std::string getStreamMergeType (std::string const &stream, MergeType defaultType)
 
void initRun ()
 
bool isSingleStreamThread ()
 
void lockFULocal ()
 
void lockFULocal2 ()
 
void lockInitLock ()
 
bool microMergeDisabled () const
 
bool outputAdler32Recheck () const
 
void overrideRunNumber (unsigned int run)
 
void postEndRun (edm::GlobalContext const &globalContext)
 
void preallocate (edm::service::SystemBounds const &bounds)
 
void preBeginJob (edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
 
void preBeginRun (edm::GlobalContext const &globalContext)
 
void preGlobalEndLumi (edm::GlobalContext const &globalContext)
 
int readLastLSEntry (std::string const &file)
 
void removeFile (unsigned int ls, unsigned int index)
 
void removeFile (std::string)
 
void setDeleteTracking (std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
 
void setFMS (evf::FastMonitoringService *fms)
 
void tryInitializeFuLockFile ()
 
void unlockFULocal ()
 
void unlockFULocal2 ()
 
void unlockInitLock ()
 
FileStatus updateFuLock (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
 ~EvFDaqDirector ()
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
static struct flock make_flock (short type, short whence, off_t start, off_t len, pid_t pid)
 

Private Member Functions

bool bumpFile (unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
 
std::string eolsFileName (const unsigned int ls) const
 
std::string eorFileName () const
 
int getNFilesFromEoLS (std::string BUEoLSFile)
 
std::string initFileName (std::string const &stream) const
 
std::string inputFileNameStem (const unsigned int ls, const unsigned int index) const
 
std::string mergedFileNameStem (const unsigned int ls, std::string const &stream) const
 
void openFULockfileStream (std::string &fuLockFilePath, bool create)
 
std::string outputFileNameStem (const unsigned int ls, std::string const &stream) const
 

Private Attributes

std::string base_dir_
 
std::string bu_base_dir_
 
struct flock bu_r_flk
 
struct flock bu_r_fulk
 
FILE * bu_r_lock_stream
 
int bu_readlock_fd_
 
std::string bu_run_dir_
 
std::string bu_run_open_dir_
 
FILE * bu_t_monitor_stream
 
struct flock bu_w_flk
 
struct flock bu_w_fulk
 
FILE * bu_w_lock_stream
 
FILE * bu_w_monitor_stream
 
int bu_writelock_fd_
 
bool directorBu_
 
DirManager dirManager_
 
bool emptyLumisectionMode_
 
unsigned int eolsNFilesIndex_ = 1
 
std::mutexfileDeleteLockPtr_ = nullptr
 
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_ = nullptr
 
evf::FastMonitoringServicefms_ = nullptr
 
int fu_readwritelock_fd_
 
struct flock fu_rw_flk
 
struct flock fu_rw_fulk
 
FILE * fu_rw_lock_stream
 
int fulocal_rwlock_fd2_
 
int fulocal_rwlock_fd_
 
unsigned int fuLockPollInterval_
 
std::string hltSourceDirectory_
 
std::string hostname_
 
pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER
 
std::map< std::string, std::string > mergeTypeMap_
 
std::string mergeTypePset_
 
bool microMergeDisabled_
 
unsigned int nStreams_ =0
 
unsigned int nThreads_ =0
 
bool outputAdler32Recheck_
 
unsigned long previousFileSize_
 
bool readEolsDefinition_ = true
 
bool requireTSPSet_
 
unsigned int run_
 
std::string run_dir_
 
std::string run_string_
 
std::string selectedTransferMode_
 
unsigned int stop_ls_override_ = 0
 
std::string stopFilePath_
 
std::string stopFilePathPid_
 
std::shared_ptr< Json::ValuetransferSystemJson_
 

Static Private Attributes

static const std::vector< std::string > MergeTypeNames_ = {"","DAT","PB","JSNDATA"}
 

Detailed Description

Definition at line 54 of file EvFDaqDirector.h.

Member Enumeration Documentation

Constructor & Destructor Documentation

evf::EvFDaqDirector::EvFDaqDirector ( const edm::ParameterSet pset,
edm::ActivityRegistry reg 
)
explicit

Definition at line 35 of file EvFDaqDirector.cc.

References emptyLumisectionMode_, fuLockPollInterval_, hostname_, microMergeDisabled_, postEndRun(), preallocate(), preBeginJob(), preBeginRun(), preGlobalEndLumi(), AlCaHLTBitMon_QueryRunRegistry::string, edm::ActivityRegistry::watchPostGlobalEndRun(), edm::ActivityRegistry::watchPreallocate(), edm::ActivityRegistry::watchPreBeginJob(), edm::ActivityRegistry::watchPreGlobalBeginRun(), and edm::ActivityRegistry::watchPreGlobalEndLumi().

36  :
37  base_dir_(pset.getUntrackedParameter<std::string> ("baseDir", ".")),
38  bu_base_dir_(pset.getUntrackedParameter<std::string> ("buBaseDir", ".")),
39  directorBu_(pset.getUntrackedParameter<bool> ("directorIsBu", false)),
40  run_(pset.getUntrackedParameter<unsigned int> ("runNumber",0)),
41  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck",false)),
42  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet",false)),
43  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode","")),
44  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory","")),
45  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval",2000)),
46  emptyLumisectionMode_(pset.getUntrackedParameter<bool>("emptyLumisectionMode",true)),
47  microMergeDisabled_(pset.getUntrackedParameter<bool>("microMergeDisabled",true)),
48  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergeTypePset","")),
49  hostname_(""),
50  bu_readlock_fd_(-1),
51  bu_writelock_fd_(-1),
55 
56  bu_w_lock_stream(nullptr),
57  bu_r_lock_stream(nullptr),
58  fu_rw_lock_stream(nullptr),
59  //bu_w_monitor_stream(0),
60  //bu_t_monitor_stream(0),
61 
63 
65 
66  bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
67  bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
68  bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
69  bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
70  fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
71  fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
72  {
73 
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname,32);
83  hostname_ = hostname;
84 
85  char * fuLockPollIntervalPtr = getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_=boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_ << " us";
90  }
91  catch( boost::bad_lexical_cast const& ) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  char * emptyLumiModePtr = getenv("FFF_EMPTYLSMODE");
97  if (emptyLumiModePtr) {
98  emptyLumisectionMode_ = true;
99  edm::LogInfo("EvFDaqDirector") << "Setting empty lumisection mode";
100  }
101 
102  char * microMergeDisabledPtr = getenv("FFF_MICROMERGEDISABLED");
103  if (microMergeDisabledPtr) {
104  microMergeDisabled_ = true;
105  edm::LogInfo("EvFDaqDirector") << "Disabling dat file micro-merge by the HLT process (delegated to the hlt daemon)";
106  }
107  }
struct flock bu_w_fulk
T getUntrackedParameter(std::string const &, T const &) const
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
void watchPreallocate(Preallocate::slot_type const &iSlot)
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
struct flock bu_r_fulk
unsigned long previousFileSize_
struct flock fu_rw_fulk
std::string hltSourceDirectory_
std::string mergeTypePset_
std::string bu_base_dir_
std::string selectedTransferMode_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
void postEndRun(edm::GlobalContext const &globalContext)
void preallocate(edm::service::SystemBounds const &bounds)
unsigned int fuLockPollInterval_
struct flock bu_r_flk
evf::EvFDaqDirector::~EvFDaqDirector ( )

Definition at line 239 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_, fulocal_rwlock_fd_, unlockFULocal(), and unlockFULocal2().

240  {
241  if (fulocal_rwlock_fd_!=-1) {
242  unlockFULocal();
243  close(fulocal_rwlock_fd_);
244  }
245 
246  if (fulocal_rwlock_fd2_!=-1) {
247  unlockFULocal2();
248  close(fulocal_rwlock_fd2_);
249  }
250 
251  }

Member Function Documentation

std::string& evf::EvFDaqDirector::baseRunDir ( )
inline

Definition at line 71 of file EvFDaqDirector.h.

71 {return run_dir_;}
std::string& evf::EvFDaqDirector::buBaseRunDir ( )
inline

Definition at line 72 of file EvFDaqDirector.h.

72 {return bu_run_dir_;}
std::string bu_run_dir_
std::string& evf::EvFDaqDirector::buBaseRunOpenDir ( )
inline

Definition at line 73 of file EvFDaqDirector.h.

73 {return bu_run_open_dir_;}
std::string bu_run_open_dir_
bool evf::EvFDaqDirector::bumpFile ( unsigned int &  ls,
unsigned int &  index,
std::string &  nextFile,
uint32_t &  fsize,
int  maxLS 
)
private

Definition at line 696 of file EvFDaqDirector.cc.

References evf::FastMonitoringService::accumulateFileSize(), fms_, getEoLSFilePathOnBU(), getInputJsonFilePath(), getNFilesFromEoLS(), diffTreeTool::index, eostools::ls(), previousFileSize_, trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by updateFuLock().

696  {
697 
698  if (previousFileSize_ != 0) {
699  if (!fms_) {
700  fms_ = (FastMonitoringService *) (edm::Service<evf::MicroStateService>().operator->());
701  }
703  previousFileSize_ = 0;
704  }
705 
706  //reached limit
707  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
708 
709  struct stat buf;
710  std::stringstream ss;
711  unsigned int nextIndex = index;
712  nextIndex++;
713 
714  // 1. Check suggested file
715  nextFile = getInputJsonFilePath(ls,index);
716  if (stat(nextFile.c_str(), &buf) == 0) {
717 
718  previousFileSize_ = buf.st_size;
719  fsize = buf.st_size;
720  return true;
721  }
722  // 2. No file -> lumi ended? (and how many?)
723  else {
724  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
725  bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
726  while (eolFound) {
727 
728  // recheck that no raw file appeared in the meantime
729  if (stat(nextFile.c_str(), &buf) == 0) {
730  previousFileSize_ = buf.st_size;
731  fsize = buf.st_size;
732  return true;
733  }
734 
735  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
736  if (indexFilesInLS < 0)
737  //parsing failed
738  return false;
739  else {
740  //check index
741  if ((int)index<indexFilesInLS) {
742  //we have 2 files, and check for 1 failed... retry (2 will never be here)
743  edm::LogError("EvFDaqDirector") << "Potential miss of index file in LS -: " << ls << ". Missing "
744  << nextFile << " because " << indexFilesInLS-1 << " is the highest index expected. Will not update fu.lock file";
745  return false;
746  }
747  }
748  // this lumi ended, check for files
749  ++ls;
750  index = 0;
751 
752  //reached limit
753  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
754 
755  nextFile = getInputJsonFilePath(ls,0);
756  if (stat(nextFile.c_str(), &buf) == 0) {
757  // a new file was found at new lumisection, index 0
758  previousFileSize_ = buf.st_size;
759  fsize = buf.st_size;
760  return true;
761  }
762  else {
763  //change of policy: we need to cycle through each LS
764  return false;
765  }
766  BUEoLSFile = getEoLSFilePathOnBU(ls);
767  eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
768  }
769  }
770  // no new file found
771  return false;
772  }
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
unsigned long previousFileSize_
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:348
evf::FastMonitoringService * fms_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
void evf::EvFDaqDirector::checkMergeTypePSet ( edm::ProcessContext const &  pc)

Definition at line 964 of file EvFDaqDirector.cc.

References edm::ParameterSet::existsAs(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), mergeTypeMap_, mergeTypePset_, edm::ProcessContext::parameterSetID(), BPhysicsValidation_cfi::pname, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by preBeginJob().

965  {
966  if (mergeTypePset_.empty()) return;
967  if(!mergeTypeMap_.empty()) return;
968  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
969  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_,true))
970  {
971  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
972  for (std::string pname : tsPset.getParameterNames()) {
973  std::string streamType = tsPset.getParameter<std::string>(pname);
974  mergeTypeMap_[pname]=streamType;
975  }
976  }
977  }
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:186
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::map< std::string, std::string > mergeTypeMap_
std::string mergeTypePset_
ParameterSet const & getParameterSet(std::string const &) const
void evf::EvFDaqDirector::checkTransferSystemPSet ( edm::ProcessContext const &  pc)

Definition at line 861 of file EvFDaqDirector.cc.

References Json::Value::append(), Json::arrayValue, mps_fire::dest, myMessageLogger_cff::destinations, Exception, edm::ParameterSet::existsAs(), edm::ParameterSet::getParameter(), edm::ParameterSet::getParameterSet(), edm::getParameterSet(), ALCARECOPromptCalibProdSiPixelAli0T_cff::mode, edm::ProcessContext::parameterSetID(), requireTSPSet_, and transferSystemJson_.

Referenced by preBeginJob().

862  {
863  if(transferSystemJson_) return;
864 
865  transferSystemJson_.reset(new Json::Value);
866  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
867  if (topPset.existsAs<edm::ParameterSet>("transferSystem",true))
868  {
869  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
870 
871  Json::Value destinationsVal(Json::arrayValue);
872  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
873  for (auto & dest: destinations) destinationsVal.append(dest);
874  (*transferSystemJson_)["destinations"]=destinationsVal;
875 
876  Json::Value modesVal(Json::arrayValue);
877  std::vector<std::string> modes = tsPset.getParameter< std::vector<std::string> >("transferModes");
878  for (auto & mode: modes) modesVal.append(mode);
879  (*transferSystemJson_)["transferModes"]=modesVal;
880 
881  for (auto psKeyItr =tsPset.psetTable().begin();psKeyItr!=tsPset.psetTable().end(); ++ psKeyItr) {
882  if (psKeyItr->first!="destinations" && psKeyItr->first!="transferModes") {
883  const edm::ParameterSet & streamDef = tsPset.getParameterSet(psKeyItr->first);
884  Json::Value streamVal;
885  for (auto & mode : modes) {
886  //validation
887  if (!streamDef.existsAs<std::vector<std::string>>(mode,true))
888  throw cms::Exception("EvFDaqDirector") << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode << ")";
889  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
890 
891  Json::Value sDestsValue(Json::arrayValue);
892 
893  if (streamDestinations.empty())
894  throw cms::Exception("EvFDaqDirector") << " Missing transter system destination(s) for -: "<< psKeyItr->first << ", mode:" << mode;
895 
896  for (auto & sdest:streamDestinations) {
897  bool sDestValid=false;
898  sDestsValue.append(sdest);
899  for (auto & dest: destinations) {
900  if (dest==sdest) sDestValid=true;
901  }
902  if (!sDestValid)
903  throw cms::Exception("EvFDaqDirector") << " Invalid transter system destination specified for -: "<< psKeyItr->first << ", mode:" << mode << ", dest:"<<sdest;
904  }
905  streamVal[mode]=sDestsValue;
906  }
907  (*transferSystemJson_)[psKeyItr->first] = streamVal;
908  }
909  }
910  }
911  else {
912  if (requireTSPSet_)
913  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
914  }
915  }
T getParameter(std::string const &) const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:186
std::shared_ptr< Json::Value > transferSystemJson_
ParameterSet const & getParameterSet(ParameterSetID const &id)
Represents a JSON value.
Definition: value.h:111
ParameterSet const & getParameterSet(std::string const &) const
array value (ordered list)
Definition: value.h:31
void evf::EvFDaqDirector::createProcessingNotificationMaybe ( ) const

Definition at line 992 of file EvFDaqDirector.cc.

References run_dir_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::checkNextEvent().

992  {
993  std::string proc_flag = run_dir_ + "/processing";
994  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
995  close(proc_flag_fd);
996  }
void evf::EvFDaqDirector::createRunOpendirMaybe ( )

Definition at line 833 of file EvFDaqDirector.cc.

References getRunOpenDirPath(), LogDebug, and callgraph::path.

Referenced by initRun().

833  {
834  // create open dir if not already there
835 
837  if (!boost::filesystem::is_directory(openPath)) {
838  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
839  boost::filesystem::create_directories(openPath);
840  }
841  }
#define LogDebug(id)
std::string getRunOpenDirPath() const
bool evf::EvFDaqDirector::emptyLumisectionMode ( ) const
inline

Definition at line 126 of file EvFDaqDirector.h.

Referenced by FedRawDataInputSource::checkNextEvent().

std::string evf::EvFDaqDirector::eolsFileName ( const unsigned int  ls) const
private
std::string evf::EvFDaqDirector::eorFileName ( ) const
private
void evf::EvFDaqDirector::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 262 of file EvFDaqDirector.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and AlCaHLTBitMon_QueryRunRegistry::string.

263  {
265  desc.setComment("Service used for file locking arbitration and for propagating information between other EvF components");
266  desc.addUntracked<std::string> ("baseDir", ".")->setComment("Local base directory for run output");
267  desc.addUntracked<std::string> ("buBaseDir", ".")->setComment("BU base ramdisk directory ");
268  desc.addUntracked<unsigned int> ("runNumber",0)->setComment("Run Number in ramdisk to open");
269  desc.addUntracked<bool>("outputAdler32Recheck",false)->setComment("Check Adler32 of per-process output files while micro-merging");
270  desc.addUntracked<bool>("requireTransfersPSet",false)->setComment("Require complete transferSystem PSet in the process configuration");
271  desc.addUntracked<std::string>("selectedTransferMode","")->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
272  desc.addUntracked<unsigned int>("fuLockPollInterval",2000)->setComment("Lock polling interval in microseconds for the input directory file lock");
273  desc.addUntracked<bool>("emptyLumisectionMode",true)->setComment("Enables writing stream output metadata even when no events are processed in a lumisection");
274  desc.addUntracked<bool>("microMergeDisabled",true)->setComment("Disabled micro-merging by the Output Module, so it is later done by hltd service");
275  desc.addUntracked<std::string>("mergingPset","")->setComment("Name of merging PSet to look for merging type definitions for streams");
276  desc.setAllowAnything();
277  descriptions.add("EvFDaqDirector", desc);
278  }
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string evf::EvFDaqDirector::findCurrentRunDir ( )
inline

Definition at line 75 of file EvFDaqDirector.h.

References diffTreeTool::index, eostools::ls(), and AlCaHLTBitMon_QueryRunRegistry::string.

75 { return dirManager_.findRunDir(run_);}
std::string findRunDir(unsigned int)
Definition: DirManager.cc:37
std::string evf::EvFDaqDirector::getBoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 431 of file EvFDaqDirector.cc.

References fffnaming::bolsFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::createBoLSFile().

431  {
432  return run_dir_ + "/" + fffnaming::bolsFileName(run_,ls);
433  }
std::string bolsFileName(const unsigned int run, const unsigned int ls)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 367 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerDataFileNameWithPid().

367  {
369  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getEoLSFilePathOnBU ( const unsigned int  ls) const

Definition at line 423 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eolsFileName(), and run_.

Referenced by bumpFile(), and FedRawDataInputSource::checkNextEvent().

423  {
424  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
425  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string evf::EvFDaqDirector::getEoLSFilePathOnFU ( const unsigned int  ls) const

Definition at line 427 of file EvFDaqDirector.cc.

References fffnaming::eolsFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNextEvent(), FedRawDataInputSource::maybeOpenNewLumiSection(), and updateFuLock().

427  {
428  return run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
429  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string eolsFileName(const unsigned int run, const unsigned int ls)
std::string evf::EvFDaqDirector::getEoRFilePath ( ) const

Definition at line 435 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::eorFileName(), and run_.

Referenced by updateFuLock().

435  {
436  return bu_run_dir_ + "/" + fffnaming::eorFileName(run_);
437  }
std::string eorFileName(const unsigned int run)
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getEoRFilePathOnFU ( ) const

Definition at line 440 of file EvFDaqDirector.cc.

References fffnaming::eorFileName(), run_, and run_dir_.

Referenced by FedRawDataInputSource::checkNextEvent().

440  {
441  return run_dir_ + "/" + fffnaming::eorFileName(run_);
442  }
std::string eorFileName(const unsigned int run)
std::string evf::EvFDaqDirector::getInitFilePath ( std::string const &  stream) const

Definition at line 395 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, and run_dir_.

Referenced by DQMFileSaver::globalBeginRun().

395  {
396  return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_,0,stream);
397  }
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 351 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), and run_.

Referenced by bumpFile().

351  {
353  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getMergedDatChecksumFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 387 of file EvFDaqDirector.cc.

References hostname_, run_, run_dir_, and fffnaming::streamerDataChecksumFileNameWithInstance().

387  {
389  }
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getMergedDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 383 of file EvFDaqDirector.cc.

References hostname_, run_, run_dir_, and fffnaming::streamerDataFileNameWithInstance().

383  {
385  }
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 407 of file EvFDaqDirector.cc.

References hostname_, fffnaming::protocolBufferHistogramFileNameWithInstance(), run_, and run_dir_.

407  {
409  }
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getMergedRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 419 of file EvFDaqDirector.cc.

References hostname_, fffnaming::rootHistogramFileNameWithInstance(), run_, and run_dir_.

419  {
421  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
int evf::EvFDaqDirector::getNFilesFromEoLS ( std::string  BUEoLSFile)
private

Definition at line 635 of file EvFDaqDirector.cc.

References bu_run_dir_, data, def, jsoncollector::DataPoint::deserialize(), reco::dp, eolsNFilesIndex_, jsoncollector::DataPoint::getData(), jsoncollector::DataPoint::getDefinition(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, Json::Reader::parse(), readEolsDefinition_, matplotRender::reader, trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by bumpFile().

635  {
636 
637  boost::filesystem::ifstream ij(BUEoLSFile);
638  Json::Value deserializeRoot;
640 
641  if (!reader.parse(ij, deserializeRoot)) {
642  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
643  return -1;
644  }
645 
647  DataPoint dp;
648  dp.deserialize(deserializeRoot);
649 
650  //read definition
651  if (readEolsDefinition_) {
652  //std::string def = boost::algorithm::trim(dp.getDefinition());
654  if (def.empty()) readEolsDefinition_=false;
655  while (!def.empty()) {
656  std::string fullpath;
657  if (def.find('/')==0)
658  fullpath = def;
659  else
660  fullpath = bu_run_dir_+'/'+def;
661  struct stat buf;
662  if (stat(fullpath.c_str(), &buf) == 0) {
663  DataPointDefinition eolsDpd;
664  std::string defLabel = "legend";
665  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
666  if (eolsDpd.getNames().empty()) {
667  //try with "data" label if "legend" format is not used
668  eolsDpd = DataPointDefinition();
669  defLabel="data";
670  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
671  }
672  for (unsigned int i=0;i<eolsDpd.getNames().size();i++)
673  if (eolsDpd.getNames().at(i)=="NFiles")
675  readEolsDefinition_=false;
676  break;
677  }
678  //check if we can still find definition
679  if (def.size()<=1 || def.find('/')==std::string::npos) {
680  readEolsDefinition_=false;
681  break;
682  }
683  def = def.substr(def.find('/')+1);
684  }
685  }
686 
687  if (dp.getData().size()>eolsNFilesIndex_)
688  data = dp.getData()[eolsNFilesIndex_];
689  else {
690  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
691  return -1;
692  }
693  return boost::lexical_cast<int>(data);
694  }
std::vector< std::string > & getData()
Definition: DataPoint.h:58
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
unsigned int eolsNFilesIndex_
auto dp
Definition: deltaR.h:22
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
std::string & getDefinition()
Definition: DataPoint.h:59
std::string bu_run_dir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
JetCorrectorParameters::Definitions def
Definition: classes.h:6
std::string evf::EvFDaqDirector::getOpenDatFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 371 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerDataFileNameWithPid().

371  {
372  return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_,ls,stream);
373  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenInitFilePath ( std::string const &  stream) const

Definition at line 391 of file EvFDaqDirector.cc.

References fffnaming::initFileNameWithPid(), run_, and run_dir_.

391  {
392  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_,0,stream);
393  }
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenInputJsonFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 363 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputJsonFileName(), and run_.

363  {
365  }
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getOpenOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 375 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerJsonFileNameWithPid().

375  {
376  return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_,ls,stream);
377  }
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 399 of file EvFDaqDirector.cc.

References fffnaming::protocolBufferHistogramFileNameWithPid(), run_, and run_dir_.

399  {
401  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOpenRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 359 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), and run_.

359  {
360  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_,ls,index);
361  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getOpenRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 411 of file EvFDaqDirector.cc.

References fffnaming::rootHistogramFileNameWithPid(), run_, and run_dir_.

411  {
412  return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_,ls,stream);
413  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getOutputJsonFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 379 of file EvFDaqDirector.cc.

References run_, run_dir_, and fffnaming::streamerJsonFileNameWithPid().

379  {
381  }
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
def ls(path, rec=False)
Definition: eostools.py:348
std::string evf::EvFDaqDirector::getProtocolBufferHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 403 of file EvFDaqDirector.cc.

References fffnaming::protocolBufferHistogramFileNameWithPid(), run_, and run_dir_.

403  {
405  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string evf::EvFDaqDirector::getRawFilePath ( const unsigned int  ls,
const unsigned int  index 
) const

Definition at line 355 of file EvFDaqDirector.cc.

References bu_run_dir_, fffnaming::inputRawFileName(), and run_.

Referenced by removeFile().

355  {
357  }
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string evf::EvFDaqDirector::getRootHistogramFilePath ( const unsigned int  ls,
std::string const &  stream 
) const

Definition at line 415 of file EvFDaqDirector.cc.

References fffnaming::rootHistogramFileNameWithPid(), run_, and run_dir_.

415  {
417  }
def ls(path, rec=False)
Definition: eostools.py:348
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int evf::EvFDaqDirector::getRunNumber ( ) const
inline

Definition at line 106 of file EvFDaqDirector.h.

106 { return run_; }
std::string evf::EvFDaqDirector::getRunOpenDirPath ( ) const
inline

Definition at line 99 of file EvFDaqDirector.h.

Referenced by createRunOpendirMaybe(), and initRun().

99 {return run_dir_ +"/open";}
std::string evf::EvFDaqDirector::getStreamDestinations ( std::string const &  stream) const

Definition at line 917 of file EvFDaqDirector.cc.

References Json::Value::begin(), Json::Value::end(), Exception, mps_check::msg, requireTSPSet_, selectedTransferMode_, AlCaHLTBitMon_QueryRunRegistry::string, and transferSystemJson_.

918  {
919  std::string streamRequestName;
920  if (transferSystemJson_->isMember(stream.c_str()))
921  streamRequestName = stream;
922  else {
923  std::stringstream msg;
924  msg << "Transfer system mode definitions missing for -: " << stream;
925  if (requireTSPSet_)
926  throw cms::Exception("EvFDaqDirector") << msg.str();
927  else {
928  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
929  return std::string("Failsafe");
930  }
931  }
932  //return empty if strict check parameter is not on
934  edm::LogWarning("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter."
935  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
936  return std::string("Failsafe");
937  }
939  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
940  }
941  //check if stream has properly listed transfer stream
942  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str()))
943  {
944  std::stringstream msg;
945  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
946  if (requireTSPSet_)
947  throw cms::Exception("EvFDaqDirector") << msg.str();
948  else
949  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
950  return std::string("Failsafe");
951  }
952  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_,"");
953 
954  //flatten string json::Array into CSV std::string
955  std::string ret;
956  for (Json::Value::iterator it = destsVec.begin(); it!=destsVec.end(); it++)
957  {
958  if (ret!="") ret +=",";
959  ret+=(*it).asString();
960  }
961  return ret;
962  }
const_iterator begin() const
std::shared_ptr< Json::Value > transferSystemJson_
Represents a JSON value.
Definition: value.h:111
std::string selectedTransferMode_
tuple msg
Definition: mps_check.py:277
const_iterator end() const
Iterator for object and array value.
Definition: value.h:1007
std::string evf::EvFDaqDirector::getStreamMergeType ( std::string const &  stream,
MergeType  defaultType 
)

Definition at line 979 of file EvFDaqDirector.cc.

References mergeTypeMap_, MergeTypeNames_, and AlCaHLTBitMon_QueryRunRegistry::string.

980  {
981  auto mergeTypeItr = mergeTypeMap_.find(stream);
982  if (mergeTypeItr == mergeTypeMap_.end()) {
983  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
984  assert(defaultType<MergeTypeNames_.size());
985  std::string defaultName = MergeTypeNames_[defaultType];
986  mergeTypeMap_[stream] = defaultName;
987  return defaultName;
988  }
989  return mergeTypeItr->second;
990  }
std::map< std::string, std::string > mergeTypeMap_
static const std::vector< std::string > MergeTypeNames_
std::string evf::EvFDaqDirector::initFileName ( std::string const &  stream) const
private
void evf::EvFDaqDirector::initRun ( void  )

Definition at line 109 of file EvFDaqDirector.cc.

References base_dir_, bu_base_dir_, bu_run_dir_, bu_run_open_dir_, bu_w_lock_stream, bu_writelock_fd_, eostools::chmod(), createRunOpendirMaybe(), directorBu_, Exception, fu_readwritelock_fd_, fu_rw_lock_stream, fulocal_rwlock_fd2_, fulocal_rwlock_fd_, getRunOpenDirPath(), hltSourceDirectory_, init_lock_, eostools::mkdir(), openFULockfileStream(), run_, run_dir_, run_string_, trackingPlots::stat, stopFilePath_, stopFilePathPid_, AlCaHLTBitMon_QueryRunRegistry::string, and tryInitializeFuLockFile().

Referenced by preallocate().

110  {
111  std::stringstream ss;
112  ss << "run" << std::setfill('0') << std::setw(6) << run_;
113  run_string_ = ss.str();
115 
116  // check if base dir exists or create it accordingly
117  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
118  if (retval != 0 && errno != EEXIST) {
119  throw cms::Exception("DaqDirector") << " Error checking for base dir -: "
120  << base_dir_ << " mkdir error:" << strerror(errno);
121  }
122 
123  //create run dir in base dir
124  umask(0);
125  retval = mkdir(run_dir_.c_str(),
126  S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
127  if (retval != 0 && errno != EEXIST) {
128  throw cms::Exception("DaqDirector") << " Error creating run dir -: "
129  << run_dir_ << " mkdir error:" << strerror(errno);
130  }
131 
132  //create fu-local.lock in run open dir
133  if (!directorBu_) {
134 
136  std::string fulocal_lock_ = getRunOpenDirPath() +"/fu-local.lock";
137  fulocal_rwlock_fd_ = open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
138  if (fulocal_rwlock_fd_==-1)
139  throw cms::Exception("DaqDirector") << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
140  chmod(fulocal_lock_.c_str(),0777);
141  fsync(fulocal_rwlock_fd_);
142  //open second fd for another input source thread
143  fulocal_rwlock_fd2_ = open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
144  if (fulocal_rwlock_fd2_==-1)
145  throw cms::Exception("DaqDirector") << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
146  }
147 
148  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
149  //for BU, it is created at this point
150  if (directorBu_)
151  {
153  std::string bulockfile = bu_run_dir_ + "/bu.lock";
154  std::string fulockfile = bu_run_dir_ + "/fu.lock";
155 
156  //make or find bu run dir
157  retval = mkdir(bu_run_dir_.c_str(),
158  S_IRWXU | S_IRWXG | S_IRWXO);
159  if (retval != 0 && errno != EEXIST) {
160  throw cms::Exception("DaqDirector")
161  << " Error creating bu run dir -: " << bu_run_dir_
162  << " mkdir error:" << strerror(errno) << "\n";
163  }
164  bu_run_open_dir_ = bu_run_dir_ + "/open";
165  retval = mkdir(bu_run_open_dir_.c_str(),
166  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
167  if (retval != 0 && errno != EEXIST) {
168  throw cms::Exception("DaqDirector") << " Error creating bu run open dir -: "
169  << bu_run_open_dir_ << " mkdir error:" << strerror(errno)
170  << "\n";
171  }
172 
173  // the BU director does not need to know about the fu lock
174  bu_writelock_fd_ = open(bulockfile.c_str(),
175  O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
176  if (bu_writelock_fd_ == -1)
177  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: "
178  << strerror(errno);
179  else
180  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: "
181  << bu_writelock_fd_;
182  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
183  if (bu_w_lock_stream == nullptr)
184  edm::LogWarning("EvFDaqDirector")<< "Error creating write lock stream -: " << strerror(errno);
185 
186  // BU INITIALIZES LOCK FILE
187  // FU LOCK FILE OPEN
188  openFULockfileStream(fulockfile, true);
190  fflush(fu_rw_lock_stream);
191  close(fu_readwritelock_fd_);
192 
193  if (!hltSourceDirectory_.empty())
194  {
195  struct stat buf;
196  if (stat(hltSourceDirectory_.c_str(),&buf)==0) {
197  std::string hltdir=bu_run_dir_+"/hlt";
198  std::string tmphltdir=bu_run_open_dir_+"/hlt";
199  retval = mkdir(tmphltdir.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
200  if (retval != 0 && errno != EEXIST)
201  throw cms::Exception("DaqDirector")
202  << " Error creating bu run dir -: " << hltdir
203  << " mkdir error:" << strerror(errno) << "\n";
204 
205  boost::filesystem::copy_file(hltSourceDirectory_+"/HltConfig.py",tmphltdir+"/HltConfig.py");
206 
207  boost::filesystem::copy_file(hltSourceDirectory_+"/fffParameters.jsn",tmphltdir+"/fffParameters.jsn");
208 
209  boost::filesystem::rename(tmphltdir,hltdir);
210  }
211  else
212  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
213  }
214  //else{}//no configuration specified
215  }
216  else
217  {
218  // for FU, check if bu base dir exists
219 
220  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
221  if (retval != 0 && errno != EEXIST) {
222  throw cms::Exception("DaqDirector") << " Error checking for bu base dir -: "
223  << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
224  }
225 
227  std::string fulockfile = bu_run_dir_ + "/fu.lock";
228  openFULockfileStream(fulockfile, false);
229  }
230 
231  pthread_mutex_init(&init_lock_,nullptr);
232 
233  stopFilePath_ = run_dir_+"/CMSSW_STOP";
234  std::stringstream sstp;
235  sstp << stopFilePath_ << "_pid" << getpid();
236  stopFilePathPid_ = sstp.str();
237  }
std::string run_string_
void openFULockfileStream(std::string &fuLockFilePath, bool create)
pthread_mutex_t init_lock_
std::string hltSourceDirectory_
def chmod(path, mode)
Definition: eostools.py:293
std::string stopFilePath_
std::string bu_base_dir_
std::string stopFilePathPid_
std::string bu_run_dir_
def mkdir(path)
Definition: eostools.py:250
std::string getRunOpenDirPath() const
std::string bu_run_open_dir_
std::string evf::EvFDaqDirector::inputFileNameStem ( const unsigned int  ls,
const unsigned int  index 
) const
private
bool evf::EvFDaqDirector::isSingleStreamThread ( )
inline
void evf::EvFDaqDirector::lockFULocal ( )

Definition at line 811 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by updateFuLock().

811  {
812  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
813  flock(fulocal_rwlock_fd_,LOCK_SH);
814  }
void evf::EvFDaqDirector::lockFULocal2 ( )

Definition at line 822 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNextEvent(), and FedRawDataInputSource::maybeOpenNewLumiSection().

822  {
823  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
824  flock(fulocal_rwlock_fd2_,LOCK_EX);
825  }
void evf::EvFDaqDirector::lockInitLock ( )

Definition at line 803 of file EvFDaqDirector.cc.

References init_lock_.

803  {
804  pthread_mutex_lock(&init_lock_);
805  }
pthread_mutex_t init_lock_
struct flock evf::EvFDaqDirector::make_flock ( short  type,
short  whence,
off_t  start,
off_t  len,
pid_t  pid 
)
static

Definition at line 998 of file EvFDaqDirector.cc.

References sysUtil::pid, and command_line::start.

999  {
1000 #ifdef __APPLE__
1001  return {start, len, pid, type, whence};
1002 #else
1003  return {type, whence, start, len, pid};
1004 #endif
1005  }
type
Definition: HCALResponse.h:21
std::string evf::EvFDaqDirector::mergedFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private
bool evf::EvFDaqDirector::microMergeDisabled ( ) const
inline
void evf::EvFDaqDirector::openFULockfileStream ( std::string &  fuLockFilePath,
bool  create 
)
private

Definition at line 785 of file EvFDaqDirector.cc.

References eostools::chmod(), fu_readwritelock_fd_, fu_rw_lock_stream, and LogDebug.

Referenced by initRun().

785  {
786  if (create) {
787  fu_readwritelock_fd_ = open(fulockfile.c_str(), O_RDWR | O_CREAT,
788  S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
789  chmod(fulockfile.c_str(),0766);
790  } else {
791  fu_readwritelock_fd_ = open(fulockfile.c_str(), O_RDWR, S_IRWXU);
792  }
793  if (fu_readwritelock_fd_ == -1)
794  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile.c_str()
795  << " create:" << create << " error:" << strerror(errno);
796  else
797  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: "
799 
800  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
801  }
#define LogDebug(id)
def create(alignables, pedeDump, additionalData, outputFile, config)
def chmod(path, mode)
Definition: eostools.py:293
bool evf::EvFDaqDirector::outputAdler32Recheck ( ) const
inline

Definition at line 100 of file EvFDaqDirector.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

std::string evf::EvFDaqDirector::outputFileNameStem ( const unsigned int  ls,
std::string const &  stream 
) const
private
void evf::EvFDaqDirector::overrideRunNumber ( unsigned int  run)
inline

Definition at line 70 of file EvFDaqDirector.h.

References findQualityFiles::run.

void evf::EvFDaqDirector::postEndRun ( edm::GlobalContext const &  globalContext)

Definition at line 298 of file EvFDaqDirector.cc.

References bu_readlock_fd_, bu_run_dir_, bu_writelock_fd_, directorBu_, corrVsCorr::filename, removeFile(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by EvFDaqDirector().

298  {
299  close(bu_readlock_fd_);
300  close(bu_writelock_fd_);
301  if (directorBu_) {
302  std::string filename = bu_run_dir_ + "/bu.lock";
303  removeFile(filename);
304  }
305  }
void removeFile(unsigned int ls, unsigned int index)
std::string bu_run_dir_
void evf::EvFDaqDirector::preallocate ( edm::service::SystemBounds const &  bounds)

Definition at line 254 of file EvFDaqDirector.cc.

References initRun(), edm::service::SystemBounds::maxNumberOfStreams(), edm::service::SystemBounds::maxNumberOfThreads(), nStreams_, and nThreads_.

Referenced by EvFDaqDirector().

254  {
255 
256  initRun();
257 
258  nThreads_=bounds.maxNumberOfStreams();
259  nStreams_=bounds.maxNumberOfThreads();
260  }
unsigned int nThreads_
unsigned int nStreams_
void evf::EvFDaqDirector::preBeginJob ( edm::PathsAndConsumesOfModulesBase const &  ,
edm::ProcessContext const &  pc 
)

Definition at line 280 of file EvFDaqDirector.cc.

References checkMergeTypePSet(), and checkTransferSystemPSet().

Referenced by EvFDaqDirector().

281  {
283  checkMergeTypePSet(pc);
284  }
void checkMergeTypePSet(edm::ProcessContext const &pc)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
void evf::EvFDaqDirector::preBeginRun ( edm::GlobalContext const &  globalContext)

Definition at line 286 of file EvFDaqDirector.cc.

References dirManager_, evf::DirManager::findHighestRunDir(), and run_dir_.

Referenced by EvFDaqDirector().

286  {
287 
288  //assert(run_ == id.run());
289 
290  // check if the requested run is the latest one - issue a warning if it isn't
292  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: "
293  << run_dir_ << ". This is not the highest run "
295  }
296  }
std::string findHighestRunDir()
Definition: DirManager.cc:20
void evf::EvFDaqDirector::preGlobalEndLumi ( edm::GlobalContext const &  globalContext)

Definition at line 307 of file EvFDaqDirector.cc.

References cppFunctionSkipper::exception, fileDeleteLockPtr_, filesToDeletePtr_, LogDebug, eostools::ls(), edm::LuminosityBlockID::luminosityBlock(), edm::GlobalContext::luminosityBlockID(), callgraph::path, and MatrixUtil::remove().

Referenced by EvFDaqDirector().

308  {
309  //delete all files belonging to just closed lumi
310  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
312  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
313  return;
314  }
315 
316  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
317  auto it = filesToDeletePtr_->begin();
318  while (it!=filesToDeletePtr_->end()) {
319  if (it->second->lumi_ == ls) {
320  const boost::filesystem::path filePath(it->second->fileName_);
321  LogDebug("EvFDaqDirector") << "Deleting input file -:" << it->second->fileName_;
322  try {
323  //rarely this fails but file gets deleted
324  boost::filesystem::remove(filePath);
325  }
326  catch (const boost::filesystem::filesystem_error& ex)
327  {
328  edm::LogError("EvFDaqDirector") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
329  usleep(10000);
330  try {
331  boost::filesystem::remove(filePath);
332  }
333  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
334  }
335  catch (std::exception& ex)
336  {
337  edm::LogError("EvFDaqDirector") << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
338  usleep(10000);
339  try {
340  boost::filesystem::remove(filePath);
341  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
342  }
343 
344  delete it->second;
345  it = filesToDeletePtr_->erase(it);
346  }
347  else it++;
348  }
349  }
#define LogDebug(id)
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
std::mutex * fileDeleteLockPtr_
def ls(path, rec=False)
Definition: eostools.py:348
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
int evf::EvFDaqDirector::readLastLSEntry ( std::string const &  file)

Definition at line 844 of file EvFDaqDirector.cc.

References Json::Value::asInt(), FrontierConditions_GlobalTag_cff::file, Json::Value::get(), Json::Reader::parse(), and matplotRender::reader.

Referenced by updateFuLock().

844  {
845 
846  boost::filesystem::ifstream ij(file);
847  Json::Value deserializeRoot;
849 
850  if (!reader.parse(ij, deserializeRoot)) {
851  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
852  return -1;
853  }
854 
855  int ret = deserializeRoot.get("lastLS","").asInt();
856  return ret;
857 
858  }
Value get(UInt index, const Value &defaultValue) const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
Int asInt() const
Unserialize a JSON document into a Value.
Definition: reader.h:16
void evf::EvFDaqDirector::removeFile ( unsigned int  ls,
unsigned int  index 
)

Definition at line 451 of file EvFDaqDirector.cc.

References getRawFilePath().

Referenced by postEndRun().

451  {
453  }
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
void removeFile(unsigned int ls, unsigned int index)
def ls(path, rec=False)
Definition: eostools.py:348
void evf::EvFDaqDirector::removeFile ( std::string  filename)

Definition at line 444 of file EvFDaqDirector.cc.

444  {
445  int retval = remove(filename.c_str());
446  if (retval != 0)
447  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename << ". error = "
448  << strerror(errno);
449  }
void evf::EvFDaqDirector::setDeleteTracking ( std::mutex fileDeleteLock,
std::list< std::pair< int, InputFile * >> *  filesToDelete 
)
inline

Definition at line 118 of file EvFDaqDirector.h.

References AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

118  {
119  fileDeleteLockPtr_=fileDeleteLock;
120  filesToDeletePtr_ = filesToDelete;
121  }
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
std::mutex * fileDeleteLockPtr_
void evf::EvFDaqDirector::setFMS ( evf::FastMonitoringService fms)
inline

Definition at line 109 of file EvFDaqDirector.h.

Referenced by FedRawDataInputSource::FedRawDataInputSource().

109 {fms_=fms;}
evf::FastMonitoringService * fms_
void evf::EvFDaqDirector::tryInitializeFuLockFile ( )

Definition at line 774 of file EvFDaqDirector.cc.

References fu_rw_lock_stream, and getHLTprescales::readIndex().

Referenced by initRun().

774  {
775  if (fu_rw_lock_stream == nullptr)
776  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream "
777  << strerror(errno);
778  else {
779  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
780  unsigned int readLs = 1, readIndex = 0;
781  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
782  }
783  }
void evf::EvFDaqDirector::unlockFULocal ( )

Definition at line 816 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd_.

Referenced by FedRawDataInputSource::grabNextJsonFile(), and ~EvFDaqDirector().

816  {
817  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
818  flock(fulocal_rwlock_fd_,LOCK_UN);
819  }
void evf::EvFDaqDirector::unlockFULocal2 ( )

Definition at line 827 of file EvFDaqDirector.cc.

References fulocal_rwlock_fd2_.

Referenced by FedRawDataInputSource::checkNextEvent(), FedRawDataInputSource::maybeOpenNewLumiSection(), and ~EvFDaqDirector().

827  {
828  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
829  flock(fulocal_rwlock_fd2_,LOCK_UN);
830  }
void evf::EvFDaqDirector::unlockInitLock ( )

Definition at line 807 of file EvFDaqDirector.cc.

References init_lock_.

807  {
808  pthread_mutex_unlock(&init_lock_);
809  }
pthread_mutex_t init_lock_
EvFDaqDirector::FileStatus evf::EvFDaqDirector::updateFuLock ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)

Definition at line 455 of file EvFDaqDirector.cc.

References bu_run_dir_, bumpFile(), trackerTree::check(), Exception, fu_readwritelock_fd_, fu_rw_flk, fu_rw_fulk, fu_rw_lock_stream, fuLockPollInterval_, getEoLSFilePathOnFU(), getEoRFilePath(), lockFULocal(), LogDebug, eostools::ls(), newFile, noFile, getHLTprescales::readIndex(), readLastLSEntry(), runEnded, trackingPlots::stat, stop_ls_override_, stopFilePath_, and stopFilePathPid_.

Referenced by FedRawDataInputSource::readSupervisor().

455  {
456  EvFDaqDirector::FileStatus fileStatus = noFile;
457 
458  int retval = -1;
459  int lock_attempts = 0;
460 
461  struct stat buf;
462  int stopFileLS = -1;
463  int stopFileCheck = stat(stopFilePath_.c_str(),&buf);
464  int stopFilePidCheck = stat(stopFilePathPid_.c_str(),&buf);
465  if (stopFileCheck==0 || stopFilePidCheck==0) {
466  if (stopFileCheck==0)
467  stopFileLS = readLastLSEntry(stopFilePath_);
468  else
469  stopFileLS = 1;//stop without drain if only pid is stopped
470  if (!stop_ls_override_) {
471  //if lumisection is higher than in stop file, should quit at next from current
472  if (stopFileLS>=0 && (int)ls>=stopFileLS) stopFileLS = stop_ls_override_ = ls;
473  }
474  else stopFileLS = stop_ls_override_;
475  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
476  //return runEnded;
477  }
478  else //if file was removed before reaching stop condition, reset this
479  stop_ls_override_ = 0;
480 
481  timeval ts_lockbegin;
482  gettimeofday(&ts_lockbegin,nullptr);
483 
484  while (retval==-1) {
485  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
486  if (retval==-1) usleep(fuLockPollInterval_);
487  else continue;
488 
489  lock_attempts+=fuLockPollInterval_;
490  if (lock_attempts>5000000 || errno==116) {
491  if (errno==116)
492  edm::LogWarning("EvFDaqDirector") << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
493  else
494  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and fu.lock file are present -: errno "
495  << errno <<":"<< strerror(errno) << std::endl;
496 
497 
498  if (stat(getEoLSFilePathOnFU(ls).c_str(),&buf)==0) {
499  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< ls ;
500  ls++;
501  return noFile;
502  }
503 
504  if (stat(bu_run_dir_.c_str(), &buf)!=0) return runEnded;
505  if (stat((bu_run_dir_+"/fu.lock").c_str(), &buf)!=0) return runEnded;
506  lock_attempts=0;
507  }
508  }
509 
510  timeval ts_lockend;
511  gettimeofday(&ts_lockend,nullptr);
512  long deltat = (ts_lockend.tv_usec-ts_lockbegin.tv_usec) + (ts_lockend.tv_sec-ts_lockbegin.tv_sec)*1000000;
513  if (deltat>0.) lockWaitTime=deltat;
514 
515 
516 
517  if(retval!=0) return fileStatus;
518 
519 #ifdef DEBUG
520  timeval ts_lockend;
521  gettimeofday(&ts_lockend,0);
522 #endif
523 
524  // if the stream is readable
525  if (fu_rw_lock_stream != nullptr) {
526  unsigned int readLs, readIndex;
527  int check = 0;
528  // rewind the stream
529  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
530  // if rewinded ok
531  if (check == 0) {
532  // read its' values
533  fscanf(fu_rw_lock_stream, "%u %u", &readLs, &readIndex);
534  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
535 
536  unsigned int currentLs = readLs;
537  bool bumpedOk = false;
538  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
539  //no lock file write in this case
540  if (ls && ls+1 < currentLs) ls++;
541  else {
542  // try to bump (look for new index or EoLS file)
543  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
544  //avoid 2 lumisections jump
545  if (ls && readLs>currentLs && currentLs > ls) {
546  ls++;
547  readLs=currentLs=ls;
548  readIndex=0;
549  bumpedOk=false;
550  //no write to lock file
551  }
552  else {
553  if (ls==0 && readLs>currentLs) {
554  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
555  //in this case there is no new file in the same LS
556  readLs=currentLs;
557  readIndex=0;
558  bumpedOk=false;
559  //no write to lock file
560  }
561  //update return LS value
562  ls = readLs;
563  }
564  }
565  if (bumpedOk) {
566  // there is a new index file to grab, lock file needs to be updated
567  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
568  if (check == 0) {
569  ftruncate(fu_readwritelock_fd_, 0);
570  // write next index in the file, which is the file the next process should take
571  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex + 1);
572  fflush(fu_rw_lock_stream);
573  fsync(fu_readwritelock_fd_);
574  fileStatus = newFile;
575  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
576  }
577  else {
578  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
579  }
580  }
581  else if (currentLs < readLs) {
582  //there is no new file in next LS (yet), but lock file can be updated to the next LS
583  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
584  if (check == 0) {
585  ftruncate(fu_readwritelock_fd_, 0);
586  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
587  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
588  fflush(fu_rw_lock_stream);
589  fsync(fu_readwritelock_fd_);
590  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
591  }
592  else {
593  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
594  }
595  }
596  } else {
597  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error " << strerror(errno);
598  }
599  } else {
600  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
601  }
602 
603 #ifdef DEBUG
604  timeval ts_preunlock;
605  gettimeofday(&ts_preunlock,0);
606  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
607  double locked_period=locked_period_int+double(ts_preunlock.tv_usec - ts_lockend.tv_usec)/1000000;
608 #endif
609 
610  //if new json is present, lock file which FedRawDataInputSource will later unlock
611  if (fileStatus==newFile) lockFULocal();
612 
613  //release lock at this point
614  int retvalu=-1;
615  retvalu=fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
616  if (retvalu==-1) edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
617 
618 #ifdef DEBUG
619  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
620 #endif
621 
622  if ( fileStatus == noFile ) {
623  struct stat buf;
624  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
625  if ( stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf)!=0)
626  fileStatus = runEnded;
627  if (stopFileLS>=0 && (int)ls > stopFileLS) {
628  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
629  fileStatus = runEnded;
630  }
631  }
632  return fileStatus;
633  }
#define LogDebug(id)
struct flock fu_rw_flk
std::string getEoRFilePath() const
struct flock fu_rw_fulk
std::string stopFilePath_
unsigned int stop_ls_override_
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
def ls(path, rec=False)
Definition: eostools.py:348
std::string bu_run_dir_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
def check(config)
Definition: trackerTree.py:14
unsigned int fuLockPollInterval_

Member Data Documentation

std::string evf::EvFDaqDirector::base_dir_
private

Definition at line 143 of file EvFDaqDirector.h.

Referenced by initRun().

std::string evf::EvFDaqDirector::bu_base_dir_
private

Definition at line 144 of file EvFDaqDirector.h.

Referenced by initRun().

struct flock evf::EvFDaqDirector::bu_r_flk
private

Definition at line 179 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_r_fulk
private

Definition at line 181 of file EvFDaqDirector.h.

FILE* evf::EvFDaqDirector::bu_r_lock_stream
private

Definition at line 169 of file EvFDaqDirector.h.

int evf::EvFDaqDirector::bu_readlock_fd_
private

Definition at line 162 of file EvFDaqDirector.h.

Referenced by postEndRun().

std::string evf::EvFDaqDirector::bu_run_dir_
private
std::string evf::EvFDaqDirector::bu_run_open_dir_
private

Definition at line 160 of file EvFDaqDirector.h.

Referenced by initRun().

FILE* evf::EvFDaqDirector::bu_t_monitor_stream
private

Definition at line 172 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_w_flk
private

Definition at line 178 of file EvFDaqDirector.h.

struct flock evf::EvFDaqDirector::bu_w_fulk
private

Definition at line 180 of file EvFDaqDirector.h.

FILE* evf::EvFDaqDirector::bu_w_lock_stream
private

Definition at line 168 of file EvFDaqDirector.h.

Referenced by initRun().

FILE* evf::EvFDaqDirector::bu_w_monitor_stream
private

Definition at line 171 of file EvFDaqDirector.h.

int evf::EvFDaqDirector::bu_writelock_fd_
private

Definition at line 163 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

bool evf::EvFDaqDirector::directorBu_
private

Definition at line 145 of file EvFDaqDirector.h.

Referenced by initRun(), and postEndRun().

DirManager evf::EvFDaqDirector::dirManager_
private

Definition at line 174 of file EvFDaqDirector.h.

Referenced by preBeginRun().

bool evf::EvFDaqDirector::emptyLumisectionMode_
private

Definition at line 152 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

unsigned int evf::EvFDaqDirector::eolsNFilesIndex_ = 1
private

Definition at line 196 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

std::mutex* evf::EvFDaqDirector::fileDeleteLockPtr_ = nullptr
private

Definition at line 187 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi().

std::list<std::pair<int,InputFile*> >* evf::EvFDaqDirector::filesToDeletePtr_ = nullptr
private

Definition at line 188 of file EvFDaqDirector.h.

Referenced by preGlobalEndLumi().

evf::FastMonitoringService* evf::EvFDaqDirector::fms_ = nullptr
private

Definition at line 185 of file EvFDaqDirector.h.

Referenced by bumpFile().

int evf::EvFDaqDirector::fu_readwritelock_fd_
private

Definition at line 164 of file EvFDaqDirector.h.

Referenced by initRun(), openFULockfileStream(), and updateFuLock().

struct flock evf::EvFDaqDirector::fu_rw_flk
private

Definition at line 182 of file EvFDaqDirector.h.

Referenced by updateFuLock().

struct flock evf::EvFDaqDirector::fu_rw_fulk
private

Definition at line 183 of file EvFDaqDirector.h.

Referenced by updateFuLock().

FILE* evf::EvFDaqDirector::fu_rw_lock_stream
private
int evf::EvFDaqDirector::fulocal_rwlock_fd2_
private

Definition at line 166 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal2(), unlockFULocal2(), and ~EvFDaqDirector().

int evf::EvFDaqDirector::fulocal_rwlock_fd_
private

Definition at line 165 of file EvFDaqDirector.h.

Referenced by initRun(), lockFULocal(), unlockFULocal(), and ~EvFDaqDirector().

unsigned int evf::EvFDaqDirector::fuLockPollInterval_
private

Definition at line 151 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector(), and updateFuLock().

std::string evf::EvFDaqDirector::hltSourceDirectory_
private

Definition at line 150 of file EvFDaqDirector.h.

Referenced by initRun().

std::string evf::EvFDaqDirector::hostname_
private
pthread_mutex_t evf::EvFDaqDirector::init_lock_ = PTHREAD_MUTEX_INITIALIZER
private

Definition at line 190 of file EvFDaqDirector.h.

Referenced by initRun(), lockInitLock(), and unlockInitLock().

std::map<std::string,std::string> evf::EvFDaqDirector::mergeTypeMap_
private

Definition at line 202 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet(), and getStreamMergeType().

const std::vector< std::string > evf::EvFDaqDirector::MergeTypeNames_ = {"","DAT","PB","JSNDATA"}
staticprivate

Definition at line 205 of file EvFDaqDirector.h.

Referenced by getStreamMergeType().

std::string evf::EvFDaqDirector::mergeTypePset_
private

Definition at line 154 of file EvFDaqDirector.h.

Referenced by checkMergeTypePSet().

bool evf::EvFDaqDirector::microMergeDisabled_
private

Definition at line 153 of file EvFDaqDirector.h.

Referenced by EvFDaqDirector().

unsigned int evf::EvFDaqDirector::nStreams_ =0
private

Definition at line 192 of file EvFDaqDirector.h.

Referenced by preallocate().

unsigned int evf::EvFDaqDirector::nThreads_ =0
private

Definition at line 193 of file EvFDaqDirector.h.

Referenced by preallocate().

bool evf::EvFDaqDirector::outputAdler32Recheck_
private

Definition at line 147 of file EvFDaqDirector.h.

unsigned long evf::EvFDaqDirector::previousFileSize_
private

Definition at line 176 of file EvFDaqDirector.h.

Referenced by bumpFile().

bool evf::EvFDaqDirector::readEolsDefinition_ = true
private

Definition at line 195 of file EvFDaqDirector.h.

Referenced by getNFilesFromEoLS().

bool evf::EvFDaqDirector::requireTSPSet_
private

Definition at line 148 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().

unsigned int evf::EvFDaqDirector::run_
private
std::string evf::EvFDaqDirector::run_dir_
private
std::string evf::EvFDaqDirector::run_string_
private

Definition at line 157 of file EvFDaqDirector.h.

Referenced by initRun().

std::string evf::EvFDaqDirector::selectedTransferMode_
private

Definition at line 149 of file EvFDaqDirector.h.

Referenced by getStreamDestinations().

unsigned int evf::EvFDaqDirector::stop_ls_override_ = 0
private

Definition at line 199 of file EvFDaqDirector.h.

Referenced by updateFuLock().

std::string evf::EvFDaqDirector::stopFilePath_
private

Definition at line 197 of file EvFDaqDirector.h.

Referenced by initRun(), and updateFuLock().

std::string evf::EvFDaqDirector::stopFilePathPid_
private

Definition at line 198 of file EvFDaqDirector.h.

Referenced by initRun(), and updateFuLock().

std::shared_ptr<Json::Value> evf::EvFDaqDirector::transferSystemJson_
private

Definition at line 201 of file EvFDaqDirector.h.

Referenced by checkTransferSystemPSet(), and getStreamDestinations().