Add support for Rabbit HA

After this got merged, kombu driver is able to connect
to another rabbit host from given cluster, when previous
goes down.

Change-Id: I4d6a54dfdcec9bfb1764f3b3fe87a650677d66e0
This commit is contained in:
Dawid Deja 2017-01-24 14:43:01 +01:00
parent 88e8ca9227
commit 0ee5433899
4 changed files with 102 additions and 55 deletions

View File

@ -56,15 +56,21 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
self._timeout = CONF.rpc_response_timeout
self.routing_key = self.topic
host = self._hosts.get_host()
hosts = self._hosts.get_hosts()
self.conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
self.virtual_host
)
self._connections = []
for host in hosts:
conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
self.virtual_host
)
self._connections.append(conn)
self.conn = self._connections[0]
# Create exchange.
exchange = self._make_exchange(
@ -85,7 +91,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
)
self._listener = kombu_listener.KombuRPCListener(
connection=self.conn,
connections=self._connections,
callback_queue=self.callback_queue
)

View File

@ -52,3 +52,6 @@ class KombuHosts(object):
def get_host(self):
return self._hosts_cycle.next()
def get_hosts(self):
return self._hosts

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from kombu.mixins import ConsumerMixin
from six import moves
import threading
@ -26,11 +27,16 @@ LOG = logging.getLogger(__name__)
class KombuRPCListener(ConsumerMixin):
def __init__(self, connection, callback_queue):
def __init__(self, connections, callback_queue):
self._results = {}
self.connection = connection
self._connections = itertools.cycle(connections)
self._callback_queue = callback_queue
self._thread = None
self.connection = self._connections.next()
# TODO(ddeja): Those 2 options should be gathered from config.
self._sleep_time = 1
self._max_sleep_time = 512
def add_listener(self, correlation_id):
self._results[correlation_id] = moves.queue.Queue()
@ -93,3 +99,11 @@ class KombuRPCListener(ConsumerMixin):
def get_result(self, correlation_id, timeout):
return self._results[correlation_id].get(block=True, timeout=timeout)
def on_connection_error(self, exc, interval):
self.connection = self._connections.next()
LOG.debug("Broker connection failed: %s" % exc)
LOG.debug("Sleeping for %s seconds, then retrying connection" %
interval
)

View File

@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import amqp
import socket
import threading
import time
import kombu
from oslo_config import cfg
@ -69,6 +71,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
self.endpoints = []
self._worker = None
# TODO(ddeja): Those 2 options should be gathered from config.
self._sleep_time = 1
self._max_sleep_time = 512
@property
def is_running(self):
"""Return whether server is running."""
@ -77,57 +83,75 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def run(self, executor='blocking'):
"""Start the server."""
self._prepare_worker(executor)
host = self._hosts.get_host()
self.conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
self.virtual_host,
)
while True:
try:
host = self._hosts.get_host()
LOG.info("Connected to AMQP at %s:%s" % (host.hostname, host.port))
self.conn = self._make_connection(
host.hostname,
host.port,
host.username,
host.password,
self.virtual_host,
)
try:
conn = kombu.connections[self.conn].acquire(block=True)
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
queue = self._make_queue(
self.topic,
exchange,
routing_key=self.routing_key,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
with conn.Consumer(
queues=queue,
callbacks=[self._process_message],
) as consumer:
consumer.qos(prefetch_count=1)
conn = kombu.connections[self.conn].acquire(block=True)
self._running.set()
self._stopped.clear()
exchange = self._make_exchange(
self.exchange,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
while self.is_running:
try:
conn.drain_events(timeout=1)
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
queue = self._make_queue(
self.topic,
exchange,
routing_key=self.routing_key,
durable=self.durable_queue,
auto_delete=self.auto_delete
)
with conn.Consumer(
queues=queue,
callbacks=[self._process_message],
) as consumer:
consumer.qos(prefetch_count=1)
LOG.info("Server with id='{0}' stopped.".format(
self.server_id))
self._running.set()
self._stopped.clear()
return
except socket.error as e:
raise exc.MistralException("Broker connection failed: %s" % e)
finally:
self._stopped.set()
LOG.info("Connected to AMQP at %s:%s" % (
host.hostname,
host.port
))
while self.is_running:
try:
conn.drain_events(timeout=1)
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
LOG.info("Server with id='{0}' stopped.".format(
self.server_id))
return
except (socket.error, amqp.exceptions.ConnectionForced) as e:
LOG.debug("Broker connection failed: %s" % e)
finally:
self._stopped.set()
LOG.debug("Sleeping for %s seconds, than retrying connection" %
self._sleep_time
)
time.sleep(self._sleep_time)
self._sleep_time = min(
self._sleep_time * 2,
self._max_sleep_time
)
def stop(self, graceful=False):
self._running.clear()