Simplify Coordination implementation
This patch attempts to simplify the coordination implementation. It also removes some unnecessary exception handling that could hide valid issues, e.g. the use of unsupported drivers. - Use inbuilt heartbeat. - Fixed bug with etcd3 driver requiring name to be encoded. - Only use grouping if it is used by the service. - Remove unnecessary exception handling. Closes-Bug: #1872858 Related-Bug: #1872205 Change-Id: Ic659c52bf3adca2e97c8b669fe4fa1b5ed992e90
This commit is contained in:
parent
846f919ec8
commit
7ff2f3956d
|
@ -199,7 +199,7 @@ class Service(service.RPCService):
|
|||
)
|
||||
|
||||
self.coordination = coordination.Coordination(
|
||||
self.service_name, self.tg
|
||||
self.service_name, self.tg, grouping_enabled=False
|
||||
)
|
||||
|
||||
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
# under the License.
|
||||
|
||||
import math
|
||||
import time
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log
|
||||
|
@ -25,7 +24,7 @@ import tenacity
|
|||
import tooz.coordination
|
||||
|
||||
import designate.conf
|
||||
from designate.utils import generate_uuid
|
||||
from designate import utils
|
||||
|
||||
CONF = designate.conf.CONF
|
||||
LOG = log.getLogger(__name__)
|
||||
|
@ -37,10 +36,14 @@ def _retry_if_tooz_error(exception):
|
|||
|
||||
|
||||
class Coordination(object):
|
||||
def __init__(self, name, tg):
|
||||
def __init__(self, name, tg, grouping_enabled=False):
|
||||
# NOTE(eandersson): Workaround until tooz handles the conversion.
|
||||
if not isinstance(name, bytes):
|
||||
name = name.encode('ascii')
|
||||
self.name = name
|
||||
self.tg = tg
|
||||
self.coordination_id = None
|
||||
self._grouping_enabled = grouping_enabled
|
||||
self._coordinator = None
|
||||
self._started = False
|
||||
|
||||
|
@ -61,71 +64,69 @@ class Coordination(object):
|
|||
return lockutils.lock(name)
|
||||
|
||||
def start(self):
|
||||
self.coordination_id = ":".join([CONF.host, generate_uuid()])
|
||||
self.coordination_id = ":".join([CONF.host, utils.generate_uuid()])
|
||||
self._started = False
|
||||
|
||||
if CONF.coordination.backend_url is not None:
|
||||
backend_url = CONF.coordination.backend_url
|
||||
|
||||
self._coordinator = tooz.coordination.get_coordinator(
|
||||
backend_url, self.coordination_id.encode())
|
||||
self._started = False
|
||||
|
||||
self.tg.add_timer(CONF.coordination.heartbeat_interval,
|
||||
self._coordinator_heartbeat)
|
||||
self.tg.add_timer(CONF.coordination.run_watchers_interval,
|
||||
self._coordinator_run_watchers)
|
||||
|
||||
else:
|
||||
backend_url = CONF.coordination.backend_url
|
||||
if backend_url is None:
|
||||
LOG.warning('No coordination backend configured, distributed '
|
||||
'coordination functionality will be disabled. '
|
||||
'Please configure a coordination backend.')
|
||||
return
|
||||
|
||||
if self._coordinator is not None:
|
||||
while not self._started:
|
||||
try:
|
||||
self._coordinator.start()
|
||||
self._coordinator = tooz.coordination.get_coordinator(
|
||||
backend_url, self.coordination_id.encode()
|
||||
)
|
||||
while not self._coordinator.is_started:
|
||||
self._coordinator.start(start_heart=True)
|
||||
|
||||
try:
|
||||
create_group_req = self._coordinator.create_group(
|
||||
self.name)
|
||||
create_group_req.get()
|
||||
except tooz.coordination.GroupAlreadyExist:
|
||||
pass
|
||||
self._started = True
|
||||
|
||||
join_group_req = self._coordinator.join_group(self.name)
|
||||
join_group_req.get()
|
||||
|
||||
self._started = True
|
||||
|
||||
except Exception:
|
||||
LOG.warning('Failed to start Coordinator', exc_info=True)
|
||||
time.sleep(15)
|
||||
if self._grouping_enabled:
|
||||
self._enable_grouping()
|
||||
|
||||
def stop(self):
|
||||
if self._coordinator is not None:
|
||||
self._started = False
|
||||
|
||||
leave_group_req = self._coordinator.leave_group(self.name)
|
||||
leave_group_req.get()
|
||||
self._coordinator.stop()
|
||||
|
||||
self._coordinator = None
|
||||
|
||||
def _coordinator_heartbeat(self):
|
||||
if not self._started:
|
||||
if self._coordinator is None:
|
||||
return
|
||||
|
||||
try:
|
||||
self._coordinator.heartbeat()
|
||||
except tooz.coordination.ToozError:
|
||||
LOG.warning('Error sending a heartbeat to coordination backend.')
|
||||
if self._grouping_enabled:
|
||||
self._disable_grouping()
|
||||
self._coordinator.stop()
|
||||
self._coordinator = None
|
||||
finally:
|
||||
self._started = False
|
||||
|
||||
def _coordinator_run_watchers(self):
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
self._coordinator.run_watchers()
|
||||
|
||||
def _create_group(self):
|
||||
try:
|
||||
create_group_req = self._coordinator.create_group(
|
||||
self.name
|
||||
)
|
||||
create_group_req.get()
|
||||
except tooz.coordination.GroupAlreadyExist:
|
||||
pass
|
||||
join_group_req = self._coordinator.join_group(self.name)
|
||||
join_group_req.get()
|
||||
|
||||
def _disable_grouping(self):
|
||||
try:
|
||||
leave_group_req = self._coordinator.leave_group(self.name)
|
||||
leave_group_req.get()
|
||||
except tooz.coordination.GroupNotCreated:
|
||||
pass
|
||||
|
||||
def _enable_grouping(self):
|
||||
self._create_group()
|
||||
self.tg.add_timer(
|
||||
CONF.coordination.run_watchers_interval,
|
||||
self._coordinator_run_watchers
|
||||
)
|
||||
|
||||
|
||||
class Partitioner(object):
|
||||
def __init__(self, coordinator, group_id, my_id, partitions):
|
||||
|
|
|
@ -48,7 +48,7 @@ class Service(service.RPCService):
|
|||
)
|
||||
|
||||
self.coordination = coordination.Coordination(
|
||||
self.service_name, self.tg
|
||||
self.service_name, self.tg, grouping_enabled=True
|
||||
)
|
||||
|
||||
@property
|
||||
|
|
|
@ -53,7 +53,14 @@ class TestCoordination(TestCase):
|
|||
def test_start(self):
|
||||
service = coordination.Coordination(self.name, self.tg)
|
||||
service.start()
|
||||
self.assertTrue(service.started)
|
||||
service.stop()
|
||||
|
||||
def test_start_with_grouping_enabled(self):
|
||||
service = coordination.Coordination(
|
||||
self.name, self.tg, grouping_enabled=True
|
||||
)
|
||||
service.start()
|
||||
self.assertTrue(service.started)
|
||||
self.assertIn(self.name.encode('utf-8'),
|
||||
service.coordinator.get_groups().get())
|
||||
|
@ -68,6 +75,14 @@ class TestCoordination(TestCase):
|
|||
service.stop()
|
||||
self.assertFalse(service.started)
|
||||
|
||||
def test_stop_with_grouping_enabled(self):
|
||||
service = coordination.Coordination(
|
||||
self.name, self.tg, grouping_enabled=True
|
||||
)
|
||||
service.start()
|
||||
service.stop()
|
||||
self.assertFalse(service.started)
|
||||
|
||||
def test_start_no_coordination(self):
|
||||
self.config(backend_url=None, group="coordination")
|
||||
service = coordination.Coordination(self.name, self.tg)
|
||||
|
|
Loading…
Reference in New Issue