CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_8_patch3/src/IOPool/TFileAdaptor/src/ReadRepacker.cc

Go to the documentation of this file.
00001 
00002 #include <assert.h>
00003 #include <string.h>
00004 
00005 #include "ReadRepacker.h"
00006 
00021 int
00022 ReadRepacker::pack(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
00023 {
00024   reset(nbuf);
00025   m_len = len; // Record the len array so we can later unpack.
00026 
00027   // Determine the buffer to use for the initial packing.
00028   char * tmp_buf;
00029   IOSize tmp_size;
00030   if (buffer_size < TEMPORARY_BUFFER_SIZE) {
00031         m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
00032         tmp_buf = &m_spare_buffer[0];
00033         tmp_size = TEMPORARY_BUFFER_SIZE;
00034   } else {
00035         tmp_buf = buf;
00036         tmp_size = buffer_size;
00037   } 
00038   
00039   int pack_count = packInternal(pos, len, nbuf, tmp_buf, tmp_size);
00040 
00041   if ((nbuf - pack_count > 0) &&  // If there is remaining work..
00042       (tmp_buf != &m_spare_buffer[0]) &&    // and the spare buffer isn't already used
00043       ((IOSize)len[pack_count] < TEMPORARY_BUFFER_SIZE)) { // And the spare buffer is big enough to hold at least one read.
00044 
00045     // Verify the spare is allocated.
00046     // If tmp_buf != &m_spare_buffer[0] before, it certainly won't after.
00047     m_spare_buffer.resize(TEMPORARY_BUFFER_SIZE);
00048 
00049     // If there are remaining chunks and we aren't already using the spare
00050     // buffer, try using that too.
00051     // This clutters up the code badly, but could save a network round-trip.
00052     pack_count += packInternal(&pos[pack_count], &len[pack_count], nbuf-pack_count,
00053                                &m_spare_buffer[0], TEMPORARY_BUFFER_SIZE);
00054 
00055   }
00056 
00057   return pack_count;
00058 }
00059 
00060 int
00061 ReadRepacker::packInternal(long long int *pos, int *len, int nbuf, char *buf, IOSize buffer_size)
00062 {
00063   if (nbuf == 0) {
00064     return 0;
00065   }
00066 
00067   // Handle case 1 separately to make the for-loop cleaner.
00068   int iopb_offset = m_iov.size();
00069   // Because we re-use the buffer from ROOT, we are guarantee this iopb will
00070   // fit.
00071   assert(static_cast<IOSize>(len[0]) <= buffer_size);
00072   IOPosBuffer iopb(pos[0], buf, len[0]);
00073   m_idx_to_iopb.push_back(iopb_offset);
00074   m_idx_to_iopb_offset.push_back(0);
00075 
00076   IOSize buffer_used = len[0];
00077   int idx;
00078   for (idx=1; idx < nbuf; idx++) {
00079     if (buffer_used + len[idx] > buffer_size) {
00080       // No way we can include this chunk in the read buffer
00081       break;
00082     }
00083 
00084     IOOffset extra_bytes_signed = (idx == 0) ? 0 : ((pos[idx] - iopb.offset()) - iopb.size()); assert(extra_bytes_signed >= 0);
00085     IOSize   extra_bytes = static_cast<IOSize>(extra_bytes_signed);
00086 
00087     if (((static_cast<IOSize>(len[idx]) < BIG_READ_SIZE) || (iopb.size() < BIG_READ_SIZE)) && 
00088         (extra_bytes < READ_COALESCE_SIZE) && (buffer_used + len[idx] + extra_bytes <= buffer_size)) {
00089       // The space between the two reads is small enough we can coalesce.
00090 
00091       // We enforce that the current read or the current iopb must be small.
00092       // This is so we can "perfectly pack" buffers consisting of only big
00093       // reads - in such a case, read coalescing doesn't help much.
00094       m_idx_to_iopb.push_back(iopb_offset);
00095       m_idx_to_iopb_offset.push_back(pos[idx]-iopb.offset());
00096       iopb.set_size(pos[idx]+len[idx] - iopb.offset());
00097       buffer_used += (len[idx] + extra_bytes);
00098       m_extra_bytes += extra_bytes;
00099       continue;
00100     }
00101     // There is a big jump, but still space left in the temporary buffer.
00102     // Record our current iopb:
00103     m_iov.push_back(iopb);
00104 
00105     // Reset iopb
00106     iopb.set_offset(pos[idx]);
00107     iopb.set_data(buf + buffer_used);
00108     iopb.set_size(len[idx]);
00109 
00110     // Record location of this chunk.
00111     iopb_offset ++;
00112 
00113     m_idx_to_iopb.push_back(iopb_offset);
00114     m_idx_to_iopb_offset.push_back(0);
00115 
00116     buffer_used += len[idx];
00117   }
00118   m_iov.push_back(iopb);
00119 
00120   m_buffer_used += buffer_used;
00121   return idx;
00122 }
00123 
00128 void
00129 ReadRepacker::unpack(char *buf)
00130 {
00131 
00132   char * root_result_ptr = buf;
00133   int nbuf = m_idx_to_iopb.size();
00134   for (int idx=0; idx < nbuf; idx++) {
00135     int iov_idx = m_idx_to_iopb[idx];
00136     IOPosBuffer &iopb = m_iov[iov_idx];
00137     int iopb_offset = m_idx_to_iopb_offset[idx];
00138     char * io_result_ptr = static_cast<char *>(iopb.data()) + iopb_offset;
00139     // Note that we use the input buffer as a temporary where possible.
00140     // Hence, the source and destination can overlap; use memmove instead of memcpy.
00141     memmove(root_result_ptr, io_result_ptr, m_len[idx]);
00142 
00143     root_result_ptr += m_len[idx];
00144   }
00145 
00146 }
00147 
00148 void
00149 ReadRepacker::reset(unsigned int nbuf)
00150 {
00151   m_extra_bytes = 0;
00152   m_buffer_used = 0;
00153 
00154   // Number of buffers to storage typically decreases, but nbuf/2 is just an
00155   // somewhat-informed guess.
00156   m_iov.reserve(nbuf/2);
00157   m_iov.clear();
00158   m_idx_to_iopb.reserve(nbuf);
00159   m_idx_to_iopb.clear();
00160   m_idx_to_iopb_offset.reserve(nbuf);
00161   m_idx_to_iopb_offset.clear();
00162 }