CMS 3D CMS Logo

/data/git/CMSSW_5_3_11_patch5/src/EventFilter/StorageManager/src/FileHandler.cc

Go to the documentation of this file.
00001 // $Id: FileHandler.cc,v 1.32 2012/04/04 14:53:16 mommsen Exp $
00003 
00004 #include <EventFilter/StorageManager/interface/Exception.h>
00005 #include <EventFilter/StorageManager/interface/FileHandler.h>
00006 #include <EventFilter/StorageManager/interface/I2OChain.h>
00007 
00008 #include <FWCore/MessageLogger/interface/MessageLogger.h>
00009 #include "FWCore/Utilities/interface/Adler32Calculator.h"
00010 #include <FWCore/Version/interface/GetReleaseVersion.h>
00011 
00012 #include "boost/shared_array.hpp"
00013 
00014 #include <errno.h>
00015 #include <iostream>
00016 #include <sstream>
00017 #include <fstream>
00018 #include <iomanip>
00019 #include <cstdio>
00020 #include <sys/stat.h>
00021 #include <string.h>
00022 
00023 namespace stor {
00024   
00025   FileHandler::FileHandler
00026   (
00027     FilesMonitorCollection::FileRecordPtr fileRecord,
00028     const DbFileHandlerPtr dbFileHandler,
00029     const uint64_t& maxFileSize
00030   ):
00031   fileRecord_(fileRecord),
00032   dbFileHandler_(dbFileHandler),
00033   firstEntry_(utils::getCurrentTime()),
00034   lastEntry_(firstEntry_),
00035   diskWritingParams_(dbFileHandler->getDiskWritingParams()),
00036   maxFileSize_(maxFileSize),
00037   cmsver_(edm::getReleaseVersion())
00038   {
00039     // stripp quotes if present
00040     if(cmsver_[0]=='"') cmsver_=cmsver_.substr(1,cmsver_.size()-2);
00041     
00042     checkDirectories();
00043     
00044     insertFileInDatabase();
00045   }
00046   
00047   void FileHandler::writeEvent(const I2OChain& event)
00048   {
00049     if ( ! fileRecord_->isOpen )
00050     {
00051       std::ostringstream msg;
00052       msg << "Tried to write an event to "
00053         << fileRecord_->completeFileName()
00054         << "which has already been closed.";
00055       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00056     }
00057     
00058     do_writeEvent(event);
00059     
00060     fileRecord_->fileSize += event.totalDataSize();
00061     ++fileRecord_->eventCount;
00062     lastEntry_ = utils::getCurrentTime();
00063   }
00064   
00065   
00067   // File bookkeeping //
00069   
00070   void FileHandler::updateDatabase() const
00071   {
00072     std::ostringstream oss;
00073     oss << "./closeFile.pl "
00074       << " --FILENAME "     << fileRecord_->fileName()
00075       << " --FILECOUNTER "  << fileRecord_->fileCounter
00076       << " --NEVENTS "      << events()
00077       << " --FILESIZE "     << fileSize()                          
00078       << " --STARTTIME "    << utils::secondsSinceEpoch(firstEntry_)
00079       << " --STOPTIME "     << utils::secondsSinceEpoch(lastEntry_)
00080       << " --STATUS "       << "closed"
00081       << " --RUNNUMBER "    << fileRecord_->runNumber
00082       << " --LUMISECTION "  << fileRecord_->lumiSection
00083       << " --PATHNAME "     << fileRecord_->filePath()
00084       << " --HOSTNAME "     << diskWritingParams_.hostName_
00085       << " --SETUPLABEL "   << diskWritingParams_.setupLabel_
00086       << " --STREAM "       << fileRecord_->streamLabel                      
00087       << " --INSTANCE "     << diskWritingParams_.smInstanceString_
00088       << " --SAFETY "       << diskWritingParams_.initialSafetyLevel_
00089       << " --APPVERSION "   << cmsver_
00090       << " --APPNAME CMSSW"
00091       << " --TYPE streamer"               
00092       << " --DEBUGCLOSE "   << fileRecord_->whyClosed
00093       << " --CHECKSUM "     << std::hex << fileRecord_->adler32
00094       << " --CHECKSUMIND 0";
00095     
00096     dbFileHandler_->writeOld( lastEntry_, oss.str() );
00097   }
00098   
00099   
00100   void FileHandler::insertFileInDatabase() const
00101   {
00102     std::ostringstream oss;
00103     oss << "./insertFile.pl "
00104       << " --FILENAME "     << fileRecord_->fileName()
00105       << " --FILECOUNTER "  << fileRecord_->fileCounter
00106       << " --NEVENTS "      << events()
00107       << " --FILESIZE "     << fileSize()
00108       << " --STARTTIME "    << utils::secondsSinceEpoch(firstEntry_)
00109       << " --STOPTIME 0"
00110       << " --STATUS open"
00111       << " --RUNNUMBER "    << fileRecord_->runNumber
00112       << " --LUMISECTION "  << fileRecord_->lumiSection
00113       << " --PATHNAME "     << fileRecord_->filePath()
00114       << " --HOSTNAME "     << diskWritingParams_.hostName_
00115       << " --SETUPLABEL "   << diskWritingParams_.setupLabel_
00116       << " --STREAM "       << fileRecord_->streamLabel
00117       << " --INSTANCE "     << diskWritingParams_.smInstanceString_
00118       << " --SAFETY "       << diskWritingParams_.initialSafetyLevel_
00119       << " --APPVERSION "   << cmsver_
00120       << " --APPNAME CMSSW"
00121       << " --TYPE streamer"               
00122       << " --CHECKSUM 0"
00123       << " --CHECKSUMIND 0";
00124     
00125     dbFileHandler_->writeOld( firstEntry_, oss.str() );
00126   }
00127   
00128   
00129   bool FileHandler::tooOld(const utils::TimePoint_t currentTime)
00130   {
00131     if (diskWritingParams_.lumiSectionTimeOut_ > boost::posix_time::seconds(0) && 
00132       (currentTime - lastEntry_) > diskWritingParams_.lumiSectionTimeOut_)
00133     {
00134       closeFile(FilesMonitorCollection::FileRecord::timeout);
00135       return true;
00136     }
00137     else
00138     {
00139       return false;
00140     }
00141   }
00142   
00143   
00144   bool FileHandler::isFromLumiSection(const uint32_t lumiSection)
00145   {
00146     if (lumiSection == fileRecord_->lumiSection)
00147     {
00148       closeFile(FilesMonitorCollection::FileRecord::LSended);
00149       return true;
00150     }
00151     else
00152     {
00153       return false;
00154     }
00155   }
00156   
00157   
00158   bool FileHandler::tooLarge(const uint64_t& dataSize)
00159   {
00160     if ( ((fileSize() + dataSize) > maxFileSize_) && (events() > 0) )
00161     {
00162       closeFile(FilesMonitorCollection::FileRecord::size);
00163       return true;
00164     }
00165     else
00166     {
00167       return false;
00168     }
00169   }
00170   
00171   
00172   uint32_t FileHandler::events() const
00173   {
00174     return fileRecord_->eventCount;
00175   }
00176   
00177   
00178   uint64_t FileHandler::fileSize() const
00179   {
00180     return fileRecord_->fileSize;
00181   }
00182   
00183   
00185   // File system interaction //
00187   
00188   
00189   void FileHandler::moveFileToClosed
00190   (
00191     const FilesMonitorCollection::FileRecord::ClosingReason& reason
00192   )
00193   {
00194     const std::string openFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::open));
00195     const std::string closedFileName(fileRecord_->completeFileName(FilesMonitorCollection::FileRecord::closed));
00196     
00197     const uint64_t openFileSize = checkFileSizeMatch(openFileName, fileSize());
00198     
00199     makeFileReadOnly(openFileName);
00200     try
00201     {
00202       renameFile(openFileName, closedFileName);
00203     }
00204     catch (stor::exception::DiskWriting& e)
00205     {
00206       XCEPT_RETHROW(stor::exception::DiskWriting, 
00207         "Could not move streamer file to closed area.", e);
00208     }
00209     fileRecord_->isOpen = false;
00210     fileRecord_->whyClosed = reason;
00211     checkFileSizeMatch(closedFileName, openFileSize);
00212     checkAdler32(closedFileName);
00213   }
00214   
00215   
00216   uint64_t FileHandler::checkFileSizeMatch(const std::string& fileName, const uint64_t& size) const
00217   {
00218     #if linux
00219     struct stat64 statBuff;
00220     int statStatus = stat64(fileName.c_str(), &statBuff);
00221     #else
00222     struct stat statBuff;
00223     int statStatus = stat(fileName.c_str(), &statBuff);
00224     #endif
00225     if ( statStatus != 0 )
00226     {
00227       fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::inaccessible;
00228       std::ostringstream msg;
00229       msg << "Error checking the status of file "
00230         << fileName
00231         << ": " << strerror(errno);
00232       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00233     }
00234     
00235     if ( sizeMismatch(size, statBuff.st_size) )
00236     {
00237       fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::truncated;
00238       std::ostringstream msg;
00239       msg << "Found an unexpected file size when trying to move"
00240         << " the file to the closed state. File " << fileName
00241         << " has an actual size of " << statBuff.st_size
00242         << " (" << statBuff.st_blocks << " blocks)"
00243         << " instead of the expected size of " << size
00244         << " (" << (size/512)+1 << " blocks).";
00245       XCEPT_RAISE(stor::exception::FileTruncation, msg.str());
00246     }
00247     
00248     return static_cast<uint64_t>(statBuff.st_size);
00249   }
00250   
00251   
00252   bool FileHandler::sizeMismatch(const uint64_t& initialSize, const uint64_t& finalSize) const
00253   {
00254     double pctDiff = calcPctDiff(initialSize, finalSize);
00255     return (pctDiff > diskWritingParams_.fileSizeTolerance_);
00256   }
00257   
00258   
00259   void FileHandler::makeFileReadOnly(const std::string& fileName) const
00260   {
00261     int ronly  = chmod(fileName.c_str(), S_IREAD|S_IRGRP|S_IROTH);
00262     if (ronly != 0) {
00263       std::ostringstream msg;
00264       msg << "Unable to change permissions of " << fileName
00265         << " to read only: " << strerror(errno);
00266       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00267     }
00268   }
00269   
00270   
00271   void FileHandler::checkAdler32(const std::string& fileName) const
00272   {
00273     if (!diskWritingParams_.checkAdler32_) return;
00274 
00275     std::ifstream file(fileName.c_str(), std::ios::in | std::ios::binary | std::ios::ate);
00276     if (!file.is_open())
00277     {
00278       std::ostringstream msg;
00279       msg << "File " << fileName << " could not be opened.\n";
00280       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00281     }
00282     
00283     std::ifstream::pos_type size = file.tellg();
00284     file.seekg (0, std::ios::beg);
00285     
00286     boost::shared_array<char> ptr(new char[1024*1024]);
00287     uint32_t a = 1, b = 0;
00288     
00289     std::ifstream::pos_type rsize = 0;
00290     while(rsize < size)
00291     {
00292       file.read(ptr.get(), 1024*1024);
00293       rsize += 1024*1024;
00294       if(file.gcount()) 
00295         cms::Adler32(ptr.get(), file.gcount(), a, b);
00296       else 
00297         break;
00298     }
00299     file.close();
00300     
00301     const uint32 adler = (b << 16) | a;
00302     if (adler != fileRecord_->adler32)
00303     {
00304       std::ostringstream msg;
00305       msg << "Disk resident file " << fileName <<
00306         " has Adler32 checksum " << std::hex << adler <<
00307         ", while the expected checkum is " << std::hex << fileRecord_->adler32;
00308       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00309     }
00310   }
00311   
00312   
00313   void FileHandler::renameFile
00314   (
00315     const std::string& openFileName,
00316     const std::string& closedFileName
00317   ) const
00318   {
00319     int result = rename( openFileName.c_str(), closedFileName.c_str() );
00320     if (result != 0) {
00321       fileRecord_->whyClosed = FilesMonitorCollection::FileRecord::notClosed;
00322       std::ostringstream msg;
00323       msg << "Unable to move " << openFileName << " to "
00324         << closedFileName << ": " << strerror(errno);
00325       XCEPT_RAISE(stor::exception::DiskWriting, msg.str());
00326     }
00327   }
00328   
00329   
00330   void FileHandler::checkDirectories() const
00331   {
00332     utils::checkDirectory(diskWritingParams_.filePath_);
00333     utils::checkDirectory(fileRecord_->baseFilePath);
00334     utils::checkDirectory(fileRecord_->baseFilePath + "/open");
00335     utils::checkDirectory(fileRecord_->baseFilePath + "/closed");
00336   }
00337   
00338   
00339   double FileHandler::calcPctDiff(const uint64_t& value1, const uint64_t& value2) const
00340   {
00341     if (value1 == value2) return 0;
00342     uint64_t largerValue = std::max(value1,value2);
00343     uint64_t smallerValue = std::min(value1,value2);
00344     return ( largerValue > 0 ? (largerValue - smallerValue) / largerValue : 0 );
00345   }
00346   
00347 } // namespace stor
00348