Merge "Added Nova Instance Snapshot Operation and security group rules support"

This commit is contained in:
Jenkins 2017-06-02 14:04:10 +00:00 committed by Gerrit Code Review
commit 0cf63a5004
8 changed files with 726 additions and 299 deletions

View File

@ -1,2 +1 @@
google-api-python-client
oslo.service>=1.19.0

View File

@ -12,12 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from oslo_log import log as logging
from cinder.i18n import _LI
from cinder.i18n import _LI, _
from oslo_service import loopingcall
from oslo_utils import reflection
LOG = logging.getLogger(__name__)
@ -26,6 +29,46 @@ class GceOperationError(Exception):
pass
class _FixedIntervalWithTimeoutLoopingCall(loopingcall.LoopingCallBase):
    """A fixed interval looping call with timeout checking mechanism.

    Local stand-in that is installed on ``loopingcall`` only when that module
    does not already expose ``FixedIntervalWithTimeoutLoopingCall``.
    """

    _RUN_ONLY_ONE_MESSAGE = _("A fixed interval looping call with timeout"
                              " checking can only run one function at"
                              " a time")

    _KIND = _('Fixed interval looping call with timeout checking.')

    def start(self, interval, initial_delay=None, stop_on_exception=True,
              timeout=0):
        """Start the looping call.

        :param interval: int/float, Seconds expected between two runs
        :param initial_delay: Forwarded unchanged to ``_start``
        :param stop_on_exception: Forwarded unchanged to ``_start``
        :param timeout: int/float, Overall time budget in seconds; values
                        <= 0 disable the timeout check
        :return: Whatever ``LoopingCallBase._start`` returns
        """
        start_time = time.time()

        def _idle_for(result, elapsed):
            # Positive delay: the run took longer than the interval.
            delay = round(elapsed - interval, 2)
            if delay > 0:
                func_name = reflection.get_callable_name(self.f)
                LOG.warning('Function %(func_name)r run outlasted '
                            'interval by %(delay).2f sec',
                            {'func_name': func_name,
                             'delay': delay})
            # The timeout is measured from the moment start() was called,
            # not from the beginning of the latest run.
            elapsed_time = time.time() - start_time
            if timeout > 0 and elapsed_time > timeout:
                raise loopingcall.LoopingCallTimeOut(
                    _('Looping call timed out after %.02f seconds') %
                    elapsed_time)
            # Remaining part of the interval (0 when the run overran it).
            return -delay if delay < 0 else 0

        return self._start(_idle_for, initial_delay=initial_delay,
                           stop_on_exception=stop_on_exception)
# The default oslo.service version (newton) is currently 1.16.0.
# After upgrading to oslo.service >= 1.19.0 the temporary definition
# _FixedIntervalWithTimeoutLoopingCall can be removed; until then it is
# installed only when the library does not provide the class itself.
if not hasattr(loopingcall, 'FixedIntervalWithTimeoutLoopingCall'):
    setattr(loopingcall, 'FixedIntervalWithTimeoutLoopingCall',
            _FixedIntervalWithTimeoutLoopingCall)
def wait_for_operation(compute, project, operation, interval=1, timeout=60):
"""Wait for GCE operation to complete, raise error if operation failure
:param compute: GCE compute resource object using googleapiclient.discovery

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
from oslo_log import log as logging
from googleapiclient.discovery import build
@ -22,239 +20,6 @@ from oauth2client.client import GoogleCredentials
LOG = logging.getLogger(__name__)
def list_instances(compute, project, zone):
    """Returns list of GCE instance resources for specified project

    All result pages are collected, not only the first one.

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :return: list of instance resources, empty when the zone has none
    """
    collection = compute.instances()
    request = collection.list(project=project, zone=zone)
    instances = []
    # list_next() returns None once there is no further page to fetch.
    while request is not None:
        response = request.execute()
        instances.extend(response.get('items', []))
        request = collection.list_next(previous_request=request,
                                       previous_response=response)
    return instances
def get_instance(compute, project, zone, instance):
    """Fetch a single GCE instance resource

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param instance: string, Name of the GCE instance resource
    :return: Response of the instances().get() request
    """
    request = compute.instances().get(project=project, zone=zone,
                                      instance=instance)
    return request.execute()
def get_instance_metadata(compute, project, zone, instance):
    """Return the 'metadata' entry of an instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param instance: string or instance resource, Name of the GCE instance
        resource or GCE instance resource
    """
    resource = instance
    # A name is resolved to the full resource first; anything else is
    # treated as an already fetched resource.
    if isinstance(resource, six.string_types):
        resource = get_instance(compute, project, zone, resource)
    return resource['metadata']
def get_instances_metadata_key(compute, project, zone, instance, key):
    """Look up one metadata value of an instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param instance: string or instance resource, Name of the GCE instance
        resource or GCE instance resource
    :param key: string, Key to be retrieved from the instance metadata
    :return: Value of the first item whose 'key' matches, None otherwise
    """
    metadata = get_instance_metadata(compute, project, zone, instance)
    for entry in metadata.get('items', []):
        if entry['key'] == key:
            return entry['value']
    return None
def get_external_ip(compute, project, zone, instance):
    """Return the external IP of a GCE instance, or '' when it has none

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param instance: string or instance resource, Name of the GCE instance
        resource or GCE instance resource
    """
    resource = instance
    if isinstance(resource, six.string_types):
        resource = get_instance(compute, project, zone, resource)
    for nic in resource.get('networkInterfaces', []):
        for access in nic.get('accessConfigs', []):
            # Only a ONE_TO_ONE_NAT config that carries a natIP qualifies.
            if access['type'] != 'ONE_TO_ONE_NAT' or 'natIP' not in access:
                continue
            return access['natIP']
    return ''
def set_instance_metadata(compute, project, zone, instance, items,
                          operation='add'):
    """Perform specified operation on GCE instance metadata

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param instance: string or instance resource, Name of the GCE instance
        resource or GCE instance resource
    :param items: list, List of items where each item is dictionary having
        'key' and 'value' as its members
        Refer following sample list,
        [ {'key': 'openstack_id', 'value': '1224555'}, ]
    :param operation: string, Operation to perform on instance metadata;
        only 'add' changes the metadata, any other value re-submits it as is
    :return: Response of the instances().setMetadata() request
    :raises TypeError: if items is not a list
    """
    if not isinstance(items, list):
        raise TypeError(
            "set_instance_metadata: items should be instance of list")
    metadata = get_instance_metadata(compute, project, zone, instance)
    if operation == 'add':
        if 'items' in metadata:
            metadata['items'].extend(items)
        else:
            metadata['items'] = items
    LOG.info("Adding metadata %s", metadata)
    # TODO: Add del operation if required
    # setMetadata() addresses the instance by name, so a resource passed by
    # the caller has to be reduced to its 'name' first.
    if isinstance(instance, six.string_types):
        instance_name = instance
    else:
        instance_name = instance['name']
    return compute.instances().setMetadata(project=project, zone=zone,
                                           instance=instance_name,
                                           body=metadata).execute()
def create_instance(compute, project, zone, name, image_link, machine_link):
    """Launch a GCE instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, Name of instance to be launched
    :param image_link: url, GCE Image link for instance launch, e.g.
        "projects/debian-cloud/global/images/debian-8-jessie-v20170327"
    :param machine_link: url, GCE Machine link for instance launch, e.g.
        "zones/<zone>/machineTypes/n1-standard-1"
    :return: Response of the instances().insert() request
    """
    LOG.info("Launching instance %s with image %s and machine %s" %
             (name, image_link, machine_link))

    # Boot disk created from the requested image.
    boot_disk = {
        'boot': True,
        'autoDelete': True,
        'initializeParams': {
            'sourceImage': image_link,
        }
    }
    # Network interface with NAT to access the public internet.
    nat_interface = {
        'network': 'global/networks/default',
        'accessConfigs': [{
            'type': 'ONE_TO_ONE_NAT',
            'name': 'External NAT'
        }]
    }
    # Allow the instance to access cloud storage and logging.
    service_account = {
        'email': 'default',
        'scopes': [
            'https://www.googleapis.com/auth/devstorage.full_control',
            'https://www.googleapis.com/auth/logging.write',
            'https://www.googleapis.com/auth/compute'
        ]
    }
    config = {
        'kind': 'compute#instance',
        'name': name,
        'machineType': machine_link,
        'disks': [boot_disk],
        'networkInterfaces': [nat_interface],
        'serviceAccounts': [service_account],
    }
    request = compute.instances().insert(project=project, zone=zone,
                                         body=config)
    return request.execute()
def delete_instance(compute, project, zone, name):
    """Issue a delete request for a GCE instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, Name of the GCE instance
    :return: Response of the instances().delete() request
    """
    request = compute.instances().delete(project=project, zone=zone,
                                         instance=name)
    return request.execute()
def stop_instance(compute, project, zone, name):
    """Issue a stop request for a GCE instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, Name of the GCE instance
    :return: Response of the instances().stop() request
    """
    request = compute.instances().stop(project=project, zone=zone,
                                       instance=name)
    return request.execute()
def start_instance(compute, project, zone, name):
    """Issue a start request for a GCE instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, Name of the GCE instance
    :return: Response of the instances().start() request
    """
    request = compute.instances().start(project=project, zone=zone,
                                        instance=name)
    return request.execute()
def reset_instance(compute, project, zone, name):
    """Issue a hard reset request for a GCE instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, Name of the GCE instance
    :return: Response of the instances().reset() request
    """
    request = compute.instances().reset(project=project, zone=zone,
                                        instance=name)
    return request.execute()
def wait_for_operation(compute, project, zone, operation, interval=1,
                       timeout=60):
    """Wait for GCE operation to complete, raise error if operation failure

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param operation: object, Operation resource obtained by calling GCE API
    :param interval: int, Time period(seconds) between two GCE operation checks
    :param timeout: int, Absolute time period(seconds) to monitor GCE operation
    :return: The operation resource once its status is 'DONE'
    :raises ValueError: if interval is less than 1
    :raises Exception: if the finished operation reports an error, or if it
        does not finish within timeout
    """
    operation_name = operation['name']
    if interval < 1:
        raise ValueError("wait_for_operation: Interval should be positive")
    # Floor division keeps the poll count an integer; true division yields
    # a float on Python 3, which range() refuses.
    iterations = int(timeout // interval)
    for _ in range(iterations):
        result = compute.zoneOperations().get(
            project=project, zone=zone, operation=operation_name).execute()
        if result['status'] == 'DONE':
            LOG.info("Operation %s status is %s", operation_name,
                     result['status'])
            if 'error' in result:
                raise Exception(result['error'])
            return result
        time.sleep(interval)
    raise Exception(
        "wait_for_operation: Operation %s failed to perform in timeout %s" %
        (operation_name, timeout))
def get_gce_service(service_key):
"""Returns GCE compute resource object for interacting with GCE API
:param service_key: string, Path of service key obtained from
@ -265,24 +30,6 @@ def get_gce_service(service_key):
return service
def get_machines_info(compute, project, zone):
    """Return machine type info from GCE

    All result pages are collected; a zone without machine types yields an
    empty dict instead of raising KeyError.

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :return: dict mapping machine type name to
        {'memory_mb': <memoryMb>, 'vcpus': <guestCpus>}
    """
    collection = compute.machineTypes()
    request = collection.list(project=project, zone=zone)
    machines = {}
    # list_next() returns None once there is no further page to fetch.
    while request is not None:
        response = request.execute()
        for machine_type in response.get('items', []):
            machines[machine_type['name']] = {
                'memory_mb': machine_type['memoryMb'],
                'vcpus': machine_type['guestCpus']
            }
        request = collection.list_next(previous_request=request,
                                       previous_response=response)
    return machines
def get_images(compute, project):
"""Return public images info from GCE
:param compute: GCE compute resource object using googleapiclient.discovery

View File

@ -13,10 +13,13 @@
# under the License.
import uuid
import time
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
from oslo_log import log as logging
from oslo_utils import reflection
from neutron_lib import exceptions as e
from neutron._i18n import _LI, _
@ -26,6 +29,46 @@ from six.moves import urllib
LOG = logging.getLogger(__name__)
class _FixedIntervalWithTimeoutLoopingCall(loopingcall.LoopingCallBase):
    """A fixed interval looping call with timeout checking mechanism.

    Local stand-in that is installed on ``loopingcall`` only when that module
    does not already expose ``FixedIntervalWithTimeoutLoopingCall``.
    """

    _RUN_ONLY_ONE_MESSAGE = _("A fixed interval looping call with timeout"
                              " checking can only run one function at"
                              " a time")

    _KIND = _('Fixed interval looping call with timeout checking.')

    def start(self, interval, initial_delay=None, stop_on_exception=True,
              timeout=0):
        """Start the looping call.

        :param interval: int/float, Seconds expected between two runs
        :param initial_delay: Forwarded unchanged to ``_start``
        :param stop_on_exception: Forwarded unchanged to ``_start``
        :param timeout: int/float, Overall time budget in seconds; values
                        <= 0 disable the timeout check
        :return: Whatever ``LoopingCallBase._start`` returns
        """
        start_time = time.time()

        def _idle_for(result, elapsed):
            # Positive delay: the run took longer than the interval.
            delay = round(elapsed - interval, 2)
            if delay > 0:
                func_name = reflection.get_callable_name(self.f)
                LOG.warning('Function %(func_name)r run outlasted '
                            'interval by %(delay).2f sec',
                            {'func_name': func_name,
                             'delay': delay})
            # The timeout is measured from the moment start() was called,
            # not from the beginning of the latest run.
            elapsed_time = time.time() - start_time
            if timeout > 0 and elapsed_time > timeout:
                raise loopingcall.LoopingCallTimeOut(
                    _('Looping call timed out after %.02f seconds') %
                    elapsed_time)
            # Remaining part of the interval (0 when the run overran it).
            return -delay if delay < 0 else 0

        return self._start(_idle_for, initial_delay=initial_delay,
                           stop_on_exception=stop_on_exception)
# The default oslo.service version (newton) is currently 1.16.0.
# After upgrading to oslo.service >= 1.19.0 the temporary definition
# _FixedIntervalWithTimeoutLoopingCall can be removed; until then it is
# installed only when the library does not provide the class itself.
if not hasattr(loopingcall, 'FixedIntervalWithTimeoutLoopingCall'):
    setattr(loopingcall, 'FixedIntervalWithTimeoutLoopingCall',
            _FixedIntervalWithTimeoutLoopingCall)
class GceOperationError(Exception):
    """Exception type for GCE operations; adds nothing to Exception."""
@ -333,3 +376,51 @@ def release_floatingip(compute, project, zone, floatingip):
accessConfig=accessconfig['name'],
networkInterface=interface['name']).execute()
wait_for_operation(compute, project, operation)
def create_firewall_rule(compute, project, body):
    """Submit a firewall rule insert request to GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param body: dict, Information required for creating firewall
        Refer format at https://developers.google.com/resources/api-libraries/documentation/compute/beta/python/latest/compute_beta.firewalls.html#insert
    :return: Operation information
    :rtype: dict
    """
    request = compute.firewalls().insert(project=project, body=body)
    return request.execute()
def update_firewall_rule(compute, project, name, body):
    """Submit a firewall rule update request to GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE firewall name
    :param body: dict, Information required for updating firewall
        Refer format at https://developers.google.com/resources/api-libraries/documentation/compute/beta/python/latest/compute_beta.firewalls.html#update
    :return: Operation information
    :rtype: dict
    """
    request = compute.firewalls().update(project=project, firewall=name,
                                         body=body)
    return request.execute()
def delete_firewall_rule(compute, project, name):
    """Submit a firewall rule delete request to GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE firewall name
    :return: Operation information
    :rtype: dict
    """
    request = compute.firewalls().delete(project=project, firewall=name)
    return request.execute()
def get_firewall_rule(compute, project, name):
    """Fetch a firewall rule from GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE firewall name
    :return: Firewall info
    :rtype: dict
    """
    request = compute.firewalls().get(project=project, firewall=name)
    return request.execute()

View File

@ -17,14 +17,25 @@ import random
from oslo_log import log
import ipaddr
from neutron._i18n import _LI, _
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import gceconf
from neutron.common import gceutils
from neutron._i18n import _LI
from neutron.manager import NeutronManager
from neutron.plugins.ml2 import driver_api as api
from neutron_lib import exceptions as e
from neutron.extensions import securitygroup as sg
LOG = log.getLogger(__name__)
class SecurityGroupInvalidDirection(e.InvalidInput):
    """Invalid-input error for an unsupported security group rule direction.

    The message expects ``direction`` and ``values`` keyword arguments.
    """

    message = _("Security group rule for direction '%(direction)s' not "
                "supported. Allowed values are %(values)s.")
class GceMechanismDriver(api.MechanismDriver):
"""Ml2 Mechanism driver for GCE"""
@ -40,6 +51,18 @@ class GceMechanismDriver(api.MechanismDriver):
LOG.info(
_LI("GCE Mechanism driver init with %s project, %s region") %
(self.gce_project, self.gce_region))
self._subscribe_events()
def _subscribe_events(self):
    """Register secgroup_callback for security group and rule events."""
    # (resource, event) pairs, subscribed in this order.
    watched = (
        (resources.SECURITY_GROUP, events.BEFORE_DELETE),
        (resources.SECURITY_GROUP, events.BEFORE_UPDATE),
        (resources.SECURITY_GROUP_RULE, events.BEFORE_DELETE),
        (resources.SECURITY_GROUP_RULE, events.BEFORE_UPDATE),
    )
    for resource, event in watched:
        registry.subscribe(self.secgroup_callback, resource, event)
def _gce_network_name(self, context):
    """Build the GCE network name: 'net-' followed by the current id."""
    network_id = context.current[api.ID]
    return 'net-' + network_id
@ -116,12 +139,175 @@ class GceMechanismDriver(api.MechanismDriver):
LOG.info(
_LI("Deleted subnet %s in region %s on GCE") % (name, region))
def _gce_secgrp_id(self, openstack_id):
return "secgrp-" + openstack_id
def _convert_secgrp_rule_to_gce(self, rule, network_link):
    """Translate a security group rule dict into a GCE firewall body

    :param rule: dict, reads 'ethertype', 'id', 'direction', 'protocol',
        'port_range_min', 'port_range_max' and 'remote_ip_prefix'
    :param network_link: Value stored under 'network' in the result
    :return: dict, GCE firewall rule body
    :raises sg.SecurityGroupRuleInvalidEtherType: ethertype is not 'IPv4'
    :raises SecurityGroupInvalidDirection: direction is not 'ingress'
    :raises sg.SecurityGroupRuleInvalidProtocol: protocol is neither None
        nor one of tcp, udp, icmp, esp, ah, sctp
    """
    ethertype = rule['ethertype']
    if ethertype != 'IPv4':
        raise sg.SecurityGroupRuleInvalidEtherType(
            ethertype=ethertype, values=('IPv4', ))

    allowed = {}
    gce_rule = {
        'sourceRanges': [],
        'sourceTags': [],
        'targetTags': [],
        'allowed': [allowed],
        'destinationRanges': [],
    }
    gce_rule['name'] = self._gce_secgrp_id(rule['id'])
    gce_rule['network'] = network_link

    directions = {
        'ingress': 'INGRESS',
    }
    gce_protocols = ('tcp', 'udp', 'icmp', 'esp', 'ah', 'sctp')

    direction = rule['direction']
    if direction not in directions:
        raise SecurityGroupInvalidDirection(direction=direction,
                                            values=directions.keys())
    gce_rule['direction'] = directions[direction]

    protocol = rule['protocol']
    if protocol is None:
        allowed['IPProtocol'] = 'all'
    elif protocol in gce_protocols:
        allowed['IPProtocol'] = protocol
        # GCE allows port specification for tcp and udp only
        if protocol in ('tcp', 'udp'):
            high = rule['port_range_max']
            low = rule['port_range_min']
            if high is None or low is None:
                port_spec = '0-65535'
            elif high == low:
                port_spec = str(high)
            else:
                port_spec = "%s-%s" % (low, high)
            allowed['ports'] = [port_spec]
    else:
        raise sg.SecurityGroupRuleInvalidProtocol(protocol=protocol,
                                                  values=gce_protocols)

    # Without a remote prefix the rule applies to every source address.
    source = rule['remote_ip_prefix']
    gce_rule['sourceRanges'].append('0.0.0.0/0' if source is None else source)
    return gce_rule
def _create_secgrp_rule(self, context, rule, network_link):
    """Create the GCE firewall rule matching a security group rule

    A rule that cannot be converted to the GCE format is logged and
    skipped; nothing is created in that case.

    :param context: Not used by this method
    :param rule: dict, Security group rule to convert
    :param network_link: GCE network link the firewall rule is attached to
    """
    compute, project = self.gce_svc, self.gce_project
    try:
        gce_rule = self._convert_secgrp_rule_to_gce(rule, network_link)
    except Exception as ex:
        # 'ex' rather than 'e': 'e' is the module alias for
        # neutron_lib.exceptions.
        LOG.exception(
            "An error occurred while creating security group: %s", ex)
        return
    LOG.info(_LI("Create GCE firewall rule %s") % gce_rule)
    operation = gceutils.create_firewall_rule(compute, project, gce_rule)
    gceutils.wait_for_operation(compute, project, operation)
def _update_secgrp_rule(self, context, rule_id):
    """Re-sync the GCE firewall rule of an updated security group rule

    Returns without doing anything when the firewall rule lookup raises
    HttpError. If the conversion or the update fails, the existing GCE
    firewall rule is deleted.

    :param context: Context passed to the core plugin lookup
    :param rule_id: Id of the security group rule
    """
    compute, project = self.gce_svc, self.gce_project
    name = self._gce_secgrp_id(rule_id)
    try:
        gce_firewall_info = gceutils.get_firewall_rule(
            compute, project, name)
    except gceutils.HttpError:
        return
    core_plugin = NeutronManager.get_plugin()
    rule = core_plugin.get_security_group_rule(context, rule_id)
    network_link = gce_firewall_info['network']
    try:
        gce_rule = self._convert_secgrp_rule_to_gce(rule, network_link)
        LOG.info(_LI("Update GCE firewall rule %s") % name)
        operation = gceutils.update_firewall_rule(compute, project, name,
                                                  gce_rule)
        gceutils.wait_for_operation(compute, project, operation)
    except Exception as ex:
        # 'ex' rather than 'e': 'e' is the module alias for
        # neutron_lib.exceptions.
        LOG.exception(
            _LI("An error occurred while updating security group: %s") % ex)
        LOG.error(_LI("Deleting existing GCE firewall rule %s") % name)
        operation = gceutils.delete_firewall_rule(compute, project, name)
        gceutils.wait_for_operation(compute, project, operation)
def _delete_secgrp_rule(self, context, rule_id):
    """Delete the GCE firewall rule of a security group rule (best effort)

    :param context: Not used by this method
    :param rule_id: Id of the security group rule
    """
    name = self._gce_secgrp_id(rule_id)
    compute, project = self.gce_svc, self.gce_project
    try:
        LOG.warning(
            _LI("Delete existing GCE firewall rule %s,"
                "as firewall rule update not GCE compatible.") % name)
        operation = gceutils.delete_firewall_rule(compute, project, name)
        gceutils.wait_for_operation(compute, project, operation)
    except gceutils.HttpError as ex:
        # Still best effort, but leave a trace instead of hiding the
        # failure completely.
        LOG.info("Skipped deleting GCE firewall rule %s: %s", name, ex)
def _create_secgrp_rules_if_needed(self, context, secgrp_ids):
    """Create GCE firewall rules that are missing for the given groups

    :param context: Context whose _plugin_context is used for the security
        group lookups
    :param secgrp_ids: iterable of security group ids
    """
    plugin = NeutronManager.get_plugin()
    pending_rules = []
    for secgrp_id in secgrp_ids:
        group = plugin.get_security_group(context._plugin_context,
                                          secgrp_id)
        pending_rules.extend(group['security_group_rules'])
    if not pending_rules:
        return

    network_name = self._gce_subnet_network_name(context)
    compute, project = self.gce_svc, self.gce_project
    network_link = gceutils.get_network(compute, project,
                                        network_name)['selfLink']
    for pending in pending_rules:
        firewall_name = self._gce_secgrp_id(pending['id'])
        try:
            gceutils.get_firewall_rule(compute, project, firewall_name)
        except gceutils.HttpError:
            # The lookup failed, so the rule is created now.
            self._create_secgrp_rule(context, pending, network_link)
def _update_secgrp(self, context, secgrp_id):
    """Run _update_secgrp_rule for every rule of a security group."""
    plugin = NeutronManager.get_plugin()
    rules = plugin.get_security_group(context,
                                      secgrp_id)['security_group_rules']
    for rule in rules:
        self._update_secgrp_rule(context, rule['id'])
def _delete_secgrp(self, context, secgrp_id):
    """Run _delete_secgrp_rule for every rule of a security group."""
    plugin = NeutronManager.get_plugin()
    rules = plugin.get_security_group(context,
                                      secgrp_id)['security_group_rules']
    for rule in rules:
        self._delete_secgrp_rule(context, rule['id'])
def bind_port(self, context):
    """Bind the port to a randomly chosen segment

    Missing GCE firewall rules for the port's security groups are created
    first.

    :param context: Port context; reads context.current and
        context.segments_to_bind, and calls context.set_binding
    :return: True
    """
    fixed_ip_dict = dict()
    # One truthiness test covers a missing key and an empty list; the
    # stacked length checks were redundant.
    fixed_ips = context.current.get('fixed_ips')
    if fixed_ips:
        fixed_ip_dict = fixed_ips[0]

    secgrp_ids = context.current['security_groups']
    if secgrp_ids:
        self._create_secgrp_rules_if_needed(context, secgrp_ids)

    segment_id = random.choice(context.segments_to_bind)[api.ID]
    context.set_binding(segment_id, "vip_type_a", fixed_ip_dict,
                        status='ACTIVE')
    return True
def secgroup_callback(self, resource, event, trigger, **kwargs):
    """Dispatch security group and security group rule events

    Rule events: BEFORE_DELETE and BEFORE_UPDATE are forwarded to
    _delete_secgrp_rule / _update_secgrp_rule. Group events: only
    BEFORE_DELETE is acted on; every other combination is ignored.

    :param resource: Resource type the event refers to
    :param event: Event type
    :param trigger: Not used by this method
    :param kwargs: Reads 'context' plus 'security_group_rule_id' or
        'security_group_id'
    """
    if resource == resources.SECURITY_GROUP_RULE:
        context = kwargs['context']
        rule_handlers = {
            events.BEFORE_DELETE: self._delete_secgrp_rule,
            events.BEFORE_UPDATE: self._update_secgrp_rule,
        }
        handler = rule_handlers.get(event)
        if handler is not None:
            handler(context, kwargs['security_group_rule_id'])
        return

    if resource != resources.SECURITY_GROUP:
        return
    if event != events.BEFORE_DELETE:
        return

    context = kwargs['context']
    security_group_id = kwargs.get('security_group_id')
    if not security_group_id:
        LOG.warn(
            _LI("Security group ID not found in delete request"))
        return
    self._delete_secgrp(context, security_group_id)

View File

@ -1,3 +1,2 @@
google-api-python-client
ipaddr
oslo.service>=1.19.0

View File

@ -15,20 +15,23 @@
import hashlib
import uuid
import time
import nova.conf
from nova import exception
from nova.image import glance
from nova.i18n import _LI
from nova.virt import driver, hardware
from oslo_config import cfg
from oslo_log import log as logging
from nova.compute import task_states
import gceutils
from googleapiclient.errors import HttpError
from nova.virt.gce.constants import GCE_STATE_MAP
LOG = logging.getLogger(__name__)
gce_group = cfg.OptGroup(name='GCE',
title='Options to connect to Google cloud')
@ -227,11 +230,12 @@ class GCEDriver(driver.ComputeDriver):
attached to the instance.
"""
compute, project, zone = self.gce_svc, self.gce_project, self.gce_zone
# TODO: Use instance id as instance name
instance_name = instance.display_name
# GCE expects instance name in format "[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?"
# So we need to construct it for GCE from uuid
gce_instance_name = 'inst-' + instance.uuid
LOG.info(
_LI("Creating instance %s as %s on GCE.") % (instance.display_name,
instance_name))
gce_instance_name))
# Image Info
image_link = instance.system_metadata['image_gce_link']
# Flavor Info
@ -241,11 +245,11 @@ class GCEDriver(driver.ComputeDriver):
network_interfaces = self._process_network_info(network_info)
# Create Instance
operation = gceutils.create_instance(compute, project, zone,
instance_name, image_link,
gce_instance_name, image_link,
flavor_link, network_interfaces)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
gce_instance = gceutils.get_instance(compute, project, zone,
instance_name)
gce_instance_name)
# Update GCE info in openstack instance metadata
instance.metadata.update({'gce_id': gce_instance['name']})
gce_metadata = [
@ -260,17 +264,161 @@ class GCEDriver(driver.ComputeDriver):
operation = gceutils.set_instance_metadata(
compute, project, zone, gce_instance['name'], gce_metadata,
operation='add')
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
self._uuid_to_gce_instance[instance.uuid] = gceutils.get_instance(
compute, project, zone, instance_name)
compute, project, zone, gce_instance_name)
def snapshot(self, context, instance, image_id, update_task_state):
    """Snapshot an image of the specified instance

    :param context: security context
    :param instance: nova.objects.instance.Instance
    :param image_id: Reference to a pre-created image holding the snapshot.
    :param update_task_state: callable, invoked with task_state (and
        expected_state) keyword arguments as the snapshot progresses

    Steps:
    1. Find boot disk
    2. Stop instance
    3. Create temporary boot disk snapshot
    4. Start instance
    5. Create temporary disk from snapshot
    6. Create image from disk
    7. Add Image info to glance
    8. Delete temporary disk
    9. Delete temporary snapshot

    On any failure the instance is restarted if it is still stopped, the
    GCE image and temporary disk/snapshot created so far are deleted, and
    the error is raised again.
    """
    # Progress flags; the rollback below undoes only what was completed.
    instance_stopped = False
    temp_disk_snapshot = False
    temp_disk_from_snapshot = False
    image_created = False
    compute, project, zone = self.gce_svc, self.gce_project, self.gce_zone

    try:
        gce_id = self._get_gce_id_from_instance(instance)
        LOG.info(_LI("Taking snapshot of instance %s") % instance.uuid)
        try:
            boot_disk = gceutils.get_instance_boot_disk(
                compute, project, zone, gce_id)
        except AssertionError:
            reason = ("Unable to find boot disk from instance metadata %s"
                      % instance.uuid)
            raise exception.InvalidMetadata(reason=reason)
        disk_name = boot_disk['name']
        LOG.debug(
            _LI("1. Found boot disk %s for instance %s") % (disk_name,
                                                            instance.uuid))

        operation = gceutils.stop_instance(compute, project, zone, gce_id)
        gceutils.wait_for_operation(compute, project, operation)
        instance_stopped = True
        LOG.debug(
            _LI("2. Temporarily stopped instance %s") % instance.uuid)

        # Epoch seconds suffix; strftime("%s") is not a portable directive.
        snapshot_name = 'novasnap-' + disk_name + str(int(time.time()))
        operation = gceutils.snapshot_disk(
            compute, project, zone, boot_disk['name'], snapshot_name)
        gceutils.wait_for_operation(compute, project, operation)
        temp_disk_snapshot = True
        LOG.debug(_LI("3. Created boot disk snapshot %s") % snapshot_name)

        operation = gceutils.start_instance(compute, project, zone, gce_id)
        gceutils.wait_for_operation(compute, project, operation)
        instance_stopped = False
        LOG.debug(
            _LI("4. Restart instance after disk snapshot %s") %
            instance.uuid)

        snapshot_disk_name = 'vol-' + snapshot_name
        operation = gceutils.create_disk_from_snapshot(
            compute, project, zone, snapshot_disk_name, snapshot_name)
        gceutils.wait_for_operation(compute, project, operation)
        snapshot_disk_info = gceutils.get_disk(compute, project, zone,
                                               snapshot_disk_name)
        temp_disk_from_snapshot = True
        LOG.debug(
            _LI("5. Created disk %s from snapshot %s") %
            (snapshot_disk_name, snapshot_name))

        update_task_state(task_state=task_states.IMAGE_PENDING_UPLOAD)
        image_api = glance.get_default_image_service()
        image_data = image_api.show(context, image_id)
        name = image_data['name']
        operation = gceutils.create_image_from_disk(
            compute, project, name, snapshot_disk_info['selfLink'])
        gceutils.wait_for_operation(compute, project, operation,
                                    timeout=120)
        image_created = True
        LOG.debug(
            _LI("6. Created image %s from disk %s") % (name,
                                                       snapshot_disk_name))
        LOG.info(
            _LI("Created GCE image %s from instance %s") % (name,
                                                            instance.uuid))

        update_task_state(task_state=task_states.IMAGE_UPLOADING,
                          expected_state=task_states.IMAGE_PENDING_UPLOAD)
        gce_img_data = gceutils.get_image(compute, project, name)
        image_metadata = {
            'name': name,
            'container_format': 'bare',
            'disk_format': 'raw',
            'is_public': False,
            'status': 'active',
            'properties': {
                'image_state': 'available',
                'owner_id': instance.project_id,
                'ramdisk_id': instance.ramdisk_id,
                'location': 'gce://%s/%s/%s' % (project, name, image_id),
                'gce_image_id': gce_img_data['id'],
                'gce_link': gce_img_data['selfLink'],
                'gce_size': gce_img_data['diskSizeGb']
            },
        }
        image_api.update(context, image_id, image_metadata)
        LOG.debug(_LI("7. Added image to glance %s") % name)

        # Both deletions are issued before waiting on either of them.
        disk_operation = gceutils.delete_disk(compute, project, zone,
                                              snapshot_disk_name)
        snap_operation = gceutils.delete_snapshot(compute, project,
                                                  snapshot_name)
        gceutils.wait_for_operation(compute, project, disk_operation)
        temp_disk_from_snapshot = False
        LOG.debug(_LI("8. Delete temporary disk %s") % snapshot_disk_name)

        gceutils.wait_for_operation(compute, project, snap_operation)
        temp_disk_snapshot = False
        LOG.debug(
            _LI("9. Delete temporary disk snapshot %s") % snapshot_name)
        LOG.info(_LI("Completed snapshot for instance %s") % instance.uuid)

    except Exception as e:
        LOG.exception("An error occurred during image creation: %s" % e)
        if instance_stopped:
            operation = gceutils.start_instance(compute, project, zone,
                                                gce_id)
            gceutils.wait_for_operation(compute, project, operation)
            LOG.debug(
                _LI("Restart instance after disk snapshot %s") %
                instance.uuid)
        if image_created:
            LOG.info(
                _LI("Rollback snapshot for instance %s, deleting image %s from GCE"
                    ) % (instance.uuid, name))
            operation = gceutils.delete_image(compute, project, name)
            gceutils.wait_for_operation(compute, project, operation)
        if temp_disk_from_snapshot:
            disk_operation = gceutils.delete_disk(compute, project, zone,
                                                  snapshot_disk_name)
            gceutils.wait_for_operation(compute, project, disk_operation)
            LOG.debug(
                _LI("Rollback snapshot for instance %s, delete temporary disk %s"
                    ) % (instance.uuid, snapshot_disk_name))
        if temp_disk_snapshot:
            snap_operation = gceutils.delete_snapshot(
                compute, project, snapshot_name)
            gceutils.wait_for_operation(compute, project, snap_operation)
            LOG.debug(
                _LI("Rollback snapshot for instance %s, delete temporary disk snapshot %s"
                    ) % (instance.uuid, snapshot_name))
        raise e
def reboot(self, context, instance, network_info, reboot_type,
block_device_info=None, bad_volumes_callback=None):
@ -303,10 +451,10 @@ class GCEDriver(driver.ComputeDriver):
gce_id = self._get_gce_id_from_instance(instance)
LOG.info(_LI('Stopping instance %s') % instance.uuid)
operation = gceutils.stop_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI('Starting instance %s') % instance.uuid)
operation = gceutils.start_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI('Soft Reboot Complete for instance %s') % instance.uuid)
def _hard_reboot(self, context, instance, network_info,
@ -315,7 +463,7 @@ class GCEDriver(driver.ComputeDriver):
gce_id = self._get_gce_id_from_instance(instance)
LOG.info(_LI('Resetting instance %s') % instance.uuid)
operation = gceutils.reset_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI('Hard Reboot Complete %s') % instance.uuid)
@staticmethod
@ -370,7 +518,7 @@ class GCEDriver(driver.ComputeDriver):
gce_id = self._get_gce_id_from_instance(instance)
LOG.info(_LI('Stopping instance %s') % instance.uuid)
operation = gceutils.stop_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI('Power off complete %s') % instance.uuid)
def power_on(self, context, instance, network_info, block_device_info):
@ -379,7 +527,7 @@ class GCEDriver(driver.ComputeDriver):
gce_id = self._get_gce_id_from_instance(instance)
LOG.info(_LI('Starting instance %s') % instance.uuid)
operation = gceutils.start_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI("Power on Complete %s") % instance.uuid)
def soft_delete(self, instance):
@ -449,10 +597,25 @@ class GCEDriver(driver.ComputeDriver):
:param migrate_data: implementation specific params
"""
compute, project, zone = self.gce_svc, self.gce_project, self.gce_zone
gce_id = self._get_gce_id_from_instance(instance)
LOG.info(_LI('Deleting instance %s') % instance.uuid)
operation = gceutils.delete_instance(compute, project, zone, gce_id)
gceutils.wait_for_operation(compute, project, zone, operation)
try:
gce_id = self._get_gce_id_from_instance(instance)
except exception.InstanceNotFound:
LOG.error(
_LI("Unable to find GCE mapping for instance %s") %
instance.uuid)
return
try:
operation = gceutils.delete_instance(compute, project, zone,
gce_id)
except HttpError:
# Sometimes instance may not exist in GCE, in that case we just
# allow deleting VM from openstack
LOG.error(
_LI("Instance %s not found in GCE, removing from openstack.") %
instance.uuid)
return
gceutils.wait_for_operation(compute, project, operation)
LOG.info(_LI("Destroy Complete %s") % instance.uuid)
def attach_volume(self, context, connection_info, instance, mountpoint,
@ -466,7 +629,7 @@ class GCEDriver(driver.ComputeDriver):
disk_link = gce_volume['selfLink']
operation = gceutils.attach_disk(compute, project, zone, gce_id,
disk_name, disk_link)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(
_LI("Volume %s attached to instace %s") % (disk_name,
instance.uuid))
@ -481,7 +644,7 @@ class GCEDriver(driver.ComputeDriver):
disk_name = gce_volume['name']
operation = gceutils.detach_disk(compute, project, zone, gce_id,
disk_name)
gceutils.wait_for_operation(compute, project, zone, operation)
gceutils.wait_for_operation(compute, project, operation)
LOG.info(
_LI("Volume %s detached from instace %s") % (disk_name,
instance.uuid))

View File

@ -12,17 +12,64 @@
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
import six
from oslo_log import log as logging
from nova.i18n import _LI
from nova.i18n import _LI, _
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
from oslo_service import loopingcall
from oslo_utils import reflection
from six.moves import urllib
LOG = logging.getLogger(__name__)
class _FixedIntervalWithTimeoutLoopingCall(loopingcall.LoopingCallBase):
    """A fixed interval looping call with timeout checking mechanism.

    Local backport used when the installed oslo.service does not provide
    ``FixedIntervalWithTimeoutLoopingCall``. The scheduling itself is
    delegated to ``LoopingCallBase._start``; this class only supplies the
    callback that decides how long to idle between runs and when to give up.
    """

    # NOTE(review): presumably read by LoopingCallBase for its error/log
    # messages -- the base class is not visible here, confirm against
    # oslo.service.
    _RUN_ONLY_ONE_MESSAGE = _("A fixed interval looping call with timeout"
                              " checking and can only run one function at"
                              " at a time")

    _KIND = _('Fixed interval looping call with timeout checking.')

    def start(self, interval, initial_delay=None, stop_on_exception=True,
              timeout=0):
        """Start the looping call.

        :param interval: target time (seconds) between two runs
        :param initial_delay: forwarded unchanged to ``_start``
        :param stop_on_exception: forwarded unchanged to ``_start``
        :param timeout: overall time budget in seconds, measured from this
            call; a value <= 0 disables the timeout check
        :return: whatever ``LoopingCallBase._start`` returns
        """
        # Reference point for the overall timeout.
        start_time = time.time()

        def _idle_for(result, elapsed):
            # NOTE(review): ``elapsed`` is presumably the duration of the
            # last run of the wrapped function -- confirm against
            # LoopingCallBase. ``result`` is not used here.
            # Positive delay: the run took longer than the interval.
            delay = round(elapsed - interval, 2)
            if delay > 0:
                func_name = reflection.get_callable_name(self.f)
                LOG.warning('Function %(func_name)r run outlasted '
                            'interval by %(delay).2f sec',
                            {'func_name': func_name,
                             'delay': delay})
            # The timeout is checked against wall-clock time since start(),
            # not against the duration of a single run.
            elapsed_time = time.time() - start_time
            if timeout > 0 and elapsed_time > timeout:
                raise loopingcall.LoopingCallTimeOut(
                    _('Looping call timed out after %.02f seconds') %
                    elapsed_time)
            # Idle for the remainder of the interval, or not at all when
            # the run already used it up.
            return -delay if delay < 0 else 0

        return self._start(_idle_for, initial_delay=initial_delay,
                           stop_on_exception=stop_on_exception)
# The default oslo.service version for Newton is 1.16.0, which does not ship
# FixedIntervalWithTimeoutLoopingCall. Install the temporary local definition
# _FixedIntervalWithTimeoutLoopingCall in that case; it can be removed once
# oslo.service is upgraded to >= 1.19.0.
if not hasattr(loopingcall, 'FixedIntervalWithTimeoutLoopingCall'):
    loopingcall.FixedIntervalWithTimeoutLoopingCall = (
        _FixedIntervalWithTimeoutLoopingCall)
class GceOperationError(Exception):
    """Error type used by this module for failed GCE operations."""
def list_instances(compute, project, zone):
"""Returns list of GCE instance resources for specified project
:param compute: GCE compute resource object using googleapiclient.discovery
@ -210,33 +257,42 @@ def reset_instance(compute, project, zone, name):
instance=name).execute()
def wait_for_operation(compute, project, operation, interval=1, timeout=60):
    """Wait for a GCE operation to complete, raise error on operation failure

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param operation: dict, Operation resource obtained by calling a GCE
        asynchronous API. All GCE asynchronous APIs return an operation
        resource that is used to follow up on their completion. Its 'zone'
        or 'region' entry (when present) selects which operations endpoint
        is polled; otherwise the global one is used.
    :param interval: Time period (seconds) between two GCE operation checks
    :param timeout: int, Absolute time period (seconds) to monitor the GCE
        operation
    :raises ValueError: if interval is not positive
    :raises GceOperationError: if the finished operation reports an error
    """
    # Anything <= 0 would make the looping call poll GCE without pausing.
    if interval <= 0:
        raise ValueError("wait_for_operation: Interval should be positive")

    def watch_operation(name, request):
        # Runs once per tick: re-execute the prepared "get operation" request.
        result = request.execute()
        if result['status'] == 'DONE':
            LOG.info(
                _LI("Operation %s status is %s") % (name, result['status']))
            if 'error' in result:
                raise GceOperationError(result['error'])
            # Tells the looping call that polling is finished.
            raise loopingcall.LoopingCallDone()

    operation_name = operation['name']

    # The operation resource carries a zone/region URL; only its last path
    # segment is needed to address the matching operations collection.
    if 'zone' in operation:
        zone = operation['zone'].split('/')[-1]
        monitor_request = compute.zoneOperations().get(
            project=project, zone=zone, operation=operation_name)
    elif 'region' in operation:
        region = operation['region'].split('/')[-1]
        monitor_request = compute.regionOperations().get(
            project=project, region=region, operation=operation_name)
    else:
        monitor_request = compute.globalOperations().get(
            project=project, operation=operation_name)

    timer = loopingcall.FixedIntervalWithTimeoutLoopingCall(
        watch_operation, operation_name, monitor_request)
    # NOTE(review): errors raised by watch_operation and the looping call's
    # timeout are presumably re-raised by wait() -- confirm against
    # oslo.service.
    timer.start(interval=interval, timeout=timeout).wait()
def get_gce_service(service_key):
@ -289,6 +345,18 @@ def get_image(compute, project, name):
return result
def delete_image(compute, project, name):
    """Request deletion of an image in GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE image name
    :return: Operation information
    :rtype: dict
    """
    delete_request = compute.images().delete(project=project, image=name)
    return delete_request.execute()
def get_network(compute, project, name):
"""Return network info
:param compute: GCE compute resource object using googleapiclient.discovery
@ -337,3 +405,134 @@ def detach_disk(compute, project, zone, instance_name, disk_name):
return compute.instances().detachDisk(project=project, zone=zone,
instance=instance_name,
deviceName=disk_name).execute()
def get_instance_boot_disk(compute, project, zone, instance):
    """Return boot disk info for instance

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone used to look up the instance
    :param instance: identifier handed to get_instance (presumably the GCE
        instance name)
    :return: GCE disk information for the first disk flagged as boot
    :rtype: dict
    :raises GceOperationError: if the boot disk source URL does not end in
        .../zones/<zone>/disks/<disk_name>
    :raises AssertionError: if no disk of the instance is flagged as boot
    """
    gce_instance = get_instance(compute, project, zone, instance)
    for disk in gce_instance['disks']:
        if not disk['boot']:
            continue
        disk_url = disk['source']
        # Extracting disk details from disk URL,
        # Eg. projects/<project>/zones/<zone>/disks/<disk_name>
        items = urllib.parse.urlparse(disk_url).path.strip('/').split('/')
        if len(items) < 4 or items[-2] != 'disks':
            # Stop here: indexing a malformed path would either fail with
            # IndexError or look up a disk with meaningless zone/name values.
            message = _LI('Invalid disk URL %s') % (disk_url)
            LOG.error(message)
            raise GceOperationError(message)
        # The disk is looked up in the zone named by its own URL, which is
        # kept separate from the zone argument used for the instance.
        disk_name, disk_zone = items[-1], items[-3]
        return get_disk(compute, project, disk_zone, disk_name)
    # We should never reach here
    raise AssertionError("Boot disk not found for instance %s" % instance)
def create_disk(compute, project, zone, name, size, disk_type="pd-standard"):
    """Create disk in GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, GCE disk name
    :param size: int, size of disk in Gb
    :param disk_type: string, name of the GCE disk type; defaults to
        pd-standard, which was previously the only possible value
    :return: Operation information
    :rtype: dict
    """
    body = {
        "name": name,
        "zone": "projects/%s/zones/%s" % (project, zone),
        "type": "projects/%s/zones/%s/diskTypes/%s" % (project, zone,
                                                       disk_type),
        "sizeGb": size
    }
    return compute.disks().insert(project=project, zone=zone, body=body,
                                  sourceImage=None).execute()
def delete_disk(compute, project, zone, name):
    """Request deletion of a disk in GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, GCE disk name
    :return: Operation information
    :rtype: dict
    """
    delete_request = compute.disks().delete(
        project=project, zone=zone, disk=name)
    return delete_request.execute()
def get_disk(compute, project, zone, name):
    """Fetch the description of a disk from GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, GCE disk name
    :return: GCE disk information
    :rtype: dict
    """
    lookup_request = compute.disks().get(
        project=project, zone=zone, disk=name)
    return lookup_request.execute()
def snapshot_disk(compute, project, zone, name, snapshot_name):
    """Request a snapshot of a disk in GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, GCE disk name
    :param snapshot_name: string, GCE snapshot name
    :return: Operation information
    :rtype: dict
    """
    snapshot_request = compute.disks().createSnapshot(
        project=project, zone=zone, disk=name,
        body={"name": snapshot_name})
    return snapshot_request.execute()
def get_snapshot(compute, project, name):
    """Fetch the description of a snapshot from GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE snapshot name
    :return: GCE snapshot information
    :rtype: dict
    """
    lookup_request = compute.snapshots().get(project=project, snapshot=name)
    return lookup_request.execute()
def delete_snapshot(compute, project, name):
    """Request deletion of a snapshot in GCE

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE snapshot name
    :return: Operation information
    :rtype: dict
    """
    delete_request = compute.snapshots().delete(
        project=project, snapshot=name)
    return delete_request.execute()
def create_disk_from_snapshot(compute, project, zone, name, snapshot_name,
                              disk_type="pd-standard"):
    """Request a new GCE disk populated from an existing snapshot

    The snapshot is looked up first; its selfLink becomes the new disk's
    source and its diskSizeGb becomes the new disk's size.

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param zone: string, GCE Name of zone
    :param name: string, GCE disk name
    :param snapshot_name: string, GCE snapshot name
    :param disk_type: string, Disk type from (pd-standard, pd-ssd, local-ssd)
    :return: Operation information
    :rtype: dict
    """
    source = get_snapshot(compute, project, snapshot_name)
    disk_spec = {
        "name": name,
        "zone": "projects/%s/zones/%s" % (project, zone),
        "type": "projects/%s/zones/%s/diskTypes/%s" % (project, zone,
                                                       disk_type),
        "sourceSnapshot": source["selfLink"],
        "sizeGb": source["diskSizeGb"]
    }
    insert_request = compute.disks().insert(
        project=project, zone=zone, body=disk_spec, sourceImage=None)
    return insert_request.execute()
def create_image_from_disk(compute, project, name, disk_link):
    """Request a new GCE image built from an existing disk

    :param compute: GCE compute resource object using googleapiclient.discovery
    :param project: string, GCE Project Id
    :param name: string, GCE image name
    :param disk_link: string, sent as the image's sourceDisk (presumably the
        selfLink of the source disk)
    :return: result of the images insert call (presumably Operation
        information, as for the other insert helpers)
    """
    image_spec = {"sourceDisk": disk_link, "name": name, "rawDisk": {}}
    insert_request = compute.images().insert(project=project, body=image_spec)
    return insert_request.execute()