00001
00002
00003
00004
00005
00006
00008
00009
00010 #include "EventFilter/ResourceBroker/interface/SMProxy.h"
00011
00012 #include "xdaq/Application.h"
00013
00014 #include "toolbox/mem/Reference.h"
00015 #include "toolbox/mem/MemoryPoolFactory.h"
00016 #include "toolbox/mem/exception/Exception.h"
00017
00018 #include "i2o/Method.h"
00019 #include "i2o/utils/AddressMap.h"
00020
00021 #include "xcept/tools.h"
00022
00023 #include <iostream>
00024 #include <cmath>
00025
00026 using namespace std;
00027 using namespace evf;
00028
00030
00032
00033
00034 SMProxy::SMProxy(xdaq::ApplicationDescriptor *fuAppDesc,
00035 xdaq::ApplicationDescriptor *smAppDesc,
00036 xdaq::ApplicationContext *fuAppContext, toolbox::mem::Pool *i2oPool) :
00037 log_(fuAppContext->getLogger()), fuAppDesc_(fuAppDesc),
00038 smAppDesc_(smAppDesc), fuAppContext_(fuAppContext),
00039 i2oPool_(i2oPool),
00040 initHeaderSize_(sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME)),
00041 dataHeaderSize_(sizeof(I2O_SM_DATA_MESSAGE_FRAME)),
00042 dqmHeaderSize_(sizeof(I2O_SM_DQM_MESSAGE_FRAME)) {
00043 fuUrl_ = fuAppDesc_->getContextDescriptor()->getURL();
00044 if (fuUrl_.size() >= MAX_I2O_SM_URLCHARS)
00045 fuUrl_ = fuUrl_.substr(0, MAX_I2O_SM_URLCHARS - 1);
00046
00047 fuClassName_ = fuAppDesc_->getClassName();
00048 if (fuClassName_.size() >= MAX_I2O_SM_URLCHARS)
00049 fuClassName_ = fuClassName_.substr(0, MAX_I2O_SM_URLCHARS - 1);
00050 }
00051
00052
00053 SMProxy::~SMProxy() {
00054
00055 }
00056
00058
00060
00061
00062 UInt_t SMProxy::sendInitMessage(UInt_t fuResourceId, UInt_t outModId,
00063 UInt_t fuProcessId, UInt_t fuGuid, UChar_t*data, UInt_t dataSize,
00064 UInt_t nExpectedEPs) throw (evf::Exception) {
00065 UInt_t totalSize = 0;
00066 MemRef_t* bufRef = createFragmentChain(I2O_SM_PREAMBLE, initHeaderSize_,
00067 data, dataSize, totalSize);
00068
00069 I2O_SM_PREAMBLE_MESSAGE_FRAME *msg;
00070 MemRef_t* next = bufRef;
00071 do {
00072 msg = (I2O_SM_PREAMBLE_MESSAGE_FRAME*) next->getDataLocation();
00073 msg->rbBufferID = fuResourceId;
00074 msg->outModID = outModId;
00075 msg->fuProcID = fuProcessId;
00076 msg->fuGUID = fuGuid;
00077 msg->nExpectedEPs = nExpectedEPs;
00078 } while ((next = next->getNextReference()));
00079
00080 try {
00081 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00082 } catch (xdaq::exception::Exception &e) {
00083 string msg = "Failed to post INIT Message.";
00084 XCEPT_RETHROW(evf::Exception, msg, e);
00085 }
00086
00087 return totalSize;
00088 }
00089
00090
00091 UInt_t SMProxy::sendDataEvent(UInt_t fuResourceId, UInt_t runNumber,
00092 UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid,
00093 UChar_t *data, UInt_t dataSize) throw (evf::Exception) {
00094 UInt_t totalSize = 0;
00095 MemRef_t* bufRef = createFragmentChain(I2O_SM_DATA, dataHeaderSize_, data,
00096 dataSize, totalSize);
00097
00098 I2O_SM_DATA_MESSAGE_FRAME *msg;
00099 MemRef_t* next = bufRef;
00100 do {
00101 msg = (I2O_SM_DATA_MESSAGE_FRAME*) next->getDataLocation();
00102 msg->rbBufferID = fuResourceId;
00103 msg->runID = runNumber;
00104 msg->eventID = evtNumber;
00105 msg->outModID = outModId;
00106 msg->fuProcID = fuProcessId;
00107 msg->fuGUID = fuGuid;
00108 } while ((next = next->getNextReference()));
00109
00110 try {
00111 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00112 } catch (xdaq::exception::Exception &e) {
00113 string errmsg = "Failed to post DATA Message.";
00114 LOG4CPLUS_FATAL(log_, errmsg);
00115 XCEPT_RETHROW(evf::Exception, errmsg, e);
00116 }
00117
00118 return totalSize;
00119 }
00120
00121
00122 UInt_t SMProxy::sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber,
00123 UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data,
00124 UInt_t dataSize) throw (evf::Exception) {
00125 UInt_t totalSize = 0;
00126 MemRef_t* bufRef = createFragmentChain(I2O_SM_ERROR, dataHeaderSize_, data,
00127 dataSize, totalSize);
00128
00129 I2O_SM_DATA_MESSAGE_FRAME *msg;
00130 MemRef_t* next = bufRef;
00131 do {
00132 msg = (I2O_SM_DATA_MESSAGE_FRAME*) next->getDataLocation();
00133 msg->rbBufferID = fuResourceId;
00134 msg->runID = runNumber;
00135 msg->eventID = evtNumber;
00136 msg->outModID = 0xffffffff;
00137 msg->fuProcID = fuProcessId;
00138 msg->fuGUID = fuGuid;
00139 } while ((next = next->getNextReference()));
00140
00141 try {
00142 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00143 } catch (xdaq::exception::Exception &e) {
00144 string errmsg = "Failed to post ERROR Message.";
00145 LOG4CPLUS_FATAL(log_, errmsg);
00146 XCEPT_RETHROW(evf::Exception, errmsg, e);
00147 }
00148
00149 return totalSize;
00150 }
00151
00152
00153 UInt_t SMProxy::sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber,
00154 UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid,
00155 UChar_t*data, UInt_t dataSize) throw (evf::Exception) {
00156 UInt_t totalSize = 0;
00157 MemRef_t* bufRef = createFragmentChain(I2O_SM_DQM, dqmHeaderSize_, data,
00158 dataSize, totalSize);
00159
00160 I2O_SM_DQM_MESSAGE_FRAME *msg;
00161 MemRef_t* next = bufRef;
00162 do {
00163 msg = (I2O_SM_DQM_MESSAGE_FRAME*) next->getDataLocation();
00164 msg->rbBufferID = fuDqmId;
00165 msg->runID = runNumber;
00166 msg->eventAtUpdateID = evtAtUpdate;
00167 msg->folderID = folderId;
00168 msg->fuProcID = fuProcessId;
00169 msg->fuGUID = fuGuid;
00170 } while ((next = next->getNextReference()));
00171
00172 try {
00173 fuAppContext_->postFrame(bufRef, fuAppDesc_, smAppDesc_);
00174 } catch (xdaq::exception::Exception &e) {
00175 string errmsg = "Failed to post DQM Message.";
00176 LOG4CPLUS_FATAL(log_, errmsg);
00177 XCEPT_RETHROW(evf::Exception, errmsg, e);
00178 }
00179
00180 return totalSize;
00181 }
00182
00184
00186
00187
00188 MemRef_t* SMProxy::createFragmentChain(UShort_t i2oFunctionCode,
00189 UInt_t headerSize, UChar_t *data, UInt_t dataSize, UInt_t &totalSize)
00190 throw (evf::Exception) {
00191 totalSize = 0;
00192
00193 UInt_t fragmentDataSizeMax = I2O_MAX_SIZE - headerSize;
00194 UInt_t fragmentCount = (dataSize / fragmentDataSizeMax);
00195 if (dataSize % fragmentDataSizeMax)
00196 ++fragmentCount;
00197
00198 UInt_t currentPosition = 0;
00199 UInt_t remainingDataSize = dataSize;
00200
00201 MemRef_t *head(0);
00202 MemRef_t *tail(0);
00203
00204 try {
00205
00206 for (UInt_t iFragment = 0; iFragment < fragmentCount; iFragment++) {
00207
00208 UInt_t fragmentDataSize = fragmentDataSizeMax;
00209 UInt_t fragmentSize = fragmentDataSize + headerSize;
00210
00211 if (remainingDataSize < fragmentDataSizeMax) {
00212 fragmentDataSize = remainingDataSize;
00213 fragmentSize = fragmentDataSize + headerSize;
00214 if (fragmentSize & 0x7)
00215 fragmentSize = ((fragmentSize >> 3) + 1) << 3;
00216 }
00217
00218
00219 toolbox::mem::Reference *bufRef =
00220 toolbox::mem::getMemoryPoolFactory()->getFrame(i2oPool_,
00221 fragmentSize);
00222
00223
00224 I2O_MESSAGE_FRAME *stdMsg;
00225 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg;
00226 I2O_SM_MULTIPART_MESSAGE_FRAME *msg;
00227
00228 stdMsg = (I2O_MESSAGE_FRAME*) bufRef->getDataLocation();
00229 pvtMsg = (I2O_PRIVATE_MESSAGE_FRAME*) stdMsg;
00230 msg = (I2O_SM_MULTIPART_MESSAGE_FRAME*) stdMsg;
00231
00232 stdMsg->VersionOffset = 0;
00233 stdMsg->MsgFlags = 0;
00234 stdMsg->MessageSize = fragmentSize >> 2;
00235 stdMsg->Function = I2O_PRIVATE_MESSAGE;
00236 stdMsg->InitiatorAddress = i2o::utils::getAddressMap()->getTid(
00237 fuAppDesc_);
00238 stdMsg->TargetAddress = i2o::utils::getAddressMap()->getTid(
00239 smAppDesc_);
00240
00241 pvtMsg->XFunctionCode = i2oFunctionCode;
00242 pvtMsg->OrganizationID = XDAQ_ORGANIZATION_ID;
00243
00244 msg->dataSize = fragmentDataSize;
00245 msg->hltLocalId = fuAppDesc_->getLocalId();
00246 msg->hltInstance = fuAppDesc_->getInstance();
00247 msg->hltTid = i2o::utils::getAddressMap()->getTid(fuAppDesc_);
00248 msg->numFrames = fragmentCount;
00249 msg->frameCount = iFragment;
00250 msg->originalSize = dataSize;
00251
00252 for (UInt_t i = 0; i < fuUrl_.size(); i++)
00253 msg->hltURL[i] = fuUrl_[i];
00254 msg->hltURL[fuUrl_.size()] = '\0';
00255
00256 for (UInt_t i = 0; i < fuClassName_.size(); i++)
00257 msg->hltClassName[i] = fuClassName_[i];
00258 msg->hltClassName[fuClassName_.size()] = '\0';
00259
00260 if (iFragment == 0) {
00261 head = bufRef;
00262 tail = bufRef;
00263 } else {
00264 tail->setNextReference(bufRef);
00265 tail = bufRef;
00266 }
00267
00268 if (fragmentDataSize != 0) {
00269 UChar_t* targetAddr = (UChar_t*) msg + headerSize;
00270 std::copy(data + currentPosition,
00271 data + currentPosition + fragmentDataSize, targetAddr);
00272 }
00273
00274 bufRef->setDataSize(fragmentSize);
00275 remainingDataSize -= fragmentDataSize;
00276 currentPosition += fragmentDataSize;
00277 totalSize += fragmentSize;
00278
00279 }
00280 } catch (toolbox::mem::exception::Exception& e) {
00281 if (0 != head)
00282 head->release();
00283 totalSize = 0;
00284 string errmsg = "Failed to allocate buffer reference.";
00285 LOG4CPLUS_FATAL(log_, errmsg);
00286 XCEPT_RETHROW(evf::Exception, errmsg, e);
00287 } catch (xdaq::exception::ApplicationDescriptorNotFound& e) {
00288 if (0 != head)
00289 head->release();
00290 totalSize = 0;
00291 string errmsg = "Failed to get tid.";
00292 LOG4CPLUS_FATAL(log_, errmsg);
00293 XCEPT_RETHROW(evf::Exception, errmsg, e);
00294 }
00295
00296 return head;
00297 }