Merge "Expose conductors: db and rpc"
This commit is contained in:
commit
67c3e29c3b
|
@ -123,11 +123,11 @@ class ConductorAPI(object):
|
|||
# NOTE(deva): this is going to be buggy
|
||||
self.ring_manager = hash_ring.HashRingManager(use_groups=use_groups)
|
||||
|
||||
def get_topic_for(self, node):
|
||||
"""Get the RPC topic for the conductor service the node is mapped to.
|
||||
def get_conductor_for(self, node):
|
||||
"""Get the conductor which the node is mapped to.
|
||||
|
||||
:param node: a node object.
|
||||
:returns: an RPC topic string.
|
||||
:returns: the conductor hostname.
|
||||
:raises: NoValidHost
|
||||
|
||||
"""
|
||||
|
@ -136,13 +136,24 @@ class ConductorAPI(object):
|
|||
node.conductor_group)
|
||||
dest = ring.get_nodes(node.uuid.encode('utf-8'),
|
||||
replicas=CONF.hash_distribution_replicas)
|
||||
return '%s.%s' % (self.topic, dest.pop())
|
||||
return dest.pop()
|
||||
except exception.DriverNotFound:
|
||||
reason = (_('No conductor service registered which supports '
|
||||
'driver %(driver)s for conductor group "%(group)s".') %
|
||||
{'driver': node.driver, 'group': node.conductor_group})
|
||||
raise exception.NoValidHost(reason=reason)
|
||||
|
||||
def get_topic_for(self, node):
|
||||
"""Get the RPC topic for the conductor service the node is mapped to.
|
||||
|
||||
:param node: a node object.
|
||||
:returns: an RPC topic string.
|
||||
:raises: NoValidHost
|
||||
|
||||
"""
|
||||
hostname = self.get_conductor_for(node)
|
||||
return '%s.%s' % (self.topic, hostname)
|
||||
|
||||
def get_topic_for_driver(self, driver_name):
|
||||
"""Get RPC topic name for a conductor supporting the given driver.
|
||||
|
||||
|
|
|
@ -493,12 +493,29 @@ class Connection(object):
|
|||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_conductor(self, hostname):
|
||||
def get_conductor_list(self, limit=None, marker=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
"""Return a list of conductors.
|
||||
|
||||
:param limit: Maximum number of conductors to return.
|
||||
:param marker: the last item of the previous page; we return the next
|
||||
result set.
|
||||
:param sort_key: Attribute by which results should be sorted.
|
||||
:param sort_dir: direction in which results should be sorted.
|
||||
(asc, desc)
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_conductor(self, hostname, online=True):
|
||||
"""Retrieve a conductor's service record from the database.
|
||||
|
||||
:param hostname: The hostname of the conductor service.
|
||||
:param online: Specify the filter value on the `online` field when
|
||||
querying conductors. The ``online`` field is ignored if
|
||||
this value is set to None.
|
||||
:returns: A conductor.
|
||||
:raises: ConductorNotFound
|
||||
:raises: ConductorNotFound if the conductor with given hostname does
|
||||
not exist or doesn't meet the specified online expectation.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
|
@ -792,11 +792,17 @@ class Connection(api.Connection):
|
|||
'online': True})
|
||||
return ref
|
||||
|
||||
def get_conductor(self, hostname):
|
||||
def get_conductor_list(self, limit=None, marker=None,
|
||||
sort_key=None, sort_dir=None):
|
||||
return _paginate_query(models.Conductor, limit, marker,
|
||||
sort_key, sort_dir)
|
||||
|
||||
def get_conductor(self, hostname, online=True):
|
||||
try:
|
||||
return (model_query(models.Conductor)
|
||||
.filter_by(hostname=hostname, online=True)
|
||||
.one())
|
||||
query = model_query(models.Conductor).filter_by(hostname=hostname)
|
||||
if online is not None:
|
||||
query = query.filter_by(online=online)
|
||||
return query.one()
|
||||
except NoResultFound:
|
||||
raise exception.ConductorNotFound(conductor=hostname)
|
||||
|
||||
|
|
|
@ -42,20 +42,42 @@ class Conductor(base.IronicObject, object_base.VersionedObjectDictCompat):
|
|||
'conductor_group': object_fields.StringField(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def list(cls, context, limit=None, marker=None, sort_key=None,
|
||||
sort_dir=None):
|
||||
"""Return a list of Conductor objects.
|
||||
|
||||
:param cls: the :class:`Conductor`
|
||||
:param context: Security context.
|
||||
:param limit: maximum number of resources to return in a single result.
|
||||
:param marker: pagination marker for large data sets.
|
||||
:param sort_key: column to sort results by.
|
||||
:param sort_dir: direction to sort. "asc" or "desc".
|
||||
:returns: a list of :class:`Conductor` object.
|
||||
"""
|
||||
db_conductors = cls.dbapi.get_conductor_list(limit=limit,
|
||||
marker=marker,
|
||||
sort_key=sort_key,
|
||||
sort_dir=sort_dir)
|
||||
return cls._from_db_object_list(context, db_conductors)
|
||||
|
||||
# NOTE(xek): We don't want to enable RPC on this call just yet. Remotable
|
||||
# methods can be used in the future to replace current explicit RPC calls.
|
||||
# Implications of calling new remote procedures should be thought through.
|
||||
# @object_base.remotable_classmethod
|
||||
@classmethod
|
||||
def get_by_hostname(cls, context, hostname):
|
||||
def get_by_hostname(cls, context, hostname, online=True):
|
||||
"""Get a Conductor record by its hostname.
|
||||
|
||||
:param cls: the :class:`Conductor`
|
||||
:param context: Security context
|
||||
:param hostname: the hostname on which a Conductor is running
|
||||
:param online: Specify the expected ``online`` field value for the
|
||||
conductor to be retrieved. The ``online`` field is
|
||||
ignored if this value is set to None.
|
||||
:returns: a :class:`Conductor` object.
|
||||
"""
|
||||
db_obj = cls.dbapi.get_conductor(hostname)
|
||||
db_obj = cls.dbapi.get_conductor(hostname, online=online)
|
||||
conductor = cls._from_db_object(context, cls(), db_obj)
|
||||
return conductor
|
||||
|
||||
|
|
|
@ -158,6 +158,16 @@ class RPCAPITestCase(db_base.DbTestCase):
|
|||
self.assertEqual('fake-topic.fake-host',
|
||||
rpcapi.get_topic_for_driver('fake-driver'))
|
||||
|
||||
def test_get_conductor_for(self):
|
||||
CONF.set_override('host', 'fake-host')
|
||||
c = self.dbapi.register_conductor({'hostname': 'fake-host',
|
||||
'drivers': []})
|
||||
self.dbapi.register_conductor_hardware_interfaces(
|
||||
c.id, 'fake-driver', 'deploy', ['iscsi', 'direct'], 'iscsi')
|
||||
rpcapi = conductor_rpcapi.ConductorAPI()
|
||||
self.assertEqual(rpcapi.get_conductor_for(self.fake_node_obj),
|
||||
'fake-host')
|
||||
|
||||
def _test_can_send_create_port(self, can_send):
|
||||
rpcapi = conductor_rpcapi.ConductorAPI(topic='fake-topic')
|
||||
with mock.patch.object(rpcapi.client,
|
||||
|
|
|
@ -104,6 +104,18 @@ class DbConductorTestCase(base.DbTestCase):
|
|||
c2 = self.dbapi.get_conductor(c1.hostname)
|
||||
self.assertEqual(c1.id, c2.id)
|
||||
|
||||
def test_get_inactive_conductor_ignore_online(self):
|
||||
c1 = self._create_test_cdr()
|
||||
self.dbapi.unregister_conductor(c1.hostname)
|
||||
c2 = self.dbapi.get_conductor(c1.hostname, online=None)
|
||||
self.assertEqual(c1.id, c2.id)
|
||||
|
||||
def test_get_inactive_conductor_with_online_true(self):
|
||||
c1 = self._create_test_cdr()
|
||||
self.dbapi.unregister_conductor(c1.hostname)
|
||||
self.assertRaises(exception.ConductorNotFound,
|
||||
self.dbapi.get_conductor, c1.hostname)
|
||||
|
||||
def test_get_conductor_not_found(self):
|
||||
self._create_test_cdr()
|
||||
self.assertRaises(
|
||||
|
|
|
@ -41,7 +41,20 @@ class TestConductorObject(db_base.DbTestCase):
|
|||
autospec=True) as mock_get_cdr:
|
||||
mock_get_cdr.return_value = self.fake_conductor
|
||||
objects.Conductor.get_by_hostname(self.context, host)
|
||||
mock_get_cdr.assert_called_once_with(host)
|
||||
mock_get_cdr.assert_called_once_with(host, online=True)
|
||||
|
||||
def test_list(self):
|
||||
conductor1 = db_utils.get_test_conductor(hostname='cond1')
|
||||
conductor2 = db_utils.get_test_conductor(hostname='cond2')
|
||||
with mock.patch.object(self.dbapi, 'get_conductor_list',
|
||||
autospec=True) as mock_cond_list:
|
||||
mock_cond_list.return_value = [conductor1, conductor2]
|
||||
conductors = objects.Conductor.list(self.context)
|
||||
self.assertEqual(2, len(conductors))
|
||||
self.assertIsInstance(conductors[0], objects.Conductor)
|
||||
self.assertIsInstance(conductors[1], objects.Conductor)
|
||||
self.assertEqual(conductors[0].hostname, 'cond1')
|
||||
self.assertEqual(conductors[1].hostname, 'cond2')
|
||||
|
||||
def test_save(self):
|
||||
host = self.fake_conductor['hostname']
|
||||
|
@ -52,7 +65,7 @@ class TestConductorObject(db_base.DbTestCase):
|
|||
c.hostname = 'another-hostname'
|
||||
self.assertRaises(NotImplementedError,
|
||||
c.save, self.context)
|
||||
mock_get_cdr.assert_called_once_with(host)
|
||||
mock_get_cdr.assert_called_once_with(host, online=True)
|
||||
|
||||
def test_touch(self):
|
||||
host = self.fake_conductor['hostname']
|
||||
|
@ -63,7 +76,7 @@ class TestConductorObject(db_base.DbTestCase):
|
|||
mock_get_cdr.return_value = self.fake_conductor
|
||||
c = objects.Conductor.get_by_hostname(self.context, host)
|
||||
c.touch(self.context)
|
||||
mock_get_cdr.assert_called_once_with(host)
|
||||
mock_get_cdr.assert_called_once_with(host, online=True)
|
||||
mock_touch_cdr.assert_called_once_with(host)
|
||||
|
||||
def test_refresh(self):
|
||||
|
@ -72,7 +85,8 @@ class TestConductorObject(db_base.DbTestCase):
|
|||
t1 = t0 + datetime.timedelta(seconds=10)
|
||||
returns = [dict(self.fake_conductor, updated_at=t0),
|
||||
dict(self.fake_conductor, updated_at=t1)]
|
||||
expected = [mock.call(host), mock.call(host)]
|
||||
expected = [mock.call(host, online=True),
|
||||
mock.call(host, online=True)]
|
||||
with mock.patch.object(self.dbapi, 'get_conductor',
|
||||
side_effect=returns,
|
||||
autospec=True) as mock_get_cdr:
|
||||
|
|
Loading…
Reference in New Issue