deb-python-kafka/test/test_sender.py

54 lines
1.4 KiB
Python

# pylint: skip-file
from __future__ import absolute_import
import io
import pytest
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics import Metrics
from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
from kafka.producer.sender import Sender
from kafka.structs import TopicPartition, OffsetAndMetadata
@pytest.fixture
def client(mocker):
_cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[], api_version=(0, 9)))
_cli.cluster = mocker.Mock(spec=ClusterMetadata())
return _cli
@pytest.fixture
def accumulator():
return RecordAccumulator()
@pytest.fixture
def metrics():
return Metrics()
@pytest.fixture
def sender(client, accumulator, metrics):
return Sender(client, client.cluster, accumulator, metrics)
@pytest.mark.parametrize(("api_version", "produce_version"), [
((0, 10), 2),
((0, 9), 1),
((0, 8), 0)
])
def test_produce_request(sender, mocker, api_version, produce_version):
sender.config['api_version'] = api_version
tp = TopicPartition('foo', 0)
records = MessageSetBuffer(io.BytesIO(), 100000)
batch = RecordBatch(tp, records)
produce_request = sender._produce_request(0, 0, 0, [batch])
assert isinstance(produce_request, ProduceRequest[produce_version])