CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_6_2_7/src/EventFilter/ResourceBroker/src/SMProxy.cc

Go to the documentation of this file.
00001 
00002 //
00003 // SMProxy
00004 // -------
00005 //
00006 //            03/20/2007 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
00008 
00009 
00010 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00011 
00012 #include "xdaq/Application.h"
00013 
00014 #include "toolbox/mem/Reference.h"
00015 #include "toolbox/mem/MemoryPoolFactory.h"
00016 #include "toolbox/mem/exception/Exception.h"
00017 
00018 #include "i2o/Method.h"
00019 #include "i2o/utils/AddressMap.h"
00020 
00021 #include "xcept/tools.h"
00022 
00023 #include <iostream>
00024 #include <cmath>
00025 
00026 using namespace std;
00027 using namespace evf;
00028 
00030 // construction/destruction
00032 
00033 //______________________________________________________________________________
00034 SMProxy::SMProxy(xdaq::ApplicationDescriptor *fuAppDesc,
00035                 xdaq::ApplicationDescriptor *smAppDesc,
00036                 xdaq::ApplicationContext *fuAppContext, toolbox::mem::Pool *i2oPool) :
00037         log_(fuAppContext->getLogger()), fuAppDesc_(fuAppDesc),
00038                         smAppDesc_(smAppDesc), fuAppContext_(fuAppContext),
00039                         i2oPool_(i2oPool),
00040                         initHeaderSize_(sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME)),
00041                         dataHeaderSize_(sizeof(I2O_SM_DATA_MESSAGE_FRAME)),
00042                         dqmHeaderSize_(sizeof(I2O_SM_DQM_MESSAGE_FRAME)) {
00043         fuUrl_ = fuAppDesc_->getContextDescriptor()->getURL();
00044         if (fuUrl_.size() >= MAX_I2O_SM_URLCHARS)
00045                 fuUrl_ = fuUrl_.substr(0, MAX_I2O_SM_URLCHARS - 1);
00046 
00047         fuClassName_ = fuAppDesc_->getClassName();
00048         if (fuClassName_.size() >= MAX_I2O_SM_URLCHARS)
00049                 fuClassName_ = fuClassName_.substr(0, MAX_I2O_SM_URLCHARS - 1);
00050 }
00051 
00052 //______________________________________________________________________________
00053 SMProxy::~SMProxy() {
00054 
00055 }
00056 
00058 // implementation of member functions
00060 
00061 //______________________________________________________________________________
00062 UInt_t SMProxy::sendInitMessage(UInt_t fuResourceId, UInt_t outModId,
00063                 UInt_t fuProcessId, UInt_t fuGuid, UChar_t*data, UInt_t dataSize,
00064                 UInt_t nExpectedEPs) throw (evf::Exception) {
00065         UInt_t totalSize = 0;
00066         MemRef_t* bufRef = createFragmentChain(I2O_SM_PREAMBLE, initHeaderSize_,
00067                         data, dataSize, totalSize);
00068 
00069         I2O_SM_PREAMBLE_MESSAGE_FRAME *msg;
00070         MemRef_t* next = bufRef;
00071         do {
00072                 msg = (I2O_SM_PREAMBLE_MESSAGE_FRAME*) next->getDataLocation();
00073                 msg->rbBufferID = fuResourceId;
00074                 msg->outModID = outModId;
00075                 msg->fuProcID = fuProcessId;
00076                 msg->fuGUID = fuGuid;
00077                 msg->nExpectedEPs = nExpectedEPs;
00078         } while ((next = next->getNextReference()));
00079 
00080         try {
00081                 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00082         } catch (xdaq::exception::Exception &e) {
00083                 string msg = "Failed to post INIT Message.";
00084                 XCEPT_RETHROW(evf::Exception, msg, e);
00085         }
00086 
00087         return totalSize;
00088 }
00089 
00090 //______________________________________________________________________________
00091 UInt_t SMProxy::sendDataEvent(UInt_t fuResourceId, UInt_t runNumber,
00092                 UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid,
00093                 UChar_t *data, UInt_t dataSize) throw (evf::Exception) {
00094         UInt_t totalSize = 0;
00095         MemRef_t* bufRef = createFragmentChain(I2O_SM_DATA, dataHeaderSize_, data,
00096                         dataSize, totalSize);
00097 
00098         I2O_SM_DATA_MESSAGE_FRAME *msg;
00099         MemRef_t* next = bufRef;
00100         do {
00101                 msg = (I2O_SM_DATA_MESSAGE_FRAME*) next->getDataLocation();
00102                 msg->rbBufferID = fuResourceId;
00103                 msg->runID = runNumber;
00104                 msg->eventID = evtNumber;
00105                 msg->outModID = outModId;
00106                 msg->fuProcID = fuProcessId;
00107                 msg->fuGUID = fuGuid;
00108         } while ((next = next->getNextReference()));
00109 
00110         try {
00111                 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00112         } catch (xdaq::exception::Exception &e) {
00113                 string errmsg = "Failed to post DATA Message.";
00114                 LOG4CPLUS_FATAL(log_, errmsg);
00115                 XCEPT_RETHROW(evf::Exception, errmsg, e);
00116         }
00117 
00118         return totalSize;
00119 }
00120 
00121 //______________________________________________________________________________
00122 UInt_t SMProxy::sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber,
00123                 UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data,
00124                 UInt_t dataSize) throw (evf::Exception) {
00125         UInt_t totalSize = 0;
00126         MemRef_t* bufRef = createFragmentChain(I2O_SM_ERROR, dataHeaderSize_, data,
00127                         dataSize, totalSize);
00128 
00129         I2O_SM_DATA_MESSAGE_FRAME *msg;
00130         MemRef_t* next = bufRef;
00131         do {
00132                 msg = (I2O_SM_DATA_MESSAGE_FRAME*) next->getDataLocation();
00133                 msg->rbBufferID = fuResourceId;
00134                 msg->runID = runNumber;
00135                 msg->eventID = evtNumber;
00136                 msg->outModID = 0xffffffff;
00137                 msg->fuProcID = fuProcessId;
00138                 msg->fuGUID = fuGuid;
00139         } while ((next = next->getNextReference()));
00140 
00141         try {
00142                 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00143         } catch (xdaq::exception::Exception &e) {
00144                 string errmsg = "Failed to post ERROR Message.";
00145                 LOG4CPLUS_FATAL(log_, errmsg);
00146                 XCEPT_RETHROW(evf::Exception, errmsg, e);
00147         }
00148 
00149         return totalSize;
00150 }
00151 
00152 //______________________________________________________________________________
00153 UInt_t SMProxy::sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber,
00154                 UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid,
00155                 UChar_t*data, UInt_t dataSize) throw (evf::Exception) {
00156         UInt_t totalSize = 0;
00157         MemRef_t* bufRef = createFragmentChain(I2O_SM_DQM, dqmHeaderSize_, data,
00158                         dataSize, totalSize);
00159 
00160         I2O_SM_DQM_MESSAGE_FRAME *msg;
00161         MemRef_t* next = bufRef;
00162         do {
00163                 msg = (I2O_SM_DQM_MESSAGE_FRAME*) next->getDataLocation();
00164                 msg->rbBufferID = fuDqmId;
00165                 msg->runID = runNumber;
00166                 msg->eventAtUpdateID = evtAtUpdate;
00167                 msg->folderID = folderId;
00168                 msg->fuProcID = fuProcessId;
00169                 msg->fuGUID = fuGuid;
00170         } while ((next = next->getNextReference()));
00171 
00172         try {
00173                 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00174         } catch (xdaq::exception::Exception &e) {
00175                 string errmsg = "Failed to post DQM Message.";
00176                 LOG4CPLUS_FATAL(log_, errmsg);
00177                 XCEPT_RETHROW(evf::Exception, errmsg, e);
00178         }
00179 
00180         return totalSize;
00181 }
00182 
00184 // implementation of private member functions
00186 
00187 //______________________________________________________________________________
00188 MemRef_t* SMProxy::createFragmentChain(UShort_t i2oFunctionCode,
00189                 UInt_t headerSize, UChar_t *data, UInt_t dataSize, UInt_t &totalSize)
00190                 throw (evf::Exception) {
00191         totalSize = 0;
00192 
00193         UInt_t fragmentDataSizeMax = I2O_MAX_SIZE - headerSize;
00194         UInt_t fragmentCount = (dataSize / fragmentDataSizeMax);
00195         if (dataSize % fragmentDataSizeMax)
00196                 ++fragmentCount;
00197 
00198         UInt_t currentPosition = 0;
00199         UInt_t remainingDataSize = dataSize;
00200 
00201         MemRef_t *head(0);
00202         MemRef_t *tail(0);
00203 
00204         try {
00205 
00206                 for (UInt_t iFragment = 0; iFragment < fragmentCount; iFragment++) {
00207 
00208                         UInt_t fragmentDataSize = fragmentDataSizeMax;
00209                         UInt_t fragmentSize = fragmentDataSize + headerSize;
00210 
00211                         if (remainingDataSize < fragmentDataSizeMax) {
00212                                 fragmentDataSize = remainingDataSize;
00213                                 fragmentSize = fragmentDataSize + headerSize;
00214                                 if (fragmentSize & 0x7)
00215                                         fragmentSize = ((fragmentSize >> 3) + 1) << 3;
00216                         }
00217 
00218                         // allocate the fragment buffer from the pool
00219                         toolbox::mem::Reference *bufRef =
00220                                         toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00221                                                         fragmentSize);
00222 
00223                         // set up pointers to the allocated message buffer
00224                         I2O_MESSAGE_FRAME *stdMsg;
00225                         I2O_PRIVATE_MESSAGE_FRAME *pvtMsg;
00226                         I2O_SM_MULTIPART_MESSAGE_FRAME *msg;
00227 
00228                         stdMsg = (I2O_MESSAGE_FRAME*) bufRef->getDataLocation();
00229                         pvtMsg = (I2O_PRIVATE_MESSAGE_FRAME*) stdMsg;
00230                         msg = (I2O_SM_MULTIPART_MESSAGE_FRAME*) stdMsg;
00231 
00232                         stdMsg->VersionOffset = 0;
00233                         stdMsg->MsgFlags = 0; // normal message (not multicast)
00234                         stdMsg->MessageSize = fragmentSize >> 2;
00235                         stdMsg->Function = I2O_PRIVATE_MESSAGE;
00236                         stdMsg->InitiatorAddress = i2o::utils::getAddressMap()->getTid(
00237                                         fuAppDesc_);
00238                         stdMsg->TargetAddress = i2o::utils::getAddressMap()->getTid(
00239                                         smAppDesc_);
00240 
00241                         pvtMsg->XFunctionCode = i2oFunctionCode;
00242                         pvtMsg->OrganizationID = XDAQ_ORGANIZATION_ID;
00243 
00244                         msg->dataSize = fragmentDataSize;
00245                         msg->hltLocalId = fuAppDesc_->getLocalId();
00246                         msg->hltInstance = fuAppDesc_->getInstance();
00247                         msg->hltTid = i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00248                         msg->numFrames = fragmentCount;
00249                         msg->frameCount = iFragment;
00250                         msg->originalSize = dataSize;
00251 
00252                         for (UInt_t i = 0; i < fuUrl_.size(); i++)
00253                                 msg->hltURL[i] = fuUrl_[i];
00254                         msg->hltURL[fuUrl_.size()] = '\0';
00255 
00256                         for (UInt_t i = 0; i < fuClassName_.size(); i++)
00257                                 msg->hltClassName[i] = fuClassName_[i];
00258                         msg->hltClassName[fuClassName_.size()] = '\0';
00259 
00260                         if (iFragment == 0) {
00261                                 head = bufRef;
00262                                 tail = bufRef;
00263                         } else {
00264                                 tail->setNextReference(bufRef);
00265                                 tail = bufRef;
00266                         }
00267 
00268                         if (fragmentDataSize != 0) {
00269                                 UChar_t* targetAddr = (UChar_t*) msg + headerSize;
00270                                 std::copy(data + currentPosition,
00271                                                 data + currentPosition + fragmentDataSize, targetAddr);
00272                         }
00273 
00274                         bufRef->setDataSize(fragmentSize);
00275                         remainingDataSize -= fragmentDataSize;
00276                         currentPosition += fragmentDataSize;
00277                         totalSize += fragmentSize;
00278 
00279                 } // for (iFragment ...)
00280         } catch (toolbox::mem::exception::Exception& e) {
00281                 if (0 != head)
00282                         head->release();
00283                 totalSize = 0;
00284                 string errmsg = "Failed to allocate buffer reference.";
00285                 LOG4CPLUS_FATAL(log_, errmsg);
00286                 XCEPT_RETHROW(evf::Exception, errmsg, e);
00287         } catch (xdaq::exception::ApplicationDescriptorNotFound& e) {
00288                 if (0 != head)
00289                         head->release();
00290                 totalSize = 0;
00291                 string errmsg = "Failed to get tid.";
00292                 LOG4CPLUS_FATAL(log_, errmsg);
00293                 XCEPT_RETHROW(evf::Exception, errmsg, e);
00294         }
00295 
00296         return head;
00297 }