diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index f04a1d1..045e81e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -604,20 +604,20 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(100, 200)) # Start a consumer. FetchResponse_v3 should always include at least 1 - # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time + # full msg, so by setting fetch_max_bytes=1 we should get 1 msg at a time + # But 0.11.0.0 returns 1 MessageSet at a time when the messages are + # stored in the new v2 format by the broker. + # + # DP Note: This is a strange test. The consumer shouldn't care + # how many messages are included in a FetchResponse, as long as it is + # non-zero. I would not mind if we deleted this test. It caused + # a minor headache when testing 0.11.0.0. group = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5) consumer = self.kafka_consumer( group_id=group, auto_offset_reset='earliest', + consumer_timeout_ms=5000, fetch_max_bytes=1) - fetched_msgs = [] - # A bit hacky, but we need this in order for message count to be exact - consumer._coordinator.ensure_active_group() - for i in range(10): - poll_res = consumer.poll(timeout_ms=2000) - print(poll_res) - for partition, msgs in six.iteritems(poll_res): - for msg in msgs: - fetched_msgs.append(msg) + fetched_msgs = [next(consumer) for i in range(10)] self.assertEqual(len(fetched_msgs), 10)