Add gcloud pubsub support to Gerrit driver

This adds support for the Google Cloud Pub/Sub service to the
Gerrit driver.  It is very similar to Kafka.

Change-Id: Ib6e4dc01058b74e1042bfd1deb9fa4f3f43f7a36
This commit is contained in:
James E. Blair 2023-07-31 15:04:57 -07:00
parent 6c0ffe565f
commit 36276806b8
7 changed files with 354 additions and 2 deletions

View File

@ -30,8 +30,8 @@ Zuul interacts with Gerrit in up to three ways:
* Reporting results
Trigger events arrive over an event stream, either SSH (via the
``gerrit stream-events`` command) or other protocols such as Kafka, or
AWS Kinesis.
``gerrit stream-events`` command) or other protocols such as Kafka,
AWS Kinesis, or Google Cloud Pub/Sub.
Fetching source code may happen over SSH or HTTP.
@ -296,6 +296,53 @@ some implications for event delivery:
The AWS secret key to use.
Google Cloud Pub/Sub Event Support
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Zuul includes support for Gerrit's `events-gcloud-pubsub` plugin. This may be
used as an alternative to SSH for receiving trigger events.
Google Cloud Pub/Sub does provide event delivery guarantees, so unlike
SSH, if all Zuul schedulers are unable to communicate with Gerrit or
Google Cloud Pub/Sub, they will eventually receive queued events on
reconnection.
All Zuul schedulers will attempt to connect to Google Cloud Pub/Sub.
There are some implications for event delivery:
* All events will be delivered to Zuul at least once. In the case of
a disrupted connection, Zuul may receive duplicate events.
* Because the `events-gcloud-pubsub` plugin does not at the time of
this writing specify that messages are ordered, events may be
received by Zuul out of order. Since this behavior is under the
control of the Gerrit plugin, it may change in the future.
.. attr:: <gerrit gcloud pubsub connection>
.. attr:: gcloud_pubsub_project
:required:
The Google Cloud project name to use.
.. attr:: gcloud_pubsub_topic
:default: gerrit
The Google Cloud Pub/Sub topic to which Zuul should subscribe.
.. attr:: gcloud_pubsub_subscription_id
:default: zuul
The ID of the Google Cloud Pub/Sub subscription to use. If the
subscription does not exist, it will be created.
.. attr:: gcloud_pubsub_private_key
Path to a file containing the JSON encoded key of a service
account. If not provided, then Google Cloud local auth is used.
If Zuul is not running in the same Google Cloud project as
Gerrit, this is required.
Trigger Configuration
---------------------

View File

@ -0,0 +1,5 @@
---
features:
- |
Support for using Google Cloud Pub/Sub as an event source has been
added to the Gerrit driver.

View File

@ -42,3 +42,4 @@ opentelemetry-exporter-otlp-proto-grpc
opentelemetry-exporter-otlp-proto-http
confluent-kafka
boto3
google-cloud-pubsub

37
tests/fixtures/zuul-gerrit-gcloud.conf vendored Normal file
View File

@ -0,0 +1,37 @@
[statsd]
# note, use 127.0.0.1 rather than localhost to avoid getting ipv6
# see: https://github.com/jsocol/pystatsd/issues/61
server=127.0.0.1
[scheduler]
tenant_config=main.yaml
[merger]
git_dir=/tmp/zuul-test/merger-git
git_user_email=zuul@example.com
git_user_name=zuul
[web]
root=http://zuul.example.com
[executor]
git_dir=/tmp/zuul-test/executor-git
load_multiplier=100
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=fake_id_rsa_path
password=badpassword
gcloud_pubsub_project=testproject
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[database]
dburi=$MYSQL_FIXTURE_DBURI$

View File

@ -0,0 +1,154 @@
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import json
import os
import queue
import time
import threading
import google.api_core.exceptions
import tests.base
from tests.base import (
ZuulTestCase,
iterate_timeout,
simple_layout,
)
FIXTURE_DIR = os.path.join(tests.base.FIXTURE_DIR, 'gerrit')
class FakePubsubMessage:
def __init__(self, data):
self.data = data
self._acked = False
def ack(self):
self._acked = True
class FakePubsubFuture:
def __init__(self):
self.event = threading.Event()
self._error = None
def result(self):
self.event.wait()
if self._error:
raise self._error
def cancel(self, error=None):
self._error = error
self.event.set()
class FakePubsubSubscriber:
def __init__(self, credentials=None):
self.credentials = credentials
self._queue = queue.Queue()
self._closed = False
self._entered = False
def put(self, data):
self._queue.put(data)
def create_subscription(self, name=None, topic=None):
if not self._entered:
raise Exception("Attempt to use subscriber "
"outside of context manager")
self._name = name
self._topic = topic
raise google.api_core.exceptions.AlreadyExists("Exists")
def subscribe(self, name, callback):
if self._closed:
raise Exception("Attempt to use closed subscriber")
if not self._entered:
raise Exception("Attempt to use subscriber "
"outside of context manager")
assert self._name == name
self._callback = callback
self._future = FakePubsubFuture()
self._thread = threading.Thread(target=self._run)
self._thread.start()
return self._future
def _run(self):
while not self._closed:
data = self._queue.get()
self._queue.task_done()
if data is None:
continue
msg = FakePubsubMessage(data)
self._callback(msg)
def __enter__(self):
self._entered = True
return self
def __exit__(self, *args, **kw):
self._closed = True
self._queue.put(None)
def serialize(event):
return json.dumps(event)
class TestGerritEventSourceGcloudPubsub(ZuulTestCase):
config_file = 'zuul-gerrit-gcloud.conf'
def setUp(self):
self.useFixture(fixtures.MonkeyPatch(
'zuul.driver.gerrit.gerriteventgcloudpubsub.'
'pubsub_v1.SubscriberClient',
FakePubsubSubscriber))
super().setUp()
@simple_layout('layouts/simple.yaml')
def test_gcloud_pubsub(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
subscriber = self.fake_gerrit.event_thread.subscriber
self.fake_gerrit.event_thread.RECONNECTION_DELAY = 1
# Assert we passed the required config entries
self.assertEqual('projects/testproject/subscriptions/zuul',
subscriber._name)
self.assertEqual('projects/testproject/topics/gerrit',
subscriber._topic)
# Exercise reconnection
err = Exception("Test error")
subscriber._future.cancel(err)
for _ in iterate_timeout(60, 'wait for reconnect'):
if subscriber is not self.fake_gerrit.event_thread.subscriber:
break
time.sleep(0.2)
subscriber = self.fake_gerrit.event_thread.subscriber
self.additional_event_queues.append(subscriber._queue)
subscriber.put(serialize(A.getPatchsetCreatedEvent(1)))
self.waitUntilSettled()
self.assertHistory([
dict(name='check-job', result='SUCCESS', changes='1,1')
])
self.assertEqual(A.reported, 1, "A should be reported")
self.assertTrue(subscriber._queue.empty())

View File

@ -47,6 +47,9 @@ from zuul.driver.gerrit.gerriteventkafka import GerritKafkaEventListener
from zuul.driver.gerrit.gerriteventawskinesis import (
GerritAWSKinesisEventListener,
)
from zuul.driver.gerrit.gerriteventgcloudpubsub import (
GerritGcloudPubsubEventListener,
)
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib import tracing
from zuul.lib.logutil import get_annotated_logger
@ -416,6 +419,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
EVENT_SOURCE_STREAM_EVENTS = 'stream-events'
EVENT_SOURCE_KAFKA = 'kafka'
EVENT_SOURCE_KINESIS = 'kinesis'
EVENT_SOURCE_GCLOUD_PUBSUB = 'gcloudpubsub'
def __init__(self, driver, connection_name, connection_config):
super(GerritConnection, self).__init__(driver, connection_name,
@ -447,6 +451,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.event_source = self.EVENT_SOURCE_KAFKA
elif self.connection_config.get('aws_kinesis_region', None):
self.event_source = self.EVENT_SOURCE_KINESIS
elif self.connection_config.get('gcloud_pubsub_project', None):
self.event_source = self.EVENT_SOURCE_GCLOUD_PUBSUB
# Thread for whatever event source we use
self.event_thread = None
@ -1646,6 +1652,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.startKafkaListener()
elif self.event_source == self.EVENT_SOURCE_KINESIS:
self.startAWSKinesisListener()
elif self.event_source == self.EVENT_SOURCE_GCLOUD_PUBSUB:
self.startGcloudPubsubListener()
else:
self.log.warning("No gerrit event source configured")
self.startRefWatcherThread()
@ -1667,6 +1675,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.event_thread = GerritAWSKinesisEventListener(
self, self.connection_config)
def startGcloudPubsubListener(self):
self.log.info("Starting gcloud pubsub consumer")
self.event_thread = GerritGcloudPubsubEventListener(
self, self.connection_config)
def startPollerThread(self):
if self.session is not None:
self.poller_thread = self._poller_class(self)

View File

@ -0,0 +1,95 @@
# Copyright 2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import google.api_core.exceptions
from google.oauth2 import service_account
from google.cloud import pubsub_v1
import logging
import pprint
import threading
class GerritGcloudPubsubEventListener:
log = logging.getLogger("zuul.GerritConnection.gcloudpubsub")
RECONNECTION_DELAY = 5
def __init__(self, gerrit_connection, connection_config):
self.gerrit_connection = gerrit_connection
project = connection_config.get('gcloud_pubsub_project')
topic = connection_config.get('gcloud_pubsub_topic', 'gerrit')
sub = connection_config.get('gcloud_pubsub_subscription_id', 'zuul')
key = connection_config.get('gcloud_pubsub_private_key')
self.kwargs = {}
if key:
with open(key) as keyfile:
info = json.load(keyfile)
credentials = service_account.Credentials.\
from_service_account_info(info)
self.kwargs['credentials'] = credentials
self.topic_name = f'projects/{project}/topics/{topic}'
self.subscription_name = f'projects/{project}/subscriptions/{sub}'
self._stop_event = threading.Event()
self._stopped = False
self._thread = None
self._future = None
def start(self):
self._thread = threading.Thread(target=self.run)
self._thread.start()
def stop(self):
self.log.debug("Stopping gcloud pubsub listener")
self._stopped = True
self._stop_event.set()
try:
if self._future:
self._future.cancel()
except Exception:
self.log.exception("Error canceling future:")
def join(self):
if self._thread:
self._thread.join()
def callback(self, message):
data = json.loads(message.data)
self.log.info("Received data from gcloud: \n%s" %
pprint.pformat(data))
self.gerrit_connection.addEvent(data)
message.ack()
def _run(self):
subscriber = pubsub_v1.SubscriberClient(**self.kwargs)
# So the unit tests can access it
self.subscriber = subscriber
with subscriber as client:
try:
client.create_subscription(name=self.subscription_name,
topic=self.topic_name)
except google.api_core.exceptions.AlreadyExists:
pass
self._future = client.subscribe(
self.subscription_name, self.callback)
self._future.result()
def run(self):
while not self._stopped:
try:
self._run()
except Exception:
self.log.exception(
"Exception in gcloud pubsub consumer with %s:",
self.gerrit_connection.connection_name)
self._stop_event.wait(self.RECONNECTION_DELAY)