Go to the documentation of this file.00001
00003
00004 #include <EventFilter/StorageManager/interface/Exception.h>
00005 #include <EventFilter/StorageManager/interface/FileHandler.h>
00006 #include <EventFilter/StorageManager/interface/I2OChain.h>
00007
00008 #include <FWCore/MessageLogger/interface/MessageLogger.h>
00009 #include <FWCore/Version/interface/GetReleaseVersion.h>
00010
00011 #include <errno.h>
00012 #include <iostream>
00013 #include <sstream>
00014 #include <fstream>
00015 #include <iomanip>
00016 #include <cstdio>
00017 #include <sys/stat.h>
00018 #include <string.h>
00019
00020 namespace stor {
00021
00022 FileHandler::FileHandler
00023 (
00024 FilesMonitorCollection::FileRecordPtr fileRecord,
00025 const DbFileHandlerPtr dbFileHandler,
00026 const DiskWritingParams& dwParams,
00027 const uint64_t& maxFileSize
00028 ):
00029 fileRecord_(fileRecord),
00030 dbFileHandler_(dbFileHandler),
00031 firstEntry_(utils::getCurrentTime()),
00032 lastEntry_(firstEntry_),
00033 diskWritingParams_(dwParams),
00034 maxFileSize_(maxFileSize),
00035 cmsver_(edm::getReleaseVersion()),
00036 adler_(0)
00037 {
00038
00039 if(cmsver_[0]=='"') cmsver_=cmsver_.substr(1,cmsver_.size()-2);
00040
00041 checkDirectories();
00042
00043 insertFileInDatabase();
00044 }
00045
00046 void FileHandler::writeEvent(const I2OChain& event)
00047 {
00048 if ( ! fileRecord_->isOpen )
00049 {
00050 std::ostringstream msg;
00051 msg << "Tried to write an event to "
00052 << fileRecord_->completeFileName()
00053 << "which has already been closed.";
00054 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00055 }
00056
00057 do_writeEvent(event);
00058
00059 fileRecord_->fileSize += event.totalDataSize();
00060 ++fileRecord_->eventCount;
00061 lastEntry_ = utils::getCurrentTime();
00062 }
00063
00064
00066
00068
00069 void FileHandler::writeToSummaryCatalog() const
00070 {
00071 std::ostringstream currentStat;
00072 std::string ind(":");
00073 currentStat
00074 << fileRecord_->filePath() << ind
00075 << fileRecord_->fileName() << ind
00076 << fileSize() << ind
00077 << events() << ind
00078 << utils::timeStamp(lastEntry_) << ind
00079 << (lastEntry_ - firstEntry_).total_seconds() << ind
00080 << fileRecord_->whyClosed << std::endl;
00081 std::string currentStatString (currentStat.str());
00082 std::ofstream of(diskWritingParams_.fileCatalog_.c_str(), std::ios_base::ate | std::ios_base::out | std::ios_base::app );
00083 of << currentStatString;
00084 of.close();
00085 }
00086
00087
00088 void FileHandler::updateDatabase() const
00089 {
00090 std::ostringstream oss;
00091 oss << "./closeFile.pl "
00092 << " --FILENAME " << fileRecord_->fileName()
00093 << " --FILECOUNTER " << fileRecord_->fileCounter
00094 << " --NEVENTS " << events()
00095 << " --FILESIZE " << fileSize()
00096 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00097 << " --STOPTIME " << utils::secondsSinceEpoch(lastEntry_)
00098 << " --STATUS " << "closed"
00099 << " --RUNNUMBER " << fileRecord_->runNumber
00100 << " --LUMISECTION " << fileRecord_->lumiSection
00101 << " --PATHNAME " << fileRecord_->filePath()
00102 << " --HOSTNAME " << diskWritingParams_.hostName_
00103 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00104 << " --STREAM " << fileRecord_->streamLabel
00105 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00106 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00107 << " --APPVERSION " << cmsver_
00108 << " --APPNAME CMSSW"
00109 << " --TYPE streamer"
00110 << " --DEBUGCLOSE " << fileRecord_->whyClosed
00111 << " --CHECKSUM " << std::hex << adler_
00112 << " --CHECKSUMIND " << std::hex << 0
00113 << "\n";
00114
00115 dbFileHandler_->writeOld( lastEntry_, oss.str() );
00116 }
00117
00118
00119 void FileHandler::insertFileInDatabase() const
00120 {
00121 std::ostringstream oss;
00122 oss << "./insertFile.pl "
00123 << " --FILENAME " << fileRecord_->fileName()
00124 << " --FILECOUNTER " << fileRecord_->fileCounter
00125 << " --NEVENTS " << events()
00126 << " --FILESIZE " << fileSize()
00127 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00128 << " --STOPTIME 0"
00129 << " --STATUS open"
00130 << " --RUNNUMBER " << fileRecord_->runNumber
00131 << " --LUMISECTION " << fileRecord_->lumiSection
00132 << " --PATHNAME " << fileRecord_->filePath()
00133 << " --HOSTNAME " << diskWritingParams_.hostName_
00134 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00135 << " --STREAM " << fileRecord_->streamLabel
00136 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00137 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00138 << " --APPVERSION " << cmsver_
00139 << " --APPNAME CMSSW"
00140 << " --TYPE streamer"
00141 << " --CHECKSUM 0"
00142 << " --CHECKSUMIND 0"
00143 << "\n";
00144
00145 dbFileHandler_->writeOld( firstEntry_, oss.str() );
00146 }
00147
00148
00149 bool FileHandler::tooOld(const utils::TimePoint_t currentTime)
00150 {
00151 if (diskWritingParams_.lumiSectionTimeOut_ > boost::posix_time::seconds(0) &&
00152 (currentTime - lastEntry_) > diskWritingParams_.lumiSectionTimeOut_)
00153 {
00154 closeFile(FilesMonitorCollection::FileRecord::timeout);
00155 return true;
00156 }
00157 else
00158 {
00159 return false;
00160 }
00161 }
00162
00163
00164 bool FileHandler::isFromLumiSection(const uint32_t lumiSection)
00165 {
00166 if (lumiSection == fileRecord_->lumiSection)
00167 {
00168 closeFile(FilesMonitorCollection::FileRecord::LSended);
00169 return true;
00170 }
00171 else
00172 {
00173 return false;
00174 }
00175 }
00176
00177
00178 bool FileHandler::tooLarge(const uint64_t& dataSize)
00179 {
00180 if ( ((fileSize() + dataSize) > maxFileSize_) && (events() > 0) )
00181 {
00182 closeFile(FilesMonitorCollection::FileRecord::size);
00183 return true;
00184 }
00185 else
00186 {
00187 return false;
00188 }
00189 }
00190
00191
00192 uint32_t FileHandler::events() const
00193 {
00194 return fileRecord_->eventCount;
00195 }
00196
00197
00198 uint64_t FileHandler::fileSize() const
00199 {
00200 return fileRecord_->fileSize;
00201 }
00202
00203
00205
00207
00208
00209 void FileHandler::moveFileToClosed
00210 (
00211 const FilesMonitorCollection::FileRecord::ClosingReason& reason
00212 )
00213 {
00214 const std::string openFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::open));
00215 const std::string closedFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::closed));
00216
00217 const uint64_t openFileSize = checkFileSizeMatch(openFileName, fileSize());
00218
00219 makeFileReadOnly(openFileName);
00220 try
00221 {
00222 renameFile(openFileName, closedFileName);
00223 }
00224 catch (stor::exception::DiskWriting& e)
00225 {
00226 XCEPT_RETHROW(stor::exception::DiskWriting,
00227 "Could not move streamer file to closed area.", e);
00228 }
00229 fileRecord_->isOpen = false;
00230 fileRecord_->whyClosed = reason;
00231 checkFileSizeMatch(closedFileName, openFileSize);
00232 }
00233
00234
00235 uint64_t FileHandler::checkFileSizeMatch(const std::string& fileName, const uint64_t& size) const
00236 {
00237 #if linux
00238 struct stat64 statBuff;
00239 int statStatus = stat64(fileName.c_str(), &statBuff);
00240 #else
00241 struct stat statBuff;
00242 int statStatus = stat(fileName.c_str(), &statBuff);
00243 #endif
00244 if ( statStatus != 0 )
00245 {
00246 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::inaccessible;
00247 std::ostringstream msg;
00248 msg << "Error checking the status of file "
00249 << fileName
00250 << ": " << strerror(errno);
00251 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00252 }
00253
00254 if ( sizeMismatch(size, statBuff.st_size) )
00255 {
00256 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::truncated;
00257 std::ostringstream msg;
00258 msg << "Found an unexpected file size when trying to move"
00259 << " the file to the closed state. File " << fileName
00260 << " has an actual size of " << statBuff.st_size
00261 << " (" << statBuff.st_blocks << " blocks)"
00262 << " instead of the expected size of " << size
00263 << " (" << (size/512)+1 << " blocks).";
00264 XCEPT_RAISE(stor::exception::FileTruncation, msg.str());
00265 }
00266
00267 return static_cast<uint64_t>(statBuff.st_size);
00268 }
00269
00270
00271 bool FileHandler::sizeMismatch(const uint64_t& initialSize, const uint64_t& finalSize) const
00272 {
00273 double pctDiff = calcPctDiff(initialSize, finalSize);
00274 return (pctDiff > diskWritingParams_.fileSizeTolerance_);
00275 }
00276
00277
00278 void FileHandler::makeFileReadOnly(const std::string& fileName) const
00279 {
00280 int ronly = chmod(fileName.c_str(), S_IREAD|S_IRGRP|S_IROTH);
00281 if (ronly != 0) {
00282 std::ostringstream msg;
00283 msg << "Unable to change permissions of " << fileName
00284 << " to read only: " << strerror(errno);
00285 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00286 }
00287 }
00288
00289
00290 void FileHandler::renameFile
00291 (
00292 const std::string& openFileName,
00293 const std::string& closedFileName
00294 ) const
00295 {
00296 int result = rename( openFileName.c_str(), closedFileName.c_str() );
00297 if (result != 0) {
00298 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00299 std::ostringstream msg;
00300 msg << "Unable to move " << openFileName << " to "
00301 << closedFileName << ": " << strerror(errno);
00302 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00303 }
00304 }
00305
00306
00307 void FileHandler::checkDirectories() const
00308 {
00309 utils::checkDirectory(diskWritingParams_.filePath_);
00310 utils::checkDirectory(fileRecord_->baseFilePath);
00311 utils::checkDirectory(fileRecord_->baseFilePath + "/open");
00312 utils::checkDirectory(fileRecord_->baseFilePath + "/closed");
00313 }
00314
00315
00316 double FileHandler::calcPctDiff(const uint64_t& value1, const uint64_t& value2) const
00317 {
00318 if (value1 == value2) return 0;
00319 uint64_t largerValue = std::max(value1,value2);
00320 uint64_t smallerValue = std::min(value1,value2);
00321 return ( largerValue > 0 ? (largerValue - smallerValue) / largerValue : 0 );
00322 }
00323
00324 }
00325