00001
00002
00003 #include <EventFilter/StorageManager/interface/EventStreamService.h>
00004 #include <EventFilter/StorageManager/interface/ProgressMarker.h>
00005 #include <EventFilter/StorageManager/interface/Parameter.h>
00006 #include "EventFilter/StorageManager/interface/Configurator.h"
00007 #include "EventFilter/StorageManager/interface/EventOutputService.h"
00008
00009 #include <iostream>
00010 #include <iomanip>
00011 #include <sys/stat.h>
00012 #include <sys/statfs.h>
00013
00014 using namespace edm;
00015 using namespace std;
00016 using boost::shared_ptr;
00017 using stor::ProgressMarker;
00018
00019
00020
00021
00022
00023 EventStreamService::EventStreamService(ParameterSet const& pset, InitMsgView const& view)
00024 {
00025 parameterSet_ = pset;
00026 runNumber_ = 0;
00027 lumiSection_ = 0;
00028 numberOfFileSystems_ = 0;
00029 maxFileSizeInMB_ = 0;
00030 maxSize_ = 0;
00031 highWaterMark_ = 0;
00032 lumiSectionTimeOut_ = 0;
00033 ntotal_ = 0;
00034
00035 saveInitMessage(view);
00036 initializeSelection(view);
00037 setStreamParameter();
00038 }
00039
00040
00041
00042
00043
00044 bool EventStreamService::nextEvent(const uint8 * const bufPtr)
00045 {
00046 ProgressMarker::instance()->processing(true);
00047 EventMsgView view((void *) bufPtr);
00048 if (!acceptEvent(view))
00049 {
00050 ProgressMarker::instance()->processing(false);
00051 return false;
00052 }
00053 runNumber_ = view.run();
00054 lumiSection_ = view.lumi();
00055
00056 shared_ptr<OutputService> outputService = getOutputService(view);
00057 ProgressMarker::instance()->processing(false);
00058
00059 ProgressMarker::instance()->writing(true);
00060 outputService->writeEvent(bufPtr);
00061 ProgressMarker::instance()->writing(false);
00062 return true;
00063 }
00064
00065
00066
00067
00068
00069 void EventStreamService::stop()
00070 {
00071 for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00072 boost::shared_ptr<FileRecord> fd(it->first);
00073 fd->setWhyClosed(1);
00074 outputMap_.erase(it++);
00075 fillOutputSummaryClosed(fd);
00076 }
00077 }
00078
00079
00080
00081
00082
00083
00084
00085
00086 void EventStreamService::closeTimedOutFiles(int lumi, double timeoutdiff)
00087 {
00088
00089 for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00090
00091 int reason = lumi*100000 + it->first->lumiSection()*10;
00092
00093
00094 if (it->first->lumiSection() == lumi) {
00095 it->first->setWhyClosed(reason);
00096 ++it;
00097 continue;
00098 }
00099
00100 if (it->first->lumiSection() < lumi-1) {
00101 it->first->setWhyClosed(reason+2);
00102 } else if (timeoutdiff > lumiSectionTimeOut_) {
00103 it->first->setWhyClosed(reason+3);
00104 } else {
00105 it->first->setWhyClosed(reason+9);
00106 }
00107
00108 ++it;
00109 }
00110
00111
00112 double currentTime = getCurrentTime();
00113 for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00114 if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00115 boost::shared_ptr<FileRecord> fd(it->first);
00116 outputMap_.erase(it++);
00117 fillOutputSummaryClosed(fd);
00118 } else
00119 ++it;
00120 }
00121 }
00122
00123
00124
00125
00126
00127
00128 void EventStreamService::closeTimedOutFiles()
00129 {
00130 double currentTime = getCurrentTime();
00131 for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ) {
00132 if (currentTime - it->second->lastEntry() > lumiSectionTimeOut_) {
00133 boost::shared_ptr<FileRecord> fd(it->first);
00134 outputMap_.erase(it++);
00135 fillOutputSummaryClosed(fd);
00136 } else {
00137 ++it;
00138 }
00139 }
00140 }
00141
00142
00143
00144
00145
00146
00147 boost::shared_ptr<OutputService> EventStreamService::getOutputService(EventMsgView const& view)
00148 {
00149 for (OutputMapIterator it = outputMap_.begin(); it != outputMap_.end(); ++it) {
00150 if (it->first->lumiSection() == lumiSection_) {
00151 if (checkEvent(it->first, view))
00152 return it->second;
00153 else {
00154 boost::shared_ptr<FileRecord> fd(it->first);
00155 fd->setWhyClosed(4);
00156 outputMap_.erase(it);
00157 fillOutputSummaryClosed(fd);
00158 break;
00159 }
00160 }
00161 }
00162 return newOutputService();
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172 boost::shared_ptr<OutputService> EventStreamService::newOutputService()
00173 {
00174 boost::shared_ptr<FileRecord> file = generateFileRecord();
00175 InitMsgView view(&saved_initmsg_[0]);
00176
00177 shared_ptr<OutputService> outputService(new EventOutputService(file, view));
00178 outputMap_[file] = outputService;
00179
00180 return outputService;
00181 }
00182
00183
00184
00185
00186
00187
00188 bool EventStreamService::checkEvent(shared_ptr<FileRecord> file, EventMsgView const& view) const
00189 {
00190 if (file->fileSize() + static_cast<long long>(view.size()) > maxSize_ && file->events() > 0)
00191 return false;
00192
00193 return true;
00194 }
00195
00196
00197
00198
00199
00200 void EventStreamService::initializeSelection(InitMsgView const& initView)
00201 {
00202 Strings triggerNameList;
00203 initView.hltTriggerNames(triggerNameList);
00204 eventSelector_.reset(new EventSelector(parameterSet_.getUntrackedParameter("SelectEvents", ParameterSet()),triggerNameList));
00205 }
00206
00207
00208
00209
00210
00211 bool EventStreamService::acceptEvent(EventMsgView const& view)
00212 {
00213 std::vector<unsigned char> hlt_out;
00214 hlt_out.resize(1 + (view.hltCount()-1)/4);
00215 view.hltTriggerBits(&hlt_out[0]);
00216 int num_paths = view.hltCount();
00217 bool rc = (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hlt_out[0], num_paths));
00218 return rc;
00219 }
00220
00221
00222
00223
00224
00225 void EventStreamService::saveInitMessage(InitMsgView const& view)
00226 {
00227 saved_initmsg_.resize(view.size() + 20);
00228 unsigned char* pos = &saved_initmsg_[0];
00229 unsigned char* from = view.startAddress();
00230 unsigned int dsize = view.size();
00231 copy(from,from+dsize,pos);
00232 }
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246 boost::shared_ptr<FileRecord> EventStreamService::generateFileRecord()
00247 {
00248 std::ostringstream oss;
00249 oss << setupLabel_
00250 << "." << setfill('0') << std::setw(8) << runNumber_
00251 << "." << setfill('0') << std::setw(4) << lumiSection_
00252 << "." << streamLabel_
00253 << "." << fileName_
00254 << "." << setfill('0') << std::setw(2) << sourceId_;
00255 string fileName = oss.str();
00256
00257 shared_ptr<FileRecord> fd = shared_ptr<FileRecord>(new FileRecord(lumiSection_, fileName, filePath_));
00258 ++ntotal_;
00259
00260 boost::mutex::scoped_lock sl(list_lock_);
00261 map<string, int>::iterator it = outputSummary_.find(fileName);
00262 if(it==outputSummary_.end()) {
00263 outputSummary_.insert(std::pair<string, int>(fileName,0));
00264 } else {
00265 ++it->second;
00266 fd->setFileCounter(it->second);
00267 }
00268
00269 if (numberOfFileSystems_ > 0)
00270 fd->fileSystem((runNumber_ + atoi(sourceId_.c_str()) + ntotal_) % numberOfFileSystems_);
00271
00272 fd->checkDirectories();
00273 fd->setRunNumber(runNumber_);
00274 fd->setStreamLabel(streamLabel_);
00275 fd->setSetupLabel(setupLabel_);
00276
00277
00278 return fd;
00279 }
00280
00281
00282
00283
00284
00285 void EventStreamService::report(ostream &os, int indentation) const
00286 {
00287 string prefix(indentation, ' ');
00288 os << "\n";
00289 os << prefix << "------------- EventStreamService -------------\n";
00290 os << prefix << "fileName " << fileName_ << "\n";
00291 os << prefix << "filePath " << filePath_ << "\n";
00292 os << prefix << "sourceId " << sourceId_ << "\n";
00293 os << prefix << "setupLabel " << setupLabel_ << "\n";
00294 os << prefix << "streamLabel " << streamLabel_ << "\n";
00295 os << prefix << "maxSize " << maxSize_ << "\n";
00296 os << prefix << "highWaterMark " << highWaterMark_ << "\n";
00297 os << prefix << "lumiSectionTimeOut " << lumiSectionTimeOut_ << "\n";
00298 os << prefix << "no. active files " << outputMap_.size() << "\n";
00299 os << prefix << "no. files " << outputSummary_.size() << "\n";
00300 os << prefix << "-----------------------------------------\n";
00301 }