CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ReadRepacker.cc
Go to the documentation of this file.
1 
2 #include <assert.h>
3 #include <string.h>
4 
5 #include "ReadRepacker.h"
6 
21 int
22 ReadRepacker::pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
23 {
24  reset(nbuf);
25  m_len = len; // Record the len array so we can later unpack.
26 
27  // Determine the buffer to use for the initial packing.
28  char * tmp_buf;
29  IOSize tmp_size;
30  if (buffer_size < TEMPORARY_BUFFER_SIZE) {
32  tmp_buf = &m_spare_buffer[0];
33  tmp_size = TEMPORARY_BUFFER_SIZE;
34  } else {
35  tmp_buf = buf;
36  tmp_size = buffer_size;
37  }
38 
39  int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
40 
41  if ((nbuf - pack_count > 0) && // If there is remaining work..
42  (tmp_buf != &m_spare_buffer[0]) && // and the spare buffer isn't already used
43  ((IOSize)len[pack_count] < TEMPORARY_BUFFER_SIZE)) { // And the spare buffer is big enough to hold at least one read.
44 
45  // Verify the spare is allocated.
46  // If tmp_buf != &m_spare_buffer[0] before, it certainly won't after.
48 
49  // If there are remaining chunks and we aren't already using the spare
50  // buffer, try using that too.
51  // This clutters up the code badly, but could save a network round-trip.
52  pack_count += packInternal(&pos[pack_count], &len[pack_count], nbuf-pack_count,
54 
55  }
56 
57  return pack_count;
58 }
59 
60 int
61 ReadRepacker::packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
62 {
63  if (nbuf == 0) {
64  return 0;
65  }
66 
67  // Handle case 1 separately to make the for-loop cleaner.
68  int iopb_offset = m_iov.size();
69  // Because we re-use the buffer from ROOT, we are guarantee this iopb will
70  // fit.
71  assert(static_cast<IOSize>(len[0]) <= buffer_size);
72  IOPosBuffer iopb(pos[0], buf, len[0]);
73  m_idx_to_iopb.push_back(iopb_offset);
74  m_idx_to_iopb_offset.push_back(0);
75 
76  IOSize buffer_used = len[0];
77  int idx;
78  for (idx=1; idx < nbuf; idx++) {
79  if (buffer_used + len[idx] > buffer_size) {
80  // No way we can include this chunk in the read buffer
81  break;
82  }
83 
84  IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size()); assert(extra_bytes_signed >= 0);
85  IOSize extra_bytes = static_cast<IOSize>(extra_bytes_signed);
86 
87  if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) &&
88  (extra_bytes < READ_COALESCE_SIZE) && (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
89  // The space between the two reads is small enough we can coalesce.
90 
91  // We enforce that the current read or the current iopb must be small.
92  // This is so we can "perfectly pack" buffers consisting of only big
93  // reads - in such a case, read coalescing doesn't help much.
94  m_idx_to_iopb.push_back(iopb_offset);
95  m_idx_to_iopb_offset.push_back(pos[idx]-iopb.offset());
96  iopb.set_size(pos[idx]+len[idx] - iopb.offset());
97  buffer_used += (len[idx] + extra_bytes);
98  m_extra_bytes += extra_bytes;
99  continue;
100  }
101  // There is a big jump, but still space left in the temporary buffer.
102  // Record our current iopb:
103  m_iov.push_back(iopb);
104 
105  // Reset iopb
106  iopb.set_offset(pos[idx]);
107  iopb.set_data(buf + buffer_used);
108  iopb.set_size(len[idx]);
109 
110  // Record location of this chunk.
111  iopb_offset ++;
112 
113  m_idx_to_iopb.push_back(iopb_offset);
114  m_idx_to_iopb_offset.push_back(0);
115 
116  buffer_used += len[idx];
117  }
118  m_iov.push_back(iopb);
119 
120  m_buffer_used += buffer_used;
121  return idx;
122 }
123 
128 void
130 {
131 
132  char * root_result_ptr = buf;
133  int nbuf = m_idx_to_iopb.size();
134  for (int idx=0; idx < nbuf; idx++) {
135  int iov_idx = m_idx_to_iopb[idx];
136  IOPosBuffer &iopb = m_iov[iov_idx];
137  int iopb_offset = m_idx_to_iopb_offset[idx];
138  char * io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
139  // Note that we use the input buffer as a temporary where possible.
140  // Hence, the source and destination can overlap; use memmove instead of memcpy.
141  memmove(root_result_ptr, io_result_ptr, m_len[idx]);
142 
143  root_result_ptr += m_len[idx];
144  }
145 
146 }
147 
148 void
149 ReadRepacker::reset(unsigned int nbuf)
150 {
151  m_extra_bytes = 0;
152  m_buffer_used = 0;
153 
154  // Number of buffers to storage typically decreases, but nbuf/2 is just an
155  // somewhat-informed guess.
156  m_iov.reserve(nbuf/2);
157  m_iov.clear();
158  m_idx_to_iopb.reserve(nbuf);
159  m_idx_to_iopb.clear();
160  m_idx_to_iopb_offset.reserve(nbuf);
161  m_idx_to_iopb_offset.clear();
162 }
std::vector< IOPosBuffer > m_iov
Definition: ReadRepacker.h:73
void set_data(void *new_buffer)
Definition: IOPosBuffer.h:74
void set_size(IOSize new_size)
Definition: IOPosBuffer.h:79
void reset(unsigned int nbuf)
std::vector< char > m_spare_buffer
Definition: ReadRepacker.h:77
void set_offset(IOOffset new_offset)
Definition: IOPosBuffer.h:69
int pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:22
void unpack(char *buf)
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
IOSize m_extra_bytes
Definition: ReadRepacker.h:76
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOPosBuffer.h:64
std::vector< int > m_idx_to_iopb_offset
Definition: ReadRepacker.h:72
int64_t IOOffset
Definition: IOTypes.h:19
static const IOSize TEMPORARY_BUFFER_SIZE
Definition: ReadRepacker.h:56
int packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
Definition: ReadRepacker.cc:61
size_t IOSize
Definition: IOTypes.h:14
static const IOSize BIG_READ_SIZE
Definition: ReadRepacker.h:62
static const IOSize READ_COALESCE_SIZE
Definition: ReadRepacker.h:59
IOSize m_buffer_used
Definition: ReadRepacker.h:75
std::vector< int > m_idx_to_iopb
Definition: ReadRepacker.h:71