Go to the documentation of this file.00001
00003
00004 #include <EventFilter/StorageManager/interface/Exception.h>
00005 #include <EventFilter/StorageManager/interface/FileHandler.h>
00006 #include <EventFilter/StorageManager/interface/I2OChain.h>
00007
00008 #include <FWCore/MessageLogger/interface/MessageLogger.h>
00009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00010 #include <FWCore/Version/interface/GetReleaseVersion.h>
00011
00012 #include "boost/shared_array.hpp"
00013
00014 #include <errno.h>
00015 #include <iostream>
00016 #include <sstream>
00017 #include <fstream>
00018 #include <iomanip>
00019 #include <cstdio>
00020 #include <sys/stat.h>
00021 #include <string.h>
00022
00023 namespace stor {
00024
00025 FileHandler::FileHandler
00026 (
00027 FilesMonitorCollection::FileRecordPtr fileRecord,
00028 const DbFileHandlerPtr dbFileHandler,
00029 const uint64_t& maxFileSize
00030 ):
00031 fileRecord_(fileRecord),
00032 dbFileHandler_(dbFileHandler),
00033 firstEntry_(utils::getCurrentTime()),
00034 lastEntry_(firstEntry_),
00035 diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00036 maxFileSize_(maxFileSize),
00037 cmsver_(edm::getReleaseVersion())
00038 {
00039
00040 if(cmsver_[0]=='"') cmsver_=cmsver_.substr(1,cmsver_.size()-2);
00041
00042 checkDirectories();
00043
00044 insertFileInDatabase();
00045 }
00046
00047 void FileHandler::writeEvent(const I2OChain& event)
00048 {
00049 if ( ! fileRecord_->isOpen )
00050 {
00051 std::ostringstream msg;
00052 msg << "Tried to write an event to "
00053 << fileRecord_->completeFileName()
00054 << "which has already been closed.";
00055 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00056 }
00057
00058 do_writeEvent(event);
00059
00060 fileRecord_->fileSize += event.totalDataSize();
00061 ++fileRecord_->eventCount;
00062 lastEntry_ = utils::getCurrentTime();
00063 }
00064
00065
00067
00069
00070 void FileHandler::updateDatabase() const
00071 {
00072 std::ostringstream oss;
00073 oss << "./closeFile.pl "
00074 << " --FILENAME " << fileRecord_->fileName()
00075 << " --FILECOUNTER " << fileRecord_->fileCounter
00076 << " --NEVENTS " << events()
00077 << " --FILESIZE " << fileSize()
00078 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00079 << " --STOPTIME " << utils::secondsSinceEpoch(lastEntry_)
00080 << " --STATUS " << "closed"
00081 << " --RUNNUMBER " << fileRecord_->runNumber
00082 << " --LUMISECTION " << fileRecord_->lumiSection
00083 << " --PATHNAME " << fileRecord_->filePath()
00084 << " --HOSTNAME " << diskWritingParams_.hostName_
00085 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00086 << " --STREAM " << fileRecord_->streamLabel
00087 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00088 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00089 << " --APPVERSION " << cmsver_
00090 << " --APPNAME CMSSW"
00091 << " --TYPE streamer"
00092 << " --DEBUGCLOSE " << fileRecord_->whyClosed
00093 << " --CHECKSUM " << std::hex << fileRecord_->adler32
00094 << " --CHECKSUMIND 0";
00095
00096 dbFileHandler_->writeOld( lastEntry_, oss.str() );
00097 }
00098
00099
00100 void FileHandler::insertFileInDatabase() const
00101 {
00102 std::ostringstream oss;
00103 oss << "./insertFile.pl "
00104 << " --FILENAME " << fileRecord_->fileName()
00105 << " --FILECOUNTER " << fileRecord_->fileCounter
00106 << " --NEVENTS " << events()
00107 << " --FILESIZE " << fileSize()
00108 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00109 << " --STOPTIME 0"
00110 << " --STATUS open"
00111 << " --RUNNUMBER " << fileRecord_->runNumber
00112 << " --LUMISECTION " << fileRecord_->lumiSection
00113 << " --PATHNAME " << fileRecord_->filePath()
00114 << " --HOSTNAME " << diskWritingParams_.hostName_
00115 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00116 << " --STREAM " << fileRecord_->streamLabel
00117 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00118 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00119 << " --APPVERSION " << cmsver_
00120 << " --APPNAME CMSSW"
00121 << " --TYPE streamer"
00122 << " --CHECKSUM 0"
00123 << " --CHECKSUMIND 0";
00124
00125 dbFileHandler_->writeOld( firstEntry_, oss.str() );
00126 }
00127
00128
00129 bool FileHandler::tooOld(const utils::TimePoint_t currentTime)
00130 {
00131 if (diskWritingParams_.lumiSectionTimeOut_ > boost::posix_time::seconds(0) &&
00132 (currentTime - lastEntry_) > diskWritingParams_.lumiSectionTimeOut_)
00133 {
00134 closeFile(FilesMonitorCollection::FileRecord::timeout);
00135 return true;
00136 }
00137 else
00138 {
00139 return false;
00140 }
00141 }
00142
00143
00144 bool FileHandler::isFromLumiSection(const uint32_t lumiSection)
00145 {
00146 if (lumiSection == fileRecord_->lumiSection)
00147 {
00148 closeFile(FilesMonitorCollection::FileRecord::LSended);
00149 return true;
00150 }
00151 else
00152 {
00153 return false;
00154 }
00155 }
00156
00157
00158 bool FileHandler::tooLarge(const uint64_t& dataSize)
00159 {
00160 if ( ((fileSize() + dataSize) > maxFileSize_) && (events() > 0) )
00161 {
00162 closeFile(FilesMonitorCollection::FileRecord::size);
00163 return true;
00164 }
00165 else
00166 {
00167 return false;
00168 }
00169 }
00170
00171
00172 uint32_t FileHandler::events() const
00173 {
00174 return fileRecord_->eventCount;
00175 }
00176
00177
00178 uint64_t FileHandler::fileSize() const
00179 {
00180 return fileRecord_->fileSize;
00181 }
00182
00183
00185
00187
00188
00189 void FileHandler::moveFileToClosed
00190 (
00191 const FilesMonitorCollection::FileRecord::ClosingReason& reason
00192 )
00193 {
00194 const std::string openFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::open));
00195 const std::string closedFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::closed));
00196
00197 const uint64_t openFileSize = checkFileSizeMatch(openFileName, fileSize());
00198
00199 makeFileReadOnly(openFileName);
00200 try
00201 {
00202 renameFile(openFileName, closedFileName);
00203 }
00204 catch (stor::exception::DiskWriting& e)
00205 {
00206 XCEPT_RETHROW(stor::exception::DiskWriting,
00207 "Could not move streamer file to closed area.", e);
00208 }
00209 fileRecord_->isOpen = false;
00210 fileRecord_->whyClosed = reason;
00211 checkFileSizeMatch(closedFileName, openFileSize);
00212 checkAdler32(closedFileName);
00213 }
00214
00215
00216 uint64_t FileHandler::checkFileSizeMatch(const std::string& fileName, const uint64_t& size) const
00217 {
00218 #if linux
00219 struct stat64 statBuff;
00220 int statStatus = stat64(fileName.c_str(), &statBuff);
00221 #else
00222 struct stat statBuff;
00223 int statStatus = stat(fileName.c_str(), &statBuff);
00224 #endif
00225 if ( statStatus != 0 )
00226 {
00227 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::inaccessible;
00228 std::ostringstream msg;
00229 msg << "Error checking the status of file "
00230 << fileName
00231 << ": " << strerror(errno);
00232 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00233 }
00234
00235 if ( sizeMismatch(size, statBuff.st_size) )
00236 {
00237 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::truncated;
00238 std::ostringstream msg;
00239 msg << "Found an unexpected file size when trying to move"
00240 << " the file to the closed state. File " << fileName
00241 << " has an actual size of " << statBuff.st_size
00242 << " (" << statBuff.st_blocks << " blocks)"
00243 << " instead of the expected size of " << size
00244 << " (" << (size/512)+1 << " blocks).";
00245 XCEPT_RAISE(stor::exception::FileTruncation, msg.str());
00246 }
00247
00248 return static_cast<uint64_t>(statBuff.st_size);
00249 }
00250
00251
00252 bool FileHandler::sizeMismatch(const uint64_t& initialSize, const uint64_t& finalSize) const
00253 {
00254 double pctDiff = calcPctDiff(initialSize, finalSize);
00255 return (pctDiff > diskWritingParams_.fileSizeTolerance_);
00256 }
00257
00258
00259 void FileHandler::makeFileReadOnly(const std::string& fileName) const
00260 {
00261 int ronly = chmod(fileName.c_str(), S_IREAD|S_IRGRP|S_IROTH);
00262 if (ronly != 0) {
00263 std::ostringstream msg;
00264 msg << "Unable to change permissions of " << fileName
00265 << " to read only: " << strerror(errno);
00266 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00267 }
00268 }
00269
00270
00271 void FileHandler::checkAdler32(const std::string& fileName) const
00272 {
00273 if (!diskWritingParams_.checkAdler32_) return;
00274
00275 std::ifstream file(fileName.c_str(), std::ios::in | std::ios::binary | std::ios::ate);
00276 if (!file.is_open())
00277 {
00278 std::ostringstream msg;
00279 msg << "File " << fileName << " could not be opened.\n";
00280 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00281 }
00282
00283 std::ifstream::pos_type size = file.tellg();
00284 file.seekg (0, std::ios::beg);
00285
00286 boost::shared_array<char> ptr(new char[1024*1024]);
00287 uint32_t a = 1, b = 0;
00288
00289 std::ifstream::pos_type rsize = 0;
00290 while(rsize < size)
00291 {
00292 file.read(ptr.get(), 1024*1024);
00293 rsize += 1024*1024;
00294 if(file.gcount())
00295 cms::Adler32(ptr.get(), file.gcount(), a, b);
00296 else
00297 break;
00298 }
00299 file.close();
00300
00301 const uint32 adler = (b << 16) | a;
00302 if (adler != fileRecord_->adler32)
00303 {
00304 std::ostringstream msg;
00305 msg << "Disk resident file " << fileName <<
00306 " has Adler32 checksum " << std::hex << adler <<
00307 ", while the expected checkum is " << std::hex << fileRecord_->adler32;
00308 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00309 }
00310 }
00311
00312
00313 void FileHandler::renameFile
00314 (
00315 const std::string& openFileName,
00316 const std::string& closedFileName
00317 ) const
00318 {
00319 int result = rename( openFileName.c_str(), closedFileName.c_str() );
00320 if (result != 0) {
00321 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00322 std::ostringstream msg;
00323 msg << "Unable to move " << openFileName << " to "
00324 << closedFileName << ": " << strerror(errno);
00325 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00326 }
00327 }
00328
00329
00330 void FileHandler::checkDirectories() const
00331 {
00332 utils::checkDirectory(diskWritingParams_.filePath_);
00333 utils::checkDirectory(fileRecord_->baseFilePath);
00334 utils::checkDirectory(fileRecord_->baseFilePath + "/open");
00335 utils::checkDirectory(fileRecord_->baseFilePath + "/closed");
00336 }
00337
00338
00339 double FileHandler::calcPctDiff(const uint64_t& value1, const uint64_t& value2) const
00340 {
00341 if (value1 == value2) return 0;
00342 uint64_t largerValue = std::max(value1,value2);
00343 uint64_t smallerValue = std::min(value1,value2);
00344 return ( largerValue > 0 ? (largerValue - smallerValue) / largerValue : 0 );
00345 }
00346
00347 }
00348