CMS 3D CMS Logo

ReadRepacker.cc
Go to the documentation of this file.
1 
2 #include <cassert>
3 #include <cstring>
4 
5 #include "ReadRepacker.h"
6 
21 int ReadRepacker::pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
22  reset(nbuf);
23  m_len = len; // Record the len array so we can later unpack.
24 
25  // Determine the buffer to use for the initial packing.
26  char *tmp_buf;
27  IOSize tmp_size;
28  if (buffer_size < TEMPORARY_BUFFER_SIZE) {
30  tmp_buf = &m_spare_buffer[0];
31  tmp_size = TEMPORARY_BUFFER_SIZE;
32  } else {
33  tmp_buf = buf;
34  tmp_size = buffer_size;
35  }
36 
37  int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
38 
39  if ((nbuf - pack_count > 0) && // If there is remaining work..
40  (tmp_buf != &m_spare_buffer[0]) && // and the spare buffer isn't already used
41  ((IOSize)len[pack_count] <
42  TEMPORARY_BUFFER_SIZE)) { // And the spare buffer is big enough to hold at least one read.
43 
44  // Verify the spare is allocated.
45  // If tmp_buf != &m_spare_buffer[0] before, it certainly won't after.
47 
48  // If there are remaining chunks and we aren't already using the spare
49  // buffer, try using that too.
50  // This clutters up the code badly, but could save a network round-trip.
51  pack_count +=
52  packInternal(&pos[pack_count], &len[pack_count], nbuf - pack_count, &m_spare_buffer[0], TEMPORARY_BUFFER_SIZE);
53  }
54 
55  return pack_count;
56 }
57 
58 int ReadRepacker::packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) {
59  if (nbuf == 0) {
60  return 0;
61  }
62 
63  // Handle case 1 separately to make the for-loop cleaner.
64  int iopb_offset = m_iov.size();
65  // Because we re-use the buffer from ROOT, we are guarantee this iopb will
66  // fit.
67  assert(static_cast<IOSize>(len[0]) <= buffer_size);
68  IOPosBuffer iopb(pos[0], buf, len[0]);
69  m_idx_to_iopb.push_back(iopb_offset);
70  m_idx_to_iopb_offset.push_back(0);
71 
72  IOSize buffer_used = len[0];
73  int idx;
74  for (idx = 1; idx < nbuf; idx++) {
75  if (buffer_used + len[idx] > buffer_size) {
76  // No way we can include this chunk in the read buffer
77  break;
78  }
79 
80  IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size());
81  assert(extra_bytes_signed >= 0);
82  IOSize extra_bytes = static_cast<IOSize>(extra_bytes_signed);
83 
84  if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) &&
85  (extra_bytes < READ_COALESCE_SIZE) && (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
86  // The space between the two reads is small enough we can coalesce.
87 
88  // We enforce that the current read or the current iopb must be small.
89  // This is so we can "perfectly pack" buffers consisting of only big
90  // reads - in such a case, read coalescing doesn't help much.
91  m_idx_to_iopb.push_back(iopb_offset);
92  m_idx_to_iopb_offset.push_back(pos[idx] - iopb.offset());
93  iopb.set_size(pos[idx] + len[idx] - iopb.offset());
94  buffer_used += (len[idx] + extra_bytes);
95  m_extra_bytes += extra_bytes;
96  continue;
97  }
98  // There is a big jump, but still space left in the temporary buffer.
99  // Record our current iopb:
100  m_iov.push_back(iopb);
101 
102  // Reset iopb
103  iopb.set_offset(pos[idx]);
104  iopb.set_data(buf + buffer_used);
105  iopb.set_size(len[idx]);
106 
107  // Record location of this chunk.
108  iopb_offset++;
109 
110  m_idx_to_iopb.push_back(iopb_offset);
111  m_idx_to_iopb_offset.push_back(0);
112 
113  buffer_used += len[idx];
114  }
115  m_iov.push_back(iopb);
116 
117  m_buffer_used += buffer_used;
118  return idx;
119 }
120 
125 void ReadRepacker::unpack(char *buf) {
126  char *root_result_ptr = buf;
127  int nbuf = m_idx_to_iopb.size();
128  for (int idx = 0; idx < nbuf; idx++) {
129  int iov_idx = m_idx_to_iopb[idx];
130  IOPosBuffer &iopb = m_iov[iov_idx];
131  int iopb_offset = m_idx_to_iopb_offset[idx];
132  char *io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
133  // Note that we use the input buffer as a temporary where possible.
134  // Hence, the source and destination can overlap; use memmove instead of memcpy.
135  memmove(root_result_ptr, io_result_ptr, m_len[idx]);
136 
137  root_result_ptr += m_len[idx];
138  }
139 }
140 
141 void ReadRepacker::reset(unsigned int nbuf) {
142  m_extra_bytes = 0;
143  m_buffer_used = 0;
144 
145  // Number of buffers to storage typically decreases, but nbuf/2 is just an
146  // somewhat-informed guess.
147  m_iov.reserve(nbuf / 2);
148  m_iov.clear();
149  m_idx_to_iopb.reserve(nbuf);
150  m_idx_to_iopb.clear();
151  m_idx_to_iopb_offset.reserve(nbuf);
152  m_idx_to_iopb_offset.clear();
153 }
std::vector< IOPosBuffer > m_iov
Definition: ReadRepacker.h:79
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void reset(unsigned int nbuf)
std::vector< char > m_spare_buffer
Definition: ReadRepacker.h:83
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:21
void unpack(char *buf)
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
IOSize m_extra_bytes
Definition: ReadRepacker.h:82
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOPosBuffer.h:64
std::vector< int > m_idx_to_iopb_offset
Definition: ReadRepacker.h:78
int64_t IOOffset
Definition: IOTypes.h:19
static const IOSize TEMPORARY_BUFFER_SIZE
Definition: ReadRepacker.h:58
int packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:58
size_t IOSize
Definition: IOTypes.h:14
static const IOSize BIG_READ_SIZE
Definition: ReadRepacker.h:64
edm::propagate_const< int * > m_len
Definition: ReadRepacker.h:80
static const IOSize READ_COALESCE_SIZE
Definition: ReadRepacker.h:61
IOSize m_buffer_used
Definition: ReadRepacker.h:81
std::vector< int > m_idx_to_iopb
Definition: ReadRepacker.h:76