Fix and improve the partition coordinator

* Fix the partition coordinator to distribute tasks properly.

* Improve the partition coordination mechanism's retry logic, exception
  handling, and log messages; a minimal retry sketch follows the commit
  metadata below. Refer to the corresponding Ceilometer changes:

- Icf60381e30f3baf986cf9e008e133287765d9827
- I6a48cf38b24a00a0db94d3dea0c6746b52526026
- Ic0b6b62dace88e4e1ce7932024350bb211efb9ef
- I8100160a3aa83a190c4110e6e8be9b26aef8fd1c
- I2aed2241ded798464089b3eec5e1394422a45844

Closes-Bug: #1575530
Change-Id: I5729ae3080898e8a6d92889f8c520174dc371113
(cherry picked from commit dd06bf9277)
(cherry picked from commit 966f7692ab)
liusheng 2016-04-27 10:33:30 +08:00 committed by Julien Danjou
parent 8b663afb39
commit 73c64955d5
6 changed files with 131 additions and 37 deletions
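
For context only (not part of the patch): the retry improvement wraps the
group-join call in an exponential-backoff decorator from the retrying
library, and the two new configuration options feed that decorator
directly. The sketch below is a minimal, standalone illustration; the
join_group() function is invented, and only the decorator arguments and
the default option values mirror the change to the coordination module in
the first diff below.

import retrying


class ErrorJoiningPartitioningGroup(Exception):
    pass


def retry_on_error_joining_partition(exception):
    return isinstance(exception, ErrorJoiningPartitioningGroup)


# The new options are expressed in seconds; retrying expects milliseconds.
retry_backoff = 1        # default of coordination.retry_backoff in the patch
max_retry_interval = 30  # default of coordination.max_retry_interval


@retrying.retry(
    wait_exponential_multiplier=retry_backoff * 1000,
    wait_exponential_max=max_retry_interval * 1000,
    retry_on_exception=retry_on_error_joining_partition,
    wrap_exception=True)
def join_group():
    # A real implementation would call the tooz coordinator here and raise
    # ErrorJoiningPartitioningGroup on transient failures, so the decorator
    # keeps retrying with an exponentially growing, capped wait.
    raise ErrorJoiningPartitioningGroup()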

View File

@@ -13,14 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import bisect
import hashlib
import struct
import uuid
from oslo_config import cfg
from oslo_log import log
import retrying
import six
import tooz.coordination
from aodh.i18n import _LE, _LI
from aodh import utils
from aodh.i18n import _LE, _LI, _LW
LOG = log.getLogger(__name__)
@@ -39,11 +43,70 @@ OPTS = [
cfg.FloatOpt('check_watchers',
default=10.0,
help='Number of seconds between checks to see if group '
'membership has changed')
'membership has changed'),
cfg.IntOpt('retry_backoff',
default=1,
help='Retry backoff factor when retrying to connect with'
' coordination backend'),
cfg.IntOpt('max_retry_interval',
default=30,
help='Maximum number of seconds between retries to join the '
'partitioning group')
]
class ErrorJoiningPartitioningGroup(Exception):
def __init__(self):
super(ErrorJoiningPartitioningGroup, self).__init__(_LE(
'Error occurred when joining partitioning group'))
class MemberNotInGroupError(Exception):
def __init__(self, group_id, members, my_id):
super(MemberNotInGroupError, self).__init__(_LE(
'Group ID: %(group_id)s, Members: %(members)s, Me: %(me)s: '
'Current agent is not part of group and cannot take tasks') %
{'group_id': group_id, 'members': members, 'me': my_id})
def retry_on_error_joining_partition(exception):
return isinstance(exception, ErrorJoiningPartitioningGroup)
def retry_on_member_not_in_group(exception):
return isinstance(exception, MemberNotInGroupError)
class HashRing(object):
def __init__(self, nodes, replicas=100):
self._ring = dict()
self._sorted_keys = []
for node in nodes:
for r in six.moves.range(replicas):
hashed_key = self._hash('%s-%s' % (node, r))
self._ring[hashed_key] = node
self._sorted_keys.append(hashed_key)
self._sorted_keys.sort()
@staticmethod
def _hash(key):
return struct.unpack_from('>I',
hashlib.md5(str(key).encode()).digest())[0]
def _get_position_on_ring(self, key):
hashed_key = self._hash(key)
position = bisect.bisect(self._sorted_keys, hashed_key)
return position if position < len(self._sorted_keys) else 0
def get_node(self, key):
if not self._ring:
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]
class PartitionCoordinator(object):
"""Workload partitioning coordinator.
@@ -58,12 +121,12 @@ class PartitionCoordinator(object):
empty iterable in this case.
"""
def __init__(self, backend_url, my_id=None):
self.backend_url = backend_url
def __init__(self, conf, my_id=None):
self.conf = conf
self.backend_url = self.conf.coordination.backend_url
self._coordinator = None
self._groups = set()
self._my_id = my_id or str(uuid.uuid4())
self._started = False
def start(self):
if self.backend_url:
@@ -71,10 +134,8 @@ class PartitionCoordinator(object):
self._coordinator = tooz.coordination.get_coordinator(
self.backend_url, self._my_id)
self._coordinator.start()
self._started = True
LOG.info(_LI('Coordination backend started successfully.'))
except tooz.coordination.ToozError:
self._started = False
LOG.exception(_LE('Error connecting to coordination backend.'))
def stop(self):
@@ -90,14 +151,13 @@ class PartitionCoordinator(object):
LOG.exception(_LE('Error connecting to coordination backend.'))
finally:
self._coordinator = None
self._started = False
def is_active(self):
return self._coordinator is not None
def heartbeat(self):
if self._coordinator:
if not self._started:
if not self._coordinator.is_started:
# re-connect
self.start()
try:
@@ -116,14 +176,23 @@ class PartitionCoordinator(object):
self._coordinator.run_watchers()
def join_group(self, group_id):
if not self._coordinator or not self._started or not group_id:
if (not self._coordinator or not self._coordinator.is_started
or not group_id):
return
while True:
retry_backoff = self.conf.coordination.retry_backoff * 1000
max_retry_interval = self.conf.coordination.max_retry_interval * 1000
@retrying.retry(
wait_exponential_multiplier=retry_backoff,
wait_exponential_max=max_retry_interval,
retry_on_exception=retry_on_error_joining_partition,
wrap_exception=True)
def _inner():
try:
join_req = self._coordinator.join_group(group_id)
join_req.get()
LOG.info(_LI('Joined partitioning group %s'), group_id)
break
except tooz.coordination.MemberAlreadyExist:
return
except tooz.coordination.GroupNotCreated:
@@ -132,7 +201,14 @@ class PartitionCoordinator(object):
create_grp_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
self._groups.add(group_id)
raise ErrorJoiningPartitioningGroup()
except tooz.coordination.ToozError:
LOG.exception(_LE('Error joining partitioning group %s,'
' re-trying'), group_id)
raise ErrorJoiningPartitioningGroup()
self._groups.add(group_id)
return _inner()
def leave_group(self, group_id):
if group_id not in self._groups:
@@ -153,7 +229,9 @@ class PartitionCoordinator(object):
except tooz.coordination.GroupNotCreated:
self.join_group(group_id)
def extract_my_subset(self, group_id, iterable):
@retrying.retry(stop_max_attempt_number=5, wait_random_max=2000,
retry_on_exception=retry_on_member_not_in_group)
def extract_my_subset(self, group_id, universal_set):
"""Filters an iterable, returning only objects assigned to this agent.
We have a list of objects and get a list of active group members from
@@ -161,17 +239,26 @@ class PartitionCoordinator(object):
the ones that hashed into *our* bucket.
"""
if not group_id:
return iterable
return universal_set
if group_id not in self._groups:
self.join_group(group_id)
try:
members = self._get_members(group_id)
LOG.debug('Members of group: %s', members)
hr = utils.HashRing(members)
filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', filtered)
return filtered
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
if self._my_id not in members:
LOG.warning(_LW('Cannot extract tasks because agent failed to '
'join group properly. Rejoining group.'))
self.join_group(group_id)
members = self._get_members(group_id)
if self._my_id not in members:
raise MemberNotInGroupError(group_id, members, self._my_id)
LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
hr = HashRing(members)
LOG.debug('Universal set: %s', universal_set)
my_subset = [v for v in universal_set
if hr.get_node(str(v)) == self._my_id]
LOG.debug('My subset: %s', my_subset)
return my_subset
except tooz.coordination.ToozError:
LOG.exception(_LE('Error getting group membership info from '
'coordination backend.'))
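
To make the new distribution behaviour concrete, here is a standalone
sketch (not part of the patch) of how the HashRing assigns work: every
group member is placed on the ring, each task key is hashed to a position,
and an agent keeps only the tasks that land on its own node, which is what
extract_my_subset() does. The member and alarm IDs are invented, and the
import path is an assumption based on the coordination module shown above.

from aodh.coordination import HashRing  # assumed import path

members = ['agent-a', 'agent-b', 'agent-c']     # active group members
alarm_ids = ['alarm-%d' % i for i in range(6)]  # universal set of tasks

ring = HashRing(members)
my_id = 'agent-b'

# Keep only the tasks whose key hashes to our own member on the ring.
my_subset = [a for a in alarm_ids if ring.get_node(str(a)) == my_id]
print(my_subset)

Because every key maps to exactly one member, running the same filter on
each agent splits the full set without overlap, and consistent hashing
means only a fraction of the keys move when group membership changes.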

View File

@@ -182,6 +182,7 @@ class AlarmService(object):
self.conf = conf
self.storage_conn = None
self._load_evaluators()
self.partition_coordinator = coordination.PartitionCoordinator(conf)
@property
def _storage_conn(self):
@@ -229,12 +230,10 @@ class AlarmEvaluationService(AlarmService, os_service.Service):
def __init__(self, conf):
super(AlarmEvaluationService, self).__init__(conf)
self.partition_coordinator = coordination.PartitionCoordinator(
conf.coordination.backend_url)
self.partition_coordinator = coordination.PartitionCoordinator(conf)
def start(self):
super(AlarmEvaluationService, self).start()
self.storage_conn = storage.get_connection_from_config(self.conf)
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
@@ -261,5 +260,8 @@ class AlarmEvaluationService(AlarmService, os_service.Service):
# those alarms.
all_alarms = self._storage_conn.get_alarms(enabled=True,
exclude=dict(type='event'))
return self.partition_coordinator.extract_my_subset(
self.PARTITIONING_GROUP_NAME, all_alarms)
all_alarms = list(all_alarms)
all_alarm_ids = [a.alarm_id for a in all_alarms]
selected = self.partition_coordinator.extract_my_subset(
self.PARTITIONING_GROUP_NAME, all_alarm_ids)
return list(filter(lambda a: a.alarm_id in selected, all_alarms))
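
The evaluator now partitions on alarm_id strings and then maps the selected
IDs back to alarm objects, presumably because stable string keys hash the
same way on every agent while object representations need not. The
standalone sketch below shows the same shape of code; AlarmStub and
FakeCoordinator are invented stand-ins, not aodh classes.

import collections

AlarmStub = collections.namedtuple('AlarmStub', ['alarm_id', 'type'])


class FakeCoordinator(object):
    """Stand-in that pretends every task belongs to this agent."""
    def extract_my_subset(self, group_id, universal_set):
        return list(universal_set)


all_alarms = [AlarmStub('alarm-1', 'threshold'),
              AlarmStub('alarm-2', 'threshold')]
coordinator = FakeCoordinator()

# Same shape as the evaluator change above: partition on the stable
# alarm_id strings, then map the selected IDs back to the alarm objects.
all_alarm_ids = [a.alarm_id for a in all_alarms]
selected = coordinator.extract_my_subset('example-group', all_alarm_ids)
assigned = [a for a in all_alarms if a.alarm_id in selected]
print([a.alarm_id for a in assigned])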

View File

@@ -27,9 +27,10 @@ class MockToozCoordinator(object):
def __init__(self, member_id, shared_storage):
self._member_id = member_id
self._groups = shared_storage
self.is_started = False
def start(self):
pass
self.is_started = True
def stop(self):
pass
@@ -121,8 +122,7 @@ class TestPartitioning(base.BaseTestCase):
with mock.patch('tooz.coordination.get_coordinator',
lambda _, member_id:
coordinator_cls(member_id, shared_storage)):
pc = coordination.PartitionCoordinator(
self.CONF.coordination.backend_url, agent_id)
pc = coordination.PartitionCoordinator(self.CONF, agent_id)
pc.start()
return pc
@@ -211,7 +211,7 @@ class TestPartitioning(base.BaseTestCase):
def test_group_id_none(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
self.assertTrue(coord._coordinator.is_started)
with mock.patch.object(coord._coordinator, 'join_group') as mocked:
coord.join_group(None)
@@ -222,9 +222,8 @@ class TestPartitioning(base.BaseTestCase):
def test_stop(self):
coord = self._get_new_started_coordinator({}, 'a')
self.assertTrue(coord._started)
self.assertTrue(coord._coordinator.is_started)
coord.join_group("123")
coord.stop()
self.assertIsEmpty(coord._groups)
self.assertFalse(coord._started)
self.assertIsNone(coord._coordinator)

View File

@@ -94,7 +94,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
coordination_heartbeat=5)
def test_evaluation_cycle(self):
alarm = mock.Mock(type='threshold')
alarm = mock.Mock(type='threshold', alarm_id="alarm_id1")
self.storage_conn.get_alarms.return_value = [alarm]
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
@@ -104,7 +104,7 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
self.svc._evaluate_assigned_alarms()
p_coord_mock.extract_my_subset.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME, [alarm])
self.svc.PARTITIONING_GROUP_NAME, ["alarm_id1"])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
def test_evaluation_cycle_with_bad_alarm(self):

View File

@@ -0,0 +1,6 @@
---
fixes:
- >
[`bug 1575530 <https://bugs.launchpad.net/aodh/+bug/1575530>`_]
A patch was added to fix and improve the partition coordinator, making
sure the input tasks are correctly distributed to the partition members.

View File

@@ -30,7 +30,7 @@ requests>=2.5.2
six>=1.9.0
SQLAlchemy<1.1.0,>=0.9.7
stevedore>=1.5.0 # Apache-2.0
tooz>=0.16.0 # Apache-2.0
tooz>=1.28.0 # Apache-2.0
Werkzeug>=0.7 # BSD License
WebOb>=1.2.3
WSME>=0.8