Rearrange connection tests to separate legacy KafkaConnection

This commit is contained in:
Dana Powers 2016-06-04 16:49:38 -07:00
parent 2afe09e7c1
commit 81860eeea1
3 changed files with 74 additions and 78 deletions

View File

@ -1,12 +1,10 @@
import socket
from time import sleep
from mock import ANY, MagicMock, patch
import six
from . import unittest
from kafka import SimpleClient
from kafka.conn import KafkaConnection
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
@ -15,7 +13,6 @@ from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
from test.testutil import Timer
NO_ERROR = 0
UNKNOWN_TOPIC_OR_PARTITION = 3
@ -91,7 +88,7 @@ class TestSimpleClient(unittest.TestCase):
('kafka02', 9092): MagicMock(),
('kafka03', 9092): MagicMock()
}
# inject KafkaConnection side effects
# inject BrokerConnection side effects
mock_conn(mocked_conns[('kafka01', 9092)], success=False)
mock_conn(mocked_conns[('kafka03', 9092)], success=False)
future = Future()
@ -389,19 +386,6 @@ class TestSimpleClient(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
def test_timeout(self):
def _timeout(*args, **kwargs):
timeout = args[1]
sleep(timeout)
raise socket.timeout
with patch.object(socket, "create_connection", side_effect=_timeout):
with Timer() as t:
with self.assertRaises(ConnectionError):
KafkaConnection("nowhere", 1234, 1.0)
self.assertGreaterEqual(t.interval, 1.0)
def test_correlation_rollover(self):
with patch.object(SimpleClient, 'load_metadata_for_topics'):
big_num = 2**31 - 3
@ -409,4 +393,3 @@ class TestSimpleClient(unittest.TestCase):
self.assertEqual(big_num + 1, client._next_id())
self.assertEqual(big_num + 2, client._next_id())
self.assertEqual(0, client._next_id())

View File

@ -7,7 +7,7 @@ import time
import pytest
from kafka.conn import BrokerConnection, ConnectionStates
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
from kafka.protocol.metadata import MetadataRequest
@ -29,14 +29,14 @@ def conn(_socket):
@pytest.mark.parametrize("states", [
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),),
(([EALREADY, EALREADY], ConnectionStates.CONNECTING),),
(([0], ConnectionStates.CONNECTED),),
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
([ECONNRESET], ConnectionStates.DISCONNECTED)),
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
([EALREADY], ConnectionStates.CONNECTING),
([EISCONN], ConnectionStates.CONNECTED)),
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),),
(([EALREADY, EALREADY], ConnectionStates.CONNECTING),),
(([0], ConnectionStates.CONNECTED),),
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
([ECONNRESET], ConnectionStates.DISCONNECTED)),
(([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
([EALREADY], ConnectionStates.CONNECTING),
([EISCONN], ConnectionStates.CONNECTED)),
])
def test_connect(_socket, conn, states):
assert conn.state is ConnectionStates.DISCONNECTED
@ -216,3 +216,51 @@ def test_recv(_socket, conn):
def test_close(conn):
pass # TODO
def test_collect_hosts__happy_path():
hosts = "127.0.0.1:1234,127.0.0.1"
results = collect_hosts(hosts)
assert set(results) == set([
('127.0.0.1', 1234, socket.AF_INET),
('127.0.0.1', 9092, socket.AF_INET),
])
def test_collect_hosts__ipv6():
hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_INET6),
('2001:1000:2000::1', 9092, socket.AF_INET6),
('2001:1000:2000::1', 1234, socket.AF_INET6),
])
def test_collect_hosts__string_list():
hosts = [
'localhost:1234',
'localhost',
'[localhost]',
'2001::1',
'[2001::1]',
'[2001::1]:1234',
]
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 1234, socket.AF_INET6),
])
def test_collect_hosts__with_spaces():
hosts = "localhost:1234, localhost"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
])

View File

@ -1,12 +1,14 @@
import socket
import struct
from threading import Thread
import time
import mock
from . import unittest
from kafka.errors import ConnectionError
from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from test.testutil import Timer
class ConnTest(unittest.TestCase):
@ -47,56 +49,6 @@ class ConnTest(unittest.TestCase):
# Reset any mock counts caused by __init__
self.MockCreateConn.reset_mock()
def test_collect_hosts__happy_path(self):
hosts = "127.0.0.1:1234,127.0.0.1"
results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('127.0.0.1', 1234, socket.AF_INET),
('127.0.0.1', 9092, socket.AF_INET),
]))
def test_collect_hosts__ipv6(self):
hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234, socket.AF_INET6),
('2001:1000:2000::1', 9092, socket.AF_INET6),
('2001:1000:2000::1', 1234, socket.AF_INET6),
]))
def test_collect_hosts__string_list(self):
hosts = [
'localhost:1234',
'localhost',
'[localhost]',
'2001::1',
'[2001::1]',
'[2001::1]:1234',
]
results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 1234, socket.AF_INET6),
]))
def test_collect_hosts__with_spaces(self):
hosts = "localhost:1234, localhost"
results = collect_hosts(hosts)
self.assertEqual(set(results), set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
]))
def test_send(self):
self.conn.send(self.config['request_id'], self.config['payload'])
self.conn._sock.sendall.assert_called_with(self.config['payload'])
@ -243,3 +195,16 @@ class TestKafkaConnection(unittest.TestCase):
self.assertEqual(err, [None])
self.assertEqual(socket.call_count, 2)
def test_timeout(self):
def _timeout(*args, **kwargs):
timeout = args[1]
time.sleep(timeout)
raise socket.timeout
with mock.patch.object(socket, "create_connection", side_effect=_timeout):
with Timer() as t:
with self.assertRaises(ConnectionError):
KafkaConnection("nowhere", 1234, 1.0)
self.assertGreaterEqual(t.interval, 1.0)