Derive all api classes from Request / Response base classes (#1030)

This commit is contained in:
Dana Powers 2017-03-14 13:34:37 -07:00 committed by GitHub
parent a00f9ead16
commit 65ba8822b1
13 changed files with 146 additions and 88 deletions

View File

@ -257,18 +257,14 @@ class SimpleClient(object):
continue
request = encoder_fn(payloads=broker_payloads)
# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
expect_response = (decoder_fn is not None)
future = conn.send(request, expect_response=expect_response)
future = conn.send(request)
if future.failed():
refresh_metadata = True
failed_payloads(broker_payloads)
continue
if not expect_response:
if not request.expect_response():
for payload in broker_payloads:
topic_partition = (str(payload.topic), payload.partition)
responses[topic_partition] = None

View File

@ -464,12 +464,7 @@ class KafkaClient(object):
if not self._maybe_connect(node_id):
return Future().failure(Errors.NodeNotReadyError(node_id))
# Every request gets a response, except one special case:
expect_response = True
if isinstance(request, tuple(ProduceRequest)) and request.required_acks == 0:
expect_response = False
return self._conns[node_id].send(request, expect_response=expect_response)
return self._conns[node_id].send(request)
def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
"""Try to read and write to sockets.

View File

@ -525,7 +525,7 @@ class BrokerConnection(object):
ifr.future.failure(error)
self.config['state_change_callback'](self)
def send(self, request, expect_response=True):
def send(self, request):
"""send request, return Future()
Can block on network if request is larger than send_buffer_bytes
@ -537,9 +537,9 @@ class BrokerConnection(object):
return future.failure(Errors.ConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request, expect_response=expect_response)
return self._send(request)
def _send(self, request, expect_response=True):
def _send(self, request):
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
future = Future()
correlation_id = self._next_correlation_id()
@ -569,7 +569,7 @@ class BrokerConnection(object):
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
if expect_response:
if request.expect_response():
ifr = InFlightRequest(request=request,
correlation_id=correlation_id,
response_type=request.RESPONSE_TYPE,

View File

@ -1,10 +1,10 @@
from __future__ import absolute_import
from .struct import Struct
from .api import Request, Response
from .types import Array, Boolean, Bytes, Int16, Int32, Schema, String
class ApiVersionResponse_v0(Struct):
class ApiVersionResponse_v0(Response):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
@ -16,7 +16,7 @@ class ApiVersionResponse_v0(Struct):
)
class ApiVersionRequest_v0(Struct):
class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
@ -27,7 +27,7 @@ ApiVersionRequest = [ApiVersionRequest_v0]
ApiVersionResponse = [ApiVersionResponse_v0]
class CreateTopicsResponse_v0(Struct):
class CreateTopicsResponse_v0(Response):
API_KEY = 19
API_VERSION = 0
SCHEMA = Schema(
@ -37,7 +37,7 @@ class CreateTopicsResponse_v0(Struct):
)
class CreateTopicsResponse_v1(Struct):
class CreateTopicsResponse_v1(Response):
API_KEY = 19
API_VERSION = 1
SCHEMA = Schema(
@ -48,7 +48,7 @@ class CreateTopicsResponse_v1(Struct):
)
class CreateTopicsRequest_v0(Struct):
class CreateTopicsRequest_v0(Request):
API_KEY = 19
API_VERSION = 0
RESPONSE_TYPE = CreateTopicsResponse_v0
@ -67,7 +67,7 @@ class CreateTopicsRequest_v0(Struct):
)
class CreateTopicsRequest_v1(Struct):
class CreateTopicsRequest_v1(Request):
API_KEY = 19
API_VERSION = 1
RESPONSE_TYPE = CreateTopicsResponse_v1
@ -91,7 +91,7 @@ CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1]
CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1]
class DeleteTopicsResponse_v0(Struct):
class DeleteTopicsResponse_v0(Response):
API_KEY = 20
API_VERSION = 0
SCHEMA = Schema(
@ -101,7 +101,7 @@ class DeleteTopicsResponse_v0(Struct):
)
class DeleteTopicsRequest_v0(Struct):
class DeleteTopicsRequest_v0(Request):
API_KEY = 20
API_VERSION = 0
RESPONSE_TYPE = DeleteTopicsResponse_v0
@ -115,7 +115,7 @@ DeleteTopicsRequest = [DeleteTopicsRequest_v0]
DeleteTopicsResponse = [DeleteTopicsResponse_v0]
class ListGroupsResponse_v0(Struct):
class ListGroupsResponse_v0(Response):
API_KEY = 16
API_VERSION = 0
SCHEMA = Schema(
@ -126,7 +126,7 @@ class ListGroupsResponse_v0(Struct):
)
class ListGroupsRequest_v0(Struct):
class ListGroupsRequest_v0(Request):
API_KEY = 16
API_VERSION = 0
RESPONSE_TYPE = ListGroupsResponse_v0
@ -137,7 +137,7 @@ ListGroupsRequest = [ListGroupsRequest_v0]
ListGroupsResponse = [ListGroupsResponse_v0]
class DescribeGroupsResponse_v0(Struct):
class DescribeGroupsResponse_v0(Response):
API_KEY = 15
API_VERSION = 0
SCHEMA = Schema(
@ -156,7 +156,7 @@ class DescribeGroupsResponse_v0(Struct):
)
class DescribeGroupsRequest_v0(Struct):
class DescribeGroupsRequest_v0(Request):
API_KEY = 15
API_VERSION = 0
RESPONSE_TYPE = DescribeGroupsResponse_v0
@ -169,7 +169,7 @@ DescribeGroupsRequest = [DescribeGroupsRequest_v0]
DescribeGroupsResponse = [DescribeGroupsResponse_v0]
class SaslHandShakeResponse_v0(Struct):
class SaslHandShakeResponse_v0(Response):
API_KEY = 17
API_VERSION = 0
SCHEMA = Schema(
@ -178,7 +178,7 @@ class SaslHandShakeResponse_v0(Struct):
)
class SaslHandShakeRequest_v0(Struct):
class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
RESPONSE_TYPE = SaslHandShakeResponse_v0

View File

@ -1,5 +1,7 @@
from __future__ import absolute_import
import abc
from .struct import Struct
from .types import Int16, Int32, String, Schema
@ -16,3 +18,50 @@ class RequestHeader(Struct):
super(RequestHeader, self).__init__(
request.API_KEY, request.API_VERSION, correlation_id, client_id
)
class Request(Struct):
__metaclass__ = abc.ABCMeta
@abc.abstractproperty
def API_KEY(self):
"""Integer identifier for api request"""
pass
@abc.abstractproperty
def API_VERSION(self):
"""Integer of api request version"""
pass
@abc.abstractproperty
def SCHEMA(self):
"""An instance of Schema() representing the request structure"""
pass
@abc.abstractproperty
def RESPONSE_TYPE(self):
"""The Response class associated with the api request"""
pass
def expect_response(self):
"""Override this method if an api request does not always generate a response"""
return True
class Response(Struct):
__metaclass__ = abc.ABCMeta
@abc.abstractproperty
def API_KEY(self):
"""Integer identifier for api request/response"""
pass
@abc.abstractproperty
def API_VERSION(self):
"""Integer of api request/response version"""
pass
@abc.abstractproperty
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass

View File

@ -1,10 +1,10 @@
from __future__ import absolute_import
from .struct import Struct
from .api import Request, Response
from .types import Array, Int16, Int32, Int64, Schema, String
class OffsetCommitResponse_v0(Struct):
class OffsetCommitResponse_v0(Response):
API_KEY = 8
API_VERSION = 0
SCHEMA = Schema(
@ -16,19 +16,19 @@ class OffsetCommitResponse_v0(Struct):
)
class OffsetCommitResponse_v1(Struct):
class OffsetCommitResponse_v1(Response):
API_KEY = 8
API_VERSION = 1
SCHEMA = OffsetCommitResponse_v0.SCHEMA
class OffsetCommitResponse_v2(Struct):
class OffsetCommitResponse_v2(Response):
API_KEY = 8
API_VERSION = 2
SCHEMA = OffsetCommitResponse_v1.SCHEMA
class OffsetCommitRequest_v0(Struct):
class OffsetCommitRequest_v0(Request):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
RESPONSE_TYPE = OffsetCommitResponse_v0
@ -43,7 +43,7 @@ class OffsetCommitRequest_v0(Struct):
)
class OffsetCommitRequest_v1(Struct):
class OffsetCommitRequest_v1(Request):
API_KEY = 8
API_VERSION = 1 # Kafka-backed storage
RESPONSE_TYPE = OffsetCommitResponse_v1
@ -61,7 +61,7 @@ class OffsetCommitRequest_v1(Struct):
)
class OffsetCommitRequest_v2(Struct):
class OffsetCommitRequest_v2(Request):
API_KEY = 8
API_VERSION = 2 # added retention_time, dropped timestamp
RESPONSE_TYPE = OffsetCommitResponse_v2
@ -87,7 +87,7 @@ OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1,
OffsetCommitResponse_v2]
class OffsetFetchResponse_v0(Struct):
class OffsetFetchResponse_v0(Response):
API_KEY = 9
API_VERSION = 0
SCHEMA = Schema(
@ -101,13 +101,13 @@ class OffsetFetchResponse_v0(Struct):
)
class OffsetFetchResponse_v1(Struct):
class OffsetFetchResponse_v1(Response):
API_KEY = 9
API_VERSION = 1
SCHEMA = OffsetFetchResponse_v0.SCHEMA
class OffsetFetchResponse_v2(Struct):
class OffsetFetchResponse_v2(Response):
# Added in KIP-88
API_KEY = 9
API_VERSION = 2
@ -123,7 +123,7 @@ class OffsetFetchResponse_v2(Struct):
)
class OffsetFetchRequest_v0(Struct):
class OffsetFetchRequest_v0(Request):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
RESPONSE_TYPE = OffsetFetchResponse_v0
@ -135,14 +135,14 @@ class OffsetFetchRequest_v0(Struct):
)
class OffsetFetchRequest_v1(Struct):
class OffsetFetchRequest_v1(Request):
API_KEY = 9
API_VERSION = 1 # kafka-backed storage
RESPONSE_TYPE = OffsetFetchResponse_v1
SCHEMA = OffsetFetchRequest_v0.SCHEMA
class OffsetFetchRequest_v2(Struct):
class OffsetFetchRequest_v2(Request):
# KIP-88: Allows passing null topics to return offsets for all partitions
# that the consumer group has a stored offset for, even if no consumer in
# the group is currently consuming that partition.
@ -158,7 +158,7 @@ OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1,
OffsetFetchResponse_v2]
class GroupCoordinatorResponse_v0(Struct):
class GroupCoordinatorResponse_v0(Response):
API_KEY = 10
API_VERSION = 0
SCHEMA = Schema(
@ -169,7 +169,7 @@ class GroupCoordinatorResponse_v0(Struct):
)
class GroupCoordinatorRequest_v0(Struct):
class GroupCoordinatorRequest_v0(Request):
API_KEY = 10
API_VERSION = 0
RESPONSE_TYPE = GroupCoordinatorResponse_v0

View File

@ -1,11 +1,11 @@
from __future__ import absolute_import
from .api import Request, Response
from .message import MessageSet
from .struct import Struct
from .types import Array, Int16, Int32, Int64, Schema, String
class FetchResponse_v0(Struct):
class FetchResponse_v0(Response):
API_KEY = 1
API_VERSION = 0
SCHEMA = Schema(
@ -19,7 +19,7 @@ class FetchResponse_v0(Struct):
)
class FetchResponse_v1(Struct):
class FetchResponse_v1(Response):
API_KEY = 1
API_VERSION = 1
SCHEMA = Schema(
@ -34,19 +34,19 @@ class FetchResponse_v1(Struct):
)
class FetchResponse_v2(Struct):
class FetchResponse_v2(Response):
API_KEY = 1
API_VERSION = 2
SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
class FetchResponse_v3(Struct):
class FetchResponse_v3(Response):
API_KEY = 1
API_VERSION = 3
SCHEMA = FetchResponse_v2.SCHEMA
class FetchRequest_v0(Struct):
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
RESPONSE_TYPE = FetchResponse_v0
@ -63,21 +63,21 @@ class FetchRequest_v0(Struct):
)
class FetchRequest_v1(Struct):
class FetchRequest_v1(Request):
API_KEY = 1
API_VERSION = 1
RESPONSE_TYPE = FetchResponse_v1
SCHEMA = FetchRequest_v0.SCHEMA
class FetchRequest_v2(Struct):
class FetchRequest_v2(Request):
API_KEY = 1
API_VERSION = 2
RESPONSE_TYPE = FetchResponse_v2
SCHEMA = FetchRequest_v1.SCHEMA
class FetchRequest_v3(Struct):
class FetchRequest_v3(Request):
API_KEY = 1
API_VERSION = 3
RESPONSE_TYPE = FetchResponse_v3

View File

@ -1,10 +1,11 @@
from __future__ import absolute_import
from .api import Request, Response
from .struct import Struct
from .types import Array, Bytes, Int16, Int32, Schema, String
class JoinGroupResponse_v0(Struct):
class JoinGroupResponse_v0(Response):
API_KEY = 11
API_VERSION = 0
SCHEMA = Schema(
@ -19,13 +20,13 @@ class JoinGroupResponse_v0(Struct):
)
class JoinGroupResponse_v1(Struct):
class JoinGroupResponse_v1(Response):
API_KEY = 11
API_VERSION = 1
SCHEMA = JoinGroupResponse_v0.SCHEMA
class JoinGroupRequest_v0(Struct):
class JoinGroupRequest_v0(Request):
API_KEY = 11
API_VERSION = 0
RESPONSE_TYPE = JoinGroupResponse_v0
@ -41,7 +42,7 @@ class JoinGroupRequest_v0(Struct):
UNKNOWN_MEMBER_ID = ''
class JoinGroupRequest_v1(Struct):
class JoinGroupRequest_v1(Request):
API_KEY = 11
API_VERSION = 1
RESPONSE_TYPE = JoinGroupResponse_v1
@ -70,7 +71,7 @@ class ProtocolMetadata(Struct):
)
class SyncGroupResponse_v0(Struct):
class SyncGroupResponse_v0(Response):
API_KEY = 14
API_VERSION = 0
SCHEMA = Schema(
@ -79,7 +80,7 @@ class SyncGroupResponse_v0(Struct):
)
class SyncGroupRequest_v0(Struct):
class SyncGroupRequest_v0(Request):
API_KEY = 14
API_VERSION = 0
RESPONSE_TYPE = SyncGroupResponse_v0
@ -107,7 +108,7 @@ class MemberAssignment(Struct):
)
class HeartbeatResponse_v0(Struct):
class HeartbeatResponse_v0(Response):
API_KEY = 12
API_VERSION = 0
SCHEMA = Schema(
@ -115,7 +116,7 @@ class HeartbeatResponse_v0(Struct):
)
class HeartbeatRequest_v0(Struct):
class HeartbeatRequest_v0(Request):
API_KEY = 12
API_VERSION = 0
RESPONSE_TYPE = HeartbeatResponse_v0
@ -130,7 +131,7 @@ HeartbeatRequest = [HeartbeatRequest_v0]
HeartbeatResponse = [HeartbeatResponse_v0]
class LeaveGroupResponse_v0(Struct):
class LeaveGroupResponse_v0(Response):
API_KEY = 13
API_VERSION = 0
SCHEMA = Schema(
@ -138,7 +139,7 @@ class LeaveGroupResponse_v0(Struct):
)
class LeaveGroupRequest_v0(Struct):
class LeaveGroupRequest_v0(Request):
API_KEY = 13
API_VERSION = 0
RESPONSE_TYPE = LeaveGroupResponse_v0

View File

@ -1,10 +1,10 @@
from __future__ import absolute_import
from .struct import Struct
from .api import Request, Response
from .types import Array, Boolean, Int16, Int32, Schema, String
class MetadataResponse_v0(Struct):
class MetadataResponse_v0(Response):
API_KEY = 3
API_VERSION = 0
SCHEMA = Schema(
@ -24,7 +24,7 @@ class MetadataResponse_v0(Struct):
)
class MetadataResponse_v1(Struct):
class MetadataResponse_v1(Response):
API_KEY = 3
API_VERSION = 1
SCHEMA = Schema(
@ -47,7 +47,7 @@ class MetadataResponse_v1(Struct):
)
class MetadataResponse_v2(Struct):
class MetadataResponse_v2(Response):
API_KEY = 3
API_VERSION = 2
SCHEMA = Schema(
@ -71,7 +71,7 @@ class MetadataResponse_v2(Struct):
)
class MetadataRequest_v0(Struct):
class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
RESPONSE_TYPE = MetadataResponse_v0
@ -81,7 +81,7 @@ class MetadataRequest_v0(Struct):
ALL_TOPICS = None # Empty Array (len 0) for topics returns all topics
class MetadataRequest_v1(Struct):
class MetadataRequest_v1(Request):
API_KEY = 3
API_VERSION = 1
RESPONSE_TYPE = MetadataResponse_v1
@ -90,7 +90,7 @@ class MetadataRequest_v1(Struct):
NO_TOPICS = None # Empty array (len 0) for topics returns no topics
class MetadataRequest_v2(Struct):
class MetadataRequest_v2(Request):
API_KEY = 3
API_VERSION = 2
RESPONSE_TYPE = MetadataResponse_v2

View File

@ -1,6 +1,6 @@
from __future__ import absolute_import
from .struct import Struct
from .api import Request, Response
from .types import Array, Int16, Int32, Int64, Schema, String
@ -10,7 +10,7 @@ class OffsetResetStrategy(object):
NONE = 0
class OffsetResponse_v0(Struct):
class OffsetResponse_v0(Response):
API_KEY = 2
API_VERSION = 0
SCHEMA = Schema(
@ -22,7 +22,7 @@ class OffsetResponse_v0(Struct):
('offsets', Array(Int64))))))
)
class OffsetResponse_v1(Struct):
class OffsetResponse_v1(Response):
API_KEY = 2
API_VERSION = 1
SCHEMA = Schema(
@ -36,7 +36,7 @@ class OffsetResponse_v1(Struct):
)
class OffsetRequest_v0(Struct):
class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
RESPONSE_TYPE = OffsetResponse_v0
@ -53,7 +53,7 @@ class OffsetRequest_v0(Struct):
'replica_id': -1
}
class OffsetRequest_v1(Struct):
class OffsetRequest_v1(Request):
API_KEY = 2
API_VERSION = 1
RESPONSE_TYPE = OffsetResponse_v1

View File

@ -1,11 +1,11 @@
from __future__ import absolute_import
from .api import Request, Response
from .message import MessageSet
from .struct import Struct
from .types import Int16, Int32, Int64, String, Array, Schema
class ProduceResponse_v0(Struct):
class ProduceResponse_v0(Response):
API_KEY = 0
API_VERSION = 0
SCHEMA = Schema(
@ -18,7 +18,7 @@ class ProduceResponse_v0(Struct):
)
class ProduceResponse_v1(Struct):
class ProduceResponse_v1(Response):
API_KEY = 0
API_VERSION = 1
SCHEMA = Schema(
@ -32,7 +32,7 @@ class ProduceResponse_v1(Struct):
)
class ProduceResponse_v2(Struct):
class ProduceResponse_v2(Response):
API_KEY = 0
API_VERSION = 2
SCHEMA = Schema(
@ -47,7 +47,7 @@ class ProduceResponse_v2(Struct):
)
class ProduceRequest_v0(Struct):
class ProduceRequest_v0(Request):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
@ -61,20 +61,35 @@ class ProduceRequest_v0(Struct):
('messages', MessageSet)))))
)
def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True
class ProduceRequest_v1(Struct):
class ProduceRequest_v1(Request):
API_KEY = 0
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA
def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True
class ProduceRequest_v2(Struct):
class ProduceRequest_v2(Request):
API_KEY = 0
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA
def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True
ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]

View File

@ -236,13 +236,14 @@ def test_send(cli, conn):
cli._maybe_connect(0)
# ProduceRequest w/ 0 required_acks -> no response
request = ProduceRequest[0](0, 0, [])
assert request.expect_response() is False
ret = cli.send(0, request)
assert conn.send.called_with(request, expect_response=False)
assert conn.send.called_with(request)
assert isinstance(ret, Future)
request = MetadataRequest[0]([])
cli.send(0, request)
assert conn.send.called_with(request, expect_response=True)
assert conn.send.called_with(request)
def test_poll(mocker):

View File

@ -11,6 +11,7 @@ import pytest
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest
import kafka.common as Errors
@ -112,7 +113,7 @@ def test_send_max_ifr(conn):
def test_send_no_response(_socket, conn):
conn.connect()
assert conn.state is ConnectionStates.CONNECTED
req = MetadataRequest[0]([])
req = ProduceRequest[0](required_acks=0, timeout=0, topics=[])
header = RequestHeader(req, client_id=conn.config['client_id'])
payload_bytes = len(header.encode()) + len(req.encode())
third = payload_bytes // 3
@ -120,7 +121,7 @@ def test_send_no_response(_socket, conn):
_socket.send.side_effect = [4, third, third, third, remainder]
assert len(conn.in_flight_requests) == 0
f = conn.send(req, expect_response=False)
f = conn.send(req)
assert f.succeeded() is True
assert f.value is None
assert len(conn.in_flight_requests) == 0