From fea10d9c169214af82303744069bdd6c66c4a2ef Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Mar 2017 11:01:58 -0700 Subject: [PATCH] LZ4 support in kafka 0.8/0.9 does not accept a ContentSize header --- kafka/codec.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index 29db48e..a527b42 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -229,13 +229,21 @@ def lz4_encode_old_kafka(payload): assert xxhash is not None data = lz4_encode(payload) header_size = 7 - if isinstance(data[4], int): - flg = data[4] - else: - flg = ord(data[4]) + flg = data[4] + if not isinstance(flg, int): + flg = ord(flg) + content_size_bit = ((flg >> 3) & 1) if content_size_bit: - header_size += 8 + # Old kafka does not accept the content-size field + # so we need to discard it and reset the header flag + flg -= 8 + data = bytearray(data) + data[4] = flg + data = bytes(data) + payload = data[header_size+8:] + else: + payload = data[header_size:] # This is the incorrect hc hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member @@ -243,7 +251,7 @@ def lz4_encode_old_kafka(payload): return b''.join([ data[0:header_size-1], hc, - data[header_size:] + payload ])