Expose conductors: db and rpc

This patch lays some ground work around db and rpc to provide
conductors information from the API.

Changes in the db api and Conductor object is used to support
the implementation of /v1/conductors. Adds an argument
"online" to Conductor.get_by_hostname, so that we can get
the conductor object from database even it's not online,
this is required for the implementation of /v1/conductors/{hostname}.

Adds a new interface get_conductor_for() to get the hostname
of the conductor which is servicing the given node, it will
be used for the implementation of /v1/nodes* endpoints, as well
as listing nodes by given conductor.

Story: 1724474
Task: 28064

Change-Id: I39a7a47c5ae649f6c3200e772a9357023f21a7c4
This commit is contained in:
Kaifeng Wang 2018-11-23 17:12:03 +08:00
parent c5414620c5
commit e2a768f0cd
7 changed files with 108 additions and 16 deletions

View File

@ -123,11 +123,11 @@ class ConductorAPI(object):
# NOTE(deva): this is going to be buggy
self.ring_manager = hash_ring.HashRingManager(use_groups=use_groups)
def get_topic_for(self, node):
"""Get the RPC topic for the conductor service the node is mapped to.
def get_conductor_for(self, node):
"""Get the conductor which the node is mapped to.
:param node: a node object.
:returns: an RPC topic string.
:returns: the conductor hostname.
:raises: NoValidHost
"""
@ -136,13 +136,24 @@ class ConductorAPI(object):
node.conductor_group)
dest = ring.get_nodes(node.uuid.encode('utf-8'),
replicas=CONF.hash_distribution_replicas)
return '%s.%s' % (self.topic, dest.pop())
return dest.pop()
except exception.DriverNotFound:
reason = (_('No conductor service registered which supports '
'driver %(driver)s for conductor group "%(group)s".') %
{'driver': node.driver, 'group': node.conductor_group})
raise exception.NoValidHost(reason=reason)
def get_topic_for(self, node):
"""Get the RPC topic for the conductor service the node is mapped to.
:param node: a node object.
:returns: an RPC topic string.
:raises: NoValidHost
"""
hostname = self.get_conductor_for(node)
return '%s.%s' % (self.topic, hostname)
def get_topic_for_driver(self, driver_name):
"""Get RPC topic name for a conductor supporting the given driver.

View File

@ -493,12 +493,29 @@ class Connection(object):
"""
@abc.abstractmethod
def get_conductor(self, hostname):
def get_conductor_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
"""Return a list of conductors.
:param limit: Maximum number of conductors to return.
:param marker: the last item of the previous page; we return the next
result set.
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: direction in which results should be sorted.
(asc, desc)
"""
@abc.abstractmethod
def get_conductor(self, hostname, online=True):
"""Retrieve a conductor's service record from the database.
:param hostname: The hostname of the conductor service.
:param online: Specify the filter value on the `online` field when
querying conductors. The ``online`` field is ignored if
this value is set to None.
:returns: A conductor.
:raises: ConductorNotFound
:raises: ConductorNotFound if the conductor with given hostname does
not exist or doesn't meet the specified online expectation.
"""
@abc.abstractmethod

View File

@ -792,11 +792,17 @@ class Connection(api.Connection):
'online': True})
return ref
def get_conductor(self, hostname):
def get_conductor_list(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
return _paginate_query(models.Conductor, limit, marker,
sort_key, sort_dir)
def get_conductor(self, hostname, online=True):
try:
return (model_query(models.Conductor)
.filter_by(hostname=hostname, online=True)
.one())
query = model_query(models.Conductor).filter_by(hostname=hostname)
if online is not None:
query = query.filter_by(online=online)
return query.one()
except NoResultFound:
raise exception.ConductorNotFound(conductor=hostname)

View File

@ -42,20 +42,42 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
'conductor_group': object_fields.StringField(),
}
@classmethod
def list(cls, context, limit=None, marker=None, sort_key=None,
sort_dir=None):
"""Return a list of Conductor objects.
:param cls: the :class:`Conductor`
:param context: Security context.
:param limit: maximum number of resources to return in a single result.
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:returns: a list of :class:`Conductor` object.
"""
db_conductors = cls.dbapi.get_conductor_list(limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
return cls._from_db_object_list(context, db_conductors)
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
# methods can be used in the future to replace current explicit RPC calls.
# Implications of calling new remote procedures should be thought through.
# @object_base.remotable_classmethod
@classmethod
def get_by_hostname(cls, context, hostname):
def get_by_hostname(cls, context, hostname, online=True):
"""Get a Conductor record by its hostname.
:param cls: the :class:`Conductor`
:param context: Security context
:param hostname: the hostname on which a Conductor is running
:param online: Specify the expected ``online`` field value for the
conductor to be retrieved. The ``online`` field is
ignored if this value is set to None.
:returns: a :class:`Conductor` object.
"""
db_obj = cls.dbapi.get_conductor(hostname)
db_obj = cls.dbapi.get_conductor(hostname, online=online)
conductor = cls._from_db_object(context, cls(), db_obj)
return conductor

View File

@ -158,6 +158,16 @@ class RPCAPITestCase(db_base.DbTestCase):
self.assertEqual('fake-topic.fake-host',
rpcapi.get_topic_for_driver('fake-driver'))
def test_get_conductor_for(self):
CONF.set_override('host', 'fake-host')
c = self.dbapi.register_conductor({'hostname': 'fake-host',
'drivers': []})
self.dbapi.register_conductor_hardware_interfaces(
c.id, 'fake-driver', 'deploy', ['iscsi', 'direct'], 'iscsi')
rpcapi = conductor_rpcapi.ConductorAPI()
self.assertEqual(rpcapi.get_conductor_for(self.fake_node_obj),
'fake-host')
def _test_can_send_create_port(self, can_send):
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
with mock.patch.object(rpcapi.client,

View File

@ -104,6 +104,18 @@ class DbConductorTestCase(base.DbTestCase):
c2 = self.dbapi.get_conductor(c1.hostname)
self.assertEqual(c1.id, c2.id)
def test_get_inactive_conductor_ignore_online(self):
c1 = self._create_test_cdr()
self.dbapi.unregister_conductor(c1.hostname)
c2 = self.dbapi.get_conductor(c1.hostname, online=None)
self.assertEqual(c1.id, c2.id)
def test_get_inactive_conductor_with_online_true(self):
c1 = self._create_test_cdr()
self.dbapi.unregister_conductor(c1.hostname)
self.assertRaises(exception.ConductorNotFound,
self.dbapi.get_conductor, c1.hostname)
def test_get_conductor_not_found(self):
self._create_test_cdr()
self.assertRaises(

View File

@ -41,7 +41,20 @@ class TestConductorObject(db_base.DbTestCase):
autospec=True) as mock_get_cdr:
mock_get_cdr.return_value = self.fake_conductor
objects.Conductor.get_by_hostname(self.context, host)
mock_get_cdr.assert_called_once_with(host)
mock_get_cdr.assert_called_once_with(host, online=True)
def test_list(self):
conductor1 = db_utils.get_test_conductor(hostname='cond1')
conductor2 = db_utils.get_test_conductor(hostname='cond2')
with mock.patch.object(self.dbapi, 'get_conductor_list',
autospec=True) as mock_cond_list:
mock_cond_list.return_value = [conductor1, conductor2]
conductors = objects.Conductor.list(self.context)
self.assertEqual(2, len(conductors))
self.assertIsInstance(conductors[0], objects.Conductor)
self.assertIsInstance(conductors[1], objects.Conductor)
self.assertEqual(conductors[0].hostname, 'cond1')
self.assertEqual(conductors[1].hostname, 'cond2')
def test_save(self):
host = self.fake_conductor['hostname']
@ -52,7 +65,7 @@ class TestConductorObject(db_base.DbTestCase):
c.hostname = 'another-hostname'
self.assertRaises(NotImplementedError,
c.save, self.context)
mock_get_cdr.assert_called_once_with(host)
mock_get_cdr.assert_called_once_with(host, online=True)
def test_touch(self):
host = self.fake_conductor['hostname']
@ -63,7 +76,7 @@ class TestConductorObject(db_base.DbTestCase):
mock_get_cdr.return_value = self.fake_conductor
c = objects.Conductor.get_by_hostname(self.context, host)
c.touch(self.context)
mock_get_cdr.assert_called_once_with(host)
mock_get_cdr.assert_called_once_with(host, online=True)
mock_touch_cdr.assert_called_once_with(host)
def test_refresh(self):
@ -72,7 +85,8 @@ class TestConductorObject(db_base.DbTestCase):
t1 = t0 + datetime.timedelta(seconds=10)
returns = [dict(self.fake_conductor, updated_at=t0),
dict(self.fake_conductor, updated_at=t1)]
expected = [mock.call(host), mock.call(host)]
expected = [mock.call(host, online=True),
mock.call(host, online=True)]
with mock.patch.object(self.dbapi, 'get_conductor',
side_effect=returns,
autospec=True) as mock_get_cdr: