Merge pull request #196 from methane/fallback-bytearray-buffer

fallback: Rewrite buffer from array of bytes to bytes
This commit is contained in:
INADA Naoki 2016-05-22 11:06:02 +09:00
commit ae8e98e669
1 changed file with 69 additions and 100 deletions

View File

@@ -86,11 +86,8 @@ def unpack(stream, **kwargs):
Raises `ExtraData` when `packed` contains extra bytes.
See :class:`Unpacker` for options.
"""
unpacker = Unpacker(stream, **kwargs)
ret = unpacker._fb_unpack()
if unpacker._fb_got_extradata():
raise ExtraData(ret, unpacker._fb_get_extradata())
return ret
data = stream.read()
return unpackb(data, **kwargs)
def unpackb(packed, **kwargs):
@@ -121,7 +118,7 @@ class Unpacker(object):
If specified, unpacker reads serialized data from it and :meth:`feed()` is not usable.
:param int read_size:
Used as `file_like.read(read_size)`. (default: `min(1024**2, max_buffer_size)`)
Used as `file_like.read(read_size)`. (default: `min(16*1024, max_buffer_size)`)
:param bool use_list:
If true, unpack msgpack array to Python list.
@@ -199,13 +196,9 @@ class Unpacker(object):
self._fb_feeding = False
#: array of bytes feeded.
self._fb_buffers = []
#: Which buffer we currently reads
self._fb_buf_i = 0
self._buffer = b""
#: Which position we currently reads
self._fb_buf_o = 0
#: Total size of _fb_bufferes
self._fb_buf_n = 0
self._buff_i = 0
# When Unpacker is used as an iterable, between the calls to next(),
# the buffer is not "consumed" completely, for efficiency sake.
@@ -213,13 +206,13 @@ class Unpacker(object):
# the correct moments, we have to keep track of how sloppy we were.
# Furthermore, when the buffer is incomplete (that is: in the case
# we raise an OutOfData) we need to rollback the buffer to the correct
# state, which _fb_slopiness records.
self._fb_sloppiness = 0
# state, which _buf_checkpoint records.
self._buf_checkpoint = 0
self._max_buffer_size = max_buffer_size or 2**31-1
if read_size > self._max_buffer_size:
raise ValueError("read_size must be smaller than max_buffer_size")
self._read_size = read_size or min(self._max_buffer_size, 4096)
self._read_size = read_size or min(self._max_buffer_size, 16*1024)
self._encoding = encoding
self._unicode_errors = unicode_errors
self._use_list = use_list
@@ -248,103 +241,75 @@ class Unpacker(object):
def feed(self, next_bytes):
if isinstance(next_bytes, array.array):
next_bytes = next_bytes.tostring()
elif isinstance(next_bytes, bytearray):
next_bytes = bytes(next_bytes)
if not isinstance(next_bytes, (bytes, bytearray)):
raise TypeError("next_bytes should be bytes, bytearray or array.array")
assert self._fb_feeding
if (self._fb_buf_n + len(next_bytes) - self._fb_sloppiness
> self._max_buffer_size):
raise BufferFull
self._fb_buf_n += len(next_bytes)
self._fb_buffers.append(next_bytes)
def _fb_sloppy_consume(self):
""" Gets rid of some of the used parts of the buffer. """
if self._fb_buf_i:
for i in xrange(self._fb_buf_i):
self._fb_buf_n -= len(self._fb_buffers[i])
self._fb_buffers = self._fb_buffers[self._fb_buf_i:]
self._fb_buf_i = 0
if self._fb_buffers:
self._fb_sloppiness = self._fb_buf_o
else:
self._fb_sloppiness = 0
if (len(self._buffer) - self._buff_i + len(next_bytes) > self._max_buffer_size):
raise BufferFull
# bytes + bytearray -> bytearray
# So cast before append
self._buffer += bytes(next_bytes)
def _fb_consume(self):
""" Gets rid of the used parts of the buffer. """
if self._fb_buf_i:
for i in xrange(self._fb_buf_i):
self._fb_buf_n -= len(self._fb_buffers[i])
self._fb_buffers = self._fb_buffers[self._fb_buf_i:]
self._fb_buf_i = 0
if self._fb_buffers:
self._fb_buffers[0] = self._fb_buffers[0][self._fb_buf_o:]
self._fb_buf_n -= self._fb_buf_o
else:
self._fb_buf_n = 0
self._fb_buf_o = 0
self._fb_sloppiness = 0
self._buf_checkpoint = self._buff_i
def _fb_got_extradata(self):
if self._fb_buf_i != len(self._fb_buffers):
return True
if self._fb_feeding:
return False
if not self.file_like:
return False
if self.file_like.read(1):
return True
return False
return self._buff_i < len(self._buffer)
def __iter__(self):
return self
def _fb_get_extradata(self):
return self._buffer[self._buff_i:]
def read_bytes(self, n):
return self._fb_read(n)
def _fb_rollback(self):
self._fb_buf_i = 0
self._fb_buf_o = self._fb_sloppiness
def _fb_get_extradata(self):
bufs = self._fb_buffers[self._fb_buf_i:]
if bufs:
bufs[0] = bufs[0][self._fb_buf_o:]
return b''.join(bufs)
def _fb_read(self, n, write_bytes=None):
buffs = self._fb_buffers
# We have a redundant codepath for the most common case, such that
# pypy optimizes it properly. This is the case that the read fits
# in the current buffer.
if (write_bytes is None and self._fb_buf_i < len(buffs) and
self._fb_buf_o + n < len(buffs[self._fb_buf_i])):
self._fb_buf_o += n
return buffs[self._fb_buf_i][self._fb_buf_o - n:self._fb_buf_o]
# (int, Optional[Callable]) -> bytearray
remain_bytes = len(self._buffer) - self._buff_i - n
# The remaining cases.
ret = b''
while len(ret) != n:
sliced = n - len(ret)
if self._fb_buf_i == len(buffs):
if self._fb_feeding:
break
to_read = sliced
if self._read_size > to_read:
to_read = self._read_size
tmp = self.file_like.read(to_read)
if not tmp:
break
buffs.append(tmp)
self._fb_buf_n += len(tmp)
continue
ret += buffs[self._fb_buf_i][self._fb_buf_o:self._fb_buf_o + sliced]
self._fb_buf_o += sliced
if self._fb_buf_o >= len(buffs[self._fb_buf_i]):
self._fb_buf_o = 0
self._fb_buf_i += 1
if len(ret) != n:
self._fb_rollback()
# Fast path: buffer has n bytes already
if remain_bytes >= 0:
ret = self._buffer[self._buff_i:self._buff_i+n]
self._buff_i += n
if write_bytes is not None:
write_bytes(ret)
return ret
if self._fb_feeding:
self._buff_i = self._buf_checkpoint
raise OutOfData
# Strip buffer before checkpoint before reading file.
if self._buf_checkpoint > 0:
self._buffer = self._buffer[self._buf_checkpoint:]
self._buff_i -= self._buf_checkpoint
self._buf_checkpoint = 0
# Read from file
remain_bytes = -remain_bytes
while remain_bytes > 0:
to_read_bytes = max(self._read_size, remain_bytes)
read_data = self.file_like.read(to_read_bytes)
if not read_data:
break
assert isinstance(read_data, bytes)
self._buffer += read_data
remain_bytes -= len(read_data)
if len(self._buffer) < n + self._buff_i:
self._buff_i = 0 # rollback
raise OutOfData
if len(self._buffer) == n:
# checkpoint == 0
ret = self._buffer
self._buffer = b""
self._buff_i = 0
else:
ret = self._buffer[self._buff_i:self._buff_i+n]
self._buff_i += n
if write_bytes is not None:
write_bytes(ret)
return ret
@@ -562,15 +527,19 @@ class Unpacker(object):
assert typ == TYPE_IMMEDIATE
return obj
def next(self):
def __iter__(self):
return self
def __next__(self):
try:
ret = self._fb_unpack(EX_CONSTRUCT, None)
self._fb_sloppy_consume()
self._fb_consume()
return ret
except OutOfData:
self._fb_consume()
raise StopIteration
__next__ = next
next = __next__
def skip(self, write_bytes=None):
self._fb_unpack(EX_SKIP, write_bytes)