Add support for hash-based RUG scale out
This adds support for running multiple RUG processes to scale out and distribute appliance load across them. It uses a hash ring implementation lifted from Ironic (with modification). The gist is: * Workers now maintain a copy of the hash ring, which is hashed using the list of members in the cluster. * A new subprocess connects to an external coordination service via tooz, ie memcache or zookeeper. This service's only purpose is to track cluster membership and report changes to this subprocess. On membership changes, the coordination subprocess creates a REBALANCE event and puts it on the internal notification queue. There is no leadership election required. * When a worker gets a REBALANCE event, it rebalances the hash ring based on the new membership list. * Prior to processing any events bound for a specified router, the worker first checks the hash manager to find if the resource is assigned to it. If not, it ignores it. If it is, it processes the event. This also applies to incoming command events. Partially implements: blueprint rug-scaling Change-Id: I8d04100ffc0e2f2223ebf4b079551dac99224344
This commit is contained in:
parent
68150f681f
commit
03738f00ba
|
@ -0,0 +1,196 @@
|
||||||
|
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||||
|
# Copyright 2015 Akanda, Inc.
|
||||||
|
#
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import bisect
|
||||||
|
import hashlib
|
||||||
|
import threading
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
import six
|
||||||
|
|
||||||
|
from akanda.rug.common.i18n import _
|
||||||
|
|
||||||
|
hash_opts = [
|
||||||
|
cfg.IntOpt('hash_partition_exponent',
|
||||||
|
default=5,
|
||||||
|
help='Exponent to determine number of hash partitions to use '
|
||||||
|
'when distributing load across Rugs. Larger values '
|
||||||
|
'will result in more even distribution of load and less '
|
||||||
|
'load when rebalancing the ring, but more memory usage. '
|
||||||
|
'Number of partitions per rug is '
|
||||||
|
'(2^hash_partition_exponent). This determines the '
|
||||||
|
'granularity of rebalancing: given 10 hosts, and an '
|
||||||
|
'exponent of the 2, there are 40 partitions in the ring.'
|
||||||
|
'A few thousand partitions should make rebalancing '
|
||||||
|
'smooth in most cases. The default is suitable for up to '
|
||||||
|
'a few hundred rugs. Too many partitions has a CPU '
|
||||||
|
'impact.'),
|
||||||
|
]
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(hash_opts)
|
||||||
|
|
||||||
|
|
||||||
|
# A static key that can be used to choose a single host when from the
|
||||||
|
# ring we have no other data to hash with.
|
||||||
|
DC_KEY = 'akanda_designated_coordinator'
|
||||||
|
|
||||||
|
|
||||||
|
class Invalid(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Lifted from ironic with some modifications.
|
||||||
|
class HashRing(object):
|
||||||
|
"""A stable hash ring.
|
||||||
|
|
||||||
|
We map item N to a host Y based on the closest lower hash:
|
||||||
|
|
||||||
|
- hash(item) -> partition
|
||||||
|
- hash(host) -> divider
|
||||||
|
- closest lower divider is the host to use
|
||||||
|
- we hash each host many times to spread load more finely
|
||||||
|
as otherwise adding a host gets (on average) 50% of the load of
|
||||||
|
just one other host assigned to it.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hosts, replicas=1):
|
||||||
|
"""Create a new hash ring across the specified hosts.
|
||||||
|
|
||||||
|
:param hosts: an iterable of hosts which will be mapped.
|
||||||
|
:param replicas: number of hosts to map to each hash partition,
|
||||||
|
or len(hosts), which ever is lesser.
|
||||||
|
Default: 1
|
||||||
|
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.hosts = set(hosts)
|
||||||
|
self.replicas = replicas if replicas <= len(hosts) else len(hosts)
|
||||||
|
except TypeError:
|
||||||
|
raise Invalid(
|
||||||
|
_("Invalid hosts supplied when building HashRing."))
|
||||||
|
|
||||||
|
self._host_hashes = {}
|
||||||
|
for host in hosts:
|
||||||
|
key = str(host).encode('utf8')
|
||||||
|
key_hash = hashlib.md5(key)
|
||||||
|
for p in range(2 ** CONF.hash_partition_exponent):
|
||||||
|
key_hash.update(key)
|
||||||
|
hashed_key = self._hash2int(key_hash)
|
||||||
|
self._host_hashes[hashed_key] = host
|
||||||
|
# Gather the (possibly colliding) resulting hashes into a bisectable
|
||||||
|
# list.
|
||||||
|
self._partitions = sorted(self._host_hashes.keys())
|
||||||
|
|
||||||
|
def _hash2int(self, key_hash):
|
||||||
|
"""Convert the given hash's digest to a numerical value for the ring.
|
||||||
|
|
||||||
|
:returns: An integer equivalent value of the digest.
|
||||||
|
"""
|
||||||
|
return int(key_hash.hexdigest(), 16)
|
||||||
|
|
||||||
|
def _get_partition(self, data):
|
||||||
|
try:
|
||||||
|
if six.PY3 and data is not None:
|
||||||
|
data = data.encode('utf-8')
|
||||||
|
key_hash = hashlib.md5(data)
|
||||||
|
hashed_key = self._hash2int(key_hash)
|
||||||
|
position = bisect.bisect(self._partitions, hashed_key)
|
||||||
|
return position if position < len(self._partitions) else 0
|
||||||
|
except TypeError:
|
||||||
|
raise Invalid(
|
||||||
|
_("Invalid data supplied to HashRing.get_hosts."))
|
||||||
|
|
||||||
|
def get_hosts(self, data, ignore_hosts=None):
|
||||||
|
"""Get the list of hosts which the supplied data maps onto.
|
||||||
|
|
||||||
|
:param data: A string identifier to be mapped across the ring.
|
||||||
|
:param ignore_hosts: A list of hosts to skip when performing the hash.
|
||||||
|
Useful to temporarily skip down hosts without
|
||||||
|
performing a full rebalance.
|
||||||
|
Default: None.
|
||||||
|
:returns: a list of hosts.
|
||||||
|
The length of this list depends on the number of replicas
|
||||||
|
this `HashRing` was created with. It may be less than this
|
||||||
|
if ignore_hosts is not None.
|
||||||
|
"""
|
||||||
|
hosts = []
|
||||||
|
if ignore_hosts is None:
|
||||||
|
ignore_hosts = set()
|
||||||
|
else:
|
||||||
|
ignore_hosts = set(ignore_hosts)
|
||||||
|
ignore_hosts.intersection_update(self.hosts)
|
||||||
|
partition = self._get_partition(data)
|
||||||
|
for replica in range(0, self.replicas):
|
||||||
|
if len(hosts) + len(ignore_hosts) == len(self.hosts):
|
||||||
|
# prevent infinite loop - cannot allocate more fallbacks.
|
||||||
|
break
|
||||||
|
# Linear probing: partition N, then N+1 etc.
|
||||||
|
host = self._get_host(partition)
|
||||||
|
while host in hosts or host in ignore_hosts:
|
||||||
|
partition += 1
|
||||||
|
if partition >= len(self._partitions):
|
||||||
|
partition = 0
|
||||||
|
host = self._get_host(partition)
|
||||||
|
hosts.append(host)
|
||||||
|
return hosts
|
||||||
|
|
||||||
|
def _get_host(self, partition):
|
||||||
|
"""Find what host is serving a partition.
|
||||||
|
|
||||||
|
:param partition: The index of the partition in the partition map.
|
||||||
|
e.g. 0 is the first partition, 1 is the second.
|
||||||
|
:return: The host object the ring was constructed with.
|
||||||
|
"""
|
||||||
|
return self._host_hashes[self._partitions[partition]]
|
||||||
|
|
||||||
|
|
||||||
|
class HashRingManager(object):
|
||||||
|
_hash_ring = None
|
||||||
|
_lock = threading.Lock()
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._hosts = []
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ring(self):
|
||||||
|
# Hot path, no lock
|
||||||
|
if self._hash_ring is not None:
|
||||||
|
return self._hash_ring
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if self._hash_ring is None:
|
||||||
|
ring = self._load_hash_ring()
|
||||||
|
self.__class__._hash_ring = ring
|
||||||
|
return self._hash_ring
|
||||||
|
|
||||||
|
@property
|
||||||
|
def hosts(self):
|
||||||
|
return self.ring.hosts
|
||||||
|
|
||||||
|
def _load_hash_ring(self):
|
||||||
|
return HashRing(self._hosts)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def reset(cls):
|
||||||
|
with cls._lock:
|
||||||
|
cls._hash_ring = None
|
||||||
|
|
||||||
|
def rebalance(self, hosts):
|
||||||
|
self.reset()
|
||||||
|
with self._lock:
|
||||||
|
self._hosts = hosts
|
|
@ -132,7 +132,7 @@ class Connection(object):
|
||||||
transport = get_transport()
|
transport = get_transport()
|
||||||
target = get_target(topic=topic, fanout=False,
|
target = get_target(topic=topic, fanout=False,
|
||||||
exchange=exchange)
|
exchange=exchange)
|
||||||
pool = 'akanda.' + topic
|
pool = 'akanda.' + topic + '.' + cfg.CONF.host
|
||||||
server = oslo_messaging.get_notification_listener(
|
server = oslo_messaging.get_notification_listener(
|
||||||
transport, [target], endpoints, pool=pool)
|
transport, [target], endpoints, pool=pool)
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
|
|
|
@ -0,0 +1,169 @@
|
||||||
|
# Copyright 2015 Akanda, Inc.
|
||||||
|
#
|
||||||
|
# Author: Akanda, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import signal
|
||||||
|
import time
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log
|
||||||
|
|
||||||
|
import tooz
|
||||||
|
from tooz import coordination as tz_coordination
|
||||||
|
|
||||||
|
from akanda.rug import event as ak_event
|
||||||
|
from akanda.rug.common.i18n import _, _LI
|
||||||
|
|
||||||
|
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
COORD_OPTS = [
|
||||||
|
cfg.BoolOpt('enabled', default=False,
|
||||||
|
help=_('Whether to use an external coordination service to '
|
||||||
|
'a cluster of akanda-rug nodes. This may be disabled '
|
||||||
|
'for akanda-rug node environments.')),
|
||||||
|
cfg.StrOpt('url',
|
||||||
|
default='memcached://localhost:11211',
|
||||||
|
help=_('URL of suppoted coordination service')),
|
||||||
|
cfg.StrOpt('group_id', default='akanda.rug',
|
||||||
|
help=_('ID of coordination group to join.')),
|
||||||
|
cfg.IntOpt('heartbeat_interval', default=1,
|
||||||
|
help=_('Interval (in seconds) for cluster heartbeats')),
|
||||||
|
]
|
||||||
|
CONF.register_group(cfg.OptGroup(name='coordination'))
|
||||||
|
CONF.register_opts(COORD_OPTS, group='coordination')
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidEventType(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CoordinatorDone(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class RugCoordinator(object):
|
||||||
|
def __init__(self, notifications_queue):
|
||||||
|
self._queue = notifications_queue
|
||||||
|
self.host = CONF.host
|
||||||
|
self.url = CONF.coordination.url
|
||||||
|
self.group = CONF.coordination.group_id
|
||||||
|
self.heartbeat_interval = CONF.coordination.heartbeat_interval
|
||||||
|
self._coordinator = None
|
||||||
|
signal.signal(signal.SIGTERM, self.stop)
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Brings up coordination service online
|
||||||
|
|
||||||
|
This connects the coordination service to its tooz backend. This
|
||||||
|
involves:
|
||||||
|
|
||||||
|
- connecting to the cluster
|
||||||
|
- creating the coordination group (if required)
|
||||||
|
- joining the coordination group
|
||||||
|
- registering callbacks to respond to join/leave membership
|
||||||
|
events
|
||||||
|
|
||||||
|
After the local node has joined the cluster and knows its remote
|
||||||
|
peers, it fires off an initial rebalance event to the workers
|
||||||
|
so they can seed their hash ring with the current membership.
|
||||||
|
"""
|
||||||
|
LOG.info(
|
||||||
|
'Starting RUG coordinator process for host %s on %s' %
|
||||||
|
(self.host, self.url))
|
||||||
|
self._coordinator = tz_coordination.get_coordinator(
|
||||||
|
self.url, self.host)
|
||||||
|
self._coordinator.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._coordinator.create_group(self.group).get()
|
||||||
|
except tooz.coordination.GroupAlreadyExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._coordinator.join_group(self.group).get()
|
||||||
|
self._coordinator.heartbeat()
|
||||||
|
except tooz.coordination.MemberAlreadyExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._coordinator.watch_join_group(self.group, self.cluster_changed)
|
||||||
|
self._coordinator.watch_leave_group(self.group, self.cluster_changed)
|
||||||
|
self._coordinator.heartbeat()
|
||||||
|
LOG.debug("Sending initial event changed for members; %s" %
|
||||||
|
self.members)
|
||||||
|
self.cluster_changed(event=None)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
self._coordinator.heartbeat()
|
||||||
|
self._coordinator.run_watchers()
|
||||||
|
time.sleep(self.heartbeat_interval)
|
||||||
|
except CoordinatorDone:
|
||||||
|
LOG.info(_LI('Stopping RUG coordinator.'))
|
||||||
|
return
|
||||||
|
|
||||||
|
def stop(self, signal=None, frame=None):
|
||||||
|
"""Stop the coordinator service.
|
||||||
|
|
||||||
|
This ensures a clean shutdown of the coordinator service and attemps to
|
||||||
|
advertise its departure to the rest of the cluster. Note this is
|
||||||
|
registered as a signal handler for SIGINT so that its run when the main
|
||||||
|
shutdowns and subprocesses receive the signal.
|
||||||
|
"""
|
||||||
|
self._coordinator.unwatch_join_group(self.group, self.cluster_changed)
|
||||||
|
self._coordinator.unwatch_leave_group(self.group, self.cluster_changed)
|
||||||
|
|
||||||
|
if self.is_leader:
|
||||||
|
try:
|
||||||
|
self._coordinator.stand_down_group_leader(self.group)
|
||||||
|
except tooz.NotImplemented:
|
||||||
|
pass
|
||||||
|
self._coordinator.leave_group(self.group).get()
|
||||||
|
raise CoordinatorDone()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def members(self):
|
||||||
|
"""Returns the current cluster membership list"""
|
||||||
|
return self._coordinator.get_members(self.group).get()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_leader(self):
|
||||||
|
"""Returns true if the local cluster member is the leader"""
|
||||||
|
return self._coordinator.get_leader(self.group).get() == self.host
|
||||||
|
|
||||||
|
def cluster_changed(self, event):
|
||||||
|
"""Event callback to be called by tooz on membership changes"""
|
||||||
|
LOG.debug('Broadcasting cluster changed event to trigger rebalance. '
|
||||||
|
'members=%s' % self.members)
|
||||||
|
|
||||||
|
r = ak_event.Resource(
|
||||||
|
tenant_id='*',
|
||||||
|
id='*',
|
||||||
|
driver='*',
|
||||||
|
)
|
||||||
|
e = ak_event.Event(
|
||||||
|
resource=r,
|
||||||
|
crud=ak_event.REBALANCE,
|
||||||
|
body={'members': self.members}
|
||||||
|
)
|
||||||
|
self._queue.put(('*', e))
|
||||||
|
|
||||||
|
|
||||||
|
def start(notification_queue):
|
||||||
|
return RugCoordinator(notification_queue).run()
|
|
@ -22,6 +22,7 @@ DELETE = 'delete'
|
||||||
POLL = 'poll'
|
POLL = 'poll'
|
||||||
COMMAND = 'command' # an external command to be processed
|
COMMAND = 'command' # an external command to be processed
|
||||||
REBUILD = 'rebuild'
|
REBUILD = 'rebuild'
|
||||||
|
REBALANCE = 'rebalance'
|
||||||
|
|
||||||
|
|
||||||
class Event(object):
|
class Event(object):
|
||||||
|
|
|
@ -28,6 +28,7 @@ from oslo_log import log
|
||||||
|
|
||||||
from akanda.rug.common.i18n import _LE, _LI
|
from akanda.rug.common.i18n import _LE, _LI
|
||||||
from akanda.rug.common import config as ak_cfg
|
from akanda.rug.common import config as ak_cfg
|
||||||
|
from akanda.rug import coordination
|
||||||
from akanda.rug import daemon
|
from akanda.rug import daemon
|
||||||
from akanda.rug import health
|
from akanda.rug import health
|
||||||
from akanda.rug import metadata
|
from akanda.rug import metadata
|
||||||
|
@ -147,6 +148,18 @@ def main(argv=sys.argv[1:]):
|
||||||
)
|
)
|
||||||
notification_proc.start()
|
notification_proc.start()
|
||||||
|
|
||||||
|
if CONF.coordination.enabled:
|
||||||
|
coordinator_proc = multiprocessing.Process(
|
||||||
|
target=coordination.start,
|
||||||
|
kwargs={
|
||||||
|
'notification_queue': notification_queue
|
||||||
|
},
|
||||||
|
name='coordinator',
|
||||||
|
)
|
||||||
|
coordinator_proc.start()
|
||||||
|
else:
|
||||||
|
coordinator_proc = None
|
||||||
|
|
||||||
mgt_ip_address = neutron_api.get_local_service_ip(cfg.CONF).split('/')[0]
|
mgt_ip_address = neutron_api.get_local_service_ip(cfg.CONF).split('/')[0]
|
||||||
metadata_proc = multiprocessing.Process(
|
metadata_proc = multiprocessing.Process(
|
||||||
target=metadata.serve,
|
target=metadata.serve,
|
||||||
|
@ -200,6 +213,9 @@ def main(argv=sys.argv[1:]):
|
||||||
publisher.stop()
|
publisher.stop()
|
||||||
|
|
||||||
# Terminate the subprocesses
|
# Terminate the subprocesses
|
||||||
for subproc in [notification_proc, metadata_proc, rug_api_proc]:
|
for subproc in [notification_proc, coordinator_proc, metadata_proc,
|
||||||
|
rug_api_proc]:
|
||||||
|
if not subproc:
|
||||||
|
continue
|
||||||
LOG.info(_LI('Stopping %s.'), subproc.name)
|
LOG.info(_LI('Stopping %s.'), subproc.name)
|
||||||
subproc.terminate()
|
subproc.terminate()
|
||||||
|
|
|
@ -28,6 +28,14 @@ from akanda.rug import drivers
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def repopulate():
|
||||||
|
"""Called from workers after a rebalance to find newly owned resources"""
|
||||||
|
resources = []
|
||||||
|
for driver in drivers.enabled_drivers():
|
||||||
|
resources += driver.pre_populate_hook()
|
||||||
|
return resources
|
||||||
|
|
||||||
|
|
||||||
def _pre_populate_workers(scheduler):
|
def _pre_populate_workers(scheduler):
|
||||||
"""Loops through enabled drivers triggering each drivers pre_populate_hook
|
"""Loops through enabled drivers triggering each drivers pre_populate_hook
|
||||||
which is a static method for each driver.
|
which is a static method for each driver.
|
||||||
|
|
|
@ -534,3 +534,13 @@ class Automaton(object):
|
||||||
|
|
||||||
def has_error(self):
|
def has_error(self):
|
||||||
return self.instance.state == states.ERROR
|
return self.instance.state == states.ERROR
|
||||||
|
|
||||||
|
def drop_queue(self):
|
||||||
|
"""Drop all pending actions from the local state machine's work queue.
|
||||||
|
|
||||||
|
This is used after a ring rebalance if this state machine no longer
|
||||||
|
maps to the local Rug process.
|
||||||
|
"""
|
||||||
|
self.driver.log.info(
|
||||||
|
'Dropping %s pending actions from queue', len(self._queue))
|
||||||
|
self._queue.clear()
|
||||||
|
|
|
@ -0,0 +1,229 @@
|
||||||
|
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||||
|
# Copyright 2015 Akanda, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
import mock
|
||||||
|
from oslo_config import cfg
|
||||||
|
from testtools import matchers
|
||||||
|
|
||||||
|
from akanda.rug.common import hash_ring
|
||||||
|
from akanda.rug.test.unit import base
|
||||||
|
# from akanda.rug.tests.unit.db import base as db_base
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
|
class HashRingTestCase(base.RugTestBase):
|
||||||
|
|
||||||
|
# NOTE(deva): the mapping used in these tests is as follows:
|
||||||
|
# if hosts = [foo, bar]:
|
||||||
|
# fake -> foo, bar
|
||||||
|
# if hosts = [foo, bar, baz]:
|
||||||
|
# fake -> foo, bar, baz
|
||||||
|
# fake-again -> bar, baz, foo
|
||||||
|
|
||||||
|
@mock.patch.object(hashlib, 'md5', autospec=True)
|
||||||
|
def test__hash2int_returns_int(self, mock_md5):
|
||||||
|
CONF.set_override('hash_partition_exponent', 0)
|
||||||
|
r1 = 32 * 'a'
|
||||||
|
r2 = 32 * 'b'
|
||||||
|
mock_md5.return_value.hexdigest.side_effect = [r1, r2]
|
||||||
|
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
replicas = 1
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=replicas)
|
||||||
|
|
||||||
|
self.assertIn(int(r1, 16), ring._host_hashes)
|
||||||
|
self.assertIn(int(r2, 16), ring._host_hashes)
|
||||||
|
|
||||||
|
def test_create_ring(self):
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
replicas = 2
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=replicas)
|
||||||
|
self.assertEqual(set(hosts), ring.hosts)
|
||||||
|
self.assertEqual(replicas, ring.replicas)
|
||||||
|
|
||||||
|
def test_create_with_different_partition_counts(self):
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
CONF.set_override('hash_partition_exponent', 2)
|
||||||
|
ring = hash_ring.HashRing(hosts)
|
||||||
|
self.assertEqual(2 ** 2 * 2, len(ring._partitions))
|
||||||
|
|
||||||
|
CONF.set_override('hash_partition_exponent', 8)
|
||||||
|
ring = hash_ring.HashRing(hosts)
|
||||||
|
self.assertEqual(2 ** 8 * 2, len(ring._partitions))
|
||||||
|
|
||||||
|
CONF.set_override('hash_partition_exponent', 16)
|
||||||
|
ring = hash_ring.HashRing(hosts)
|
||||||
|
self.assertEqual(2 ** 16 * 2, len(ring._partitions))
|
||||||
|
|
||||||
|
def test_distribution_one_replica(self):
|
||||||
|
hosts = ['foo', 'bar', 'baz']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
|
fake_1_hosts = ring.get_hosts('fake')
|
||||||
|
fake_2_hosts = ring.get_hosts('fake-again')
|
||||||
|
# We should have one hosts for each thing
|
||||||
|
self.assertThat(fake_1_hosts, matchers.HasLength(1))
|
||||||
|
self.assertThat(fake_2_hosts, matchers.HasLength(1))
|
||||||
|
# And they must not be the same answers even on this simple data.
|
||||||
|
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
|
||||||
|
|
||||||
|
def test_distribution_two_replicas(self):
|
||||||
|
hosts = ['foo', 'bar', 'baz']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=2)
|
||||||
|
fake_1_hosts = ring.get_hosts('fake')
|
||||||
|
fake_2_hosts = ring.get_hosts('fake-again')
|
||||||
|
# We should have two hosts for each thing
|
||||||
|
self.assertThat(fake_1_hosts, matchers.HasLength(2))
|
||||||
|
self.assertThat(fake_2_hosts, matchers.HasLength(2))
|
||||||
|
# And they must not be the same answers even on this simple data
|
||||||
|
# because if they were we'd be making the active replica a hot spot.
|
||||||
|
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
|
||||||
|
|
||||||
|
def test_distribution_three_replicas(self):
|
||||||
|
hosts = ['foo', 'bar', 'baz']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=3)
|
||||||
|
fake_1_hosts = ring.get_hosts('fake')
|
||||||
|
fake_2_hosts = ring.get_hosts('fake-again')
|
||||||
|
# We should have two hosts for each thing
|
||||||
|
self.assertThat(fake_1_hosts, matchers.HasLength(3))
|
||||||
|
self.assertThat(fake_2_hosts, matchers.HasLength(3))
|
||||||
|
# And they must not be the same answers even on this simple data
|
||||||
|
# because if they were we'd be making the active replica a hot spot.
|
||||||
|
self.assertNotEqual(fake_1_hosts, fake_2_hosts)
|
||||||
|
self.assertNotEqual(fake_1_hosts[0], fake_2_hosts[0])
|
||||||
|
|
||||||
|
def test_ignore_hosts(self):
|
||||||
|
hosts = ['foo', 'bar', 'baz']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
|
equals_bar_or_baz = matchers.MatchesAny(
|
||||||
|
matchers.Equals(['bar']),
|
||||||
|
matchers.Equals(['baz']))
|
||||||
|
self.assertThat(
|
||||||
|
ring.get_hosts('fake', ignore_hosts=['foo']),
|
||||||
|
equals_bar_or_baz)
|
||||||
|
self.assertThat(
|
||||||
|
ring.get_hosts('fake', ignore_hosts=['foo', 'bar']),
|
||||||
|
equals_bar_or_baz)
|
||||||
|
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
|
||||||
|
|
||||||
|
def test_ignore_hosts_with_replicas(self):
|
||||||
|
hosts = ['foo', 'bar', 'baz']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=2)
|
||||||
|
self.assertEqual(
|
||||||
|
set(['bar', 'baz']),
|
||||||
|
set(ring.get_hosts('fake', ignore_hosts=['foo'])))
|
||||||
|
self.assertEqual(
|
||||||
|
set(['baz']),
|
||||||
|
set(ring.get_hosts('fake', ignore_hosts=['foo', 'bar'])))
|
||||||
|
self.assertEqual(
|
||||||
|
set(['baz', 'foo']),
|
||||||
|
set(ring.get_hosts('fake-again', ignore_hosts=['bar'])))
|
||||||
|
self.assertEqual(
|
||||||
|
set(['foo']),
|
||||||
|
set(ring.get_hosts('fake-again', ignore_hosts=['bar', 'baz'])))
|
||||||
|
self.assertEqual([], ring.get_hosts('fake', ignore_hosts=hosts))
|
||||||
|
|
||||||
|
def _compare_rings(self, nodes, conductors, ring,
|
||||||
|
new_conductors, new_ring):
|
||||||
|
delta = {}
|
||||||
|
mapping = dict((node, ring.get_hosts(node)[0]) for node in nodes)
|
||||||
|
new_mapping = dict(
|
||||||
|
(node, new_ring.get_hosts(node)[0]) for node in nodes)
|
||||||
|
|
||||||
|
for key, old in mapping.items():
|
||||||
|
new = new_mapping.get(key, None)
|
||||||
|
if new != old:
|
||||||
|
delta[key] = (old, new)
|
||||||
|
return delta
|
||||||
|
|
||||||
|
def test_rebalance_stability_join(self):
|
||||||
|
num_conductors = 10
|
||||||
|
num_nodes = 10000
|
||||||
|
# Adding 1 conductor to a set of N should move 1/(N+1) of all nodes
|
||||||
|
# Eg, for a cluster of 10 nodes, adding one should move 1/11, or 9%
|
||||||
|
# We allow for 1/N to allow for rounding in tests.
|
||||||
|
redistribution_factor = 1.0 / num_conductors
|
||||||
|
|
||||||
|
nodes = [str(x) for x in range(num_nodes)]
|
||||||
|
conductors = [str(x) for x in range(num_conductors)]
|
||||||
|
new_conductors = conductors + ['new']
|
||||||
|
delta = self._compare_rings(
|
||||||
|
nodes, conductors, hash_ring.HashRing(conductors),
|
||||||
|
new_conductors, hash_ring.HashRing(new_conductors))
|
||||||
|
|
||||||
|
self.assertTrue(len(delta) < num_nodes * redistribution_factor)
|
||||||
|
|
||||||
|
def test_rebalance_stability_leave(self):
|
||||||
|
num_conductors = 10
|
||||||
|
num_nodes = 10000
|
||||||
|
# Removing 1 conductor from a set of N should move 1/(N) of all nodes
|
||||||
|
# Eg, for a cluster of 10 nodes, removing one should move 1/10, or 10%
|
||||||
|
# We allow for 1/(N-1) to allow for rounding in tests.
|
||||||
|
redistribution_factor = 1.0 / (num_conductors - 1)
|
||||||
|
|
||||||
|
nodes = [str(x) for x in range(num_nodes)]
|
||||||
|
conductors = [str(x) for x in range(num_conductors)]
|
||||||
|
new_conductors = conductors[:]
|
||||||
|
new_conductors.pop()
|
||||||
|
delta = self._compare_rings(
|
||||||
|
nodes, conductors, hash_ring.HashRing(conductors),
|
||||||
|
new_conductors, hash_ring.HashRing(new_conductors))
|
||||||
|
|
||||||
|
self.assertTrue(len(delta) < num_nodes * redistribution_factor)
|
||||||
|
|
||||||
|
def test_more_replicas_than_hosts(self):
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=10)
|
||||||
|
self.assertEqual(set(hosts), set(ring.get_hosts('fake')))
|
||||||
|
|
||||||
|
def test_ignore_non_existent_host(self):
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
ring = hash_ring.HashRing(hosts, replicas=1)
|
||||||
|
self.assertEqual(['foo'], ring.get_hosts('fake',
|
||||||
|
ignore_hosts=['baz']))
|
||||||
|
|
||||||
|
def test_create_ring_invalid_data(self):
|
||||||
|
hosts = None
|
||||||
|
self.assertRaises(hash_ring.Invalid,
|
||||||
|
hash_ring.HashRing,
|
||||||
|
hosts)
|
||||||
|
|
||||||
|
def test_get_hosts_invalid_data(self):
|
||||||
|
hosts = ['foo', 'bar']
|
||||||
|
ring = hash_ring.HashRing(hosts)
|
||||||
|
self.assertRaises(hash_ring.Invalid,
|
||||||
|
ring.get_hosts,
|
||||||
|
None)
|
||||||
|
|
||||||
|
|
||||||
|
class HashRingManagerTestCase(base.RugTestBase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(HashRingManagerTestCase, self).setUp()
|
||||||
|
self.ring_manager = hash_ring.HashRingManager()
|
||||||
|
|
||||||
|
def test_reset(self):
|
||||||
|
self.ring_manager.rebalance(hosts=['foo', 'bar'])
|
||||||
|
self.ring_manager.reset()
|
||||||
|
self.assertIsNone(self.ring_manager._hash_ring)
|
||||||
|
|
||||||
|
def test_rebalance(self):
|
||||||
|
self.ring_manager.rebalance(hosts=['foo', 'bar'])
|
||||||
|
self.assertEqual(set(['foo', 'bar']), self.ring_manager.hosts)
|
||||||
|
self.ring_manager.rebalance(hosts=['bar', 'baz'])
|
||||||
|
self.assertEqual(set(['bar', 'baz']), self.ring_manager.hosts)
|
|
@ -0,0 +1,130 @@
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from Queue import Queue
|
||||||
|
|
||||||
|
from tooz import coordination as tz_coordination
|
||||||
|
|
||||||
|
from akanda.rug import coordination
|
||||||
|
from akanda.rug import event
|
||||||
|
from akanda.rug.test.unit import base
|
||||||
|
|
||||||
|
|
||||||
|
class TestRugCoordinator(base.RugTestBase):
|
||||||
|
def get_fake_coordinator(self, url, member_id):
|
||||||
|
return self.fake_coord
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestRugCoordinator, self).setUp()
|
||||||
|
self.config(url='memcache://foo_cache', group='coordination')
|
||||||
|
self.config(group_id='foo_coord_group', group='coordination')
|
||||||
|
self.config(heartbeat_interval=9, group='coordination')
|
||||||
|
self.config(host='foo_host')
|
||||||
|
|
||||||
|
self.fake_coord = mock.MagicMock(
|
||||||
|
create_group=mock.MagicMock(),
|
||||||
|
join_group=mock.MagicMock(),
|
||||||
|
heartbeat=mock.MagicMock(),
|
||||||
|
watch_join_group=mock.MagicMock(),
|
||||||
|
watch_leave_group=mock.MagicMock(),
|
||||||
|
get_leader=mock.MagicMock(),
|
||||||
|
stand_down_group_leader=mock.MagicMock(),
|
||||||
|
)
|
||||||
|
|
||||||
|
fake_get_coord = mock.patch.object(coordination, 'tz_coordination',
|
||||||
|
autospec=True)
|
||||||
|
self._fake_get_coord = fake_get_coord.start()
|
||||||
|
self._fake_get_coord.get_coordinator = self.get_fake_coordinator
|
||||||
|
|
||||||
|
self.addCleanup(mock.patch.stopall)
|
||||||
|
self.queue = Queue()
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.start')
|
||||||
|
def test_setup(self, fake_start):
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertEqual('memcache://foo_cache', self.coordinator.url)
|
||||||
|
self.assertEqual('foo_coord_group', self.coordinator.group)
|
||||||
|
self.assertEqual(9, self.coordinator.heartbeat_interval)
|
||||||
|
self.assertEqual('foo_host', self.coordinator.host)
|
||||||
|
self.assertTrue(fake_start.called)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.cluster_changed')
|
||||||
|
def test_start(self, fake_cluster_changed):
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertTrue(self.fake_coord.start.called)
|
||||||
|
self.fake_coord.create_group.assert_called_with('foo_coord_group')
|
||||||
|
self.fake_coord.join_group.assert_called_with('foo_coord_group')
|
||||||
|
self.fake_coord.watch_join_group.assert_called_with(
|
||||||
|
'foo_coord_group',
|
||||||
|
fake_cluster_changed)
|
||||||
|
self.fake_coord.watch_leave_group.assert_called_with(
|
||||||
|
'foo_coord_group',
|
||||||
|
fake_cluster_changed)
|
||||||
|
self.assertTrue(self.fake_coord.heartbeat.called)
|
||||||
|
fake_cluster_changed.assert_called_with(event=None)
|
||||||
|
|
||||||
|
def test_start_raises(self):
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.fake_coord.create_group.side_effect = (
|
||||||
|
tz_coordination.GroupAlreadyExist(self.coordinator.group))
|
||||||
|
self.fake_coord.join_group.side_effect = (
|
||||||
|
tz_coordination.MemberAlreadyExist(
|
||||||
|
self.coordinator.host, self.coordinator.group))
|
||||||
|
return self.test_start()
|
||||||
|
|
||||||
|
@mock.patch('time.sleep')
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.stop')
|
||||||
|
def test_run(self, fake_stop, fake_sleep):
|
||||||
|
fake_sleep.side_effect = coordination.CoordinatorDone()
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.coordinator.run()
|
||||||
|
self.assertTrue(self.fake_coord.heartbeat.called)
|
||||||
|
self.assertTrue(self.fake_coord.run_watchers.called)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.is_leader')
|
||||||
|
def test_stop_not_leader(self, fake_is_leader):
|
||||||
|
fake_is_leader.__get__ = mock.Mock(return_value=False)
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertRaises(coordination.CoordinatorDone, self.coordinator.stop)
|
||||||
|
self.fake_coord.leave_group.assert_called_with(self.coordinator.group)
|
||||||
|
self.assertFalse(self.fake_coord.stand_down_group_leader.called)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.is_leader')
|
||||||
|
def test_stop_leader(self, fake_is_leader):
|
||||||
|
fake_is_leader.__get__ = mock.Mock(return_value=True)
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertRaises(coordination.CoordinatorDone, self.coordinator.stop)
|
||||||
|
self.fake_coord.stand_down_group_leader.assert_called_with(
|
||||||
|
self.coordinator.group)
|
||||||
|
self.fake_coord.leave_group.assert_called_with(self.coordinator.group)
|
||||||
|
|
||||||
|
def test_members(self):
|
||||||
|
fake_async_resp = mock.MagicMock(
|
||||||
|
get=mock.MagicMock(return_value=['foo', 'bar'])
|
||||||
|
)
|
||||||
|
self.fake_coord.get_members.return_value = fake_async_resp
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertEqual(self.coordinator.members, ['foo', 'bar'])
|
||||||
|
self.fake_coord.get_members.assert_called_with(self.coordinator.group)
|
||||||
|
|
||||||
|
def test_is_leader(self):
|
||||||
|
fake_async_resp = mock.MagicMock(
|
||||||
|
get=mock.MagicMock(return_value='foo_host')
|
||||||
|
)
|
||||||
|
self.fake_coord.get_leader.return_value = fake_async_resp
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
self.assertEqual(self.coordinator.is_leader, True)
|
||||||
|
self.fake_coord.get_leader.assert_called_with(self.coordinator.group)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.coordination.RugCoordinator.members')
|
||||||
|
def test_cluster_changed(self, fake_members):
|
||||||
|
fake_members.__get__ = mock.Mock(return_value=['foo', 'bar'])
|
||||||
|
self.coordinator = coordination.RugCoordinator(self.queue)
|
||||||
|
expected_rebalance_event = event.Event(
|
||||||
|
resource=event.Resource('*', '*', '*'),
|
||||||
|
crud=event.REBALANCE,
|
||||||
|
body={'members': ['foo', 'bar']})
|
||||||
|
|
||||||
|
self.coordinator.cluster_changed(event=None)
|
||||||
|
expected = ('*', expected_rebalance_event)
|
||||||
|
res = self.queue.get()
|
||||||
|
self.assertEqual(res, expected)
|
|
@ -76,3 +76,18 @@ class TestPrePopulateWorkers(base.RugTestBase):
|
||||||
t.mock_calls,
|
t.mock_calls,
|
||||||
[mock.call.setDaemon(True), mock.call.start()]
|
[mock.call.setDaemon(True), mock.call.start()]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.drivers.enabled_drivers')
|
||||||
|
def test_repopulate(self, enabled_drivers):
|
||||||
|
drivers = []
|
||||||
|
for i in range(2):
|
||||||
|
driver = mock.Mock()
|
||||||
|
driver.pre_populate_hook = mock.Mock()
|
||||||
|
driver.pre_populate_hook.return_value = [
|
||||||
|
'driver_%s_resource' % i,
|
||||||
|
]
|
||||||
|
drivers.append(driver)
|
||||||
|
enabled_drivers.return_value = drivers
|
||||||
|
res = populate.repopulate()
|
||||||
|
self.assertEqual(
|
||||||
|
set(res), set(['driver_0_resource', 'driver_1_resource']))
|
||||||
|
|
|
@ -145,7 +145,7 @@ class TestConnection(testtools.TestCase):
|
||||||
topic='foo_topic', fanout=False, exchange='foo_exchange')
|
topic='foo_topic', fanout=False, exchange='foo_exchange')
|
||||||
fake_get_listener.assert_called_with(
|
fake_get_listener.assert_called_with(
|
||||||
'fake_transport', ['fake_target'], endpoints,
|
'fake_transport', ['fake_target'], endpoints,
|
||||||
pool='akanda.foo_topic')
|
pool='akanda.foo_topic.test_host')
|
||||||
self.connection._add_server_thread.assert_called_with(
|
self.connection._add_server_thread.assert_called_with(
|
||||||
'fake_listener_server')
|
'fake_listener_server')
|
||||||
|
|
||||||
|
|
|
@ -703,3 +703,9 @@ class TestAutomaton(unittest.TestCase):
|
||||||
with mock.patch.object(self.sm, 'instance') as instance:
|
with mock.patch.object(self.sm, 'instance') as instance:
|
||||||
instance.state = states.UP
|
instance.state = states.UP
|
||||||
self.assertFalse(self.sm.has_error())
|
self.assertFalse(self.sm.has_error())
|
||||||
|
|
||||||
|
def test_drop_queue(self):
|
||||||
|
self.sm._queue.append('foo_item')
|
||||||
|
self.assertEqual(1, len(self.sm._queue))
|
||||||
|
self.sm.drop_queue()
|
||||||
|
self.assertEqual(0, len(self.sm._queue))
|
||||||
|
|
|
@ -26,10 +26,11 @@ from oslo_config import cfg
|
||||||
from akanda.rug import commands
|
from akanda.rug import commands
|
||||||
from akanda.rug import event
|
from akanda.rug import event
|
||||||
from akanda.rug import notifications
|
from akanda.rug import notifications
|
||||||
|
from akanda.rug.api import neutron
|
||||||
from akanda.rug.drivers import router
|
from akanda.rug.drivers import router
|
||||||
from akanda.rug import worker
|
from akanda.rug import worker
|
||||||
|
|
||||||
from akanda.rug.api import neutron
|
from akanda.rug.common.hash_ring import DC_KEY
|
||||||
|
|
||||||
from akanda.rug.test.unit.db import base
|
from akanda.rug.test.unit.db import base
|
||||||
|
|
||||||
|
@ -109,6 +110,7 @@ class TestWorker(WorkerTestBase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestWorker, self).setUp()
|
super(TestWorker, self).setUp()
|
||||||
|
self.config(enabled=True, group='coordination')
|
||||||
self.target = self.tenant_id
|
self.target = self.tenant_id
|
||||||
self.resource = event.Resource(
|
self.resource = event.Resource(
|
||||||
self.driver,
|
self.driver,
|
||||||
|
@ -123,22 +125,21 @@ class TestWorker(WorkerTestBase):
|
||||||
self.fake_cache.get_by_tenant = mock.MagicMock()
|
self.fake_cache.get_by_tenant = mock.MagicMock()
|
||||||
self.w.resource_cache = self.fake_cache
|
self.w.resource_cache = self.fake_cache
|
||||||
|
|
||||||
def test__should_process_true(self):
|
def test__should_process_message_global_debug(self):
|
||||||
self.assertEqual(
|
|
||||||
self.msg,
|
|
||||||
self.w._should_process(self.msg))
|
|
||||||
|
|
||||||
def test__should_process_global_debug(self):
|
|
||||||
self.dbapi.enable_global_debug()
|
self.dbapi.enable_global_debug()
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
self.w._should_process(self.msg))
|
self.w._should_process_message(self.target, self.msg))
|
||||||
|
|
||||||
def test__should_process_tenant_debug(self):
|
def test__should_process_message_tenant_debug(self):
|
||||||
self.dbapi.enable_tenant_debug(tenant_uuid=self.tenant_id)
|
self.dbapi.enable_tenant_debug(tenant_uuid=self.tenant_id)
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
self.w._should_process(self.msg))
|
self.w._should_process_message(self.target, self.msg))
|
||||||
|
|
||||||
def test__should_process_no_router_id(self):
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_no_router_id(self, fake_hash):
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = [self.w.host]
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
self.fake_cache.get_by_tenant.return_value = (
|
self.fake_cache.get_by_tenant.return_value = (
|
||||||
'9846d012-3c75-11e5-b476-8321b3ff1a1d')
|
'9846d012-3c75-11e5-b476-8321b3ff1a1d')
|
||||||
r = event.Resource(
|
r = event.Resource(
|
||||||
|
@ -161,7 +162,9 @@ class TestWorker(WorkerTestBase):
|
||||||
crud=event.CREATE,
|
crud=event.CREATE,
|
||||||
body={'key': 'value'},
|
body={'key': 'value'},
|
||||||
)
|
)
|
||||||
self.assertEquals(expected, self.w._should_process(msg))
|
self.assertEquals(
|
||||||
|
expected,
|
||||||
|
self.w._should_process_message(self.target, msg))
|
||||||
|
|
||||||
def test__should_process_no_router_id_no_router_found(self):
|
def test__should_process_no_router_id_no_router_found(self):
|
||||||
self.fake_cache.get_by_tenant.return_value = None
|
self.fake_cache.get_by_tenant.return_value = None
|
||||||
|
@ -175,10 +178,10 @@ class TestWorker(WorkerTestBase):
|
||||||
crud=event.CREATE,
|
crud=event.CREATE,
|
||||||
body={'key': 'value'},
|
body={'key': 'value'},
|
||||||
)
|
)
|
||||||
self.assertFalse(self.w._should_process(msg))
|
self.assertFalse(self.w._should_process_message(self.target, msg))
|
||||||
|
|
||||||
@mock.patch('akanda.rug.worker.Worker._deliver_message')
|
@mock.patch('akanda.rug.worker.Worker._deliver_message')
|
||||||
@mock.patch('akanda.rug.worker.Worker._should_process')
|
@mock.patch('akanda.rug.worker.Worker._should_process_message')
|
||||||
def test_handle_message_should_process(self, fake_should_process,
|
def test_handle_message_should_process(self, fake_should_process,
|
||||||
fake_deliver):
|
fake_deliver):
|
||||||
# ensure we plumb through the return of should_process to
|
# ensure we plumb through the return of should_process to
|
||||||
|
@ -193,16 +196,144 @@ class TestWorker(WorkerTestBase):
|
||||||
fake_should_process.return_value = new_msg
|
fake_should_process.return_value = new_msg
|
||||||
self.w.handle_message(self.target, self.msg)
|
self.w.handle_message(self.target, self.msg)
|
||||||
fake_deliver.assert_called_with(self.target, new_msg)
|
fake_deliver.assert_called_with(self.target, new_msg)
|
||||||
fake_should_process.assert_called_with(self.msg)
|
fake_should_process.assert_called_with(self.target, self.msg)
|
||||||
|
|
||||||
@mock.patch('akanda.rug.worker.Worker._deliver_message')
|
@mock.patch('akanda.rug.worker.Worker._deliver_message')
|
||||||
@mock.patch('akanda.rug.worker.Worker._should_process')
|
@mock.patch('akanda.rug.worker.Worker._should_process_message')
|
||||||
def test_handle_message_should_not_process(self, fake_should_process,
|
def test_handle_message_should_not_process(self, fake_should_process,
|
||||||
fake_deliver):
|
fake_deliver):
|
||||||
fake_should_process.return_value = False
|
fake_should_process.return_value = False
|
||||||
self.w.handle_message(self.target, self.msg)
|
self.w.handle_message(self.target, self.msg)
|
||||||
self.assertFalse(fake_deliver.called)
|
self.assertFalse(fake_deliver.called)
|
||||||
fake_should_process.assert_called_with(self.msg)
|
fake_should_process.assert_called_with(self.target, self.msg)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_message_does_not_hash(self, fake_hash):
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = ['not_this_host']
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
|
self.assertFalse(
|
||||||
|
self.w._should_process_message(self.target, self.msg))
|
||||||
|
fake_ring_manager.ring.get_hosts.assert_called_with(self.router_id)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_message_wildcard_true(self, fake_hash):
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = ['not_this_host']
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
|
self.assertTrue(
|
||||||
|
self.w._should_process_message('*', self.msg))
|
||||||
|
self.assertFalse(fake_ring_manager.ring.called)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_message_true(self, fake_hash):
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = [self.w.host]
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
|
self.assertEqual(
|
||||||
|
self.w._should_process_message(self.target, self.msg),
|
||||||
|
self.msg)
|
||||||
|
fake_ring_manager.ring.get_hosts.assert_called_with(self.router_id)
|
||||||
|
|
||||||
|
def test__should_process_command_debug_config(self):
|
||||||
|
for cmd in [commands.WORKERS_DEBUG, commands.CONFIG_RELOAD]:
|
||||||
|
r = event.Resource(
|
||||||
|
tenant_id=self.tenant_id,
|
||||||
|
id=self.router_id,
|
||||||
|
driver='router',
|
||||||
|
)
|
||||||
|
msg = event.Event(
|
||||||
|
resource=r,
|
||||||
|
crud=event.COMMAND,
|
||||||
|
body={'command': cmd},
|
||||||
|
)
|
||||||
|
self.assertTrue(self.w._should_process_command(msg))
|
||||||
|
|
||||||
|
def _test__should_process_command(self, fake_hash, cmds, key,
|
||||||
|
negative=False):
|
||||||
|
self.config(enabled=True, group='coordination')
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
|
||||||
|
if not negative:
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = [self.w.host]
|
||||||
|
assertion = self.assertTrue
|
||||||
|
else:
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = ['not_this_host']
|
||||||
|
assertion = self.assertFalse
|
||||||
|
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
|
for cmd in cmds:
|
||||||
|
r = event.Resource(
|
||||||
|
tenant_id=self.tenant_id,
|
||||||
|
id=self.router_id,
|
||||||
|
driver='router',
|
||||||
|
)
|
||||||
|
msg = event.Event(
|
||||||
|
resource=r,
|
||||||
|
crud=event.COMMAND,
|
||||||
|
body={
|
||||||
|
'command': cmd,
|
||||||
|
'resource_id': self.router_id,
|
||||||
|
'router_id': self.router_id, # compat.
|
||||||
|
'tenant_id': self.tenant_id}
|
||||||
|
)
|
||||||
|
assertion(self.w._should_process_command(msg))
|
||||||
|
|
||||||
|
if key == DC_KEY:
|
||||||
|
fake_ring_manager.ring.get_hosts.assert_called_with(DC_KEY)
|
||||||
|
else:
|
||||||
|
fake_ring_manager.ring.get_hosts.assert_called_with(
|
||||||
|
msg.body[key])
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_resources(self, fake_hash):
|
||||||
|
cmds = worker.EVENT_COMMANDS
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='resource_id', negative=False)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_resources_negative(self, fake_hash):
|
||||||
|
cmds = [commands.RESOURCE_DEBUG, commands.RESOURCE_MANAGE]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='resource_id', negative=True)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_routers(self, fake_hash):
|
||||||
|
cmds = [commands.ROUTER_DEBUG, commands.ROUTER_MANAGE]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='router_id', negative=False)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_routers_negative(self, fake_hash):
|
||||||
|
cmds = [commands.ROUTER_DEBUG, commands.ROUTER_MANAGE]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='router_id', negative=True)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_tenants(self, fake_hash):
|
||||||
|
cmds = [commands.TENANT_DEBUG, commands.TENANT_MANAGE]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='tenant_id', negative=False)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_tenants_negative(self, fake_hash):
|
||||||
|
cmds = [commands.TENANT_DEBUG, commands.TENANT_MANAGE]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key='tenant_id', negative=True)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_global_debug(self, fake_hash):
|
||||||
|
fake_hash.DC_KEY = DC_KEY
|
||||||
|
cmds = [commands.GLOBAL_DEBUG]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key=DC_KEY, negative=False)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test__should_process_command_global_debug_negative(self, fake_hash):
|
||||||
|
fake_hash.DC_KEY = DC_KEY
|
||||||
|
cmds = [commands.GLOBAL_DEBUG]
|
||||||
|
self._test__should_process_command(
|
||||||
|
fake_hash, cmds=cmds, key=DC_KEY, negative=True)
|
||||||
|
|
||||||
|
|
||||||
class TestResourceCache(WorkerTestBase):
|
class TestResourceCache(WorkerTestBase):
|
||||||
|
@ -272,6 +403,11 @@ class TestCreatingResource(WorkerTestBase):
|
||||||
trm = self.w.tenant_managers[self.tenant_id]
|
trm = self.w.tenant_managers[self.tenant_id]
|
||||||
self.assertEqual(self.tenant_id, trm.tenant_id)
|
self.assertEqual(self.tenant_id, trm.tenant_id)
|
||||||
|
|
||||||
|
def test_not_in_tenant_managers(self):
|
||||||
|
self.w._should_process_message = mock.MagicMock(return_value=False)
|
||||||
|
self.w.handle_message(self.tenant_id, self.msg)
|
||||||
|
self.assertNotIn(self.tenant_id, self.w.tenant_managers)
|
||||||
|
|
||||||
def test_message_enqueued(self):
|
def test_message_enqueued(self):
|
||||||
self.w.handle_message(self.tenant_id, self.msg)
|
self.w.handle_message(self.tenant_id, self.msg)
|
||||||
trm = self.w.tenant_managers[self.tenant_id]
|
trm = self.w.tenant_managers[self.tenant_id]
|
||||||
|
@ -286,6 +422,7 @@ class TestWildcardMessages(WorkerTestBase):
|
||||||
|
|
||||||
self.tenant_id_1 = 'a8f964d4-6631-11e5-a79f-525400cfc32a'
|
self.tenant_id_1 = 'a8f964d4-6631-11e5-a79f-525400cfc32a'
|
||||||
self.tenant_id_2 = 'ef1a6e90-6631-11e5-83cb-525400cfc326'
|
self.tenant_id_2 = 'ef1a6e90-6631-11e5-83cb-525400cfc326'
|
||||||
|
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
|
||||||
|
|
||||||
# Create some tenants
|
# Create some tenants
|
||||||
for msg in [
|
for msg in [
|
||||||
|
@ -348,8 +485,18 @@ class TestUpdateStateMachine(WorkerTestBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestUpdateStateMachine, self).setUp()
|
super(TestUpdateStateMachine, self).setUp()
|
||||||
self.worker_context = worker.WorkerContext()
|
self.worker_context = worker.WorkerContext()
|
||||||
|
self.w._should_process_message = mock.MagicMock(return_value=self.msg)
|
||||||
|
|
||||||
|
def _test(self, fake_hash, negative=False):
|
||||||
|
self.config(enabled=True, group='coordination')
|
||||||
|
fake_ring_manager = fake_hash.HashRingManager()
|
||||||
|
if not negative:
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = [self.w.host]
|
||||||
|
else:
|
||||||
|
fake_ring_manager.ring.get_hosts.return_value = []
|
||||||
|
|
||||||
|
self.w.hash_ring_mgr = fake_ring_manager
|
||||||
|
|
||||||
def test(self):
|
|
||||||
# Create the router manager and state machine so we can
|
# Create the router manager and state machine so we can
|
||||||
# replace the update() method with a mock.
|
# replace the update() method with a mock.
|
||||||
trm = self.w._get_trms(self.tenant_id)[0]
|
trm = self.w._get_trms(self.tenant_id)[0]
|
||||||
|
@ -365,7 +512,19 @@ class TestUpdateStateMachine(WorkerTestBase):
|
||||||
# work) so we just invoke the thread target ourselves to
|
# work) so we just invoke the thread target ourselves to
|
||||||
# pretend.
|
# pretend.
|
||||||
used_context = self.w._thread_target()
|
used_context = self.w._thread_target()
|
||||||
meth.assert_called_once_with(used_context)
|
|
||||||
|
if not negative:
|
||||||
|
meth.assert_called_once_with(used_context)
|
||||||
|
else:
|
||||||
|
self.assertFalse(meth.called)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test_host_mapped(self, fake_hash):
|
||||||
|
self._test(fake_hash)
|
||||||
|
|
||||||
|
@mock.patch('akanda.rug.worker.hash_ring', autospec=True)
|
||||||
|
def test_host_not_mapped(self, fake_hash):
|
||||||
|
self._test(fake_hash, negative=True)
|
||||||
|
|
||||||
|
|
||||||
class TestReportStatus(WorkerTestBase):
|
class TestReportStatus(WorkerTestBase):
|
||||||
|
@ -576,3 +735,38 @@ class TestGlobalDebug(WorkerTestBase):
|
||||||
# method shouldn't ever be invoked.
|
# method shouldn't ever be invoked.
|
||||||
meth.side_effect = AssertionError('send_message was called')
|
meth.side_effect = AssertionError('send_message was called')
|
||||||
self.w.handle_message(tenant_id, msg)
|
self.w.handle_message(tenant_id, msg)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRebalance(WorkerTestBase):
|
||||||
|
def test_rebalance(self):
|
||||||
|
tenant_id = '98dd9c41-d3ac-4fd6-8927-567afa0b8fc3'
|
||||||
|
resource_id = 'ac194fc5-f317-412e-8611-fb290629f624'
|
||||||
|
r = event.Resource(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
id=resource_id,
|
||||||
|
driver='router',
|
||||||
|
)
|
||||||
|
msg = event.Event(
|
||||||
|
resource=r,
|
||||||
|
crud=event.CREATE,
|
||||||
|
body={'key': 'value'},
|
||||||
|
)
|
||||||
|
trm = self.w._get_trms(tenant_id)[0]
|
||||||
|
sm = trm.get_state_machines(msg, worker.WorkerContext())[0]
|
||||||
|
|
||||||
|
self.w.hash_ring_mgr.rebalance(['foo'])
|
||||||
|
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo']))
|
||||||
|
r = event.Resource(
|
||||||
|
tenant_id='*',
|
||||||
|
id='*',
|
||||||
|
driver='*',
|
||||||
|
)
|
||||||
|
msg = event.Event(
|
||||||
|
resource=r,
|
||||||
|
crud=event.REBALANCE,
|
||||||
|
body={'members': ['foo', 'bar']},
|
||||||
|
)
|
||||||
|
with mock.patch.object(sm, 'drop_queue') as meth:
|
||||||
|
self.w.handle_message('*', msg)
|
||||||
|
self.assertTrue(meth.called)
|
||||||
|
self.assertEqual(self.w.hash_ring_mgr.hosts, set(['foo', 'bar']))
|
||||||
|
|
|
@ -33,9 +33,11 @@ from akanda.rug import drivers
|
||||||
from akanda.rug.common.i18n import _LE, _LI, _LW
|
from akanda.rug.common.i18n import _LE, _LI, _LW
|
||||||
from akanda.rug import event
|
from akanda.rug import event
|
||||||
from akanda.rug import tenant
|
from akanda.rug import tenant
|
||||||
|
from akanda.rug.common import hash_ring
|
||||||
from akanda.rug.api import nova
|
from akanda.rug.api import nova
|
||||||
from akanda.rug.api import neutron
|
from akanda.rug.api import neutron
|
||||||
from akanda.rug.db import api as db_api
|
from akanda.rug.db import api as db_api
|
||||||
|
from akanda.rug import populate
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
@ -129,6 +131,7 @@ class Worker(object):
|
||||||
self._ignore_directory = cfg.CONF.ignored_router_directory
|
self._ignore_directory = cfg.CONF.ignored_router_directory
|
||||||
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
|
self._queue_warning_threshold = cfg.CONF.queue_warning_threshold
|
||||||
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
|
self._reboot_error_threshold = cfg.CONF.reboot_error_threshold
|
||||||
|
self.host = cfg.CONF.host
|
||||||
self.work_queue = Queue.Queue()
|
self.work_queue = Queue.Queue()
|
||||||
self.lock = threading.Lock()
|
self.lock = threading.Lock()
|
||||||
self._keep_going = True
|
self._keep_going = True
|
||||||
|
@ -161,6 +164,9 @@ class Worker(object):
|
||||||
)
|
)
|
||||||
for i in xrange(cfg.CONF.num_worker_threads)
|
for i in xrange(cfg.CONF.num_worker_threads)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
self.hash_ring_mgr = hash_ring.HashRingManager()
|
||||||
|
|
||||||
for t in self.threads:
|
for t in self.threads:
|
||||||
t.setDaemon(True)
|
t.setDaemon(True)
|
||||||
t.start()
|
t.start()
|
||||||
|
@ -195,6 +201,23 @@ class Worker(object):
|
||||||
LOG.debug('Skipping update of resource %s in debug mode. '
|
LOG.debug('Skipping update of resource %s in debug mode. '
|
||||||
'(reason: %s)', sm.resource_id, reason)
|
'(reason: %s)', sm.resource_id, reason)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# In the event that a rebalance took place while processing an
|
||||||
|
# event, it may have been put back into the work queue. Check
|
||||||
|
# the hash table once more to find out if we still manage it
|
||||||
|
# and do some cleanup if not.
|
||||||
|
if cfg.CONF.coordination.enabled:
|
||||||
|
target_hosts = self.hash_ring_mgr.ring.get_hosts(
|
||||||
|
sm.resource_id)
|
||||||
|
if self.host not in target_hosts:
|
||||||
|
LOG.debug('Skipping update of router %s, it no longer '
|
||||||
|
'maps here.', sm.resource_id)
|
||||||
|
sm._do_delete()
|
||||||
|
self.work_queue.task_done()
|
||||||
|
with self.lock:
|
||||||
|
self._release_resource_lock(sm)
|
||||||
|
continue
|
||||||
|
|
||||||
# FIXME(dhellmann): Need to look at the router to see if
|
# FIXME(dhellmann): Need to look at the router to see if
|
||||||
# it belongs to a tenant which is in debug mode, but we
|
# it belongs to a tenant which is in debug mode, but we
|
||||||
# don't have that data in the sm, yet.
|
# don't have that data in the sm, yet.
|
||||||
|
@ -318,7 +341,7 @@ class Worker(object):
|
||||||
|
|
||||||
return message
|
return message
|
||||||
|
|
||||||
def _should_process(self, message):
|
def _should_process_message(self, target, message):
|
||||||
"""Determines whether a message should be processed or not."""
|
"""Determines whether a message should be processed or not."""
|
||||||
global_debug, reason = self.db_api.global_debug()
|
global_debug, reason = self.db_api.global_debug()
|
||||||
if global_debug:
|
if global_debug:
|
||||||
|
@ -352,6 +375,18 @@ class Worker(object):
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
if target in commands.WILDCARDS:
|
||||||
|
return message
|
||||||
|
|
||||||
|
if cfg.CONF.coordination.enabled:
|
||||||
|
target_hosts = self.hash_ring_mgr.ring.get_hosts(
|
||||||
|
message.resource.id)
|
||||||
|
if self.host not in target_hosts:
|
||||||
|
LOG.debug('Ignoring message intended for resource %s as it '
|
||||||
|
'does not map to this Rug process.',
|
||||||
|
message.resource.id)
|
||||||
|
return False
|
||||||
|
|
||||||
return message
|
return message
|
||||||
|
|
||||||
def handle_message(self, target, message):
|
def handle_message(self, target, message):
|
||||||
|
@ -364,11 +399,12 @@ class Worker(object):
|
||||||
return
|
return
|
||||||
if message.crud == event.COMMAND:
|
if message.crud == event.COMMAND:
|
||||||
self._dispatch_command(target, message)
|
self._dispatch_command(target, message)
|
||||||
|
elif message.crud == event.REBALANCE:
|
||||||
|
self._rebalance(message)
|
||||||
else:
|
else:
|
||||||
message = self._should_process(message)
|
message = self._should_process_message(target, message)
|
||||||
if not message:
|
if not message:
|
||||||
return
|
return
|
||||||
|
|
||||||
# This is an update command for the router, so deliver it
|
# This is an update command for the router, so deliver it
|
||||||
# to the state machine.
|
# to the state machine.
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
@ -380,7 +416,93 @@ class Worker(object):
|
||||||
if sm:
|
if sm:
|
||||||
return sm
|
return sm
|
||||||
|
|
||||||
|
def _rebalance(self, message):
|
||||||
|
self.hash_ring_mgr.rebalance(message.body.get('members'))
|
||||||
|
|
||||||
|
# After we rebalance, we need to repopulate state machines
|
||||||
|
# for any resources that now map here. This is required
|
||||||
|
# otherwise commands that hash here will not be delivered
|
||||||
|
# until a state machine is created during a later event
|
||||||
|
# delivery. Note that this causes a double populate for new
|
||||||
|
# nodes (once for pre-populate on startup, again for the
|
||||||
|
# repopulate here once the node has joined the cluster)
|
||||||
|
for resource in populate.repopulate():
|
||||||
|
if not self.hash_ring_mgr.ring.get_hosts(resource.id):
|
||||||
|
continue
|
||||||
|
e = event.Event(resource=resource, crud=None, body={})
|
||||||
|
trms = self._get_trms(resource.tenant_id)
|
||||||
|
for trm in trms:
|
||||||
|
trm.get_state_machines(e, self._context)
|
||||||
|
# rebalance our hash ring according to new cluster membership
|
||||||
|
self.hash_ring_mgr.rebalance(message.body.get('members'))
|
||||||
|
|
||||||
|
# loop through all local state machines and drop all pending work
|
||||||
|
# for those that are no longer managed here, as per newly balanced
|
||||||
|
# hash ring
|
||||||
|
trms = self._get_trms('*')
|
||||||
|
for trm in trms:
|
||||||
|
sms = trm.get_state_machines(message, self._context)
|
||||||
|
for sm in sms:
|
||||||
|
target_hosts = self.hash_ring_mgr.ring.get_hosts(
|
||||||
|
sm.resource_id)
|
||||||
|
if self.host not in target_hosts:
|
||||||
|
sm.drop_queue()
|
||||||
|
|
||||||
|
# NOTE(adam_g): If somethings queued up on a SM, it means the SM
|
||||||
|
# is currently executing something thats probably long running
|
||||||
|
# (ie a create). We should add some smarts here to transfer the
|
||||||
|
# currently executing task to the new owner
|
||||||
|
|
||||||
|
def _should_process_command(self, message):
|
||||||
|
command = message.body['command']
|
||||||
|
|
||||||
|
def _hash_by(k, d):
|
||||||
|
if not cfg.CONF.coordination.enabled:
|
||||||
|
return True
|
||||||
|
|
||||||
|
data = d.get(k)
|
||||||
|
target_hosts = self.hash_ring_mgr.ring.get_hosts(data)
|
||||||
|
if self.host not in target_hosts:
|
||||||
|
LOG.debug(
|
||||||
|
'Ignoring command, it does not map to this host by %s '
|
||||||
|
'(%s)' % (k, data))
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
if command in [commands.WORKERS_DEBUG, commands.CONFIG_RELOAD]:
|
||||||
|
# All RUGs get workers_debug and config reload commands
|
||||||
|
return True
|
||||||
|
|
||||||
|
resource_cmds = ([commands.RESOURCE_DEBUG, commands.RESOURCE_MANAGE] +
|
||||||
|
EVENT_COMMANDS.keys())
|
||||||
|
if command in resource_cmds:
|
||||||
|
# hash router commands to a RUG by router_id
|
||||||
|
return _hash_by('resource_id', message.body)
|
||||||
|
|
||||||
|
# NOTE(adam_g): This is compat. with old style router-specific rug-ctl
|
||||||
|
# and should be dropped in M.
|
||||||
|
router_cmds = ([commands.ROUTER_DEBUG, commands.ROUTER_MANAGE] +
|
||||||
|
DEPRECATED_ROUTER_COMMANDS.keys())
|
||||||
|
if command in router_cmds:
|
||||||
|
# hash router commands to a RUG by router_id
|
||||||
|
return _hash_by('router_id', message.body)
|
||||||
|
|
||||||
|
if command in [commands.TENANT_DEBUG, commands.TENANT_MANAGE]:
|
||||||
|
# hash tenant commands to a RUG by tenant_id
|
||||||
|
return _hash_by('tenant_id', message.body)
|
||||||
|
|
||||||
|
if command in [commands.GLOBAL_DEBUG]:
|
||||||
|
# global debug can happen anywhere but to avoid a stempeding
|
||||||
|
# herd trying to update a singe thing in the DB, hash it to
|
||||||
|
# a single host using a static key
|
||||||
|
return _hash_by(
|
||||||
|
hash_ring.DC_KEY,
|
||||||
|
{hash_ring.DC_KEY: hash_ring.DC_KEY})
|
||||||
|
|
||||||
def _dispatch_command(self, target, message):
|
def _dispatch_command(self, target, message):
|
||||||
|
if not self._should_process_command(message):
|
||||||
|
return
|
||||||
|
|
||||||
instructions = message.body
|
instructions = message.body
|
||||||
if instructions['command'] == commands.WORKERS_DEBUG:
|
if instructions['command'] == commands.WORKERS_DEBUG:
|
||||||
self.report_status()
|
self.report_status()
|
||||||
|
@ -456,7 +578,6 @@ class Worker(object):
|
||||||
# sending commands to specific routers and can be
|
# sending commands to specific routers and can be
|
||||||
# removed once the CLI component is dropped in M.
|
# removed once the CLI component is dropped in M.
|
||||||
elif instructions['command'] in DEPRECATED_ROUTER_COMMANDS:
|
elif instructions['command'] in DEPRECATED_ROUTER_COMMANDS:
|
||||||
print 'XXX DEPR'
|
|
||||||
new_rsc = event.Resource(
|
new_rsc = event.Resource(
|
||||||
driver=drivers.router.Router.RESOURCE_NAME,
|
driver=drivers.router.Router.RESOURCE_NAME,
|
||||||
id=message.body.get('router_id'),
|
id=message.body.get('router_id'),
|
||||||
|
@ -583,3 +704,11 @@ class Worker(object):
|
||||||
resource_id, reason)
|
resource_id, reason)
|
||||||
else:
|
else:
|
||||||
LOG.info(_LI('No resources in debug mode'))
|
LOG.info(_LI('No resources in debug mode'))
|
||||||
|
|
||||||
|
if cfg.CONF.coordination.enabled:
|
||||||
|
# NOTE(adam_g): This list could be big with a large cluster.
|
||||||
|
LOG.info(_LI(
|
||||||
|
'Peer akanda-rug hosts: %s'), self.hash_ring_mgr.hosts)
|
||||||
|
else:
|
||||||
|
LOG.info(_LI(
|
||||||
|
'No peer akanda-rug hosts, coordination disabled.'))
|
||||||
|
|
|
@ -1 +1,2 @@
|
||||||
debootstrap
|
debootstrap
|
||||||
|
memcached
|
||||||
|
|
|
@ -42,6 +42,8 @@ HORIZON_LOCAL_SETTINGS=$HORIZON_DIR/openstack_dashboard/local/local_settings.py
|
||||||
# within the appliance VM.
|
# within the appliance VM.
|
||||||
AKANDA_APPLIANCE_SSH_PUBLIC_KEY=${AKANDA_APPLIANCE_SSH_PUBLIC_KEY:-/home/$STACK_USER/.ssh/id_rsa.pub}
|
AKANDA_APPLIANCE_SSH_PUBLIC_KEY=${AKANDA_APPLIANCE_SSH_PUBLIC_KEY:-/home/$STACK_USER/.ssh/id_rsa.pub}
|
||||||
|
|
||||||
|
AKANDA_COORDINATION_ENABLED=${AKANDA_COORDINATION_ENABLED:-True}
|
||||||
|
AKANDA_COORDINATION_URL=${AKANDA_COORDINATION_URL:-memcached://localhost:11211}
|
||||||
|
|
||||||
function colorize_logging {
|
function colorize_logging {
|
||||||
# Add color to logging output - this is lifted from devstack's functions to colorize the non-standard
|
# Add color to logging output - this is lifted from devstack's functions to colorize the non-standard
|
||||||
|
@ -88,6 +90,11 @@ function configure_akanda() {
|
||||||
if [ "$LOG_COLOR" == "True" ] && [ "$SYSLOG" == "False" ]; then
|
if [ "$LOG_COLOR" == "True" ] && [ "$SYSLOG" == "False" ]; then
|
||||||
colorize_logging
|
colorize_logging
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [[ "$AKANDA_COORDINATION_ENABLED" == "True" ]]; then
|
||||||
|
iniset $AKANDA_RUG_CONF coordination enabled True
|
||||||
|
iniset $AKANDA_RUG_CONF coordination url $AKANDA_COORDINATION_URL
|
||||||
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
function configure_akanda_nova() {
|
function configure_akanda_nova() {
|
||||||
|
|
|
@ -34,3 +34,12 @@ control_exchange = quantum
|
||||||
|
|
||||||
[AGENT]
|
[AGENT]
|
||||||
root_helper=sudo
|
root_helper=sudo
|
||||||
|
|
||||||
|
# If running multiple akanda-rug instances, configure use of an external
|
||||||
|
# cluster coordinator here. For more information on supported coordination
|
||||||
|
# backends, see http://docs.openstack.org/developer/tooz/
|
||||||
|
[coordination]
|
||||||
|
enabled = True
|
||||||
|
url = memcached://localhost:11211
|
||||||
|
group_id = akanda.rug
|
||||||
|
heartbeat_interval = 1
|
||||||
|
|
|
@ -18,3 +18,5 @@ WebOb>=1.2.3
|
||||||
python-novaclient>=2.28.1
|
python-novaclient>=2.28.1
|
||||||
cliff>=1.14.0 # Apache-2.0
|
cliff>=1.14.0 # Apache-2.0
|
||||||
six>=1.9.0
|
six>=1.9.0
|
||||||
|
tooz>=0.16.0 # Apache-2.0
|
||||||
|
pymemcache>=1.2.9,!=1.3.0 # Apache 2.0 License
|
||||||
|
|
Loading…
Reference in New Issue