Add version to compute rpc API.

Part of blueprint versioned-rpc-apis.

Change-Id: I5943d1fae2c96cfe519817b59098402481a1026b
This commit is contained in:
Russell Bryant 2012-05-10 19:28:04 -04:00
parent 21e9d2e2b7
commit 7e15d4e28f
6 changed files with 630 additions and 165 deletions

View File

@ -30,6 +30,7 @@ from nova import block_device
from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import vm_states
from nova.consoleauth import rpcapi as consoleauth_rpcapi
@ -103,52 +104,7 @@ def check_policy(context, action, target):
nova.policy.enforce(context, _action, target)
class BaseAPI(base.Base):
"""Base API class."""
def __init__(self, **kwargs):
super(BaseAPI, self).__init__(**kwargs)
def _cast_or_call_compute_message(self, rpc_method, compute_method,
context, instance=None, host=None, params=None):
"""Generic handler for RPC casts and calls to compute.
:param rpc_method: RPC method to use (rpc.call or rpc.cast)
:param compute_method: Compute manager method to call
:param context: RequestContext of caller
:param instance: The instance object to use to find host to send to
Can be None to not include instance_uuid in args
:param host: Optional host to send to instead of instance['host']
Must be specified if 'instance' is None
:param params: Optional dictionary of arguments to be passed to the
compute worker
:returns: None
"""
if not params:
params = {}
if not host:
if not instance:
raise exception.NovaException(_("No compute host specified"))
host = instance['host']
if not host:
raise exception.NovaException(_("Unable to find host for "
"Instance %s") % instance['uuid'])
queue = self.db.queue_get_for(context, FLAGS.compute_topic, host)
if instance:
params['instance_uuid'] = instance['uuid']
kwargs = {'method': compute_method, 'args': params}
return rpc_method(context, queue, kwargs)
def _cast_compute_message(self, *args, **kwargs):
"""Generic handler for RPC casts to compute."""
self._cast_or_call_compute_message(rpc.cast, *args, **kwargs)
def _call_compute_message(self, *args, **kwargs):
"""Generic handler for RPC calls to compute."""
return self._cast_or_call_compute_message(rpc.call, *args, **kwargs)
class API(BaseAPI):
class API(base.Base):
"""API for interacting with the compute manager."""
def __init__(self, image_service=None, network_api=None, volume_api=None,
@ -160,6 +116,7 @@ class API(BaseAPI):
self.volume_api = volume_api or volume.API()
self.consoleauth_rpcapi = consoleauth_rpcapi.ConsoleAuthAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(API, self).__init__(**kwargs)
def _check_injected_file_quota(self, context, injected_files):
@ -762,10 +719,8 @@ class API(BaseAPI):
hosts.add(instance['host'])
for host in hosts:
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "refresh_security_group_rules",
"args": {"security_group_id": security_group.id}})
self.compute_rpcapi.refresh_security_group_rules(context,
security_group.id, host=host)
def trigger_security_group_members_refresh(self, context, group_ids):
"""Called when a security group gains a new or loses a member.
@ -805,10 +760,8 @@ class API(BaseAPI):
# ...and finally we tell these nodes to refresh their view of this
# particular security group.
for host in hosts:
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "refresh_security_group_members",
"args": {"security_group_id": group_id}})
self.compute_rpcapi.refresh_security_group_members(context,
group_id, host=host)
def trigger_provider_fw_rules_refresh(self, context):
"""Called when a rule is added/removed from a provider firewall"""
@ -862,11 +815,10 @@ class API(BaseAPI):
self.db.instance_add_security_group(context.elevated(),
instance_uuid,
security_group['id'])
params = {"security_group_id": security_group['id']}
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
self._cast_compute_message('refresh_security_group_rules',
context, host=instance['host'], params=params)
self.compute_rpcapi.refresh_security_group_rules(context,
security_group['id'], host=instance['host'])
@wrap_check_policy
def remove_security_group(self, context, instance, security_group_name):
@ -891,11 +843,10 @@ class API(BaseAPI):
self.db.instance_remove_security_group(context.elevated(),
instance_uuid,
security_group['id'])
params = {"security_group_id": security_group['id']}
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
self._cast_compute_message('refresh_security_group_rules',
context, host=instance['host'], params=params)
self.compute_rpcapi.refresh_security_group_rules(context,
security_group['id'], host=instance['host'])
@wrap_check_policy
def update(self, context, instance, **kwargs):
@ -926,16 +877,14 @@ class API(BaseAPI):
# NOTE(jerdfelt): The compute daemon handles reclaiming instances
# that are in soft delete. If there is no host assigned, there is
# no daemon to reclaim, so delete it immediately.
host = instance['host']
if host:
if instance['host']:
self.update(context,
instance,
vm_state=vm_states.SOFT_DELETE,
task_state=task_states.POWERING_OFF,
deleted_at=utils.utcnow())
self._cast_compute_message('power_off_instance',
context, instance)
self.compute_rpcapi.power_off_instance(context, instance)
else:
LOG.warning(_('No host for instance, deleting immediately'),
instance=instance)
@ -946,9 +895,8 @@ class API(BaseAPI):
pass
def _delete(self, context, instance):
host = instance['host']
try:
if not host:
if not instance['host']:
# Just update database, nothing else we can do
return self.db.instance_destroy(context, instance['id'])
@ -964,14 +912,13 @@ class API(BaseAPI):
context, instance['uuid'], 'finished')
if migration_ref:
src_host = migration_ref['source_compute']
params = {'migration_id': migration_ref['id']}
# Call since this can race with the terminate_instance
self._call_compute_message('confirm_resize', context,
instance, host=src_host,
params=params)
self.compute_rpcapi.confirm_resize(context,
instance, migration_ref['id'],
host=src_host, cast=False)
self.compute_rpcapi.terminate_instance(context, instance)
self._cast_compute_message('terminate_instance',
context, instance)
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass
@ -1002,13 +949,9 @@ class API(BaseAPI):
task_state=None,
deleted_at=None)
host = instance['host']
if host:
self.update(context,
instance,
task_state=task_states.POWERING_ON)
self._cast_compute_message('power_on_instance',
context, instance)
if instance['host']:
self.update(context, instance, task_state=task_states.POWERING_ON)
self.compute_rpcapi.power_on_instance(context, instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SOFT_DELETE])
@ -1032,9 +975,7 @@ class API(BaseAPI):
terminated_at=utils.utcnow(),
progress=0)
rpc_method = rpc.cast if do_cast else rpc.call
self._cast_or_call_compute_message(rpc_method, 'stop_instance',
context, instance)
self.compute_rpcapi.stop_instance(context, instance, cast=do_cast)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.STOPPED, vm_states.SHUTOFF])
@ -1062,7 +1003,7 @@ class API(BaseAPI):
# TODO(yamahata): injected_files isn't supported right now.
# It is used only for osapi. not for ec2 api.
# availability_zone isn't used by run_instance.
self._cast_compute_message('start_instance', context, instance)
self.compute_rpcapi.start_instance(context, instance)
#NOTE(bcwaldon): no policy check here since it should be rolled in to
# search_opts in get_all
@ -1275,10 +1216,9 @@ class API(BaseAPI):
sent_meta['properties'] = properties
recv_meta = self.image_service.create(context, sent_meta)
params = {'image_id': recv_meta['id'], 'image_type': image_type,
'backup_type': backup_type, 'rotation': rotation}
self._cast_compute_message('snapshot_instance', context, instance,
params=params)
self.compute_rpcapi.snapshot_instance(context, instance=instance,
image_id=recv_meta['id'], image_type=image_type,
backup_type=backup_type, rotation=rotation)
return recv_meta
def _get_minram_mindisk_params(self, context, instance):
@ -1312,8 +1252,8 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=state)
self._cast_compute_message('reboot_instance', context, instance,
params={'reboot_type': reboot_type})
self.compute_rpcapi.reboot_instance(context, instance=instance,
reboot_type=reboot_type)
def _get_image(self, context, image_href):
"""Throws an ImageNotFound exception if image_href does not exist."""
@ -1383,15 +1323,9 @@ class API(BaseAPI):
# system metadata... and copy in the properties for the new image.
_reset_image_metadata()
rebuild_params = {
"new_pass": admin_password,
"injected_files": files_to_inject,
"image_ref": image_href,
"orig_image_ref": orig_image_ref,
}
self._cast_compute_message('rebuild_instance', context, instance,
params=rebuild_params)
self.compute_rpcapi.rebuild_instance(context, instance=instance,
new_pass=admin_password, injected_files=files_to_inject,
image_ref=image_href, orig_image_ref=orig_image_ref)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF],
@ -1410,9 +1344,9 @@ class API(BaseAPI):
vm_state=vm_states.RESIZING,
task_state=task_states.RESIZE_REVERTING)
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('revert_resize', context, instance,
host=migration_ref['dest_compute'], params=params)
self.compute_rpcapi.revert_resize(context,
instance=instance, migration_id=migration_ref['id'],
host=migration_ref['dest_compute'])
self.db.migration_update(context, migration_ref['id'],
{'status': 'reverted'})
@ -1434,9 +1368,9 @@ class API(BaseAPI):
vm_state=vm_states.ACTIVE,
task_state=None)
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('confirm_resize', context, instance,
host=migration_ref['source_compute'], params=params)
self.compute_rpcapi.confirm_resize(context,
instance=instance, migration_id=migration_ref['id'],
host=migration_ref['source_compute'])
self.db.migration_update(context, migration_ref['id'],
{'status': 'confirmed'})
@ -1510,14 +1444,14 @@ class API(BaseAPI):
@wrap_check_policy
def add_fixed_ip(self, context, instance, network_id):
"""Add fixed_ip from specified network to given instance."""
self._cast_compute_message('add_fixed_ip_to_instance', context,
instance, params=dict(network_id=network_id))
self.compute_rpcapi.add_fixed_ip_to_instance(context,
instance=instance, network_id=network_id)
@wrap_check_policy
def remove_fixed_ip(self, context, instance, address):
"""Remove fixed_ip from specified network to given instance."""
self._cast_compute_message('remove_fixed_ip_from_instance',
context, instance, params=dict(address=address))
self.compute_rpcapi.remove_fixed_ip_from_instance(context,
instance=instance, address=address)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@ -1529,7 +1463,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.PAUSING)
self._cast_compute_message('pause_instance', context, instance)
self.compute_rpcapi.pause_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.PAUSED])
@ -1539,13 +1473,12 @@ class API(BaseAPI):
instance,
vm_state=vm_states.PAUSED,
task_state=task_states.UNPAUSING)
self._cast_compute_message('unpause_instance', context, instance)
self.compute_rpcapi.unpause_instance(context, instance=instance)
@wrap_check_policy
def get_diagnostics(self, context, instance):
"""Retrieve diagnostics for the given instance."""
return self._call_compute_message("get_diagnostics", context,
instance)
return self.compute_rpcapi.get_diagnostics(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@ -1557,7 +1490,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.ACTIVE,
task_state=task_states.SUSPENDING)
self._cast_compute_message('suspend_instance', context, instance)
self.compute_rpcapi.suspend_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.SUSPENDED])
@ -1567,7 +1500,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.SUSPENDED,
task_state=task_states.RESUMING)
self._cast_compute_message('resume_instance', context, instance)
self.compute_rpcapi.resume_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE, vm_states.SHUTOFF,
@ -1580,11 +1513,8 @@ class API(BaseAPI):
vm_state=vm_states.ACTIVE,
task_state=task_states.RESCUING)
rescue_params = {
"rescue_password": rescue_password
}
self._cast_compute_message('rescue_instance', context, instance,
params=rescue_params)
self.compute_rpcapi.rescue_instance(context, instance=instance,
rescue_password=rescue_password)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.RESCUED])
@ -1594,7 +1524,7 @@ class API(BaseAPI):
instance,
vm_state=vm_states.RESCUED,
task_state=task_states.UNRESCUING)
self._cast_compute_message('unrescue_instance', context, instance)
self.compute_rpcapi.unrescue_instance(context, instance=instance)
@wrap_check_policy
@check_instance_state(vm_state=[vm_states.ACTIVE])
@ -1604,22 +1534,20 @@ class API(BaseAPI):
instance,
task_state=task_states.UPDATING_PASSWORD)
params = {"new_pass": password}
self._cast_compute_message('set_admin_password', context, instance,
params=params)
self.compute_rpcapi.set_admin_password(context, instance=instance,
new_pass=password)
@wrap_check_policy
def inject_file(self, context, instance, path, file_contents):
"""Write a file to the given instance."""
params = {'path': path, 'file_contents': file_contents}
self._cast_compute_message('inject_file', context, instance,
params=params)
self.compute_rpcapi.inject_file(context, instance=instance, path=path,
file_contents=file_contents)
@wrap_check_policy
def get_vnc_console(self, context, instance, console_type):
"""Get a url to an instance Console."""
connect_info = self._call_compute_message('get_vnc_console',
context, instance, params={"console_type": console_type})
connect_info = self.compute_rpcapi.get_vnc_console(context,
instance=instance, console_type=console_type)
self.consoleauth_rpcapi.authorize_console(context,
connect_info['token'], console_type, connect_info['host'],
@ -1630,19 +1558,18 @@ class API(BaseAPI):
@wrap_check_policy
def get_console_output(self, context, instance, tail_length=None):
"""Get console output for an an instance."""
params = {'tail_length': tail_length}
return self._call_compute_message('get_console_output', context,
instance, params=params)
return self.compute_rpcapi.get_console_output(context,
instance=instance, tail_length=tail_length)
@wrap_check_policy
def lock(self, context, instance):
"""Lock the given instance."""
self._cast_compute_message('lock_instance', context, instance)
self.compute_rpcapi.lock_instance(context, instance=instance)
@wrap_check_policy
def unlock(self, context, instance):
"""Unlock the given instance."""
self._cast_compute_message('unlock_instance', context, instance)
self.compute_rpcapi.unlock_instance(context, instance=instance)
@wrap_check_policy
def get_lock(self, context, instance):
@ -1652,12 +1579,12 @@ class API(BaseAPI):
@wrap_check_policy
def reset_network(self, context, instance):
"""Reset networking on the instance."""
self._cast_compute_message('reset_network', context, instance)
self.compute_rpcapi.reset_network(context, instance=instance)
@wrap_check_policy
def inject_network_info(self, context, instance):
"""Inject network info for the instance."""
self._cast_compute_message('inject_network_info', context, instance)
self.compute_rpcapi.inject_network_info(context, instance=instance)
@wrap_check_policy
def attach_volume(self, context, instance, volume_id, device):
@ -1667,10 +1594,8 @@ class API(BaseAPI):
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_attach(context, volume)
self.volume_api.reserve_volume(context, volume)
params = {"volume_id": volume_id,
"mountpoint": device}
self._cast_compute_message('attach_volume', context, instance,
params=params)
self.compute_rpcapi.attach_volume(context, instance=instance,
volume_id=volume_id, mountpoint=device)
# FIXME(comstud): I wonder if API should pull in the instance from
# the volume ID via volume API and pass it and the volume object here
@ -1688,9 +1613,8 @@ class API(BaseAPI):
volume = self.volume_api.get(context, volume_id)
self.volume_api.check_detach(context, volume)
params = {'volume_id': volume_id}
self._cast_compute_message('detach_volume', context, instance,
params=params)
self.compute_rpcapi.detach_volume(context, instance=instance,
volume_id=volume_id)
return instance
@wrap_check_policy
@ -1778,32 +1702,38 @@ class API(BaseAPI):
instance['uuid'])
class HostAPI(BaseAPI):
class HostAPI(base.Base):
def __init__(self):
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(HostAPI, self).__init__()
"""Sub-set of the Compute Manager API for managing host operations."""
def set_host_enabled(self, context, host, enabled):
"""Sets the specified host's ability to accept new instances."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
return self._call_compute_message("set_host_enabled", context,
host=host, params={"enabled": enabled})
return self.compute_rpcapi.set_host_enabled(context, enabled=enabled,
host=host)
def host_power_action(self, context, host, action):
"""Reboots, shuts down or powers up the host."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
return self._call_compute_message("host_power_action", context,
host=host, params={"action": action})
topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
return self.compute_rpcapi.host_power_action(context, action=action,
host=host)
def set_host_maintenance(self, context, host, mode):
"""Start/Stop host maintenance window. On start, it triggers
guest VMs evacuation."""
return self._call_compute_message("host_maintenance_mode", context,
host=host, params={"host": host, "mode": mode})
return self.compute_rpcapi.host_maintenance_mode(context,
host_param=host, mode=mode, host=host)
class AggregateAPI(base.Base):
"""Sub-set of the Compute Manager API for managing host aggregates."""
def __init__(self, **kwargs):
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(AggregateAPI, self).__init__(**kwargs)
def create_aggregate(self, context, aggregate_name, availability_zone):
@ -1883,10 +1813,8 @@ class AggregateAPI(base.Base):
if aggregate.operational_state == aggregate_states.CREATED:
values = {'operational_state': aggregate_states.CHANGING}
self.db.aggregate_update(context, aggregate_id, values)
queue = self.db.queue_get_for(context, service.topic, host)
rpc.cast(context, queue, {"method": "add_aggregate_host",
"args": {"aggregate_id": aggregate_id,
"host": host}, })
self.compute_rpcapi.add_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CHANGING: 'setup in progress',
@ -1906,10 +1834,8 @@ class AggregateAPI(base.Base):
if aggregate.operational_state in [aggregate_states.ACTIVE,
aggregate_states.ERROR]:
self.db.aggregate_host_delete(context, aggregate_id, host)
queue = self.db.queue_get_for(context, service.topic, host)
rpc.cast(context, queue, {"method": "remove_aggregate_host",
"args": {"aggregate_id": aggregate_id,
"host": host}, })
self.compute_rpcapi.remove_aggregate_host(context,
aggregate_id=aggregate_id, host_param=host, host=host)
return self.get_aggregate(context, aggregate_id)
else:
invalid = {aggregate_states.CREATED: 'no hosts to remove',

View File

@ -224,6 +224,8 @@ def _get_additional_capabilities():
class ComputeManager(manager.SchedulerDependentManager):
"""Manages the running instances from creation to destruction."""
RPC_API_VERSION = '1.0'
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
# TODO(vish): sync driver creation logic with the rest of the system

276
nova/compute/rpcapi.py Normal file
View File

@ -0,0 +1,276 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Client side of the compute RPC API.
"""
from nova.db import base
from nova import exception
from nova import flags
import nova.rpc.proxy
FLAGS = flags.FLAGS
class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
'''Client side of the compute rpc API.
API version history:
1.0 - Initial version.
'''
RPC_API_VERSION = '1.0'
def __init__(self):
super(ComputeAPI, self).__init__(topic=FLAGS.compute_topic,
default_version=self.RPC_API_VERSION)
def _compute_topic(self, ctxt, host, instance):
'''Get the topic to use for a message.
:param ctxt: request context
:param host: explicit host to send the message to.
:param instance: If an explicit host was not specified, use
instance['host']
:returns: A topic string
'''
if not host:
if not instance:
raise exception.NovaException(_('No compute host specified'))
host = instance['host']
if not host:
raise exception.NovaException(_('Unable to find host for '
'Instance %s') % instance['uuid'])
return self.db.queue_get_for(ctxt, self.topic, host)
def add_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Add aggregate host.
:param ctxt: request context
:param aggregate_id:
:param host_param: This value is placed in the message to be the 'host'
parameter for the remote method.
:param host: This is the host to send the message to.
'''
self.cast(ctxt, self.make_msg('add_aggregate_host',
aggregate_id=aggregate_id, host=host_param),
topic=self._compute_topic(ctxt, host, None))
def add_fixed_ip_to_instance(self, ctxt, instance, network_id):
self.cast(ctxt, self.make_msg('add_fixed_ip_to_instance',
instance_uuid=instance['uuid'], network_id=network_id),
topic=self._compute_topic(ctxt, None, instance))
def attach_volume(self, ctxt, instance, volume_id, mountpoint):
self.cast(ctxt, self.make_msg('attach_volume',
instance_uuid=instance['uuid'], volume_id=volume_id,
mountpoint=mountpoint),
topic=self._compute_topic(ctxt, None, instance))
def confirm_resize(self, ctxt, instance, migration_id, host,
cast=True):
rpc_method = self.cast if cast else self.call
return rpc_method(ctxt, self.make_msg('confirm_resize',
instance_uuid=instance['uuid'], migration_id=migration_id),
topic=self._compute_topic(ctxt, host, instance))
def detach_volume(self, ctxt, instance, volume_id):
self.cast(ctxt, self.make_msg('detach_volume',
instance_uuid=instance['uuid'], volume_id=volume_id),
topic=self._compute_topic(ctxt, None, instance))
def get_console_output(self, ctxt, instance, tail_length):
return self.call(ctxt, self.make_msg('get_console_output',
instance_uuid=instance['uuid'], tail_length=tail_length),
topic=self._compute_topic(ctxt, None, instance))
def get_diagnostics(self, ctxt, instance):
return self.call(ctxt, self.make_msg('get_diagnostics',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def get_vnc_console(self, ctxt, instance, console_type):
return self.call(ctxt, self.make_msg('get_vnc_console',
instance_uuid=instance['uuid'], console_type=console_type),
topic=self._compute_topic(ctxt, None, instance))
def host_maintenance_mode(self, ctxt, host_param, mode, host):
'''Set host maintenance mode
:param ctxt: request context
:param host_param: This value is placed in the message to be the 'host'
parameter for the remote method.
:param mode:
:param host: This is the host to send the message to.
'''
return self.call(ctxt, self.make_msg('host_maintenance_mode',
host=host_param, mode=mode),
topic=self._compute_topic(ctxt, host, None))
def host_power_action(self, ctxt, action, host):
return self.call(ctxt, self.make_msg('host_power_action',
action=action), topic=self._compute_topic(ctxt, host, None))
def inject_file(self, ctxt, instance, path, file_contents):
self.cast(ctxt, self.make_msg('inject_file',
instance_uuid=instance['uuid'], path=path,
file_contents=file_contents),
topic=self._compute_topic(ctxt, None, instance))
def inject_network_info(self, ctxt, instance):
self.cast(ctxt, self.make_msg('inject_network_info',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def lock_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('lock_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def pause_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('pause_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def power_off_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('power_off_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def power_on_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('power_on_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def reboot_instance(self, ctxt, instance, reboot_type):
self.cast(ctxt, self.make_msg('reboot_instance',
instance_uuid=instance['uuid'], reboot_type=reboot_type),
topic=self._compute_topic(ctxt, None, instance))
def rebuild_instance(self, ctxt, instance, new_pass, injected_files,
image_ref, orig_image_ref):
self.cast(ctxt, self.make_msg('rebuild_instance',
instance_uuid=instance['uuid'], new_pass=new_pass,
injected_files=injected_files, image_ref=image_ref,
orig_image_ref=orig_image_ref),
topic=self._compute_topic(ctxt, None, instance))
def refresh_security_group_rules(self, ctxt, security_group_id, host):
self.cast(ctxt, self.make_msg('refresh_security_group_rules',
security_group_id=security_group_id),
topic=self._compute_topic(ctxt, host, None))
def refresh_security_group_members(self, ctxt, security_group_id,
host):
self.cast(ctxt, self.make_msg('refresh_security_group_members',
security_group_id=security_group_id),
topic=self._compute_topic(ctxt, host, None))
def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Remove aggregate host.
:param ctxt: request context
:param aggregate_id:
:param host_param: This value is placed in the message to be the 'host'
parameter for the remote method.
:param host: This is the host to send the message to.
'''
self.cast(ctxt, self.make_msg('remove_aggregate_host',
aggregate_id=aggregate_id, host=host_param),
topic=self._compute_topic(ctxt, host, None))
def remove_fixed_ip_from_instance(self, ctxt, instance, address):
self.cast(ctxt, self.make_msg('remove_fixed_ip_from_instance',
instance_uuid=instance['uuid'], address=address),
topic=self._compute_topic(ctxt, None, instance))
def rescue_instance(self, ctxt, instance, rescue_password):
self.cast(ctxt, self.make_msg('rescue_instance',
instance_uuid=instance['uuid'],
rescue_password=rescue_password),
topic=self._compute_topic(ctxt, None, instance))
def reset_network(self, ctxt, instance):
self.cast(ctxt, self.make_msg('reset_network',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def resume_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('resume_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def revert_resize(self, ctxt, instance, migration_id, host):
self.cast(ctxt, self.make_msg('revert_resize',
instance_uuid=instance['uuid'], migration_id=migration_id),
topic=self._compute_topic(ctxt, host, instance))
def set_admin_password(self, ctxt, instance, new_pass):
self.cast(ctxt, self.make_msg('set_admin_password',
instance_uuid=instance['uuid'], new_pass=new_pass),
topic=self._compute_topic(ctxt, None, instance))
def set_host_enabled(self, ctxt, enabled, host):
return self.call(ctxt, self.make_msg('set_host_enabled',
enabled=enabled), topic=self._compute_topic(ctxt, host, None))
def snapshot_instance(self, ctxt, instance, image_id, image_type,
backup_type, rotation):
self.cast(ctxt, self.make_msg('snapshot_instance',
instance_uuid=instance['uuid'], image_id=image_id,
image_type=image_type, backup_type=backup_type,
rotation=rotation),
topic=self._compute_topic(ctxt, None, instance))
def start_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('start_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def stop_instance(self, ctxt, instance, cast=True):
rpc_method = self.cast if cast else self.call
return rpc_method(ctxt, self.make_msg('stop_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def suspend_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('suspend_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def terminate_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('terminate_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def unlock_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unlock_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def unpause_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unpause_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))
def unrescue_instance(self, ctxt, instance):
self.cast(ctxt, self.make_msg('unrescue_instance',
instance_uuid=instance['uuid']),
topic=self._compute_topic(ctxt, None, instance))

View File

@ -0,0 +1,19 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
from nova.tests import *

View File

@ -33,6 +33,7 @@ from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import task_states
from nova.compute import vm_states
from nova import context
@ -3490,13 +3491,14 @@ class ComputeAPITestCase(BaseTestCase):
rpc_msg1 = {'method': 'get_vnc_console',
'args': {'instance_uuid': fake_instance['uuid'],
'console_type': fake_console_type}}
'console_type': fake_console_type},
'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION}
rpc_msg2 = {'method': 'authorize_console',
'args': fake_connect_info,
'version': '1.0'}
rpc.call(self.context, 'compute.%s' % fake_instance['host'],
rpc_msg1).AndReturn(fake_connect_info2)
rpc_msg1, None).AndReturn(fake_connect_info2)
rpc.call(self.context, FLAGS.consoleauth_topic,
rpc_msg2, None).AndReturn(None)
@ -3516,9 +3518,10 @@ class ComputeAPITestCase(BaseTestCase):
rpc_msg = {'method': 'get_console_output',
'args': {'instance_uuid': fake_instance['uuid'],
'tail_length': fake_tail_length}}
'tail_length': fake_tail_length},
'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION}
rpc.call(self.context, 'compute.%s' % fake_instance['host'],
rpc_msg).AndReturn(fake_console_output)
rpc_msg, None).AndReturn(fake_console_output)
self.mox.ReplayAll()
@ -4018,7 +4021,7 @@ class ComputeHostAPITestCase(BaseTestCase):
self.host_api = compute_api.HostAPI()
def _rpc_call_stub(self, call_info):
def fake_rpc_call(context, topic, msg):
def fake_rpc_call(context, topic, msg, timeout=None):
call_info['context'] = context
call_info['topic'] = topic
call_info['msg'] = msg
@ -4034,7 +4037,8 @@ class ComputeHostAPITestCase(BaseTestCase):
self.assertEqual(call_info['topic'], 'compute.fake_host')
self.assertEqual(call_info['msg'],
{'method': 'set_host_enabled',
'args': {'enabled': 'fake_enabled'}})
'args': {'enabled': 'fake_enabled'},
'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION})
def test_host_power_action(self):
ctxt = context.RequestContext('fake', 'fake')
@ -4045,7 +4049,8 @@ class ComputeHostAPITestCase(BaseTestCase):
self.assertEqual(call_info['topic'], 'compute.fake_host')
self.assertEqual(call_info['msg'],
{'method': 'host_power_action',
'args': {'action': 'fake_action'}})
'args': {'action': 'fake_action'},
'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION})
def test_set_host_maintenance(self):
ctxt = context.RequestContext('fake', 'fake')
@ -4056,7 +4061,8 @@ class ComputeHostAPITestCase(BaseTestCase):
self.assertEqual(call_info['topic'], 'compute.fake_host')
self.assertEqual(call_info['msg'],
{'method': 'host_maintenance_mode',
'args': {'host': 'fake_host', 'mode': 'fake_mode'}})
'args': {'host': 'fake_host', 'mode': 'fake_mode'},
'version': compute_rpcapi.ComputeAPI.RPC_API_VERSION})
class KeypairAPITestCase(BaseTestCase):

View File

@ -0,0 +1,236 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for nova.compute.rpcapi
"""
from nova.compute import rpcapi as compute_rpcapi
from nova import context
from nova import flags
from nova import rpc
from nova import test
FLAGS = flags.FLAGS
class ComputeRpcAPITestCase(test.TestCase):
def setUp(self):
self.fake_instance = {'uuid': 'fake_uuid', 'host': 'fake_host'}
super(ComputeRpcAPITestCase, self).setUp()
def tearDown(self):
super(ComputeRpcAPITestCase, self).tearDown()
def _test_compute_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = compute_rpcapi.ComputeAPI()
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
if 'host_param' in expected_msg['args']:
host_param = expected_msg['args']['host_param']
del expected_msg['args']['host_param']
expected_msg['args']['host'] = host_param
elif 'host' in expected_msg['args']:
del expected_msg['args']['host']
if 'instance' in expected_msg['args']:
instance = expected_msg['args']['instance']
del expected_msg['args']['instance']
expected_msg['args']['instance_uuid'] = instance['uuid']
expected_msg['version'] = rpcapi.RPC_API_VERSION
cast_and_call = ['confirm_resize', 'stop_instance']
if rpc_method == 'call' and method in cast_and_call:
kwargs['cast'] = False
if 'host' in kwargs:
host = kwargs['host']
else:
host = kwargs['instance']['host']
expected_topic = '%s.%s' % (FLAGS.compute_topic, host)
self.fake_args = None
self.fake_kwargs = None
def _fake_rpc_method(*args, **kwargs):
self.fake_args = args
self.fake_kwargs = kwargs
if expected_retval:
return expected_retval
self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
expected_args = [ctxt, expected_topic, expected_msg]
for arg, expected_arg in zip(self.fake_args, expected_args):
self.assertEqual(arg, expected_arg)
def test_add_aggregate_host(self):
self._test_compute_api('add_aggregate_host', 'cast', aggregate_id='id',
host_param='host', host='host')
def test_add_fixed_ip_to_instance(self):
self._test_compute_api('add_fixed_ip_to_instance', 'cast',
instance=self.fake_instance, network_id='id')
def test_attach_volume(self):
self._test_compute_api('attach_volume', 'cast',
instance=self.fake_instance, volume_id='id', mountpoint='mp')
def test_confirm_resize_cast(self):
self._test_compute_api('confirm_resize', 'cast',
instance=self.fake_instance, migration_id='id', host='host')
def test_confirm_resize_call(self):
self._test_compute_api('confirm_resize', 'call',
instance=self.fake_instance, migration_id='id', host='host')
def test_detach_volume(self):
self._test_compute_api('detach_volume', 'cast',
instance=self.fake_instance, volume_id='id')
def test_get_console_output(self):
self._test_compute_api('get_console_output', 'call',
instance=self.fake_instance, tail_length='tl')
def test_get_diagnostics(self):
self._test_compute_api('get_diagnostics', 'call',
instance=self.fake_instance)
def test_get_vnc_console(self):
self._test_compute_api('get_vnc_console', 'call',
instance=self.fake_instance, console_type='type')
def test_host_maintenance_mode(self):
self._test_compute_api('host_maintenance_mode', 'call',
host_param='param', mode='mode', host='host')
def test_host_power_action(self):
self._test_compute_api('host_power_action', 'call', action='action',
host='host')
def test_inject_file(self):
self._test_compute_api('inject_file', 'cast',
instance=self.fake_instance, path='path', file_contents='fc')
def test_inject_network_info(self):
self._test_compute_api('inject_network_info', 'cast',
instance=self.fake_instance)
def test_lock_instance(self):
self._test_compute_api('lock_instance', 'cast',
instance=self.fake_instance)
def test_pause_instance(self):
self._test_compute_api('pause_instance', 'cast',
instance=self.fake_instance)
def test_power_off_instance(self):
self._test_compute_api('power_off_instance', 'cast',
instance=self.fake_instance)
def test_power_on_instance(self):
self._test_compute_api('power_on_instance', 'cast',
instance=self.fake_instance)
def test_reboot_instance(self):
self._test_compute_api('reboot_instance', 'cast',
instance=self.fake_instance, reboot_type='type')
def test_rebuild_instance(self):
self._test_compute_api('rebuild_instance', 'cast',
instance=self.fake_instance, new_pass='pass',
injected_files='files', image_ref='ref',
orig_image_ref='orig_ref')
def test_refresh_security_group_rules(self):
self._test_compute_api('refresh_security_group_rules', 'cast',
security_group_id='id', host='host')
def test_refresh_security_group_members(self):
self._test_compute_api('refresh_security_group_members', 'cast',
security_group_id='id', host='host')
def test_remove_aggregate_host(self):
self._test_compute_api('remove_aggregate_host', 'cast',
aggregate_id='id', host_param='host', host='host')
def test_remove_fixed_ip_from_instance(self):
self._test_compute_api('remove_fixed_ip_from_instance', 'cast',
instance=self.fake_instance, address='addr')
def test_rescue_instance(self):
self._test_compute_api('rescue_instance', 'cast',
instance=self.fake_instance, rescue_password='pw')
def test_reset_network(self):
self._test_compute_api('reset_network', 'cast',
instance=self.fake_instance)
def test_resume_instance(self):
self._test_compute_api('resume_instance', 'cast',
instance=self.fake_instance)
def test_revert_resize(self):
self._test_compute_api('revert_resize', 'cast',
instance=self.fake_instance, migration_id='id', host='host')
def test_set_admin_password(self):
self._test_compute_api('set_admin_password', 'cast',
instance=self.fake_instance, new_pass='pw')
def test_set_host_enabled(self):
self._test_compute_api('set_host_enabled', 'call',
enabled='enabled', host='host')
def test_snapshot_instance(self):
self._test_compute_api('snapshot_instance', 'cast',
instance=self.fake_instance, image_id='id', image_type='type',
backup_type='type', rotation='rotation')
def test_start_instance(self):
self._test_compute_api('start_instance', 'cast',
instance=self.fake_instance)
def test_stop_instance_cast(self):
self._test_compute_api('stop_instance', 'cast',
instance=self.fake_instance)
def test_stop_instance_call(self):
self._test_compute_api('stop_instance', 'call',
instance=self.fake_instance)
def test_suspend_instance(self):
self._test_compute_api('suspend_instance', 'cast',
instance=self.fake_instance)
def test_terminate_instance(self):
self._test_compute_api('terminate_instance', 'cast',
instance=self.fake_instance)
def test_unlock_instance(self):
self._test_compute_api('unlock_instance', 'cast',
instance=self.fake_instance)
def test_unpause_instance(self):
self._test_compute_api('unpause_instance', 'cast',
instance=self.fake_instance)
def test_unrescue_instance(self):
self._test_compute_api('unrescue_instance', 'cast',
instance=self.fake_instance)