Merge "coordination: use tooz builtin heartbeat feature"

This commit is contained in:
Jenkins 2017-07-19 17:32:42 +00:00 committed by Gerrit Code Review
commit 4a600fda49
3 changed files with 36 additions and 119 deletions

View File

@ -16,14 +16,9 @@
"""Coordination and locking utilities."""
import inspect
import random
import threading
import uuid
import decorator
import eventlet
from eventlet import tpool
import itertools
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
@ -41,16 +36,28 @@ coordination_opts = [
cfg.FloatOpt('heartbeat',
default=1.0,
help='Number of seconds between heartbeats for distributed '
'coordination.'),
'coordination. No longer used since distributed '
'coordination manages its heartbeat internally.',
deprecated_for_removal=True,
deprecated_reason='This option is no longer used.',
deprecated_since='11.0.0'),
cfg.FloatOpt('initial_reconnect_backoff',
default=0.1,
help='Initial number of seconds to wait after failed '
'reconnection.'),
'reconnection. No longer used since distributed '
'coordination manages its heartbeat internally.',
deprecated_for_removal=True,
deprecated_reason='This option is no longer used.',
deprecated_since='11.0.0'),
cfg.FloatOpt('max_reconnect_backoff',
default=60.0,
help='Maximum number of seconds between sequential '
'reconnection retries.'),
'reconnection retries. No longer used since '
'distributed coordination manages its heartbeat '
'internally.',
deprecated_for_removal=True,
deprecated_reason='This option is no longer used.',
deprecated_since='11.0.0'),
]
CONF = cfg.CONF
@ -73,34 +80,22 @@ class Coordinator(object):
self.agent_id = agent_id or str(uuid.uuid4())
self.started = False
self.prefix = prefix
self._ev = None
self._dead = None
def start(self):
"""Connect to coordination backend and start heartbeat."""
if not self.started:
try:
self._dead = threading.Event()
self._start()
self.started = True
# NOTE(bluex): Start heartbeat in separate thread to avoid
# being blocked by long coroutines.
if self.coordinator and self.coordinator.requires_beating:
self._ev = eventlet.spawn(
lambda: tpool.execute(self.heartbeat))
except coordination.ToozError:
LOG.exception('Error starting coordination backend.')
raise
LOG.info('Coordination backend started successfully.')
if self.started:
return
# NOTE(bluex): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')
self.coordinator = coordination.get_coordinator(
cfg.CONF.coordination.backend_url, member_id)
self.coordinator.start(start_heart=True)
self.started = True
def stop(self):
"""Disconnect from coordination backend and stop heartbeat."""
if self.started:
self.coordinator.stop()
self._dead.set()
if self._ev is not None:
self._ev.wait()
self._ev = None
self.coordinator = None
self.started = False
@ -117,63 +112,6 @@ class Coordinator(object):
else:
raise exception.LockCreationFailed(_('Coordinator uninitialized.'))
def heartbeat(self):
"""Coordinator heartbeat.
Method that every couple of seconds (config: `coordination.heartbeat`)
sends heartbeat to prove that the member is not dead.
If connection to coordination backend is broken it tries to
reconnect every couple of seconds
(config: `coordination.initial_reconnect_backoff` up to
`coordination.max_reconnect_backoff`)
"""
while self.coordinator is not None and not self._dead.is_set():
try:
self._heartbeat()
except coordination.ToozConnectionError:
self._reconnect()
else:
self._dead.wait(cfg.CONF.coordination.heartbeat)
def _start(self):
# NOTE(bluex): Tooz expects member_id as a byte string.
member_id = (self.prefix + self.agent_id).encode('ascii')
self.coordinator = coordination.get_coordinator(
cfg.CONF.coordination.backend_url, member_id)
self.coordinator.start()
def _heartbeat(self):
try:
self.coordinator.heartbeat()
return True
except coordination.ToozConnectionError:
LOG.exception('Connection error while sending a heartbeat '
'to coordination backend.')
raise
except coordination.ToozError:
LOG.exception('Error sending a heartbeat to coordination '
'backend.')
return False
def _reconnect(self):
"""Reconnect with jittered exponential backoff increase."""
LOG.info('Reconnecting to coordination backend.')
cap = cfg.CONF.coordination.max_reconnect_backoff
backoff = base = cfg.CONF.coordination.initial_reconnect_backoff
for attempt in itertools.count(1):
try:
self._start()
break
except coordination.ToozError:
backoff = min(cap, random.uniform(base, backoff * 3))
msg = ('Reconnect attempt %(attempt)s failed. '
'Next try in %(backoff).2fs.')
LOG.warning(msg, {'attempt': attempt, 'backoff': backoff})
self._dead.wait(backoff)
LOG.info('Reconnected to coordination backend.')
COORDINATOR = Coordinator(prefix='cinder-')

View File

@ -43,25 +43,19 @@ class MockToozLock(tooz.locking.Lock):
self.active_locks.remove(self.name)
@mock.patch('time.sleep', lambda _: None)
@mock.patch('eventlet.spawn', lambda f: f())
@mock.patch('eventlet.tpool.execute', lambda f: f())
@mock.patch.object(coordination.Coordinator, 'heartbeat')
@mock.patch('tooz.coordination.get_coordinator')
@mock.patch('random.uniform', lambda _a, _b: 0)
class CoordinatorTestCase(test.TestCase):
MOCK_TOOZ = False
def test_coordinator_start(self, get_coordinator, heartbeat):
def test_coordinator_start(self, get_coordinator):
crd = get_coordinator.return_value
agent = coordination.Coordinator()
agent.start()
self.assertTrue(get_coordinator.called)
self.assertTrue(heartbeat.called)
self.assertTrue(crd.start.called)
def test_coordinator_stop(self, get_coordinator, heartbeat):
def test_coordinator_stop(self, get_coordinator):
crd = get_coordinator.return_value
agent = coordination.Coordinator()
@ -71,7 +65,7 @@ class CoordinatorTestCase(test.TestCase):
self.assertTrue(crd.stop.called)
self.assertIsNone(agent.coordinator)
def test_coordinator_lock(self, get_coordinator, heartbeat):
def test_coordinator_lock(self, get_coordinator):
crd = get_coordinator.return_value
crd.get_lock.side_effect = lambda n: MockToozLock(n)
@ -90,35 +84,13 @@ class CoordinatorTestCase(test.TestCase):
self.assertRaises(Locked, agent2.get_lock(lock_name).acquire)
self.assertNotIn(expected_name, MockToozLock.active_locks)
def test_coordinator_offline(self, get_coordinator, heartbeat):
def test_coordinator_offline(self, get_coordinator):
crd = get_coordinator.return_value
crd.start.side_effect = tooz.coordination.ToozConnectionError('err')
agent = coordination.Coordinator()
self.assertRaises(tooz.coordination.ToozError, agent.start)
self.assertFalse(agent.started)
self.assertFalse(heartbeat.called)
def test_coordinator_reconnect(self, get_coordinator, heartbeat):
start_online = iter([True] + [False] * 5 + [True])
heartbeat_online = iter((False, True, True))
def raiser(cond):
if not cond:
raise tooz.coordination.ToozConnectionError('err')
crd = get_coordinator.return_value
crd.start.side_effect = lambda *_: raiser(next(start_online))
crd.heartbeat.side_effect = lambda *_: raiser(next(heartbeat_online))
agent = coordination.Coordinator()
agent.start()
self.assertRaises(tooz.coordination.ToozConnectionError,
agent._heartbeat)
self.assertEqual(1, get_coordinator.call_count)
agent._reconnect()
self.assertEqual(7, get_coordinator.call_count)
agent._heartbeat()
@mock.patch.object(coordination.COORDINATOR, 'get_lock')

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
The coordination system used by Cinder has been simplified to leverage tooz
builtin heartbeat feature. Therefore, the configuration options
`coordination.heartbeat`, `coordination.initial_reconnect_backoff` and
`coordination.max_reconnect_backoff` have been removed.