From 7e15d4e28f98e13f0ea7399787c50839139d8492 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Thu, 10 May 2012 19:28:04 -0400 Subject: [PATCH] Add version to compute rpc API. Part of blueprint versioned-rpc-apis. Change-Id: I5943d1fae2c96cfe519817b59098402481a1026b --- nova/compute/api.py | 240 +++++++------------- nova/compute/manager.py | 2 + nova/compute/rpcapi.py | 276 +++++++++++++++++++++++ nova/tests/compute/__init__.py | 19 ++ nova/tests/{ => compute}/test_compute.py | 22 +- nova/tests/compute/test_rpcapi.py | 236 +++++++++++++++++++ 6 files changed, 630 insertions(+), 165 deletions(-) create mode 100644 nova/compute/rpcapi.py create mode 100644 nova/tests/compute/__init__.py rename nova/tests/{ => compute}/test_compute.py (99%) create mode 100644 nova/tests/compute/test_rpcapi.py diff --git a/nova/compute/api.py b/nova/compute/api.py index 04a752ed01aa..f1410df29d38 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -30,6 +30,7 @@ from nova import block_device from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state +from nova.compute import rpcapi as compute_rpcapi from nova.compute import task_states from nova.compute import vm_states from nova.consoleauth import rpcapi as consoleauth_rpcapi @@ -103,52 +104,7 @@ def check_policy(context, action, target): nova.policy.enforce(context, _action, target) -class BaseAPI(base.Base): - """Base API class.""" - def __init__(self, **kwargs): - super(BaseAPI, self).__init__(**kwargs) - - def _cast_or_call_compute_message(self, rpc_method, compute_method, - context, instance=None, host=None, params=None): - """Generic handler for RPC casts and calls to compute. - - :param rpc_method: RPC method to use (rpc.call or rpc.cast) - :param compute_method: Compute manager method to call - :param context: RequestContext of caller - :param instance: The instance object to use to find host to send to - Can be None to not include instance_uuid in args - :param host: Optional host to send to instead of instance['host'] - Must be specified if 'instance' is None - :param params: Optional dictionary of arguments to be passed to the - compute worker - - :returns: None - """ - if not params: - params = {} - if not host: - if not instance: - raise exception.NovaException(_("No compute host specified")) - host = instance['host'] - if not host: - raise exception.NovaException(_("Unable to find host for " - "Instance %s") % instance['uuid']) - queue = self.db.queue_get_for(context, FLAGS.compute_topic, host) - if instance: - params['instance_uuid'] = instance['uuid'] - kwargs = {'method': compute_method, 'args': params} - return rpc_method(context, queue, kwargs) - - def _cast_compute_message(self, *args, **kwargs): - """Generic handler for RPC casts to compute.""" - self._cast_or_call_compute_message(rpc.cast, *args, **kwargs) - - def _call_compute_message(self, *args, **kwargs): - """Generic handler for RPC calls to compute.""" - return self._cast_or_call_compute_message(rpc.call, *args, **kwargs) - - -class API(BaseAPI): +class API(base.Base): """API for interacting with the compute manager.""" def __init__(self, image_service=None, network_api=None, volume_api=None, @@ -160,6 +116,7 @@ class API(BaseAPI): self.volume_api = volume_api or volume.API() self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI() self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() + self.compute_rpcapi = compute_rpcapi.ComputeAPI() super(API, self).__init__(**kwargs) def _check_injected_file_quota(self, context, injected_files): @@ -762,10 +719,8 @@ class API(BaseAPI): hosts.add(instance['host']) for host in hosts: - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "refresh_security_group_rules", - "args": {"security_group_id": security_group.id}}) + self.compute_rpcapi.refresh_security_group_rules(context, + security_group.id, host=host) def trigger_security_group_members_refresh(self, context, group_ids): """Called when a security group gains a new or loses a member. @@ -805,10 +760,8 @@ class API(BaseAPI): # ...and finally we tell these nodes to refresh their view of this # particular security group. for host in hosts: - rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), - {"method": "refresh_security_group_members", - "args": {"security_group_id": group_id}}) + self.compute_rpcapi.refresh_security_group_members(context, + group_id, host=host) def trigger_provider_fw_rules_refresh(self, context): """Called when a rule is added/removed from a provider firewall""" @@ -862,11 +815,10 @@ class API(BaseAPI): self.db.instance_add_security_group(context.elevated(), instance_uuid, security_group['id']) - params = {"security_group_id": security_group['id']} # NOTE(comstud): No instance_uuid argument to this compute manager # call - self._cast_compute_message('refresh_security_group_rules', - context, host=instance['host'], params=params) + self.compute_rpcapi.refresh_security_group_rules(context, + security_group['id'], host=instance['host']) @wrap_check_policy def remove_security_group(self, context, instance, security_group_name): @@ -891,11 +843,10 @@ class API(BaseAPI): self.db.instance_remove_security_group(context.elevated(), instance_uuid, security_group['id']) - params = {"security_group_id": security_group['id']} # NOTE(comstud): No instance_uuid argument to this compute manager # call - self._cast_compute_message('refresh_security_group_rules', - context, host=instance['host'], params=params) + self.compute_rpcapi.refresh_security_group_rules(context, + security_group['id'], host=instance['host']) @wrap_check_policy def update(self, context, instance, **kwargs): @@ -926,16 +877,14 @@ class API(BaseAPI): # NOTE(jerdfelt): The compute daemon handles reclaiming instances # that are in soft delete. If there is no host assigned, there is # no daemon to reclaim, so delete it immediately. - host = instance['host'] - if host: + if instance['host']: self.update(context, instance, vm_state=vm_states.SOFT_DELETE, task_state=task_states.POWERING_OFF, deleted_at=utils.utcnow()) - self._cast_compute_message('power_off_instance', - context, instance) + self.compute_rpcapi.power_off_instance(context, instance) else: LOG.warning(_('No host for instance, deleting immediately'), instance=instance) @@ -946,9 +895,8 @@ class API(BaseAPI): pass def _delete(self, context, instance): - host = instance['host'] try: - if not host: + if not instance['host']: # Just update database, nothing else we can do return self.db.instance_destroy(context, instance['id']) @@ -964,14 +912,13 @@ class API(BaseAPI): context, instance['uuid'], 'finished') if migration_ref: src_host = migration_ref['source_compute'] - params = {'migration_id': migration_ref['id']} # Call since this can race with the terminate_instance - self._call_compute_message('confirm_resize', context, - instance, host=src_host, - params=params) + self.compute_rpcapi.confirm_resize(context, + instance, migration_ref['id'], + host=src_host, cast=False) + + self.compute_rpcapi.terminate_instance(context, instance) - self._cast_compute_message('terminate_instance', - context, instance) except exception.InstanceNotFound: # NOTE(comstud): Race condition. Instance already gone. pass @@ -1002,13 +949,9 @@ class API(BaseAPI): task_state=None, deleted_at=None) - host = instance['host'] - if host: - self.update(context, - instance, - task_state=task_states.POWERING_ON) - self._cast_compute_message('power_on_instance', - context, instance) + if instance['host']: + self.update(context, instance, task_state=task_states.POWERING_ON) + self.compute_rpcapi.power_on_instance(context, instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SOFT_DELETE]) @@ -1032,9 +975,7 @@ class API(BaseAPI): terminated_at=utils.utcnow(), progress=0) - rpc_method = rpc.cast if do_cast else rpc.call - self._cast_or_call_compute_message(rpc_method, 'stop_instance', - context, instance) + self.compute_rpcapi.stop_instance(context, instance, cast=do_cast) @wrap_check_policy @check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF]) @@ -1062,7 +1003,7 @@ class API(BaseAPI): # TODO(yamahata): injected_files isn't supported right now. # It is used only for osapi. not for ec2 api. # availability_zone isn't used by run_instance. - self._cast_compute_message('start_instance', context, instance) + self.compute_rpcapi.start_instance(context, instance) #NOTE(bcwaldon): no policy check here since it should be rolled in to # search_opts in get_all @@ -1275,10 +1216,9 @@ class API(BaseAPI): sent_meta['properties'] = properties recv_meta = self.image_service.create(context, sent_meta) - params = {'image_id': recv_meta['id'], 'image_type': image_type, - 'backup_type': backup_type, 'rotation': rotation} - self._cast_compute_message('snapshot_instance', context, instance, - params=params) + self.compute_rpcapi.snapshot_instance(context, instance=instance, + image_id=recv_meta['id'], image_type=image_type, + backup_type=backup_type, rotation=rotation) return recv_meta def _get_minram_mindisk_params(self, context, instance): @@ -1312,8 +1252,8 @@ class API(BaseAPI): instance, vm_state=vm_states.ACTIVE, task_state=state) - self._cast_compute_message('reboot_instance', context, instance, - params={'reboot_type': reboot_type}) + self.compute_rpcapi.reboot_instance(context, instance=instance, + reboot_type=reboot_type) def _get_image(self, context, image_href): """Throws an ImageNotFound exception if image_href does not exist.""" @@ -1383,15 +1323,9 @@ class API(BaseAPI): # system metadata... and copy in the properties for the new image. _reset_image_metadata() - rebuild_params = { - "new_pass": admin_password, - "injected_files": files_to_inject, - "image_ref": image_href, - "orig_image_ref": orig_image_ref, - } - - self._cast_compute_message('rebuild_instance', context, instance, - params=rebuild_params) + self.compute_rpcapi.rebuild_instance(context, instance=instance, + new_pass=admin_password, injected_files=files_to_inject, + image_ref=image_href, orig_image_ref=orig_image_ref) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF], @@ -1410,9 +1344,9 @@ class API(BaseAPI): vm_state=vm_states.RESIZING, task_state=task_states.RESIZE_REVERTING) - params = {'migration_id': migration_ref['id']} - self._cast_compute_message('revert_resize', context, instance, - host=migration_ref['dest_compute'], params=params) + self.compute_rpcapi.revert_resize(context, + instance=instance, migration_id=migration_ref['id'], + host=migration_ref['dest_compute']) self.db.migration_update(context, migration_ref['id'], {'status': 'reverted'}) @@ -1434,9 +1368,9 @@ class API(BaseAPI): vm_state=vm_states.ACTIVE, task_state=None) - params = {'migration_id': migration_ref['id']} - self._cast_compute_message('confirm_resize', context, instance, - host=migration_ref['source_compute'], params=params) + self.compute_rpcapi.confirm_resize(context, + instance=instance, migration_id=migration_ref['id'], + host=migration_ref['source_compute']) self.db.migration_update(context, migration_ref['id'], {'status': 'confirmed'}) @@ -1510,14 +1444,14 @@ class API(BaseAPI): @wrap_check_policy def add_fixed_ip(self, context, instance, network_id): """Add fixed_ip from specified network to given instance.""" - self._cast_compute_message('add_fixed_ip_to_instance', context, - instance, params=dict(network_id=network_id)) + self.compute_rpcapi.add_fixed_ip_to_instance(context, + instance=instance, network_id=network_id) @wrap_check_policy def remove_fixed_ip(self, context, instance, address): """Remove fixed_ip from specified network to given instance.""" - self._cast_compute_message('remove_fixed_ip_from_instance', - context, instance, params=dict(address=address)) + self.compute_rpcapi.remove_fixed_ip_from_instance(context, + instance=instance, address=address) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1529,7 +1463,7 @@ class API(BaseAPI): instance, vm_state=vm_states.ACTIVE, task_state=task_states.PAUSING) - self._cast_compute_message('pause_instance', context, instance) + self.compute_rpcapi.pause_instance(context, instance=instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.PAUSED]) @@ -1539,13 +1473,12 @@ class API(BaseAPI): instance, vm_state=vm_states.PAUSED, task_state=task_states.UNPAUSING) - self._cast_compute_message('unpause_instance', context, instance) + self.compute_rpcapi.unpause_instance(context, instance=instance) @wrap_check_policy def get_diagnostics(self, context, instance): """Retrieve diagnostics for the given instance.""" - return self._call_compute_message("get_diagnostics", context, - instance) + return self.compute_rpcapi.get_diagnostics(context, instance=instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1557,7 +1490,7 @@ class API(BaseAPI): instance, vm_state=vm_states.ACTIVE, task_state=task_states.SUSPENDING) - self._cast_compute_message('suspend_instance', context, instance) + self.compute_rpcapi.suspend_instance(context, instance=instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.SUSPENDED]) @@ -1567,7 +1500,7 @@ class API(BaseAPI): instance, vm_state=vm_states.SUSPENDED, task_state=task_states.RESUMING) - self._cast_compute_message('resume_instance', context, instance) + self.compute_rpcapi.resume_instance(context, instance=instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF, @@ -1580,11 +1513,8 @@ class API(BaseAPI): vm_state=vm_states.ACTIVE, task_state=task_states.RESCUING) - rescue_params = { - "rescue_password": rescue_password - } - self._cast_compute_message('rescue_instance', context, instance, - params=rescue_params) + self.compute_rpcapi.rescue_instance(context, instance=instance, + rescue_password=rescue_password) @wrap_check_policy @check_instance_state(vm_state=[vm_states.RESCUED]) @@ -1594,7 +1524,7 @@ class API(BaseAPI): instance, vm_state=vm_states.RESCUED, task_state=task_states.UNRESCUING) - self._cast_compute_message('unrescue_instance', context, instance) + self.compute_rpcapi.unrescue_instance(context, instance=instance) @wrap_check_policy @check_instance_state(vm_state=[vm_states.ACTIVE]) @@ -1604,22 +1534,20 @@ class API(BaseAPI): instance, task_state=task_states.UPDATING_PASSWORD) - params = {"new_pass": password} - self._cast_compute_message('set_admin_password', context, instance, - params=params) + self.compute_rpcapi.set_admin_password(context, instance=instance, + new_pass=password) @wrap_check_policy def inject_file(self, context, instance, path, file_contents): """Write a file to the given instance.""" - params = {'path': path, 'file_contents': file_contents} - self._cast_compute_message('inject_file', context, instance, - params=params) + self.compute_rpcapi.inject_file(context, instance=instance, path=path, + file_contents=file_contents) @wrap_check_policy def get_vnc_console(self, context, instance, console_type): """Get a url to an instance Console.""" - connect_info = self._call_compute_message('get_vnc_console', - context, instance, params={"console_type": console_type}) + connect_info = self.compute_rpcapi.get_vnc_console(context, + instance=instance, console_type=console_type) self.consoleauth_rpcapi.authorize_console(context, connect_info['token'], console_type, connect_info['host'], @@ -1630,19 +1558,18 @@ class API(BaseAPI): @wrap_check_policy def get_console_output(self, context, instance, tail_length=None): """Get console output for an an instance.""" - params = {'tail_length': tail_length} - return self._call_compute_message('get_console_output', context, - instance, params=params) + return self.compute_rpcapi.get_console_output(context, + instance=instance, tail_length=tail_length) @wrap_check_policy def lock(self, context, instance): """Lock the given instance.""" - self._cast_compute_message('lock_instance', context, instance) + self.compute_rpcapi.lock_instance(context, instance=instance) @wrap_check_policy def unlock(self, context, instance): """Unlock the given instance.""" - self._cast_compute_message('unlock_instance', context, instance) + self.compute_rpcapi.unlock_instance(context, instance=instance) @wrap_check_policy def get_lock(self, context, instance): @@ -1652,12 +1579,12 @@ class API(BaseAPI): @wrap_check_policy def reset_network(self, context, instance): """Reset networking on the instance.""" - self._cast_compute_message('reset_network', context, instance) + self.compute_rpcapi.reset_network(context, instance=instance) @wrap_check_policy def inject_network_info(self, context, instance): """Inject network info for the instance.""" - self._cast_compute_message('inject_network_info', context, instance) + self.compute_rpcapi.inject_network_info(context, instance=instance) @wrap_check_policy def attach_volume(self, context, instance, volume_id, device): @@ -1667,10 +1594,8 @@ class API(BaseAPI): volume = self.volume_api.get(context, volume_id) self.volume_api.check_attach(context, volume) self.volume_api.reserve_volume(context, volume) - params = {"volume_id": volume_id, - "mountpoint": device} - self._cast_compute_message('attach_volume', context, instance, - params=params) + self.compute_rpcapi.attach_volume(context, instance=instance, + volume_id=volume_id, mountpoint=device) # FIXME(comstud): I wonder if API should pull in the instance from # the volume ID via volume API and pass it and the volume object here @@ -1688,9 +1613,8 @@ class API(BaseAPI): volume = self.volume_api.get(context, volume_id) self.volume_api.check_detach(context, volume) - params = {'volume_id': volume_id} - self._cast_compute_message('detach_volume', context, instance, - params=params) + self.compute_rpcapi.detach_volume(context, instance=instance, + volume_id=volume_id) return instance @wrap_check_policy @@ -1778,32 +1702,38 @@ class API(BaseAPI): instance['uuid']) -class HostAPI(BaseAPI): +class HostAPI(base.Base): + def __init__(self): + self.compute_rpcapi = compute_rpcapi.ComputeAPI() + super(HostAPI, self).__init__() + """Sub-set of the Compute Manager API for managing host operations.""" def set_host_enabled(self, context, host, enabled): """Sets the specified host's ability to accept new instances.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - return self._call_compute_message("set_host_enabled", context, - host=host, params={"enabled": enabled}) + return self.compute_rpcapi.set_host_enabled(context, enabled=enabled, + host=host) def host_power_action(self, context, host, action): """Reboots, shuts down or powers up the host.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - return self._call_compute_message("host_power_action", context, - host=host, params={"action": action}) + topic = self.db.queue_get_for(context, FLAGS.compute_topic, host) + return self.compute_rpcapi.host_power_action(context, action=action, + host=host) def set_host_maintenance(self, context, host, mode): """Start/Stop host maintenance window. On start, it triggers guest VMs evacuation.""" - return self._call_compute_message("host_maintenance_mode", context, - host=host, params={"host": host, "mode": mode}) + return self.compute_rpcapi.host_maintenance_mode(context, + host_param=host, mode=mode, host=host) class AggregateAPI(base.Base): """Sub-set of the Compute Manager API for managing host aggregates.""" def __init__(self, **kwargs): + self.compute_rpcapi = compute_rpcapi.ComputeAPI() super(AggregateAPI, self).__init__(**kwargs) def create_aggregate(self, context, aggregate_name, availability_zone): @@ -1883,10 +1813,8 @@ class AggregateAPI(base.Base): if aggregate.operational_state == aggregate_states.CREATED: values = {'operational_state': aggregate_states.CHANGING} self.db.aggregate_update(context, aggregate_id, values) - queue = self.db.queue_get_for(context, service.topic, host) - rpc.cast(context, queue, {"method": "add_aggregate_host", - "args": {"aggregate_id": aggregate_id, - "host": host}, }) + self.compute_rpcapi.add_aggregate_host(context, + aggregate_id=aggregate_id, host_param=host, host=host) return self.get_aggregate(context, aggregate_id) else: invalid = {aggregate_states.CHANGING: 'setup in progress', @@ -1906,10 +1834,8 @@ class AggregateAPI(base.Base): if aggregate.operational_state in [aggregate_states.ACTIVE, aggregate_states.ERROR]: self.db.aggregate_host_delete(context, aggregate_id, host) - queue = self.db.queue_get_for(context, service.topic, host) - rpc.cast(context, queue, {"method": "remove_aggregate_host", - "args": {"aggregate_id": aggregate_id, - "host": host}, }) + self.compute_rpcapi.remove_aggregate_host(context, + aggregate_id=aggregate_id, host_param=host, host=host) return self.get_aggregate(context, aggregate_id) else: invalid = {aggregate_states.CREATED: 'no hosts to remove', diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 8025a46a5d79..bd3cd02cc83c 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -224,6 +224,8 @@ def _get_additional_capabilities(): class ComputeManager(manager.SchedulerDependentManager): """Manages the running instances from creation to destruction.""" + RPC_API_VERSION = '1.0' + def __init__(self, compute_driver=None, *args, **kwargs): """Load configuration options and connect to the hypervisor.""" # TODO(vish): sync driver creation logic with the rest of the system diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py new file mode 100644 index 000000000000..a2a3b281cffe --- /dev/null +++ b/nova/compute/rpcapi.py @@ -0,0 +1,276 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Client side of the compute RPC API. +""" + +from nova.db import base +from nova import exception +from nova import flags +import nova.rpc.proxy + + +FLAGS = flags.FLAGS + + +class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base): + '''Client side of the compute rpc API. + + API version history: + + 1.0 - Initial version. + ''' + + RPC_API_VERSION = '1.0' + + def __init__(self): + super(ComputeAPI, self).__init__(topic=FLAGS.compute_topic, + default_version=self.RPC_API_VERSION) + + def _compute_topic(self, ctxt, host, instance): + '''Get the topic to use for a message. + + :param ctxt: request context + :param host: explicit host to send the message to. + :param instance: If an explicit host was not specified, use + instance['host'] + + :returns: A topic string + ''' + if not host: + if not instance: + raise exception.NovaException(_('No compute host specified')) + host = instance['host'] + if not host: + raise exception.NovaException(_('Unable to find host for ' + 'Instance %s') % instance['uuid']) + return self.db.queue_get_for(ctxt, self.topic, host) + + def add_aggregate_host(self, ctxt, aggregate_id, host_param, host): + '''Add aggregate host. + + :param ctxt: request context + :param aggregate_id: + :param host_param: This value is placed in the message to be the 'host' + parameter for the remote method. + :param host: This is the host to send the message to. + ''' + self.cast(ctxt, self.make_msg('add_aggregate_host', + aggregate_id=aggregate_id, host=host_param), + topic=self._compute_topic(ctxt, host, None)) + + def add_fixed_ip_to_instance(self, ctxt, instance, network_id): + self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance', + instance_uuid=instance['uuid'], network_id=network_id), + topic=self._compute_topic(ctxt, None, instance)) + + def attach_volume(self, ctxt, instance, volume_id, mountpoint): + self.cast(ctxt, self.make_msg('attach_volume', + instance_uuid=instance['uuid'], volume_id=volume_id, + mountpoint=mountpoint), + topic=self._compute_topic(ctxt, None, instance)) + + def confirm_resize(self, ctxt, instance, migration_id, host, + cast=True): + rpc_method = self.cast if cast else self.call + return rpc_method(ctxt, self.make_msg('confirm_resize', + instance_uuid=instance['uuid'], migration_id=migration_id), + topic=self._compute_topic(ctxt, host, instance)) + + def detach_volume(self, ctxt, instance, volume_id): + self.cast(ctxt, self.make_msg('detach_volume', + instance_uuid=instance['uuid'], volume_id=volume_id), + topic=self._compute_topic(ctxt, None, instance)) + + def get_console_output(self, ctxt, instance, tail_length): + return self.call(ctxt, self.make_msg('get_console_output', + instance_uuid=instance['uuid'], tail_length=tail_length), + topic=self._compute_topic(ctxt, None, instance)) + + def get_diagnostics(self, ctxt, instance): + return self.call(ctxt, self.make_msg('get_diagnostics', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def get_vnc_console(self, ctxt, instance, console_type): + return self.call(ctxt, self.make_msg('get_vnc_console', + instance_uuid=instance['uuid'], console_type=console_type), + topic=self._compute_topic(ctxt, None, instance)) + + def host_maintenance_mode(self, ctxt, host_param, mode, host): + '''Set host maintenance mode + + :param ctxt: request context + :param host_param: This value is placed in the message to be the 'host' + parameter for the remote method. + :param mode: + :param host: This is the host to send the message to. + ''' + return self.call(ctxt, self.make_msg('host_maintenance_mode', + host=host_param, mode=mode), + topic=self._compute_topic(ctxt, host, None)) + + def host_power_action(self, ctxt, action, host): + return self.call(ctxt, self.make_msg('host_power_action', + action=action), topic=self._compute_topic(ctxt, host, None)) + + def inject_file(self, ctxt, instance, path, file_contents): + self.cast(ctxt, self.make_msg('inject_file', + instance_uuid=instance['uuid'], path=path, + file_contents=file_contents), + topic=self._compute_topic(ctxt, None, instance)) + + def inject_network_info(self, ctxt, instance): + self.cast(ctxt, self.make_msg('inject_network_info', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def lock_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('lock_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def pause_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('pause_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def power_off_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('power_off_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def power_on_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('power_on_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def reboot_instance(self, ctxt, instance, reboot_type): + self.cast(ctxt, self.make_msg('reboot_instance', + instance_uuid=instance['uuid'], reboot_type=reboot_type), + topic=self._compute_topic(ctxt, None, instance)) + + def rebuild_instance(self, ctxt, instance, new_pass, injected_files, + image_ref, orig_image_ref): + self.cast(ctxt, self.make_msg('rebuild_instance', + instance_uuid=instance['uuid'], new_pass=new_pass, + injected_files=injected_files, image_ref=image_ref, + orig_image_ref=orig_image_ref), + topic=self._compute_topic(ctxt, None, instance)) + + def refresh_security_group_rules(self, ctxt, security_group_id, host): + self.cast(ctxt, self.make_msg('refresh_security_group_rules', + security_group_id=security_group_id), + topic=self._compute_topic(ctxt, host, None)) + + def refresh_security_group_members(self, ctxt, security_group_id, + host): + self.cast(ctxt, self.make_msg('refresh_security_group_members', + security_group_id=security_group_id), + topic=self._compute_topic(ctxt, host, None)) + + def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host): + '''Remove aggregate host. + + :param ctxt: request context + :param aggregate_id: + :param host_param: This value is placed in the message to be the 'host' + parameter for the remote method. + :param host: This is the host to send the message to. + ''' + self.cast(ctxt, self.make_msg('remove_aggregate_host', + aggregate_id=aggregate_id, host=host_param), + topic=self._compute_topic(ctxt, host, None)) + + def remove_fixed_ip_from_instance(self, ctxt, instance, address): + self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance', + instance_uuid=instance['uuid'], address=address), + topic=self._compute_topic(ctxt, None, instance)) + + def rescue_instance(self, ctxt, instance, rescue_password): + self.cast(ctxt, self.make_msg('rescue_instance', + instance_uuid=instance['uuid'], + rescue_password=rescue_password), + topic=self._compute_topic(ctxt, None, instance)) + + def reset_network(self, ctxt, instance): + self.cast(ctxt, self.make_msg('reset_network', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def resume_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('resume_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def revert_resize(self, ctxt, instance, migration_id, host): + self.cast(ctxt, self.make_msg('revert_resize', + instance_uuid=instance['uuid'], migration_id=migration_id), + topic=self._compute_topic(ctxt, host, instance)) + + def set_admin_password(self, ctxt, instance, new_pass): + self.cast(ctxt, self.make_msg('set_admin_password', + instance_uuid=instance['uuid'], new_pass=new_pass), + topic=self._compute_topic(ctxt, None, instance)) + + def set_host_enabled(self, ctxt, enabled, host): + return self.call(ctxt, self.make_msg('set_host_enabled', + enabled=enabled), topic=self._compute_topic(ctxt, host, None)) + + def snapshot_instance(self, ctxt, instance, image_id, image_type, + backup_type, rotation): + self.cast(ctxt, self.make_msg('snapshot_instance', + instance_uuid=instance['uuid'], image_id=image_id, + image_type=image_type, backup_type=backup_type, + rotation=rotation), + topic=self._compute_topic(ctxt, None, instance)) + + def start_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('start_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def stop_instance(self, ctxt, instance, cast=True): + rpc_method = self.cast if cast else self.call + return rpc_method(ctxt, self.make_msg('stop_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def suspend_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('suspend_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def terminate_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('terminate_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def unlock_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('unlock_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def unpause_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('unpause_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) + + def unrescue_instance(self, ctxt, instance): + self.cast(ctxt, self.make_msg('unrescue_instance', + instance_uuid=instance['uuid']), + topic=self._compute_topic(ctxt, None, instance)) diff --git a/nova/tests/compute/__init__.py b/nova/tests/compute/__init__.py new file mode 100644 index 000000000000..7e04e7c73b39 --- /dev/null +++ b/nova/tests/compute/__init__.py @@ -0,0 +1,19 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work +from nova.tests import * diff --git a/nova/tests/test_compute.py b/nova/tests/compute/test_compute.py similarity index 99% rename from nova/tests/test_compute.py rename to nova/tests/compute/test_compute.py index bba07cadbc44..7af2c7ef1391 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -33,6 +33,7 @@ from nova.compute import api as compute_api from nova.compute import instance_types from nova.compute import manager as compute_manager from nova.compute import power_state +from nova.compute import rpcapi as compute_rpcapi from nova.compute import task_states from nova.compute import vm_states from nova import context @@ -3490,13 +3491,14 @@ class ComputeAPITestCase(BaseTestCase): rpc_msg1 = {'method': 'get_vnc_console', 'args': {'instance_uuid': fake_instance['uuid'], - 'console_type': fake_console_type}} + 'console_type': fake_console_type}, + 'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION} rpc_msg2 = {'method': 'authorize_console', 'args': fake_connect_info, 'version': '1.0'} rpc.call(self.context, 'compute.%s' % fake_instance['host'], - rpc_msg1).AndReturn(fake_connect_info2) + rpc_msg1, None).AndReturn(fake_connect_info2) rpc.call(self.context, FLAGS.consoleauth_topic, rpc_msg2, None).AndReturn(None) @@ -3516,9 +3518,10 @@ class ComputeAPITestCase(BaseTestCase): rpc_msg = {'method': 'get_console_output', 'args': {'instance_uuid': fake_instance['uuid'], - 'tail_length': fake_tail_length}} + 'tail_length': fake_tail_length}, + 'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION} rpc.call(self.context, 'compute.%s' % fake_instance['host'], - rpc_msg).AndReturn(fake_console_output) + rpc_msg, None).AndReturn(fake_console_output) self.mox.ReplayAll() @@ -4018,7 +4021,7 @@ class ComputeHostAPITestCase(BaseTestCase): self.host_api = compute_api.HostAPI() def _rpc_call_stub(self, call_info): - def fake_rpc_call(context, topic, msg): + def fake_rpc_call(context, topic, msg, timeout=None): call_info['context'] = context call_info['topic'] = topic call_info['msg'] = msg @@ -4034,7 +4037,8 @@ class ComputeHostAPITestCase(BaseTestCase): self.assertEqual(call_info['topic'], 'compute.fake_host') self.assertEqual(call_info['msg'], {'method': 'set_host_enabled', - 'args': {'enabled': 'fake_enabled'}}) + 'args': {'enabled': 'fake_enabled'}, + 'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION}) def test_host_power_action(self): ctxt = context.RequestContext('fake', 'fake') @@ -4045,7 +4049,8 @@ class ComputeHostAPITestCase(BaseTestCase): self.assertEqual(call_info['topic'], 'compute.fake_host') self.assertEqual(call_info['msg'], {'method': 'host_power_action', - 'args': {'action': 'fake_action'}}) + 'args': {'action': 'fake_action'}, + 'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION}) def test_set_host_maintenance(self): ctxt = context.RequestContext('fake', 'fake') @@ -4056,7 +4061,8 @@ class ComputeHostAPITestCase(BaseTestCase): self.assertEqual(call_info['topic'], 'compute.fake_host') self.assertEqual(call_info['msg'], {'method': 'host_maintenance_mode', - 'args': {'host': 'fake_host', 'mode': 'fake_mode'}}) + 'args': {'host': 'fake_host', 'mode': 'fake_mode'}, + 'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION}) class KeypairAPITestCase(BaseTestCase): diff --git a/nova/tests/compute/test_rpcapi.py b/nova/tests/compute/test_rpcapi.py new file mode 100644 index 000000000000..593598ebbfa4 --- /dev/null +++ b/nova/tests/compute/test_rpcapi.py @@ -0,0 +1,236 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Unit Tests for nova.compute.rpcapi +""" + +from nova.compute import rpcapi as compute_rpcapi +from nova import context +from nova import flags +from nova import rpc +from nova import test + + +FLAGS = flags.FLAGS + + +class ComputeRpcAPITestCase(test.TestCase): + + def setUp(self): + self.fake_instance = {'uuid': 'fake_uuid', 'host': 'fake_host'} + super(ComputeRpcAPITestCase, self).setUp() + + def tearDown(self): + super(ComputeRpcAPITestCase, self).tearDown() + + def _test_compute_api(self, method, rpc_method, **kwargs): + ctxt = context.RequestContext('fake_user', 'fake_project') + rpcapi = compute_rpcapi.ComputeAPI() + expected_retval = 'foo' if method == 'call' else None + expected_msg = rpcapi.make_msg(method, **kwargs) + if 'host_param' in expected_msg['args']: + host_param = expected_msg['args']['host_param'] + del expected_msg['args']['host_param'] + expected_msg['args']['host'] = host_param + elif 'host' in expected_msg['args']: + del expected_msg['args']['host'] + if 'instance' in expected_msg['args']: + instance = expected_msg['args']['instance'] + del expected_msg['args']['instance'] + expected_msg['args']['instance_uuid'] = instance['uuid'] + expected_msg['version'] = rpcapi.RPC_API_VERSION + cast_and_call = ['confirm_resize', 'stop_instance'] + if rpc_method == 'call' and method in cast_and_call: + kwargs['cast'] = False + if 'host' in kwargs: + host = kwargs['host'] + else: + host = kwargs['instance']['host'] + expected_topic = '%s.%s' % (FLAGS.compute_topic, host) + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if expected_retval: + return expected_retval + + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + retval = getattr(rpcapi, method)(ctxt, **kwargs) + + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, expected_topic, expected_msg] + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_add_aggregate_host(self): + self._test_compute_api('add_aggregate_host', 'cast', aggregate_id='id', + host_param='host', host='host') + + def test_add_fixed_ip_to_instance(self): + self._test_compute_api('add_fixed_ip_to_instance', 'cast', + instance=self.fake_instance, network_id='id') + + def test_attach_volume(self): + self._test_compute_api('attach_volume', 'cast', + instance=self.fake_instance, volume_id='id', mountpoint='mp') + + def test_confirm_resize_cast(self): + self._test_compute_api('confirm_resize', 'cast', + instance=self.fake_instance, migration_id='id', host='host') + + def test_confirm_resize_call(self): + self._test_compute_api('confirm_resize', 'call', + instance=self.fake_instance, migration_id='id', host='host') + + def test_detach_volume(self): + self._test_compute_api('detach_volume', 'cast', + instance=self.fake_instance, volume_id='id') + + def test_get_console_output(self): + self._test_compute_api('get_console_output', 'call', + instance=self.fake_instance, tail_length='tl') + + def test_get_diagnostics(self): + self._test_compute_api('get_diagnostics', 'call', + instance=self.fake_instance) + + def test_get_vnc_console(self): + self._test_compute_api('get_vnc_console', 'call', + instance=self.fake_instance, console_type='type') + + def test_host_maintenance_mode(self): + self._test_compute_api('host_maintenance_mode', 'call', + host_param='param', mode='mode', host='host') + + def test_host_power_action(self): + self._test_compute_api('host_power_action', 'call', action='action', + host='host') + + def test_inject_file(self): + self._test_compute_api('inject_file', 'cast', + instance=self.fake_instance, path='path', file_contents='fc') + + def test_inject_network_info(self): + self._test_compute_api('inject_network_info', 'cast', + instance=self.fake_instance) + + def test_lock_instance(self): + self._test_compute_api('lock_instance', 'cast', + instance=self.fake_instance) + + def test_pause_instance(self): + self._test_compute_api('pause_instance', 'cast', + instance=self.fake_instance) + + def test_power_off_instance(self): + self._test_compute_api('power_off_instance', 'cast', + instance=self.fake_instance) + + def test_power_on_instance(self): + self._test_compute_api('power_on_instance', 'cast', + instance=self.fake_instance) + + def test_reboot_instance(self): + self._test_compute_api('reboot_instance', 'cast', + instance=self.fake_instance, reboot_type='type') + + def test_rebuild_instance(self): + self._test_compute_api('rebuild_instance', 'cast', + instance=self.fake_instance, new_pass='pass', + injected_files='files', image_ref='ref', + orig_image_ref='orig_ref') + + def test_refresh_security_group_rules(self): + self._test_compute_api('refresh_security_group_rules', 'cast', + security_group_id='id', host='host') + + def test_refresh_security_group_members(self): + self._test_compute_api('refresh_security_group_members', 'cast', + security_group_id='id', host='host') + + def test_remove_aggregate_host(self): + self._test_compute_api('remove_aggregate_host', 'cast', + aggregate_id='id', host_param='host', host='host') + + def test_remove_fixed_ip_from_instance(self): + self._test_compute_api('remove_fixed_ip_from_instance', 'cast', + instance=self.fake_instance, address='addr') + + def test_rescue_instance(self): + self._test_compute_api('rescue_instance', 'cast', + instance=self.fake_instance, rescue_password='pw') + + def test_reset_network(self): + self._test_compute_api('reset_network', 'cast', + instance=self.fake_instance) + + def test_resume_instance(self): + self._test_compute_api('resume_instance', 'cast', + instance=self.fake_instance) + + def test_revert_resize(self): + self._test_compute_api('revert_resize', 'cast', + instance=self.fake_instance, migration_id='id', host='host') + + def test_set_admin_password(self): + self._test_compute_api('set_admin_password', 'cast', + instance=self.fake_instance, new_pass='pw') + + def test_set_host_enabled(self): + self._test_compute_api('set_host_enabled', 'call', + enabled='enabled', host='host') + + def test_snapshot_instance(self): + self._test_compute_api('snapshot_instance', 'cast', + instance=self.fake_instance, image_id='id', image_type='type', + backup_type='type', rotation='rotation') + + def test_start_instance(self): + self._test_compute_api('start_instance', 'cast', + instance=self.fake_instance) + + def test_stop_instance_cast(self): + self._test_compute_api('stop_instance', 'cast', + instance=self.fake_instance) + + def test_stop_instance_call(self): + self._test_compute_api('stop_instance', 'call', + instance=self.fake_instance) + + def test_suspend_instance(self): + self._test_compute_api('suspend_instance', 'cast', + instance=self.fake_instance) + + def test_terminate_instance(self): + self._test_compute_api('terminate_instance', 'cast', + instance=self.fake_instance) + + def test_unlock_instance(self): + self._test_compute_api('unlock_instance', 'cast', + instance=self.fake_instance) + + def test_unpause_instance(self): + self._test_compute_api('unpause_instance', 'cast', + instance=self.fake_instance) + + def test_unrescue_instance(self): + self._test_compute_api('unrescue_instance', 'cast', + instance=self.fake_instance)