70 if (!m_open_for_write) {
71 if (!m_error_buf.size()) {m_error_buf =
"Logic error: writing to a buffer not opened for write";}
74 size_t bytes_accepted = 0;
76 if (offset < m_offset) {
77 if (!m_error_buf.size()) {m_error_buf =
"Logic error: writing to a prior offset";}
83 if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
84 retval = WriteImpl(offset, buf, size);
85 bytes_accepted = retval;
93 if (m_avail_count == m_buffers.size()) {
101 bool buffer_was_written;
102 size_t avail_count = 0;
106 buffer_was_written =
false;
107 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
108 entry_iter != m_buffers.end();
112 int retval2 = (*entry_iter)->Write(*
this, size == 0);
114 if (!m_error_buf.size()) {m_error_buf =
"Unknown filesystem write failure.";}
117 buffer_was_written |= retval2 > 0;
118 if ((*entry_iter)->Available()) {
119 if (!avail_entry) {avail_entry = *entry_iter;}
122 else if (bytes_accepted != size && size) {
123 size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted);
126 if (new_accept && new_accept != size - bytes_accepted) {
127 int retval3 = (*entry_iter)->Write(*
this,
false);
129 if (!m_error_buf.size()) {m_error_buf =
"Unknown filesystem write failure.";}
132 buffer_was_written =
true;
134 bytes_accepted += new_accept;
137 }
while ((avail_count != m_buffers.size()) && buffer_was_written);
138 m_avail_count = avail_count;
140 if (bytes_accepted != size && size) {
143 m_error_buf =
"No empty buffers available to place unordered data.";
146 if (avail_entry->Accept(offset + bytes_accepted, buf + bytes_accepted, size - bytes_accepted) != size - bytes_accepted) {
147 m_error_buf =
"Empty re-ordering buffer was unable to to accept data; internal logic error.";
154 if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
155 for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
156 entry_iter != m_buffers.end();
158 (*entry_iter)->ShrinkIfUnused();
187 m_log.
Emsg(
"Stream::DumpBuffers",
"Beginning dump of stream buffers.");
189 std::stringstream ss;
190 ss <<
"Stream offset: " << m_offset;
191 m_log.
Emsg(
"Stream::DumpBuffers", ss.str().c_str());
194 for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
195 entry_iter!= m_buffers.end();
197 std::stringstream ss;
198 ss <<
"Buffer " << idx <<
": Offset=" << (*entry_iter)->GetOffset() <<
", Size="
199 << (*entry_iter)->GetSize() <<
", Capacity=" << (*entry_iter)->GetCapacity();
200 m_log.
Emsg(
"Stream::DumpBuffers", ss.str().c_str());
203 m_log.
Emsg(
"Stream::DumpBuffers",
"Finish dump of stream buffers.");