CMS 3D CMS Logo

HDF5ProductResolver.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: CondCore/HDF5ESSource
4 // Class : HDF5ProductResolver
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Christopher Jones
10 // Created: Tue, 20 Jun 2023 13:52:59 GMT
11 //
12 
13 // system include files
14 #include <iostream>
15 #include <fstream>
16 #include <cassert>
17 #include "zlib.h"
18 #include "lzma.h"
19 
20 // user include files
21 #include "HDF5ProductResolver.h"
22 #include "convertSyncValue.h"
29 
30 #include "h5_DataSet.h"
31 #include "h5_Attribute.h"
32 
33 //
34 // constants, enums and typedefs
35 //
36 
37 //
38 // static data member definitions
39 //
40 
41 //
42 // constructors and destructor
43 //
45  std::unique_ptr<cond::serialization::SerializationHelperBase> iHelper,
46  cms::h5::File const* iFile,
47  std::string const& iFileName,
48  cond::hdf5::Compression iCompression,
49  cond::hdf5::Record const* iRecord,
50  cond::hdf5::DataProduct const* iDataProduct)
51  : edm::eventsetup::ESSourceProductResolverBase(),
52  queue_(iQueue),
53  helper_(std::move(iHelper)),
54  file_(iFile),
55  fileName_(iFileName),
56  record_(iRecord),
57  dataProduct_(iDataProduct),
58  compression_(iCompression) {}
59 
60 // HDF5ProductResolver::HDF5ProductResolver(const HDF5ProductResolver& rhs)
61 // {
62 // // do actual copying here;
63 // }
64 
66 
67 //
68 // member functions
69 //
70 
73  edm::eventsetup::DataKey const& iKey,
74  edm::EventSetupImpl const*,
75  edm::ServiceToken const&,
76  edm::ESParentContext const& iParent) {
78  [this, iov = iRecord.validityInterval(), iParent, &iRecord](auto& iGroup, auto iActivity) {
79  queue_->push(iGroup, [this, &iGroup, act = std::move(iActivity), iov, iParent, &iRecord] {
80  CMS_SA_ALLOW try {
83  iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
84  struct EndGuard {
85  EndGuard(edm::eventsetup::EventSetupRecordImpl const& iRecord,
86  edm::ESModuleCallingContext const& iContext)
87  : record_{iRecord}, context_{iContext} {}
88  ~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
90  edm::ESModuleCallingContext const& context_;
91  } guardAR(iRecord, context);
92 
93  auto index = indexForInterval(iov);
94 
96  iGroup.run(std::move(act));
97  exceptPtr_ = {};
98  } catch (...) {
99  exceptPtr_ = std::current_exception();
100  }
101  });
102  },
103  []() { return true; },
104  std::move(iTask),
105  iRecord,
106  iKey,
107  iParent);
108 }
109 
111  using namespace cond::hdf5;
112  auto firstSync = convertSyncValue(iIOV.first(), record_->iovIsRunLumi_);
113 
114  auto itFound = findMatchingFirst(record_->iovFirsts_, firstSync);
115  assert(itFound != record_->iovFirsts_.end());
116 
117  return itFound - record_->iovFirsts_.begin();
118 }
119 
120 void HDF5ProductResolver::readFromHDF5api(std::ptrdiff_t iIndex) {
121  auto payloadRef = dataProduct_->payloadForIOVs_[iIndex];
122  auto ds = file_->derefDataSet(payloadRef);
123  storageSize_ = ds->storageSize();
124  if (storageSize_ == 0) {
125  return;
126  }
127 
128  fileOffset_ = ds->fileOffset();
129  memSize_ = ds->findAttribute("memsize")->readUInt32();
130  type_ = ds->findAttribute("type")->readString();
131 }
132 
134  if (exceptPtr_) {
135  rethrow_exception(exceptPtr_);
136  }
137  if (storageSize_ == 0) {
138  return;
139  }
141 }
142 
143 std::vector<char> HDF5ProductResolver::decompress_zlib(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
144  std::vector<char> buffer;
145  if (iMemSize == compressedBuffer.size()) {
146  //memory was not compressed
147  //std::cout <<"NOT COMPRESSED"<<std::endl;
148  buffer = std::move(compressedBuffer);
149  } else {
150  //zlib compression was used
151  z_stream strm;
152  strm.zalloc = Z_NULL;
153  strm.zfree = Z_NULL;
154  strm.opaque = Z_NULL;
155  strm.avail_in = 0;
156  strm.next_in = Z_NULL;
157  auto ret = inflateInit(&strm);
158  assert(ret == Z_OK);
159 
160  strm.avail_in = compressedBuffer.size();
161  strm.next_in = reinterpret_cast<unsigned char*>(compressedBuffer.data());
162 
163  buffer = std::vector<char>(iMemSize);
164  strm.avail_out = buffer.size();
165  strm.next_out = reinterpret_cast<unsigned char*>(buffer.data());
166  ret = inflate(&strm, Z_FINISH);
167  assert(ret != Z_STREAM_ERROR);
168  //if(ret != Z_STREAM_END) {std::cout <<"mem "<<memSize<<" "<<ret<<" out "<<strm.avail_out<<std::endl;}
169  assert(ret == Z_STREAM_END);
170 
171  (void)inflateEnd(&strm);
172  }
173  return buffer;
174 }
175 
176 std::vector<char> HDF5ProductResolver::decompress_lzma(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
177  std::vector<char> buffer;
178  if (iMemSize == compressedBuffer.size()) {
179  //memory was not compressed
180  //std::cout <<"NOT COMPRESSED"<<std::endl;
181  buffer = std::move(compressedBuffer);
182  } else {
183  // code 'cribbed' from ROOT
184  lzma_stream stream = LZMA_STREAM_INIT;
185 
186  auto returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
187  if (returnStatus != LZMA_OK) {
188  throw cms::Exception("H5CondFailedDecompress") << "failed to setup lzma";
189  }
190 
191  stream.next_in = reinterpret_cast<uint8_t*>(compressedBuffer.data());
192  stream.avail_in = compressedBuffer.size();
193 
194  buffer = std::vector<char>(iMemSize);
195  stream.next_out = reinterpret_cast<uint8_t*>(buffer.data());
196  stream.avail_out = buffer.size();
197 
198  returnStatus = lzma_code(&stream, LZMA_FINISH);
199  lzma_end(&stream);
200  if (returnStatus != LZMA_STREAM_END) {
201  throw cms::Exception("H5CondFailedDecompress") << "failed to decompress buffer using lzma";
202  }
203  }
204  return buffer;
205 }
206 
208  std::size_t iStorageSize,
209  std::size_t iMemSize,
210  const std::string& iTypeName) {
211  //Done interacting with the hdf5 API
212 
213  //std::cout <<" prefetch "<<dataProduct_->fileOffsets_[index]<<" "<<dataProduct_->storageSizes_[index]<<" "<<memSize<<std::endl;
214  std::vector<char> compressedBuffer(iStorageSize);
215  std::fstream file(fileName_.c_str());
216  file.seekg(iFileOffset);
217  file.read(compressedBuffer.data(), compressedBuffer.size());
218 
219  std::vector<char> buffer;
221  buffer = decompress_zlib(std::move(compressedBuffer), iMemSize);
223  buffer = decompress_lzma(std::move(compressedBuffer), iMemSize);
224  } else {
225  buffer = std::move(compressedBuffer);
226  }
227 
228  std::stringbuf sBuffer;
229  sBuffer.pubsetbuf(&buffer[0], buffer.size());
230  data_ = helper_->deserialize(sBuffer, iTypeName);
231  if (data_.get() == nullptr) {
232  throw cms::Exception("H5CondFailedDeserialization")
233  << "failed to deserialize: buffer size:" << buffer.size() << " type: '" << iTypeName << "'";
234  }
235 }
236 
238  ESSourceProductResolverBase::invalidateCache();
240 }
241 
242 //
243 // const member functions
244 //
245 void const* HDF5ProductResolver::getAfterPrefetchImpl() const { return data_.get(); }
246 
247 //
248 // static member functions
249 //
cond::hdf5::DataProduct const * dataProduct_
const IOVSyncValue & first() const
#define CMS_SA_ALLOW
std::unique_ptr< cond::serialization::SerializationHelperBase > helper_
std::ptrdiff_t indexForInterval(edm::ValidityInterval const &iIOV) const
ret
prodAgent to be discontinued
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
HDF5ProductResolver(edm::SerialTaskQueue *iQueue, std::unique_ptr< cond::serialization::SerializationHelperBase >, cms::h5::File const *iFile, std::string const &iFileName, cond::hdf5::Compression iCompression, cond::hdf5::Record const *iRecord, cond::hdf5::DataProduct const *iDataProduct)
std::vector< IOVSyncValue > iovFirsts_
Definition: Record.h:34
void threadFriendlyPrefetch(uint64_t iFileOffset, std::size_t iStorageSize, std::size_t iMemSize, const std::string &iType)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
assert(be >=bs)
const void * get() const noexcept
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
void prefetchAsyncImplTemplate(ASYNC iAsync, GUARD iGuardFactory, edm::WaitingTaskHolder iTask, edm::eventsetup::EventSetupRecordImpl const &iRecord, edm::eventsetup::DataKey const &iKey, edm::ESParentContext const &iContext)
void emit(Args &&... args) const
Definition: Signal.h:48
void prefetch(edm::eventsetup::DataKey const &iKey, edm::EventSetupRecordDetails) final
ComponentDescription const * providerDescription() const
returns the description of the ESProductResolverProvider which owns this Resolver ...
bool iovIsRunLumi_
Definition: Record.h:37
edm::SerialTaskQueue * queue_
std::vector< IOVSyncValue >::const_iterator findMatchingFirst(std::vector< IOVSyncValue > const &iIOVs, IOVSyncValue iMatch)
Definition: IOVSyncValue.cc:19
cond::serialization::unique_void_ptr data_
std::vector< hobj_ref_t > payloadForIOVs_
Definition: DataProduct.h:35
void readFromHDF5api(std::ptrdiff_t iIndex)
cond::hdf5::Compression compression_
std::exception_ptr exceptPtr_
unsigned long long uint64_t
Definition: Time.h:13
EventSetupRecordKey const & key() const
cond::hdf5::Record const * record_
PreESModule preESModuleSignal_
void const * getAfterPrefetchImpl() const final
HLT enums.
IOVSyncValue convertSyncValue(edm::IOVSyncValue const &iFrom, bool iIsRunLumi)
void prefetchAsyncImpl(edm::WaitingTaskHolder iTask, edm::eventsetup::EventSetupRecordImpl const &iES, edm::eventsetup::DataKey const &iKey, edm::EventSetupImpl const *, edm::ServiceToken const &, edm::ESParentContext const &) final
std::shared_ptr< DataSet > derefDataSet(hobj_ref_t iRef) const
Definition: h5_File.cc:77
std::vector< char > decompress_lzma(std::vector< char >, std::size_t iMemSize) const
def move(src, dest)
Definition: eostools.py:511
cms::h5::File const * file_
ActivityRegistry const * activityRegistry() const noexcept
std::vector< char > decompress_zlib(std::vector< char >, std::size_t iMemSize) const