CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUShmBuffer.cc
Go to the documentation of this file.
1 //
3 // FUShmBuffer
4 // -----------
5 //
6 // 15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
8 
9 
14 
15 #include <unistd.h>
16 #include <iostream>
17 #include <string>
18 #include <cassert>
19 
20 #include <sstream>
21 #include <fstream>
22 
23 #include <cstdlib>
24 #include <cstring>
25 #include <stdint.h>
26 
27 // the shmem keys are henceforth going to be FIXED for a give userid
28 // prior to creation, the application will attempt to get ownership
29 // of existing segments by the same key and destroy them
30 
31 #define SHM_DESCRIPTOR_KEYID 1 /* Id used on ftok for 1. shmget key */
32 #define SHM_KEYID 2 /* Id used on ftok for 2. shmget key */
33 #define SEM_KEYID 1 /* Id used on ftok for semget key */
34 
35 #define NSKIP_MAX 100
36 
37 using namespace std;
38 using namespace evf;
39 
40 //obsolete!!!
41 const char* FUShmBuffer::shmKeyPath_ =
42  (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null"
43  : getenv("FUSHM_KEYFILE"));
44 const char* FUShmBuffer::semKeyPath_ =
45  (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null"
46  : getenv("FUSEM_KEYFILE"));
47 
49 // construction/destruction
51 
52 //______________________________________________________________________________
53 FUShmBuffer::FUShmBuffer(bool segmentationMode, unsigned int nRawCells,
54  unsigned int nRecoCells, unsigned int nDqmCells,
55  unsigned int rawCellSize, unsigned int recoCellSize,
56  unsigned int dqmCellSize) :
57  segmentationMode_(segmentationMode), nClientsMax_(128),
58  nRawCells_(nRawCells), rawCellPayloadSize_(rawCellSize),
59  nRecoCells_(nRecoCells), recoCellPayloadSize_(recoCellSize),
60  nDqmCells_(nDqmCells), dqmCellPayloadSize_(dqmCellSize) {
64 
65  void* addr;
66 
67  rawWriteOffset_ = sizeof(FUShmBuffer);
68  addr = (void*) ((unsigned long) this + rawWriteOffset_);
69  new (addr) unsigned int[nRawCells_];
70 
71  rawReadOffset_ = rawWriteOffset_ + nRawCells_ * sizeof(unsigned int);
72  addr = (void*) ((unsigned long) this + rawReadOffset_);
73  new (addr) unsigned int[nRawCells_];
74 
75  recoWriteOffset_ = rawReadOffset_ + nRawCells_ * sizeof(unsigned int);
76  addr = (void*) ((unsigned long) this + recoWriteOffset_);
77  new (addr) unsigned int[nRecoCells_];
78 
79  recoReadOffset_ = recoWriteOffset_ + nRecoCells_ * sizeof(unsigned int);
80  addr = (void*) ((unsigned long) this + recoReadOffset_);
81  new (addr) unsigned int[nRecoCells_];
82 
83  dqmWriteOffset_ = recoReadOffset_ + nRecoCells_ * sizeof(unsigned int);
84  addr = (void*) ((unsigned long) this + dqmWriteOffset_);
85  new (addr) unsigned int[nDqmCells_];
86 
87  dqmReadOffset_ = dqmWriteOffset_ + nDqmCells_ * sizeof(unsigned int);
88  addr = (void*) ((unsigned long) this + dqmReadOffset_);
89  new (addr) unsigned int[nDqmCells_];
90 
91  evtStateOffset_ = dqmReadOffset_ + nDqmCells_ * sizeof(unsigned int);
92  addr = (void*) ((unsigned long) this + evtStateOffset_);
93  new (addr) evt::State_t[nRawCells_];
94 
96  addr = (void*) ((unsigned long) this + evtDiscardOffset_);
97  new (addr) unsigned int[nRawCells_];
98 
99  evtNumberOffset_ = evtDiscardOffset_ + nRawCells_ * sizeof(unsigned int);
100  addr = (void*) ((unsigned long) this + evtNumberOffset_);
101  new (addr) unsigned int[nRawCells_];
102 
103  evtPrcIdOffset_ = evtNumberOffset_ + nRawCells_ * sizeof(unsigned int);
104  addr = (void*) ((unsigned long) this + evtPrcIdOffset_);
105  new (addr) pid_t[nRawCells_];
106 
107  evtTimeStampOffset_ = evtPrcIdOffset_ + nRawCells_ * sizeof(pid_t);
108  addr = (void*) ((unsigned long) this + evtTimeStampOffset_);
109  new (addr) time_t[nRawCells_];
110 
111  dqmStateOffset_ = evtTimeStampOffset_ + nRawCells_ * sizeof(time_t);
112  addr = (void*) ((unsigned long) this + dqmStateOffset_);
113  new (addr) dqm::State_t[nDqmCells_];
114 
116  addr = (void*) ((unsigned long) this + clientPrcIdOffset_);
117  new (addr) pid_t[nClientsMax_];
118 
119  rawCellOffset_ = clientPrcIdOffset_ + nClientsMax_ * sizeof(pid_t);
120 
121  if (segmentationMode_) {
122  recoCellOffset_ = rawCellOffset_ + nRawCells_ * sizeof(key_t);
123  dqmCellOffset_ = recoCellOffset_ + nRecoCells_ * sizeof(key_t);
124  addr = (void*) ((unsigned long) this + rawCellOffset_);
125  new (addr) key_t[nRawCells_];
126  addr = (void*) ((unsigned long) this + recoCellOffset_);
127  new (addr) key_t[nRecoCells_];
128  addr = (void*) ((unsigned long) this + dqmCellOffset_);
129  new (addr) key_t[nDqmCells_];
130  } else {
133  for (unsigned int i = 0; i < nRawCells_; i++) {
134  addr = (void*) ((unsigned long) this + rawCellOffset_ + i
136  new (addr) FUShmRawCell(rawCellSize);
137  }
138  for (unsigned int i = 0; i < nRecoCells_; i++) {
139  addr = (void*) ((unsigned long) this + recoCellOffset_ + i
141  new (addr) FUShmRecoCell(recoCellSize);
142  }
143  for (unsigned int i = 0; i < nDqmCells_; i++) {
144  addr = (void*) ((unsigned long) this + dqmCellOffset_ + i
146  new (addr) FUShmDqmCell(dqmCellSize);
147  }
148  }
149 }
150 
151 //______________________________________________________________________________
153 
154 }
155 
157 // implementation of member functions
159 
160 
161 //______________________________________________________________________________
162 void FUShmBuffer::initialize(unsigned int shmid, unsigned int semid) {
163  shmid_ = shmid;
164  semid_ = semid;
165 
166  if (segmentationMode_) {
167  int shmKeyId = 666;
168  key_t* keyAddr = (key_t*) ((unsigned long) this + rawCellOffset_);
169  for (unsigned int i = 0; i < nRawCells_; i++) {
170  *keyAddr = ftok(shmKeyPath_, shmKeyId++);
171  int shmid = shm_create(*keyAddr, rawCellTotalSize_);
172  void* shmAddr = shm_attach(shmid);
173  new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
174  shmdt(shmAddr);
175  ++keyAddr;
176  }
177  keyAddr = (key_t*) ((unsigned long) this + recoCellOffset_);
178  for (unsigned int i = 0; i < nRecoCells_; i++) {
179  *keyAddr = ftok(shmKeyPath_, shmKeyId++);
180  int shmid = shm_create(*keyAddr, recoCellTotalSize_);
181  void* shmAddr = shm_attach(shmid);
182  new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
183  shmdt(shmAddr);
184  ++keyAddr;
185  }
186  keyAddr = (key_t*) ((unsigned long) this + dqmCellOffset_);
187  for (unsigned int i = 0; i < nDqmCells_; i++) {
188  *keyAddr = ftok(shmKeyPath_, shmKeyId++);
189  int shmid = shm_create(*keyAddr, dqmCellTotalSize_);
190  void* shmAddr = shm_attach(shmid);
191  new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
192  shmdt(shmAddr);
193  ++keyAddr;
194  }
195  }
196 
197  reset(true);
198 }
199 
200 //______________________________________________________________________________
201 void FUShmBuffer::reset(bool shm_detach) {
202  nClients_ = 0;
203 
204 
205  for (unsigned int i = 0; i < nRawCells_; i++) {
206  FUShmRawCell* cell = rawCell(i);
207  cell->initialize(i);
208  if (segmentationMode_ && shm_detach)
209  shmdt(cell);
210  }
211 
212  for (unsigned int i = 0; i < nRecoCells_; i++) {
213  FUShmRecoCell* cell = recoCell(i);
214  cell->initialize(i);
215  if (segmentationMode_ && shm_detach)
216  shmdt(cell);
217  }
218 
219  for (unsigned int i = 0; i < nDqmCells_; i++) {
220  FUShmDqmCell* cell = dqmCell(i);
221  cell->initialize(i);
222  if (segmentationMode_ && shm_detach)
223  shmdt(cell);
224  }
225 
226 
227  // setup ipc semaphores
228  sem_init(0, 1); // lock (binary)
229  sem_init(1, nRawCells_); // raw write semaphore
230  sem_init(2, 0); // raw read semaphore
231  sem_init(3, 1); // binary semaphore to schedule raw event for discard
232  sem_init(4, 0); // binary semaphore to discard raw event
233  sem_init(5, nRecoCells_);// reco write semaphore
234  sem_init(6, 0); // reco send (read) semaphore
235  sem_init(7, nDqmCells_); // dqm write semaphore
236  sem_init(8, 0); // dqm send (read) semaphore
237 
238  sem_print();
239 
240  unsigned int *iWrite, *iRead;
241 
242  rawWriteNext_ = 0;
243  rawWriteLast_ = 0;
244  rawReadNext_ = 0;
245  rawReadLast_ = 0;
246  iWrite = (unsigned int*) ((unsigned long) this + rawWriteOffset_);
247  iRead = (unsigned int*) ((unsigned long) this + rawReadOffset_);
248  for (unsigned int i = 0; i < nRawCells_; i++) {
249  *iWrite++ = i;
250  *iRead++ = 0xffffffff;
251  }
252 
253  recoWriteNext_ = 0;
254  recoWriteLast_ = 0;
255  recoReadNext_ = 0;
256  recoReadLast_ = 0;
257  iWrite = (unsigned int*) ((unsigned long) this + recoWriteOffset_);
258  iRead = (unsigned int*) ((unsigned long) this + recoReadOffset_);
259  for (unsigned int i = 0; i < nRecoCells_; i++) {
260  *iWrite++ = i;
261  *iRead++ = 0xffffffff;
262  }
263 
264  dqmWriteNext_ = 0;
265  dqmWriteLast_ = 0;
266  dqmReadNext_ = 0;
267  dqmReadLast_ = 0;
268  iWrite = (unsigned int*) ((unsigned long) this + dqmWriteOffset_);
269  iRead = (unsigned int*) ((unsigned long) this + dqmReadOffset_);
270  for (unsigned int i = 0; i < nDqmCells_; i++) {
271  *iWrite++ = i;
272  *iRead++ = 0xffffffff;
273  }
274 
275  for (unsigned int i = 0; i < nRawCells_; i++) {
277  setEvtDiscard(i, 0);
278  setEvtNumber(i, 0xffffffff);
279  setEvtPrcId(i, 0);
280  setEvtTimeStamp(i, 0);
281  }
282 
283  for (unsigned int i = 0; i < nDqmCells_; i++)
285 }
286 
287 //______________________________________________________________________________
288 unsigned int FUShmBuffer::nbRawCellsToWrite() const {
289  return semctl(semid(), 1, GETVAL);
290 }
291 
292 //______________________________________________________________________________
294  return semctl(semid(), 2, GETVAL);
295 }
296 
297 //______________________________________________________________________________
299  if (waitRawWrite() != 0)
300  return 0;
301  unsigned int iCell = nextRawWriteIndex();
302  FUShmRawCell* cell = rawCell(iCell);
303  evt::State_t state = evtState(iCell);
304  if(!(state == evt::EMPTY)) {
305  stringstream details;
306  details << "state==evt::EMPTY assertion failed! Actual state is " << state << ", iCell = " << iCell;
307  XCEPT_ASSERT(false, evf::Exception, details.str());
308  }
309  lock();
310  setEvtState(iCell, evt::RAWWRITING,false);
311  setEvtDiscard(iCell, 1,false,false);
312  unlock();
313  return cell;
314 }
315 
316 //______________________________________________________________________________
318  waitRawRead();
319  unsigned int iCell = nextRawReadIndex();
320  FUShmRawCell* cell = rawCell(iCell);
321  evt::State_t state = evtState(iCell);
322  if(!(state == evt::RAWWRITTEN || state == evt::EMPTY ||
323  state == evt::STOP || state == evt::LUMISECTION))
324  {
325  stringstream details;
326  details
327  << "state==evt::RAWWRITTEN ||state==evt::EMPTY ||state==evt::STOP"
328  << "||state==evt::LUMISECTION assertion failed! Actual state is "
329  << state << ", iCell = " << iCell;
330  XCEPT_ASSERT(false,evf::Exception, details.str());
331  }
332  if (state == evt::RAWWRITTEN) {
333  setEvtPrcId(iCell, getpid());
335  }
336  return cell;
337 }
338 
339 //______________________________________________________________________________
341  waitRecoRead();
342  unsigned int iCell = nextRecoReadIndex();
343  FUShmRecoCell* cell = recoCell(iCell);
344  unsigned int iRawCell = cell->rawCellIndex();
345  if (iRawCell < nRawCells_) {
346  //evt::State_t state=evtState(iRawCell);
347  //XCEPT_ASSERT(state==evt::RECOWRITTEN, evf::Exception, "state==evt::RECOWRITTEN assertion failed!");
348  setEvtState(iRawCell, evt::SENDING);
349  }
350  return cell;
351 }
352 
353 //______________________________________________________________________________
355  waitDqmRead();
356  unsigned int iCell = nextDqmReadIndex();
357  FUShmDqmCell* cell = dqmCell(iCell);
358  dqm::State_t state = dqmState(iCell);
359 
360  if(!(state == dqm::WRITTEN || state == dqm::EMPTY)) {
361  stringstream details;
362  details << "state==dqm::WRITTEN || state==dqm::EMPTY assertion failed! Actual state is "
363  << state << ", iCell = " << iCell;
364  XCEPT_ASSERT(false, evf::Exception, details.str());
365  }
366 
367  if (state == dqm::WRITTEN)
368  setDqmState(iCell, dqm::SENDING);
369  return cell;
370 }
371 
372 //______________________________________________________________________________
376  evt::State_t state = evtState(cell->index());
377  if(!(state == evt::PROCESSED || state == evt::SENT ||
378  state == evt::EMPTY || state == evt::STOP ||
379  state == evt::USEDLS)) {
380  stringstream details;
381  details << "state==evt::PROCESSED || state==evt::SENT || "
382  << "state==evt::EMPTY || state==evt::STOP || "
383  << "state==evt::USEDLS assertion failed! Actual state is "
384  << state << ", index = " << cell->index();
385  XCEPT_ASSERT(false,evf::Exception,details.str());
386  }
387  if (state != evt::EMPTY && state != evt::USEDLS && state != evt::STOP)
389  return cell;
390 }
391 
392 //______________________________________________________________________________
394  evt::State_t state = evtState(cell->index());
395  if(!(state == evt::RAWWRITING)) {
396  stringstream details;
397  details << "state==evt::RAWWRITING assertion failed! Actual state is "
398  << state << ", index = " << cell->index();
399  XCEPT_ASSERT(false, evf::Exception, details.str());
400  }
402  setEvtNumber(cell->index(), cell->evtNumber());
403  postRawIndexToRead(cell->index());
404  if (segmentationMode_)
405  shmdt(cell);
406  postRawRead();
407 }
408 
409 //______________________________________________________________________________
411  evt::State_t state = evtState(cell->index());
412  if(!(state == evt::RAWREADING)) {
413  stringstream details;
414  details << "state==evt::RAWREADING assertion failed! Actual state is "
415  << state << ", index = " << cell->index();
416  XCEPT_ASSERT(false, evf::Exception, details.str());
417  }
418  setEvtState(cell->index(), evt::RAWREAD);
420  setEvtTimeStamp(cell->index(), time(0));
421  if (segmentationMode_)
422  shmdt(cell);
423 }
424 
425 //______________________________________________________________________________
427  unsigned int iRawCell = cell->rawCellIndex();
428  if (iRawCell < nRawCells_) {
429  //evt::State_t state=evtState(cell->rawCellIndex());
430  //XCEPT_ASSERT(state==evt::SENDING, evf::Exception, "state==evt::SENDING assertion failed!");
432  }
433  if (segmentationMode_)
434  shmdt(cell);
435 }
436 
437 //______________________________________________________________________________
439  dqm::State_t state = dqmState(cell->index());
440  if(!(state == dqm::SENDING || state == dqm::EMPTY)) {
441  stringstream details;
442  details << "state==dqm::SENDING||state==dqm::EMPTY assertion failed! Actual state is "
443  << state << ", index = " << cell->index();
444  XCEPT_ASSERT(false, evf::Exception, details.str());
445 
446  }
447  if (state == dqm::SENDING)
448  setDqmState(cell->index(), dqm::SENT);
449  if (segmentationMode_)
450  shmdt(cell);
451 }
452 
453 //______________________________________________________________________________
454 void FUShmBuffer::scheduleRawCellForDiscard(unsigned int iCell) {
455  waitRawDiscard();
456  if (rawCellReadyForDiscard(iCell)) {
457  rawDiscardIndex_ = iCell;
458  evt::State_t state = evtState(iCell);
459  if(!(state == evt::PROCESSING || state == evt::SENT ||
460  state == evt::EMPTY || state == evt::STOP ||
461  state == evt::LUMISECTION || state == evt::RECOWRITTEN)) {
462  stringstream details;
463  details << "state==evt::PROCESSING||state==evt::SENT||state==evt::EMPTY||"
464  <<"state==evt::STOP||state==evt::LUMISECTION||state==evt::RECOWRITTEN assertion failed! Actual state is "
465  << state << ", iCell = " << iCell;
466  XCEPT_ASSERT( false,evf::Exception,details.str());
467  }
468  if (state == evt::PROCESSING)
469  setEvtState(iCell, evt::PROCESSED);
470  if (state == evt::LUMISECTION)
471  setEvtState(iCell, evt::USEDLS);
473  } else
474  postRawDiscard();
475 }
476 
477 //______________________________________________________________________________
479  waitRawDiscard();
480  if (rawCellReadyForDiscard(iCell)) {
481  rawDiscardIndex_ = iCell;
482  evt::State_t state = evtState(iCell);
483  // UPDATE: aspataru
484  if (state != evt::LUMISECTION && state != evt::EMPTY
485  && state != evt::USEDLS && state != evt::STOP)
486  setEvtState(iCell, evt::PROCESSED);
487  if (state == evt::LUMISECTION)
488  setEvtState(iCell, evt::USEDLS);
490  } else
491  postRawDiscard();
492 }
493 
494 //______________________________________________________________________________
496  releaseRawCell(cell);
497  postRawDiscard();
498 }
499 
500 //______________________________________________________________________________
501 void FUShmBuffer::discardRecoCell(unsigned int iCell) {
502  FUShmRecoCell* cell = recoCell(iCell);
503  unsigned int iRawCell = cell->rawCellIndex();
504  if (iRawCell < nRawCells_) {
505  //evt::State_t state=evtState(iRawCell);
506  //XCEPT_ASSERT(state==evt::SENT, evf::Exception, "state==evt::SENT assertion failed!");
507  scheduleRawCellForDiscard(iRawCell);
508  }
509  cell->clear();
510  if (segmentationMode_)
511  shmdt(cell);
512  postRecoIndexToWrite(iCell);
513  postRecoWrite();
514 }
515 
516 //______________________________________________________________________________
517 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell) {
518  FUShmRecoCell* cell = recoCell(iCell);
519  cell->clear();
520  if (segmentationMode_)
521  shmdt(cell);
522  postRecoIndexToWrite(iCell);
523  postRecoWrite();
524 }
525 
526 //______________________________________________________________________________
527 void FUShmBuffer::discardDqmCell(unsigned int iCell) {
528  dqm::State_t state = dqmState(iCell);
529  if(!(state == dqm::EMPTY || state == dqm::SENT)) {
530  stringstream details;
531  details << "state==dqm::EMPTY||state==dqm::SENT assertion failed! Actual state is "
532  << state << ", iCell = " << iCell;
533  XCEPT_ASSERT(false, evf::Exception,details.str());
534  }
536  FUShmDqmCell* cell = dqmCell(iCell);
537  cell->clear();
538  if (segmentationMode_)
539  shmdt(cell);
540  setDqmState(iCell, dqm::EMPTY);
541  postDqmIndexToWrite(iCell);
542  postDqmWrite();
543 }
544 
545 //______________________________________________________________________________
547  evt::State_t state = evtState(cell->index());
548  if(!( state == evt::DISCARDING || state == evt::RAWWRITING ||
549  state== evt::EMPTY || state == evt::STOP ||
550  /*||state==evt::LUMISECTION*/
551  state == evt::USEDLS)) {
552  std::cout << "=================releaseRawCell state " << state << std::endl;
553  stringstream details;
554  details << "state==evt::DISCARDING||state==evt::RAWWRITING||"
555  << "state==evt::EMPTY||state==evt::STOP||state==evt::USEDLS"
556  << " assertion failed! Actual state is " << state << ", index = " << cell->index();
557  XCEPT_ASSERT( false, evf::Exception, details.str());
558  }
559  setEvtState(cell->index(), evt::EMPTY);
560  setEvtDiscard(cell->index(), 0);
561  setEvtNumber(cell->index(), 0xffffffff);
562  setEvtPrcId(cell->index(), 0);
563  setEvtTimeStamp(cell->index(), 0);
564  cell->clear();
565  postRawIndexToWrite(cell->index());
566  if (segmentationMode_)
567  shmdt(cell);
568  postRawWrite();
569 }
570 
571 //______________________________________________________________________________
573  FUShmRawCell* cell = rawCellToWrite();
574  if (cell == 0)
575  return;
576  evt::State_t state = evtState(cell->index());
577  if(!(state == evt::RAWWRITING)) {
578  stringstream details;
579  details << "state==evt::RAWWRITING assertion failed! Actual state is "
580  << state << ", index = " << cell->index();
581  XCEPT_ASSERT(false, evf::Exception, details.str());
582  }
583  setEvtState(cell->index(), evt::STOP);
584  cell->setEventTypeStopper();
585  postRawIndexToRead(cell->index());
586  if (segmentationMode_)
587  shmdt(cell);
588  postRawRead();
589 }
590 
591 //______________________________________________________________________________
593  FUShmRawCell* cell = rawCellToWrite();
594  if (cell == 0)
595  return;
596  cell->setLumiSection(ls);
597  evt::State_t state = evtState(cell->index());
598  if (!(state == evt::RAWWRITING)) {
599  stringstream details;
600  details << "state==evt::RAWWRITING assertion failed! Actual state is "
601  << state << ", index = " << cell->index();
602  XCEPT_ASSERT(false, evf::Exception, details.str());
603  }
604  setEvtNumber(cell->index(),0xfffffffe);
606  cell->setEventTypeEol();
607  postRawIndexToRead(cell->index());
608  if (segmentationMode_)
609  shmdt(cell);
610  postRawRead();
611 }
612 
613 //______________________________________________________________________________
615  waitRecoWrite();
616  unsigned int iCell = nextRecoWriteIndex();
617  FUShmRecoCell* cell = recoCell(iCell);
618  cell->clear();
619  postRecoIndexToRead(iCell);
620  if (segmentationMode_)
621  shmdt(cell);
622  postRecoRead();
623 }
624 
625 //______________________________________________________________________________
627  waitDqmWrite();
628  unsigned int iCell = nextDqmWriteIndex();
629  FUShmDqmCell* cell = dqmCell(iCell);
630  cell->clear();
631  postDqmIndexToRead(iCell);
632  if (segmentationMode_)
633  shmdt(cell);
634  postDqmRead();
635 }
636 
637 //______________________________________________________________________________
639  FUShmRawCell* cell = rawCellToWrite();
640  if (cell == 0)
641  return;
642  rawDiscardIndex_ = cell->index();
643  setEvtState(cell->index(), evt::STOP);
644  cell->setEventTypeStopper();
645  setEvtNumber(cell->index(), 0xffffffff);
646  setEvtPrcId(cell->index(), 0);
647  setEvtTimeStamp(cell->index(), 0);
649 }
650 
651 //______________________________________________________________________________
653  waitRawDiscard();
654  if (rawCellReadyForDiscard(cell->index())) {
655  rawDiscardIndex_ = cell->index();
656  // as this function is called by the reader, the state and type should
657  // already be correct
658  // setEvtState(cell->index(),evt::STOP);
659  // cell->setEventType(evt::STOPPER);
660  // setEvtNumber(cell->index(),0xffffffff);
661  // setEvtPrcId(cell->index(),0);
662  // setEvtTimeStamp(cell->index(),0);
663  pidstatus = removeClientPrcId(getpid());
664  if (segmentationMode_)
665  shmdt(cell);
667  return true;
668  } else {
669  postRawDiscard();
670  return false;
671  }
672 }
673 
674 //______________________________________________________________________________
676  // waitRawDiscard();
677  if (rawCellReadyForDiscard(cell->index())) {
678  rawDiscardIndex_ = cell->index();
679  // setEvtState(cell->index(),evt::STOP);
680  // cell->setEventType(evt::STOPPER);
681  // setEvtNumber(cell->index(),0xffffffff);
682  // setEvtPrcId(cell->index(),0);
683  // setEvtTimeStamp(cell->index(),0);
684  // removeClientPrcId(getpid());
685  if (segmentationMode_)
686  shmdt(cell);
688  } else
689  postRawDiscard();
690 }
691 
692 //______________________________________________________________________________
693 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
694  unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
695  unsigned int dataSize, unsigned int nExpectedEPs) {
696  if (dataSize > recoCellPayloadSize_) {
697  cout << "FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."
698  << endl;
699  return false;
700  }
701 
702  waitRecoWrite();
703  unsigned int iCell = nextRecoWriteIndex();
704  FUShmRecoCell* cell = recoCell(iCell);
705  cell->writeInitMsg(outModId, fuProcessId, fuGuid, data, dataSize,nExpectedEPs);
706  postRecoIndexToRead(iCell);
707  if (segmentationMode_)
708  shmdt(cell);
709  postRecoRead();
710  return true;
711 }
712 
713 //______________________________________________________________________________
715  unsigned int evtNumber, unsigned int outModId,
716  unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
717  unsigned int dataSize) {
718  if (dataSize > recoCellPayloadSize_) {
719  cout << "FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."
720  << endl;
721  return false;
722  }
723 
724  waitRecoWrite();
725  unsigned int rawCellIndex = indexForEvtNumber(evtNumber);
726  unsigned int iCell = nextRecoWriteIndex();
727  FUShmRecoCell* cell = recoCell(iCell);
728  //evt::State_t state=evtState(rawCellIndex);
729  //XCEPT_ASSERT(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT,
730  //evf::Exception, "state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT assertion failed!");
731  lock();
732  setEvtState(rawCellIndex, evt::RECOWRITING,false);
733  incEvtDiscard(rawCellIndex,false);
734  unlock();
735  cell->writeEventData(rawCellIndex, runNumber, evtNumber, outModId,
736  fuProcessId, fuGuid, data, dataSize);
737  setEvtState(rawCellIndex, evt::RECOWRITTEN);
738  postRecoIndexToRead(iCell);
739  if (segmentationMode_)
740  shmdt(cell);
741  postRecoRead();
742  return true;
743 }
744 
745 //______________________________________________________________________________
747  unsigned int fuProcessId, unsigned int iRawCell,bool checkValue) {
748  FUShmRawCell *raw = rawCell(iRawCell);
749 
750  unsigned int dataSize = sizeof(uint32_t) * (4 + 1024) + raw->eventSize();
751  unsigned char *data = new unsigned char[dataSize];
752  uint32_t *pos = (uint32_t*) data;
753  // 06-Oct-2008, KAB - added a version number for the error event format.
754  //
755  // Version 1 had no version number, so the run number appeared in the
756  // first value. So, a reasonable test for version 1 is whether the
757  // first value is larger than some relatively small cutoff (say 32).
758  // Version 2 added the lumi block number.
759  //
760  *pos++ = (uint32_t) 2; // protocol version number
761  *pos++ = (uint32_t) runNumber;
762  *pos++ = (uint32_t) evf::evtn::getlbn(
764  *pos++ = (uint32_t) raw->evtNumber();
765  for (unsigned int i = 0; i < 1024; i++)
766  *pos++ = (uint32_t) raw->fedSize(i);
767  memcpy(pos, raw->payloadAddr(), raw->eventSize());
768 
769  // DEBUG
770  /*
771  if (1) {
772  stringstream ss;
773  ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
774  ofstream fout;
775  fout.open(ss.str().c_str(),ios::out|ios::binary);
776  if (!fout.write((char*)data,dataSize))
777  cout<<"Failed to write error event to "<<ss.str()<<endl;
778  fout.close();
779 
780  stringstream ss2;
781  ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
782  ofstream fout2;
783  fout2.open(ss2.str().c_str());
784  fout2<<"dataSize = "<<dataSize<<endl;
785  fout2<<"runNumber = "<<runNumber<<endl;
786  fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
787  fout2<<"eventSize = "<<raw->eventSize()<<endl;
788  unsigned int totalSize(0);
789  for (unsigned int i=0;i<1024;i++) {
790  unsigned int fedSize = raw->fedSize(i);
791  totalSize += fedSize;
792  if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
793  }
794  fout2<<"totalSize = "<<totalSize<<endl;
795  fout2.close();
796  }
797  */// END DEBUG
798 
799  waitRecoWrite();
800  unsigned int iRecoCell = nextRecoWriteIndex();
801  FUShmRecoCell* reco = recoCell(iRecoCell);
802  lock();
803  setEvtState(iRawCell, evt::RECOWRITING,false);
804  setEvtDiscard(iRawCell, 1, true,false);
805  unlock();
806  reco->writeErrorEvent(iRawCell, runNumber, raw->evtNumber(), fuProcessId,
807  data, dataSize);
808  delete[] data;
809  setEvtState(iRawCell, evt::RECOWRITTEN);
810  postRecoIndexToRead(iRecoCell);
811  if (segmentationMode_) {
812  shmdt(raw);
813  shmdt(reco);
814  }
815  postRecoRead();
816  return true;
817 }
818 
819 //______________________________________________________________________________
821  unsigned int evtAtUpdate, unsigned int folderId,
822  unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data,
823  unsigned int dataSize) {
824  if (dataSize > dqmCellPayloadSize_) {
825  cout << "FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."
826  << endl;
827  return false;
828  }
829 
830  waitDqmWrite();
831  unsigned int iCell = nextDqmWriteIndex();
832  FUShmDqmCell* cell = dqmCell(iCell);
833  dqm::State_t state = dqmState(iCell);
834  if (!(state == dqm::EMPTY)) {
835  stringstream details;
836  details << "state==dqm::EMPTY assertion failed! Actual state is " << state
837  << ", iCell = " << iCell;
838  XCEPT_ASSERT(false, evf::Exception, details.str());
839  }
840  setDqmState(iCell, dqm::WRITING);
841  cell->writeData(runNumber, evtAtUpdate, folderId, fuProcessId, fuGuid,
842  data, dataSize);
843  setDqmState(iCell, dqm::WRITTEN);
844  postDqmIndexToRead(iCell);
845  if (segmentationMode_)
846  shmdt(cell);
847  postDqmRead();
848  return true;
849 }
850 
851 //______________________________________________________________________________
853  cout << "--> current sem values:" << endl << " lock=" << semctl(semid(), 0,
854  GETVAL) << endl << " wraw=" << semctl(semid(), 1, GETVAL)
855  << " rraw=" << semctl(semid(), 2, GETVAL) << endl << " wdsc="
856  << semctl(semid(), 3, GETVAL) << " rdsc=" << semctl(semid(), 4,
857  GETVAL) << endl << " wrec=" << semctl(semid(), 5, GETVAL)
858  << " rrec=" << semctl(semid(), 6, GETVAL) << endl << " wdqm="
859  << semctl(semid(), 7, GETVAL) << " rdqm=" << semctl(semid(), 8,
860  GETVAL) << endl;
861 }
862 
864  ostringstream ostr;
865  ostr << "--> current sem values:" << endl << " lock=" << semctl(semid(), 0,
866  GETVAL) << endl << " wraw=" << semctl(semid(), 1, GETVAL)
867  << " rraw=" << semctl(semid(), 2, GETVAL) << endl << " wdsc="
868  << semctl(semid(), 3, GETVAL) << " rdsc=" << semctl(semid(), 4,
869  GETVAL) << endl << " wrec=" << semctl(semid(), 5, GETVAL)
870  << " rrec=" << semctl(semid(), 6, GETVAL) << endl << " wdqm="
871  << semctl(semid(), 7, GETVAL) << " rdqm=" << semctl(semid(), 8,
872  GETVAL) << endl;
873  return ostr.str();
874 
875 }
876 
877 //______________________________________________________________________________
878 void FUShmBuffer::printEvtState(unsigned int index) {
879  evt::State_t state = evtState(index);
880  std::string stateName;
881  if (state == evt::EMPTY)
882  stateName = "EMPTY";
883  else if (state == evt::STOP)
884  stateName = "STOP";
885  else if (state == evt::RAWWRITING)
886  stateName = "RAWWRITING";
887  else if (state == evt::RAWWRITTEN)
888  stateName = "RAWRITTEN";
889  else if (state == evt::RAWREADING)
890  stateName = "RAWREADING";
891  else if (state == evt::RAWREAD)
892  stateName = "RAWREAD";
893  else if (state == evt::PROCESSING)
894  stateName = "PROCESSING";
895  else if (state == evt::PROCESSED)
896  stateName = "PROCESSED";
897  else if (state == evt::RECOWRITING)
898  stateName = "RECOWRITING";
899  else if (state == evt::RECOWRITTEN)
900  stateName = "RECOWRITTEN";
901  else if (state == evt::SENDING)
902  stateName = "SENDING";
903  else if (state == evt::SENT)
904  stateName = "SENT";
905  else if (state == evt::DISCARDING)
906  stateName = "DISCARDING";
907  cout << "evt " << index << " in state '" << stateName << "'." << endl;
908 }
909 
910 //______________________________________________________________________________
911 void FUShmBuffer::printDqmState(unsigned int index) {
912  dqm::State_t state = dqmState(index);
913  cout << "dqm evt " << index << " in state '" << state << "'." << endl;
914 }
915 
916 //______________________________________________________________________________
918  unsigned int nRawCells, unsigned int nRecoCells,
919  unsigned int nDqmCells, unsigned int rawCellSize,
920  unsigned int recoCellSize, unsigned int dqmCellSize) {
921  // if necessary, release shared memory first!
923  cout << "FUShmBuffer::createShmBuffer: "
924  << "REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL." << endl;
925 
926  // create bookkeeping shared memory segment
927  int size = sizeof(unsigned int) * 7;
929  if (shmid < 0)
930  return 0;
931  void*shmAddr = shm_attach(shmid);
932  if (0 == shmAddr)
933  return 0;
934 
935  if (1 != shm_nattch(shmid)) {
936  cout << "FUShmBuffer::createShmBuffer() FAILED: nattch=" << shm_nattch(
937  shmid) << endl;
938  shmdt(shmAddr);
939  return 0;
940  }
941 
942  unsigned int* p = (unsigned int*) shmAddr;
943  *p++ = segmentationMode;
944  *p++ = nRawCells;
945  *p++ = nRecoCells;
946  *p++ = nDqmCells;
947  *p++ = rawCellSize;
948  *p++ = recoCellSize;
949  *p++ = dqmCellSize;
950  shmdt(shmAddr);
951 
952  // create the 'real' shared memory buffer
953  size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
954  nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
955  shmid = shm_create(FUShmBuffer::getShmKey(), size);
956  if (shmid < 0)
957  return 0;
959  if (semid < 0)
960  return 0;
961  shmAddr = shm_attach(shmid);
962  if (0 == shmAddr)
963  return 0;
964 
965  if (1 != shm_nattch(shmid)) {
966  cout << "FUShmBuffer::createShmBuffer FAILED: nattch=" << shm_nattch(
967  shmid) << endl;
968  shmdt(shmAddr);
969  return 0;
970  }
971  FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
972  nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
973  dqmCellSize);
974 
975  cout << "FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."
976  << endl;
977  cout << " segmentationMode="
978  << segmentationMode << endl;
979 
980  buffer->initialize(shmid, semid);
981 
982  return buffer;
983 }
984 
985 //______________________________________________________________________________
987  // get bookkeeping shared memory segment
988  int size = sizeof(unsigned int) * 7;
990  if (shmid < 0)
991  return 0;
992  void* shmAddr = shm_attach(shmid);
993  if (0 == shmAddr)
994  return 0;
995 
996  unsigned int *p = (unsigned int*) shmAddr;
997  bool segmentationMode = *p++;
998  unsigned int nRawCells = *p++;
999  unsigned int nRecoCells = *p++;
1000  unsigned int nDqmCells = *p++;
1001  unsigned int rawCellSize = *p++;
1002  unsigned int recoCellSize = *p++;
1003  unsigned int dqmCellSize = *p++;
1004  shmdt(shmAddr);
1005 
1006  cout << "FUShmBuffer::getShmBuffer():" << " segmentationMode="
1007  << segmentationMode << " nRawCells=" << nRawCells << " nRecoCells="
1008  << nRecoCells << " nDqmCells=" << nDqmCells << " rawCellSize="
1009  << rawCellSize << " recoCellSize=" << recoCellSize
1010  << " dqmCellSize=" << dqmCellSize << endl;
1011 
1012  // get the 'real' shared memory buffer
1013  size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
1014  nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
1015  shmid = shm_get(FUShmBuffer::getShmKey(), size);
1016  if (shmid < 0)
1017  return 0;
1018  int semid = sem_get(FUShmBuffer::getSemKey(), 9);
1019  if (semid < 0)
1020  return 0;
1021  shmAddr = shm_attach(shmid);
1022  if (0 == shmAddr)
1023  return 0;
1024 
1025  if (0 == shm_nattch(shmid)) {
1026  cout << "FUShmBuffer::getShmBuffer() FAILED: nattch=" << shm_nattch(
1027  shmid) << endl;
1028  return 0;
1029  }
1030  FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
1031  nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
1032  dqmCellSize);
1033 
1034  cout << "FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED. PID:" << getpid()
1035  << endl;
1036  cout << " segmentationMode="
1037  << segmentationMode << endl;
1038 
1039  buffer->setClientPrcId(getpid());
1040 
1041  return buffer;
1042 }
1043 
1044 //______________________________________________________________________________
1046  // get bookkeeping shared memory segment
1047  int size = sizeof(unsigned int) * 7;
1048  int shmidd = shm_get(FUShmBuffer::getShmDescriptorKey(), size);
1049  if (shmidd < 0)
1050  return false;
1051  void* shmAddr = shm_attach(shmidd);
1052  if (0 == shmAddr)
1053  return false;
1054 
1055  unsigned int*p = (unsigned int*) shmAddr;
1056  bool segmentationMode = *p++;
1057  unsigned int nRawCells = *p++;
1058  unsigned int nRecoCells = *p++;
1059  unsigned int nDqmCells = *p++;
1060  unsigned int rawCellSize = *p++;
1061  unsigned int recoCellSize = *p++;
1062  unsigned int dqmCellSize = *p++;
1063  shmdt(shmAddr);
1064 
1065  // get the 'real' shared memory segment
1066  size = FUShmBuffer::size(segmentationMode, nRawCells, nRecoCells,
1067  nDqmCells, rawCellSize, recoCellSize, dqmCellSize);
1068  int shmid = shm_get(FUShmBuffer::getShmKey(), size);
1069  if (shmid < 0)
1070  return false;
1071  int semid = sem_get(FUShmBuffer::getSemKey(), 9);
1072  if (semid < 0)
1073  return false;
1074  shmAddr = shm_attach(shmid);
1075  if (0 == shmAddr)
1076  return false;
1077 
1078  int att = 0;
1079  for (; att < 10; att++) {
1080  if (shm_nattch(shmid) > 1) {
1081  cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="
1082  << shm_nattch(shmid)
1083  << ", failed attempt to release shared memory." << endl;
1084  ::sleep(1);
1085  } else
1086  break;
1087  }
1088 
1089  if (att >= 10)
1090  return false;
1091 
1092  if (segmentationMode) {
1093  FUShmBuffer* buffer = new (shmAddr) FUShmBuffer(segmentationMode,
1094  nRawCells, nRecoCells, nDqmCells, rawCellSize, recoCellSize,
1095  dqmCellSize);
1096  int cellid;
1097  for (unsigned int i = 0; i < nRawCells; i++) {
1098  cellid = shm_get(buffer->rawCellShmKey(i),
1099  FUShmRawCell::size(rawCellSize));
1100  if ((shm_destroy(cellid) == -1))
1101  return false;
1102  }
1103  for (unsigned int i = 0; i < nRecoCells; i++) {
1104  cellid = shm_get(buffer->recoCellShmKey(i),
1105  FUShmRecoCell::size(recoCellSize));
1106  if ((shm_destroy(cellid) == -1))
1107  return false;
1108  }
1109  for (unsigned int i = 0; i < nDqmCells; i++) {
1110  cellid = shm_get(buffer->dqmCellShmKey(i),
1111  FUShmDqmCell::size(dqmCellSize));
1112  if ((shm_destroy(cellid) == -1))
1113  return false;
1114  }
1115  }
1116  shmdt(shmAddr);
1117  if (sem_destroy(semid) == -1)
1118  return false;
1119  if (shm_destroy(shmid) == -1)
1120  return false;
1121  if (shm_destroy(shmidd) == -1)
1122  return false;
1123 
1124  return true;
1125 }
1126 
1127 //______________________________________________________________________________
1128 unsigned int FUShmBuffer::size(bool segmentationMode, unsigned int nRawCells,
1129  unsigned int nRecoCells, unsigned int nDqmCells,
1130  unsigned int rawCellSize, unsigned int recoCellSize,
1131  unsigned int dqmCellSize) {
1132  unsigned int offset = sizeof(FUShmBuffer) + sizeof(unsigned int) * 4
1133  * nRawCells + sizeof(evt::State_t) * nRawCells
1134  + sizeof(dqm::State_t) * nDqmCells;
1135 
1136  unsigned int rawCellTotalSize = FUShmRawCell::size(rawCellSize);
1137  unsigned int recoCellTotalSize = FUShmRecoCell::size(recoCellSize);
1138  unsigned int dqmCellTotalSize = FUShmDqmCell::size(dqmCellSize);
1139 
1140  unsigned int realSize = (segmentationMode) ? offset + sizeof(key_t)
1141  * (nRawCells + nRecoCells + nDqmCells) : offset + rawCellTotalSize
1142  * nRawCells + recoCellTotalSize * nRecoCells + dqmCellTotalSize
1143  * nDqmCells;
1144 
1145  unsigned int result = realSize / 0x10 * 0x10 + (realSize % 0x10 > 0) * 0x10;
1146 
1147  return result;
1148 }
1149 
1150 //______________________________________________________________________________
1152  key_t result = getuid() * 1000 + SHM_DESCRIPTOR_KEYID;
1153  if (result == (key_t) -1)
1154  cout << "FUShmBuffer::getShmDescriptorKey: failed " << "for file "
1155  << shmKeyPath_ << "!" << endl;
1156  return result;
1157 }
1158 
1159 //______________________________________________________________________________
1161  key_t result = getuid() * 1000 + SHM_KEYID;
1162  if (result == (key_t) -1)
1163  cout << "FUShmBuffer::getShmKey: ftok() failed " << "for file "
1164  << shmKeyPath_ << "!" << endl;
1165  return result;
1166 }
1167 
1168 //______________________________________________________________________________
1170  key_t result = getuid() * 1000 + SEM_KEYID;
1171  if (result == (key_t) -1)
1172  cout << "FUShmBuffer::getSemKey: ftok() failed " << "for file "
1173  << semKeyPath_ << "!" << endl;
1174  return result;
1175 }
1176 
1177 //______________________________________________________________________________
1179  // first check and possibly remove existing segment with same id
1180  int shmid = shmget(key, 1, 0644);//using minimal size any segment with key "key" will be connected
1181  if (shmid != -1) {
1182  // an existing segment was found, remove it
1183  shmid_ds shmstat;
1184  shmctl(shmid, IPC_STAT, &shmstat);
1185  cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
1186  << " created by process " << shmstat.shm_cpid << " owned by "
1187  << shmstat.shm_perm.uid << " permissions " << hex
1188  << shmstat.shm_perm.mode << dec << endl;
1189  shmctl(shmid, IPC_RMID, &shmstat);
1190  }
1191  shmid = shmget(key, size, IPC_CREAT | 0644);
1192  if (shmid == -1) {
1193  int err = errno;
1194  cout << "FUShmBuffer::shm_create(" << key << "," << size
1195  << ") failed: " << strerror(err) << endl;
1196  }
1197  return shmid;
1198 }
1199 
1200 //______________________________________________________________________________
1201 int FUShmBuffer::shm_get(key_t key, int size) {
1202  int shmid = shmget(key, size, 0644);
1203  if (shmid == -1) {
1204  int err = errno;
1205  cout << "FUShmBuffer::shm_get(" << key << "," << size << ") failed: "
1206  << strerror(err) << endl;
1207  }
1208  return shmid;
1209 }
1210 
1211 //______________________________________________________________________________
1212 void* FUShmBuffer::shm_attach(int shmid) {
1213  void* result = shmat(shmid, NULL, 0);
1214  if (0 == result) {
1215  int err = errno;
1216  cout << "FUShmBuffer::shm_attach(" << shmid << ") failed: "
1217  << strerror(err) << endl;
1218  }
1219  return result;
1220 }
1221 
1222 //______________________________________________________________________________
1223 int FUShmBuffer::shm_nattch(int shmid) {
1224  shmid_ds shmstat;
1225  shmctl(shmid, IPC_STAT, &shmstat);
1226  return shmstat.shm_nattch;
1227 }
1228 
1229 //______________________________________________________________________________
1231  shmid_ds shmstat;
1232  int result = shmctl(shmid, IPC_RMID, &shmstat);
1233  if (result == -1)
1234  cout << "FUShmBuffer::shm_destroy(shmid=" << shmid << ") failed."
1235  << endl;
1236  return result;
1237 }
1238 
1239 //______________________________________________________________________________
1240 int FUShmBuffer::sem_create(key_t key, int nsem) {
1241  int semid = semget(key, nsem, IPC_CREAT | 0666);
1242  if (semid < 0) {
1243  int err = errno;
1244  cout << "FUShmBuffer::sem_create(key=" << key << ",nsem=" << nsem
1245  << ") failed: " << strerror(err) << endl;
1246  }
1247  return semid;
1248 }
1249 
1250 //______________________________________________________________________________
1251 int FUShmBuffer::sem_get(key_t key, int nsem) {
1252  int semid = semget(key, nsem, 0666);
1253  if (semid < 0) {
1254  int err = errno;
1255  cout << "FUShmBuffer::sem_get(key=" << key << ",nsem=" << nsem
1256  << ") failed: " << strerror(err) << endl;
1257  }
1258  return semid;
1259 }
1260 
1261 //______________________________________________________________________________
1263  int result = semctl(semid, 0, IPC_RMID);
1264  if (result == -1)
1265  cout << "FUShmBuffer::sem_destroy(semid=" << semid << ") failed."
1266  << endl;
1267  return result;
1268 }
1269 
1271 // implementation of private member functions
1273 
1274 //______________________________________________________________________________
1275 unsigned int FUShmBuffer::nextIndex(unsigned int offset, unsigned int nCells,
1276  unsigned int& iNext) {
1277  lock();
1278  unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
1279  pindex += iNext;
1280  iNext = (iNext + 1) % nCells;
1281  unsigned int result = *pindex;
1282  unlock();
1283  return result;
1284 }
1285 
1286 //______________________________________________________________________________
1287 void FUShmBuffer::postIndex(unsigned int index, unsigned int offset,
1288  unsigned int nCells, unsigned int& iLast) {
1289  lock();
1290  unsigned int* pindex = (unsigned int*) ((unsigned long) this + offset);
1291  pindex += iLast;
1292  *pindex = index;
1293  iLast = (iLast + 1) % nCells;
1294  unlock();
1295 }
1296 
1297 //______________________________________________________________________________
1300 }
1301 
1302 //______________________________________________________________________________
1305 }
1306 
1307 //______________________________________________________________________________
1310 }
1311 
1312 //______________________________________________________________________________
1315 }
1316 
1317 //______________________________________________________________________________
1320 }
1321 
1322 //______________________________________________________________________________
1325 }
1326 
1327 //______________________________________________________________________________
1330 }
1331 
1332 //______________________________________________________________________________
1335 }
1336 
1337 //______________________________________________________________________________
1340 }
1341 
1342 //______________________________________________________________________________
1345 }
1346 
1347 //______________________________________________________________________________
1350 }
1351 
1352 //______________________________________________________________________________
1355 }
1356 
1357 //______________________________________________________________________________
1358 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber) {
1359  unsigned int *pevt = (unsigned int*) ((unsigned long) this
1360  + evtNumberOffset_);
1361  for (unsigned int i = 0; i < nRawCells_; i++) {
1362  if ((*pevt++) == evtNumber)
1363  return i;
1364  }
1365  XCEPT_ASSERT(false, evf::Exception, "This point should not be reached!");
1366  return 0xffffffff;
1367 }
1368 
1369 //______________________________________________________________________________
1370 unsigned int FUShmBuffer::indexForEvtPrcId(pid_t prcid) {
1371  pid_t *pevt = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
1372  for (unsigned int i = 0; i < nRawCells_; i++) {
1373  if ((*pevt++) == prcid)
1374  return i;
1375  }
1376  XCEPT_ASSERT(false, evf::Exception, "This point should not be reached!");
1377  return 0xffffffff;
1378 }
1379 
1380 //______________________________________________________________________________
1382  if (!(index < nRawCells_)) {
1383  stringstream details;
1384  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1385  XCEPT_ASSERT(false, evf::Exception, details.str());
1386  }
1387  evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
1388  + evtStateOffset_);
1389  pstate += index;
1390  return *pstate;
1391 }
1392 
1393 //______________________________________________________________________________
1395  if (!(index < nDqmCells_)) {
1396  stringstream details;
1397  details << "index<nDqmCells_ assertion failed! Actual index is " << index;
1398  XCEPT_ASSERT(false, evf::Exception, details.str());
1399  }
1400  dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
1401  + dqmStateOffset_);
1402  pstate += index;
1403  return *pstate;
1404 }
1405 
1406 //______________________________________________________________________________
1407 unsigned int FUShmBuffer::evtNumber(unsigned int index) {
1408  if (!(index < nRawCells_)) {
1409  stringstream details;
1410  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1411  XCEPT_ASSERT(false, evf::Exception, details.str());
1412  }
1413  unsigned int *pevt = (unsigned int*) ((unsigned long) this
1414  + evtNumberOffset_);
1415  pevt += index;
1416  return *pevt;
1417 }
1418 
1419 //______________________________________________________________________________
1420 pid_t FUShmBuffer::evtPrcId(unsigned int index) {
1421  if (!(index < nRawCells_)) {
1422  stringstream details;
1423  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1424  XCEPT_ASSERT(false, evf::Exception, details.str());
1425  }
1426  pid_t *prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
1427  prcid += index;
1428  return *prcid;
1429 }
1430 
1431 //______________________________________________________________________________
1432 time_t FUShmBuffer::evtTimeStamp(unsigned int index) {
1433  if (!(index < nRawCells_)) {
1434  stringstream details;
1435  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1436  XCEPT_ASSERT(false, evf::Exception, details.str());
1437  }
1438  time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
1439  ptstmp += index;
1440  return *ptstmp;
1441 }
1442 
1443 //______________________________________________________________________________
1444 pid_t FUShmBuffer::clientPrcId(unsigned int index) {
1445  if (!(index < nClientsMax_)) {
1446  stringstream details;
1447  details << "index<nClientsMax_ assertion failed! Actual index is " << index;
1448  XCEPT_ASSERT(false, evf::Exception, details.str());
1449  }
1450  pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
1451  prcid += index;
1452  return *prcid;
1453 }
1454 
1455 //______________________________________________________________________________
1456 bool FUShmBuffer::setEvtState(unsigned int index, evt::State_t state, bool lockShm) {
1457  if(!(index < nRawCells_)) {
1458  stringstream details;
1459  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1460  //unlock here as it will skip external unlock
1461  if (!lockShm) unlock();
1462  XCEPT_ASSERT(false, evf::Exception, details.str());
1463  }
1464  evt::State_t *pstate = (evt::State_t*) ((unsigned long) this
1465  + evtStateOffset_);
1466  pstate += index;
1467  if (lockShm) lock();
1468  *pstate = state;
1469  if (lockShm) unlock();
1470  return true;
1471 }
1472 
1473 //______________________________________________________________________________
1475  if(!(index < nDqmCells_)) {
1476  stringstream details;
1477  details << "index<nDqmCells_ assertion failed! Actual index is " << index;
1478  XCEPT_ASSERT(false, evf::Exception, details.str());
1479  }
1480  dqm::State_t *pstate = (dqm::State_t*) ((unsigned long) this
1481  + dqmStateOffset_);
1482  pstate += index;
1483  lock();
1484  *pstate = state;
1485  unlock();
1486  return true;
1487 }
1488 
1489 //______________________________________________________________________________
1490 bool FUShmBuffer::setEvtDiscard(unsigned int index, unsigned int discard, bool checkValue, bool lockShm) {
1491  if(!(index < nRawCells_)) {
1492  stringstream details;
1493  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1494  //unlock here as it will skip external unlock
1495  if (!lockShm) unlock();
1496  XCEPT_ASSERT(false, evf::Exception, details.str());
1497  }
1498  unsigned int *pcount = (unsigned int*) ((unsigned long) this
1499  + evtDiscardOffset_);
1500  pcount += index;
1501  if (lockShm) lock();
1502  if (checkValue) {
1503  if (*pcount < discard)
1504  *pcount = discard;
1505  } else
1506  *pcount = discard;
1507  if (lockShm) unlock();
1508  return true;
1509 }
1510 
1511 //______________________________________________________________________________
1512 int FUShmBuffer::incEvtDiscard(unsigned int index, bool lockShm) {
1513  int result = 0;
1514  if(!(index < nRawCells_)) {
1515  stringstream details;
1516  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1517  //unlock here as it will skip external unlock
1518  if (!lockShm) unlock();
1519  XCEPT_ASSERT(false,evf::Exception,details.str());
1520  }
1521  unsigned int *pcount = (unsigned int*) ((unsigned long) this
1522  + evtDiscardOffset_);
1523  pcount += index;
1524  if (lockShm) lock();
1525  (*pcount)++;
1526  result = *pcount;
1527  if (lockShm) unlock();
1528  return result;
1529 }
1530 
1531 //______________________________________________________________________________
1532 bool FUShmBuffer::setEvtNumber(unsigned int index, unsigned int evtNumber) {
1533  if(!(index < nRawCells_)) {
1534  stringstream details;
1535  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1536  XCEPT_ASSERT(false, evf::Exception, details.str());
1537  }
1538  unsigned int *pevt = (unsigned int*) ((unsigned long) this
1539  + evtNumberOffset_);
1540  pevt += index;
1541  lock();
1542  *pevt = evtNumber;
1543  unlock();
1544  return true;
1545 }
1546 
1547 //______________________________________________________________________________
1548 bool FUShmBuffer::setEvtPrcId(unsigned int index, pid_t prcId) {
1549  if(!(index < nRawCells_)) {
1550  stringstream details;
1551  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1552  XCEPT_ASSERT(false, evf::Exception, details.str());
1553  }
1554  pid_t* prcid = (pid_t*) ((unsigned long) this + evtPrcIdOffset_);
1555  prcid += index;
1556  lock();
1557  *prcid = prcId;
1558  unlock();
1559  return true;
1560 }
1561 
1562 //______________________________________________________________________________
1563 bool FUShmBuffer::setEvtTimeStamp(unsigned int index, time_t timeStamp) {
1564  if(!(index < nRawCells_)) {
1565  stringstream details;
1566  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1567  XCEPT_ASSERT(false, evf::Exception, details.str());
1568  }
1569  time_t *ptstmp = (time_t*) ((unsigned long) this + evtTimeStampOffset_);
1570  ptstmp += index;
1571  lock();
1572  *ptstmp = timeStamp;
1573  unlock();
1574  return true;
1575 }
1576 
1577 //______________________________________________________________________________
1578 bool FUShmBuffer::setClientPrcId(pid_t prcId) {
1579  lock();
1580  if(!(nClients_ < nClientsMax_)) {
1581  stringstream details;
1582  details << "nClients_<nClientsMax_ assertion failed! Actual nClients is "
1583  << nClients_ << " and nClientsMax is " << nClientsMax_;
1584  unlock();
1585  XCEPT_ASSERT(false, evf::Exception, details.str());
1586  }
1587  pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
1588  for (unsigned int i = 0; i < nClients_; i++) {
1589  if ((*prcid) == prcId) {
1590  unlock();
1591  return false;
1592  }
1593  prcid++;
1594  }
1595  nClients_++;
1596  *prcid = prcId;
1597  unlock();
1598  return true;
1599 }
1600 
1601 //______________________________________________________________________________
1603  lock();
1604  pid_t *prcid = (pid_t*) ((unsigned long) this + clientPrcIdOffset_);
1605  unsigned int iClient(0);
1606  while (iClient < nClients_ && (*prcid) != prcId) {
1607  prcid++;
1608  iClient++;
1609  }
1610  if (iClient==nClients_) {
1611  unlock();
1612  return false;
1613  }
1614  //stringstream details;
1615  //details << "iClient!=nClients_ assertion failed! Actual iClient is "
1616  // << iClient;
1617  //XCEPT_ASSERT(iClient != nClients_, evf::Exception, details.str());
1618  pid_t* next = prcid;
1619  next++;
1620  while (iClient < nClients_ - 1) {
1621  *prcid = *next;
1622  prcid++;
1623  next++;
1624  iClient++;
1625  }
1626  nClients_--;
1627  unlock();
1628  return true;
1629 }
1630 
1631 //______________________________________________________________________________
1632 FUShmRawCell* FUShmBuffer::rawCell(unsigned int iCell) {
1633  FUShmRawCell* result(0);
1634 
1635  if (iCell >= nRawCells_) {
1636  cout << "FUShmBuffer::rawCell(" << iCell << ") ERROR: " << "iCell="
1637  << iCell << " >= nRawCells()=" << nRawCells_ << endl;
1638  return result;
1639  }
1640 
1641  if (segmentationMode_) {
1642  key_t shmkey = rawCellShmKey(iCell);
1643  int shmid = shm_get(shmkey, rawCellTotalSize_);
1644  void* cellAddr = shm_attach(shmid);
1645  result = new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
1646  } else {
1647  result = (FUShmRawCell*) ((unsigned long) this + rawCellOffset_ + iCell
1648  * rawCellTotalSize_);
1649  }
1650 
1651  return result;
1652 }
1653 
1654 //______________________________________________________________________________
1655 FUShmRecoCell* FUShmBuffer::recoCell(unsigned int iCell) {
1656  FUShmRecoCell* result(0);
1657 
1658  if (iCell >= nRecoCells_) {
1659  cout << "FUShmBuffer::recoCell(" << iCell << ") ERROR: " << "iCell="
1660  << iCell << " >= nRecoCells=" << nRecoCells_ << endl;
1661  return result;
1662  }
1663 
1664  if (segmentationMode_) {
1665  key_t shmkey = recoCellShmKey(iCell);
1666  int shmid = shm_get(shmkey, recoCellTotalSize_);
1667  void* cellAddr = shm_attach(shmid);
1668  result = new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
1669  } else {
1670  result = (FUShmRecoCell*) ((unsigned long) this + recoCellOffset_
1671  + iCell * recoCellTotalSize_);
1672  }
1673 
1674  return result;
1675 }
1676 
1677 //______________________________________________________________________________
1678 FUShmDqmCell* FUShmBuffer::dqmCell(unsigned int iCell) {
1679  FUShmDqmCell* result(0);
1680 
1681  if (iCell >= nDqmCells_) {
1682  cout << "FUShmBuffer::dqmCell(" << iCell << ") ERROR: " << "iCell="
1683  << iCell << " >= nDqmCells=" << nDqmCells_ << endl;
1684  return result;
1685  }
1686 
1687  if (segmentationMode_) {
1688  key_t shmkey = dqmCellShmKey(iCell);
1689  int shmid = shm_get(shmkey, dqmCellTotalSize_);
1690  void* cellAddr = shm_attach(shmid);
1691  result = new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
1692  } else {
1693  result = (FUShmDqmCell*) ((unsigned long) this + dqmCellOffset_ + iCell
1694  * dqmCellTotalSize_);
1695  }
1696 
1697  return result;
1698 }
1699 
1700 //______________________________________________________________________________
1702  if(!(index < nRawCells_)) {
1703  stringstream details;
1704  details << "index<nRawCells_ assertion failed! Actual index is " << index;
1705  XCEPT_ASSERT(false, evf::Exception, details.str());
1706  }
1707  unsigned int *pcount = (unsigned int*) ((unsigned long) this
1708  + evtDiscardOffset_);
1709  pcount += index;
1710  lock();
1711  if(!(*pcount > 0)) {
1712  stringstream details;
1713  details << "*pcount>0 assertion failed! Value at pcount is " << *pcount << " for cell index " << index;
1714  unlock();
1715  XCEPT_ASSERT(false, evf::Exception, details.str());
1716  }
1717  --(*pcount);
1718  bool result = (*pcount == 0);
1719  unlock();
1720  return result;
1721 }
1722 
1723 //______________________________________________________________________________
1724 key_t FUShmBuffer::shmKey(unsigned int iCell, unsigned int offset) {
1725  if (!segmentationMode_) {
1726  cout << "FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"
1727  << endl;
1728  return -1;
1729  }
1730  key_t* addr = (key_t*) ((unsigned long) this + offset);
1731  for (unsigned int i = 0; i < iCell; i++)
1732  ++addr;
1733  return *addr;
1734 }
1735 
1736 //______________________________________________________________________________
1737 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell) {
1738  if (iCell >= nRawCells_) {
1739  cout << "FUShmBuffer::rawCellShmKey() ERROR: " << "iCell>=nRawCells: "
1740  << iCell << ">=" << nRawCells_ << endl;
1741  return -1;
1742  }
1743  return shmKey(iCell, rawCellOffset_);
1744 }
1745 
1746 //______________________________________________________________________________
1747 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell) {
1748  if (iCell >= nRecoCells_) {
1749  cout << "FUShmBuffer::recoCellShmKey() ERROR: "
1750  << "iCell>=nRecoCells: " << iCell << ">=" << nRecoCells_
1751  << endl;
1752  return -1;
1753  }
1754  return shmKey(iCell, recoCellOffset_);
1755 }
1756 
1757 //______________________________________________________________________________
1758 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell) {
1759  if (iCell >= nDqmCells_) {
1760  cout << "FUShmBuffer::dqmCellShmKey() ERROR: " << "iCell>=nDqmCells: "
1761  << iCell << ">=" << nDqmCells_ << endl;
1762  return -1;
1763  }
1764  return shmKey(iCell, dqmCellOffset_);
1765 }
1766 
1767 //______________________________________________________________________________
1768 void FUShmBuffer::sem_init(int isem, int value) {
1769  if (semctl(semid(), isem, SETVAL, value) < 0) {
1770  cout << "FUShmBuffer: FATAL ERROR in semaphore initialization." << endl;
1771  }
1772 }
1773 
1774 //______________________________________________________________________________
1775 int FUShmBuffer::sem_wait(int isem) {
1776  struct sembuf sops[1];
1777  sops[0].sem_num = isem;
1778  sops[0].sem_op = -1;
1779  sops[0].sem_flg = 0;
1780  if (semop(semid(), sops, 1) == -1) {
1781  cout << "FUShmBuffer: ERROR in semaphore operation sem_wait." << endl;
1782  return -1;
1783  }
1784  return 0;
1785 }
1786 
1787 //______________________________________________________________________________
1788 void FUShmBuffer::sem_post(int isem) {
1789  struct sembuf sops[1];
1790  sops[0].sem_num = isem;
1791  sops[0].sem_op = 1;
1792  sops[0].sem_flg = 0;
1793  if (semop(semid(), sops, 1) == -1) {
1794  cout << "FUShmBuffer: ERROR in semaphore operation sem_post." << endl;
1795  }
1796 }
void discardRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:501
unsigned int clientPrcIdOffset_
Definition: FUShmBuffer.h:309
key_t recoCellShmKey(unsigned int iCell)
FUShmRawCell * rawCellToRead()
Definition: FUShmBuffer.cc:317
FUShmRawCell * rawCellToDiscard()
Definition: FUShmBuffer.cc:373
int i
Definition: DBlmapReader.cc:9
void postRawIndexToRead(unsigned int index)
unsigned int nRecoCells() const
Definition: FUShmBuffer.h:66
void waitRecoWrite()
Definition: FUShmBuffer.h:260
void writeRawLumiSectionEvent(unsigned int)
Definition: FUShmBuffer.cc:592
static int shm_nattch(int shmid)
unsigned int index() const
Definition: FUShmDqmCell.h:22
void initialize(unsigned int index)
bool setEvtPrcId(unsigned int index, pid_t prcId)
unsigned int dqmWriteNext_
Definition: FUShmBuffer.h:293
static void * shm_attach(int shmid)
void postDqmIndexToWrite(unsigned int index)
unsigned int rawReadOffset_
Definition: FUShmBuffer.h:283
void initialize(unsigned int index)
Definition: FUShmDqmCell.cc:46
static unsigned int size(unsigned int payloadSize)
unsigned int nextRecoReadIndex()
unsigned int dqmReadLast_
Definition: FUShmBuffer.h:297
unsigned int recoCellPayloadSize_
Definition: FUShmBuffer.h:319
unsigned int dqmReadOffset_
Definition: FUShmBuffer.h:298
bool writeRecoEventData(unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:714
bool writeErrorEventData(unsigned int runNumber, unsigned int fuProcessId, unsigned int iRawCell, bool checkValue)
Definition: FUShmBuffer.cc:746
unsigned int nextIndex(unsigned int offset, unsigned int nCells, unsigned int &iNext)
bool writeDqmEventData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:820
int sem_wait(int isem)
void scheduleRawCellForDiscardServerSide(unsigned int iCell)
Definition: FUShmBuffer.cc:478
unsigned int dqmWriteLast_
Definition: FUShmBuffer.h:294
static FUShmBuffer * getShmBuffer()
Definition: FUShmBuffer.cc:986
int nbRawCellsToRead() const
Definition: FUShmBuffer.cc:293
void printDqmState(unsigned int index)
Definition: FUShmBuffer.cc:911
void releaseRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:546
int shmid() const
Definition: FUShmBuffer.h:69
unsigned int evtDiscardOffset_
Definition: FUShmBuffer.h:301
unsigned int nRecoCells_
Definition: FUShmBuffer.h:318
static const char * semKeyPath_
Definition: FUShmBuffer.h:331
unsigned int dqmStateOffset_
Definition: FUShmBuffer.h:305
pid_t clientPrcId(unsigned int index)
static key_t getShmDescriptorKey()
static int shm_get(key_t key, int size)
unsigned int dqmWriteOffset_
Definition: FUShmBuffer.h:295
#define NULL
Definition: scimark2.h:8
unsigned int nRawCells_
Definition: FUShmBuffer.h:311
unsigned int recoWriteLast_
Definition: FUShmBuffer.h:287
static bool releaseSharedMemory()
static int sem_create(key_t key, int nsem)
bool removeClientPrcId(pid_t prcId)
void waitRawDiscard()
Definition: FUShmBuffer.h:256
static unsigned int size(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
FUShmDqmCell * dqmCellToRead()
Definition: FUShmBuffer.cc:354
unsigned int evtNumberOffset_
Definition: FUShmBuffer.h:302
unsigned int dqmCellOffset_
Definition: FUShmBuffer.h:328
evt::State_t evtState(unsigned int index)
void sleep(Duration_t)
Definition: Utils.h:163
bool writeRecoInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize, unsigned int nExpectedEPs)
Definition: FUShmBuffer.cc:693
void discardDqmCell(unsigned int iCell)
Definition: FUShmBuffer.cc:527
unsigned int evtNumber() const
Definition: FUShmRawCell.h:28
void writeInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize, unsigned int nExpectedEPs)
unsigned int nextDqmReadIndex()
time_t evtTimeStamp(unsigned int index)
unsigned int evtNumber(unsigned int index)
unsigned int recoWriteNext_
Definition: FUShmBuffer.h:286
void initialize(unsigned int shmid, unsigned int semid)
Definition: FUShmBuffer.cc:162
void writeRecoEmptyEvent()
Definition: FUShmBuffer.cc:614
bool setEvtNumber(unsigned int index, unsigned int evtNumber)
void postDqmIndexToRead(unsigned int index)
unsigned int evtStateOffset_
Definition: FUShmBuffer.h:300
void setEventTypeEol()
Definition: FUShmRawCell.h:61
static unsigned int size(unsigned int payloadSize)
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
void initialize(unsigned int index)
Definition: FUShmRawCell.cc:73
void discardOrphanedRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:517
bool setClientPrcId(pid_t prcId)
unsigned int getlbn(const unsigned char *)
std::string sem_print_s()
Definition: FUShmBuffer.cc:863
void discardRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:495
void postRawIndexToWrite(unsigned int index)
unsigned int nextRawReadIndex()
unsigned char * payloadAddr() const
FUShmRawCell * rawCell(unsigned int iCell)
#define SHM_KEYID
Definition: FUShmBuffer.cc:32
unsigned int recoReadOffset_
Definition: FUShmBuffer.h:291
unsigned int nDqmCells_
Definition: FUShmBuffer.h:325
static const char * shmKeyPath_
Definition: FUShmBuffer.h:330
tuple result
Definition: query.py:137
unsigned int rawWriteNext_
Definition: FUShmBuffer.h:278
pid_t evtPrcId(unsigned int index)
void finishWritingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:393
unsigned int nbRawCellsToWrite() const
Definition: FUShmBuffer.cc:288
bool setEvtDiscard(unsigned int index, unsigned int discard, bool checkValue=false, bool lockShm=true)
key_t rawCellShmKey(unsigned int iCell)
static int shm_destroy(int shmid)
unsigned int nRawCells() const
Definition: FUShmBuffer.h:65
bool setEvtState(unsigned int index, evt::State_t state, bool lockShm=true)
FUShmRecoCell * recoCell(unsigned int iCell)
void writeRawEmptyEvent()
Definition: FUShmBuffer.cc:572
bool rawCellReadyForDiscard(unsigned int index)
unsigned int offset(bool)
unsigned char * fedAddr(unsigned int i) const
Definition: FUShmRawCell.cc:94
void postRawDiscard()
Definition: FUShmBuffer.h:257
unsigned int dqmCellPayloadSize_
Definition: FUShmBuffer.h:326
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
Definition: FUShmBuffer.cc:917
void finishReadingDqmCell(FUShmDqmCell *cell)
Definition: FUShmBuffer.cc:438
void scheduleRawEmptyCellForDiscard()
Definition: FUShmBuffer.cc:638
unsigned int recoReadLast_
Definition: FUShmBuffer.h:290
void finishReadingRecoCell(FUShmRecoCell *cell)
Definition: FUShmBuffer.cc:426
unsigned int nextDqmWriteIndex()
void writeErrorEvent(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int fuProcessId, unsigned char *data, unsigned int dataSize)
key_t dqmCellShmKey(unsigned int iCell)
dqm::State_t dqmState(unsigned int index)
static key_t getShmKey()
unsigned int rawReadNext_
Definition: FUShmBuffer.h:281
unsigned int rawCellPayloadSize_
Definition: FUShmBuffer.h:312
unsigned int nextRecoWriteIndex()
unsigned int eventSize() const
unsigned int rawDiscardIndex_
Definition: FUShmBuffer.h:284
#define SEM_KEYID
Definition: FUShmBuffer.cc:33
unsigned int nextRawWriteIndex()
FUShmRawCell * rawCellToWrite()
Definition: FUShmBuffer.cc:298
unsigned int rawReadLast_
Definition: FUShmBuffer.h:282
unsigned int rawWriteOffset_
Definition: FUShmBuffer.h:280
unsigned int dqmCellTotalSize_
Definition: FUShmBuffer.h:327
void reset(bool)
Definition: FUShmBuffer.cc:201
bool setEvtTimeStamp(unsigned int index, time_t timeStamp)
unsigned int rawWriteLast_
Definition: FUShmBuffer.h:279
void writeDqmEmptyEvent()
Definition: FUShmBuffer.cc:626
unsigned int dqmReadNext_
Definition: FUShmBuffer.h:296
unsigned int recoCellTotalSize_
Definition: FUShmBuffer.h:320
void waitRawDiscarded()
Definition: FUShmBuffer.h:258
void postRecoIndexToRead(unsigned int index)
char state
Definition: procUtils.cc:75
unsigned int index() const
Definition: FUShmRawCell.h:25
void setLumiSection(unsigned int)
unsigned int indexForEvtNumber(unsigned int evtNumber)
static key_t getSemKey()
void printEvtState(unsigned int index)
Definition: FUShmBuffer.cc:878
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void postRawDiscarded()
Definition: FUShmBuffer.h:259
unsigned int nClientsMax_
Definition: FUShmBuffer.h:308
list key
Definition: combine.py:13
static int sem_get(key_t key, int nsem)
void sem_post(int isem)
FUShmRecoCell * recoCellToRead()
Definition: FUShmBuffer.cc:340
unsigned int rawCellTotalSize_
Definition: FUShmBuffer.h:313
FUShmBuffer(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
Definition: FUShmBuffer.cc:53
void writeData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmDqmCell.cc:69
static int sem_destroy(int semid)
#define SHM_DESCRIPTOR_KEYID
Definition: FUShmBuffer.cc:31
tuple cout
Definition: gather_cfg.py:121
static int shm_create(key_t key, int size)
unsigned int nDqmCells() const
Definition: FUShmBuffer.h:67
unsigned int fedSize(unsigned int i) const
Definition: FUShmRawCell.cc:82
void sem_init(int isem, int value)
unsigned int evtPrcIdOffset_
Definition: FUShmBuffer.h:303
unsigned int recoWriteOffset_
Definition: FUShmBuffer.h:288
int incEvtDiscard(unsigned int index, bool lockShm=true)
void writeEventData(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
static unsigned int size(unsigned int payloadSize)
Definition: FUShmDqmCell.cc:97
void postRecoWrite()
Definition: FUShmBuffer.h:261
unsigned int nClients_
Definition: FUShmBuffer.h:307
unsigned int rawCellOffset_
Definition: FUShmBuffer.h:314
unsigned int recoReadNext_
Definition: FUShmBuffer.h:289
unsigned int indexForEvtPrcId(pid_t evtNumber)
key_t shmKey(unsigned int iCell, unsigned int offset)
void finishReadingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:410
tuple size
Write out results.
unsigned int recoCellOffset_
Definition: FUShmBuffer.h:321
std::string timeStamp(TimePoint_t)
Definition: Utils.cc:23
void setEventTypeStopper()
Definition: FUShmRawCell.h:65
void postRecoIndexToWrite(unsigned int index)
bool setDqmState(unsigned int index, dqm::State_t state)
void postIndex(unsigned int index, unsigned int offset, unsigned int nCells, unsigned int &iLast)
FUShmDqmCell * dqmCell(unsigned int iCell)
void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:675
unsigned int evtTimeStampOffset_
Definition: FUShmBuffer.h:304
void scheduleRawCellForDiscard(unsigned int iCell)
Definition: FUShmBuffer.cc:454
int semid() const
Definition: FUShmBuffer.h:70