#include <ReadRepacker.h>
Public Member Functions | |
IOSize | bufferUsed () const |
IOSize | extraBytes () const |
std::vector< IOPosBuffer > & | iov () |
int | pack (long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) |
IOSize | realBytesProcessed () const |
void | unpack (char *buf) |
Static Public Attributes | |
static const IOSize | BIG_READ_SIZE = 256 * 1024 |
static const IOSize | READ_COALESCE_SIZE = 32 * 1024 |
static const IOSize | TEMPORARY_BUFFER_SIZE = 256 * 1024 |
Private Member Functions | |
int | packInternal (long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size) |
void | reset (unsigned int nbuf) |
Private Attributes | |
IOSize | m_buffer_used |
IOSize | m_extra_bytes |
std::vector< int > | m_idx_to_iopb |
std::vector< int > | m_idx_to_iopb_offset |
std::vector< IOPosBuffer > | m_iov |
int * | m_len |
std::vector< char > | m_spare_buffer |
Repack a set of read requests from the ROOT layer to be optimized for the storage layer.
The basic technique is to coalesce nearby, but not adjacent, reads into one larger read in the request to the storage system. This purposely over-reads from storage.
The read-coalescing is done because the vector reads are typically unrolled server-side in a "dumb" fashion, with OS read-ahead disabled. The coalescing decreases the number of requests sent to disk, which matters because ROOT I/O is typically latency bound.
The complexity lies in the fact that we must have buffer space to hold the extra bytes from the storage system, even though they are going to be discarded.
The approach is to reuse the ROOT buffer as temporary holding space, plus a small, fixed-size "spare buffer". In the worst case, we will use about 256 KB of extra buffer space. The read-coalesce algorithm is greedy, so we can't provide an a priori estimate of how many extra I/O transactions will be sent to the storage (compared to vector reads with no coalescing). Tests currently indicate that this approach usually causes zero to one additional I/O transaction.
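The greedy gap rule described above can be illustrated with a minimal sketch. The names `Read` and `coalesce` are hypothetical and not part of ReadRepacker; the sketch models only the gap threshold and assumes the requests are sorted and non-overlapping. The buffer-size limit and the big-read check that the class also applies are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one read request; not the real IOPosBuffer.
struct Read {
  std::int64_t offset;  // file offset in bytes
  std::int64_t size;    // number of bytes to read
};

// Greedily merge sorted, non-overlapping reads whose gap is below max_gap.
// Only the gap rule is modelled here.
std::vector<Read> coalesce(const std::vector<Read>& reads,
                           std::int64_t max_gap) {
  std::vector<Read> out;
  for (const Read& r : reads) {
    if (!out.empty()) {
      Read& last = out.back();
      std::int64_t gap = r.offset - (last.offset + last.size);
      assert(gap >= 0);  // input must be sorted and non-overlapping
      if (gap < max_gap) {
        // Extend the previous read across the gap (deliberate over-read).
        last.size = r.offset + r.size - last.offset;
        continue;
      }
    }
    out.push_back(r);
  }
  return out;
}
```

With a 32 KB threshold, two 1000-byte reads at offsets 0 and 5000 would merge into a single 6000-byte read, while a read at offset 100000 would stay separate.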
Definition at line 32 of file ReadRepacker.h.
IOSize ReadRepacker::bufferUsed | ( | ) | const [inline] |
Definition at line 49 of file ReadRepacker.h.
References m_buffer_used.
Referenced by TStorageFactoryFile::ReadBuffersSync().
{return m_buffer_used;} // Returns the total amount of space in the temp buffer used.
IOSize ReadRepacker::extraBytes | ( | ) | const [inline] |
Definition at line 50 of file ReadRepacker.h.
References m_extra_bytes.
{return m_extra_bytes;} // Returns the number of extra bytes to be issued to the I/O system
std::vector<IOPosBuffer>& ReadRepacker::iov | ( | ) | [inline] |
Definition at line 47 of file ReadRepacker.h.
References m_iov.
Referenced by TStorageFactoryFile::ReadBuffersSync().
{ return m_iov; } // Returns the IO vector, optimized for storage.
int ReadRepacker::pack | ( | long long int * | pos, |
int * | len, | ||
int | nbuf, | ||
char * | buf, | ||
IOSize | buffer_size | ||
) |
Given a list of offsets and lengths, pack them into a vector of IOPosBuffer (an "IO Vector"). This function will coalesce reads that are within READ_COALESCE_SIZE of each other into a single IOPosBuffer. This function will not create an IO vector whose summed buffer size is larger than TEMPORARY_BUFFER_SIZE. The IOPosBuffer entries in iov all point to a location inside buf.
pos | An array of file offsets, nbuf long. |
len | An array of read lengths, nbuf long. |
nbuf | Number of buffers to pack. |
buf | Location of temporary buffer for the results of the storage request. |
buffer_size | Size of the temporary buffer. |
Returns the number of entries of the original array packed into iov.
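Because the return value can be smaller than nbuf, a caller presumably has to call again for the remaining entries. The following is a sketch of that pattern only: `pack_some` is a hypothetical stand-in that accepts requests up to a byte budget, and `rounds_needed` is a hypothetical driver. Neither reflects the actual logic of TStorageFactoryFile::ReadBuffersSync(), which is not shown here.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for ReadRepacker::pack: accepts leading requests
// until their summed length would exceed budget, and returns how many fit.
int pack_some(const int* len, int nbuf, int budget) {
  int used = 0;
  int n = 0;
  while (n < nbuf && used + len[n] <= budget) {
    used += len[n];
    ++n;
  }
  return n;
}

// Drive the packer until every request is consumed. Returns the number of
// rounds, i.e. one storage request per round in this simplified picture.
int rounds_needed(const std::vector<int>& len, int budget) {
  int done = 0;
  int rounds = 0;
  const int total = static_cast<int>(len.size());
  while (done < total) {
    int packed = pack_some(len.data() + done, total - done, budget);
    assert(packed > 0);  // a single request larger than budget would stall
    done += packed;
    ++rounds;
  }
  return rounds;
}
```

The key point is that each call resumes at the first entry the previous call did not pack, by advancing the array pointer by the returned count.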
Definition at line 22 of file ReadRepacker.cc.
References m_len, m_spare_buffer, packInternal(), reset(), and TEMPORARY_BUFFER_SIZE.
Referenced by TStorageFactoryFile::ReadBuffersSync().
{
  reset(nbuf);
  m_len = len; // Record the len array so we can later unpack.
  // Determine the buffer to use for the initial packing.
  char * tmp_buf;
  IOSize tmp_size;
  if (buffer_size < TEMPORARY_BUFFER_SIZE) {
    m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
    tmp_buf = &m_spare_buffer[0];
    tmp_size = TEMPORARY_BUFFER_SIZE;
  } else {
    tmp_buf = buf;
    tmp_size = buffer_size;
  }
  int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
  if ((nbuf - pack_count > 0) && // If there is remaining work..
      (tmp_buf != &m_spare_buffer[0]) && // and the spare buffer isn't already used
      ((IOSize)len[pack_count] < TEMPORARY_BUFFER_SIZE)) { // And the spare buffer is big enough to hold at least one read.
    // Verify the spare is allocated.
    // If tmp_buf != &m_spare_buffer[0] before, it certainly won't after.
    m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
    // If there are remaining chunks and we aren't already using the spare
    // buffer, try using that too.
    // This clutters up the code badly, but could save a network round-trip.
    pack_count += packInternal(&pos[pack_count], &len[pack_count], nbuf-pack_count,
                               &m_spare_buffer[0], TEMPORARY_BUFFER_SIZE);
  }
  return pack_count;
}
int ReadRepacker::packInternal | ( | long long int * | pos, |
int * | len, | ||
int | nbuf, | ||
char * | buf, | ||
IOSize | buffer_size | ||
) | [private] |
Definition at line 61 of file ReadRepacker.cc.
References BIG_READ_SIZE, m_buffer_used, m_extra_bytes, m_idx_to_iopb, m_idx_to_iopb_offset, m_iov, IOPosBuffer::offset(), READ_COALESCE_SIZE, IOPosBuffer::set_data(), IOPosBuffer::set_offset(), IOPosBuffer::set_size(), and IOPosBuffer::size().
Referenced by pack().
{
  if (nbuf == 0) {
    return 0;
  }
  // Handle case 1 separately to make the for-loop cleaner.
  int iopb_offset = m_iov.size();
  // Because we re-use the buffer from ROOT, we are guaranteed this iopb will
  // fit.
  assert(static_cast<IOSize>(len[0]) <= buffer_size);
  IOPosBuffer iopb(pos[0], buf, len[0]);
  m_idx_to_iopb.push_back(iopb_offset);
  m_idx_to_iopb_offset.push_back(0);
  IOSize buffer_used = len[0];
  int idx;
  for (idx=1; idx < nbuf; idx++) {
    if (buffer_used + len[idx] > buffer_size) {
      // No way we can include this chunk in the read buffer
      break;
    }
    IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size());
    assert(extra_bytes_signed >= 0);
    IOSize extra_bytes = static_cast<IOSize>(extra_bytes_signed);
    if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) &&
        (extra_bytes < READ_COALESCE_SIZE) &&
        (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
      // The space between the two reads is small enough we can coalesce.
      // We enforce that the current read or the current iopb must be small.
      // This is so we can "perfectly pack" buffers consisting of only big
      // reads - in such a case, read coalescing doesn't help much.
      m_idx_to_iopb.push_back(iopb_offset);
      m_idx_to_iopb_offset.push_back(pos[idx]-iopb.offset());
      iopb.set_size(pos[idx]+len[idx] - iopb.offset());
      buffer_used += (len[idx] + extra_bytes);
      m_extra_bytes += extra_bytes;
      continue;
    }
    // There is a big jump, but still space left in the temporary buffer.
    // Record our current iopb:
    m_iov.push_back(iopb);
    // Reset iopb
    iopb.set_offset(pos[idx]);
    iopb.set_data(buf + buffer_used);
    iopb.set_size(len[idx]);
    // Record location of this chunk.
    iopb_offset ++;
    m_idx_to_iopb.push_back(iopb_offset);
    m_idx_to_iopb_offset.push_back(0);
    buffer_used += len[idx];
  }
  m_iov.push_back(iopb);
  m_buffer_used += buffer_used;
  return idx;
}
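The two index vectors maintained by packInternal() record, for each original request, which coalesced read serves it and where inside that read its bytes start. The sketch below shows that bookkeeping in isolation. `Mapping` and `map_requests` are hypothetical names, only the gap threshold is modelled, and the buffer-size and BIG_READ_SIZE checks are omitted.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical counterpart of m_idx_to_iopb and m_idx_to_iopb_offset.
struct Mapping {
  std::vector<int> to_read;    // index of the merged read serving request i
  std::vector<int> to_offset;  // byte offset of request i inside that read
};

// pos/len describe sorted, non-overlapping requests. Requests whose gap to
// the previous one is below max_gap share one merged read.
Mapping map_requests(const std::vector<std::int64_t>& pos,
                     const std::vector<int>& len, std::int64_t max_gap) {
  Mapping m;
  std::int64_t start = 0;  // file offset where the current merged read begins
  std::int64_t end = 0;    // file offset just past the current merged read
  int read_idx = -1;
  for (std::size_t i = 0; i < pos.size(); ++i) {
    if (read_idx < 0 || pos[i] - end >= max_gap) {
      ++read_idx;  // start a new merged read at this request
      start = pos[i];
    }
    m.to_read.push_back(read_idx);
    m.to_offset.push_back(static_cast<int>(pos[i] - start));
    end = pos[i] + len[i];
  }
  return m;
}
```

For requests at offsets 0, 5000 and 100000 with a 32 KB threshold, the first two would map to merged read 0 at offsets 0 and 5000, and the third to merged read 1 at offset 0.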
IOSize ReadRepacker::realBytesProcessed | ( | ) | const [inline] |
Definition at line 52 of file ReadRepacker.h.
References m_buffer_used, and m_extra_bytes.
Referenced by TStorageFactoryFile::ReadBuffersSync().
{return m_buffer_used-m_extra_bytes;} // Return the number of bytes of the input request that would be processed by the IO vector
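As a worked example of the arithmetic, the sketch below uses a hypothetical `Counters` struct that mirrors the two members. If two 1000-byte requests separated by a 4000-byte gap are coalesced, the buffer holds 6000 bytes, of which 4000 are extra, leaving 2000 real bytes.

```cpp
#include <cstdint>

// Hypothetical counters mirroring m_buffer_used and m_extra_bytes.
struct Counters {
  std::uint64_t buffer_used;  // bytes the storage reads occupy, gaps included
  std::uint64_t extra_bytes;  // gap bytes read only to merge requests

  // Bytes of the original request actually covered, as in
  // realBytesProcessed().
  std::uint64_t real_bytes() const { return buffer_used - extra_bytes; }
};
```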
void ReadRepacker::reset | ( | unsigned int | nbuf | ) | [private] |
Definition at line 149 of file ReadRepacker.cc.
References m_buffer_used, m_extra_bytes, m_idx_to_iopb, m_idx_to_iopb_offset, and m_iov.
Referenced by pack().
{
  m_extra_bytes = 0;
  m_buffer_used = 0;
  // Number of buffers to storage typically decreases, but nbuf/2 is just a
  // somewhat-informed guess.
  m_iov.reserve(nbuf/2);
  m_iov.clear();
  m_idx_to_iopb.reserve(nbuf);
  m_idx_to_iopb.clear();
  m_idx_to_iopb_offset.reserve(nbuf);
  m_idx_to_iopb_offset.clear();
}
void ReadRepacker::unpack | ( | char * | buf | ) |
Unpack the optimized set of reads from the storage system and copy the results in the order ROOT requested.
Definition at line 129 of file ReadRepacker.cc.
References IOPosBuffer::data(), m_idx_to_iopb, m_idx_to_iopb_offset, m_iov, and m_len.
Referenced by TStorageFactoryFile::ReadBuffersSync().
{
  char * root_result_ptr = buf;
  int nbuf = m_idx_to_iopb.size();
  for (int idx=0; idx < nbuf; idx++) {
    int iov_idx = m_idx_to_iopb[idx];
    IOPosBuffer &iopb = m_iov[iov_idx];
    int iopb_offset = m_idx_to_iopb_offset[idx];
    char * io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
    // Note that we use the input buffer as a temporary where possible.
    // Hence, the source and destination can overlap; use memmove instead of memcpy.
    memmove(root_result_ptr, io_result_ptr, m_len[idx]);
    root_result_ptr += m_len[idx];
  }
}
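The need for memmove can be seen in a reduced sketch where the wanted pieces are compacted to the front of the same buffer that holds the over-read data. `compact_in_place` is a hypothetical helper, not part of the class. It assumes the pieces are listed in increasing offset order, so each destination never runs ahead of data that still has to be copied.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Compact the requested pieces of a merged read to the front of buf.
// offsets[i] is where piece i starts inside buf, lens[i] is its length.
// Source and destination live in the same buffer and may overlap, hence
// std::memmove rather than std::memcpy.
void compact_in_place(char* buf, const std::vector<int>& offsets,
                      const std::vector<int>& lens) {
  char* dst = buf;
  for (std::size_t i = 0; i < offsets.size(); ++i) {
    std::memmove(dst, buf + offsets[i], static_cast<std::size_t>(lens[i]));
    dst += lens[i];
  }
}
```

In a buffer holding `AAxBBBBB`, with pieces at offsets 0 and 3 of lengths 2 and 5, the second copy moves five bytes one position to the left. The source and destination ranges overlap, which std::memcpy does not permit.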
const IOSize ReadRepacker::BIG_READ_SIZE = 256 * 1024 [static] |
Definition at line 62 of file ReadRepacker.h.
Referenced by packInternal().
IOSize ReadRepacker::m_buffer_used [private] |
Definition at line 75 of file ReadRepacker.h.
Referenced by bufferUsed(), packInternal(), realBytesProcessed(), and reset().
IOSize ReadRepacker::m_extra_bytes [private] |
Definition at line 76 of file ReadRepacker.h.
Referenced by extraBytes(), packInternal(), realBytesProcessed(), and reset().
std::vector<int> ReadRepacker::m_idx_to_iopb [private] |
Definition at line 71 of file ReadRepacker.h.
Referenced by packInternal(), reset(), and unpack().
std::vector<int> ReadRepacker::m_idx_to_iopb_offset [private] |
Definition at line 72 of file ReadRepacker.h.
Referenced by packInternal(), reset(), and unpack().
std::vector<IOPosBuffer> ReadRepacker::m_iov [private] |
Definition at line 73 of file ReadRepacker.h.
Referenced by iov(), packInternal(), reset(), and unpack().
int* ReadRepacker::m_len [private] |
Definition at line 74 of file ReadRepacker.h.
std::vector<char> ReadRepacker::m_spare_buffer [private] |
Definition at line 77 of file ReadRepacker.h.
Referenced by pack().
const IOSize ReadRepacker::READ_COALESCE_SIZE = 32 * 1024 [static] |
Definition at line 59 of file ReadRepacker.h.
Referenced by packInternal().
const IOSize ReadRepacker::TEMPORARY_BUFFER_SIZE = 256 * 1024 [static] |
Definition at line 56 of file ReadRepacker.h.
Referenced by pack().