CMS 3D CMS Logo

List of all members | Public Member Functions | Private Attributes
WatcherStreamFileReader Class Reference

#include <WatcherStreamFileReader.h>

Public Member Functions

void closeFile ()
 
const InitMsgView * getHeader ()
 
edm::StreamerInputFile * getInputFile ()
 
const EventMsgView * getNextEvent ()
 
const bool newHeader ()
 
 WatcherStreamFileReader (edm::ParameterSet const &pset)
 
 ~WatcherStreamFileReader ()
 

Private Attributes

std::string corruptedDir_
 
std::string curDir_
 
bool end_
 
std::string fileListCmd_
 
std::string fileName_
 
std::vector< std::string > filePatterns_
 
std::deque< std::string > filesInQueue_
 
std::string inprocessDir_
 
std::string inputDir_
 
std::string processedDir_
 
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
 
int timeOut_
 
std::string tokenFile_
 
int verbosity_
 

Detailed Description

Definition at line 26 of file WatcherStreamFileReader.h.

Constructor & Destructor Documentation

◆ WatcherStreamFileReader()

WatcherStreamFileReader::WatcherStreamFileReader ( edm::ParameterSet const &  pset)

Definition at line 114 of file WatcherStreamFileReader.cc.

115  : inputDir_(pset.getParameter<std::string>("inputDir")),
116  filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
117  inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
118  processedDir_(pset.getParameter<std::string>("processedDir")),
119  corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
120  tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
121  timeOut_(pset.getParameter<int>("timeOutInSec")),
122  end_(false),
123  verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
124  struct stat buf;
125  if (stat(tokenFile_.c_str(), &buf)) {
126  FILE* f = fopen(tokenFile_.c_str(), "w");
127  if (f) {
128  fclose(f);
129  } else {
130  throw cms::Exception("WatcherSource") << "Failed to create token file.";
131  }
132  }
133  vector<string> dirs;
134  dirs.push_back(inprocessDir_);
135  dirs.push_back(processedDir_);
136  dirs.push_back(corruptedDir_);
137 
138  for (unsigned i = 0; i < dirs.size(); ++i) {
139  const string& dir = dirs[i];
140  struct stat fileStat;
141  if (0 == stat(dir.c_str(), &fileStat)) {
142  if (!S_ISDIR(fileStat.st_mode)) {
143  throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
144  << " as expected.";
145  }
146  } else { //directory does not exists, let's try to create it
147  if (0 != mkdir(dir.c_str(), 0755)) {
148  throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
149  }
150  }
151  }
152 
153  std::stringstream fileListCmdBuf;
154  fileListCmdBuf.str("");
155  // fileListCmdBuf << "/bin/ls -rt " << inputDir_ << " | egrep '(";
156  //by default ls will sort the file alphabetically which will results
157  //in ordering the files in increasing LB number, which is the desired
158  //order.
159  // fileListCmdBuf << "/bin/ls " << inputDir_ << " | egrep '(";
160  fileListCmdBuf << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
161  //TODO: validate patternDir (see ;, &&, ||) and escape special character
162  if (filePatterns_.empty())
163  throw cms::Exception("WacherSource", "filePatterns parameter is empty");
164  char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
165  if (getcwd(curDir, sizeof(curDir)) == nullptr) {
166  throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
167  }
168  curDir_ = curDir;
169 
170  for (unsigned i = 0; i < filePatterns_.size(); ++i) {
171  if (i > 0)
172  fileListCmdBuf << "|";
173  // if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
174  // fileListCmdBuf << curDir << "/";
175  // }
176  fileListCmdBuf << filePatterns_[i];
177  }
178  fileListCmdBuf << ")' | sort";
179 
180  fileListCmd_ = fileListCmdBuf.str();
181 
182  cout << "[WatcherSource " << now() << "]"
183  << " Command to retrieve input files: " << fileListCmd_ << "\n";
184 }

References visDQMUpload::buf, corruptedDir_, gather_cfg::cout, curDir_, DeadROC_duringRun::dir, heppy_check::dirs, Exception, f, fileListCmd_, filePatterns_, mps_fire::i, inprocessDir_, inputDir_, eostools::mkdir(), now(), processedDir_, edm_modernize_messagelogger::stat, and tokenFile_.

◆ ~WatcherStreamFileReader()

WatcherStreamFileReader::~WatcherStreamFileReader ( )

Definition at line 186 of file WatcherStreamFileReader.cc.

186 {}

Member Function Documentation

◆ closeFile()

void WatcherStreamFileReader::closeFile ( )

Definition at line 393 of file WatcherStreamFileReader.cc.

393  {
394  if (streamerInputFile_.get() == nullptr)
395  return;
396  //delete the streamer input file:
397  streamerInputFile_.reset();
398  stringstream cmd;
399  //TODO: validation of processDir
400  cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
401  if (verbosity_)
402  cout << "[WatcherSource " << now() << "]"
403  << " Excuting " << cmd.str() << "\n";
404  int i = system(cmd.str().c_str());
405  if (i != 0) {
406  throw cms::Exception("WatcherSource") << "Failed to move processed file '" << fileName_ << "'"
407  << " to processed directory '" << processedDir_ << "'\n";
408  //Stop further processing to prevent endless loop:
409  end_ = true;
410  }
411  cout << flush;
412 }

References mps_setup::cmd, gather_cfg::cout, end_, Exception, fileName_, mps_fire::i, now(), processedDir_, streamerInputFile_, and verbosity_.

Referenced by getNextEvent().

◆ getHeader()

const InitMsgView * WatcherStreamFileReader::getHeader ( )

Definition at line 193 of file WatcherStreamFileReader.cc.

193  {
195 
196  //TODO: shall better send an exception...
197  if (inputFile == nullptr) {
198  throw cms::Exception("WatcherSource") << "No input file found.";
199  }
200 
201  const InitMsgView* header = inputFile->startMessage();
202 
203  if (header->code() != Header::INIT) //INIT Msg
204  throw cms::Exception("readHeader", "WatcherStreamFileReader")
205  << "received wrong message type: expected INIT, got " << header->code() << "\n";
206 
207  return header;
208 }

References Exception, getInputFile(), RecoTauValidation_cfi::header, Header::INIT, and dtResolutionTest_cfi::inputFile.

◆ getInputFile()

edm::StreamerInputFile * WatcherStreamFileReader::getInputFile ( )

Definition at line 226 of file WatcherStreamFileReader.cc.

226  {
227  char* lineptr = nullptr;
228  size_t n = 0;
229 
230  struct stat buf;
231 
232  if (stat(tokenFile_.c_str(), &buf) != 0) {
233  end_ = true;
234  }
235 
236  bool waiting = false;
237  static bool firstWait = true;
238  timeval waitStart;
239  //if no cached input file, look for new files until one is found:
240  if (!end_ && streamerInputFile_.get() == nullptr) {
241  fileName_.assign("");
242 
243  //check if we have file in the queue, if not look for new files:
244  while (filesInQueue_.empty()) {
245  if (stat(tokenFile_.c_str(), &buf) != 0) {
246  end_ = true;
247  break;
248  }
249  FILE* s = popen(fileListCmd_.c_str(), "r");
250  if (s == nullptr) {
251  throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
252  }
253 
254  ssize_t len;
255  while (!feof(s)) {
256  if ((len = getline(&lineptr, &n, s)) > 0) {
257  //remove end-of-line character:
258  lineptr[len - 1] = 0;
259  string fileName;
260  if (lineptr[0] != '/') {
261  if (!inputDir_.empty() && inputDir_[0] != '/') { //relative path
262  fileName.assign(curDir_);
263  fileName.append("/");
264  fileName.append(inputDir_);
265  } else {
266  fileName.assign(inputDir_);
267  }
268  fileName.append("/");
269  }
270  fileName.append(lineptr);
271  filesInQueue_.push_back(fileName);
272  if (verbosity_)
273  cout << "[WatcherSource " << now() << "]"
274  << " File to process: '" << fileName << "'\n";
275  }
276  }
277  while (!feof(s))
278  fgetc(s);
279  pclose(s);
280  if (filesInQueue_.empty()) {
281  if (!waiting) {
282  cout << "[WatcherSource " << now() << "]"
283  << " No file found. Waiting for new file...\n";
284  cout << flush;
285  waiting = true;
286  gettimeofday(&waitStart, nullptr);
287  } else if (!firstWait) {
288  timeval t;
289  gettimeofday(&t, nullptr);
290  float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
291  if ((timeOut_ >= 0) && (dt > timeOut_)) {
292  cout << "[WatcherSource " << now() << "]"
293  << " Having waited for new file for " << (int)dt << " sec. "
294  << "Timeout exceeded. Exits.\n";
295  //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
296  end_ = true;
297  break;
298  }
299  }
300  }
301  sleep(1);
302  } //end of file queue update
303  firstWait = false;
304  free(lineptr);
305  lineptr = nullptr;
306 
307  while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
308  fileName_ = filesInQueue_.front();
309  filesInQueue_.pop_front();
310  int fd = open(fileName_.c_str(), 0);
311  if (fd != 0) {
312  struct stat buf;
313  off_t size = -1;
314  //check that file transfer is finished, by monitoring its size:
315  time_t t = time(nullptr);
316  for (;;) {
317  fstat(fd, &buf);
318  if (verbosity_)
319  cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
320  if (buf.st_size == size)
321  break;
322  else
323  size = buf.st_size;
324  if (difftime(t, buf.st_mtime) > 60)
325  break; //file older then 1 min=> tansfer must be finished
326  sleep(1);
327  }
328 
329  if (fd != 0 && buf.st_size == 0) { //file is empty. streamer reader
330  // does not like empty file=> skip it
331  stringstream c;
332  c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
333  if (verbosity_)
334  cout << "[WatcherSource " << now() << "]"
335  << " Excuting " << c.str() << "\n";
336  int i = system(c.str().c_str());
337  if (i != 0) {
338  //throw cms::Exception("WatcherSource")
339  cout << "[WatcherSource " << now() << "] "
340  << "Failed to move empty file '" << fileName_ << "'"
341  << " to corrupted directory '" << corruptedDir_ << "'\n";
342  }
343  continue;
344  }
345 
346  close(fd);
347 
348  vector<char> buf1(fileName_.size() + 1);
349  copy(fileName_.begin(), fileName_.end(), buf1.begin());
350  buf1[buf1.size() - 1] = 0;
351 
352  vector<char> buf2(fileName_.size() + 1);
353  copy(fileName_.begin(), fileName_.end(), buf2.begin());
354  buf2[buf1.size() - 1] = 0;
355 
356  string dirnam(dirname(&buf1[0]));
357  string filenam(basename(&buf2[0]));
358 
359  string dest = inprocessDir_ + "/" + filenam;
360 
361  if (verbosity_)
362  cout << "[WatcherSource " << now() << "]"
363  << " Moving file " << fileName_ << " to " << dest << "\n";
364 
365  stringstream c;
366  c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest << "/.\"";
367 
368  if (0 != rename(fileName_.c_str(), dest.c_str())) {
369  //if(0!=system(c.str().c_str())){
370  throw cms::Exception("WatcherSource")
371  << "Failed to move file '" << fileName_ << "' "
372  << "to processing directory " << inprocessDir_ << ": " << strerror(errno);
373  }
374 
375  fileName_ = dest;
376 
377  cout << "[WatcherSource " << now() << "]"
378  << " Opening file " << fileName_ << "\n"
379  << flush;
380  streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
381 
382  ofstream f(".watcherfile");
383  f << fileName_;
384  } else {
385  cout << "[WatcherSource " << now() << "]"
386  << " Failed to open file " << fileName_ << endl;
387  }
388  } //loop on file queue to find one file which opening succeeded
389  }
390  return streamerInputFile_.get();
391 }

References visDQMUpload::buf, c, filterCSVwithJSON::copy, corruptedDir_, gather_cfg::cout, curDir_, mps_fire::dest, BTVHLTOfflineSource_cfi::dirname, dt, end_, Exception, f, ztee::fd, fileListCmd_, MillePedeFileConverter_cfg::fileName, fileName_, filesInQueue_, mps_fire::i, inprocessDir_, inputDir_, createfilelist::int, dqmiodumpmetadata::n, now(), alignCSCRings::s, findQualityFiles::size, edm_modernize_messagelogger::stat, streamerInputFile_, submitPVValidationJobs::t, protons_cff::time, timeOut_, tokenFile_, and verbosity_.

Referenced by getHeader(), getNextEvent(), and newHeader().

◆ getNextEvent()

const EventMsgView * WatcherStreamFileReader::getNextEvent ( )

Definition at line 210 of file WatcherStreamFileReader.cc.

210  {
211  if (end_) {
212  closeFile();
213  return nullptr;
214  }
215 
217 
218  //go to next input file, till no new event is found
219  while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
220  closeFile();
221  }
222 
223  return inputFile == nullptr ? nullptr : inputFile->currentRecord();
224 }

References closeFile(), end_, getInputFile(), dtResolutionTest_cfi::inputFile, and edm::StreamerInputFile::kEvent.

◆ newHeader()

const bool WatcherStreamFileReader::newHeader ( )

Definition at line 188 of file WatcherStreamFileReader.cc.

188  {
190  return inputFile ? inputFile->newHeader() : false;
191 }

References getInputFile(), and dtResolutionTest_cfi::inputFile.

Member Data Documentation

◆ corruptedDir_

std::string WatcherStreamFileReader::corruptedDir_
private

Directory where a file is moved if it is unreadable (e.g. empty)

Definition at line 58 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ curDir_

std::string WatcherStreamFileReader::curDir_
private

Definition at line 78 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ end_

bool WatcherStreamFileReader::end_
private

Definition at line 72 of file WatcherStreamFileReader.h.

Referenced by closeFile(), getInputFile(), and getNextEvent().

◆ fileListCmd_

std::string WatcherStreamFileReader::fileListCmd_
private

Definition at line 76 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ fileName_

std::string WatcherStreamFileReader::fileName_
private

Definition at line 64 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

◆ filePatterns_

std::vector<std::string> WatcherStreamFileReader::filePatterns_
private

Streamer file name pattern list

Definition at line 46 of file WatcherStreamFileReader.h.

Referenced by WatcherStreamFileReader().

◆ filesInQueue_

std::deque<std::string> WatcherStreamFileReader::filesInQueue_
private

Definition at line 70 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ inprocessDir_

std::string WatcherStreamFileReader::inprocessDir_
private

Directory where files are moved during processing

Definition at line 50 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ inputDir_

std::string WatcherStreamFileReader::inputDir_
private

Directory to look for streamer files

Definition at line 42 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ processedDir_

std::string WatcherStreamFileReader::processedDir_
private

Directory where files are moved once processed

Definition at line 54 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and WatcherStreamFileReader().

◆ streamerInputFile_

std::unique_ptr<edm::StreamerInputFile> WatcherStreamFileReader::streamerInputFile_
private

Cached input file stream

Definition at line 62 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

◆ timeOut_

int WatcherStreamFileReader::timeOut_
private

Definition at line 68 of file WatcherStreamFileReader.h.

Referenced by getInputFile().

◆ tokenFile_

std::string WatcherStreamFileReader::tokenFile_
private

Definition at line 66 of file WatcherStreamFileReader.h.

Referenced by getInputFile(), and WatcherStreamFileReader().

◆ verbosity_

int WatcherStreamFileReader::verbosity_
private

Definition at line 74 of file WatcherStreamFileReader.h.

Referenced by closeFile(), and getInputFile().

mps_setup.cmd
list cmd
Definition: mps_setup.py:244
WatcherStreamFileReader::fileName_
std::string fileName_
Definition: WatcherStreamFileReader.h:64
mps_fire.i
i
Definition: mps_fire.py:428
WatcherStreamFileReader::filePatterns_
std::vector< std::string > filePatterns_
Definition: WatcherStreamFileReader.h:46
WatcherStreamFileReader::closeFile
void closeFile()
Definition: WatcherStreamFileReader.cc:393
dqmiodumpmetadata.n
n
Definition: dqmiodumpmetadata.py:28
WatcherStreamFileReader::filesInQueue_
std::deque< std::string > filesInQueue_
Definition: WatcherStreamFileReader.h:70
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
f
double f[11][100]
Definition: MuScleFitUtils.cc:78
heppy_check.dirs
dirs
Definition: heppy_check.py:26
gather_cfg.cout
cout
Definition: gather_cfg.py:144
WatcherStreamFileReader::corruptedDir_
std::string corruptedDir_
Definition: WatcherStreamFileReader.h:58
WatcherStreamFileReader::inprocessDir_
std::string inprocessDir_
Definition: WatcherStreamFileReader.h:50
protons_cff.time
time
Definition: protons_cff.py:39
WatcherStreamFileReader::curDir_
std::string curDir_
Definition: WatcherStreamFileReader.h:78
edm::StreamerInputFile::Next::kEvent
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
rename
Definition: rename.py:1
Header::INIT
Definition: MsgHeader.h:15
WatcherStreamFileReader::timeOut_
int timeOut_
Definition: WatcherStreamFileReader.h:68
WatcherStreamFileReader::verbosity_
int verbosity_
Definition: WatcherStreamFileReader.h:74
edm::StreamerInputFile
Definition: StreamerInputFile.h:19
BTVHLTOfflineSource_cfi.dirname
dirname
Definition: BTVHLTOfflineSource_cfi.py:7
dt
float dt
Definition: AMPTWrapper.h:136
ztee.fd
fd
Definition: ztee.py:136
alignCSCRings.s
s
Definition: alignCSCRings.py:92
WatcherStreamFileReader::fileListCmd_
std::string fileListCmd_
Definition: WatcherStreamFileReader.h:76
WatcherStreamFileReader::processedDir_
std::string processedDir_
Definition: WatcherStreamFileReader.h:54
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
WatcherStreamFileReader::getInputFile
edm::StreamerInputFile * getInputFile()
Definition: WatcherStreamFileReader.cc:226
now
static std::string now()
Definition: WatcherStreamFileReader.cc:100
createfilelist.int
int
Definition: createfilelist.py:10
dtResolutionTest_cfi.inputFile
inputFile
Definition: dtResolutionTest_cfi.py:14
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
WatcherStreamFileReader::streamerInputFile_
std::unique_ptr< edm::StreamerInputFile > streamerInputFile_
Definition: WatcherStreamFileReader.h:62
Exception
Definition: hltDiff.cc:245
WatcherStreamFileReader::end_
bool end_
Definition: WatcherStreamFileReader.h:72
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cms::Exception
Definition: Exception.h:70
c
auto & c
Definition: CAHitNtupletGeneratorKernelsImpl.h:46
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
WatcherStreamFileReader::tokenFile_
std::string tokenFile_
Definition: WatcherStreamFileReader.h:66
submitPVValidationJobs.t
string t
Definition: submitPVValidationJobs.py:644
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
mps_fire.dest
dest
Definition: mps_fire.py:179
DeadROC_duringRun.dir
dir
Definition: DeadROC_duringRun.py:23
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
InitMsgView
Definition: InitMessage.h:61
WatcherStreamFileReader::inputDir_
std::string inputDir_
Definition: WatcherStreamFileReader.h:42