CMS 3D CMS Logo

HDF5ProductResolver.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: CondCore/HDF5ESSource
4 // Class : HDF5ProductResolver
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Christopher Jones
10 // Created: Tue, 20 Jun 2023 13:52:59 GMT
11 //
12 
13 // system include files
14 #include <iostream>
15 #include <fstream>
16 #include <cassert>
17 #include "zlib.h"
18 #include "lzma.h"
19 
20 // user include files
21 #include "HDF5ProductResolver.h"
22 #include "convertSyncValue.h"
29 
30 #include "h5_DataSet.h"
31 #include "h5_Attribute.h"
32 
33 //
34 // constants, enums and typedefs
35 //
36 
37 //
38 // static data member definitions
39 //
40 
41 //
42 // constructors and destructor
43 //
45  std::unique_ptr<cond::serialization::SerializationHelperBase> iHelper,
46  cms::h5::File const* iFile,
47  std::string const& iFileName,
48  cond::hdf5::Compression iCompression,
49  cond::hdf5::Record const* iRecord,
50  cond::hdf5::DataProduct const* iDataProduct)
51  : edm::eventsetup::ESSourceProductResolverBase(),
52  queue_(iQueue),
53  helper_(std::move(iHelper)),
54  file_(iFile),
55  fileName_(iFileName),
56  record_(iRecord),
57  dataProduct_(iDataProduct),
58  compression_(iCompression) {}
59 
60 // HDF5ProductResolver::HDF5ProductResolver(const HDF5ProductResolver& rhs)
61 // {
62 // // do actual copying here;
63 // }
64 
66 
67 //
68 // member functions
69 //
70 
73  edm::eventsetup::DataKey const& iKey,
74  edm::EventSetupImpl const*,
75  edm::ServiceToken const&,
76  edm::ESParentContext const& iParent) {
78  [this, iov = iRecord.validityInterval(), iParent, &iRecord](auto& iGroup, auto iActivity) {
79  queue_->push(iGroup, [this, &iGroup, act = std::move(iActivity), iov, iParent, &iRecord] {
80  CMS_SA_ALLOW try {
82  reinterpret_cast<std::uintptr_t>(this),
84  iParent);
85  iRecord.activityRegistry()->preESModuleSignal_.emit(iRecord.key(), context);
86  struct EndGuard {
87  EndGuard(edm::eventsetup::EventSetupRecordImpl const& iRecord,
88  edm::ESModuleCallingContext const& iContext)
89  : record_{iRecord}, context_{iContext} {}
90  ~EndGuard() { record_.activityRegistry()->postESModuleSignal_.emit(record_.key(), context_); }
92  edm::ESModuleCallingContext const& context_;
93  } guardAR(iRecord, context);
94 
95  auto index = indexForInterval(iov);
96 
98  iGroup.run(std::move(act));
99  exceptPtr_ = {};
100  } catch (...) {
101  exceptPtr_ = std::current_exception();
102  }
103  });
104  },
105  []() { return true; },
106  std::move(iTask),
107  iRecord,
108  iKey,
109  iParent);
110 }
111 
113  using namespace cond::hdf5;
114  auto firstSync = convertSyncValue(iIOV.first(), record_->iovIsRunLumi_);
115 
116  auto itFound = findMatchingFirst(record_->iovFirsts_, firstSync);
117  assert(itFound != record_->iovFirsts_.end());
118 
119  return itFound - record_->iovFirsts_.begin();
120 }
121 
122 void HDF5ProductResolver::readFromHDF5api(std::ptrdiff_t iIndex) {
123  auto payloadRef = dataProduct_->payloadForIOVs_[iIndex];
124  auto ds = file_->derefDataSet(payloadRef);
125  storageSize_ = ds->storageSize();
126  if (storageSize_ == 0) {
127  return;
128  }
129 
130  fileOffset_ = ds->fileOffset();
131  memSize_ = ds->findAttribute("memsize")->readUInt32();
132  type_ = ds->findAttribute("type")->readString();
133 }
134 
136  if (exceptPtr_) {
137  rethrow_exception(exceptPtr_);
138  }
139  if (storageSize_ == 0) {
140  return;
141  }
143 }
144 
145 std::vector<char> HDF5ProductResolver::decompress_zlib(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
146  std::vector<char> buffer;
147  if (iMemSize == compressedBuffer.size()) {
148  //memory was not compressed
149  //std::cout <<"NOT COMPRESSED"<<std::endl;
150  buffer = std::move(compressedBuffer);
151  } else {
152  //zlib compression was used
153  z_stream strm;
154  strm.zalloc = Z_NULL;
155  strm.zfree = Z_NULL;
156  strm.opaque = Z_NULL;
157  strm.avail_in = 0;
158  strm.next_in = Z_NULL;
159  auto ret = inflateInit(&strm);
160  assert(ret == Z_OK);
161 
162  strm.avail_in = compressedBuffer.size();
163  strm.next_in = reinterpret_cast<unsigned char*>(compressedBuffer.data());
164 
165  buffer = std::vector<char>(iMemSize);
166  strm.avail_out = buffer.size();
167  strm.next_out = reinterpret_cast<unsigned char*>(buffer.data());
168  ret = inflate(&strm, Z_FINISH);
169  assert(ret != Z_STREAM_ERROR);
170  //if(ret != Z_STREAM_END) {std::cout <<"mem "<<memSize<<" "<<ret<<" out "<<strm.avail_out<<std::endl;}
171  assert(ret == Z_STREAM_END);
172 
173  (void)inflateEnd(&strm);
174  }
175  return buffer;
176 }
177 
178 std::vector<char> HDF5ProductResolver::decompress_lzma(std::vector<char> compressedBuffer, std::size_t iMemSize) const {
179  std::vector<char> buffer;
180  if (iMemSize == compressedBuffer.size()) {
181  //memory was not compressed
182  //std::cout <<"NOT COMPRESSED"<<std::endl;
183  buffer = std::move(compressedBuffer);
184  } else {
185  // code 'cribbed' from ROOT
186  lzma_stream stream = LZMA_STREAM_INIT;
187 
188  auto returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
189  if (returnStatus != LZMA_OK) {
190  throw cms::Exception("H5CondFailedDecompress") << "failed to setup lzma";
191  }
192 
193  stream.next_in = reinterpret_cast<uint8_t*>(compressedBuffer.data());
194  stream.avail_in = compressedBuffer.size();
195 
196  buffer = std::vector<char>(iMemSize);
197  stream.next_out = reinterpret_cast<uint8_t*>(buffer.data());
198  stream.avail_out = buffer.size();
199 
200  returnStatus = lzma_code(&stream, LZMA_FINISH);
201  lzma_end(&stream);
202  if (returnStatus != LZMA_STREAM_END) {
203  throw cms::Exception("H5CondFailedDecompress") << "failed to decompress buffer using lzma";
204  }
205  }
206  return buffer;
207 }
208 
210  std::size_t iStorageSize,
211  std::size_t iMemSize,
212  const std::string& iTypeName) {
213  //Done interacting with the hdf5 API
214 
215  //std::cout <<" prefetch "<<dataProduct_->fileOffsets_[index]<<" "<<dataProduct_->storageSizes_[index]<<" "<<memSize<<std::endl;
216  std::vector<char> compressedBuffer(iStorageSize);
217  std::fstream file(fileName_.c_str());
218  file.seekg(iFileOffset);
219  file.read(compressedBuffer.data(), compressedBuffer.size());
220 
221  std::vector<char> buffer;
223  buffer = decompress_zlib(std::move(compressedBuffer), iMemSize);
225  buffer = decompress_lzma(std::move(compressedBuffer), iMemSize);
226  } else {
227  buffer = std::move(compressedBuffer);
228  }
229 
230  std::stringbuf sBuffer;
231  sBuffer.pubsetbuf(&buffer[0], buffer.size());
232  data_ = helper_->deserialize(sBuffer, iTypeName);
233  if (data_.get() == nullptr) {
234  throw cms::Exception("H5CondFailedDeserialization")
235  << "failed to deserialize: buffer size:" << buffer.size() << " type: '" << iTypeName << "'";
236  }
237 }
238 
240  ESSourceProductResolverBase::invalidateCache();
242 }
243 
244 //
245 // const member functions
246 //
247 void const* HDF5ProductResolver::getAfterPrefetchImpl() const { return data_.get(); }
248 
249 //
250 // static member functions
251 //
cond::hdf5::DataProduct const * dataProduct_
const IOVSyncValue & first() const
#define CMS_SA_ALLOW
std::unique_ptr< cond::serialization::SerializationHelperBase > helper_
std::ptrdiff_t indexForInterval(edm::ValidityInterval const &iIOV) const
ret
prodAgent to be discontinued
void push(oneapi::tbb::task_group &, const T &iAction)
asynchronously pushes functor iAction into queue
HDF5ProductResolver(edm::SerialTaskQueue *iQueue, std::unique_ptr< cond::serialization::SerializationHelperBase >, cms::h5::File const *iFile, std::string const &iFileName, cond::hdf5::Compression iCompression, cond::hdf5::Record const *iRecord, cond::hdf5::DataProduct const *iDataProduct)
std::vector< IOVSyncValue > iovFirsts_
Definition: Record.h:34
void threadFriendlyPrefetch(uint64_t iFileOffset, std::size_t iStorageSize, std::size_t iMemSize, const std::string &iType)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
assert(be >=bs)
const void * get() const noexcept
TEMPL(T2) struct Divides void
Definition: Factorize.h:24
void prefetchAsyncImplTemplate(ASYNC iAsync, GUARD iGuardFactory, edm::WaitingTaskHolder iTask, edm::eventsetup::EventSetupRecordImpl const &iRecord, edm::eventsetup::DataKey const &iKey, edm::ESParentContext const &iContext)
void emit(Args &&... args) const
Definition: Signal.h:48
void prefetch(edm::eventsetup::DataKey const &iKey, edm::EventSetupRecordDetails) final
ComponentDescription const * providerDescription() const
returns the description of the ESProductResolverProvider which owns this Resolver ...
bool iovIsRunLumi_
Definition: Record.h:37
edm::SerialTaskQueue * queue_
std::vector< IOVSyncValue >::const_iterator findMatchingFirst(std::vector< IOVSyncValue > const &iIOVs, IOVSyncValue iMatch)
Definition: IOVSyncValue.cc:19
cond::serialization::unique_void_ptr data_
std::vector< hobj_ref_t > payloadForIOVs_
Definition: DataProduct.h:35
void readFromHDF5api(std::ptrdiff_t iIndex)
cond::hdf5::Compression compression_
std::exception_ptr exceptPtr_
unsigned long long uint64_t
Definition: Time.h:13
EventSetupRecordKey const & key() const
cond::hdf5::Record const * record_
PreESModule preESModuleSignal_
void const * getAfterPrefetchImpl() const final
HLT enums.
IOVSyncValue convertSyncValue(edm::IOVSyncValue const &iFrom, bool iIsRunLumi)
void prefetchAsyncImpl(edm::WaitingTaskHolder iTask, edm::eventsetup::EventSetupRecordImpl const &iES, edm::eventsetup::DataKey const &iKey, edm::EventSetupImpl const *, edm::ServiceToken const &, edm::ESParentContext const &) final
std::shared_ptr< DataSet > derefDataSet(hobj_ref_t iRef) const
Definition: h5_File.cc:77
std::vector< char > decompress_lzma(std::vector< char >, std::size_t iMemSize) const
def move(src, dest)
Definition: eostools.py:511
cms::h5::File const * file_
ActivityRegistry const * activityRegistry() const noexcept
std::vector< char > decompress_zlib(std::vector< char >, std::size_t iMemSize) const