00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00011
00012 #include "xdaq/Application.h"
00013
00014 #include "toolbox/mem/Reference.h"
00015 #include "toolbox/mem/MemoryPoolFactory.h"
00016 #include "toolbox/mem/exception/Exception.h"
00017
00018 #include "i2o/Method.h"
00019 #include "i2o/utils/AddressMap.h"
00020
00021 #include "xcept/tools.h"
00022
00023 #include <iostream>
00024 #include <cmath>
00025
00026
00027 using namespace std;
00028 using namespace evf;
00029
00030
00032
00034
00035
00036 SMProxy::SMProxy(xdaq::ApplicationDescriptor *fuAppDesc,
00037 xdaq::ApplicationDescriptor *smAppDesc,
00038 xdaq::ApplicationContext *fuAppContext,
00039 toolbox::mem::Pool *i2oPool)
00040 : log_(fuAppContext->getLogger())
00041 , fuAppDesc_(fuAppDesc)
00042 , smAppDesc_(smAppDesc)
00043 , fuAppContext_(fuAppContext)
00044 , i2oPool_(i2oPool)
00045 , initHeaderSize_(sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME))
00046 , dataHeaderSize_(sizeof(I2O_SM_DATA_MESSAGE_FRAME))
00047 , dqmHeaderSize_(sizeof(I2O_SM_DQM_MESSAGE_FRAME))
00048 {
00049 fuUrl_=fuAppDesc_->getContextDescriptor()->getURL();
00050 if (fuUrl_.size()>=MAX_I2O_SM_URLCHARS)
00051 fuUrl_=fuUrl_.substr(0,MAX_I2O_SM_URLCHARS-1);
00052
00053 fuClassName_=fuAppDesc_->getClassName();
00054 if (fuClassName_.size()>=MAX_I2O_SM_URLCHARS)
00055 fuClassName_=fuClassName_.substr(0,MAX_I2O_SM_URLCHARS-1);
00056 }
00057
00058
00059
00060 SMProxy::~SMProxy()
00061 {
00062
00063 }
00064
00065
00067
00069
00070
00071 UInt_t SMProxy::sendInitMessage(UInt_t fuResourceId,
00072 UInt_t outModId,
00073 UInt_t fuProcessId,
00074 UInt_t fuGuid,
00075 UChar_t*data,
00076 UInt_t dataSize)
00077 throw (evf::Exception)
00078 {
00079 UInt_t totalSize=0;
00080 MemRef_t* bufRef =createFragmentChain(I2O_SM_PREAMBLE,
00081 initHeaderSize_,
00082 data,
00083 dataSize,
00084 totalSize);
00085
00086 I2O_SM_PREAMBLE_MESSAGE_FRAME *msg;
00087 MemRef_t* next=bufRef;
00088 do {
00089 msg=(I2O_SM_PREAMBLE_MESSAGE_FRAME*)next->getDataLocation();
00090 msg->rbBufferID=fuResourceId;
00091 msg->outModID=outModId;
00092 msg->fuProcID=fuProcessId;
00093 msg->fuGUID=fuGuid;
00094 }
00095 while ((next=next->getNextReference()));
00096
00097 try {
00098 fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00099 }
00100 catch (xdaq::exception::Exception &e) {
00101 string msg="Failed to post INIT Message.";
00102 XCEPT_RETHROW(evf::Exception,msg,e);
00103 }
00104
00105 return totalSize;
00106 }
00107
00108
00109
00110 UInt_t SMProxy::sendDataEvent(UInt_t fuResourceId,
00111 UInt_t runNumber,
00112 UInt_t evtNumber,
00113 UInt_t outModId,
00114 UInt_t fuProcessId,
00115 UInt_t fuGuid,
00116 UChar_t *data,
00117 UInt_t dataSize)
00118 throw (evf::Exception)
00119 {
00120 UInt_t totalSize=0;
00121 MemRef_t* bufRef =createFragmentChain(I2O_SM_DATA,
00122 dataHeaderSize_,
00123 data,
00124 dataSize,
00125 totalSize);
00126
00127 I2O_SM_DATA_MESSAGE_FRAME *msg;
00128 MemRef_t* next=bufRef;
00129 do {
00130 msg=(I2O_SM_DATA_MESSAGE_FRAME*)next->getDataLocation();
00131 msg->rbBufferID=fuResourceId;
00132 msg->runID =runNumber;
00133 msg->eventID =evtNumber;
00134 msg->outModID=outModId;
00135 msg->fuProcID=fuProcessId;
00136 msg->fuGUID=fuGuid;
00137 }
00138 while ((next=next->getNextReference()));
00139
00140 try {
00141 fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00142 }
00143 catch (xdaq::exception::Exception &e) {
00144 string errmsg="Failed to post DATA Message.";
00145 LOG4CPLUS_FATAL(log_,errmsg);
00146 XCEPT_RETHROW(evf::Exception,errmsg,e);
00147 }
00148
00149 return totalSize;
00150 }
00151
00152
00153
00154 UInt_t SMProxy::sendErrorEvent(UInt_t fuResourceId,
00155 UInt_t runNumber,
00156 UInt_t evtNumber,
00157 UInt_t fuProcessId,
00158 UInt_t fuGuid,
00159 UChar_t *data,
00160 UInt_t dataSize)
00161 throw (evf::Exception)
00162 {
00163 UInt_t totalSize=0;
00164 MemRef_t* bufRef =createFragmentChain(I2O_SM_ERROR,
00165 dataHeaderSize_,
00166 data,
00167 dataSize,
00168 totalSize);
00169
00170 I2O_SM_DATA_MESSAGE_FRAME *msg;
00171 MemRef_t* next=bufRef;
00172 do {
00173 msg=(I2O_SM_DATA_MESSAGE_FRAME*)next->getDataLocation();
00174 msg->rbBufferID=fuResourceId;
00175 msg->runID =runNumber;
00176 msg->eventID =evtNumber;
00177 msg->outModID=0xffffffff;
00178 msg->fuProcID=fuProcessId;
00179 msg->fuGUID=fuGuid;
00180 }
00181 while ((next=next->getNextReference()));
00182
00183 try {
00184 fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00185 }
00186 catch (xdaq::exception::Exception &e) {
00187 string errmsg="Failed to post ERROR Message.";
00188 LOG4CPLUS_FATAL(log_,errmsg);
00189 XCEPT_RETHROW(evf::Exception,errmsg,e);
00190 }
00191
00192 return totalSize;
00193 }
00194
00195
00196
00197 UInt_t SMProxy::sendDqmEvent(UInt_t fuDqmId,
00198 UInt_t runNumber,
00199 UInt_t evtAtUpdate,
00200 UInt_t folderId,
00201 UInt_t fuProcessId,
00202 UInt_t fuGuid,
00203 UChar_t*data,
00204 UInt_t dataSize)
00205 throw (evf::Exception)
00206 {
00207 UInt_t totalSize=0;
00208 MemRef_t* bufRef =createFragmentChain(I2O_SM_DQM,
00209 dqmHeaderSize_,
00210 data,
00211 dataSize,
00212 totalSize);
00213
00214 I2O_SM_DQM_MESSAGE_FRAME *msg;
00215 MemRef_t* next=bufRef;
00216 do {
00217 msg=(I2O_SM_DQM_MESSAGE_FRAME*)next->getDataLocation();
00218 msg->rbBufferID =fuDqmId;
00219 msg->runID =runNumber;
00220 msg->eventAtUpdateID=evtAtUpdate;
00221 msg->folderID =folderId;
00222 msg->fuProcID =fuProcessId;
00223 msg->fuGUID =fuGuid;
00224 }
00225 while ((next=next->getNextReference()));
00226
00227 try {
00228 fuAppContext_->postFrame(bufRef,fuAppDesc_,smAppDesc_);
00229 }
00230 catch (xdaq::exception::Exception &e) {
00231 string errmsg="Failed to post DQM Message.";
00232 LOG4CPLUS_FATAL(log_,errmsg);
00233 XCEPT_RETHROW(evf::Exception,errmsg,e);
00234 }
00235
00236 return totalSize;
00237 }
00238
00239
00241
00243
00244
00245 MemRef_t* SMProxy::createFragmentChain(UShort_t i2oFunctionCode,
00246 UInt_t headerSize,
00247 UChar_t *data,
00248 UInt_t dataSize,
00249 UInt_t &totalSize)
00250 throw (evf::Exception)
00251 {
00252 totalSize=0;
00253
00254 UInt_t fragmentDataSizeMax=I2O_MAX_SIZE-headerSize;
00255 UInt_t fragmentCount=(dataSize/fragmentDataSizeMax);
00256 if (dataSize%fragmentDataSizeMax) ++fragmentCount;
00257
00258 UInt_t currentPosition =0;
00259 UInt_t remainingDataSize=dataSize;
00260
00261 MemRef_t *head(0);
00262 MemRef_t *tail(0);
00263
00264 try {
00265
00266 for (UInt_t iFragment=0;iFragment<fragmentCount;iFragment++) {
00267
00268 UInt_t fragmentDataSize=fragmentDataSizeMax;
00269 UInt_t fragmentSize =fragmentDataSize+headerSize;
00270
00271 if (remainingDataSize<fragmentDataSizeMax) {
00272 fragmentDataSize=remainingDataSize;
00273 fragmentSize=fragmentDataSize+headerSize;
00274 if (fragmentSize&0x7) fragmentSize = ((fragmentSize >> 3) + 1) << 3;
00275 }
00276
00277
00278 toolbox::mem::Reference *bufRef =
00279 toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,fragmentSize);
00280
00281
00282 I2O_MESSAGE_FRAME *stdMsg;
00283 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg;
00284 I2O_SM_MULTIPART_MESSAGE_FRAME *msg;
00285
00286 stdMsg=(I2O_MESSAGE_FRAME*)bufRef->getDataLocation();
00287 pvtMsg=(I2O_PRIVATE_MESSAGE_FRAME*)stdMsg;
00288 msg =(I2O_SM_MULTIPART_MESSAGE_FRAME*)stdMsg;
00289
00290 stdMsg->VersionOffset =0;
00291 stdMsg->MsgFlags =0;
00292 stdMsg->MessageSize =fragmentSize >> 2;
00293 stdMsg->Function =I2O_PRIVATE_MESSAGE;
00294 stdMsg->InitiatorAddress=i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00295 stdMsg->TargetAddress =i2o::utils::getAddressMap()->getTid(smAppDesc_);
00296
00297 pvtMsg->XFunctionCode =i2oFunctionCode;
00298 pvtMsg->OrganizationID =XDAQ_ORGANIZATION_ID;
00299
00300 msg->dataSize =fragmentDataSize;
00301 msg->hltLocalId =fuAppDesc_->getLocalId();
00302 msg->hltInstance =fuAppDesc_->getInstance();
00303 msg->hltTid =i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00304 msg->numFrames =fragmentCount;
00305 msg->frameCount =iFragment;
00306 msg->originalSize =dataSize;
00307
00308 for (UInt_t i=0;i<fuUrl_.size();i++)
00309 msg->hltURL[i]=fuUrl_[i];
00310 msg->hltURL[fuUrl_.size()]='\0';
00311
00312 for (UInt_t i=0;i<fuClassName_.size();i++)
00313 msg->hltClassName[i]=fuClassName_[i];
00314 msg->hltClassName[fuClassName_.size()]='\0';
00315
00316 if (iFragment==0) {
00317 head=bufRef;
00318 tail=bufRef;
00319 }
00320 else {
00321 tail->setNextReference(bufRef);
00322 tail=bufRef;
00323 }
00324
00325 if (fragmentDataSize!=0) {
00326 UChar_t* targetAddr=(UChar_t*)msg+headerSize;
00327 std::copy(data+currentPosition,
00328 data+currentPosition+fragmentDataSize,
00329 targetAddr);
00330 }
00331
00332 bufRef->setDataSize(fragmentSize);
00333 remainingDataSize-=fragmentDataSize;
00334 currentPosition +=fragmentDataSize;
00335 totalSize +=fragmentSize;
00336
00337 }
00338 }
00339 catch(toolbox::mem::exception::Exception& e) {
00340 if (0!=head) head->release();
00341 totalSize=0;
00342 string errmsg="Failed to allocate buffer reference.";
00343 LOG4CPLUS_FATAL(log_,errmsg);
00344 XCEPT_RETHROW(evf::Exception,errmsg,e);
00345 }
00346 catch(xdaq::exception::ApplicationDescriptorNotFound& e) {
00347 if (0!=head) head->release();
00348 totalSize=0;
00349 string errmsg="Failed to get tid.";
00350 LOG4CPLUS_FATAL(log_,errmsg);
00351 XCEPT_RETHROW(evf::Exception,errmsg,e);
00352 }
00353
00354 return head;
00355 }