Add Kafka support to Gerrit
Gerrit supports a number of pub-sub plugins which can act as alternatives to stream-events. These can often be easier for users to configure than ssh access and have the advantage of providing queueing and delivery guarantees for messages. This change not only adds support for Kafka, but is meant as a template for adding support for other Gerrit pub-sub plugins as well. Change-Id: Ib03d8cb9ef709b625d2717a09125930548c86a22
This commit is contained in:
parent
a485ff5e67
commit
4dc0962a49
|
@ -10,15 +10,6 @@ sources, triggers, and reporters.
|
|||
|
||||
Zuul will need access to a Gerrit user.
|
||||
|
||||
Create an SSH keypair for Zuul to use if there isn't one already, and
|
||||
create a Gerrit user with that key::
|
||||
|
||||
cat ~/id_rsa.pub | ssh -p29418 review.example.com gerrit create-account --ssh-key - --full-name Zuul zuul
|
||||
|
||||
.. note:: If you use an RSA key, ensure it is encoded in the PEM
|
||||
format (use the ``-t rsa -m PEM`` arguments to
|
||||
`ssh-keygen`).
|
||||
|
||||
Give that user whatever permissions will be needed on the projects you
|
||||
want Zuul to report on. For instance, you may want to grant
|
||||
``Verified +/-1`` and ``Submit`` to the user. Additional categories
|
||||
|
@ -32,6 +23,26 @@ dependency support in any pipeline queues where such changes may
|
|||
appear. See :attr:`queue.allow-circular-dependencies` for information
|
||||
on how to configure this.
|
||||
|
||||
Zuul interacts with Gerrit in up to three ways:
|
||||
|
||||
* Receiving trigger events
|
||||
* Fetching source code
|
||||
* Reporting results
|
||||
|
||||
Trigger events arrive over an event stream, either SSH (via the
|
||||
``gerrit stream-events`` command) or a pub-sub protocol such as Kafka.
|
||||
|
||||
Fetching source code may happen over SSH or HTTP.
|
||||
|
||||
Reporting may happen over SSH or HTTP (strongly preferred).
|
||||
|
||||
The appropriate connection methods must be configured to satisfy the
|
||||
interactions Zuul will have with Gerrit. The recommended
|
||||
configuration is to configure both SSH and HTTP access.
|
||||
|
||||
The section below describes commond configuration settings. Specific
|
||||
settings for different connection methods follow.
|
||||
|
||||
Connection Configuration
|
||||
------------------------
|
||||
|
||||
|
@ -51,12 +62,6 @@ The supported options in ``zuul.conf`` connections are:
|
|||
|
||||
Fully qualified domain name of Gerrit server.
|
||||
|
||||
.. attr:: ssh_server
|
||||
|
||||
If SSH access to the Gerrit server should be via a different
|
||||
hostname than web access, set this value to the hostname to use
|
||||
for SSH connections.
|
||||
|
||||
.. attr:: canonical_hostname
|
||||
|
||||
The canonical hostname associated with the git repos on the
|
||||
|
@ -70,11 +75,6 @@ The supported options in ``zuul.conf`` connections are:
|
|||
them in the job's working directory, they appear under this
|
||||
directory name.
|
||||
|
||||
.. attr:: port
|
||||
:default: 29418
|
||||
|
||||
Gerrit server port.
|
||||
|
||||
.. attr:: baseurl
|
||||
:default: https://{server}
|
||||
|
||||
|
@ -94,7 +94,36 @@ The supported options in ``zuul.conf`` connections are:
|
|||
.. attr:: user
|
||||
:default: zuul
|
||||
|
||||
User name to use when logging into Gerrit via ssh.
|
||||
User name to use when accessing Gerrit.
|
||||
|
||||
SSH Configuration
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
To prepare for SSH access, create an SSH keypair for Zuul to use if
|
||||
there isn't one already, and create a Gerrit user with that key::
|
||||
|
||||
cat ~/id_rsa.pub | ssh -p29418 review.example.com gerrit create-account --ssh-key - --full-name Zuul zuul
|
||||
|
||||
.. note:: If you use an RSA key, ensure it is encoded in the PEM
|
||||
format (use the ``-t rsa -m PEM`` arguments to
|
||||
`ssh-keygen`).
|
||||
|
||||
If using Gerrit 2.7 or later, make sure the user is a member of a group
|
||||
that is granted the ``Stream Events`` permission, otherwise it will not
|
||||
be able to invoke the ``gerrit stream-events`` command over SSH.
|
||||
|
||||
.. attr:: <gerrit ssh connection>
|
||||
|
||||
.. attr:: ssh_server
|
||||
|
||||
If SSH access to the Gerrit server should be via a different
|
||||
hostname than web access, set this value to the hostname to use
|
||||
for SSH connections.
|
||||
|
||||
.. attr:: port
|
||||
:default: 29418
|
||||
|
||||
Gerrit SSH server port.
|
||||
|
||||
.. attr:: sshkey
|
||||
:default: ~zuul/.ssh/id_rsa
|
||||
|
@ -106,6 +135,20 @@ The supported options in ``zuul.conf`` connections are:
|
|||
|
||||
SSH connection keepalive timeout; ``0`` disables.
|
||||
|
||||
.. attr:: git_over_ssh
|
||||
:default: false
|
||||
|
||||
This forces git operation over SSH even if the ``password``
|
||||
attribute is set. This allow REST API access to the Gerrit
|
||||
server even when git-over-http operation is disabled on the
|
||||
server.
|
||||
|
||||
|
||||
HTTP Configuration
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. attr:: <gerrit ssh connection>
|
||||
|
||||
.. attr:: password
|
||||
|
||||
The HTTP authentication password for the user. This is
|
||||
|
@ -115,14 +158,6 @@ The supported options in ``zuul.conf`` connections are:
|
|||
messages). Retrieve this password from the ``HTTP Password``
|
||||
section of the ``Settings`` page in Gerrit.
|
||||
|
||||
.. attr:: git_over_ssh
|
||||
:default: false
|
||||
|
||||
This forces git operation over SSH even if the ``password``
|
||||
attribute is set. This allow REST API access to the Gerrit
|
||||
server even when git-over-http operation is disabled on the
|
||||
server.
|
||||
|
||||
.. attr:: auth_type
|
||||
:default: basic
|
||||
|
||||
|
@ -158,17 +193,64 @@ The supported options in ``zuul.conf`` connections are:
|
|||
When using a self-signed certificate, this may be set to
|
||||
``false`` to disable SSL certificate verification.
|
||||
|
||||
Kafka Event Support
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Zuul includes support for Gerrit's `events-kafka` plugin. This may be
|
||||
used as an alternative to SSH for receiving trigger events.
|
||||
|
||||
Kafka does provide event delivery guarantees, so unlike SSH, if all
|
||||
Zuul schedulers are unable to communicate with Gerrit or Kafka, they
|
||||
will eventually receive queued events on reconnection.
|
||||
|
||||
All Zuul schedulers will attempt to connect to Kafka brokers. There
|
||||
are some implications for event delivery:
|
||||
|
||||
* All events will be delivered to Zuul at least once. In the case of
|
||||
a disrupted connection, Zuul may receive duplicate events.
|
||||
|
||||
* Events should generally arrive in order, however some events in
|
||||
rapid succession may be received by Zuul out of order.
|
||||
|
||||
.. attr:: <gerrit kafka connection>
|
||||
|
||||
.. attr:: kafka_bootstrap_servers
|
||||
:required:
|
||||
|
||||
A comma-separated list of Kafka servers (optionally including
|
||||
port separated with `:`).
|
||||
|
||||
.. attr:: kafka_topic
|
||||
:default: gerrit
|
||||
|
||||
The Kafka topic to which Zuul should subscribe.
|
||||
|
||||
.. attr:: kafka_client_id
|
||||
:default: zuul
|
||||
|
||||
The Kafka client ID.
|
||||
|
||||
.. attr:: kafka_group_id
|
||||
:default: zuul
|
||||
|
||||
The Kafka group ID.
|
||||
|
||||
.. attr:: kafka_tls_cert
|
||||
|
||||
Path to TLS certificate to use when connecting to a Kafka broker.
|
||||
|
||||
.. attr:: kafka_tls_key
|
||||
|
||||
Path to TLS certificate key to use when connecting to a Kafka broker.
|
||||
|
||||
.. attr:: kafka_tls_ca
|
||||
|
||||
Path to TLS CA certificate to use when connecting to a Kafka broker.
|
||||
|
||||
|
||||
Trigger Configuration
|
||||
---------------------
|
||||
|
||||
Zuul works with standard versions of Gerrit by invoking the ``gerrit
|
||||
stream-events`` command over an SSH connection. It also reports back
|
||||
to Gerrit using SSH.
|
||||
|
||||
If using Gerrit 2.7 or later, make sure the user is a member of a group
|
||||
that is granted the ``Stream Events`` permission, otherwise it will not
|
||||
be able to invoke the ``gerrit stream-events`` command over SSH.
|
||||
|
||||
.. attr:: pipeline.trigger.<gerrit source>
|
||||
|
||||
The dictionary passed to the Gerrit pipeline ``trigger`` attribute
|
||||
|
@ -287,10 +369,6 @@ be able to invoke the ``gerrit stream-events`` command over SSH.
|
|||
Reporter Configuration
|
||||
----------------------
|
||||
|
||||
Zuul works with standard versions of Gerrit by invoking the ``gerrit``
|
||||
command over an SSH connection, unless the connection is configured
|
||||
with an HTTP password, in which case the HTTP API is used.
|
||||
|
||||
.. attr:: pipeline.reporter.<gerrit reporter>
|
||||
|
||||
The dictionary passed to the Gerrit reporter is used to provide label
|
||||
|
@ -482,16 +560,14 @@ Here is an example of standard pipelines you may want to define:
|
|||
.. literalinclude:: /examples/pipelines/gerrit-reference-pipelines.yaml
|
||||
:language: yaml
|
||||
|
||||
Checks Plugin Support (Experimental)
|
||||
|
||||
Checks Plugin Support (Deprecated)
|
||||
------------------------------------
|
||||
|
||||
The Gerrit driver has experimental support for Gerrit's `checks`
|
||||
plugin. Neither the `checks` plugin itself nor Zuul's support for it
|
||||
are stable yet, and this is not recommended for production use. If
|
||||
you wish to help develop this support, you should expect to be able to
|
||||
upgrade both Zuul and Gerrit frequently as the two systems evolve. No
|
||||
backward-compatible support will be provided and configurations may
|
||||
need to be updated frequently.
|
||||
The Gerrit driver has support for Gerrit's `checks` plugin. Due to
|
||||
the deprecation of the checks plugin in Gerrit, support in Zuul is
|
||||
also deprecated and likely to be removed in a future version. It is
|
||||
not recommended for use.
|
||||
|
||||
Caveats include (but are not limited to):
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
upgrade:
|
||||
- |
|
||||
The default for :attr:`<gerrit connection>.auth_type` has changed from
|
||||
The default for `auth_type` has changed from
|
||||
``digest`` to ``basic``. Digest authentication has not been supported
|
||||
in Gerrit since version 2.15.
|
||||
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
Support for using Kafka as an event source has been added to the Gerrit driver.
|
|
@ -40,3 +40,4 @@ psycopg2-binary
|
|||
opentelemetry-sdk
|
||||
opentelemetry-exporter-otlp-proto-grpc
|
||||
opentelemetry-exporter-otlp-proto-http
|
||||
confluent-kafka
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
[statsd]
|
||||
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
|
||||
# see: https://github.com/jsocol/pystatsd/issues/61
|
||||
server=127.0.0.1
|
||||
|
||||
[scheduler]
|
||||
tenant_config=main.yaml
|
||||
|
||||
[merger]
|
||||
git_dir=/tmp/zuul-test/merger-git
|
||||
git_user_email=zuul@example.com
|
||||
git_user_name=zuul
|
||||
|
||||
[web]
|
||||
root=http://zuul.example.com
|
||||
|
||||
[executor]
|
||||
git_dir=/tmp/zuul-test/executor-git
|
||||
load_multiplier=100
|
||||
|
||||
[connection gerrit]
|
||||
driver=gerrit
|
||||
server=review.example.com
|
||||
user=jenkins
|
||||
sshkey=fake_id_rsa_path
|
||||
password=badpassword
|
||||
kafka_bootstrap_servers=localhost
|
||||
kafka_client_id=zuul
|
||||
kafka_group_id=zuul
|
||||
kafka_topic=gerrit
|
||||
|
||||
[connection smtp]
|
||||
driver=smtp
|
||||
server=localhost
|
||||
port=25
|
||||
default_from=zuul@example.com
|
||||
default_to=you@example.com
|
||||
|
||||
[database]
|
||||
dburi=$MYSQL_FIXTURE_DBURI$
|
|
@ -0,0 +1,139 @@
|
|||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
import json
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
|
||||
import confluent_kafka as kafka
|
||||
|
||||
import tests.base
|
||||
from tests.base import (
|
||||
ZuulTestCase,
|
||||
iterate_timeout,
|
||||
simple_layout,
|
||||
)
|
||||
|
||||
|
||||
FIXTURE_DIR = os.path.join(tests.base.FIXTURE_DIR, 'gerrit')
|
||||
|
||||
|
||||
class FakeKafkaMessage:
|
||||
def __init__(self, topic, offset, value, error=None):
|
||||
self._topic = topic
|
||||
self._error = error
|
||||
self._offset = offset
|
||||
if error:
|
||||
self._value = None
|
||||
else:
|
||||
self._value = value
|
||||
|
||||
def error(self):
|
||||
return self._error
|
||||
|
||||
def value(self):
|
||||
return self._value
|
||||
|
||||
def partition(self):
|
||||
return 0
|
||||
|
||||
def topic(self):
|
||||
return self._topic
|
||||
|
||||
def offset(self):
|
||||
return self._offset
|
||||
|
||||
|
||||
class FakeKafkaConsumer:
|
||||
def __init__(self, config, logger):
|
||||
self.config = config
|
||||
self.logger = logger
|
||||
self.topics = None
|
||||
self._queue = queue.Queue()
|
||||
self.closed = 0
|
||||
self._offset = 0
|
||||
|
||||
def put(self, data):
|
||||
self._queue.put(data)
|
||||
|
||||
def subscribe(self, topics):
|
||||
self.topics = topics
|
||||
|
||||
def poll(self, timeout=0):
|
||||
try:
|
||||
data = self._queue.get(timeout=timeout)
|
||||
self._queue.task_done()
|
||||
if isinstance(data, kafka.KafkaError):
|
||||
return FakeKafkaMessage(
|
||||
'gerrit', self._offset, None, error=data)
|
||||
self._offset += 1
|
||||
return FakeKafkaMessage('gerrit', self._offset - 1, data)
|
||||
except queue.Empty:
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
self.closed += 1
|
||||
|
||||
|
||||
def serialize(event):
|
||||
return json.dumps(event).encode('utf8')
|
||||
|
||||
|
||||
class TestGerritEventSourceKafka(ZuulTestCase):
|
||||
config_file = 'zuul-gerrit-kafka.conf'
|
||||
|
||||
def setUp(self):
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'zuul.driver.gerrit.gerriteventkafka.kafka.Consumer',
|
||||
FakeKafkaConsumer))
|
||||
super().setUp()
|
||||
|
||||
@simple_layout('layouts/simple.yaml')
|
||||
def test_kafka(self):
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
consumer = self.fake_gerrit.event_thread.consumer
|
||||
self.fake_gerrit.event_thread.RECONNECTION_DELAY = 1
|
||||
|
||||
# Assert we passed the required config entries
|
||||
self.assertTrue(isinstance(consumer.config, dict))
|
||||
self.assertTrue('bootstrap.servers' in consumer.config)
|
||||
self.assertTrue('group.id' in consumer.config)
|
||||
|
||||
# Exercise error handling
|
||||
err = kafka.KafkaError(kafka.KafkaError._PARTITION_EOF)
|
||||
consumer.put(err)
|
||||
|
||||
# Exercise reconnection
|
||||
err = kafka.KafkaError(kafka.KafkaError.NETWORK_EXCEPTION)
|
||||
consumer.put(err)
|
||||
|
||||
for _ in iterate_timeout(60, 'wait for reconnect'):
|
||||
if consumer is not self.fake_gerrit.event_thread.consumer:
|
||||
break
|
||||
time.sleep(0.2)
|
||||
|
||||
consumer = self.fake_gerrit.event_thread.consumer
|
||||
self.additional_event_queues.append(consumer._queue)
|
||||
|
||||
consumer.put(serialize(A.getPatchsetCreatedEvent(1)))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertHistory([
|
||||
dict(name='check-job', result='SUCCESS', changes='1,1')
|
||||
])
|
||||
self.assertEqual(A.reported, 1, "A should be reported")
|
||||
|
||||
self.assertTrue(consumer._queue.empty())
|
|
@ -43,6 +43,7 @@ from zuul.driver.gerrit.gcloudauth import GCloudAuth
|
|||
from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
|
||||
from zuul.driver.gerrit.gerriteventssh import GerritSSHEventListener
|
||||
from zuul.driver.gerrit.gerriteventchecks import GerritChecksPoller
|
||||
from zuul.driver.gerrit.gerriteventkafka import GerritKafkaEventListener
|
||||
from zuul.driver.git.gitwatcher import GitWatcher
|
||||
from zuul.lib import tracing
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
|
@ -410,6 +411,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||
|
||||
EVENT_SOURCE_NONE = 'none'
|
||||
EVENT_SOURCE_STREAM_EVENTS = 'stream-events'
|
||||
EVENT_SOURCE_KAFKA = 'kafka'
|
||||
|
||||
def __init__(self, driver, connection_name, connection_config):
|
||||
super(GerritConnection, self).__init__(driver, connection_name,
|
||||
|
@ -437,6 +439,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||
if enable_stream_events in [
|
||||
'true', 'True', '1', 1, 'TRUE', True]:
|
||||
self.event_source = self.EVENT_SOURCE_STREAM_EVENTS
|
||||
if self.connection_config.get('kafka_bootstrap_servers', None):
|
||||
self.event_source = self.EVENT_SOURCE_KAFKA
|
||||
|
||||
# Thread for whatever event source we use
|
||||
self.event_thread = None
|
||||
|
@ -1632,6 +1636,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||
def startEventSourceThread(self):
|
||||
if self.event_source == self.EVENT_SOURCE_STREAM_EVENTS:
|
||||
self.startSSHListener()
|
||||
elif self.event_source == self.EVENT_SOURCE_KAFKA:
|
||||
self.startKafkaListener()
|
||||
else:
|
||||
self.log.warning("No gerrit event source configured")
|
||||
self.startRefWatcherThread()
|
||||
|
@ -1643,6 +1649,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
|||
self.event_thread = GerritSSHEventListener(
|
||||
self, self.connection_config)
|
||||
|
||||
def startKafkaListener(self):
|
||||
self.log.info("Starting Kafka consumer")
|
||||
self.event_thread = GerritKafkaEventListener(
|
||||
self, self.connection_config)
|
||||
|
||||
def startPollerThread(self):
|
||||
if self.session is not None:
|
||||
self.poller_thread = self._poller_class(self)
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
# Copyright 2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import confluent_kafka as kafka
|
||||
import logging
|
||||
import pprint
|
||||
import threading
|
||||
|
||||
# With multiple Kafka partitions, events could arrive out of order.
|
||||
# Similar to webhooks, we accept that and don't do anything to
|
||||
# mitigate that. They should mostly arrive in the correct order, at
|
||||
# least at human scale. That is, while we are somewhat likely to see
|
||||
# an update to a change and the corresponding update to refs/meta out
|
||||
# of order, we are unlikely to see a change abandoned before it is
|
||||
# created.
|
||||
|
||||
|
||||
class GerritKafkaEventListener:
|
||||
log = logging.getLogger("zuul.GerritConnection.kafka")
|
||||
RECONNECTION_DELAY = 5
|
||||
|
||||
def __init__(self, gerrit_connection, connection_config):
|
||||
self.gerrit_connection = gerrit_connection
|
||||
bs = connection_config.get('kafka_bootstrap_servers')
|
||||
kafka_config = {
|
||||
'bootstrap.servers': bs,
|
||||
}
|
||||
kafka_config['client_id'] = connection_config.get(
|
||||
'kafka_client_id', 'zuul')
|
||||
kafka_config['group.id'] = connection_config.get(
|
||||
'kafka_group_id', 'zuul')
|
||||
tls_key = connection_config.get('kafka_tls_key', None)
|
||||
tls_cert = connection_config.get('kafka_tls_cert', None)
|
||||
tls_ca = connection_config.get('kafka_tls_ca', None)
|
||||
if tls_key:
|
||||
kafka_config['ssl.key.location'] = tls_key
|
||||
kafka_config['ssl.certificate.location'] = tls_cert
|
||||
kafka_config['ssl.ca.location'] = tls_ca
|
||||
self.kafka_config = kafka_config
|
||||
self.topic = connection_config.get('kafka_topic', 'gerrit')
|
||||
self._stop_event = threading.Event()
|
||||
self._stopped = False
|
||||
self._thread = None
|
||||
|
||||
def start(self):
|
||||
self._thread = threading.Thread(target=self.run)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping kafka listener")
|
||||
self._stopped = True
|
||||
self._stop_event.set()
|
||||
|
||||
def join(self):
|
||||
if self._thread:
|
||||
self._thread.join()
|
||||
|
||||
def _run(self):
|
||||
self.log.info("Connecting to kafka at %s",
|
||||
self.kafka_config['bootstrap.servers'])
|
||||
consumer = kafka.Consumer(self.kafka_config, logger=self.log)
|
||||
# So the unit tests can access it
|
||||
self.consumer = consumer
|
||||
try:
|
||||
consumer.subscribe([self.topic])
|
||||
while not self._stopped:
|
||||
msg = consumer.poll(timeout=2.0)
|
||||
if msg is None:
|
||||
continue
|
||||
|
||||
if msg.error():
|
||||
if msg.error().code() == kafka.KafkaError._PARTITION_EOF:
|
||||
self.log.info(
|
||||
"Kafka topic %s partition %s "
|
||||
"reached end at offset %s",
|
||||
msg.topic(), msg.partition(), msg.offset())
|
||||
else:
|
||||
raise kafka.KafkaException(msg.error())
|
||||
else:
|
||||
data = json.loads(msg.value().decode('utf8'))
|
||||
self.log.info("Received data from kafka: \n%s" %
|
||||
pprint.pformat(data))
|
||||
self.gerrit_connection.addEvent(data)
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
try:
|
||||
self._run()
|
||||
except Exception:
|
||||
self.log.exception("Exception in kafka consumer with %s:",
|
||||
self.gerrit_connection.connection_name)
|
||||
self._stop_event.wait(self.RECONNECTION_DELAY)
|
Loading…
Reference in New Issue