CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUShmBuffer.cc
Go to the documentation of this file.
1 //
3 // FUShmBuffer
4 // -----------
5 //
6 // 15/11/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
8 
9 
13 
14 #include <unistd.h>
15 #include <iostream>
16 #include <string>
17 #include <cassert>
18 
19 #include <sstream>
20 #include <fstream>
21 
22 #include <cstdlib>
23 #include <cstring>
24 #include <stdint.h>
25 
26 // the shmem keys are henceforth going to be FIXED for a given userid
27 // prior to creation, the application will attempt to get ownership
28 // of existing segments by the same key and destroy them
29 
30 #define SHM_DESCRIPTOR_KEYID 1 /* Id used on ftok for 1. shmget key */
31 #define SHM_KEYID 2 /* Id used on ftok for 2. shmget key */
32 #define SEM_KEYID 1 /* Id used on ftok for semget key */
33 
34 #define NSKIP_MAX 100
35 
36 
37 using namespace std;
38 using namespace evf;
39 
40 
41 //obsolete!!!
42 const char* FUShmBuffer::shmKeyPath_ =
43  (getenv("FUSHM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSHM_KEYFILE"));
44 const char* FUShmBuffer::semKeyPath_ =
45  (getenv("FUSEM_KEYFILE") == NULL ? "/dev/null" : getenv("FUSEM_KEYFILE"));
46 
47 
49 // construction/destruction
51 
52 //______________________________________________________________________________
53 FUShmBuffer::FUShmBuffer(bool segmentationMode,
54  unsigned int nRawCells,
55  unsigned int nRecoCells,
56  unsigned int nDqmCells,
57  unsigned int rawCellSize,
58  unsigned int recoCellSize,
59  unsigned int dqmCellSize)
60  : segmentationMode_(segmentationMode)
61  , nClientsMax_(128)
62  , nRawCells_(nRawCells)
63  , rawCellPayloadSize_(rawCellSize)
64  , nRecoCells_(nRecoCells)
65  , recoCellPayloadSize_(recoCellSize)
66  , nDqmCells_(nDqmCells)
67  , dqmCellPayloadSize_(dqmCellSize)
68 {
72 
73  void* addr;
74 
76  addr=(void*)((unsigned long)this+rawWriteOffset_);
77  new (addr) unsigned int[nRawCells_];
78 
79  rawReadOffset_=rawWriteOffset_+nRawCells_*sizeof(unsigned int);
80  addr=(void*)((unsigned long)this+rawReadOffset_);
81  new (addr) unsigned int[nRawCells_];
82 
83  recoWriteOffset_=rawReadOffset_+nRawCells_*sizeof(unsigned int);
84  addr=(void*)((unsigned long)this+recoWriteOffset_);
85  new (addr) unsigned int[nRecoCells_];
86 
87  recoReadOffset_=recoWriteOffset_+nRecoCells_*sizeof(unsigned int);
88  addr=(void*)((unsigned long)this+recoReadOffset_);
89  new (addr) unsigned int[nRecoCells_];
90 
91  dqmWriteOffset_=recoReadOffset_+nRecoCells_*sizeof(unsigned int);
92  addr=(void*)((unsigned long)this+dqmWriteOffset_);
93  new (addr) unsigned int[nDqmCells_];
94 
95  dqmReadOffset_=dqmWriteOffset_+nDqmCells_*sizeof(unsigned int);
96  addr=(void*)((unsigned long)this+dqmReadOffset_);
97  new (addr) unsigned int[nDqmCells_];
98 
99  evtStateOffset_=dqmReadOffset_+nDqmCells_*sizeof(unsigned int);
100  addr=(void*)((unsigned long)this+evtStateOffset_);
101  new (addr) evt::State_t[nRawCells_];
102 
104  addr=(void*)((unsigned long)this+evtDiscardOffset_);
105  new (addr) unsigned int[nRawCells_];
106 
107  evtNumberOffset_=evtDiscardOffset_+nRawCells_*sizeof(unsigned int);
108  addr=(void*)((unsigned long)this+evtNumberOffset_);
109  new (addr) unsigned int[nRawCells_];
110 
111  evtPrcIdOffset_=evtNumberOffset_+nRawCells_*sizeof(unsigned int);
112  addr=(void*)((unsigned long)this+evtPrcIdOffset_);
113  new (addr) pid_t[nRawCells_];
114 
116  addr=(void*)((unsigned long)this+evtTimeStampOffset_);
117  new (addr) time_t[nRawCells_];
118 
120  addr=(void*)((unsigned long)this+dqmStateOffset_);
121  new (addr) dqm::State_t[nDqmCells_];
122 
124  addr=(void*)((unsigned long)this+clientPrcIdOffset_);
125  new (addr) pid_t[nClientsMax_];
126 
128 
129  if (segmentationMode_) {
132  addr=(void*)((unsigned long)this+rawCellOffset_);
133  new (addr) key_t[nRawCells_];
134  addr=(void*)((unsigned long)this+recoCellOffset_);
135  new (addr) key_t[nRecoCells_];
136  addr=(void*)((unsigned long)this+dqmCellOffset_);
137  new (addr) key_t[nDqmCells_];
138  }
139  else {
142  for (unsigned int i=0;i<nRawCells_;i++) {
143  addr=(void*)((unsigned long)this+rawCellOffset_+i*rawCellTotalSize_);
144  new (addr) FUShmRawCell(rawCellSize);
145  }
146  for (unsigned int i=0;i<nRecoCells_;i++) {
147  addr=(void*)((unsigned long)this+recoCellOffset_+i*recoCellTotalSize_);
148  new (addr) FUShmRecoCell(recoCellSize);
149  }
150  for (unsigned int i=0;i<nDqmCells_;i++) {
151  addr=(void*)((unsigned long)this+dqmCellOffset_+i*dqmCellTotalSize_);
152  new (addr) FUShmDqmCell(dqmCellSize);
153  }
154  }
155 }
156 
157 
158 //______________________________________________________________________________
160 {
161 
162 }
163 
164 
166 // implementation of member functions
168 
169 //______________________________________________________________________________
170 void FUShmBuffer::initialize(unsigned int shmid,unsigned int semid)
171 {
172  shmid_=shmid;
173  semid_=semid;
174 
175  if (segmentationMode_) {
176  int shmKeyId=666;
177  key_t* keyAddr =(key_t*)((unsigned long)this+rawCellOffset_);
178  for (unsigned int i=0;i<nRawCells_;i++) {
179  *keyAddr =ftok(shmKeyPath_,shmKeyId++);
180  int shmid =shm_create(*keyAddr,rawCellTotalSize_);
181  void* shmAddr=shm_attach(shmid);
182  new (shmAddr) FUShmRawCell(rawCellPayloadSize_);
183  shmdt(shmAddr);
184  ++keyAddr;
185  }
186  keyAddr =(key_t*)((unsigned long)this+recoCellOffset_);
187  for (unsigned int i=0;i<nRecoCells_;i++) {
188  *keyAddr =ftok(shmKeyPath_,shmKeyId++);
189  int shmid =shm_create(*keyAddr,recoCellTotalSize_);
190  void* shmAddr=shm_attach(shmid);
191  new (shmAddr) FUShmRecoCell(recoCellPayloadSize_);
192  shmdt(shmAddr);
193  ++keyAddr;
194  }
195  keyAddr =(key_t*)((unsigned long)this+dqmCellOffset_);
196  for (unsigned int i=0;i<nDqmCells_;i++) {
197  *keyAddr =ftok(shmKeyPath_,shmKeyId++);
198  int shmid =shm_create(*keyAddr,dqmCellTotalSize_);
199  void* shmAddr=shm_attach(shmid);
200  new (shmAddr) FUShmDqmCell(dqmCellPayloadSize_);
201  shmdt(shmAddr);
202  ++keyAddr;
203  }
204  }
205 
206  for (unsigned int i=0;i<nRawCells_;i++) {
207  FUShmRawCell* cell=rawCell(i);
208  cell->initialize(i);
209  if (segmentationMode_) shmdt(cell);
210  }
211 
212  for (unsigned int i=0;i<nRecoCells_;i++) {
213  FUShmRecoCell* cell=recoCell(i);
214  cell->initialize(i);
215  if (segmentationMode_) shmdt(cell);
216  }
217 
218  for (unsigned int i=0;i<nDqmCells_;i++) {
219  FUShmDqmCell* cell=dqmCell(i);
220  cell->initialize(i);
221  if (segmentationMode_) shmdt(cell);
222  }
223 
224  reset();
225 }
226 
227 
228 //______________________________________________________________________________
230 {
231  nClients_=0;
232 
233  // setup ipc semaphores
234  sem_init(0,1); // lock (binary)
235  sem_init(1,nRawCells_); // raw write semaphore
236  sem_init(2,0); // raw read semaphore
237  sem_init(3,1); // binary semaphore to schedule raw event for discard
238  sem_init(4,0); // binary semaphore to discard raw event
239  sem_init(5,nRecoCells_);// reco write semaphore
240  sem_init(6,0); // reco send (read) semaphore
241  sem_init(7,nDqmCells_); // dqm write semaphore
242  sem_init(8,0); // dqm send (read) semaphore
243 
244  sem_print();
245 
246  unsigned int *iWrite,*iRead;
247 
249  iWrite=(unsigned int*)((unsigned long)this+rawWriteOffset_);
250  iRead =(unsigned int*)((unsigned long)this+rawReadOffset_);
251  for (unsigned int i=0;i<nRawCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
252 
254  iWrite=(unsigned int*)((unsigned long)this+recoWriteOffset_);
255  iRead =(unsigned int*)((unsigned long)this+recoReadOffset_);
256  for (unsigned int i=0;i<nRecoCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
257 
259  iWrite=(unsigned int*)((unsigned long)this+dqmWriteOffset_);
260  iRead =(unsigned int*)((unsigned long)this+dqmReadOffset_);
261  for (unsigned int i=0;i<nDqmCells_;i++) { *iWrite++=i; *iRead++ =0xffffffff; }
262 
263  for (unsigned int i=0;i<nRawCells_;i++) {
265  setEvtDiscard(i,0);
266  setEvtNumber(i,0xffffffff);
267  setEvtPrcId(i,0);
268  setEvtTimeStamp(i,0);
269  }
270 
271  for (unsigned int i=0;i<nDqmCells_;i++) setDqmState(i,dqm::EMPTY);
272 }
273 
274 
275 //______________________________________________________________________________
277 {
278  return semctl(semid(),1,GETVAL);
279 }
280 
281 
282 //______________________________________________________________________________
284 {
285  return semctl(semid(),2,GETVAL);
286 }
287 
288 
289 //______________________________________________________________________________
291 {
292  waitRawWrite();
293  unsigned int iCell=nextRawWriteIndex();
294  FUShmRawCell* cell =rawCell(iCell);
295  evt::State_t state=evtState(iCell);
296  assert(state==evt::EMPTY);
298  setEvtDiscard(iCell,1);
299  return cell;
300 }
301 
302 
303 //______________________________________________________________________________
305 {
306  waitRawRead();
307  unsigned int iCell=nextRawReadIndex();
308  FUShmRawCell* cell=rawCell(iCell);
309  evt::State_t state=evtState(iCell);
310  assert(state==evt::RAWWRITTEN
311  ||state==evt::EMPTY
312  ||state==evt::STOP
313  ||state==evt::LUMISECTION);
314  if (state==evt::RAWWRITTEN) {
315  setEvtPrcId(iCell,getpid());
317  }
318  return cell;
319 }
320 
321 
322 //______________________________________________________________________________
324 {
325  waitRecoRead();
326  unsigned int iCell =nextRecoReadIndex();
327  FUShmRecoCell* cell =recoCell(iCell);
328  unsigned int iRawCell=cell->rawCellIndex();
329  if (iRawCell<nRawCells_) {
330  //evt::State_t state=evtState(iRawCell);
331  //assert(state==evt::RECOWRITTEN);
332  setEvtState(iRawCell,evt::SENDING);
333  }
334  return cell;
335 }
336 
337 
338 //______________________________________________________________________________
340 {
341  waitDqmRead();
342  unsigned int iCell=nextDqmReadIndex();
343  FUShmDqmCell* cell=dqmCell(iCell);
344  dqm::State_t state=dqmState(iCell);
345  assert(state==dqm::WRITTEN||state==dqm::EMPTY);
346  if (state==dqm::WRITTEN) setDqmState(iCell,dqm::SENDING);
347  return cell;
348 }
349 
350 
351 //______________________________________________________________________________
353 {
356  evt::State_t state=evtState(cell->index());
357  assert(state==evt::PROCESSED
358  ||state==evt::SENT
359  ||state==evt::EMPTY
360  ||state==evt::STOP
361  ||state==evt::LUMISECTION);
362  if (state!=evt::EMPTY
363  && state!=evt::LUMISECTION
364  && state!=evt::STOP
365  )
367  return cell;
368 }
369 
370 
371 //______________________________________________________________________________
373 {
374  evt::State_t state=evtState(cell->index());
375  assert(state==evt::RAWWRITING);
377  setEvtNumber(cell->index(),cell->evtNumber());
378  postRawIndexToRead(cell->index());
379  if (segmentationMode_) shmdt(cell);
380  postRawRead();
381 }
382 
383 
384 //______________________________________________________________________________
386 {
387  evt::State_t state=evtState(cell->index());
388  assert(state==evt::RAWREADING);
389  setEvtState(cell->index(),evt::RAWREAD);
391  setEvtTimeStamp(cell->index(),time(0));
392  if (segmentationMode_) shmdt(cell);
393 }
394 
395 
396 //______________________________________________________________________________
398 {
399  unsigned int iRawCell=cell->rawCellIndex();
400  if (iRawCell<nRawCells_) {
401  //evt::State_t state=evtState(cell->rawCellIndex());
402  //assert(state==evt::SENDING);
404  }
405  if (segmentationMode_) shmdt(cell);
406 }
407 
408 
409 //______________________________________________________________________________
411 {
412  dqm::State_t state=dqmState(cell->index());
413  assert(state==dqm::SENDING||state==dqm::EMPTY);
414  if (state==dqm::SENDING) setDqmState(cell->index(),dqm::SENT);
415  if (segmentationMode_) shmdt(cell);
416 }
417 
418 
419 //______________________________________________________________________________
421 {
422  waitRawDiscard();
423  if (rawCellReadyForDiscard(iCell)) {
424  rawDiscardIndex_=iCell;
425  evt::State_t state=evtState(iCell);
426  assert(state==evt::PROCESSING
427  ||state==evt::SENT
428  ||state==evt::EMPTY
429  ||state==evt::STOP
430  ||state==evt::LUMISECTION
431  );
432  if (state==evt::PROCESSING) setEvtState(iCell,evt::PROCESSED);
434  }
435  else postRawDiscard();
436 }
437 
438 
439 //______________________________________________________________________________
441 {
442  waitRawDiscard();
443  if (rawCellReadyForDiscard(iCell)) {
444  rawDiscardIndex_=iCell;
445  evt::State_t state=evtState(iCell);
446  assert(state==evt::RAWWRITTEN
447  );
450  }
451  else postRawDiscard();
452 }
453 
454 
455 //______________________________________________________________________________
457 {
458  releaseRawCell(cell);
459  postRawDiscard();
460 }
461 
462 
463 //______________________________________________________________________________
464 void FUShmBuffer::discardRecoCell(unsigned int iCell)
465 {
466  FUShmRecoCell* cell=recoCell(iCell);
467  unsigned int iRawCell=cell->rawCellIndex();
468  if (iRawCell<nRawCells_) {
469  //evt::State_t state=evtState(iRawCell);
470  //assert(state==evt::SENT);
471  scheduleRawCellForDiscard(iRawCell);
472  }
473  cell->clear();
474  if (segmentationMode_) shmdt(cell);
475  postRecoIndexToWrite(iCell);
476  postRecoWrite();
477 }
478 
479 //______________________________________________________________________________
480 void FUShmBuffer::discardOrphanedRecoCell(unsigned int iCell)
481 {
482  FUShmRecoCell* cell=recoCell(iCell);
483  cell->clear();
484  if (segmentationMode_) shmdt(cell);
485  postRecoIndexToWrite(iCell);
486  postRecoWrite();
487 }
488 
489 
490 //______________________________________________________________________________
491 void FUShmBuffer::discardDqmCell(unsigned int iCell)
492 {
493  dqm::State_t state=dqmState(iCell);
494  assert(state==dqm::EMPTY||state==dqm::SENT);
496  FUShmDqmCell* cell=dqmCell(iCell);
497  cell->clear();
498  if (segmentationMode_) shmdt(cell);
499  setDqmState(iCell,dqm::EMPTY);
500  postDqmIndexToWrite(iCell);
501  postDqmWrite();
502 }
503 
504 
505 //______________________________________________________________________________
507 {
508  evt::State_t state=evtState(cell->index());
509  if(!(
510  state==evt::DISCARDING
511  ||state==evt::RAWWRITING
512  ||state==evt::EMPTY
513  ||state==evt::STOP
514  ||state==evt::LUMISECTION
515  )) std::cout << "=================releaseRawCell state " << state
516  << std::endl;
517 
518  assert(
519  state==evt::DISCARDING
520  ||state==evt::RAWWRITING
521  ||state==evt::EMPTY
522  ||state==evt::STOP
523  ||state==evt::LUMISECTION
524  );
525  setEvtState(cell->index(),evt::EMPTY);
526  setEvtDiscard(cell->index(),0);
527  setEvtNumber(cell->index(),0xffffffff);
528  setEvtPrcId(cell->index(),0);
529  setEvtTimeStamp(cell->index(),0);
530  cell->clear();
531  postRawIndexToWrite(cell->index());
532  if (segmentationMode_) shmdt(cell);
533  postRawWrite();
534 }
535 
536 
537 //______________________________________________________________________________
539 {
541  evt::State_t state=evtState(cell->index());
542  assert(state==evt::RAWWRITING);
543  setEvtState(cell->index(),evt::STOP);
544  postRawIndexToRead(cell->index());
545  if (segmentationMode_) shmdt(cell);
546  postRawRead();
547 }
548 
549 //______________________________________________________________________________
551 {
553  cell->setLumiSection(ls);
554  evt::State_t state=evtState(cell->index());
555  assert(state==evt::RAWWRITING);
557  postRawIndexToRead(cell->index());
558  if (segmentationMode_) shmdt(cell);
559  postRawRead();
560 }
561 
562 
563 //______________________________________________________________________________
565 {
566  waitRecoWrite();
567  unsigned int iCell=nextRecoWriteIndex();
568  FUShmRecoCell* cell =recoCell(iCell);
569  cell->clear();
570  postRecoIndexToRead(iCell);
571  if (segmentationMode_) shmdt(cell);
572  postRecoRead();
573 }
574 
575 
576 //______________________________________________________________________________
578 {
579  waitDqmWrite();
580  unsigned int iCell=nextDqmWriteIndex();
581  FUShmDqmCell* cell=dqmCell(iCell);
582  cell->clear();
583  postDqmIndexToRead(iCell);
584  if (segmentationMode_) shmdt(cell);
585  postDqmRead();
586 }
587 
588 
589 //______________________________________________________________________________
591 {
593  rawDiscardIndex_=cell->index();
594  setEvtState(cell->index(),evt::STOP);
595  setEvtNumber(cell->index(),0xffffffff);
596  setEvtPrcId(cell->index(),0);
597  setEvtTimeStamp(cell->index(),0);
599 }
600 
601 
602 //______________________________________________________________________________
604 {
605  waitRawDiscard();
606  if (rawCellReadyForDiscard(cell->index())) {
607  rawDiscardIndex_=cell->index();
608  setEvtState(cell->index(),evt::STOP);
609  setEvtNumber(cell->index(),0xffffffff);
610  setEvtPrcId(cell->index(),0);
611  setEvtTimeStamp(cell->index(),0);
612  removeClientPrcId(getpid());
613  if (segmentationMode_) shmdt(cell);
615  }
616  else postRawDiscard();
617 }
618 
619 //______________________________________________________________________________
621 {
622  // waitRawDiscard();
623  if (rawCellReadyForDiscard(cell->index())) {
624  rawDiscardIndex_=cell->index();
625  setEvtState(cell->index(),evt::STOP);
626  setEvtNumber(cell->index(),0xffffffff);
627  setEvtPrcId(cell->index(),0);
628  setEvtTimeStamp(cell->index(),0);
629  // removeClientPrcId(getpid());
630  if (segmentationMode_) shmdt(cell);
632  }
633  else postRawDiscard();
634 }
635 
636 
637 //______________________________________________________________________________
638 bool FUShmBuffer::writeRecoInitMsg(unsigned int outModId,
639  unsigned int fuProcessId,
640  unsigned int fuGuid,
641  unsigned char *data,
642  unsigned int dataSize)
643 {
644  if (dataSize>recoCellPayloadSize_) {
645  cout<<"FUShmBuffer::writeRecoInitMsg() ERROR: buffer overflow."<<endl;
646  return false;
647  }
648 
649  waitRecoWrite();
650  unsigned int iCell=nextRecoWriteIndex();
651  FUShmRecoCell* cell =recoCell(iCell);
652  cell->writeInitMsg(outModId,fuProcessId,fuGuid,data,dataSize);
653  postRecoIndexToRead(iCell);
654  if (segmentationMode_) shmdt(cell);
655  postRecoRead();
656  return true;
657 }
658 
659 
660 //______________________________________________________________________________
662  unsigned int evtNumber,
663  unsigned int outModId,
664  unsigned int fuProcessId,
665  unsigned int fuGuid,
666  unsigned char *data,
667  unsigned int dataSize)
668 {
669  if (dataSize>recoCellPayloadSize_) {
670  cout<<"FUShmBuffer::writeRecoEventData() ERROR: buffer overflow."<<endl;
671  return false;
672  }
673 
674  waitRecoWrite();
675  unsigned int iCell=nextRecoWriteIndex();
676  FUShmRecoCell* cell =recoCell(iCell);
677  unsigned int rawCellIndex=indexForEvtNumber(evtNumber);
678  //evt::State_t state=evtState(rawCellIndex);
679  //assert(state==evt::PROCESSING||state==evt::RECOWRITING||state==evt::SENT);
680  setEvtState(rawCellIndex,evt::RECOWRITING);
681  incEvtDiscard(rawCellIndex);
682  cell->writeEventData(rawCellIndex,runNumber,evtNumber,outModId,
683  fuProcessId,fuGuid,data,dataSize);
684  setEvtState(rawCellIndex,evt::RECOWRITTEN);
685  postRecoIndexToRead(iCell);
686  if (segmentationMode_) shmdt(cell);
687  postRecoRead();
688  return true;
689 }
690 
691 
692 //______________________________________________________________________________
694  unsigned int fuProcessId,
695  unsigned int iRawCell)
696 {
697  FUShmRawCell *raw=rawCell(iRawCell);
698 
699  unsigned int dataSize=sizeof(uint32_t)*(4+1024)+raw->eventSize();
700  unsigned char *data =new unsigned char[dataSize];
701  uint32_t *pos =(uint32_t*)data;
702  // 06-Oct-2008, KAB - added a version number for the error event format.
703  //
704  // Version 1 had no version number, so the run number appeared in the
705  // first value. So, a reasonable test for version 1 is whether the
706  // first value is larger than some relatively small cutoff (say 32).
707  // Version 2 added the lumi block number.
708  //
709  *pos++=(uint32_t)2; // protocol version number
710  *pos++=(uint32_t)runNumber;
711  *pos++=(uint32_t)evf::evtn::getlbn(raw->fedAddr(FEDNumbering::MINTriggerGTPFEDID)) + 1;
712  *pos++=(uint32_t)raw->evtNumber();
713  for (unsigned int i=0;i<1024;i++) *pos++ = (uint32_t)raw->fedSize(i);
714  memcpy(pos,raw->payloadAddr(),raw->eventSize());
715 
716  // DEBUG
717  /*
718  if (1) {
719  stringstream ss;
720  ss<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".err";
721  ofstream fout;
722  fout.open(ss.str().c_str(),ios::out|ios::binary);
723  if (!fout.write((char*)data,dataSize))
724  cout<<"Failed to write error event to "<<ss.str()<<endl;
725  fout.close();
726 
727  stringstream ss2;
728  ss2<<"/tmp/run"<<runNumber<<"_evt"<<raw->evtNumber()<<".info";
729  ofstream fout2;
730  fout2.open(ss2.str().c_str());
731  fout2<<"dataSize = "<<dataSize<<endl;
732  fout2<<"runNumber = "<<runNumber<<endl;
733  fout2<<"evtNumber = "<<raw->evtNumber()<<endl;
734  fout2<<"eventSize = "<<raw->eventSize()<<endl;
735  unsigned int totalSize(0);
736  for (unsigned int i=0;i<1024;i++) {
737  unsigned int fedSize = raw->fedSize(i);
738  totalSize += fedSize;
739  if (fedSize>0) fout2<<i<<": "<<fedSize<<endl;
740  }
741  fout2<<"totalSize = "<<totalSize<<endl;
742  fout2.close();
743  }
744  */// END DEBUG
745 
746  waitRecoWrite();
747  unsigned int iRecoCell=nextRecoWriteIndex();
748  FUShmRecoCell* reco =recoCell(iRecoCell);
749  setEvtState(iRawCell,evt::RECOWRITING);
750  setEvtDiscard(iRawCell,1);
751  reco->writeErrorEvent(iRawCell,runNumber,raw->evtNumber(),fuProcessId,
752  data,dataSize);
753  delete [] data;
754  setEvtState(iRawCell,evt::RECOWRITTEN);
755  postRecoIndexToRead(iRecoCell);
756  if (segmentationMode_) { shmdt(raw); shmdt(reco); }
757  postRecoRead();
758  return true;
759 }
760 
761 
762 //______________________________________________________________________________
764  unsigned int evtAtUpdate,
765  unsigned int folderId,
766  unsigned int fuProcessId,
767  unsigned int fuGuid,
768  unsigned char *data,
769  unsigned int dataSize)
770 {
771  if (dataSize>dqmCellPayloadSize_) {
772  cout<<"FUShmBuffer::writeDqmEventData() ERROR: buffer overflow."<<endl;
773  return false;
774  }
775 
776  waitDqmWrite();
777  unsigned int iCell=nextDqmWriteIndex();
778  FUShmDqmCell* cell=dqmCell(iCell);
779  dqm::State_t state=dqmState(iCell);
780  assert(state==dqm::EMPTY);
781  setDqmState(iCell,dqm::WRITING);
782  cell->writeData(runNumber,evtAtUpdate,folderId,fuProcessId,fuGuid,data,dataSize);
783  setDqmState(iCell,dqm::WRITTEN);
784  postDqmIndexToRead(iCell);
785  if (segmentationMode_) shmdt(cell);
786  postDqmRead();
787  return true;
788 }
789 
790 
791 //______________________________________________________________________________
793 {
794  cout<<"--> current sem values:"
795  <<endl
796  <<" lock="<<semctl(semid(),0,GETVAL)
797  <<endl
798  <<" wraw="<<semctl(semid(),1,GETVAL)
799  <<" rraw="<<semctl(semid(),2,GETVAL)
800  <<endl
801  <<" wdsc="<<semctl(semid(),3,GETVAL)
802  <<" rdsc="<<semctl(semid(),4,GETVAL)
803  <<endl
804  <<" wrec="<<semctl(semid(),5,GETVAL)
805  <<" rrec="<<semctl(semid(),6,GETVAL)
806  <<endl
807  <<" wdqm="<<semctl(semid(),7,GETVAL)
808  <<" rdqm="<<semctl(semid(),8,GETVAL)
809  <<endl;
810 }
811 
812 
813 //______________________________________________________________________________
815 {
816  evt::State_t state=evtState(index);
817  std::string stateName;
818  if (state==evt::EMPTY) stateName="EMPTY";
819  else if (state==evt::STOP) stateName="STOP";
820  else if (state==evt::RAWWRITING) stateName="RAWWRITING";
821  else if (state==evt::RAWWRITTEN) stateName="RAWRITTEN";
822  else if (state==evt::RAWREADING) stateName="RAWREADING";
823  else if (state==evt::RAWREAD) stateName="RAWREAD";
824  else if (state==evt::PROCESSING) stateName="PROCESSING";
825  else if (state==evt::PROCESSED) stateName="PROCESSED";
826  else if (state==evt::RECOWRITING)stateName="RECOWRITING";
827  else if (state==evt::RECOWRITTEN)stateName="RECOWRITTEN";
828  else if (state==evt::SENDING) stateName="SENDING";
829  else if (state==evt::SENT) stateName="SENT";
830  else if (state==evt::DISCARDING) stateName="DISCARDING";
831  cout<<"evt "<<index<<" in state '"<<stateName<<"'."<<endl;
832 }
833 
834 
835 //______________________________________________________________________________
837 {
838  dqm::State_t state=dqmState(index);
839  cout<<"dqm evt "<<index<<" in state '"<<state<<"'."<<endl;
840 }
841 
842 
843 //______________________________________________________________________________
845  unsigned int nRawCells,
846  unsigned int nRecoCells,
847  unsigned int nDqmCells,
848  unsigned int rawCellSize,
849  unsigned int recoCellSize,
850  unsigned int dqmCellSize)
851 {
852  // if necessary, release shared memory first!
854  cout<<"FUShmBuffer::createShmBuffer: "
855  <<"REMOVAL OF OLD SHARED MEM SEGMENTS SUCCESSFULL."
856  <<endl;
857 
858  // create bookkeeping shared memory segment
859  int size =sizeof(unsigned int)*7;
860  int shmid=shm_create(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
861  void*shmAddr=shm_attach(shmid); if(0==shmAddr)return 0;
862 
863  if(1!=shm_nattch(shmid)) {
864  cout<<"FUShmBuffer::createShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
865  shmdt(shmAddr);
866  return 0;
867  }
868 
869  unsigned int* p=(unsigned int*)shmAddr;
870  *p++=segmentationMode;
871  *p++=nRawCells;
872  *p++=nRecoCells;
873  *p++=nDqmCells;
874  *p++=rawCellSize;
875  *p++=recoCellSize;
876  *p++=dqmCellSize;
877  shmdt(shmAddr);
878 
879  // create the 'real' shared memory buffer
880  size =FUShmBuffer::size(segmentationMode,
881  nRawCells,nRecoCells,nDqmCells,
882  rawCellSize,recoCellSize,dqmCellSize);
883  shmid =shm_create(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
884  int semid=sem_create(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
885  shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
886 
887  if (1!=shm_nattch(shmid)) {
888  cout<<"FUShmBuffer::createShmBuffer FAILED: nattch="<<shm_nattch(shmid)<<endl;
889  shmdt(shmAddr);
890  return 0;
891  }
892  FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
893  nRawCells,nRecoCells,nDqmCells,
894  rawCellSize,recoCellSize,dqmCellSize);
895 
896  cout<<"FUShmBuffer::createShmBuffer(): CREATED shared memory buffer."<<endl;
897  cout<<" segmentationMode="<<segmentationMode<<endl;
898 
899  buffer->initialize(shmid,semid);
900 
901  return buffer;
902 }
903 
904 
905 //______________________________________________________________________________
907 {
908  // get bookkeeping shared memory segment
909  int size =sizeof(unsigned int)*7;
910  int shmid =shm_get(FUShmBuffer::getShmDescriptorKey(),size);if(shmid<0)return 0;
911  void* shmAddr=shm_attach(shmid); if (0==shmAddr) return 0;
912 
913  unsigned int *p=(unsigned int*)shmAddr;
914  bool segmentationMode=*p++;
915  unsigned int nRawCells =*p++;
916  unsigned int nRecoCells =*p++;
917  unsigned int nDqmCells =*p++;
918  unsigned int rawCellSize =*p++;
919  unsigned int recoCellSize =*p++;
920  unsigned int dqmCellSize =*p++;
921  shmdt(shmAddr);
922 
923  cout<<"FUShmBuffer::getShmBuffer():"
924  <<" segmentationMode="<<segmentationMode
925  <<" nRawCells="<<nRawCells
926  <<" nRecoCells="<<nRecoCells
927  <<" nDqmCells="<<nDqmCells
928  <<" rawCellSize="<<rawCellSize
929  <<" recoCellSize="<<recoCellSize
930  <<" dqmCellSize="<<dqmCellSize
931  <<endl;
932 
933  // get the 'real' shared memory buffer
934  size =FUShmBuffer::size(segmentationMode,
935  nRawCells,nRecoCells,nDqmCells,
936  rawCellSize,recoCellSize,dqmCellSize);
937  shmid =shm_get(FUShmBuffer::getShmKey(),size); if (shmid<0) return 0;
938  int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return 0;
939  shmAddr =shm_attach(shmid); if (0==shmAddr) return 0;
940 
941  if (0==shm_nattch(shmid)) {
942  cout<<"FUShmBuffer::getShmBuffer() FAILED: nattch="<<shm_nattch(shmid)<<endl;
943  return 0;
944  }
945 
946  FUShmBuffer* buffer=new(shmAddr) FUShmBuffer(segmentationMode,
947  nRawCells,nRecoCells,nDqmCells,
948  rawCellSize,recoCellSize,dqmCellSize);
949 
950  cout<<"FUShmBuffer::getShmBuffer(): shared memory buffer RETRIEVED."<<endl;
951  cout<<" segmentationMode="<<segmentationMode<<endl;
952 
953  buffer->setClientPrcId(getpid());
954 
955  return buffer;
956 }
957 
958 
959 //______________________________________________________________________________
961 {
962  // get bookkeeping shared memory segment
963  int size =sizeof(unsigned int)*7;
964  int shmidd =shm_get(FUShmBuffer::getShmDescriptorKey(),size); if(shmidd<0) return false;
965  void* shmAddr=shm_attach(shmidd); if (0==shmAddr) return false;
966 
967  unsigned int*p=(unsigned int*)shmAddr;
968  bool segmentationMode=*p++;
969  unsigned int nRawCells =*p++;
970  unsigned int nRecoCells =*p++;
971  unsigned int nDqmCells =*p++;
972  unsigned int rawCellSize =*p++;
973  unsigned int recoCellSize =*p++;
974  unsigned int dqmCellSize =*p++;
975  shmdt(shmAddr);
976 
977 
978  // get the 'real' shared memory segment
979  size =FUShmBuffer::size(segmentationMode,
980  nRawCells,nRecoCells,nDqmCells,
981  rawCellSize,recoCellSize,dqmCellSize);
982  int shmid=shm_get(FUShmBuffer::getShmKey(),size);if (shmid<0) return false;
983  int semid=sem_get(FUShmBuffer::getSemKey(),9); if (semid<0) return false;
984  shmAddr =shm_attach(shmid); if (0==shmAddr) return false;
985 
986  int att = 0;
987  for(; att <10; att++)
988  {
989  if(shm_nattch(shmid)>1) {
990  cout << att << " FUShmBuffer::releaseSharedMemory(): nattch="<<shm_nattch(shmid)
991  <<", failed attempt to release shared memory."<<endl;
992  ::sleep(1);
993  }
994  else
995  break;
996  }
997 
998  if(att>=10) return false;
999 
1000  if (segmentationMode) {
1001  FUShmBuffer* buffer=
1002  new (shmAddr) FUShmBuffer(segmentationMode,
1003  nRawCells,nRecoCells,nDqmCells,
1004  rawCellSize,recoCellSize,dqmCellSize);
1005  int cellid;
1006  for (unsigned int i=0;i<nRawCells;i++) {
1007  cellid=shm_get(buffer->rawCellShmKey(i),FUShmRawCell::size(rawCellSize));
1008  if ((shm_destroy(cellid)==-1)) return false;
1009  }
1010  for (unsigned int i=0;i<nRecoCells;i++) {
1011  cellid=shm_get(buffer->recoCellShmKey(i),FUShmRecoCell::size(recoCellSize));
1012  if ((shm_destroy(cellid)==-1)) return false;
1013  }
1014  for (unsigned int i=0;i<nDqmCells;i++) {
1015  cellid=shm_get(buffer->dqmCellShmKey(i),FUShmDqmCell::size(dqmCellSize));
1016  if ((shm_destroy(cellid)==-1)) return false;
1017  }
1018  }
1019  shmdt(shmAddr);
1020  if (sem_destroy(semid)==-1) return false;
1021  if (shm_destroy(shmid)==-1) return false;
1022  if (shm_destroy(shmidd)==-1) return false;
1023 
1024  return true;
1025 }
1026 
1027 
1028 //______________________________________________________________________________
1029 unsigned int FUShmBuffer::size(bool segmentationMode,
1030  unsigned int nRawCells,
1031  unsigned int nRecoCells,
1032  unsigned int nDqmCells,
1033  unsigned int rawCellSize,
1034  unsigned int recoCellSize,
1035  unsigned int dqmCellSize)
1036 {
1037  unsigned int offset=
1038  sizeof(FUShmBuffer)+
1039  sizeof(unsigned int)*4*nRawCells+
1040  sizeof(evt::State_t)*nRawCells+
1041  sizeof(dqm::State_t)*nDqmCells;
1042 
1043  unsigned int rawCellTotalSize=FUShmRawCell::size(rawCellSize);
1044  unsigned int recoCellTotalSize=FUShmRecoCell::size(recoCellSize);
1045  unsigned int dqmCellTotalSize=FUShmDqmCell::size(dqmCellSize);
1046 
1047  unsigned int realSize =
1048  (segmentationMode) ?
1049  offset
1050  +sizeof(key_t)*(nRawCells+nRecoCells+nDqmCells)
1051  :
1052  offset
1053  +rawCellTotalSize*nRawCells
1054  +recoCellTotalSize*nRecoCells
1055  +dqmCellTotalSize*nDqmCells;
1056 
1057  unsigned int result=realSize/0x10*0x10 + (realSize%0x10>0)*0x10;
1058 
1059  return result;
1060 }
1061 
1062 
1063 //______________________________________________________________________________
1065 {
1066  key_t result=getuid()*1000+SHM_DESCRIPTOR_KEYID;
1067  if (result==(key_t)-1) cout<<"FUShmBuffer::getShmDescriptorKey: failed "
1068  <<"for file "<<shmKeyPath_<<"!"<<endl;
1069  return result;
1070 }
1071 
1072 
1073 //______________________________________________________________________________
1075 {
1076  key_t result=getuid()*1000+SHM_KEYID;
1077  if (result==(key_t)-1) cout<<"FUShmBuffer::getShmKey: ftok() failed "
1078  <<"for file "<<shmKeyPath_<<"!"<<endl;
1079  return result;
1080 }
1081 
1082 
1083 //______________________________________________________________________________
1085 {
1086  key_t result=getuid()*1000+SEM_KEYID;
1087  if (result==(key_t)-1) cout<<"FUShmBuffer::getSemKey: ftok() failed "
1088  <<"for file "<<semKeyPath_<<"!"<<endl;
1089  return result;
1090 }
1091 
1092 
1093 //______________________________________________________________________________
1095 {
1096  // first check and possibly remove existing segment with same id
1097  int shmid=shmget(key,1,0644);//using minimal size any segment with key "key" will be connected
1098  if(shmid!=-1){
1099  // an existing segment was found, remove it
1100  shmid_ds shmstat;
1101  int result=shmctl(shmid,IPC_STAT,&shmstat);
1102  cout << "FUShmBuffer found segment for key 0x " << hex << key << dec
1103  << " created by process " << shmstat.shm_cpid << " owned by "
1104  << shmstat.shm_perm.uid << " permissions "
1105  << hex << shmstat.shm_perm.mode << dec << endl;
1106  result=shmctl(shmid,IPC_RMID,&shmstat);
1107  }
1108  shmid=shmget(key,size,IPC_CREAT|0644);
1109  if (shmid==-1) {
1110  int err=errno;
1111  cout<<"FUShmBuffer::shm_create("<<key<<","<<size<<") failed: "
1112  <<strerror(err)<<endl;
1113  }
1114  return shmid;
1115 }
1116 
1117 
1118 //______________________________________________________________________________
1120 {
1121  int shmid=shmget(key,size,0644);
1122  if (shmid==-1) {
1123  int err=errno;
1124  cout<<"FUShmBuffer::shm_get("<<key<<","<<size<<") failed: "
1125  <<strerror(err)<<endl;
1126  }
1127  return shmid;
1128 }
1129 
1130 
1131 //______________________________________________________________________________
1132 void* FUShmBuffer::shm_attach(int shmid)
1133 {
1134  void* result=shmat(shmid,NULL,0);
1135  if (0==result) {
1136  int err=errno;
1137  cout<<"FUShmBuffer::shm_attach("<<shmid<<") failed: "
1138  <<strerror(err)<<endl;
1139  }
1140  return result;
1141 }
1142 
1143 
1144 //______________________________________________________________________________
1146 {
1147  shmid_ds shmstat;
1148  shmctl(shmid,IPC_STAT,&shmstat);
1149  return shmstat.shm_nattch;
1150 }
1151 
1152 
1153 //______________________________________________________________________________
1155 {
1156  shmid_ds shmstat;
1157  int result=shmctl(shmid,IPC_RMID,&shmstat);
1158  if (result==-1) cout<<"FUShmBuffer::shm_destroy(shmid="<<shmid<<") failed."<<endl;
1159  return result;
1160 }
1161 
1162 
1163 //______________________________________________________________________________
1164 int FUShmBuffer::sem_create(key_t key,int nsem)
1165 {
1166  int semid=semget(key,nsem,IPC_CREAT|0666);
1167  if (semid<0) {
1168  int err=errno;
1169  cout<<"FUShmBuffer::sem_create(key="<<key<<",nsem="<<nsem<<") failed: "
1170  <<strerror(err)<<endl;
1171  }
1172  return semid;
1173 }
1174 
1175 
1176 //______________________________________________________________________________
1177 int FUShmBuffer::sem_get(key_t key,int nsem)
1178 {
1179  int semid=semget(key,nsem,0666);
1180  if (semid<0) {
1181  int err=errno;
1182  cout<<"FUShmBuffer::sem_get(key="<<key<<",nsem="<<nsem<<") failed: "
1183  <<strerror(err)<<endl;
1184  }
1185  return semid;
1186 }
1187 
1188 
1189 //______________________________________________________________________________
1191 {
1192  int result=semctl(semid,0,IPC_RMID);
1193  if (result==-1) cout<<"FUShmBuffer::sem_destroy(semid="<<semid<<") failed."<<endl;
1194  return result;
1195 }
1196 
1197 
1198 
1200 // implementation of private member functions
1202 
1203 //______________________________________________________________________________
1204 unsigned int FUShmBuffer::nextIndex(unsigned int offset,
1205  unsigned int nCells,
1206  unsigned int& iNext)
1207 {
1208  lock();
1209  unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
1210  pindex+=iNext;
1211  iNext=(iNext+1)%nCells;
1212  unsigned int result=*pindex;
1213  unlock();
1214  return result;
1215 }
1216 
1217 
1218 //______________________________________________________________________________
1219 void FUShmBuffer::postIndex(unsigned int index,
1220  unsigned int offset,
1221  unsigned int nCells,
1222  unsigned int& iLast)
1223 {
1224  lock();
1225  unsigned int* pindex=(unsigned int*)((unsigned long)this+offset);
1226  pindex+=iLast;
1227  *pindex=index;
1228  iLast=(iLast+1)%nCells;
1229  unlock();
1230 }
1231 
1232 
1233 //______________________________________________________________________________
1235 {
1237 }
1238 
1239 
1240 //______________________________________________________________________________
1242 {
1244 }
1245 
1246 
1247 //______________________________________________________________________________
1249 {
1251 }
1252 
1253 
1254 //______________________________________________________________________________
1256 {
1258 }
1259 
1260 
1261 //______________________________________________________________________________
1263 {
1265 }
1266 
1267 
1268 //______________________________________________________________________________
1270 {
1272 }
1273 
1274 
1275 //______________________________________________________________________________
1277 {
1279 }
1280 
1281 
1282 //______________________________________________________________________________
1284 {
1286 }
1287 
1288 
1289 //______________________________________________________________________________
1291 {
1293 }
1294 
1295 
1296 //______________________________________________________________________________
1298 {
1300 }
1301 
1302 
1303 //______________________________________________________________________________
1305 {
1307 }
1308 
1309 
1310 //______________________________________________________________________________
1312 {
1314 }
1315 
1316 
1317 //______________________________________________________________________________
1318 unsigned int FUShmBuffer::indexForEvtNumber(unsigned int evtNumber)
1319 {
1320  unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
1321  for (unsigned int i=0;i<nRawCells_;i++) {
1322  if ((*pevt++)==evtNumber) return i;
1323  }
1324  assert(false);
1325  return 0xffffffff;
1326 }
1327 
1328 
1329 //______________________________________________________________________________
1331 {
1332  assert(index<nRawCells_);
1333  evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
1334  pstate+=index;
1335  return *pstate;
1336 }
1337 
1338 
1339 //______________________________________________________________________________
1341 {
1342  assert(index<nDqmCells_);
1343  dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
1344  pstate+=index;
1345  return *pstate;
1346 }
1347 
1348 
1349 //______________________________________________________________________________
1350 unsigned int FUShmBuffer::evtNumber(unsigned int index)
1351 {
1352  assert(index<nRawCells_);
1353  unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
1354  pevt+=index;
1355  return *pevt;
1356 }
1357 
1358 
1359 //______________________________________________________________________________
1360 pid_t FUShmBuffer::evtPrcId(unsigned int index)
1361 {
1362  assert(index<nRawCells_);
1363  pid_t *prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
1364  prcid+=index;
1365  return *prcid;
1366 }
1367 
1368 
1369 //______________________________________________________________________________
1370 time_t FUShmBuffer::evtTimeStamp(unsigned int index)
1371 {
1372  assert(index<nRawCells_);
1373  time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
1374  ptstmp+=index;
1375  return *ptstmp;
1376 }
1377 
1378 
1379 //______________________________________________________________________________
1380 pid_t FUShmBuffer::clientPrcId(unsigned int index)
1381 {
1382  assert(index<nClientsMax_);
1383  pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
1384  prcid+=index;
1385  return *prcid;
1386 }
1387 
1388 
1389 //______________________________________________________________________________
1391 {
1392  assert(index<nRawCells_);
1393  evt::State_t *pstate=(evt::State_t*)((unsigned long)this+evtStateOffset_);
1394  pstate+=index;
1395  lock();
1396  *pstate=state;
1397  unlock();
1398  return true;
1399 }
1400 
1401 
1402 //______________________________________________________________________________
1404 {
1405  assert(index<nDqmCells_);
1406  dqm::State_t *pstate=(dqm::State_t*)((unsigned long)this+dqmStateOffset_);
1407  pstate+=index;
1408  lock();
1409  *pstate=state;
1410  unlock();
1411  return true;
1412 }
1413 
1414 
1415 //______________________________________________________________________________
1416 bool FUShmBuffer::setEvtDiscard(unsigned int index,unsigned int discard)
1417 {
1418  assert(index<nRawCells_);
1419  unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
1420  pcount+=index;
1421  lock();
1422  *pcount=discard;
1423  unlock();
1424  return true;
1425 }
1426 
1427 
1428 //______________________________________________________________________________
1430 {
1431  int result = 0;
1432  assert(index<nRawCells_);
1433  unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
1434  pcount+=index;
1435  lock();
1436  (*pcount)++;
1437  result = *pcount;
1438  unlock();
1439  return result;
1440 }
1441 
1442 
1443 //______________________________________________________________________________
1444 bool FUShmBuffer::setEvtNumber(unsigned int index,unsigned int evtNumber)
1445 {
1446  assert(index<nRawCells_);
1447  unsigned int *pevt=(unsigned int*)((unsigned long)this+evtNumberOffset_);
1448  pevt+=index;
1449  lock();
1450  *pevt=evtNumber;
1451  unlock();
1452  return true;
1453 }
1454 
1455 
1456 //______________________________________________________________________________
1457 bool FUShmBuffer::setEvtPrcId(unsigned int index,pid_t prcId)
1458 {
1459  assert(index<nRawCells_);
1460  pid_t* prcid=(pid_t*)((unsigned long)this+evtPrcIdOffset_);
1461  prcid+=index;
1462  lock();
1463  *prcid=prcId;
1464  unlock();
1465  return true;
1466 }
1467 
1468 
1469 //______________________________________________________________________________
1471 {
1472  assert(index<nRawCells_);
1473  time_t *ptstmp=(time_t*)((unsigned long)this+evtTimeStampOffset_);
1474  ptstmp+=index;
1475  lock();
1476  *ptstmp=timeStamp;
1477  unlock();
1478  return true;
1479 }
1480 
1481 
1482 //______________________________________________________________________________
1484 {
1485  lock();
1486  assert(nClients_<nClientsMax_);
1487  pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
1488  for (unsigned int i=0;i<nClients_;i++) {
1489  if ((*prcid)==prcId) { unlock(); return false; }
1490  prcid++;
1491  }
1492  nClients_++;
1493  *prcid=prcId;
1494  unlock();
1495  return true;
1496 }
1497 
1498 
1499 //______________________________________________________________________________
1501 {
1502  lock();
1503  pid_t *prcid=(pid_t*)((unsigned long)this+clientPrcIdOffset_);
1504  unsigned int iClient(0);
1505  while (iClient<=nClients_&&(*prcid)!=prcId) { prcid++; iClient++; }
1506  assert(iClient!=nClients_);
1507  pid_t* next=prcid; next++;
1508  while (iClient<nClients_-1) { *prcid=*next; prcid++; next++; iClient++; }
1509  nClients_--;
1510  unlock();
1511  return true;
1512 }
1513 
1514 
1515 //______________________________________________________________________________
1517 {
1518  FUShmRawCell* result(0);
1519 
1520  if (iCell>=nRawCells_) {
1521  cout<<"FUShmBuffer::rawCell("<<iCell<<") ERROR: "
1522  <<"iCell="<<iCell<<" >= nRawCells()="<<nRawCells_<<endl;
1523  return result;
1524  }
1525 
1526  if (segmentationMode_) {
1527  key_t shmkey =rawCellShmKey(iCell);
1528  int shmid =shm_get(shmkey,rawCellTotalSize_);
1529  void* cellAddr=shm_attach(shmid);
1530  result=new (cellAddr) FUShmRawCell(rawCellPayloadSize_);
1531  }
1532  else {
1533  result=
1534  (FUShmRawCell*)((unsigned long)this+rawCellOffset_+iCell*rawCellTotalSize_);
1535  }
1536 
1537  return result;
1538 }
1539 
1540 
1541 //______________________________________________________________________________
1543 {
1544  FUShmRecoCell* result(0);
1545 
1546  if (iCell>=nRecoCells_) {
1547  cout<<"FUShmBuffer::recoCell("<<iCell<<") ERROR: "
1548  <<"iCell="<<iCell<<" >= nRecoCells="<<nRecoCells_<<endl;
1549  return result;
1550  }
1551 
1552  if (segmentationMode_) {
1553  key_t shmkey =recoCellShmKey(iCell);
1554  int shmid =shm_get(shmkey,recoCellTotalSize_);
1555  void* cellAddr=shm_attach(shmid);
1556  result=new (cellAddr) FUShmRecoCell(recoCellPayloadSize_);
1557  }
1558  else {
1559  result=
1560  (FUShmRecoCell*)((unsigned long)this+recoCellOffset_+iCell*recoCellTotalSize_);
1561  }
1562 
1563  return result;
1564 }
1565 
1566 
1567 //______________________________________________________________________________
1569 {
1570  FUShmDqmCell* result(0);
1571 
1572  if (iCell>=nDqmCells_) {
1573  cout<<"FUShmBuffer::dqmCell("<<iCell<<") ERROR: "
1574  <<"iCell="<<iCell<<" >= nDqmCells="<<nDqmCells_<<endl;
1575  return result;
1576  }
1577 
1578  if (segmentationMode_) {
1579  key_t shmkey =dqmCellShmKey(iCell);
1580  int shmid =shm_get(shmkey,dqmCellTotalSize_);
1581  void* cellAddr=shm_attach(shmid);
1582  result=new (cellAddr) FUShmDqmCell(dqmCellPayloadSize_);
1583  }
1584  else {
1585  result=
1586  (FUShmDqmCell*)((unsigned long)this+dqmCellOffset_+iCell*dqmCellTotalSize_);
1587  }
1588 
1589  return result;
1590 }
1591 
1592 
1593 //______________________________________________________________________________
1595 {
1596  assert(index<nRawCells_);
1597  unsigned int *pcount=(unsigned int*)((unsigned long)this+evtDiscardOffset_);
1598  pcount+=index;
1599  lock();
1600  assert(*pcount>0);
1601  --(*pcount);
1602  bool result=(*pcount==0);
1603  unlock();
1604  return result;
1605 }
1606 
1607 
1608 //______________________________________________________________________________
1609 key_t FUShmBuffer::shmKey(unsigned int iCell,unsigned int offset)
1610 {
1611  if (!segmentationMode_) {
1612  cout<<"FUShmBuffer::shmKey() ERROR: only valid in segmentationMode!"<<endl;
1613  return -1;
1614  }
1615  key_t* addr=(key_t*)((unsigned long)this+offset);
1616  for (unsigned int i=0;i<iCell;i++) ++addr;
1617  return *addr;
1618 }
1619 
1620 
1621 //______________________________________________________________________________
1622 key_t FUShmBuffer::rawCellShmKey(unsigned int iCell)
1623 {
1624  if (iCell>=nRawCells_) {
1625  cout<<"FUShmBuffer::rawCellShmKey() ERROR: "
1626  <<"iCell>=nRawCells: "<<iCell<<">="<<nRawCells_<<endl;
1627  return -1;
1628  }
1629  return shmKey(iCell,rawCellOffset_);
1630 }
1631 
1632 
1633 //______________________________________________________________________________
1634 key_t FUShmBuffer::recoCellShmKey(unsigned int iCell)
1635 {
1636  if (iCell>=nRecoCells_) {
1637  cout<<"FUShmBuffer::recoCellShmKey() ERROR: "
1638  <<"iCell>=nRecoCells: "<<iCell<<">="<<nRecoCells_<<endl;
1639  return -1;
1640  }
1641  return shmKey(iCell,recoCellOffset_);
1642 }
1643 
1644 
1645 //______________________________________________________________________________
1646 key_t FUShmBuffer::dqmCellShmKey(unsigned int iCell)
1647 {
1648  if (iCell>=nDqmCells_) {
1649  cout<<"FUShmBuffer::dqmCellShmKey() ERROR: "
1650  <<"iCell>=nDqmCells: "<<iCell<<">="<<nDqmCells_<<endl;
1651  return -1;
1652  }
1653  return shmKey(iCell,dqmCellOffset_);
1654 }
1655 
1656 
1657 //______________________________________________________________________________
1658 void FUShmBuffer::sem_init(int isem,int value)
1659 {
1660  if (semctl(semid(),isem,SETVAL,value)<0) {
1661  cout<<"FUShmBuffer: FATAL ERROR in semaphore initialization."<<endl;
1662  }
1663 }
1664 
1665 
1666 //______________________________________________________________________________
1668 {
1669  struct sembuf sops[1];
1670  sops[0].sem_num=isem;
1671  sops[0].sem_op = -1;
1672  sops[0].sem_flg= 0;
1673  if (semop(semid(),sops,1)==-1) {
1674  cout<<"FUShmBuffer: ERROR in semaphore operation sem_wait."<<endl;
1675  return -1;
1676  }
1677  return 0;
1678 }
1679 
1680 
1681 //______________________________________________________________________________
1683 {
1684  struct sembuf sops[1];
1685  sops[0].sem_num=isem;
1686  sops[0].sem_op = 1;
1687  sops[0].sem_flg= 0;
1688  if (semop(semid(),sops,1)==-1) {
1689  cout<<"FUShmBuffer: ERROR in semaphore operation sem_post."<<endl;
1690  }
1691 }
void discardRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:464
unsigned int clientPrcIdOffset_
Definition: FUShmBuffer.h:303
key_t recoCellShmKey(unsigned int iCell)
FUShmRawCell * rawCellToRead()
Definition: FUShmBuffer.cc:304
FUShmRawCell * rawCellToDiscard()
Definition: FUShmBuffer.cc:352
int incEvtDiscard(unsigned int index)
int i
Definition: DBlmapReader.cc:9
void postRawIndexToRead(unsigned int index)
unsigned int nRecoCells() const
Definition: FUShmBuffer.h:64
void waitRecoWrite()
Definition: FUShmBuffer.h:254
void writeRawLumiSectionEvent(unsigned int)
Definition: FUShmBuffer.cc:550
static int shm_nattch(int shmid)
unsigned int index() const
Definition: FUShmDqmCell.h:22
void initialize(unsigned int index)
bool setEvtPrcId(unsigned int index, pid_t prcId)
unsigned int dqmWriteNext_
Definition: FUShmBuffer.h:287
static void * shm_attach(int shmid)
void postDqmIndexToWrite(unsigned int index)
unsigned int rawReadOffset_
Definition: FUShmBuffer.h:277
void initialize(unsigned int index)
Definition: FUShmDqmCell.cc:46
static unsigned int size(unsigned int payloadSize)
unsigned int nextRecoReadIndex()
unsigned int dqmReadLast_
Definition: FUShmBuffer.h:291
unsigned int recoCellPayloadSize_
Definition: FUShmBuffer.h:313
unsigned int dqmReadOffset_
Definition: FUShmBuffer.h:292
bool writeRecoEventData(unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:661
unsigned int nextIndex(unsigned int offset, unsigned int nCells, unsigned int &iNext)
bool writeDqmEventData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:763
int sem_wait(int isem)
void scheduleRawCellForDiscardServerSide(unsigned int iCell)
Definition: FUShmBuffer.cc:440
unsigned int dqmWriteLast_
Definition: FUShmBuffer.h:288
static FUShmBuffer * getShmBuffer()
Definition: FUShmBuffer.cc:906
int nbRawCellsToRead() const
Definition: FUShmBuffer.cc:283
void printDqmState(unsigned int index)
Definition: FUShmBuffer.cc:836
void releaseRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:506
int shmid() const
Definition: FUShmBuffer.h:67
unsigned int evtDiscardOffset_
Definition: FUShmBuffer.h:295
bool writeRecoInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmBuffer.cc:638
unsigned int nRecoCells_
Definition: FUShmBuffer.h:312
static const char * semKeyPath_
Definition: FUShmBuffer.h:325
unsigned int dqmStateOffset_
Definition: FUShmBuffer.h:299
pid_t clientPrcId(unsigned int index)
static key_t getShmDescriptorKey()
static int shm_get(key_t key, int size)
unsigned int dqmWriteOffset_
Definition: FUShmBuffer.h:289
#define NULL
Definition: scimark2.h:8
unsigned int nRawCells_
Definition: FUShmBuffer.h:305
unsigned int recoWriteLast_
Definition: FUShmBuffer.h:281
static bool releaseSharedMemory()
Definition: FUShmBuffer.cc:960
static int sem_create(key_t key, int nsem)
bool removeClientPrcId(pid_t prcId)
void waitRawDiscard()
Definition: FUShmBuffer.h:250
static unsigned int size(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
FUShmDqmCell * dqmCellToRead()
Definition: FUShmBuffer.cc:339
unsigned int evtNumberOffset_
Definition: FUShmBuffer.h:296
unsigned int dqmCellOffset_
Definition: FUShmBuffer.h:322
evt::State_t evtState(unsigned int index)
void sleep(Duration_t)
Definition: Utils.h:163
void discardDqmCell(unsigned int iCell)
Definition: FUShmBuffer.cc:491
unsigned int evtNumber() const
Definition: FUShmRawCell.h:25
unsigned int nextDqmReadIndex()
time_t evtTimeStamp(unsigned int index)
unsigned int evtNumber(unsigned int index)
unsigned int recoWriteNext_
Definition: FUShmBuffer.h:280
void initialize(unsigned int shmid, unsigned int semid)
Definition: FUShmBuffer.cc:170
void writeRecoEmptyEvent()
Definition: FUShmBuffer.cc:564
bool setEvtNumber(unsigned int index, unsigned int evtNumber)
void postDqmIndexToRead(unsigned int index)
unsigned int evtStateOffset_
Definition: FUShmBuffer.h:294
static unsigned int size(unsigned int payloadSize)
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
void initialize(unsigned int index)
Definition: FUShmRawCell.cc:74
void discardOrphanedRecoCell(unsigned int iCell)
Definition: FUShmBuffer.cc:480
bool setClientPrcId(pid_t prcId)
unsigned int getlbn(const unsigned char *)
void discardRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:456
void postRawIndexToWrite(unsigned int index)
unsigned int nextRawReadIndex()
unsigned char * payloadAddr() const
FUShmRawCell * rawCell(unsigned int iCell)
#define SHM_KEYID
Definition: FUShmBuffer.cc:31
unsigned int recoReadOffset_
Definition: FUShmBuffer.h:285
unsigned int nDqmCells_
Definition: FUShmBuffer.h:319
bool setEvtState(unsigned int index, evt::State_t state)
static const char * shmKeyPath_
Definition: FUShmBuffer.h:324
tuple result
Definition: query.py:137
unsigned int rawWriteNext_
Definition: FUShmBuffer.h:272
pid_t evtPrcId(unsigned int index)
void finishWritingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:372
key_t rawCellShmKey(unsigned int iCell)
static int shm_destroy(int shmid)
unsigned int nRawCells() const
Definition: FUShmBuffer.h:63
FUShmRecoCell * recoCell(unsigned int iCell)
void writeRawEmptyEvent()
Definition: FUShmBuffer.cc:538
bool rawCellReadyForDiscard(unsigned int index)
unsigned int offset(bool)
unsigned char * fedAddr(unsigned int i) const
Definition: FUShmRawCell.cc:94
void postRawDiscard()
Definition: FUShmBuffer.h:251
unsigned int dqmCellPayloadSize_
Definition: FUShmBuffer.h:320
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
Definition: FUShmBuffer.cc:844
void finishReadingDqmCell(FUShmDqmCell *cell)
Definition: FUShmBuffer.cc:410
void scheduleRawEmptyCellForDiscard()
Definition: FUShmBuffer.cc:590
unsigned int recoReadLast_
Definition: FUShmBuffer.h:284
void finishReadingRecoCell(FUShmRecoCell *cell)
Definition: FUShmBuffer.cc:397
unsigned int nextDqmWriteIndex()
void writeErrorEvent(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int fuProcessId, unsigned char *data, unsigned int dataSize)
key_t dqmCellShmKey(unsigned int iCell)
dqm::State_t dqmState(unsigned int index)
static key_t getShmKey()
unsigned int rawReadNext_
Definition: FUShmBuffer.h:275
unsigned int rawCellPayloadSize_
Definition: FUShmBuffer.h:306
unsigned int nextRecoWriteIndex()
unsigned int eventSize() const
unsigned int rawDiscardIndex_
Definition: FUShmBuffer.h:278
#define SEM_KEYID
Definition: FUShmBuffer.cc:32
unsigned int nextRawWriteIndex()
FUShmRawCell * rawCellToWrite()
Definition: FUShmBuffer.cc:290
unsigned int rawReadLast_
Definition: FUShmBuffer.h:276
unsigned int rawWriteOffset_
Definition: FUShmBuffer.h:274
unsigned int dqmCellTotalSize_
Definition: FUShmBuffer.h:321
bool setEvtTimeStamp(unsigned int index, time_t timeStamp)
unsigned int rawWriteLast_
Definition: FUShmBuffer.h:273
void writeDqmEmptyEvent()
Definition: FUShmBuffer.cc:577
unsigned int dqmReadNext_
Definition: FUShmBuffer.h:290
unsigned int recoCellTotalSize_
Definition: FUShmBuffer.h:314
void waitRawDiscarded()
Definition: FUShmBuffer.h:252
void postRecoIndexToRead(unsigned int index)
char state
Definition: procUtils.cc:75
void writeInitMsg(unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
unsigned int index() const
Definition: FUShmRawCell.h:22
void setLumiSection(unsigned int)
unsigned int indexForEvtNumber(unsigned int evtNumber)
static key_t getSemKey()
void printEvtState(unsigned int index)
Definition: FUShmBuffer.cc:814
void postRawDiscarded()
Definition: FUShmBuffer.h:253
unsigned int nClientsMax_
Definition: FUShmBuffer.h:302
list key
Definition: combine.py:13
static int sem_get(key_t key, int nsem)
void sem_post(int isem)
FUShmRecoCell * recoCellToRead()
Definition: FUShmBuffer.cc:323
int nbRawCellsToWrite() const
Definition: FUShmBuffer.cc:276
unsigned int rawCellTotalSize_
Definition: FUShmBuffer.h:307
FUShmBuffer(bool segmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize, unsigned int recoCellSize, unsigned int dqmCellSize)
Definition: FUShmBuffer.cc:53
void writeData(unsigned int runNumber, unsigned int evtAtUpdate, unsigned int folderId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
Definition: FUShmDqmCell.cc:69
static int sem_destroy(int semid)
#define SHM_DESCRIPTOR_KEYID
Definition: FUShmBuffer.cc:30
tuple cout
Definition: gather_cfg.py:41
static int shm_create(key_t key, int size)
unsigned int nDqmCells() const
Definition: FUShmBuffer.h:65
unsigned int fedSize(unsigned int i) const
Definition: FUShmRawCell.cc:82
void sem_init(int isem, int value)
unsigned int evtPrcIdOffset_
Definition: FUShmBuffer.h:297
unsigned int recoWriteOffset_
Definition: FUShmBuffer.h:282
void writeEventData(unsigned int rawCellIndex, unsigned int runNumber, unsigned int evtNumber, unsigned int outModId, unsigned int fuProcessId, unsigned int fuGuid, unsigned char *data, unsigned int dataSize)
static unsigned int size(unsigned int payloadSize)
Definition: FUShmDqmCell.cc:97
void postRecoWrite()
Definition: FUShmBuffer.h:255
unsigned int nClients_
Definition: FUShmBuffer.h:301
unsigned int rawCellOffset_
Definition: FUShmBuffer.h:308
unsigned int recoReadNext_
Definition: FUShmBuffer.h:283
bool writeErrorEventData(unsigned int runNumber, unsigned int fuProcessId, unsigned int iRawCell)
Definition: FUShmBuffer.cc:693
bool setEvtDiscard(unsigned int index, unsigned int discard)
key_t shmKey(unsigned int iCell, unsigned int offset)
void finishReadingRawCell(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:385
tuple size
Write out results.
unsigned int recoCellOffset_
Definition: FUShmBuffer.h:315
std::string timeStamp(TimePoint_t)
Definition: Utils.cc:23
void postRecoIndexToWrite(unsigned int index)
bool setDqmState(unsigned int index, dqm::State_t state)
void postIndex(unsigned int index, unsigned int offset, unsigned int nCells, unsigned int &iLast)
FUShmDqmCell * dqmCell(unsigned int iCell)
void scheduleRawEmptyCellForDiscardServerSide(FUShmRawCell *cell)
Definition: FUShmBuffer.cc:620
unsigned int evtTimeStampOffset_
Definition: FUShmBuffer.h:298
void scheduleRawCellForDiscard(unsigned int iCell)
Definition: FUShmBuffer.cc:420
int semid() const
Definition: FUShmBuffer.h:68