Update to switch to confluent-kafka library for driver

This patch:
* Updates server version to 2.0.0 release
* Add service backend for hybrid combinations
* Add dependencies

Change-Id: Ice374dca539b8ed1b1965b75379bad5140121483
This commit is contained in:
Andy Smith 2018-09-17 09:56:38 -04:00
parent 264cee3044
commit 28be3eed69
2 changed files with 78 additions and 21 deletions

View File

@ -24,10 +24,12 @@
# Notification transport wil be kafka://
#
# Environment Configuration
# RPC_SERVICE - the messaging backend service for RPC
# RPC_HOST - the host used to connect to the RPC messaging service.
# RPC_PORT - the port used to connect to the RPC messaging service.
# Defaults to 5672.
# RPC_{USERNAME,PASSWORD} - for authentication with RPC messaging service.
# NOTIFY_SERVICE - the messaging backend service for Notification
# NOTIFY_HOST - the host used to connect to the Notification messaging service.
# NOTIFY_PORT - the port used to connect to the Notification messaging service.
# Defaults to 9092.
@ -41,23 +43,38 @@ set +o xtrace
# builds rpc transport url string
function _get_rpc_transport_url {
local virtual_host=$1
if [ -z "$RPC_USERNAME" ]; then
echo "amqp://$RPC_HOST:${RPC_PORT}/$virtual_host"
if [ "$RPC_SERVICE" == "rabbit" ]; then
echo $(_get_rabbit_transport_url $virtual_host)
elif [ -z "$RPC_USERNAME" ]; then
echo "$RPC_SERVICE://$RPC_HOST:${RPC_PORT}/$virtual_host"
else
echo "amqp://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host"
echo "$RPC_SERVICE://$RPC_USERNAME:$RPC_PASSWORD@$RPC_HOST:${RPC_PORT}/$virtual_host"
fi
}
# builds notify transport url string
function _get_notify_transport_url {
local virtual_host=$1
if [ -z "$NOTIFY_USERNAME" ]; then
echo "kafka://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
echo "$NOTIFY_SERVICE://$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
else
echo "kafka://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
echo "$NOTIFY_SERVICE://$NOTIFY_USERNAME:$NOTIFY_PASSWORD@$NOTIFY_HOST:${NOTIFY_PORT}/$virtual_host"
fi
}
# override the default in devstack
function _rpc_add_vhost {
if [ "$RPC_SERVICE" == "rabbit" ]; then
_rabbit_rpc_backend_add_vhost $@
fi
# no configuration necessary for amqp backend
return 0
}
# Functions
# ------------
# _download_kafka() - downloading kafka
@ -72,7 +89,9 @@ function _download_kafka {
# driver
function _install_kafka_python {
# Install kafka client API
# TODO(ansmith) remove kafka-python library following switch
pip_install_gr kafka-python
pip_install_gr confluent-kafka
}
# _install_kafka_backend() - installing Kafka with Scala and Zookeeper
@ -81,9 +100,11 @@ function _install_kafka_backend {
local scala_version=${SCALA_VERSION}.0
if is_ubuntu; then
sudo apt-get install -y scala
sudo apt-get install -y scala librdkafka1
elif is_fedora; then
install_package librdkafka
is_package_installed java-1.8.0-openjdk-headless || install_package java-1.8.0-openjdk-headless
if [ ! -f ${FILES}/scala-${scala_version}.tar.gz ]; then
@ -142,6 +163,14 @@ function _cleanup_kafka {
echo_summary "Clean up kafka service"
rm ${FILES}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
rm -rf ${KAFKA_DEST}/kafka
if is_ubuntu; then
uninstall_package librdkafka1
elif is_fedora; then
uninstall_package librdkafka
else
exit_distro_not_supported "kafka installation"
fi
}
# Set up the various configuration files used by the qpid-dispatch-router (qdr)
@ -361,9 +390,22 @@ function _cleanup_qdr_backend {
if is_service_enabled kafka; then
# Note: this is the only tricky part about out of tree
# oslo.messaging plugins, you must overwrite the functions
# so that the correct settings files are made.
# this plugin can be configured to use rabbit for rpc,
# so save a copy of the original
if [ ! $(type -t _get_rabbit_transport_url) ]; then
get_transport_url_definition=$(declare -f get_transport_url)
eval "_get_rabbit_transport_url() ${get_transport_url_definition#*\()}"
export -f _get_rabbit_transport_url
fi
# we will need original if using rabbit for rpc
if [ ! $(type -t _rabbit_rpc_backend_add_vhost) ]; then
rpc_backend_add_vhost_definition=$(declare -f rpc_backend_add_vhost)
eval "_rabbit_rpc_backend_add_vhost() ${rpc_backend_add_vhost_definition#*\()}"
export -f _rabbit_rpc_backend_add_vhost
fi
# export the overridden functions
function iniset_rpc_backend {
local package=$1
local file=$2
@ -373,21 +415,24 @@ if is_service_enabled kafka; then
iniset $file $section transport_url $(_get_rpc_transport_url "$virtual_host")
iniset $file oslo_messaging_notifications transport_url $(_get_notify_transport_url "$virtual_host")
}
export -f iniset_rpc_backend
function get_transport_url {
# TODO (ansmith) introduce separate get_*_transport calls in devstak
_get_rpc_transport_url $@
}
export -f get_transport_url
function get_notification_url {
_get_notify_transport_url $@
}
function rpc_backend_add_vhost {
return 0
}
export -f iniset_rpc_backend
export -f get_transport_url
export -f get_notification_url
function rpc_backend_add_vhost {
_rpc_add_vhost $@
}
export -f rpc_backend_add_vhost
fi
# check for kafka service
@ -403,24 +448,32 @@ if is_service_enabled kafka; then
elif [[ "$1" == "stack" && "$2" == "install" ]]; then
# Install and configure the messaging services
_install_kafka_backend
_install_qdr_backend
if [ "$RPC_SERVICE" == "amqp" ]; then
_install_qdr_backend
fi
elif [[ "$1" == "stack" && "$2" == "post-config" ]]; then
# Start the messaging service processes, this happens before
# any services start
_start_kafka_backend
_start_qdr_backend
if [ "$RPC_SERVICE" == "amqp" ]; then
_start_qdr_backend
fi
elif [[ "$1" == "unstack" ]]; then
# Shut down messaging services
_stop_kafka
_stop_qdr
if [ "$RPC_SERVICE" == "amqp" ]; then
_stop_qdr
fi
elif [[ "$1" == "clean" ]]; then
# Remove state and transient data
# Remember clean.sh first calls unstack.sh
_cleanup_kafka
_cleanup_qdr
if [ "$RPC_SERVICE" == "amqp" ]; then
_cleanup_qdr
fi
fi
fi

View File

@ -6,7 +6,7 @@ enable_service kafka
KAFKA_DEST=${KAFKA_DEST:-/opt/stack/devstack-plugin-kafka}
# Specify Kafka version
KAFKA_VERSION=${KAFKA_VERSION:-1.1.0}
KAFKA_VERSION=${KAFKA_VERSION:-2.0.0}
KAFKA_BASEURL=${KAFKA_BASEURL:-http://www.apache.org/dist/kafka}
# Specify Scala version
@ -14,11 +14,15 @@ SCALA_VERSION=${SCALA_VERSION:-2.12}
SCALA_BASEURL=${SCALA_BASEURL:-http://www.scala-lang.org/files/archive}
# RPC
RPC_SERVICE=${RPC_SERVICE:-rabbit}
RPC_HOST=${RPC_HOST:-$SERVICE_HOST}
RPC_PORT=${RPC_PORT:-5672}
# Notify
NOTIFY_SERVICE=${NOTIFY_SERVICE:-kafka}
NOTIFY_HOST=${NOTIFY_HOST:-$SERVICE_HOST}
NOTIFY_PORT=${NOTIFY_PORT:-9092}
disable_service rabbit
if [ "$RPC_SERVICE" != "rabbit" ]; then
disable_service rabbit
fi