Introduce new driver call and RPC for heartbeat

Change-Id: Iec31feb07b85f9ed668d354967f8d265233a2bc1
Partial-Bug: #1570841
This commit is contained in:
Dmitry Tantsur 2016-06-22 17:58:35 +02:00
parent 088f09903b
commit 97f96642a9
6 changed files with 78 additions and 3 deletions

View File

@ -81,7 +81,7 @@ class ConductorManager(base_manager.BaseConductorManager):
"""Ironic Conductor manager main class."""
# NOTE(rloo): This must be in sync with rpcapi.ConductorAPI's.
RPC_API_VERSION = '1.33'
RPC_API_VERSION = '1.34'
target = messaging.Target(version=RPC_API_VERSION)
@ -2093,6 +2093,25 @@ class ConductorManager(base_manager.BaseConductorManager):
return driver.raid.get_logical_disk_properties()
@messaging.expected_exceptions(exception.NoFreeConductorWorker)
def heartbeat(self, context, node_id, callback_url):
"""Process a heartbeat from the ramdisk.
:param context: request context.
:param node_id: node id or uuid.
:param callback_url: URL to reach back to the ramdisk.
:raises: NoFreeConductorWorker if there are no conductors to process
this heartbeat request.
"""
LOG.debug('RPC heartbeat called for node %s', node_id)
# NOTE(dtantsur): we acquire a shared lock to begin with, drivers are
# free to promote it to an exclusive one.
with task_manager.acquire(context, node_id, shared=True,
purpose='heartbeat') as task:
task.spawn_after(self._spawn_worker, task.driver.deploy.heartbeat,
task, callback_url)
def _object_dispatch(self, target, method, context, args, kwargs):
"""Dispatch a call to an object method.

View File

@ -80,11 +80,12 @@ class ConductorAPI(object):
| object_backport_versions
| 1.32 - Add do_node_clean
| 1.33 - Added update and destroy portgroup.
| 1.34 - Added heartbeat
"""
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
RPC_API_VERSION = '1.33'
RPC_API_VERSION = '1.34'
def __init__(self, topic=None):
super(ConductorAPI, self).__init__()
@ -646,6 +647,18 @@ class ConductorAPI(object):
return cctxt.call(context, 'do_node_clean',
node_id=node_id, clean_steps=clean_steps)
def heartbeat(self, context, node_id, callback_url, topic=None):
"""Process a node heartbeat.
:param context: request context.
:param node_id: node ID or UUID.
:param callback_url: URL to reach back to the ramdisk.
:param topic: RPC topic. Defaults to self.topic.
"""
cctxt = self.client.prepare(topic=topic or self.topic, version='1.34')
return cctxt.call(context, 'heartbeat', node_id=node_id,
callback_url=callback_url)
def object_class_action_versions(self, context, objname, objmethod,
object_versions, args, kwargs):
"""Perform an action on a VersionedObject class.

View File

@ -30,7 +30,7 @@ from oslo_utils import excutils
import six
from ironic.common import exception
from ironic.common.i18n import _, _LE
from ironic.common.i18n import _, _LE, _LW
from ironic.common import raid
LOG = logging.getLogger(__name__)
@ -375,6 +375,17 @@ class DeployInterface(BaseInterface):
"""
pass
def heartbeat(self, task, callback_url):
"""Record a heartbeat for the node.
:param task: a TaskManager instance containing the node to act on.
:param callback_url: a URL to use to call to the ramdisk.
:return: None
"""
LOG.warning(_LW('Got heartbeat message from node %(node)s, but '
'the driver %(driver)s does not support heartbeating'),
{'node': task.node.uuid, 'driver': task.node.driver})
@six.add_metaclass(abc.ABCMeta)
class BootInterface(object):

View File

@ -5016,3 +5016,16 @@ class DoNodeAdoptionTestCase(
self.assertEqual(states.MANAGEABLE, node.provision_state)
self.assertEqual(states.NOSTATE, node.target_provision_state)
self.assertIsNone(node.last_error)
@mock.patch('ironic.conductor.manager.ConductorManager._spawn_worker')
def test_heartbeat(self, mock_spawn):
"""Test heartbeating."""
node = obj_utils.create_test_node(
self.context, driver='fake',
provision_state=states.DEPLOYING,
target_provision_state=states.ACTIVE)
self._start_service()
self.service.heartbeat(self.context, node.uuid, 'http://callback')
mock_spawn.assert_called_with(self.driver.deploy.heartbeat,
mock.ANY, 'http://callback')

View File

@ -393,3 +393,10 @@ class RPCAPITestCase(base.DbTestCase):
'call',
version='1.33',
portgroup=self.fake_portgroup)
def test_heartbeat(self):
self._test_rpcapi('heartbeat',
'call',
node_id='fake-node',
callback_url='http://ramdisk.url:port',
version='1.34')

View File

@ -20,6 +20,7 @@ import mock
from ironic.common import exception
from ironic.common import raid
from ironic.drivers import base as driver_base
from ironic.drivers.modules import fake
from ironic.tests import base
@ -389,3 +390,14 @@ class RAIDInterfaceTestCase(base.TestCase):
raid_interface = MyRAIDInterface()
raid_interface.get_logical_disk_properties()
get_properties_mock.assert_called_once_with(raid_schema)
class TestDeployInterface(base.TestCase):
@mock.patch.object(driver_base.LOG, 'warning', autospec=True)
def test_warning_on_heartbeat(self, mock_log):
# NOTE(dtantsur): FakeDeploy does not override heartbeat
deploy = fake.FakeDeploy()
deploy.heartbeat(mock.Mock(node=mock.Mock(uuid='uuid',
driver='driver')),
'url')
self.assertTrue(mock_log.called)