CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch12/src/EventFilter/ResourceBroker/src/SMProxy.cc

Go to the documentation of this file.
00001 
00002 //
00003 // SMProxy
00004 // -------
00005 //
00006 //            03/20/2007 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00011 
00012 #include "xdaq/Application.h"
00013 
00014 #include "toolbox/mem/Reference.h"
00015 #include "toolbox/mem/MemoryPoolFactory.h"
00016 #include "toolbox/mem/exception/Exception.h"
00017 
00018 #include "i2o/Method.h"
00019 #include "i2o/utils/AddressMap.h"
00020 
00021 #include "xcept/tools.h"
00022 
00023 #include <iostream>
00024 #include <cmath>
00025 
00026 
00027 using namespace std;
00028 using namespace evf;
00029 
00030 
00032 // construction/destruction
00034 
00035 //______________________________________________________________________________
00036 SMProxy::SMProxy(xdaq::ApplicationDescriptor *fuAppDesc,
00037                  xdaq::ApplicationDescriptor *smAppDesc,
00038                  xdaq::ApplicationContext    *fuAppContext,
00039                  toolbox::mem::Pool          *i2oPool)
00040   : log_(fuAppContext->getLogger())
00041   , fuAppDesc_(fuAppDesc)
00042   , smAppDesc_(smAppDesc)
00043   , fuAppContext_(fuAppContext)
00044   , i2oPool_(i2oPool)
00045   , initHeaderSize_(sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME))
00046   , dataHeaderSize_(sizeof(I2O_SM_DATA_MESSAGE_FRAME))
00047   , dqmHeaderSize_(sizeof(I2O_SM_DQM_MESSAGE_FRAME))
00048 {
00049   fuUrl_=fuAppDesc_->getContextDescriptor()->getURL();
00050   if (fuUrl_.size()>=MAX_I2O_SM_URLCHARS)
00051     fuUrl_=fuUrl_.substr(0,MAX_I2O_SM_URLCHARS-1);
00052   
00053   fuClassName_=fuAppDesc_->getClassName();
00054   if (fuClassName_.size()>=MAX_I2O_SM_URLCHARS)
00055     fuClassName_=fuClassName_.substr(0,MAX_I2O_SM_URLCHARS-1);
00056 }
00057 
00058 
00059 //______________________________________________________________________________
00060 SMProxy::~SMProxy()
00061 {
00062 
00063 }
00064 
00065 
00067 // implementation of member functions
00069 
00070 //______________________________________________________________________________
00071 UInt_t SMProxy::sendInitMessage(UInt_t  fuResourceId,
00072                                 UInt_t  outModId,
00073                                 UInt_t  fuProcessId,
00074                                 UInt_t  fuGuid,
00075                                 UChar_t*data,
00076                                 UInt_t  dataSize)
00077   throw (evf::Exception)
00078 {
00079   UInt_t    totalSize=0;
00080   MemRef_t* bufRef   =createFragmentChain(I2O_SM_PREAMBLE,
00081                                           initHeaderSize_,
00082                                           data,
00083                                           dataSize,
00084                                           totalSize);
00085   
00086   I2O_SM_PREAMBLE_MESSAGE_FRAME *msg;
00087   MemRef_t* next=bufRef;
00088   do {
00089     msg=(I2O_SM_PREAMBLE_MESSAGE_FRAME*)next->getDataLocation();
00090     msg->rbBufferID=fuResourceId;
00091     msg->outModID=outModId;
00092     msg->fuProcID=fuProcessId;
00093     msg->fuGUID=fuGuid;
00094   }
00095   while ((next=next->getNextReference()));
00096 
00097   try {
00098     fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00099   }
00100   catch (xdaq::exception::Exception &e) {
00101     string msg="Failed to post INIT Message.";
00102     XCEPT_RETHROW(evf::Exception,msg,e);
00103   }
00104   
00105   return totalSize;
00106 }
00107 
00108 
00109 //______________________________________________________________________________
00110 UInt_t SMProxy::sendDataEvent(UInt_t   fuResourceId,
00111                               UInt_t   runNumber,
00112                               UInt_t   evtNumber,
00113                               UInt_t   outModId,
00114                               UInt_t   fuProcessId,
00115                               UInt_t   fuGuid,
00116                               UChar_t *data,
00117                               UInt_t   dataSize)
00118   throw (evf::Exception)
00119 {
00120   UInt_t    totalSize=0;
00121   MemRef_t* bufRef   =createFragmentChain(I2O_SM_DATA,
00122                                           dataHeaderSize_,
00123                                           data,
00124                                           dataSize,
00125                                           totalSize);
00126   
00127   I2O_SM_DATA_MESSAGE_FRAME *msg;
00128   MemRef_t* next=bufRef;
00129   do {
00130     msg=(I2O_SM_DATA_MESSAGE_FRAME*)next->getDataLocation();
00131     msg->rbBufferID=fuResourceId;
00132     msg->runID   =runNumber;
00133     msg->eventID =evtNumber;
00134     msg->outModID=outModId;
00135     msg->fuProcID=fuProcessId;
00136     msg->fuGUID=fuGuid;
00137   }
00138   while ((next=next->getNextReference()));
00139   
00140   try {
00141     fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00142   }
00143   catch (xdaq::exception::Exception &e) {
00144     string errmsg="Failed to post DATA Message.";
00145     LOG4CPLUS_FATAL(log_,errmsg);
00146     XCEPT_RETHROW(evf::Exception,errmsg,e);
00147   }
00148   
00149   return totalSize;
00150 }
00151 
00152 
00153 //______________________________________________________________________________
00154 UInt_t SMProxy::sendErrorEvent(UInt_t   fuResourceId,
00155                                UInt_t   runNumber,
00156                                UInt_t   evtNumber,
00157                                UInt_t   fuProcessId,
00158                                UInt_t   fuGuid,
00159                                UChar_t *data,
00160                                UInt_t   dataSize)
00161   throw (evf::Exception)
00162 {
00163   UInt_t    totalSize=0;
00164   MemRef_t* bufRef   =createFragmentChain(I2O_SM_ERROR,
00165                                           dataHeaderSize_,
00166                                           data,
00167                                           dataSize,
00168                                           totalSize);
00169   
00170   I2O_SM_DATA_MESSAGE_FRAME *msg;
00171   MemRef_t* next=bufRef;
00172   do {
00173     msg=(I2O_SM_DATA_MESSAGE_FRAME*)next->getDataLocation();
00174     msg->rbBufferID=fuResourceId;
00175     msg->runID   =runNumber;
00176     msg->eventID =evtNumber;
00177     msg->outModID=0xffffffff;
00178     msg->fuProcID=fuProcessId;
00179     msg->fuGUID=fuGuid;
00180   }
00181   while ((next=next->getNextReference()));
00182   
00183   try {
00184     fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00185   }
00186   catch (xdaq::exception::Exception &e) {
00187     string errmsg="Failed to post ERROR Message.";
00188     LOG4CPLUS_FATAL(log_,errmsg);
00189     XCEPT_RETHROW(evf::Exception,errmsg,e);
00190   }
00191   
00192   return totalSize;
00193 }
00194 
00195 
00196 //______________________________________________________________________________
00197 UInt_t SMProxy::sendDqmEvent(UInt_t  fuDqmId,
00198                              UInt_t  runNumber,
00199                              UInt_t  evtAtUpdate,
00200                              UInt_t  folderId,
00201                              UInt_t  fuProcessId,
00202                              UInt_t  fuGuid,
00203                              UChar_t*data,
00204                              UInt_t  dataSize)
00205   throw (evf::Exception)
00206 {
00207   UInt_t    totalSize=0;
00208   MemRef_t* bufRef   =createFragmentChain(I2O_SM_DQM,
00209                                           dqmHeaderSize_,
00210                                           data,
00211                                           dataSize,
00212                                           totalSize);
00213   
00214   I2O_SM_DQM_MESSAGE_FRAME *msg;
00215   MemRef_t* next=bufRef;
00216   do {
00217     msg=(I2O_SM_DQM_MESSAGE_FRAME*)next->getDataLocation();
00218     msg->rbBufferID     =fuDqmId;
00219     msg->runID          =runNumber;
00220     msg->eventAtUpdateID=evtAtUpdate;
00221     msg->folderID       =folderId;
00222     msg->fuProcID       =fuProcessId;
00223     msg->fuGUID         =fuGuid;
00224   }
00225   while ((next=next->getNextReference()));
00226   
00227   try {
00228     fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00229   }
00230   catch (xdaq::exception::Exception &e) {
00231     string errmsg="Failed to post DQM Message.";
00232     LOG4CPLUS_FATAL(log_,errmsg);
00233     XCEPT_RETHROW(evf::Exception,errmsg,e);
00234   }
00235  
00236   return totalSize;
00237 }
00238 
00239 
00241 // implementation of private member functions
00243 
00244 //______________________________________________________________________________
00245 MemRef_t* SMProxy::createFragmentChain(UShort_t i2oFunctionCode,
00246                                        UInt_t   headerSize,
00247                                        UChar_t *data,
00248                                        UInt_t   dataSize,
00249                                        UInt_t  &totalSize)
00250   throw (evf::Exception)
00251 {
00252   totalSize=0;
00253   
00254   UInt_t fragmentDataSizeMax=I2O_MAX_SIZE-headerSize;
00255   UInt_t fragmentCount=(dataSize/fragmentDataSizeMax);
00256   if (dataSize%fragmentDataSizeMax) ++fragmentCount;
00257   
00258   UInt_t currentPosition  =0;
00259   UInt_t remainingDataSize=dataSize;
00260   
00261   MemRef_t *head(0);
00262   MemRef_t *tail(0);
00263   
00264   try {
00265     
00266     for (UInt_t iFragment=0;iFragment<fragmentCount;iFragment++) {
00267 
00268       UInt_t fragmentDataSize=fragmentDataSizeMax;
00269       UInt_t fragmentSize    =fragmentDataSize+headerSize;
00270       
00271       if (remainingDataSize<fragmentDataSizeMax) {
00272         fragmentDataSize=remainingDataSize;
00273         fragmentSize=fragmentDataSize+headerSize;
00274         if (fragmentSize&0x7) fragmentSize = ((fragmentSize >> 3) + 1) << 3;
00275       }
00276       
00277       // allocate the fragment buffer from the pool
00278       toolbox::mem::Reference *bufRef =
00279         toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,fragmentSize);
00280       
00281       // set up pointers to the allocated message buffer
00282       I2O_MESSAGE_FRAME              *stdMsg;
00283       I2O_PRIVATE_MESSAGE_FRAME      *pvtMsg;
00284       I2O_SM_MULTIPART_MESSAGE_FRAME *msg;
00285       
00286       stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00287       pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00288       msg   =(I2O_SM_MULTIPART_MESSAGE_FRAME*)stdMsg;
00289       
00290       stdMsg->VersionOffset   =0;
00291       stdMsg->MsgFlags        =0;  // normal message (not multicast)
00292       stdMsg->MessageSize     =fragmentSize >> 2;
00293       stdMsg->Function        =I2O_PRIVATE_MESSAGE;
00294       stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00295       stdMsg->TargetAddress   =i2o::utils::getAddressMap()->getTid(smAppDesc_);
00296       
00297       pvtMsg->XFunctionCode   =i2oFunctionCode;
00298       pvtMsg->OrganizationID  =XDAQ_ORGANIZATION_ID;
00299 
00300       msg->dataSize           =fragmentDataSize;
00301       msg->hltLocalId         =fuAppDesc_->getLocalId();
00302       msg->hltInstance        =fuAppDesc_->getInstance();
00303       msg->hltTid             =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00304       msg->numFrames          =fragmentCount;
00305       msg->frameCount         =iFragment;
00306       msg->originalSize       =dataSize;
00307       
00308       for (UInt_t i=0;i<fuUrl_.size();i++)
00309       msg->hltURL[i]=fuUrl_[i];
00310       msg->hltURL[fuUrl_.size()]='\0';
00311       
00312       for (UInt_t i=0;i<fuClassName_.size();i++)
00313       msg->hltClassName[i]=fuClassName_[i];
00314       msg->hltClassName[fuClassName_.size()]='\0';
00315 
00316       if (iFragment==0) {
00317         head=bufRef;
00318         tail=bufRef;
00319       }
00320       else {
00321         tail->setNextReference(bufRef);
00322         tail=bufRef;
00323       }
00324 
00325       if (fragmentDataSize!=0) {
00326         UChar_t* targetAddr=(UChar_t*)msg+headerSize;
00327         std::copy(data+currentPosition,
00328                   data+currentPosition+fragmentDataSize,
00329                   targetAddr);
00330       }
00331       
00332       bufRef->setDataSize(fragmentSize);
00333       remainingDataSize-=fragmentDataSize;
00334       currentPosition  +=fragmentDataSize;
00335       totalSize        +=fragmentSize;
00336 
00337     } // for (iFragment ...)
00338   }
00339   catch(toolbox::mem::exception::Exception& e) {
00340     if (0!=head) head->release();
00341     totalSize=0;
00342     string errmsg="Failed to allocate buffer reference.";
00343     LOG4CPLUS_FATAL(log_,errmsg);
00344     XCEPT_RETHROW(evf::Exception,errmsg,e);
00345   }
00346   catch(xdaq::exception::ApplicationDescriptorNotFound& e) {
00347     if (0!=head) head->release();
00348     totalSize=0;
00349     string errmsg="Failed to get tid.";
00350     LOG4CPLUS_FATAL(log_,errmsg);
00351     XCEPT_RETHROW(evf::Exception,errmsg,e);
00352   }
00353   
00354   return head;
00355 }