Go to the documentation of this file.00001
00003
00004 #include <EventFilter/StorageManager/interface/Exception.h>
00005 #include <EventFilter/StorageManager/interface/FileHandler.h>
00006 #include <EventFilter/StorageManager/interface/I2OChain.h>
00007
00008 #include <FWCore/MessageLogger/interface/MessageLogger.h>
00009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00010 #include <FWCore/Version/interface/GetReleaseVersion.h>
00011
00012 #include "boost/shared_array.hpp"
00013
00014 #include <errno.h>
00015 #include <iostream>
00016 #include <sstream>
00017 #include <fstream>
00018 #include <iomanip>
00019 #include <cstdio>
00020 #include <sys/stat.h>
00021 #include <string.h>
00022
00023 namespace stor {
00024
00025 FileHandler::FileHandler
00026 (
00027 FilesMonitorCollection::FileRecordPtr fileRecord,
00028 const DbFileHandlerPtr dbFileHandler,
00029 const DiskWritingParams& dwParams,
00030 const uint64_t& maxFileSize
00031 ):
00032 fileRecord_(fileRecord),
00033 dbFileHandler_(dbFileHandler),
00034 firstEntry_(utils::getCurrentTime()),
00035 lastEntry_(firstEntry_),
00036 diskWritingParams_(dwParams),
00037 maxFileSize_(maxFileSize),
00038 cmsver_(edm::getReleaseVersion())
00039 {
00040
00041 if(cmsver_[0]=='"') cmsver_=cmsver_.substr(1,cmsver_.size()-2);
00042
00043 checkDirectories();
00044
00045 insertFileInDatabase();
00046 }
00047
00048 void FileHandler::writeEvent(const I2OChain& event)
00049 {
00050 if ( ! fileRecord_->isOpen )
00051 {
00052 std::ostringstream msg;
00053 msg << "Tried to write an event to "
00054 << fileRecord_->completeFileName()
00055 << "which has already been closed.";
00056 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00057 }
00058
00059 do_writeEvent(event);
00060
00061 fileRecord_->fileSize += event.totalDataSize();
00062 ++fileRecord_->eventCount;
00063 lastEntry_ = utils::getCurrentTime();
00064 }
00065
00066
00068
00070
00071 void FileHandler::updateDatabase() const
00072 {
00073 std::ostringstream oss;
00074 oss << "./closeFile.pl "
00075 << " --FILENAME " << fileRecord_->fileName()
00076 << " --FILECOUNTER " << fileRecord_->fileCounter
00077 << " --NEVENTS " << events()
00078 << " --FILESIZE " << fileSize()
00079 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00080 << " --STOPTIME " << utils::secondsSinceEpoch(lastEntry_)
00081 << " --STATUS " << "closed"
00082 << " --RUNNUMBER " << fileRecord_->runNumber
00083 << " --LUMISECTION " << fileRecord_->lumiSection
00084 << " --PATHNAME " << fileRecord_->filePath()
00085 << " --HOSTNAME " << diskWritingParams_.hostName_
00086 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00087 << " --STREAM " << fileRecord_->streamLabel
00088 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00089 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00090 << " --APPVERSION " << cmsver_
00091 << " --APPNAME CMSSW"
00092 << " --TYPE streamer"
00093 << " --DEBUGCLOSE " << fileRecord_->whyClosed
00094 << " --CHECKSUM " << std::hex << fileRecord_->adler32
00095 << " --CHECKSUMIND 0"
00096 << "\n";
00097
00098 dbFileHandler_->writeOld( lastEntry_, oss.str() );
00099 }
00100
00101
00102 void FileHandler::insertFileInDatabase() const
00103 {
00104 std::ostringstream oss;
00105 oss << "./insertFile.pl "
00106 << " --FILENAME " << fileRecord_->fileName()
00107 << " --FILECOUNTER " << fileRecord_->fileCounter
00108 << " --NEVENTS " << events()
00109 << " --FILESIZE " << fileSize()
00110 << " --STARTTIME " << utils::secondsSinceEpoch(firstEntry_)
00111 << " --STOPTIME 0"
00112 << " --STATUS open"
00113 << " --RUNNUMBER " << fileRecord_->runNumber
00114 << " --LUMISECTION " << fileRecord_->lumiSection
00115 << " --PATHNAME " << fileRecord_->filePath()
00116 << " --HOSTNAME " << diskWritingParams_.hostName_
00117 << " --SETUPLABEL " << diskWritingParams_.setupLabel_
00118 << " --STREAM " << fileRecord_->streamLabel
00119 << " --INSTANCE " << diskWritingParams_.smInstanceString_
00120 << " --SAFETY " << diskWritingParams_.initialSafetyLevel_
00121 << " --APPVERSION " << cmsver_
00122 << " --APPNAME CMSSW"
00123 << " --TYPE streamer"
00124 << " --CHECKSUM 0"
00125 << " --CHECKSUMIND 0"
00126 << "\n";
00127
00128 dbFileHandler_->writeOld( firstEntry_, oss.str() );
00129 }
00130
00131
00132 bool FileHandler::tooOld(const utils::TimePoint_t currentTime)
00133 {
00134 if (diskWritingParams_.lumiSectionTimeOut_ > boost::posix_time::seconds(0) &&
00135 (currentTime - lastEntry_) > diskWritingParams_.lumiSectionTimeOut_)
00136 {
00137 closeFile(FilesMonitorCollection::FileRecord::timeout);
00138 return true;
00139 }
00140 else
00141 {
00142 return false;
00143 }
00144 }
00145
00146
00147 bool FileHandler::isFromLumiSection(const uint32_t lumiSection)
00148 {
00149 if (lumiSection == fileRecord_->lumiSection)
00150 {
00151 closeFile(FilesMonitorCollection::FileRecord::LSended);
00152 return true;
00153 }
00154 else
00155 {
00156 return false;
00157 }
00158 }
00159
00160
00161 bool FileHandler::tooLarge(const uint64_t& dataSize)
00162 {
00163 if ( ((fileSize() + dataSize) > maxFileSize_) && (events() > 0) )
00164 {
00165 closeFile(FilesMonitorCollection::FileRecord::size);
00166 return true;
00167 }
00168 else
00169 {
00170 return false;
00171 }
00172 }
00173
00174
00175 uint32_t FileHandler::events() const
00176 {
00177 return fileRecord_->eventCount;
00178 }
00179
00180
00181 uint64_t FileHandler::fileSize() const
00182 {
00183 return fileRecord_->fileSize;
00184 }
00185
00186
00188
00190
00191
00192 void FileHandler::moveFileToClosed
00193 (
00194 const FilesMonitorCollection::FileRecord::ClosingReason& reason
00195 )
00196 {
00197 const std::string openFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::open));
00198 const std::string closedFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::closed));
00199
00200 const uint64_t openFileSize = checkFileSizeMatch(openFileName, fileSize());
00201
00202 makeFileReadOnly(openFileName);
00203 try
00204 {
00205 renameFile(openFileName, closedFileName);
00206 }
00207 catch (stor::exception::DiskWriting& e)
00208 {
00209 XCEPT_RETHROW(stor::exception::DiskWriting,
00210 "Could not move streamer file to closed area.", e);
00211 }
00212 fileRecord_->isOpen = false;
00213 fileRecord_->whyClosed = reason;
00214 checkFileSizeMatch(closedFileName, openFileSize);
00215 checkAdler32(closedFileName);
00216 }
00217
00218
00219 uint64_t FileHandler::checkFileSizeMatch(const std::string& fileName, const uint64_t& size) const
00220 {
00221 #if linux
00222 struct stat64 statBuff;
00223 int statStatus = stat64(fileName.c_str(), &statBuff);
00224 #else
00225 struct stat statBuff;
00226 int statStatus = stat(fileName.c_str(), &statBuff);
00227 #endif
00228 if ( statStatus != 0 )
00229 {
00230 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::inaccessible;
00231 std::ostringstream msg;
00232 msg << "Error checking the status of file "
00233 << fileName
00234 << ": " << strerror(errno);
00235 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00236 }
00237
00238 if ( sizeMismatch(size, statBuff.st_size) )
00239 {
00240 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::truncated;
00241 std::ostringstream msg;
00242 msg << "Found an unexpected file size when trying to move"
00243 << " the file to the closed state. File " << fileName
00244 << " has an actual size of " << statBuff.st_size
00245 << " (" << statBuff.st_blocks << " blocks)"
00246 << " instead of the expected size of " << size
00247 << " (" << (size/512)+1 << " blocks).";
00248 XCEPT_RAISE(stor::exception::FileTruncation, msg.str());
00249 }
00250
00251 return static_cast<uint64_t>(statBuff.st_size);
00252 }
00253
00254
00255 bool FileHandler::sizeMismatch(const uint64_t& initialSize, const uint64_t& finalSize) const
00256 {
00257 double pctDiff = calcPctDiff(initialSize, finalSize);
00258 return (pctDiff > diskWritingParams_.fileSizeTolerance_);
00259 }
00260
00261
00262 void FileHandler::makeFileReadOnly(const std::string& fileName) const
00263 {
00264 int ronly = chmod(fileName.c_str(), S_IREAD|S_IRGRP|S_IROTH);
00265 if (ronly != 0) {
00266 std::ostringstream msg;
00267 msg << "Unable to change permissions of " << fileName
00268 << " to read only: " << strerror(errno);
00269 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00270 }
00271 }
00272
00273
00274 void FileHandler::checkAdler32(const std::string& fileName) const
00275 {
00276 if (!diskWritingParams_.checkAdler32_) return;
00277
00278 std::ifstream file(fileName.c_str(), std::ios::in | std::ios::binary | std::ios::ate);
00279 if (!file.is_open())
00280 {
00281 std::ostringstream msg;
00282 msg << "File " << fileName << " could not be opened.\n";
00283 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00284 }
00285
00286 std::ifstream::pos_type size = file.tellg();
00287 file.seekg (0, std::ios::beg);
00288
00289 boost::shared_array<char> ptr(new char[1024*1024]);
00290 uint32_t a = 1, b = 0;
00291
00292 std::ifstream::pos_type rsize = 0;
00293 while(rsize < size)
00294 {
00295 file.read(ptr.get(), 1024*1024);
00296 rsize += 1024*1024;
00297 if(file.gcount())
00298 cms::Adler32(ptr.get(), file.gcount(), a, b);
00299 else
00300 break;
00301 }
00302 file.close();
00303
00304 const uint32 adler = (b << 16) | a;
00305 if (adler != fileRecord_->adler32)
00306 {
00307 std::ostringstream msg;
00308 msg << "Disk resident file " << fileName <<
00309 " has Adler32 checksum " << std::hex << adler <<
00310 ", while the expected checkum is " << std::hex << fileRecord_->adler32;
00311 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00312 }
00313 }
00314
00315
00316 void FileHandler::renameFile
00317 (
00318 const std::string& openFileName,
00319 const std::string& closedFileName
00320 ) const
00321 {
00322 int result = rename( openFileName.c_str(), closedFileName.c_str() );
00323 if (result != 0) {
00324 fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00325 std::ostringstream msg;
00326 msg << "Unable to move " << openFileName << " to "
00327 << closedFileName << ": " << strerror(errno);
00328 XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00329 }
00330 }
00331
00332
00333 void FileHandler::checkDirectories() const
00334 {
00335 utils::checkDirectory(diskWritingParams_.filePath_);
00336 utils::checkDirectory(fileRecord_->baseFilePath);
00337 utils::checkDirectory(fileRecord_->baseFilePath + "/open");
00338 utils::checkDirectory(fileRecord_->baseFilePath + "/closed");
00339 }
00340
00341
00342 double FileHandler::calcPctDiff(const uint64_t& value1, const uint64_t& value2) const
00343 {
00344 if (value1 == value2) return 0;
00345 uint64_t largerValue = std::max(value1,value2);
00346 uint64_t smallerValue = std::min(value1,value2);
00347 return ( largerValue > 0 ? (largerValue - smallerValue) / largerValue : 0 );
00348 }
00349
00350 }
00351