CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <sstream>
5 #include <sys/types.h>
6 #include <sys/file.h>
7 #include <unistd.h>
8 #include <vector>
9 #include <fstream>
10 #include <zlib.h>
11 
12 #include <boost/algorithm/string.hpp>
13 #include <boost/filesystem/fstream.hpp>
14 
18 
21 
27 
31 
33 
34 #include "FedRawDataInputSource.h"
35 #include "FastMonitoringService.h"
36 #include "EvFDaqDirector.h"
37 
39  edm::InputSourceDescription const& desc) :
40  edm::RawInputSource(pset, desc),
    // Untracked configuration parameters; the second argument of each call is the default value.
41  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)),
42  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
43  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
    // The next four members are queried from the evf::EvFDaqDirector service.
44  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
45  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
46  buInputDir_(edm::Service<evf::EvFDaqDirector>()->buBaseDir()),
47  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->fuBaseDir()),
48  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
    // No input file is open yet and no lumi section / JSON file has been seen.
49  fileStream_(0),
50  eventID_(),
51  processHistoryID_(),
52  currentLumiSection_(0),
53  currentInputJson_(""),
54  currentInputEventCount_(0),
55  eorFileSeen_(false),
    // Buffer of 1024 * 1024 * eventChunkSize_ bytes; the cursor starts at its head with zero bytes buffered.
    // NOTE(review): allocated with raw new[]. The only cleanup code visible in this listing closes
    // fileStream_ and does not delete[] this buffer - confirm it is released elsewhere.
56  dataBuffer_(new unsigned char[1024 * 1024 * eventChunkSize_]),
57  bufferCursor_(dataBuffer_),
58  bufferLeft_(0)
59 {
    // Log the test-mode flag, the chunk size and the local host name.
    // NOTE(review): the return value of gethostname() is ignored and thishost is never explicitly
    // terminated; POSIX leaves termination unspecified when the name is truncated - confirm this is acceptable.
60  char thishost[256];
61  gethostname(thishost, 255);
62  edm::LogInfo("FedRawDataInputSource") << "test mode: "
63  << testModeNoBuilderUnit_ << ", read-ahead chunk size: " << eventChunkSize_
64  << " on host " << thishost;
65 
    // Start a new run and stamp its auxiliary with processHistoryID_.
    // NOTE(review): original lines 66, 68 and 69 are missing from this listing; whatever they do
    // (presumably setting up the run auxiliary and processHistoryID_) cannot be seen here.
67  setNewRun();
70  runAuxiliary()->setProcessHistoryID(processHistoryID_);
71 }
72 
74 {
    // Close the input stream if one is open, then clear the handle.
    // NOTE(review): the signature (original line 73) is missing from this listing; presumably this is
    // the destructor. If so, dataBuffer_ (allocated with new[] in the constructor) is not released
    // here - confirm whether that is a leak.
75  if (fileStream_)
76  fclose(fileStream_);
77  fileStream_ = 0;
78 }
79 
81 {
    // Ask cacheNextEvent() for the next event and map its result onto a boolean:
    //   negative -> false (run has ended)
    //   zero     -> true  (no event available, but a new lumi section was detected)
    //   positive -> true  (an event has been cached; setEventCached() is called)
    // NOTE(review): the signature and original lines 80, 85, 86, 97, 100 and 102 are missing from this listing.
82  int eventAvailable = cacheNextEvent();
83  if (eventAvailable < 0) {
84  // run has ended
87  return false;
88  }
89  else if(eventAvailable == 0) {
90  edm::LogInfo("FedRawDataInputSource") << "No Event files at this time, but a new lumisection was detected : " << currentLumiSection_;
91 
92  return true;
93  }
94  else {
95  if (!getLSFromFilename_) {
96  //get new lumi from file header
98  }
99 
    // The statement spanning original lines 100-103 is only partly visible; it consumes the
    // run and event numbers of the cached event_ (presumably to build eventID_ - confirm).
101  event_->run(),
103  event_->event());
104 
105  setEventCached();
106 
107  return true;
108  }
109 }
110 
// Switch to lumi section `lumiSection` when the (partly hidden) condition below holds:
// optionally create an end-of-lumi marker file for the previous section, record the new section
// number, and install a fresh LuminosityBlockAuxiliary stamped with the current wall-clock time.
// NOTE(review): original lines 113, 118, 129, 136 and 139 are missing from this listing.
111 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
112 {
114  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
115 
    // Only when a previous lumi section exists: create the file named by fuEoLS if it is not there yet.
116  if ( currentLumiSection_ > 0 ) {
117  const string fuEoLS =
119  struct stat buf;
120  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
121  if ( !found ) {
    // The file is created empty with read/write permission bits for user, group and others.
    // NOTE(review): the descriptor returned by open() is not checked before close(); on failure
    // close(-1) is called and the marker is silently not created - confirm this is acceptable.
122  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
123  close(eol_fd);
124  }
125  }
126 
127  currentLumiSection_ = lumiSection;
128 
130 
    // Timestamp value = seconds * 1000000 + microseconds as returned by gettimeofday().
131  timeval tv;
132  gettimeofday(&tv, 0);
133  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
134 
135  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
137  runAuxiliary()->run(),
138  lumiSection, lsopentime,
140 
    // NOTE(review): lumiBlockAuxiliary is handed over as a raw pointer; presumably
    // setLuminosityBlockAuxiliary() takes ownership - confirm.
141  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
142  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
143 
144  edm::LogInfo("FedRawDataInputSource") << "New lumi section " << lumiSection << " opened";
145  }
146 }
147 
149 {
150  //return values of cacheNextEvent: -1 == run ended, 0 == lumi section (LS) ended, 1 == event cached successfully
    // Refill the buffer when fewer than (4 + 1024) * sizeof(uint32) bytes remain.
151  if ( bufferLeft_ < (4 + 1024) * sizeof(uint32) ) //minimal size to fit any version of FRDEventHeader
152  {
    // NOTE(review): original line 153 is missing from this listing; it presumably sets `check`
    // from readNextChunkIntoBuffer(), whose -1 / -100 results are translated below - confirm.
154  if ( check ==-1) return 0;
155  if ( check ==-100) return -1;
156  }
157 
    // Interpret the bytes at the cursor as an event message and read its total size.
158  event_.reset( new FRDEventMsgView(bufferCursor_) );
159 
160  const uint32_t msgSize = event_->size();
161 
    // The message does not fit in what is buffered: refill once and rebuild the view, because
    // the refill resets bufferCursor_. Failing to obtain enough bytes is treated as a truncated file.
162  if ( bufferLeft_ < msgSize )
163  {
164  if ( readNextChunkIntoBuffer()<0 || bufferLeft_ < msgSize )
165  {
166  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
167  "Premature end of input file while reading event data";
168  }
169  event_.reset( new FRDEventMsgView(bufferCursor_) );
170  }
171 
    // Optional integrity check: only performed when enabled and the event version is at least 3.
    // The checksum is computed over the payload (eventSize() bytes) and compared to the stored one.
172  if ( verifyAdler32_ && event_->version() >= 3 )
173  {
174  uint32_t adler = adler32(0L,Z_NULL,0);
175  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
176 
177  if ( adler != event_->adler32() ) {
178  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
179  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
180  " but calculated 0x" << adler;
181  }
182  }
183 
    // Consume the message: event_ keeps referring to the bytes just skipped over.
184  bufferLeft_ -= msgSize;
185  bufferCursor_ += msgSize;
186 
187  return 1;
188 }
189 
191 {
192  //this function is called when we reach the end of the buffer (i.e. bytes to read are more than bytes left in buffer)
193  // NOTA BENE: A positive or 0 value is returned if data are buffered
194  // a value of -1 indicates no data can be buffered but there is a new
195  // lumi section to account for
    // In addition, -100 is returned when openNextFile() reports 0 (see below).
    // NOTE(review): the signature and original line 198 are missing from this listing.
196  int fileStatus = 100; //file is healthy for now
197  if (eofReached()){
199  fileStatus = openNextFile(); // this can now return even if there is
200  //no file only temporarily
201  if(fileStatus==0) return -100; //should only happen when requesting the next event header and the run is over
202  }
203  if(fileStatus==100){ //either file was not over or a new one was opened
204  if (bufferLeft_ == 0) { //in the rare case the last byte barely fit
    // NOTE(review): 1024 * 1024 * eventChunkSize_ is evaluated in 32-bit unsigned arithmetic
    // (eventChunkSize_ is read as unsigned int); presumably chunk sizes stay far below 4096 - confirm.
205  uint32_t chunksize = 1024 * 1024 * eventChunkSize_;
206  bufferLeft_ = fread((void*) dataBuffer_, sizeof(unsigned char),
207  chunksize, fileStream_); //reads a maximum of chunksize bytes from file into buffer
208  } else { //refill the buffer after having moved the bufferLeft_ bytes at the head
209  uint32_t chunksize = 1024 * 1024 * eventChunkSize_ - bufferLeft_;
    // NOTE(review): source and destination both lie inside dataBuffer_; if bufferLeft_ exceeds the
    // distance between bufferCursor_ and dataBuffer_ the regions overlap, which memcpy does not
    // support (memmove does) - confirm this cannot happen.
210  memcpy((void*) dataBuffer_, bufferCursor_, bufferLeft_); //this copy could be avoided
211  bufferLeft_ += fread((void*) (dataBuffer_ + bufferLeft_),
212  sizeof(unsigned char), chunksize, fileStream_);
213  }
214  bufferCursor_ = dataBuffer_; // reset the cursor at the beginning of the buffer
215  return bufferLeft_;
216  }
217  else{
218  // no file to read but a new lumi section has been cached
219  return -1;
220  }
221 }
222 
224 {
    // Report whether there is nothing left to read: true when no file is open, or when the next
    // character of the open stream is EOF. The character is read with fgetc() and pushed back with
    // ungetc() so the stream position is not advanced.
    // NOTE(review): the signature (original line 223) is missing from this listing.
225  if (fileStream_ == 0)
226  return true;
227 
228  int c;
229  c = fgetc(fileStream_);
230  ungetc(c, fileStream_);
231 
232  return (c == EOF);
233 }
234 
236 {
    // If an input file is open: log its name, close the stream and clear the handle. Outside test
    // mode the file named by openFile_ is then removed from disk.
    // NOTE(review): the signature (original line 235) and the test-mode branch (original line 247)
    // are missing from this listing, so the function name and the test-mode action cannot be seen here.
237  if (fileStream_) {
238 
239  edm::LogInfo("FedRawDataInputSource") << "Closing input file " << openFile_.string();
240 
241  fclose(fileStream_);
242  fileStream_ = 0;
243 
244  if (!testModeNoBuilderUnit_) {
245  boost::filesystem::remove(openFile_); // won't work in case of forked children
246  } else {
248  }
249  }
250 }
251 
253 {
    // Poll searchForNextFile() until it returns a non-negative value, which is then returned.
    // While it keeps returning a negative value: return 0 as soon as eorFileSeen_ is set,
    // otherwise log, sleep for 100000 microseconds and try again.
    // NOTE(review): the signature (original line 252) is missing from this listing.
254  int nextfile = -1;
255  while((nextfile = searchForNextFile())<0){
256  if(eorFileSeen_)
257  return 0;
258  else{
259  edm::LogInfo("FedRawDataInputSource") << "No file for me... sleep and try again..." << std::endl;
260  usleep(100000);
261  }
262  }
263  return nextfile;
264 }
265 
267 {
    // Build a FEDRawDataCollection from the cached event, create the event auxiliary with the
    // timestamp returned by fillFEDRawDataCollection(), and hand it to makeEvent().
    // NOTE(review): the signature and original lines 268, 273-274, 276-278 and 280-281 are missing
    // from this listing; what happens to rawData after makeEvent() cannot be seen here.
269  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
270  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
271 
272  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
275  makeEvent(eventPrincipal, aux);
276 
279 
282 
283  return;
284 }
285 
// Split the payload of the cached event_ into per-FED fragments and copy each one into
// rawData->FEDData(fedId). The payload is walked backwards: each iteration reads the trailer at the
// current end, derives the fragment size from it, steps back to the fragment's header and copies
// the fragment. Returns a default-constructed Timestamp unless a fragment with id
// FEDNumbering::MINTriggerGTPFEDID is found, in which case the timestamp is built from its GPS words.
286 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(std::auto_ptr<FEDRawDataCollection>& rawData) const
287 {
288  edm::Timestamp tstamp;
289  uint32_t eventSize = event_->eventSize();
290  char* event = (char*)event_->payload();
291 
292  while (eventSize > 0) {
    // Step back over the trailer and read the fragment size from it.
293  eventSize -= sizeof(fedt_t);
294  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
295  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
    // After this subtraction eventSize is used as the offset of the fragment's header.
    // NOTE(review): in total sizeof(fedt_t) + fedSize - sizeof(fedh_t) is subtracted, which equals
    // fedSize only if header and trailer have the same size - TODO confirm.
    // NOTE(review): eventSize is unsigned, so a fedSize inconsistent with the remaining payload
    // wraps around instead of going negative - confirm the input is validated upstream.
296  eventSize -= (fedSize - sizeof(fedh_t));
297  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
298  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
299  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
    // NOTE(review): original line 300 is missing from this listing.
    // The timestamp value is (gpsh << 32) + gpsl.
301  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
302  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
303  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
304  }
305  FEDRawData& fedData = rawData->FEDData(fedId);
306  fedData.resize(fedSize);
307  memcpy(fedData.data(), event + eventSize, fedSize);
308  }
    // NOTE(review): the loop above only exits when eventSize == 0, so this assertion cannot fail.
309  assert(eventSize == 0);
310 
311  return tstamp;
312 }
313 
315 {
    // Ask the evf::EvFDaqDirector service for the next input file.
    // Return value as visible here: -1 when nothing was granted, 100 when a file was granted and
    // opened, 1 when the lumi-section loop at the end runs (it overrides the other values).
    // NOTE(review): the signature and original lines 314, 317, 326, 336 and 346 are missing from this listing.
316  int retval = -1;
    // The guarding condition (original line 317) is not visible; the throw reports that more events
    // were still expected from the current JSON file.
318  throw cms::Exception("RuntimeError") << "Went to search for next file but according to BU more events in "
319  << currentInputJson_.string();
320  }
321 
322  std::string nextFile;
    // NOTE(review): ls is not initialised here; it is read by the loop below even when
    // updateFuLock() returns false - confirm updateFuLock() always assigns it.
323  uint32_t ls;
324 
325  edm::LogInfo("FedRawDataInputSource") << "Asking for next file... to the DaqDirector";
327  fms->startedLookingForFile();
328  bool fileIsOKToGrab = edm::Service<evf::EvFDaqDirector>()->updateFuLock(ls,nextFile,eorFileSeen_);
329 
330  if (fileIsOKToGrab) {
331 
332  edm::LogInfo("FedRawDataInputSource") << "The director says to grab: " << nextFile;
333 
334  fms->stoppedLookingForFile();
335 
    // The JSON file is addressed through a path with its extension replaced by ".jsn".
337  jsonFile.replace_extension(".jsn");
    // NOTE(review): grabNextJsonFile() is called inside assert(); when compiled with NDEBUG the
    // whole expression is removed and the JSON file is never grabbed - confirm the build flags
    // or move the call out of the assertion.
338  assert( grabNextJsonFile(jsonFile) );
339  openDataFile(nextFile);
340  retval = 100;
341  } else {
342  edm::LogInfo("FedRawDataInputSource") << "The DAQ Director has nothing for me! ";
343  }
344 
    // NOTE(review): the loop body's first statement (original line 346) is missing; presumably it
    // advances currentLumiSection_ (e.g. via maybeOpenNewLumiSection), otherwise the loop would not end.
345  while( getLSFromFilename_ && ls > currentLumiSection_ ) {
347  retval = 1;
348  }
349 
350  return retval;
351 }
352 
354 {
    // Take over the JSON file at jsonSourcePath: copy it to a destination whose file name is the
    // source stem plus "_pid<5-digit pid>.jsn", parse it, and store the event count found in it.
    // Returns true on success; false after a logged std::runtime_error / std::exception. A
    // boost::filesystem error terminates the process with _exit(0).
    // NOTE(review): the signature and original lines 353, 357 and 368 are missing from this listing.
355  try {
356  // assemble json destination path
358 
359  std::ostringstream fileNameWithPID;
360  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
361  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
362  const boost::filesystem::path filePathWithPID(fileNameWithPID.str());
363  jsonDestPath /= fileNameWithPID.str();
364 
365  edm::LogInfo("FedRawDataInputSource") << " JSON rename " << jsonSourcePath << " to "
366  << jsonDestPath;
367 
    // The condition (original line 368) is not visible. The first branch only copies; the other
    // branch copies and then removes the source (copy + remove instead of the commented-out rename).
369  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
370  else {
371  //boost::filesystem::rename(jsonSourcePath,jsonDestPath);
372  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
373  boost::filesystem::remove(jsonSourcePath);
374  }
375 
376  currentInputJson_ = jsonDestPath; // store location for later deletion.
377  boost::filesystem::ifstream ij(jsonDestPath);
378  Json::Value deserializeRoot;
379  DataPoint dp;
380  if(!reader_.parse(ij,deserializeRoot)){
381  throw std::runtime_error("Cannot deserialize input JSON file");
382  }
383  else {
    // The first data entry is taken as the number of events in the input file.
    // NOTE(review): getData()[0] is accessed without checking that the vector is non-empty, and
    // atoi() yields 0 on unparsable text without reporting an error - confirm the JSON schema.
384  dp.deserialize(deserializeRoot);
385  std::string data = dp.getData()[0];
386  currentInputEventCount_=atoi(data.c_str()); //all this is horrible...
387  }
388 
389  return true;
390  }
391 
392  catch (const boost::filesystem::filesystem_error& ex)
393  {
394  // Input dir gone?
395  edm::LogError("FedRawDataInputSource") << " - grabNextFile BOOST FILESYSTEM ERROR CAUGHT: " << ex.what()
396  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
397  _exit(0);
398  }
    // NOTE(review): the two handlers below catch by value; catching std::exception by value slices
    // derived exceptions, so e.what() may lose the derived message - consider catching by const reference.
399  catch (std::runtime_error e)
400  {
401  // Another process grabbed the file and NFS did not register this
402  edm::LogError("FedRawDataInputSource") << " - grabNextFile runtime Exception: " << e.what() << std::endl;
403  }
404  catch (std::exception e)
405  {
406  // BU run directory disappeared?
407  edm::LogError("FedRawDataInputSource") << " - grabNextFileSOME OTHER EXCEPTION OCCURED!!!! ->" << e.what()
408  << std::endl;
409  }
410  return false;
411 }
412 
414 {
    // Open nextFile read-only, wrap the descriptor in a FILE stream stored in fileStream_, and
    // remember the name in openFile_. Throws cms::Exception when open() fails.
    // NOTE(review): the signature (original line 413) is missing from this listing.
    // NOTE(review): the result of fdopen() is not checked; if it returned null the descriptor
    // would stay open and fileStream_ would be null - confirm this is acceptable.
415  const int fileDescriptor = open(nextFile.c_str(), O_RDONLY);
416  if (fileDescriptor != -1) {
417  fileStream_ = fdopen(fileDescriptor, "rb");
418  openFile_ = nextFile;
419  edm::LogInfo("FedRawDataInputSource") << " opened file " << nextFile;
420  }
421  else
422  {
423  throw cms::Exception("FedRawDataInputSource::openDataFile") <<
424  " failed to open file " << nextFile << " fd:" << fileDescriptor;
425  }
426 }
427 
429 {
    // Instead of deleting the current input, rename it to the path returned by the DAQ director's
    // getJumpFilePath(), then rename the companion file obtained by replacing both extensions with ".jsn".
    // NOTE(review): the signature (original line 428) and the definition of `source` (original
    // line 430) are missing from this listing; presumably source is built from openFile_ - confirm.
431  boost::filesystem::path destination( edm::Service<evf::EvFDaqDirector>()->getJumpFilePath() );
432 
433  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME: " << openFile_
434  << " to: " << destination.string();
435  boost::filesystem::rename(source,destination);
    // replace_extension() is called on both path objects here, changing their extension to ".jsn".
436  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
437 }
438 
440 {} // empty body: nothing is done. NOTE(review): the signature (original line 439) is missing from this listing - presumably preForkReleaseResources(); confirm.
441 
// Called with a message-receiver handle that the visible code does not use; rewinds the input
// source via InputSource::rewind().
// NOTE(review): original lines 445-447 are missing from this listing, so any further work done
// after the rewind cannot be seen here.
442 void FedRawDataInputSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource>)
443 {
444  InputSource::rewind();
448 }
449 
451 {} // empty body: nothing is done. NOTE(review): the signature (original line 450) is missing from this listing - presumably rewind_(); confirm.
452 
453 // define this class as an input source
455 
static const char runNumber_[]
unsigned int getgpshigh(const unsigned char *)
virtual void read(edm::EventPrincipal &eventPrincipal) override
ProductProvenance dummyProvenance_
struct fedh_struct fedh_t
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
void maybeOpenNewLumiSection(const uint32_t lumiSection)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:258
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:210
tuple jsonFile
Definition: dataset.py:390
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void evm_board_setformat(size_t size)
#define DEFINE_FWK_INPUT_SOURCE(type)
boost::filesystem::path currentInputJson_
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:350
bool check(const DataFrame &df, bool capcheck, bool dvercheck)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
bool grabNextJsonFile(boost::filesystem::path const &)
void openDataFile(std::string const &)
edm::ProcessHistoryID processHistoryID_
virtual void preForkReleaseResources() override
const edm::DaqProvenanceHelper daqProvenanceHelper_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:36
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:377
const std::string fuOutputDir_
std::vector< std::string > getData() const
Definition: DataPoint.h:40
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
const unsigned int eventChunkSize_
unsigned long long uint64_t
Definition: Time.h:15
auto dp
Definition: deltaR.h:24
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:343
std::unique_ptr< FRDEventMsgView > event_
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &) const
void setProcessHistoryID(ProcessHistoryID const &phid)
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:358
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:255
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:173
unsigned int eventsize
Definition: fed_trailer.h:33
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:346
boost::filesystem::path openFile_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int currentInputEventCount_
BranchDescription const constBranchDescription_
volatile std::atomic< bool > shutdown_flag false
static std::string const source
Definition: EdmProvDump.cc:43
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)