Add session support for os-xenapi

Change-Id: I18687e701468d5fdb85cdbc02549d051defec507
This commit is contained in:
Huan Xie 2016-10-19 02:12:37 -07:00
parent 8a15b4ba4f
commit 95fa8e6929
12 changed files with 1005 additions and 1 deletions

5
exclusion_py3.txt Normal file
View File

@ -0,0 +1,5 @@
# The XenAPI plugins run in a Python 2 environment, so avoid attempting
# to run their unit tests in a Python 3 environment
os_xenapi.tests.client.test_session.CallPluginTestCase
os_xenapi.tests.client.test_session.SessionTestCase

View File

View File

@ -0,0 +1,55 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_xenapi.client.i18n import _
class OsXenApiException(Exception):
"""Base OsXenapi Exception
To correctly use this class, inherit from it and define
a 'msg_fmt' property. That msg_fmt will get printf'd
with the keyword arguments provided to the constructor.
"""
msg_fmt = _("An unknown exception occurred.")
code = 500
def __init__(self, message=None, **kwargs):
self.kwargs = kwargs
if 'code' not in self.kwargs:
try:
self.kwargs['code'] = self.code
except AttributeError:
pass
if not message:
message = self.msg_fmt % kwargs
self.message = message
super(OsXenApiException, self).__init__(message)
def format_message(self):
# NOTE(mrodden): use the first argument to the python Exception object
# which should be our full NovaException message, (see __init__)
return self.args[0]
class PluginRetriesExceeded(OsXenApiException):
msg_fmt = _("Number of retries to plugin (%(num_retries)d) exceeded.")
class SessionLoginTimeout(OsXenApiException):
msg_fmt = _("Unable to log in to XenAPI (is the Dom0 disk full?)")

46
os_xenapi/client/i18n.py Normal file
View File

@ -0,0 +1,46 @@
# Copyright 2016 Citrix.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo.i18n integration module.
See http://docs.openstack.org/developer/oslo.i18n/usage.html .
"""
import oslo_i18n
DOMAIN = 'os-xenapi'
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
def translate(value, user_locale):
return oslo_i18n.translate(value, user_locale)
def get_available_languages():
return oslo_i18n.get_available_languages(DOMAIN)

150
os_xenapi/client/objects.py Normal file
View File

@ -0,0 +1,150 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency import lockutils
synchronized = lockutils.synchronized_with_prefix('os-xenapi-')
class XenAPISessionObject(object):
"""Wrapper to make calling and mocking the session easier
The XenAPI protocol is an XML RPC API that is based around the
XenAPI database, and operations you can do on each of the objects
stored in the database, such as VM, SR, VDI, etc.
For more details see the XenAPI docs:
http://docs.vmd.citrix.com/XenServer/6.2.0/1.0/en_gb/api/
Most, objects like VM, SR, VDI, etc, share a common set of methods:
* vm_ref = session.VM.create(vm_rec)
* vm_ref = session.VM.get_by_uuid(uuid)
* session.VM.destroy(vm_ref)
* vm_refs = session.VM.get_all()
Each object also has specific messages, or functions, such as:
* session.VM.clean_reboot(vm_ref)
Each object has fields, like "VBDs" that can be fetched like this:
* vbd_refs = session.VM.get_VBDs(vm_ref)
You can get all the fields by fetching the full record.
However please note this is much more expensive than just
fetching the field you require:
* vm_rec = session.VM.get_record(vm_ref)
When searching for particular objects, you may be tempted
to use get_all(), but this often leads to races as objects
get deleted under your feet. It is preferable to use the undocumented:
* vms = session.VM.get_all_records_where(
'field "is_control_domain"="true"')
"""
def __init__(self, session, name):
self.session = session
self.name = name
def _call_method(self, method_name, *args):
call = "%s.%s" % (self.name, method_name)
return self.session.call_xenapi(call, *args)
def __getattr__(self, method_name):
return lambda *params: self._call_method(method_name, *params)
class VM(XenAPISessionObject):
"""Virtual Machine."""
def __init__(self, session):
super(VM, self).__init__(session, "VM")
class VBD(XenAPISessionObject):
"""Virtual block device."""
def __init__(self, session):
super(VBD, self).__init__(session, "VBD")
def plug(self, vbd_ref, vm_ref):
@synchronized('vbd-' + vm_ref)
def synchronized_plug():
self._call_method("plug", vbd_ref)
# NOTE(johngarbutt) we need to ensure there is only ever one
# VBD.unplug or VBD.plug happening at once per VM
# due to a bug in XenServer 6.1 and 6.2
synchronized_plug()
def unplug(self, vbd_ref, vm_ref):
@synchronized('vbd-' + vm_ref)
def synchronized_unplug():
self._call_method("unplug", vbd_ref)
# NOTE(johngarbutt) we need to ensure there is only ever one
# VBD.unplug or VBD.plug happening at once per VM
# due to a bug in XenServer 6.1 and 6.2
synchronized_unplug()
class VDI(XenAPISessionObject):
"""Virtual disk image."""
def __init__(self, session):
super(VDI, self).__init__(session, "VDI")
class VIF(XenAPISessionObject):
"""Virtual Network Interface."""
def __init__(self, session):
super(VIF, self).__init__(session, "VIF")
class SR(XenAPISessionObject):
"""Storage Repository."""
def __init__(self, session):
super(SR, self).__init__(session, "SR")
class PBD(XenAPISessionObject):
"""Physical block device."""
def __init__(self, session):
super(PBD, self).__init__(session, "PBD")
class PIF(XenAPISessionObject):
"""Physical Network Interface."""
def __init__(self, session):
super(PIF, self).__init__(session, "PIF")
class VLAN(XenAPISessionObject):
"""VLAN."""
def __init__(self, session):
super(VLAN, self).__init__(session, "VLAN")
class Host(XenAPISessionObject):
"""XenServer hosts."""
def __init__(self, session):
super(Host, self).__init__(session, "host")
class Network(XenAPISessionObject):
"""Networks that VIFs are attached to."""
def __init__(self, session):
super(Network, self).__init__(session, "network")
class Pool(XenAPISessionObject):
"""Pool of hosts."""
def __init__(self, session):
super(Pool, self).__init__(session, "pool")

373
os_xenapi/client/session.py Normal file
View File

@ -0,0 +1,373 @@
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import contextlib
try:
import cPickle as pickle
except ImportError:
import pickle
import errno
import socket
import time
from eventlet import queue
from eventlet import timeout
from oslo_log import log as logging
from oslo_utils import versionutils
from six.moves import http_client
from six.moves import range
from six.moves import urllib
try:
import xmlrpclib
except ImportError:
import six.moves.xmlrpc_client as xmlrpclib
from os_xenapi.client import exception
from os_xenapi.client.i18n import _
from os_xenapi.client.i18n import _LW
from os_xenapi.client import objects as cli_objects
LOG = logging.getLogger(__name__)
def apply_session_helpers(session):
session.VM = cli_objects.VM(session)
session.SR = cli_objects.SR(session)
session.VDI = cli_objects.VDI(session)
session.VIF = cli_objects.VIF(session)
session.VBD = cli_objects.VBD(session)
session.PBD = cli_objects.PBD(session)
session.PIF = cli_objects.PIF(session)
session.VLAN = cli_objects.VLAN(session)
session.host = cli_objects.Host(session)
session.network = cli_objects.Network(session)
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls."""
# This is not a config option as it should only ever be
# changed in development environments.
# MAJOR VERSION: Incompatible changes with the plugins
# MINOR VERSION: Compatible changes, new plguins, etc
PLUGIN_REQUIRED_VERSION = '1.8'
def __init__(self, url, user, pw, originator="os-xenapi", timeout=10,
concurrent=5):
"""Initialize session for connection with XenServer/Xen Cloud Platform
:param url: URL for connection to XenServer/Xen Cloud Platform
:param user: Username for connection to XenServer/Xen Cloud Platform
:param pw: Password for connection to XenServer/Xen Cloud Platform
:param originator: Specify the caller for this API
:param timeout: Timeout in seconds for XenAPI login
:param concurrent: Maximum concurrent XenAPI connections
"""
import XenAPI
self.XenAPI = XenAPI
self.originator = originator
self.timeout = timeout
self.concurrent = concurrent
self._sessions = queue.Queue()
self.host_checked = False
self.url = self._create_first_session(url, user, pw)
self._populate_session_pool(url, user, pw)
self.host_uuid = self._get_host_uuid()
self.host_ref = self._get_host_ref()
self.product_version, self.product_brand = \
self._get_product_version_and_brand()
# TODO(huanxie) Uncomment _verify_plugin_version() in the future
# self._verify_plugin_version()
self.platform_version = self._get_platform_version()
self._cached_xsm_sr_relaxed = None
apply_session_helpers(self)
def _login_with_password(self, user, pw, session):
login_exception = exception.SessionLoginTimeout
with timeout.Timeout(self.timeout, login_exception):
session.login_with_password(user, pw, self.originator)
def _verify_plugin_version(self):
requested_version = self.PLUGIN_REQUIRED_VERSION
current_version = self.call_plugin_serialized(
'dom0_plugin_version.py', 'get_version')
# v2.0 is the same as v1.8, with no version bumps. Remove this once
# Ocata is released
if requested_version == '2.0' and current_version == '1.8':
return
if not versionutils.is_compatible(requested_version, current_version):
raise exception.OsXenApiException(
_("Plugin version mismatch (Expected %(exp)s, got %(got)s)") %
{'exp': requested_version, 'got': current_version})
def _create_first_session(self, url, user, pw):
try:
session = self._create_session_and_login(url, user, pw)
except exception.SessionLoginTimeout:
raise
self._sessions.put(session)
return url
def _populate_session_pool(self, url, user, pw):
for i in range(self.concurrent - 1):
session = self._create_session_and_login(url, user, pw)
self._sessions.put(session)
def _get_host_uuid(self):
with self._get_session() as session:
host_ref = session.xenapi.session.get_this_host(session.handle)
return session.xenapi.host.get_uuid(host_ref)
def _get_product_version_and_brand(self):
"""Return tuple of (major, minor, rev)
This tuple is for host version and product brand.
"""
software_version = self._get_software_version()
product_version_str = software_version.get('product_version')
# Product version is only set in some cases (e.g. XCP, XenServer) and
# not in others (e.g. xenserver-core, XAPI-XCP).
# In these cases, the platform version is the best number to use.
if product_version_str is None:
product_version_str = software_version.get('platform_version',
'0.0.0')
product_brand = software_version.get('product_brand')
product_version =\
versionutils.convert_version_to_tuple(product_version_str)
return product_version, product_brand
def _get_platform_version(self):
"""Return a tuple of (major, minor, rev) for the host version"""
software_version = self._get_software_version()
platform_version_str = software_version.get('platform_version',
'0.0.0')
platform_version = versionutils.convert_version_to_tuple(
platform_version_str)
return platform_version
def _get_software_version(self):
return self.call_xenapi('host.get_software_version', self.host_ref)
def get_session_id(self):
"""Return a string session_id. Used for vnc consoles."""
with self._get_session() as session:
return str(session._session)
@contextlib.contextmanager
def _get_session(self):
"""Return exclusive session for scope of with statement."""
session = self._sessions.get()
try:
yield session
finally:
self._sessions.put(session)
def _get_host_ref(self):
"""Return the xenapi host on which nova-compute runs on."""
with self._get_session() as session:
return session.xenapi.host.get_by_uuid(self.host_uuid)
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread."""
with self._get_session() as session:
return session.xenapi_request(method, args)
def call_plugin(self, plugin, fn, args):
"""Call host.call_plugin on a background thread."""
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
# TODO(sfinucan): Once the required plugin version is bumped to v2.0,
# we can assume that all files will have a '.py' extension. Until then,
# handle hosts without this extension by rewriting all calls to plugins
# to exclude the '.py' extension. This is made possible through the
# temporary inclusion of symlinks to plugins.
# NOTE(sfinucan): 'partition_utils.py' was the only plugin with a '.py'
# extension before this change was enacted, hence this plugin is
# excluded
if not plugin == 'partition_utils.py':
plugin = plugin.rstrip('.py')
with self._get_session() as session:
return self._unwrap_plugin_exceptions(
session.xenapi.host.call_plugin,
self.host_ref, plugin, fn, args)
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
params = {'params': pickle.dumps(dict(args=args, kwargs=kwargs))}
rv = self.call_plugin(plugin, fn, params)
return pickle.loads(rv)
def call_plugin_serialized_with_retry(self, plugin, fn, num_retries,
callback, retry_cb=None, *args,
**kwargs):
"""Allows a plugin to raise RetryableError so we can try again."""
attempts = num_retries + 1
sleep_time = 0.5
for attempt in range(1, attempts + 1):
try:
if attempt > 1:
time.sleep(sleep_time)
sleep_time = min(2 * sleep_time, 15)
callback_result = None
if callback:
callback_result = callback(kwargs)
msg = ('%(plugin)s.%(fn)s attempt %(attempt)d/%(attempts)d, '
'callback_result: %(callback_result)s')
LOG.debug(msg,
{'plugin': plugin, 'fn': fn, 'attempt': attempt,
'attempts': attempts,
'callback_result': callback_result})
return self.call_plugin_serialized(plugin, fn, *args, **kwargs)
except self.XenAPI.Failure as exc:
if self._is_retryable_exception(exc, fn):
LOG.warning(_LW('%(plugin)s.%(fn)s failed. '
'Retrying call.'),
{'plugin': plugin, 'fn': fn})
if retry_cb:
retry_cb(exc=exc)
else:
raise
except socket.error as exc:
if exc.errno == errno.ECONNRESET:
LOG.warning(_LW('Lost connection to XenAPI during call to '
'%(plugin)s.%(fn)s. Retrying call.'),
{'plugin': plugin, 'fn': fn})
if retry_cb:
retry_cb(exc=exc)
else:
raise
raise exception.PluginRetriesExceeded(num_retries=num_retries)
def _is_retryable_exception(self, exc, fn):
_type, method, error = exc.details[:3]
if error == 'RetryableError':
LOG.debug("RetryableError, so retrying %(fn)s", {'fn': fn},
exc_info=True)
return True
if "signal" in method:
LOG.debug("Error due to a signal, retrying %(fn)s", {'fn': fn},
exc_info=True)
return True
else:
return False
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
self.is_local_connection = url == "unix://local"
if self.is_local_connection:
return self.XenAPI.xapi_local()
return self.XenAPI.Session(url)
def _create_session_and_login(self, url, user, pw):
session = self._create_session(url)
self._login_with_password(user, pw, session)
return session
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
"""Parse exception details."""
try:
return func(*args, **kwargs)
except self.XenAPI.Failure as exc:
LOG.debug("Got exception: %s", exc)
if (len(exc.details) == 4 and
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
exc.details[2] == 'Failure'):
params = None
try:
params = ast.literal_eval(exc.details[3])
except Exception:
raise exc
raise self.XenAPI.Failure(params)
else:
raise
except xmlrpclib.ProtocolError as exc:
LOG.debug("Got exception: %s", exc)
raise
def get_rec(self, record_type, ref):
try:
return self.call_xenapi('%s.get_record' % record_type, ref)
except self.XenAPI.Failure as e:
if e.details[0] != 'HANDLE_INVALID':
raise
return None
def get_all_refs_and_recs(self, record_type):
"""Retrieve all refs and recs for a Xen record type.
Handles race-conditions where the record may be deleted between
the `get_all` call and the `get_record` call.
"""
return self.call_xenapi('%s.get_all_records' % record_type).items()
@contextlib.contextmanager
def custom_task(self, label, desc=''):
"""Return exclusive session for scope of with statement."""
name = '%s-%s' % (self.originator, label)
task_ref = self.call_xenapi("task.create", name, desc)
try:
LOG.debug('Created task %s with ref %s' % (name, task_ref))
yield task_ref
finally:
self.call_xenapi("task.destroy", task_ref)
LOG.debug('Destroyed task ref %s' % task_ref)
@contextlib.contextmanager
def http_connection(self, session):
conn = None
xs_url = urllib.parse.urlparse(session.url)
LOG.debug("Creating http(s) connection to %s" % session.url)
if xs_url.scheme == 'http':
conn = http_client.HTTPConnection(xs_url.netloc)
elif xs_url.scheme == 'https':
conn = http_client.HTTPSConnection(xs_url.netloc)
conn.connect()
try:
yield conn
finally:
conn.close()
def is_xsm_sr_check_relaxed(self):
if self._cached_xsm_sr_relaxed is None:
config_value = self.call_plugin('config_file', 'get_val',
key='relax-xsm-sr-check')
if not config_value:
version_str = '.'.join(str(v) for v in self.platform_version)
if versionutils.is_compatible('2.1.0', version_str,
same_major=False):
self._cached_xsm_sr_relaxed = True
else:
self._cached_xsm_sr_relaxed = False
else:
self._cached_xsm_sr_relaxed = config_value.lower() == 'true'
return self._cached_xsm_sr_relaxed

View File

View File

@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_os_xenapi
----------------------------------
Tests for `os_xenapi objects` module.
"""
import mock
from os_xenapi.client import objects
from os_xenapi.tests import base
class XenAPISessionObjectTestCase(base.TestCase):
def setUp(self):
super(XenAPISessionObjectTestCase, self).setUp()
self.session = mock.Mock()
self.obj = objects.XenAPISessionObject(self.session, "FAKE")
def test_call_method_via_attr(self):
self.session.call_xenapi.return_value = "asdf"
result = self.obj.get_X("ref")
self.assertEqual(result, "asdf")
self.session.call_xenapi.assert_called_once_with("FAKE.get_X", "ref")
class ObjectsTestCase(base.TestCase):
def setUp(self):
super(ObjectsTestCase, self).setUp()
self.session = mock.Mock()
def test_VM(self):
vm = objects.VM(self.session)
vm.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VM.get_X", "ref")
def test_SR(self):
sr = objects.SR(self.session)
sr.get_X("ref")
self.session.call_xenapi.assert_called_once_with("SR.get_X", "ref")
def test_VDI(self):
vdi = objects.VDI(self.session)
vdi.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VDI.get_X", "ref")
def test_VIF(self):
vdi = objects.VIF(self.session)
vdi.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VIF.get_X", "ref")
def test_VBD(self):
vbd = objects.VBD(self.session)
vbd.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VBD.get_X", "ref")
def test_PBD(self):
pbd = objects.PBD(self.session)
pbd.get_X("ref")
self.session.call_xenapi.assert_called_once_with("PBD.get_X", "ref")
def test_PIF(self):
pif = objects.PIF(self.session)
pif.get_X("ref")
self.session.call_xenapi.assert_called_once_with("PIF.get_X", "ref")
def test_VLAN(self):
vlan = objects.VLAN(self.session)
vlan.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VLAN.get_X", "ref")
def test_host(self):
host = objects.Host(self.session)
host.get_X("ref")
self.session.call_xenapi.assert_called_once_with("host.get_X", "ref")
def test_network(self):
network = objects.Network(self.session)
network.get_X("ref")
self.session.call_xenapi.assert_called_once_with("network.get_X",
"ref")
def test_pool(self):
pool = objects.Pool(self.session)
pool.get_X("ref")
self.session.call_xenapi.assert_called_once_with("pool.get_X", "ref")
class VBDTestCase(base.TestCase):
def setUp(self):
super(VBDTestCase, self).setUp()
self.session = mock.Mock()
self.session.VBD = objects.VBD(self.session)
self.utils = mock.Mock()
def test_plug(self):
self.session.VBD.plug("vbd_ref", "vm_ref")
self.session.call_xenapi.assert_called_once_with("VBD.plug", "vbd_ref")
def test_unplug(self):
self.session.VBD.unplug("vbd_ref", "vm_ref")
self.session.call_xenapi.assert_called_once_with("VBD.unplug",
"vbd_ref")
@mock.patch.object(objects, 'synchronized')
def test_vbd_plug_check_synchronized(self, mock_synchronized):
self.session.VBD.plug("vbd_ref", "vm_ref")
mock_synchronized.assert_called_once_with("vbd-vm_ref")
@mock.patch.object(objects, 'synchronized')
def test_vbd_unplug_check_synchronized(self, mock_synchronized):
self.session.VBD.unplug("vbd_ref", "vm_ref")
mock_synchronized.assert_called_once_with("vbd-vm_ref")

View File

@ -0,0 +1,232 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import socket
import mock
from os_xenapi.client import exception
from os_xenapi.client import session
from os_xenapi.tests import base
class SessionTestCase(base.TestCase):
@mock.patch.object(session.XenAPISession, '_get_platform_version')
@mock.patch.object(session.XenAPISession, '_create_session')
@mock.patch.object(session.XenAPISession, '_get_product_version_and_brand')
def test_session_nova_originator(self, mock_version_and_brand,
mock_create_session,
mock_platform_version):
concurrent = 2
originator = 'os-xenapi-nova'
timeout = 10
sess = mock.Mock()
mock_create_session.return_value = sess
mock_version_and_brand.return_value = ('6.5', 'XenServer')
mock_platform_version.return_value = (2, 1, 0)
session.XenAPISession('http://someserver', 'username', 'password',
originator=originator, concurrent=concurrent,
timeout=timeout)
sess.login_with_password.assert_called_with('username', 'password',
originator)
@mock.patch.object(session.XenAPISession, '_get_platform_version')
@mock.patch('eventlet.timeout.Timeout')
@mock.patch.object(session.XenAPISession, '_create_session')
@mock.patch.object(session.XenAPISession, '_get_product_version_and_brand')
def test_session_login_with_timeout(self, mock_version,
create_session, mock_timeout,
mock_platform_version):
concurrent = 2
originator = 'os-xenapi-nova'
sess = mock.Mock()
create_session.return_value = sess
mock_version.return_value = ('version', 'brand')
mock_platform_version.return_value = (2, 1, 0)
session.XenAPISession('http://someserver', 'username', 'password',
originator=originator, concurrent=concurrent)
self.assertEqual(concurrent, sess.login_with_password.call_count)
self.assertEqual(concurrent, mock_timeout.call_count)
@mock.patch.object(session.XenAPISession, 'call_plugin')
@mock.patch.object(session.XenAPISession, '_get_software_version')
@mock.patch.object(session.XenAPISession, '_create_session')
def test_relax_xsm_sr_check_true(self, mock_create_session,
mock_get_software_version,
mock_call_plugin):
sess = mock.Mock()
mock_create_session.return_value = sess
mock_get_software_version.return_value = {'product_version': '6.5.0',
'product_brand': 'XenServer',
'platform_version': '1.9.0'}
# mark relax-xsm-sr-check=True in /etc/xapi.conf
mock_call_plugin.return_value = "True"
xenapi_sess = session.XenAPISession(
'http://someserver', 'username', 'password')
self.assertTrue(xenapi_sess.is_xsm_sr_check_relaxed())
@mock.patch.object(session.XenAPISession, 'call_plugin')
@mock.patch.object(session.XenAPISession, '_get_software_version')
@mock.patch.object(session.XenAPISession, '_create_session')
def test_relax_xsm_sr_check_XS65_missing(self, mock_create_session,
mock_get_software_version,
mock_call_plugin):
sess = mock.Mock()
mock_create_session.return_value = sess
mock_get_software_version.return_value = {'product_version': '6.5.0',
'product_brand': 'XenServer',
'platform_version': '1.9.0'}
# mark no relax-xsm-sr-check setting in /etc/xapi.conf
mock_call_plugin.return_value = ""
xenapi_sess = session.XenAPISession(
'http://someserver', 'username', 'password')
self.assertFalse(xenapi_sess.is_xsm_sr_check_relaxed())
@mock.patch.object(session.XenAPISession, 'call_plugin')
@mock.patch.object(session.XenAPISession, '_get_software_version')
@mock.patch.object(session.XenAPISession, '_create_session')
def test_relax_xsm_sr_check_XS7_missing(self, mock_create_session,
mock_get_software_version,
mock_call_plugin):
sess = mock.Mock()
mock_create_session.return_value = sess
mock_get_software_version.return_value = {'product_version': '7.0.0',
'product_brand': 'XenServer',
'platform_version': '2.1.0'}
# mark no relax-xsm-sr-check in /etc/xapi.conf
mock_call_plugin.return_value = ""
xenapi_sess = session.XenAPISession(
'http://someserver', 'username', 'password')
self.assertTrue(xenapi_sess.is_xsm_sr_check_relaxed())
class ApplySessionHelpersTestCase(base.TestCase):
def setUp(self):
super(ApplySessionHelpersTestCase, self).setUp()
self.session = mock.Mock()
session.apply_session_helpers(self.session)
def test_apply_session_helpers_add_VM(self):
self.session.VM.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VM.get_X", "ref")
def test_apply_session_helpers_add_SR(self):
self.session.SR.get_X("ref")
self.session.call_xenapi.assert_called_once_with("SR.get_X", "ref")
def test_apply_session_helpers_add_VDI(self):
self.session.VDI.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VDI.get_X", "ref")
def test_apply_session_helpers_add_VIF(self):
self.session.VIF.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VIF.get_X", "ref")
def test_apply_session_helpers_add_VBD(self):
self.session.VBD.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VBD.get_X", "ref")
def test_apply_session_helpers_add_PBD(self):
self.session.PBD.get_X("ref")
self.session.call_xenapi.assert_called_once_with("PBD.get_X", "ref")
def test_apply_session_helpers_add_PIF(self):
self.session.PIF.get_X("ref")
self.session.call_xenapi.assert_called_once_with("PIF.get_X", "ref")
def test_apply_session_helpers_add_VLAN(self):
self.session.VLAN.get_X("ref")
self.session.call_xenapi.assert_called_once_with("VLAN.get_X", "ref")
def test_apply_session_helpers_add_host(self):
self.session.host.get_X("ref")
self.session.call_xenapi.assert_called_once_with("host.get_X", "ref")
def test_apply_session_helpers_add_network(self):
self.session.network.get_X("ref")
self.session.call_xenapi.assert_called_once_with("network.get_X",
"ref")
class CallPluginTestCase(base.TestCase):
def _get_fake_xapisession(self):
class FakeXapiSession(session.XenAPISession):
def __init__(self, **kwargs):
"Skip the superclass's dirty init"
self.XenAPI = mock.MagicMock()
return FakeXapiSession()
def setUp(self):
super(CallPluginTestCase, self).setUp()
self.session = self._get_fake_xapisession()
def test_serialized_with_retry_socket_error_conn_reset(self):
exc = socket.error()
exc.errno = errno.ECONNRESET
plugin = 'glance'
fn = 'download_vhd'
num_retries = 1
callback = None
retry_cb = mock.Mock()
with mock.patch.object(self.session, 'call_plugin_serialized',
spec=True) as call_plugin_serialized:
call_plugin_serialized.side_effect = exc
self.assertRaises(
exception.PluginRetriesExceeded,
self.session.call_plugin_serialized_with_retry, plugin, fn,
num_retries, callback, retry_cb)
call_plugin_serialized.assert_called_with(plugin, fn)
self.assertEqual(2, call_plugin_serialized.call_count)
self.assertEqual(2, retry_cb.call_count)
def test_serialized_with_retry_socket_error_reraised(self):
exc = socket.error()
exc.errno = errno.ECONNREFUSED
plugin = 'glance'
fn = 'download_vhd'
num_retries = 1
callback = None
retry_cb = mock.Mock()
with mock.patch.object(
self.session, 'call_plugin_serialized', spec=True)\
as call_plugin_serialized:
call_plugin_serialized.side_effect = exc
self.assertRaises(
socket.error, self.session.call_plugin_serialized_with_retry,
plugin, fn, num_retries, callback, retry_cb)
call_plugin_serialized.assert_called_once_with(plugin, fn)
self.assertEqual(0, retry_cb.call_count)
def test_serialized_with_retry_socket_reset_reraised(self):
exc = socket.error()
exc.errno = errno.ECONNRESET
plugin = 'glance'
fn = 'download_vhd'
num_retries = 1
callback = None
retry_cb = mock.Mock()
with mock.patch.object(self.session, 'call_plugin_serialized',
spec=True) as call_plugin_serialized:
call_plugin_serialized.side_effect = exc
self.assertRaises(
exception.PluginRetriesExceeded,
self.session.call_plugin_serialized_with_retry, plugin, fn,
num_retries, callback, retry_cb)
call_plugin_serialized.assert_called_with(plugin, fn)
self.assertEqual(2, call_plugin_serialized.call_count)

View File

@ -3,3 +3,12 @@
# process, which may cause wedges in the gate later.
pbr>=1.6 # Apache-2.0
Babel>=2.3.4 # BSD
eventlet!=0.18.3,>=0.18.2 # MIT
oslo.concurrency>=3.8.0 # Apache-2.0
oslo.log>=3.11.0 # Apache-2.0
oslo.utils>=3.17.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
six>=1.9.0 # MIT
XenAPI>=1.2 # LGPL

View File

@ -9,6 +9,7 @@ python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx!=1.3b1,<1.4,>=1.2.1 # BSD
oslosphinx>=4.7.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
os-testr>=0.8.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT

View File

@ -10,7 +10,13 @@ setenv =
VIRTUAL_ENV={envdir}
PYTHONWARNINGS=default::DeprecationWarning
deps = -r{toxinidir}/test-requirements.txt
commands = python setup.py test --slowest --testr-args='{posargs}'
whitelist_externals = find
rm
commands =
find . -type f -name "*.pyc" -delete
py27: python setup.py test --slowest --testr-args='{posargs}'
py35: ostestr --blacklist_file exclusion_py3.txt
[testenv:pep8]
commands = flake8 {posargs}