diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index c5142b3..09746bf 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -16,6 +16,19 @@ class ApiVersionResponse_v0(Response): ) +class ApiVersionResponse_v1(Response): + API_KEY = 18 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', Array( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16))), + ('throttle_time_ms', Int32) + ) + + class ApiVersionRequest_v0(Request): API_KEY = 18 API_VERSION = 0 @@ -23,8 +36,15 @@ class ApiVersionRequest_v0(Request): SCHEMA = Schema() -ApiVersionRequest = [ApiVersionRequest_v0] -ApiVersionResponse = [ApiVersionResponse_v0] +class ApiVersionRequest_v1(Request): + API_KEY = 18 + API_VERSION = 1 + RESPONSE_TYPE = ApiVersionResponse_v1 + SCHEMA = ApiVersionRequest_v0.SCHEMA + + +ApiVersionRequest = [ApiVersionRequest_v0, ApiVersionRequest_v1] +ApiVersionResponse = [ApiVersionResponse_v0, ApiVersionResponse_v1] class CreateTopicsResponse_v0(Response): @@ -48,6 +68,18 @@ class CreateTopicsResponse_v1(Response): ) +class CreateTopicsResponse_v2(Response): + API_KEY = 19 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topic_error_codes', Array( + ('topic', String('utf-8')), + ('error_code', Int16), + ('error_message', String('utf-8')))) + ) + + class CreateTopicsRequest_v0(Request): API_KEY = 19 API_VERSION = 0 @@ -87,8 +119,19 @@ class CreateTopicsRequest_v1(Request): ) -CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1] -CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1] +class CreateTopicsRequest_v2(Request): + API_KEY = 19 + API_VERSION = 2 + RESPONSE_TYPE = CreateTopicsResponse_v2 + SCHEMA = CreateTopicsRequest_v1.SCHEMA + + +CreateTopicsRequest = [ + CreateTopicsRequest_v0, CreateTopicsRequest_v1, CreateTopicsRequest_v2 +] +CreateTopicsResponse = [ + CreateTopicsResponse_v0, CreateTopicsResponse_v1, CreateTopicsResponse_v2 +] class DeleteTopicsResponse_v0(Response): @@ -101,6 +144,17 @@ class DeleteTopicsResponse_v0(Response): ) +class DeleteTopicsResponse_v1(Response): + API_KEY = 20 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topic_error_codes', Array( + ('topic', String('utf-8')), + ('error_code', Int16))) + ) + + class DeleteTopicsRequest_v0(Request): API_KEY = 20 API_VERSION = 0 @@ -111,8 +165,15 @@ class DeleteTopicsRequest_v0(Request): ) -DeleteTopicsRequest = [DeleteTopicsRequest_v0] -DeleteTopicsResponse = [DeleteTopicsResponse_v0] +class DeleteTopicsRequest_v1(Request): + API_KEY = 20 + API_VERSION = 1 + RESPONSE_TYPE = DeleteTopicsResponse_v1 + SCHEMA = DeleteTopicsRequest_v0.SCHEMA + + +DeleteTopicsRequest = [DeleteTopicsRequest_v0, DeleteTopicsRequest_v1] +DeleteTopicsResponse = [DeleteTopicsResponse_v0, DeleteTopicsResponse_v1] class ListGroupsResponse_v0(Response): @@ -126,6 +187,18 @@ class ListGroupsResponse_v0(Response): ) +class ListGroupsResponse_v1(Response): + API_KEY = 16 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('groups', Array( + ('group', String('utf-8')), + ('protocol_type', String('utf-8')))) + ) + + class ListGroupsRequest_v0(Request): API_KEY = 16 API_VERSION = 0 @@ -133,8 +206,15 @@ class ListGroupsRequest_v0(Request): SCHEMA = Schema() -ListGroupsRequest = [ListGroupsRequest_v0] -ListGroupsResponse = [ListGroupsResponse_v0] +class ListGroupsRequest_v1(Request): + API_KEY = 16 + API_VERSION = 1 + RESPONSE_TYPE = ListGroupsResponse_v1 + SCHEMA = ListGroupsRequest_v0.SCHEMA + + +ListGroupsRequest = [ListGroupsRequest_v0, ListGroupsRequest_v1] +ListGroupsResponse = [ListGroupsResponse_v0, ListGroupsResponse_v1] class DescribeGroupsResponse_v0(Response): @@ -156,6 +236,27 @@ class DescribeGroupsResponse_v0(Response): ) +class DescribeGroupsResponse_v1(Response): + API_KEY = 15 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('groups', Array( + ('error_code', Int16), + ('group', String('utf-8')), + ('state', String('utf-8')), + ('protocol_type', String('utf-8')), + ('protocol', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('client_id', String('utf-8')), + ('client_host', String('utf-8')), + ('member_metadata', Bytes), + ('member_assignment', Bytes))))) + ) + + + class DescribeGroupsRequest_v0(Request): API_KEY = 15 API_VERSION = 0 @@ -165,8 +266,15 @@ class DescribeGroupsRequest_v0(Request): ) -DescribeGroupsRequest = [DescribeGroupsRequest_v0] -DescribeGroupsResponse = [DescribeGroupsResponse_v0] +class DescribeGroupsRequest_v1(Request): + API_KEY = 15 + API_VERSION = 1 + RESPONSE_TYPE = DescribeGroupsResponse_v1 + SCHEMA = DescribeGroupsRequest_v0.SCHEMA + + +DescribeGroupsRequest = [DescribeGroupsRequest_v0, DescribeGroupsRequest_v1] +DescribeGroupsResponse = [DescribeGroupsResponse_v0, DescribeGroupsResponse_v1] class SaslHandShakeResponse_v0(Response): diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index bcffe67..9d744c7 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class OffsetCommitResponse_v0(Response): @@ -28,6 +28,19 @@ class OffsetCommitResponse_v2(Response): SCHEMA = OffsetCommitResponse_v1.SCHEMA +class OffsetCommitResponse_v3(Response): + API_KEY = 8 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16))))) + ) + + class OffsetCommitRequest_v0(Request): API_KEY = 8 API_VERSION = 0 # Zookeeper-backed storage @@ -81,10 +94,21 @@ class OffsetCommitRequest_v2(Request): DEFAULT_RETENTION_TIME = -1 -OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1, - OffsetCommitRequest_v2] -OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1, - OffsetCommitResponse_v2] +class OffsetCommitRequest_v3(Request): + API_KEY = 8 + API_VERSION = 3 + RESPONSE_TYPE = OffsetCommitResponse_v3 + SCHEMA = OffsetCommitRequest_v2.SCHEMA + + +OffsetCommitRequest = [ + OffsetCommitRequest_v0, OffsetCommitRequest_v1, + OffsetCommitRequest_v2, OffsetCommitRequest_v3 +] +OffsetCommitResponse = [ + OffsetCommitResponse_v0, OffsetCommitResponse_v1, + OffsetCommitResponse_v2, OffsetCommitResponse_v3 +] class OffsetFetchResponse_v0(Response): @@ -123,6 +147,22 @@ class OffsetFetchResponse_v2(Response): ) +class OffsetFetchResponse_v3(Response): + API_KEY = 9 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('metadata', String('utf-8')), + ('error_code', Int16))))), + ('error_code', Int16) + ) + + class OffsetFetchRequest_v0(Request): API_KEY = 9 API_VERSION = 0 # zookeeper-backed storage @@ -152,10 +192,21 @@ class OffsetFetchRequest_v2(Request): SCHEMA = OffsetFetchRequest_v1.SCHEMA -OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1, - OffsetFetchRequest_v2] -OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1, - OffsetFetchResponse_v2] +class OffsetFetchRequest_v3(Request): + API_KEY = 9 + API_VERSION = 3 + RESPONSE_TYPE = OffsetFetchResponse_v3 + SCHEMA = OffsetFetchRequest_v2.SCHEMA + + +OffsetFetchRequest = [ + OffsetFetchRequest_v0, OffsetFetchRequest_v1, + OffsetFetchRequest_v2, OffsetFetchRequest_v3, +] +OffsetFetchResponse = [ + OffsetFetchResponse_v0, OffsetFetchResponse_v1, + OffsetFetchResponse_v2, OffsetFetchResponse_v3, +] class GroupCoordinatorResponse_v0(Response): @@ -169,6 +220,18 @@ class GroupCoordinatorResponse_v0(Response): ) +class GroupCoordinatorResponse_v1(Response): + API_KEY = 10 + API_VERSION = 1 + SCHEMA = Schema( + ('error_code', Int16), + ('error_message', String('utf-8')), + ('coordinator_id', Int32), + ('host', String('utf-8')), + ('port', Int32) + ) + + class GroupCoordinatorRequest_v0(Request): API_KEY = 10 API_VERSION = 0 @@ -178,5 +241,15 @@ class GroupCoordinatorRequest_v0(Request): ) -GroupCoordinatorRequest = [GroupCoordinatorRequest_v0] -GroupCoordinatorResponse = [GroupCoordinatorResponse_v0] +class GroupCoordinatorRequest_v1(Request): + API_KEY = 10 + API_VERSION = 1 + RESPONSE_TYPE = GroupCoordinatorResponse_v1 + SCHEMA = Schema( + ('coordinator_key', String('utf-8')), + ('coordinator_type', Int8) + ) + + +GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1] +GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1] diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index b441e63..359f197 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from .api import Request, Response from .message import MessageSet -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class FetchResponse_v0(Response): @@ -46,6 +46,45 @@ class FetchResponse_v3(Response): SCHEMA = FetchResponse_v2.SCHEMA +class FetchResponse_v4(Response): + API_KEY = 1 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', MessageSet))))) + ) + + +class FetchResponse_v5(Response): + API_KEY = 1 + API_VERSION = 5 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', MessageSet))))) + ) + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -95,7 +134,52 @@ class FetchRequest_v3(Request): ) -FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, - FetchRequest_v3] -FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, - FetchResponse_v3] +class FetchRequest_v4(Request): + # Adds isolation_level field + API_KEY = 1 + API_VERSION = 4 + RESPONSE_TYPE = FetchResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('offset', Int64), + ('max_bytes', Int32))))) + ) + + +class FetchRequest_v5(Request): + # This may only be used in broker-broker api calls + API_KEY = 1 + API_VERSION = 5 + RESPONSE_TYPE = FetchResponse_v5 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))) + ) + + +FetchRequest = [ + FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, + FetchRequest_v3, FetchRequest_v4, FetchRequest_v5 +] +FetchResponse = [ + FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, + FetchResponse_v3, FetchResponse_v4, FetchResponse_v5 +] diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 5cab754..ce75a5f 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -26,6 +26,22 @@ class JoinGroupResponse_v1(Response): SCHEMA = JoinGroupResponse_v0.SCHEMA +class JoinGroupResponse_v2(Response): + API_KEY = 11 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('generation_id', Int32), + ('group_protocol', String('utf-8')), + ('leader_id', String('utf-8')), + ('member_id', String('utf-8')), + ('members', Array( + ('member_id', String('utf-8')), + ('member_metadata', Bytes))) + ) + + class JoinGroupRequest_v0(Request): API_KEY = 11 API_VERSION = 0 @@ -59,8 +75,20 @@ class JoinGroupRequest_v1(Request): UNKNOWN_MEMBER_ID = '' -JoinGroupRequest = [JoinGroupRequest_v0, JoinGroupRequest_v1] -JoinGroupResponse = [JoinGroupResponse_v0, JoinGroupResponse_v1] +class JoinGroupRequest_v2(Request): + API_KEY = 11 + API_VERSION = 2 + RESPONSE_TYPE = JoinGroupResponse_v2 + SCHEMA = JoinGroupRequest_v1.SCHEMA + UNKNOWN_MEMBER_ID = '' + + +JoinGroupRequest = [ + JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2 +] +JoinGroupResponse = [ + JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1 +] class ProtocolMetadata(Struct): @@ -80,6 +108,16 @@ class SyncGroupResponse_v0(Response): ) +class SyncGroupResponse_v1(Response): + API_KEY = 14 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('member_assignment', Bytes) + ) + + class SyncGroupRequest_v0(Request): API_KEY = 14 API_VERSION = 0 @@ -94,8 +132,15 @@ class SyncGroupRequest_v0(Request): ) -SyncGroupRequest = [SyncGroupRequest_v0] -SyncGroupResponse = [SyncGroupResponse_v0] +class SyncGroupRequest_v1(Request): + API_KEY = 14 + API_VERSION = 1 + RESPONSE_TYPE = SyncGroupResponse_v1 + SCHEMA = SyncGroupRequest_v0.SCHEMA + + +SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1] +SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1] class MemberAssignment(Struct): @@ -116,6 +161,15 @@ class HeartbeatResponse_v0(Response): ) +class HeartbeatResponse_v1(Response): + API_KEY = 12 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + class HeartbeatRequest_v0(Request): API_KEY = 12 API_VERSION = 0 @@ -127,8 +181,15 @@ class HeartbeatRequest_v0(Request): ) -HeartbeatRequest = [HeartbeatRequest_v0] -HeartbeatResponse = [HeartbeatResponse_v0] +class HeartbeatRequest_v1(Request): + API_KEY = 12 + API_VERSION = 1 + RESPONSE_TYPE = HeartbeatResponse_v1 + SCHEMA = HeartbeatRequest_v0 + + +HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1] +HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1] class LeaveGroupResponse_v0(Response): @@ -139,6 +200,15 @@ class LeaveGroupResponse_v0(Response): ) +class LeaveGroupResponse_v1(Response): + API_KEY = 13 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16) + ) + + class LeaveGroupRequest_v0(Request): API_KEY = 13 API_VERSION = 0 @@ -149,5 +219,12 @@ class LeaveGroupRequest_v0(Request): ) -LeaveGroupRequest = [LeaveGroupRequest_v0] -LeaveGroupResponse = [LeaveGroupResponse_v0] +class LeaveGroupRequest_v1(Request): + API_KEY = 13 + API_VERSION = 1 + RESPONSE_TYPE = LeaveGroupResponse_v1 + SCHEMA = LeaveGroupRequest_v0.SCHEMA + + +LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1] +LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1] diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 907ec25..2be8209 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -71,6 +71,37 @@ class MetadataResponse_v2(Response): ) +class MetadataResponse_v3(Response): + API_KEY = 3 + API_VERSION = 3 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('brokers', Array( + ('node_id', Int32), + ('host', String('utf-8')), + ('port', Int32), + ('rack', String('utf-8')))), + ('cluster_id', String('utf-8')), + ('controller_id', Int32), + ('topics', Array( + ('error_code', Int16), + ('topic', String('utf-8')), + ('is_internal', Boolean), + ('partitions', Array( + ('error_code', Int16), + ('partition', Int32), + ('leader', Int32), + ('replicas', Array(Int32)), + ('isr', Array(Int32)))))) + ) + + +class MetadataResponse_v4(Response): + API_KEY = 3 + API_VERSION = 4 + SCHEMA = MetadataResponse_v3.SCHEMA + + class MetadataRequest_v0(Request): API_KEY = 3 API_VERSION = 0 @@ -95,8 +126,36 @@ class MetadataRequest_v2(Request): API_VERSION = 2 RESPONSE_TYPE = MetadataResponse_v2 SCHEMA = MetadataRequest_v1.SCHEMA + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics -MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2] +class MetadataRequest_v3(Request): + API_KEY = 3 + API_VERSION = 3 + RESPONSE_TYPE = MetadataResponse_v3 + SCHEMA = MetadataRequest_v1.SCHEMA + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics + + +class MetadataRequest_v4(Request): + API_KEY = 3 + API_VERSION = 4 + RESPONSE_TYPE = MetadataResponse_v4 + SCHEMA = Schema( + ('topics', Array(String('utf-8'))), + ('allow_auto_topic_creation', Boolean) + ) + ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics + NO_TOPICS = None # Empty array (len 0) for topics returns no topics + + +MetadataRequest = [ + MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2, + MetadataRequest_v3, MetadataRequest_v4 +] MetadataResponse = [ - MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2] + MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2, + MetadataResponse_v3, MetadataResponse_v4 +] diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 588dfec..8353f8c 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from .api import Request, Response -from .types import Array, Int16, Int32, Int64, Schema, String +from .types import Array, Int8, Int16, Int32, Int64, Schema, String class OffsetResetStrategy(object): @@ -36,6 +36,21 @@ class OffsetResponse_v1(Response): ) +class OffsetResponse_v2(Response): + API_KEY = 2 + API_VERSION = 2 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('timestamp', Int64), + ('offset', Int64))))) + ) + + class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 @@ -70,5 +85,23 @@ class OffsetRequest_v1(Request): } -OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1] -OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1] +class OffsetRequest_v2(Request): + API_KEY = 2 + API_VERSION = 2 + RESPONSE_TYPE = OffsetResponse_v2 + SCHEMA = Schema( + ('replica_id', Int32), + ('isolation_level', Int8), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('timestamp', Int64))))) + ) + DEFAULTS = { + 'replica_id': -1 + } + + +OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2] +OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 9b03354..da1f308 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -47,6 +47,12 @@ class ProduceResponse_v2(Response): ) +class ProduceResponse_v3(Response): + API_KEY = 0 + API_VERSION = 3 + SCHEMA = ProduceResponse_v2.SCHEMA + + class ProduceRequest_v0(Request): API_KEY = 0 API_VERSION = 0 @@ -91,5 +97,32 @@ class ProduceRequest_v2(Request): return True -ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2] -ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2] +class ProduceRequest_v3(Request): + API_KEY = 0 + API_VERSION = 3 + RESPONSE_TYPE = ProduceResponse_v3 + SCHEMA = Schema( + ('transactional_id', String('utf-8')), + ('required_acks', Int16), + ('timeout', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('messages', MessageSet))))) + ) + + def expect_response(self): + if self.required_acks == 0: # pylint: disable=no-member + return False + return True + + +ProduceRequest = [ + ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2, + ProduceRequest_v3 +] +ProduceResponse = [ + ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2, + ProduceResponse_v2 +]