CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <sstream>
5 #include <sys/types.h>
6 #include <sys/file.h>
7 #include <unistd.h>
8 #include <vector>
9 #include <fstream>
10 #include <zlib.h>
11 
12 #include <boost/algorithm/string.hpp>
13 #include <boost/filesystem/fstream.hpp>
14 
18 
21 
27 
31 
33 
34 #include "FedRawDataInputSource.h"
35 #include "FastMonitoringService.h"
36 #include "EvFDaqDirector.h"
37 
// Constructor (the first line of its signature is missing from this listing).
// Reads the untracked parameters eventChunkSize (default 16), getLSFromFilename
// (default true) and verifyAdler32 (default true) from pset, and takes the test-mode
// flag, run number and BU/FU base directories from the EvFDaqDirector service.
39  edm::InputSourceDescription const& desc) :
40  edm::RawInputSource(pset, desc),
41  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)),
42  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
43  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
44  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
45  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
46  buInputDir_(edm::Service<evf::EvFDaqDirector>()->buBaseDir()),
47  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->fuBaseDir()),
48  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
49  fileStream_(0),
50  eventID_(),
51  currentLumiSection_(0),
52  currentInputJson_(""),
53  currentInputEventCount_(0),
54  eorFileSeen_(false),
    // Read-ahead buffer of eventChunkSize_ * 1 MiB bytes.
    // NOTE(review): the product is computed in unsigned int, so it wraps for
    // eventChunkSize_ >= 4096. No matching delete[] is visible in this listing —
    // confirm who owns and frees this buffer.
55  dataBuffer_(new unsigned char[1024 * 1024 * eventChunkSize_]),
    // The cursor starts at the head of the still-empty buffer.
56  bufferCursor_(dataBuffer_),
57  bufferLeft_(0)
58 {
    // The host name is only used in the log message below.
    // NOTE(review): the gethostname() result is not checked and thishost is not
    // zero-initialised, so a failed or truncated call may leave it unterminated — confirm.
59  char thishost[256];
60  gethostname(thishost, 255);
61  edm::LogInfo("FedRawDataInputSource") << "test mode: "
62  << testModeNoBuilderUnit_ << ", read-ahead chunk size: " << eventChunkSize_
63  << " on host " << thishost;
64 
66  setNewRun();
69 }
70 
// Cleanup body; its signature line is missing from this listing (presumably the
// destructor — confirm). Closes the input stream if one is open and clears the pointer.
// NOTE(review): dataBuffer_, allocated with new[] in the constructor, is not released
// here — confirm it is freed elsewhere.
72 {
73  if (fileStream_)
74  fclose(fileStream_);
75  fileStream_ = 0;
76 }
77 
// Body whose signature line is missing from this listing (presumably checkNextEvent()).
// Dispatches on the result of cacheNextEvent():
//   < 0  : the run has ended, returns false;
//   == 0 : no event is available but a new lumisection was detected, returns true;
//   > 0  : an event is cached; marks it with setEventCached() and returns true.
// Several statements of the original are missing from this listing (gaps in the numbering).
79 {
80  int eventAvailable = cacheNextEvent();
81  if (eventAvailable < 0) {
82  // run has ended
85  return false;
86  }
87  else if(eventAvailable == 0) {
88  edm::LogInfo("FedRawDataInputSource") << "No Event files at this time, but a new lumisection was detected : " << currentLumiSection_;
89 
90  return true;
91  }
92  else {
93  if (!getLSFromFilename_) {
94  // take the new lumisection from the event header instead of the file name
    // (the statement doing so is missing from this listing)
96  }
97 
    // Tail of a call that uses the run and event numbers of the cached event;
    // its first line is missing from this listing.
99  event_->run(),
101  event_->event());
102 
103  setEventCached();
104 
105  return true;
106  }
107 }
108 
// Switches to lumiSection when the current luminosity-block auxiliary does not already
// describe it. The first half of the condition is missing from this listing.
// On a switch it creates the end-of-lumisection file for the previous lumisection if
// that file does not exist yet, updates currentLumiSection_, and installs a new
// luminosity-block auxiliary stamped with the current wall-clock time.
109 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
110 {
112  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
113 
    // Only a lumisection that was actually open (> 0) gets an end-of-LS file.
114  if ( currentLumiSection_ > 0 ) {
    // The initialiser of fuEoLS is missing from this listing.
115  const string fuEoLS =
117  struct stat buf;
118  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
119  if ( !found ) {
    // Creates the file, requesting read/write permission for user, group and others.
    // NOTE(review): the open() result is not checked, so close() may receive -1.
120  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
121  close(eol_fd);
122  }
123  }
124 
125  currentLumiSection_ = lumiSection;
126 
128 
    // Opening time expressed as microseconds (tv_sec * 1e6 + tv_usec).
    // NOTE(review): fillFEDRawDataCollection() builds its Timestamp as (high << 32) + low —
    // confirm which layout edm::Timestamp expects.
129  timeval tv;
130  gettimeofday(&tv, 0);
131  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
132 
    // Arguments of the new auxiliary's construction; its first and last lines are
    // missing from this listing.
135  runAuxiliary()->run(),
136  lumiSection, lsopentime,
138 
139  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
140 
141  edm::LogInfo("FedRawDataInputSource") << "New lumi section " << lumiSection << " opened";
142  }
143 }
144 
// Body of cacheNextEvent() (signature line missing from this listing).
// Points event_ at the next event message in the read-ahead buffer, refilling the
// buffer when needed, optionally verifies the Adler32 checksum, and advances the cursor.
// Throws cms::Exception if the file ends in the middle of an event or if the checksum
// does not match.
146 {
147  // return values of cacheNextEvent: -1 == run ended, 0 == lumisection ended, 1 == event cached
148  if ( bufferLeft_ < (4 + 1024) * sizeof(uint32) ) //minimal size to fit any version of FRDEventHeader
149  {
    // "check" is declared on a line that is missing from this listing; per the calls
    // visible elsewhere in this file it presumably holds readNextChunkIntoBuffer()'s result.
151  if ( check ==-1) return 0;
152  if ( check ==-100) return -1;
153  }
154 
155  event_.reset( new FRDEventMsgView(bufferCursor_) );
156 
157  const uint32_t msgSize = event_->size();
158 
159  if ( bufferLeft_ < msgSize )
160  {
161  if ( readNextChunkIntoBuffer()<0 || bufferLeft_ < msgSize )
162  {
163  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
164  "Premature end of input file while reading event data";
165  }
    // The refill moved the data to the head of the buffer, so the view must be rebuilt.
166  event_.reset( new FRDEventMsgView(bufferCursor_) );
167  }
168 
    // Checksum verification is applied only to events with version >= 3.
169  if ( verifyAdler32_ && event_->version() >= 3 )
170  {
    // The first call yields the initial Adler32 value; the second folds in the payload.
171  uint32_t adler = adler32(0L,Z_NULL,0);
172  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
173 
174  if ( adler != event_->adler32() ) {
175  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
176  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
177  " but calculated 0x" << adler;
178  }
179  }
180 
    // Consume the message: the next call starts right after it.
181  bufferLeft_ -= msgSize;
182  bufferCursor_ += msgSize;
183 
184  return 1;
185 }
186 
// Body of readNextChunkIntoBuffer() (signature line missing from this listing).
// Refills the read-ahead buffer from the current file, first asking for a new file
// when the current one is exhausted.
188 {
189  //this function is called when we reach the end of the buffer (i.e. bytes to read are more than bytes left in buffer)
190  // NOTA BENE: A positive or 0 value is returned if data are buffered
191  // a value of -1 indicates no data can be buffered but there is a new
192  // lumi section to account for
    // a value of -100 is returned when openNextFile() reports 0
193  int fileStatus = 100; //file is healthy for now
194  if (eofReached()){
    // One statement of the original is missing from this listing at this point.
196  fileStatus = openNextFile(); // this can now return even if there is
197  //no file only temporarily
198  if(fileStatus==0) return -100; //should only happen when requesting the next event header and the run is over
199  }
200  if(fileStatus==100){ //either file was not over or a new one was opened
201  if (bufferLeft_ == 0) { //in the rare case the last byte barely fit
202  uint32_t chunksize = 1024 * 1024 * eventChunkSize_;
203  bufferLeft_ = fread((void*) dataBuffer_, sizeof(unsigned char),
204  chunksize, fileStream_); //reads a maximum of chunksize bytes from file into buffer
205  } else { //refill the buffer after having moved the bufferLeft_ bytes at the head
206  uint32_t chunksize = 1024 * 1024 * eventChunkSize_ - bufferLeft_;
    // NOTE(review): source and destination lie in the same buffer; memcpy is undefined
    // for overlapping ranges — confirm bufferLeft_ can never exceed the cursor offset,
    // or use memmove.
207  memcpy((void*) dataBuffer_, bufferCursor_, bufferLeft_); //this copy could be avoided
208  bufferLeft_ += fread((void*) (dataBuffer_ + bufferLeft_),
209  sizeof(unsigned char), chunksize, fileStream_);
210  }
211  bufferCursor_ = dataBuffer_; // reset the cursor at the beginning of the buffer
212  return bufferLeft_;
213  }
214  else{
215  // no file to read but a new lumi section has been cached
216  return -1;
217  }
218 }
219 
// Body of eofReached() (signature line missing from this listing).
// Returns true when no file is open or when the open file has no more bytes to read.
221 {
222  if (fileStream_ == 0)
223  return true;
224 
    // Peek at the next character and push it back so the read position is unchanged.
225  int c;
226  c = fgetc(fileStream_);
227  ungetc(c, fileStream_);
228 
229  return (c == EOF);
230 }
231 
// Body whose signature line is missing from this listing (its name is not visible here).
// If a file is open: logs, closes the stream, clears the pointer, and then deletes the
// file from disk unless test mode is enabled.
233 {
234  if (fileStream_) {
235 
236  edm::LogInfo("FedRawDataInputSource") << "Closing input file " << openFile_.string();
237 
238  fclose(fileStream_);
239  fileStream_ = 0;
240 
241  if (!testModeNoBuilderUnit_) {
242  boost::filesystem::remove(openFile_); // won't work in case of forked children
243  } else {
    // The test-mode statement is missing from this listing.
245  }
246  }
247 }
248 
// Body of openNextFile() (signature line missing from this listing).
// Polls searchForNextFile() until it returns a non-negative status and returns that
// status. Returns 0 instead once the end-of-run flag has been seen.
250 {
251  int nextfile = -1;
252  while((nextfile = searchForNextFile())<0){
253  if(eorFileSeen_)
254  return 0;
255  else{
256  edm::LogInfo("FedRawDataInputSource") << "No file for me... sleep and try again..." << std::endl;
    // Wait 100 ms (100000 microseconds) between attempts.
257  usleep(100000);
258  }
259  }
260  return nextfile;
261 }
262 
// Body whose signature line is missing from this listing (presumably
// read(edm::EventPrincipal&), since eventPrincipal is used below).
// Builds a FEDRawDataCollection from the cached event, creates the event auxiliary
// with the timestamp returned by fillFEDRawDataCollection(), and calls makeEvent().
// Several statements of the original are missing from this listing (gaps in the numbering).
264 {
    // NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
266  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
267  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
268 
    // The remaining constructor arguments are on a line missing from this listing.
269  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
271  makeEvent(eventPrincipal, aux);
272 
275 
278 
279  return;
280 }
281 
// Splits the payload of the cached event into per-FED fragments and copies each one
// into rawData. The payload is walked backwards: each fragment's size is read from its
// trailer, which sits at the end of the fragment.
// Returns the timestamp taken from the FED whose id is FEDNumbering::MINTriggerGTPFEDID,
// or a default-constructed edm::Timestamp if no such FED is present.
282 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(std::auto_ptr<FEDRawDataCollection>& rawData) const
283 {
284  edm::Timestamp tstamp;
285  uint32_t eventSize = event_->eventSize();
286  char* event = (char*)event_->payload();
287 
    // NOTE(review): eventSize is unsigned, so a malformed size makes the subtractions
    // below wrap around instead of going negative.
288  while (eventSize > 0) {
    // Step back over the trailer to read the fragment size.
289  eventSize -= sizeof(fedt_t);
290  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
291  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
    // Step back to the fragment's header; after this, eventSize is the offset of the
    // fragment start (presumably fedh_t and fedt_t have the same size — confirm).
292  eventSize -= (fedSize - sizeof(fedh_t));
293  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
294  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
295  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
    // One statement of the original is missing from this listing at this point.
    // The event time is assembled from the high and low GPS words of this FED.
297  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
298  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
299  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
300  }
    // Copy the whole fragment (header through trailer) into the collection.
301  FEDRawData& fedData = rawData->FEDData(fedId);
302  fedData.resize(fedSize);
303  memcpy(fedData.data(), event + eventSize, fedSize);
304  }
    // NOTE(review): the loop only exits when eventSize == 0, so this assert cannot fail.
305  assert(eventSize == 0);
306 
307  return tstamp;
308 }
309 
// Body of searchForNextFile() (signature line missing from this listing).
// Asks the EvFDaqDirector service for the next input file.
// Returns -1 when nothing is available, 100 when a data file and its JSON companion
// were grabbed and opened, and 1 when the final loop ran (which overrides any earlier value).
// Several statements of the original are missing from this listing (gaps in the numbering).
311 {
312  int retval = -1;
    // The condition guarding this throw is missing from this listing.
314  throw cms::Exception("RuntimeError") << "Went to search for next file but according to BU more events in "
315  << currentInputJson_.string();
316  }
317 
318  std::string nextFile;
    // NOTE(review): ls has no initialiser; it is read below even when no file was
    // granted — confirm updateFuLock() always assigns it.
319  uint32_t ls;
320 
321  edm::LogInfo("FedRawDataInputSource") << "Asking for next file... to the DaqDirector";
    // "fms" is declared on a line that is missing from this listing.
323  fms->startedLookingForFile();
324  bool fileIsOKToGrab = edm::Service<evf::EvFDaqDirector>()->updateFuLock(ls,nextFile,eorFileSeen_);
325 
326  if (fileIsOKToGrab) {
327 
328  edm::LogInfo("FedRawDataInputSource") << "The director says to grab: " << nextFile;
329 
330  fms->stoppedLookingForFile();
331 
    // jsonFile is declared on a line missing from this listing; here it is given a
    // .jsn extension.
333  jsonFile.replace_extension(".jsn");
    // NOTE(review): the call is made inside assert(); when compiled with NDEBUG the
    // whole expression is removed and the JSON file would never be grabbed.
334  assert( grabNextJsonFile(jsonFile) );
335  openDataFile(nextFile);
336  retval = 100;
337  } else {
338  edm::LogInfo("FedRawDataInputSource") << "The DAQ Director has nothing for me! ";
339  }
340 
    // The first statement of this loop body is missing from this listing; presumably it
    // advances currentLumiSection_, otherwise the loop could not terminate — confirm.
341  while( getLSFromFilename_ && ls > currentLumiSection_ ) {
343  retval = 1;
344  }
345 
346  return retval;
347 }
348 
// Body of grabNextJsonFile() (signature line missing from this listing; the parameter
// is used below as jsonSourcePath).
// Copies the JSON file to a destination named <stem>_pid<5-digit pid>.jsn, parses the
// copy, and stores the first data field as currentInputEventCount_.
// Returns true on success and false when a std::runtime_error or std::exception is
// caught. On a boost filesystem_error it logs and ends the process with _exit(0).
350 {
351  try {
352  // assemble json destination path
    // (the declaration of jsonDestPath is missing from this listing)
354 
355  std::ostringstream fileNameWithPID;
356  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
357  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
    // NOTE(review): filePathWithPID is not used in any line visible here.
358  const boost::filesystem::path filePathWithPID(fileNameWithPID.str());
359  jsonDestPath /= fileNameWithPID.str();
360 
361  edm::LogInfo("FedRawDataInputSource") << " JSON rename " << jsonSourcePath << " to "
362  << jsonDestPath;
363 
    // The "if" condition is missing from this listing. Both branches copy the file;
    // the else branch also removes the source.
365  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
366  else {
367  //boost::filesystem::rename(jsonSourcePath,jsonDestPath);
368  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
369  boost::filesystem::remove(jsonSourcePath);
370  }
371 
372  currentInputJson_ = jsonDestPath; // store location for later deletion.
373  boost::filesystem::ifstream ij(jsonDestPath);
374  Json::Value deserializeRoot;
375  DataPoint dp;
376  if(!reader_.parse(ij,deserializeRoot)){
377  throw std::runtime_error("Cannot deserialize input JSON file");
378  }
379  else {
380  dp.deserialize(deserializeRoot);
    // NOTE(review): element 0 is read without checking that getData() is non-empty, and
    // atoi() yields 0 for non-numeric text — confirm the input is always well-formed.
381  std::string data = dp.getData()[0];
382  currentInputEventCount_=atoi(data.c_str()); //all this is horrible...
383  }
384 
385  return true;
386  }
387 
388  catch (const boost::filesystem::filesystem_error& ex)
389  {
390  // Input dir gone?
391  edm::LogError("FedRawDataInputSource") << " - grabNextFile BOOST FILESYSTEM ERROR CAUGHT: " << ex.what()
392  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
393  _exit(0);
394  }
    // NOTE(review): the two handlers below catch by value, which copies the exception
    // and slices off any derived type; catching by const reference avoids that.
395  catch (std::runtime_error e)
396  {
397  // Another process grabbed the file and NFS did not register this
398  edm::LogError("FedRawDataInputSource") << " - grabNextFile runtime Exception: " << e.what() << std::endl;
399  }
400  catch (std::exception e)
401  {
402  // BU run directory disappeared?
403  edm::LogError("FedRawDataInputSource") << " - grabNextFileSOME OTHER EXCEPTION OCCURED!!!! ->" << e.what()
404  << std::endl;
405  }
406  return false;
407 }
408 
// Body of openDataFile() (signature line missing from this listing; the parameter is
// used below as nextFile).
// Opens nextFile read-only, wraps the descriptor in a FILE stream stored in fileStream_,
// and records the path in openFile_. Throws cms::Exception if open() fails.
410 {
411  const int fileDescriptor = open(nextFile.c_str(), O_RDONLY);
412  if (fileDescriptor != -1) {
    // NOTE(review): the fdopen() result is not checked; on failure fileStream_ would be
    // null and the descriptor would stay open.
413  fileStream_ = fdopen(fileDescriptor, "rb");
414  openFile_ = nextFile;
415  edm::LogInfo("FedRawDataInputSource") << " opened file " << nextFile;
416  }
417  else
418  {
419  throw cms::Exception("FedRawDataInputSource::openDataFile") <<
420  " failed to open file " << nextFile << " fd:" << fileDescriptor;
421  }
422 }
423 
// Body whose signature line is missing from this listing (its name is not visible here).
// Renames a data file and its .jsn companion to the path returned by
// EvFDaqDirector::getJumpFilePath(), instead of deleting them.
425 {
    // The declaration of "source" is missing from this listing; the log message below
    // suggests it refers to openFile_ — confirm.
427  boost::filesystem::path destination( edm::Service<evf::EvFDaqDirector>()->getJumpFilePath() );
428 
429  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME: " << openFile_
430  << " to: " << destination.string();
431  boost::filesystem::rename(source,destination);
    // Second rename: the same pair of paths with their extensions replaced by .jsn.
432  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
433 }
434 
// Empty body; its signature line is missing from this listing (presumably
// preForkReleaseResources() — confirm). Nothing is done here.
436 {}
437 
// Called with an unnamed, unused message-receiver argument. Calls InputSource::rewind().
// The statements that followed it in the original are missing from this listing.
438 void FedRawDataInputSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource>)
439 {
440  InputSource::rewind();
444 }
445 
// Empty body; its signature line is missing from this listing (presumably rewind_() —
// confirm). Nothing is done here.
447 {}
448 
449 // define this class as an input source
451 
static const char runNumber_[]
unsigned int getgpshigh(const unsigned char *)
virtual void read(edm::EventPrincipal &eventPrincipal) override
ProductProvenance dummyProvenance_
struct fedh_struct fedh_t
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
void maybeOpenNewLumiSection(const uint32_t lumiSection)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:258
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:210
tuple jsonFile
Definition: dataset.py:390
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void evm_board_setformat(size_t size)
#define DEFINE_FWK_INPUT_SOURCE(type)
boost::filesystem::path currentInputJson_
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:350
bool check(const DataFrame &df, bool capcheck, bool dvercheck)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
bool grabNextJsonFile(boost::filesystem::path const &)
void openDataFile(std::string const &)
virtual void preForkReleaseResources() override
const edm::DaqProvenanceHelper daqProvenanceHelper_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:36
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Marks the current event as cached (setter counterpart of eventCached()).
Definition: InputSource.h:377
const std::string fuOutputDir_
std::vector< std::string > getData() const
Definition: DataPoint.h:40
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
const unsigned int eventChunkSize_
unsigned long long uint64_t
Definition: Time.h:15
auto dp
Definition: deltaR.h:24
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:343
std::unique_ptr< FRDEventMsgView > event_
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &) const
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:358
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:255
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:173
unsigned int eventsize
Definition: fed_trailer.h:33
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:346
boost::filesystem::path openFile_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int currentInputEventCount_
BranchDescription const constBranchDescription_
volatile std::atomic< bool > shutdown_flag false
static std::string const source
Definition: EdmProvDump.cc:43
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)