From a3155deae0763d57bef2f28bce5efaf5305cd10a Mon Sep 17 00:00:00 2001 From: Huan Xie Date: Tue, 8 Nov 2016 18:56:01 -0800 Subject: [PATCH] Add dom0 plugins Add both nova and neutron dom0 plugins, and only keep the ones with .py suffix, also change the plugin version to 2.0 Change-Id: I71f4640866efa48a0de7bfd2f86cc5f1293835f9 --- exclusion_py3.txt | 2 + os_xenapi/client/session.py | 14 +- os_xenapi/dom0/README | 8 + .../etc/xapi.d/plugins/_bittorrent_seeder.py | 129 ++++ os_xenapi/dom0/etc/xapi.d/plugins/agent.py | 273 ++++++++ .../dom0/etc/xapi.d/plugins/bandwidth.py | 64 ++ .../dom0/etc/xapi.d/plugins/bittorrent.py | 324 +++++++++ .../dom0/etc/xapi.d/plugins/config_file.py | 34 + os_xenapi/dom0/etc/xapi.d/plugins/console.py | 89 +++ .../etc/xapi.d/plugins/dom0_plugin_version.py | 47 ++ .../dom0/etc/xapi.d/plugins/dom0_pluginlib.py | 139 ++++ os_xenapi/dom0/etc/xapi.d/plugins/glance.py | 626 ++++++++++++++++++ os_xenapi/dom0/etc/xapi.d/plugins/ipxe.py | 140 ++++ os_xenapi/dom0/etc/xapi.d/plugins/kernel.py | 142 ++++ .../dom0/etc/xapi.d/plugins/migration.py | 84 +++ os_xenapi/dom0/etc/xapi.d/plugins/netwrap.py | 89 +++ .../etc/xapi.d/plugins/partition_utils.py | 86 +++ os_xenapi/dom0/etc/xapi.d/plugins/utils.py | 518 +++++++++++++++ .../dom0/etc/xapi.d/plugins/workarounds.py | 53 ++ os_xenapi/dom0/etc/xapi.d/plugins/xenhost.py | 622 +++++++++++++++++ os_xenapi/dom0/etc/xapi.d/plugins/xenstore.py | 225 +++++++ os_xenapi/tests/plugins/__init__.py | 0 os_xenapi/tests/plugins/plugin_test.py | 68 ++ os_xenapi/tests/plugins/test_bandwidth.py | 49 ++ .../tests/plugins/test_dom0_plugin_version.py | 28 + .../tests/plugins/test_dom0_pluginlib.py | 151 +++++ .../tests/plugins/test_partition_utils.py | 109 +++ 27 files changed, 4102 insertions(+), 11 deletions(-) create mode 100644 os_xenapi/dom0/README create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/_bittorrent_seeder.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/agent.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/bandwidth.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/bittorrent.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/config_file.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/console.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/dom0_plugin_version.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/dom0_pluginlib.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/glance.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/ipxe.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/kernel.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/migration.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/netwrap.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/partition_utils.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/utils.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/workarounds.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/xenhost.py create mode 100644 os_xenapi/dom0/etc/xapi.d/plugins/xenstore.py create mode 100644 os_xenapi/tests/plugins/__init__.py create mode 100644 os_xenapi/tests/plugins/plugin_test.py create mode 100644 os_xenapi/tests/plugins/test_bandwidth.py create mode 100644 os_xenapi/tests/plugins/test_dom0_plugin_version.py create mode 100644 os_xenapi/tests/plugins/test_dom0_pluginlib.py create mode 100644 os_xenapi/tests/plugins/test_partition_utils.py diff --git a/exclusion_py3.txt b/exclusion_py3.txt index e234f8c..db1f6d3 100644 --- a/exclusion_py3.txt +++ b/exclusion_py3.txt @@ -1,2 +1,4 @@ # The XenAPI plugins run in a Python 2 environment, so avoid attempting # to run their unit tests in a Python 3 environment + +os_xenapi.tests.plugins diff --git a/os_xenapi/client/session.py b/os_xenapi/client/session.py index aab13ba..94f699c 100644 --- a/os_xenapi/client/session.py +++ b/os_xenapi/client/session.py @@ -66,7 +66,7 @@ class XenAPISession(object): # changed in development environments. # MAJOR VERSION: Incompatible changes with the plugins # MINOR VERSION: Compatible changes, new plguins, etc - PLUGIN_REQUIRED_VERSION = '1.8' + PLUGIN_REQUIRED_VERSION = '2.0' def __init__(self, url, user, pw, originator="os-xenapi", timeout=10, concurrent=5): @@ -198,16 +198,8 @@ class XenAPISession(object): # the plugin gets executed on the right host when using XS pools args['host_uuid'] = self.host_uuid - # TODO(sfinucan): Once the required plugin version is bumped to v2.0, - # we can assume that all files will have a '.py' extension. Until then, - # handle hosts without this extension by rewriting all calls to plugins - # to exclude the '.py' extension. This is made possible through the - # temporary inclusion of symlinks to plugins. - # NOTE(sfinucan): 'partition_utils.py' was the only plugin with a '.py' - # extension before this change was enacted, hence this plugin is - # excluded - if not plugin == 'partition_utils.py': - plugin = plugin.rstrip('.py') + if not plugin.endswith('.py'): + plugin = '%s.py' % plugin with self._get_session() as session: return self._unwrap_plugin_exceptions( diff --git a/os_xenapi/dom0/README b/os_xenapi/dom0/README new file mode 100644 index 0000000..63d1bb7 --- /dev/null +++ b/os_xenapi/dom0/README @@ -0,0 +1,8 @@ +This directory contains files that are required for the XenAPI support. +They should be installed in the XenServer / Xen Cloud Platform dom0. + +If you install them manually, you will need to ensure that the newly +added files are executable. You can do this by running the following +command (from dom0): + +chmod a+x /etc/xapi.d/plugins/* diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/_bittorrent_seeder.py b/os_xenapi/dom0/etc/xapi.d/plugins/_bittorrent_seeder.py new file mode 100644 index 0000000..68f61eb --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/_bittorrent_seeder.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +"""Seed a bittorent image. This file should not be executed directly, rather it +should be kicked off by the `bittorent` dom0 plugin.""" + +import os +import sys +import time + +import libtorrent + +import dom0_pluginlib + + +dom0_pluginlib.configure_logging('_bittorrent_seeder') +logging = dom0_pluginlib.logging + + +def _daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + """Daemonize the current process. + + Do the UNIX double-fork magic, see Stevens' "Advanced Programming + in the UNIX Environment" for details (ISBN 0201563177). + + Source: http://www.jejik.com/articles/2007/02/ + a_simple_unix_linux_daemon_in_python/ + """ + # 1st fork + try: + pid = os.fork() + if pid > 0: + # first parent returns + return False + except OSError, e: # noqa + logging.error("fork #1 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # 2nd fork + try: + pid = os.fork() + if pid > 0: + # second parent exits + sys.exit(0) + except OSError, e: # noqa + logging.error("fork #2 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(stdin, 'r') + so = open(stdout, 'a+') + se = open(stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + return True + + +def main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + seed_time = time.time() + torrent_seed_duration + logging.debug("Seeding '%s' for %d secs" % ( + torrent_path, torrent_seed_duration)) + + child = _daemonize() + if not child: + return + + # At this point we're the daemonized child... + session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + + torrent_file = open(torrent_path, 'rb') + try: + torrent_data = torrent_file.read() + finally: + torrent_file.close() + + decoded_data = libtorrent.bdecode(torrent_data) + + info = libtorrent.torrent_info(decoded_data) + torrent = session.add_torrent( + info, seed_cache_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + try: + while time.time() < seed_time: + time.sleep(5) + finally: + session.remove_torrent(torrent) + + logging.debug("Seeding of '%s' finished" % torrent_path) + + +if __name__ == "__main__": + (torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) = sys.argv[1:] + torrent_seed_duration = int(torrent_seed_duration) + torrent_listen_port_start = int(torrent_listen_port_start) + torrent_listen_port_end = int(torrent_listen_port_end) + + main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/agent.py b/os_xenapi/dom0/etc/xapi.d/plugins/agent.py new file mode 100644 index 0000000..ce2e377 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/agent.py @@ -0,0 +1,273 @@ +#!/usr/bin/env python + +# Copyright (c) 2011 Citrix Systems, Inc. +# Copyright 2011 OpenStack Foundation +# Copyright 2011 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +# TODO(sfinucan): Remove the symlinks in this folder once Ocata is released + +# +# XenAPI plugin for reading/writing information to xenstore +# + +import base64 +import commands # noqa +try: + import json +except ImportError: + import simplejson as json +import time + +import XenAPIPlugin + +import dom0_pluginlib +dom0_pluginlib.configure_logging("agent") +import xenstore + + +DEFAULT_TIMEOUT = 30 +PluginError = dom0_pluginlib.PluginError + + +class TimeoutError(StandardError): + pass + + +class RebootDetectedError(StandardError): + pass + + +def version(self, arg_dict): + """Get version of agent.""" + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + arg_dict["value"] = json.dumps({"name": "version", "value": "agent"}) + request_id = arg_dict["id"] + arg_dict["path"] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def key_init(self, arg_dict): + """Handles the Diffie-Hellman key exchange with the agent to + + establish the shared secret key used to encrypt/decrypt sensitive + info to be passed, such as passwords. Returns the shared + secret key value. + """ + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + # WARNING: Some older Windows agents will crash if the public key isn't + # a string + pub = arg_dict["pub"] + arg_dict["value"] = json.dumps({"name": "keyinit", "value": pub}) + request_id = arg_dict["id"] + arg_dict["path"] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def password(self, arg_dict): + """Writes a request to xenstore that tells the agent to set + + the root password for the given VM. The password should be + encrypted using the shared secret key that was returned by a + previous call to key_init. The encrypted password value should + be passed as the value for the 'enc_pass' key in arg_dict. + """ + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + enc_pass = arg_dict["enc_pass"] + arg_dict["value"] = json.dumps({"name": "password", "value": enc_pass}) + request_id = arg_dict["id"] + arg_dict["path"] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def resetnetwork(self, arg_dict): + """Writes a request to xenstore that tells the agent to reset networking. + + """ + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + arg_dict['value'] = json.dumps({'name': 'resetnetwork', 'value': ''}) + request_id = arg_dict['id'] + arg_dict['path'] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def inject_file(self, arg_dict): + """Expects a file path and the contents of the file to be written. + + Should be base64-encoded in order to eliminate errors as they are passed + through the stack. Writes that information to xenstore for the agent, + which will decode the file and intended path, and create it on the + instance. The original agent munged both of these into a single entry; + the new agent keeps them separate. We will need to test for the new agent, + and write the xenstore records to match the agent version. We will also + need to test to determine if the file injection method on the agent has + been disabled, and raise a NotImplemented error if that is the case. + """ + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + b64_path = arg_dict["b64_path"] + b64_file = arg_dict["b64_contents"] + request_id = arg_dict["id"] + agent_features = _get_agent_features(self, arg_dict) + if "file_inject" in agent_features: + # New version of the agent. Agent should receive a 'value' + # key whose value is a dictionary containing 'b64_path' and + # 'b64_file'. See old version below. + arg_dict["value"] = json.dumps({"name": "file_inject", + "value": {"b64_path": b64_path, + "b64_file": b64_file}}) + elif "injectfile" in agent_features: + # Old agent requires file path and file contents to be + # combined into one base64 value. + raw_path = base64.b64decode(b64_path) + raw_file = base64.b64decode(b64_file) + new_b64 = base64.b64encode("%s,%s" % (raw_path, raw_file)) + arg_dict["value"] = json.dumps({"name": "injectfile", + "value": new_b64}) + else: + # Either the methods don't exist in the agent, or they + # have been disabled. + raise NotImplementedError("NOT IMPLEMENTED: Agent does not" + " support file injection.") + arg_dict["path"] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def agent_update(self, arg_dict): + """Expects an URL and md5sum of the contents + + Then directs the agent to update itself. + """ + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + request_id = arg_dict["id"] + url = arg_dict["url"] + md5sum = arg_dict["md5sum"] + arg_dict["value"] = json.dumps({"name": "agentupdate", + "value": "%s,%s" % (url, md5sum)}) + arg_dict["path"] = "data/host/%s" % request_id + xenstore.write_record(self, arg_dict) + try: + resp = _wait_for_agent(self, request_id, arg_dict, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + return resp + + +def _get_agent_features(self, arg_dict): + """Return an array of features that an agent supports.""" + timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT)) + tmp_id = commands.getoutput("uuidgen") + dct = {} + dct.update(arg_dict) + dct["value"] = json.dumps({"name": "features", "value": ""}) + dct["path"] = "data/host/%s" % tmp_id + xenstore.write_record(self, dct) + try: + resp = _wait_for_agent(self, tmp_id, dct, timeout) + except TimeoutError, e: # noqa + raise PluginError(e) + response = json.loads(resp) + if response['returncode'] != 0: + return response["message"].split(",") + else: + return {} + + +def _wait_for_agent(self, request_id, arg_dict, timeout): + """Periodically checks xenstore for a response from the agent. + + The request is always written to 'data/host/{id}', and + the agent's response for that request will be in 'data/guest/{id}'. + If no value appears from the agent within the timeout specified, + the original request is deleted and a TimeoutError is raised. + """ + arg_dict["path"] = "data/guest/%s" % request_id + arg_dict["ignore_missing_path"] = True + start = time.time() + reboot_detected = False + while time.time() - start < timeout: + ret = xenstore.read_record(self, arg_dict) + # Note: the response for None with be a string that includes + # double quotes. + if ret != '"None"': + # The agent responded + return ret + + time.sleep(.5) + + # NOTE(johngarbutt) If we can't find this domid, then + # the VM has rebooted, so we must trigger domid refresh. + # Check after the sleep to give xenstore time to update + # after the VM reboot. + exists_args = { + "dom_id": arg_dict["dom_id"], + "path": "name", + } + dom_id_is_present = xenstore.record_exists(exists_args) + if not dom_id_is_present: + reboot_detected = True + break + + # No response within the timeout period; bail out + # First, delete the request record + arg_dict["path"] = "data/host/%s" % request_id + xenstore.delete_record(self, arg_dict) + + if reboot_detected: + raise RebootDetectedError("REBOOT: dom_id %s no longer " + "present") % arg_dict["dom_id"] + else: + raise TimeoutError("TIMEOUT: No response from agent within" + " %s seconds.") % timeout + + +if __name__ == "__main__": + XenAPIPlugin.dispatch( + {"version": version, + "key_init": key_init, + "password": password, + "resetnetwork": resetnetwork, + "inject_file": inject_file, + "agentupdate": agent_update}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/bandwidth.py b/os_xenapi/dom0/etc/xapi.d/plugins/bandwidth.py new file mode 100644 index 0000000..66eb4ba --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/bandwidth.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +"""Fetch Bandwidth data from VIF network devices.""" + +import utils + +import dom0_pluginlib + +import re + + +dom0_pluginlib.configure_logging('bandwidth') + + +def _read_proc_net(): + f = open('/proc/net/dev', 'r') + try: + return f.readlines() + finally: + f.close() + + +def _get_bandwitdth_from_proc(): + devs = [l.strip() for l in _read_proc_net()] + # ignore headers + devs = devs[2:] + vif_pattern = re.compile("^vif(\d+)\.(\d+)") + dlist = [d.split(':', 1) for d in devs if vif_pattern.match(d)] + devmap = dict() + for name, stats in dlist: + slist = stats.split() + dom, vifnum = name[3:].split('.', 1) + dev = devmap.get(dom, {}) + # Note, we deliberately swap in and out, as instance traffic + # shows up inverted due to going though the bridge. (mdragon) + dev[vifnum] = dict(bw_in=int(slist[8]), bw_out=int(slist[0])) + devmap[dom] = dev + return devmap + + +def fetch_all_bandwidth(session): + return _get_bandwitdth_from_proc() + + +if __name__ == '__main__': + utils.register_plugin_calls(fetch_all_bandwidth) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/bittorrent.py b/os_xenapi/dom0/etc/xapi.d/plugins/bittorrent.py new file mode 100644 index 0000000..bb44e64 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/bittorrent.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +"""Download images via BitTorrent.""" + +import errno +import inspect +import os +import random +import shutil +import tempfile +import time + +import libtorrent +import urllib2 + +import utils + +import dom0_pluginlib + + +dom0_pluginlib.configure_logging('bittorrent') +logging = dom0_pluginlib.logging + +# Taken from units since we don't pull down full library +Mi = 1024 ** 2 +DEFAULT_TORRENT_CACHE = '/images/torrents' +DEFAULT_SEED_CACHE = '/images/seeds' +SEEDER_PROCESS = '_bittorrent_seeder.py' +DEFAULT_MMA = int(libtorrent.bandwidth_mixed_algo_t.prefer_tcp) +DEFAULT_MORQ = 400 +DEFAULT_MQDB = 8 * Mi +DEFAULT_MQDBLW = 0 + + +def _make_torrent_cache(): + torrent_cache_path = os.environ.get('TORRENT_CACHE', DEFAULT_TORRENT_CACHE) + + if not os.path.exists(torrent_cache_path): + os.mkdir(torrent_cache_path) + + return torrent_cache_path + + +def _fetch_torrent_file(torrent_cache_path, image_id, torrent_url): + torrent_path = os.path.join(torrent_cache_path, image_id + '.torrent') + + if not os.path.exists(torrent_path): + logging.info("Downloading %s" % torrent_url) + + # Write contents to temporary path to ensure we don't have partially + # completed files in the cache. + temp_directory = tempfile.mkdtemp(dir=torrent_cache_path) + try: + temp_path = os.path.join(temp_directory, + os.path.basename(torrent_path)) + temp_file = open(temp_path, 'wb') + try: + remote_torrent_file = urllib2.urlopen(torrent_url) + shutil.copyfileobj(remote_torrent_file, temp_file) + finally: + temp_file.close() + + os.rename(temp_path, torrent_path) + finally: + shutil.rmtree(temp_directory) + + return torrent_path + + +def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed): + """Delete any torrent files that haven't been accessed recently.""" + if not torrent_max_last_accessed: + logging.debug("Reaping old torrent files disabled, skipping...") + return + + logging.debug("Preparing to reap old torrent files," + " torrent_max_last_accessed=%d" % torrent_max_last_accessed) + + for fname in os.listdir(torrent_cache_path): + torrent_path = os.path.join(torrent_cache_path, fname) + last_accessed = time.time() - os.path.getatime(torrent_path) + if last_accessed > torrent_max_last_accessed: + logging.debug("Reaping '%s', last_accessed=%d" % ( + torrent_path, last_accessed)) + utils.delete_if_exists(torrent_path) + + +def _download(torrent_path, save_as_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff): + session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + + mixed_mode_algorithm = os.environ.get( + 'DEFAULT_MIXED_MODE_ALGORITHM', DEFAULT_MMA) + max_out_request_queue = os.environ.get( + 'DEFAULT_MAX_OUT_REQUEST_QUEUE', DEFAULT_MORQ) + max_queued_disk_bytes = os.environ.get( + 'DEFAULT_MAX_QUEUED_DISK_BYTES', DEFAULT_MQDB) + max_queued_disk_bytes_low_watermark = os.environ.get( + 'DEFAULT_MAX_QUEUED_DISK_BYTES_LOW_WATERMARK', DEFAULT_MQDBLW) + + session_opts = {'mixed_mode_algorithm': mixed_mode_algorithm, + 'max_queued_disk_bytes': max_queued_disk_bytes, + 'max_out_request_queue': max_out_request_queue, + 'max_queued_disk_bytes_low_watermark': + max_queued_disk_bytes_low_watermark} + session.set_settings(session_opts) + info = libtorrent.torrent_info( + libtorrent.bdecode(open(torrent_path, 'rb').read())) + + torrent = session.add_torrent( + info, save_as_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + + try: + last_progress = 0 + last_progress_updated = time.time() + + log_time = 0 + while not torrent.is_seed(): + s = torrent.status() + + progress = s.progress * 100 + + if progress != last_progress: + last_progress = progress + last_progress_updated = time.time() + + stall_duration = time.time() - last_progress_updated + if stall_duration > torrent_download_stall_cutoff: + logging.error( + "Download stalled: stall_duration=%d," + " torrent_download_stall_cutoff=%d" % ( + stall_duration, torrent_download_stall_cutoff)) + raise Exception("Bittorrent download stall detected, bailing!") + + log_time += 1 + if log_time % 10 == 0: + logging.debug( + '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)' + ' %s %s' % (progress, s.download_rate / 1000, + s.upload_rate / 1000, s.num_peers, s.state, + torrent_path)) + time.sleep(1) + finally: + session.remove_torrent(torrent) + + logging.debug("Download of '%s' finished" % torrent_path) + + +def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + if not torrent_seed_duration: + logging.debug("Seeding disabled, skipping...") + return False + + if os.path.exists(seed_path): + logging.debug("Seed is already present, skipping....") + return False + + rand = random.random() + if rand > torrent_seed_chance: + logging.debug("%.2f > %.2f, seeding randomly skipping..." % ( + rand, torrent_seed_chance)) + return False + + num_active_seeders = len(list(_active_seeder_processes())) + if (torrent_max_seeder_processes_per_host >= 0 and + num_active_seeders >= torrent_max_seeder_processes_per_host): + logging.debug("max number of seeder processes for this host reached" + " (%d), skipping..." % + torrent_max_seeder_processes_per_host) + return False + + return True + + +def _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe())) + seeder_path = os.path.join(plugin_path, SEEDER_PROCESS) + seed_cmd = map(str, [seeder_path, torrent_path, seed_cache_path, + torrent_seed_duration, torrent_listen_port_start, + torrent_listen_port_end]) + utils.run_command(seed_cmd) + + +def _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host): + seed_filename = os.path.basename(tarball_path) + seed_path = os.path.join(seed_cache_path, seed_filename) + + if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + logging.debug("Preparing to seed '%s' for %d secs" % ( + seed_path, torrent_seed_duration)) + utils._rename(tarball_path, seed_path) + + # Daemonize and seed the image + _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) + else: + utils.delete_if_exists(tarball_path) + + +def _extract_tarball(tarball_path, staging_path): + """Extract the tarball into the staging directory.""" + tarball_fileobj = open(tarball_path, 'rb') + try: + utils.extract_tarball(tarball_fileobj, staging_path) + finally: + tarball_fileobj.close() + + +def _active_seeder_processes(): + """Yields command-line of active seeder processes. + + Roughly equivalent to performing ps | grep _bittorrent_seeder + """ + pids = [pid for pid in os.listdir('/proc') if pid.isdigit()] + for pid in pids: + try: + cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read() + except IOError, e: # noqa + if e.errno != errno.ENOENT: + raise + + if SEEDER_PROCESS in cmdline: + yield cmdline + + +def _reap_finished_seeds(seed_cache_path): + """Delete any cached seeds where the seeder process has died.""" + logging.debug("Preparing to reap finished seeds") + missing = {} + for fname in os.listdir(seed_cache_path): + seed_path = os.path.join(seed_cache_path, fname) + missing[seed_path] = None + + for cmdline in _active_seeder_processes(): + for seed_path in missing.keys(): + seed_filename = os.path.basename(seed_path) + if seed_filename in cmdline: + del missing[seed_path] + + for seed_path in missing: + logging.debug("Reaping cached seed '%s'" % seed_path) + utils.delete_if_exists(seed_path) + + +def _make_seed_cache(): + seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE) + if not os.path.exists(seed_cache_path): + os.mkdir(seed_cache_path) + return seed_cache_path + + +def download_vhd(session, image_id, torrent_url, torrent_seed_duration, + torrent_seed_chance, torrent_max_last_accessed, + torrent_listen_port_start, torrent_listen_port_end, + torrent_download_stall_cutoff, uuid_stack, sr_path, + torrent_max_seeder_processes_per_host): + # Download an image from BitTorrent, unbundle it, and then deposit the + # VHDs into the storage repository + seed_cache_path = _make_seed_cache() + torrent_cache_path = _make_torrent_cache() + + # Housekeeping + _reap_finished_seeds(seed_cache_path) + _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed) + + torrent_path = _fetch_torrent_file(torrent_cache_path, image_id, + torrent_url) + + staging_path = utils.make_staging_area(sr_path) + try: + tarball_filename = os.path.basename(torrent_path).replace( + '.torrent', '') + tarball_path = os.path.join(staging_path, tarball_filename) + + # Download tarball into staging area + _download(torrent_path, staging_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff) + + # Extract the tarball into the staging area + _extract_tarball(tarball_path, staging_path) + + # Move the VHDs from the staging area into the storage repository + vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack) + + # Seed image for others in the swarm + _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host) + finally: + utils.cleanup_staging_area(staging_path) + + return vdi_list + + +if __name__ == '__main__': + utils.register_plugin_calls(download_vhd) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/config_file.py b/os_xenapi/dom0/etc/xapi.d/plugins/config_file.py new file mode 100644 index 0000000..b4d002d --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/config_file.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +import XenAPIPlugin + + +def get_val(session, args): + config_key = args['key'] + config_file = open('/etc/xapi.conf') + try: + for line in config_file: + split = line.split('=') + if (len(split) == 2) and (split[0].strip() == config_key): + return split[1].strip() + return "" + finally: + config_file.close() + +if __name__ == '__main__': + XenAPIPlugin.dispatch({"get_val": get_val}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/console.py b/os_xenapi/dom0/etc/xapi.d/plugins/console.py new file mode 100644 index 0000000..446dd07 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/console.py @@ -0,0 +1,89 @@ +#!/usr/bin/python + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +""" +To configure this plugin, you must set the following xenstore key: +/local/logconsole/@ = "/var/log/xen/guest/console.%d" + +This can be done by running: +xenstore-write /local/logconsole/@ "/var/log/xen/guest/console.%d" + +WARNING: +You should ensure appropriate log rotation to ensure +guests are not able to consume too much Dom0 disk space, +and equally should not be able to stop other guests from logging. +Adding and removing the following xenstore key will reopen the log, +as will be required after a log rotate: +/local/logconsole/ +""" + +import base64 +import logging +import zlib + +import XenAPIPlugin + +import dom0_pluginlib +dom0_pluginlib.configure_logging("console") + +CONSOLE_LOG_DIR = '/var/log/xen/guest' +CONSOLE_LOG_FILE_PATTERN = CONSOLE_LOG_DIR + '/console.%d' + +MAX_CONSOLE_BYTES = 102400 +SEEK_SET = 0 +SEEK_END = 2 + + +def _last_bytes(file_like_object): + try: + file_like_object.seek(-MAX_CONSOLE_BYTES, SEEK_END) + except IOError, e: # noqa + if e.errno == 22: + file_like_object.seek(0, SEEK_SET) + else: + raise + return file_like_object.read() + + +def get_console_log(session, arg_dict): + try: + raw_dom_id = arg_dict['dom_id'] + except KeyError: + raise dom0_pluginlib.PluginError("Missing dom_id") + try: + dom_id = int(raw_dom_id) + except ValueError: + raise dom0_pluginlib.PluginError("Invalid dom_id") + + logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb') + try: + try: + log_content = _last_bytes(logfile) + except IOError, e: # noqa + msg = "Error reading console: %s" % e + logging.debug(msg) + raise dom0_pluginlib.PluginError(msg) + finally: + logfile.close() + + return base64.b64encode(zlib.compress(log_content)) + + +if __name__ == "__main__": + XenAPIPlugin.dispatch({"get_console_log": get_console_log}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/dom0_plugin_version.py b/os_xenapi/dom0/etc/xapi.d/plugins/dom0_plugin_version.py new file mode 100644 index 0000000..9a755d2 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/dom0_plugin_version.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +# Copyright (c) 2013 OpenStack Foundation +# Copyright (c) 2013 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +"""Returns the version of the nova plugins""" + +import utils + +# MAJOR VERSION: Incompatible changes +# MINOR VERSION: Compatible changes, new plugins, etc + +# NOTE(sfinucan): 2.0 will be equivalent to the last in the 1.x stream + +# 1.0 - Initial version. +# 1.1 - New call to check GC status +# 1.2 - Added support for pci passthrough devices +# 1.3 - Add vhd2 functions for doing glance operations by url +# 1.4 - Add support of Glance v2 api +# 1.5 - Added function for network configuration on ovs bridge +# 1.6 - Add function for network configuration on Linux bridge +# 1.7 - Add Partition utilities plugin +# 1.8 - Add support for calling plug-ins with the .py suffix +# 2.0 - Remove plugin files which don't have .py suffix +PLUGIN_VERSION = "2.0" + + +def get_version(session): + return PLUGIN_VERSION + +if __name__ == '__main__': + utils.register_plugin_calls(get_version) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/dom0_pluginlib.py b/os_xenapi/dom0/etc/xapi.d/plugins/dom0_pluginlib.py new file mode 100644 index 0000000..1e6abee --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/dom0_pluginlib.py @@ -0,0 +1,139 @@ +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# +# Helper functions for the Nova xapi plugins. In time, this will merge +# with the pluginlib.py shipped with xapi, but for now, that file is not +# very stable, so it's easiest just to have a copy of all the functions +# that we need. +# + +import logging +import logging.handlers +import time + +import XenAPI + + +# Logging setup + +def configure_logging(name): + log = logging.getLogger() + log.setLevel(logging.DEBUG) + sysh = logging.handlers.SysLogHandler('/dev/log') + sysh.setLevel(logging.DEBUG) + formatter = logging.Formatter('%s: %%(levelname)-8s %%(message)s' % name) + sysh.setFormatter(formatter) + log.addHandler(sysh) + + +# Exceptions + +class PluginError(Exception): + """Base Exception class for all plugin errors.""" + def __init__(self, *args): + Exception.__init__(self, *args) + + +class ArgumentError(PluginError): + # Raised when required arguments are missing, argument values are invalid, + # or incompatible arguments are given. + def __init__(self, *args): + PluginError.__init__(self, *args) + + +# Argument validation + +def exists(args, key): + # Validates that a freeform string argument to a RPC method call is given. + # Returns the string. + if key in args: + return args[key] + else: + raise ArgumentError('Argument %s is required.' % key) + + +def optional(args, key): + # If the given key is in args, return the corresponding value, otherwise + # return None + return key in args and args[key] or None + + +def _get_domain_0(session): + this_host_ref = session.xenapi.session.get_this_host(session.handle) + expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' + expr = expr % this_host_ref + return list(session.xenapi.VM.get_all_records_where(expr).keys())[0] + + +def with_vdi_in_dom0(session, vdi, read_only, f): + dom0 = _get_domain_0(session) + vbd_rec = {} + vbd_rec['VM'] = dom0 + vbd_rec['VDI'] = vdi + vbd_rec['userdevice'] = 'autodetect' + vbd_rec['bootable'] = False + vbd_rec['mode'] = read_only and 'RO' or 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + logging.debug('Creating VBD for VDI %s ... ', vdi) + vbd = session.xenapi.VBD.create(vbd_rec) + logging.debug('Creating VBD for VDI %s done.', vdi) + try: + logging.debug('Plugging VBD %s ... ', vbd) + session.xenapi.VBD.plug(vbd) + logging.debug('Plugging VBD %s done.', vbd) + return f(session.xenapi.VBD.get_device(vbd)) + finally: + logging.debug('Destroying VBD for VDI %s ... ', vdi) + _vbd_unplug_with_retry(session, vbd) + try: + session.xenapi.VBD.destroy(vbd) + except XenAPI.Failure, e: # noqa + logging.error('Ignoring XenAPI.Failure %s', e) + logging.debug('Destroying VBD for VDI %s done.', vdi) + + +def _vbd_unplug_with_retry(session, vbd): + """Call VBD.unplug on the given VBD + + with a retry if we get DEVICE_DETACH_REJECTED. For reasons which I don't + understand, we're seeing the device still in use, even when all processes + using the device should be dead. + """ + while True: + try: + session.xenapi.VBD.unplug(vbd) + logging.debug('VBD.unplug successful first time.') + return + except XenAPI.Failure, e: # noqa + if (len(e.details) > 0 and + e.details[0] == 'DEVICE_DETACH_REJECTED'): + logging.debug('VBD.unplug rejected: retrying...') + time.sleep(1) + elif (len(e.details) > 0 and + e.details[0] == 'DEVICE_ALREADY_DETACHED'): + logging.debug('VBD.unplug successful eventually.') + return + else: + logging.error('Ignoring XenAPI.Failure in VBD.unplug: %s', e) + return diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/glance.py b/os_xenapi/dom0/etc/xapi.d/plugins/glance.py new file mode 100644 index 0000000..bf472cf --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/glance.py @@ -0,0 +1,626 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +"""Handle the uploading and downloading of images via Glance.""" + +try: + import httplib +except ImportError: + from six.moves import http_client as httplib + +try: + import json +except ImportError: + import simplejson as json + +import md5 # noqa +import socket +import urllib2 +from urlparse import urlparse + +import dom0_pluginlib +import utils + + +dom0_pluginlib.configure_logging('glance') +logging = dom0_pluginlib.logging +PluginError = dom0_pluginlib.PluginError + +SOCKET_TIMEOUT_SECONDS = 90 + + +class RetryableError(Exception): + pass + + +def _create_connection(scheme, netloc): + if scheme == 'https': + conn = httplib.HTTPSConnection(netloc) + else: + conn = httplib.HTTPConnection(netloc) + conn.connect() + return conn + + +def _download_tarball_and_verify(request, staging_path): + # NOTE(johngarbutt) By default, there is no timeout. + # To ensure the script does not hang if we lose connection + # to glance, we add this socket timeout. + # This is here so there is no chance the timeout out has + # been adjusted by other library calls. + socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) + + try: + response = urllib2.urlopen(request) + except urllib2.HTTPError, error: # noqa + raise RetryableError(error) + except urllib2.URLError, error: # noqa + raise RetryableError(error) + except httplib.HTTPException, error: # noqa + # httplib.HTTPException and derivatives (BadStatusLine in particular) + # don't have a useful __repr__ or __str__ + raise RetryableError('%s: %s' % (error.__class__.__name__, error)) + + url = request.get_full_url() + logging.info("Reading image data from %s" % url) + + callback_data = {'bytes_read': 0} + checksum = md5.new() + + def update_md5(chunk): + callback_data['bytes_read'] += len(chunk) + checksum.update(chunk) + + try: + try: + utils.extract_tarball(response, staging_path, callback=update_md5) + except Exception, error: # noqa + raise RetryableError(error) + finally: + bytes_read = callback_data['bytes_read'] + logging.info("Read %d bytes from %s", bytes_read, url) + + # Use ETag if available, otherwise content-md5(v2) or + # X-Image-Meta-Checksum(v1) + etag = response.info().getheader('etag', None) + if etag is None: + etag = response.info().getheader('content-md5', None) + if etag is None: + etag = response.info().getheader('x-image-meta-checksum', None) + + # Verify checksum using ETag + checksum = checksum.hexdigest() + + if etag is None: + msg = "No ETag found for comparison to checksum %(checksum)s" + logging.info(msg % {'checksum': checksum}) + elif checksum != etag: + msg = 'ETag %(etag)s does not match computed md5sum %(checksum)s' + raise RetryableError(msg % {'checksum': checksum, 'etag': etag}) + else: + msg = "Verified image checksum %(checksum)s" + logging.info(msg % {'checksum': checksum}) + + +def _download_tarball_v1(sr_path, staging_path, image_id, glance_host, + glance_port, glance_use_ssl, extra_headers): + # Download the tarball image from Glance v1 and extract it into the + # staging area. Retry if there is any failure. + if glance_use_ssl: + scheme = 'https' + else: + scheme = 'http' + + endpoint = "%(scheme)s://%(glance_host)s:%(glance_port)d" % { + 'scheme': scheme, 'glance_host': glance_host, + 'glance_port': glance_port} + _download_tarball_by_url_v1(sr_path, staging_path, image_id, + endpoint, extra_headers) + + +def _download_tarball_by_url_v1( + sr_path, staging_path, image_id, glance_endpoint, extra_headers): + # Download the tarball image from Glance v1 and extract it into the + # staging area. Retry if there is any failure. + url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { + 'glance_endpoint': glance_endpoint, + 'image_id': image_id} + logging.info("Downloading %s with glance v1 api" % url) + + request = urllib2.Request(url, headers=extra_headers) + try: + _download_tarball_and_verify(request, staging_path) + except Exception: + logging.exception('Failed to retrieve %(url)s' % {'url': url}) + raise + + +def _download_tarball_by_url_v2( + sr_path, staging_path, image_id, glance_endpoint, extra_headers): + # Download the tarball image from Glance v2 and extract it into the + # staging area. Retry if there is any failure. + url = "%(glance_endpoint)s/v2/images/%(image_id)s/file" % { + 'glance_endpoint': glance_endpoint, + 'image_id': image_id} + logging.debug("Downloading %s with glance v2 api" % url) + + request = urllib2.Request(url, headers=extra_headers) + try: + _download_tarball_and_verify(request, staging_path) + except Exception: + logging.exception('Failed to retrieve %(url)s' % {'url': url}) + raise + + +def _upload_tarball_v1(staging_path, image_id, glance_host, glance_port, + glance_use_ssl, extra_headers, properties): + if glance_use_ssl: + scheme = 'https' + else: + scheme = 'http' + + url = '%s://%s:%s' % (scheme, glance_host, glance_port) + _upload_tarball_by_url_v1(staging_path, image_id, url, + extra_headers, properties) + + +def _upload_tarball_by_url_v1(staging_path, image_id, glance_endpoint, + extra_headers, properties): + """Create a tarball of the image and then stream that into Glance v1 + + Using chunked-transfer-encoded HTTP. + """ + # NOTE(johngarbutt) By default, there is no timeout. + # To ensure the script does not hang if we lose connection + # to glance, we add this socket timeout. + # This is here so there is no chance the timeout out has + # been adjusted by other library calls. + socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) + logging.debug("Uploading image %s with glance v1 api" + % image_id) + + url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { + 'glance_endpoint': glance_endpoint, + 'image_id': image_id} + logging.info("Writing image data to %s" % url) + + # NOTE(sdague): this is python 2.4, which means urlparse returns a + # tuple, not a named tuple. + # 0 - scheme + # 1 - host:port (aka netloc) + # 2 - path + parts = urlparse(url) + + try: + conn = _create_connection(parts[0], parts[1]) + except Exception, error: # noqa + logging.exception('Failed to connect %(url)s' % {'url': url}) + raise RetryableError(error) + + try: + validate_image_status_before_upload_v1(conn, url, extra_headers) + + try: + # NOTE(sirp): httplib under python2.4 won't accept + # a file-like object to request + conn.putrequest('PUT', parts[2]) + + # NOTE(sirp): There is some confusion around OVF. Here's a summary + # of where we currently stand: + # 1. OVF as a container format is misnamed. We really should be + # using OVA since that is the name for the container format; + # OVF is the standard applied to the manifest file contained + # within. + # 2. We're currently uploading a vanilla tarball. In order to be + # OVF/OVA compliant, we'll need to embed a minimal OVF + # manifest as the first file. + + # NOTE(dprince): In order to preserve existing Glance properties + # we set X-Glance-Registry-Purge-Props on this request. + headers = { + 'content-type': 'application/octet-stream', + 'transfer-encoding': 'chunked', + 'x-image-meta-is-public': 'False', + 'x-image-meta-status': 'queued', + 'x-image-meta-disk-format': 'vhd', + 'x-image-meta-container-format': 'ovf', + 'x-glance-registry-purge-props': 'False'} + + headers.update(**extra_headers) + + for key, value in properties.items(): + header_key = "x-image-meta-property-%s" % key.replace('_', '-') + headers[header_key] = str(value) + + for header, value in headers.items(): + conn.putheader(header, value) + conn.endheaders() + except Exception, error: # noqa + logging.exception('Failed to upload %(url)s' % {'url': url}) + raise RetryableError(error) + + callback_data = {'bytes_written': 0} + + def send_chunked_transfer_encoded(chunk): + chunk_len = len(chunk) + callback_data['bytes_written'] += chunk_len + try: + conn.send("%x\r\n%s\r\n" % (chunk_len, chunk)) + except Exception, error: # noqa + logging.exception('Failed to upload when sending chunks') + raise RetryableError(error) + + compression_level = properties.get('xenapi_image_compression_level') + + utils.create_tarball( + None, staging_path, callback=send_chunked_transfer_encoded, + compression_level=compression_level) + + send_chunked_transfer_encoded('') # Chunked-Transfer terminator + + bytes_written = callback_data['bytes_written'] + logging.info("Wrote %d bytes to %s" % (bytes_written, url)) + + resp = conn.getresponse() + if resp.status == httplib.OK: + return + + logging.error("Unexpected response while writing image data to %s: " + "Response Status: %i, Response body: %s" + % (url, resp.status, resp.read())) + + check_resp_status_and_retry(resp, image_id, url) + + finally: + conn.close() + + +def _update_image_meta_v2(conn, image_id, extra_headers, properties): + # NOTE(sirp): There is some confusion around OVF. Here's a summary + # of where we currently stand: + # 1. OVF as a container format is misnamed. We really should be + # using OVA since that is the name for the container format; + # OVF is the standard applied to the manifest file contained + # within. + # 2. We're currently uploading a vanilla tarball. In order to be + # OVF/OVA compliant, we'll need to embed a minimal OVF + # manifest as the first file. + body = [ + {"path": "/container_format", "value": "ovf", "op": "add"}, + {"path": "/disk_format", "value": "vhd", "op": "add"}, + {"path": "/visibility", "value": "private", "op": "add"}] + + headers = {'Content-Type': 'application/openstack-images-v2.1-json-patch'} + headers.update(**extra_headers) + + for key, value in properties.items(): + prop = {"path": "/%s" % key.replace('_', '-'), + "value": key, + "op": "add"} + body.append(prop) + body = json.dumps(body) + conn.request('PATCH', '/v2/images/%s' % image_id, + body=body, headers=headers) + resp = conn.getresponse() + resp.read() + + if resp.status == httplib.OK: + return + logging.error("Image meta was not updated. Status: %s, Reason: %s" % ( + resp.status, resp.reason)) + + +def _upload_tarball_by_url_v2(staging_path, image_id, glance_endpoint, + extra_headers, properties): + """Create a tarball of the image and then stream that into Glance v2 + + Using chunked-transfer-encoded HTTP. + """ + # NOTE(johngarbutt) By default, there is no timeout. + # To ensure the script does not hang if we lose connection + # to glance, we add this socket timeout. + # This is here so there is no chance the timeout out has + # been adjusted by other library calls. + socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS) + logging.debug("Uploading imaged %s with glance v2 api" + % image_id) + + url = "%(glance_endpoint)s/v2/images/%(image_id)s/file" % { + 'glance_endpoint': glance_endpoint, + 'image_id': image_id} + + # NOTE(sdague): this is python 2.4, which means urlparse returns a + # tuple, not a named tuple. + # 0 - scheme + # 1 - host:port (aka netloc) + # 2 - path + parts = urlparse(url) + + try: + conn = _create_connection(parts[0], parts[1]) + except Exception, error: # noqa + raise RetryableError(error) + + try: + _update_image_meta_v2(conn, image_id, extra_headers, properties) + + validate_image_status_before_upload_v2(conn, url, extra_headers) + + try: + conn.connect() + # NOTE(sirp): httplib under python2.4 won't accept + # a file-like object to request + conn.putrequest('PUT', parts[2]) + + headers = { + 'content-type': 'application/octet-stream', + 'transfer-encoding': 'chunked'} + + headers.update(**extra_headers) + + for header, value in headers.items(): + conn.putheader(header, value) + conn.endheaders() + except Exception, error: # noqa + logging.exception('Failed to upload %(url)s' % {'url': url}) + raise RetryableError(error) + + callback_data = {'bytes_written': 0} + + def send_chunked_transfer_encoded(chunk): + chunk_len = len(chunk) + callback_data['bytes_written'] += chunk_len + try: + conn.send("%x\r\n%s\r\n" % (chunk_len, chunk)) + except Exception, error: # noqa + logging.exception('Failed to upload when sending chunks') + raise RetryableError(error) + + compression_level = properties.get('xenapi_image_compression_level') + + utils.create_tarball( + None, staging_path, callback=send_chunked_transfer_encoded, + compression_level=compression_level) + + send_chunked_transfer_encoded('') # Chunked-Transfer terminator + + bytes_written = callback_data['bytes_written'] + logging.info("Wrote %d bytes to %s" % (bytes_written, url)) + + resp = conn.getresponse() + if resp.status == httplib.NO_CONTENT: + return + + logging.error("Unexpected response while writing image data to %s: " + "Response Status: %i, Response body: %s" + % (url, resp.status, resp.read())) + + check_resp_status_and_retry(resp, image_id, url) + + finally: + conn.close() + + +def check_resp_status_and_retry(resp, image_id, url): + # Note(Jesse): This branch sorts errors into those that are permanent, + # those that are ephemeral, and those that are unexpected. + if resp.status in (httplib.BAD_REQUEST, # 400 + httplib.UNAUTHORIZED, # 401 + httplib.PAYMENT_REQUIRED, # 402 + httplib.FORBIDDEN, # 403 + httplib.NOT_FOUND, # 404 + httplib.METHOD_NOT_ALLOWED, # 405 + httplib.NOT_ACCEPTABLE, # 406 + httplib.PROXY_AUTHENTICATION_REQUIRED, # 407 + httplib.CONFLICT, # 409 + httplib.GONE, # 410 + httplib.LENGTH_REQUIRED, # 411 + httplib.PRECONDITION_FAILED, # 412 + httplib.REQUEST_ENTITY_TOO_LARGE, # 413 + httplib.REQUEST_URI_TOO_LONG, # 414 + httplib.UNSUPPORTED_MEDIA_TYPE, # 415 + httplib.REQUESTED_RANGE_NOT_SATISFIABLE, # 416 + httplib.EXPECTATION_FAILED, # 417 + httplib.UNPROCESSABLE_ENTITY, # 422 + httplib.LOCKED, # 423 + httplib.FAILED_DEPENDENCY, # 424 + httplib.UPGRADE_REQUIRED, # 426 + httplib.NOT_IMPLEMENTED, # 501 + httplib.HTTP_VERSION_NOT_SUPPORTED, # 505 + httplib.NOT_EXTENDED, # 510 + ): + raise PluginError("Got Permanent Error response [%i] while " + "uploading image [%s] to glance [%s]" + % (resp.status, image_id, url)) + # NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We + # optimistically retry on 500 errors below. + elif resp.status in (httplib.REQUEST_TIMEOUT, # 408 + httplib.INTERNAL_SERVER_ERROR, # 500 + httplib.BAD_GATEWAY, # 502 + httplib.SERVICE_UNAVAILABLE, # 503 + httplib.GATEWAY_TIMEOUT, # 504 + httplib.INSUFFICIENT_STORAGE, # 507 + ): + raise RetryableError("Got Ephemeral Error response [%i] while " + "uploading image [%s] to glance [%s]" + % (resp.status, image_id, url)) + else: + # Note(Jesse): Assume unexpected errors are retryable. If you are + # seeing this error message, the error should probably be added + # to either the ephemeral or permanent error list. + raise RetryableError("Got Unexpected Error response [%i] while " + "uploading image [%s] to glance [%s]" + % (resp.status, image_id, url)) + + +def validate_image_status_before_upload_v1(conn, url, extra_headers): + try: + parts = urlparse(url) + path = parts[2] + image_id = path.split('/')[-1] + # NOTE(nikhil): Attempt to determine if the Image has a status + # of 'queued'. Because data will continued to be sent to Glance + # until it has a chance to check the Image state, discover that + # it is not 'active' and send back a 409. Hence, the data will be + # unnecessarily buffered by Glance. This wastes time and bandwidth. + # LP bug #1202785 + + conn.request('HEAD', path, headers=extra_headers) + head_resp = conn.getresponse() + # NOTE(nikhil): read the response to re-use the conn object. + body_data = head_resp.read(8192) + if len(body_data) > 8: + err_msg = ('Cannot upload data for image %(image_id)s as the ' + 'HEAD call had more than 8192 bytes of data in ' + 'the response body.' % {'image_id': image_id}) + raise PluginError("Got Permanent Error while uploading image " + "[%s] to glance [%s]. " + "Message: %s" % (image_id, url, + err_msg)) + else: + head_resp.read() + + except Exception, error: # noqa + logging.exception('Failed to HEAD the image %(image_id)s while ' + 'checking image status before attempting to ' + 'upload %(url)s' % {'image_id': image_id, + 'url': url}) + raise RetryableError(error) + + if head_resp.status != httplib.OK: + logging.error("Unexpected response while doing a HEAD call " + "to image %s , url = %s , Response Status: " + "%i" % (image_id, url, head_resp.status)) + + check_resp_status_and_retry(head_resp, image_id, url) + + else: + image_status = head_resp.getheader('x-image-meta-status') + if image_status not in ('queued', ): + err_msg = ('Cannot upload data for image %(image_id)s as the ' + 'image status is %(image_status)s' % + {'image_id': image_id, 'image_status': image_status}) + logging.exception(err_msg) + raise PluginError("Got Permanent Error while uploading image " + "[%s] to glance [%s]. " + "Message: %s" % (image_id, url, + err_msg)) + else: + logging.info('Found image %(image_id)s in status ' + '%(image_status)s. Attempting to ' + 'upload.' % {'image_id': image_id, + 'image_status': image_status}) + + +def validate_image_status_before_upload_v2(conn, url, extra_headers): + try: + parts = urlparse(url) + path = parts[2] + image_id = path.split('/')[-2] + # NOTE(nikhil): Attempt to determine if the Image has a status + # of 'queued'. Because data will continued to be sent to Glance + # until it has a chance to check the Image state, discover that + # it is not 'active' and send back a 409. Hence, the data will be + # unnecessarily buffered by Glance. This wastes time and bandwidth. + # LP bug #1202785 + + conn.request('GET', '/v2/images/%s' % image_id, headers=extra_headers) + get_resp = conn.getresponse() + except Exception, error: # noqa + logging.exception('Failed to GET the image %(image_id)s while ' + 'checking image status before attempting to ' + 'upload %(url)s' % {'image_id': image_id, + 'url': url}) + raise RetryableError(error) + + if get_resp.status != httplib.OK: + logging.error("Unexpected response while doing a GET call " + "to image %s , url = %s , Response Status: " + "%i" % (image_id, url, get_resp.status)) + + check_resp_status_and_retry(get_resp, image_id, url) + + else: + body = json.loads(get_resp.read()) + image_status = body['status'] + if image_status not in ('queued', ): + err_msg = ('Cannot upload data for image %(image_id)s as the ' + 'image status is %(image_status)s' % + {'image_id': image_id, 'image_status': image_status}) + logging.exception(err_msg) + raise PluginError("Got Permanent Error while uploading image " + "[%s] to glance [%s]. " + "Message: %s" % (image_id, url, + err_msg)) + else: + logging.info('Found image %(image_id)s in status ' + '%(image_status)s. Attempting to ' + 'upload.' % {'image_id': image_id, + 'image_status': image_status}) + get_resp.read() + + +def download_vhd2(session, image_id, endpoint, + uuid_stack, sr_path, extra_headers, api_version=1): + # Download an image from Glance v2, unbundle it, and then deposit the + # VHDs into the storage repository. + staging_path = utils.make_staging_area(sr_path) + try: + # Download tarball into staging area and extract it + # TODO(mfedosin): remove this check when v1 is deprecated. + if api_version == 1: + _download_tarball_by_url_v1( + sr_path, staging_path, image_id, + endpoint, extra_headers) + else: + _download_tarball_by_url_v2( + sr_path, staging_path, image_id, + endpoint, extra_headers) + + # Move the VHDs from the staging area into the storage repository + return utils.import_vhds(sr_path, staging_path, uuid_stack) + finally: + utils.cleanup_staging_area(staging_path) + + +def upload_vhd2(session, vdi_uuids, image_id, endpoint, sr_path, + extra_headers, properties, api_version=1): + """Bundle the VHDs comprising an image and then stream them into Glance""" + staging_path = utils.make_staging_area(sr_path) + try: + utils.prepare_staging_area(sr_path, staging_path, vdi_uuids) + # TODO(mfedosin): remove this check when v1 is deprecated. + if api_version == 1: + _upload_tarball_by_url_v1(staging_path, image_id, + endpoint, extra_headers, properties) + else: + _upload_tarball_by_url_v2(staging_path, image_id, + endpoint, extra_headers, properties) + finally: + utils.cleanup_staging_area(staging_path) + + +if __name__ == '__main__': + utils.register_plugin_calls(download_vhd2, upload_vhd2) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/ipxe.py b/os_xenapi/dom0/etc/xapi.d/plugins/ipxe.py new file mode 100644 index 0000000..ee913b1 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/ipxe.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python + +# Copyright (c) 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +"""Inject network configuration into iPXE ISO for boot.""" + +import logging +import os +import shutil + +import utils + +# FIXME(sirp): should this use pluginlib from 5.6? +import dom0_pluginlib +dom0_pluginlib.configure_logging('ipxe') + + +ISOLINUX_CFG = """SAY iPXE ISO boot image +TIMEOUT 30 +DEFAULT ipxe.krn +LABEL ipxe.krn + KERNEL ipxe.krn + INITRD netcfg.ipxe +""" + +NETCFG_IPXE = """#!ipxe +:start +imgfree +ifclose net0 +set net0/ip %(ip_address)s +set net0/netmask %(netmask)s +set net0/gateway %(gateway)s +set dns %(dns)s +ifopen net0 +goto menu + +:menu +chain %(boot_menu_url)s +goto boot + +:boot +sanboot --no-describe --drive 0x80 +""" + + +def _write_file(filename, data): + # If the ISO was tampered with such that the destination is a symlink, + # that could allow a malicious user to write to protected areas of the + # dom0 filesystem. /HT to comstud for pointing this out. + # + # Short-term, checking that the destination is not a symlink should be + # sufficient. + # + # Long-term, we probably want to perform all file manipulations within a + # chroot jail to be extra safe. + if os.path.islink(filename): + raise RuntimeError('SECURITY: Cannot write to symlinked destination') + + logging.debug("Writing to file '%s'" % filename) + f = open(filename, 'w') + try: + f.write(data) + finally: + f.close() + + +def _unbundle_iso(sr_path, filename, path): + logging.debug("Unbundling ISO '%s'" % filename) + read_only_path = utils.make_staging_area(sr_path) + try: + utils.run_command(['mount', '-o', 'loop', filename, read_only_path]) + try: + shutil.copytree(read_only_path, path) + finally: + utils.run_command(['umount', read_only_path]) + finally: + utils.cleanup_staging_area(read_only_path) + + +def _create_iso(mkisofs_cmd, filename, path): + logging.debug("Creating ISO '%s'..." % filename) + orig_dir = os.getcwd() + os.chdir(path) + try: + utils.run_command([mkisofs_cmd, '-quiet', '-l', '-o', filename, + '-c', 'boot.cat', '-b', 'isolinux.bin', + '-no-emul-boot', '-boot-load-size', '4', + '-boot-info-table', '.']) + finally: + os.chdir(orig_dir) + + +def inject(session, sr_path, vdi_uuid, boot_menu_url, ip_address, netmask, + gateway, dns, mkisofs_cmd): + + iso_filename = '%s.img' % os.path.join(sr_path, 'iso', vdi_uuid) + + # Create staging area so we have a unique path but remove it since + # shutil.copytree will recreate it + staging_path = utils.make_staging_area(sr_path) + utils.cleanup_staging_area(staging_path) + + try: + _unbundle_iso(sr_path, iso_filename, staging_path) + + # Write Configs + _write_file(os.path.join(staging_path, 'netcfg.ipxe'), + NETCFG_IPXE % {"ip_address": ip_address, + "netmask": netmask, + "gateway": gateway, + "dns": dns, + "boot_menu_url": boot_menu_url}) + + _write_file(os.path.join(staging_path, 'isolinux.cfg'), + ISOLINUX_CFG) + + _create_iso(mkisofs_cmd, iso_filename, staging_path) + finally: + utils.cleanup_staging_area(staging_path) + + +if __name__ == "__main__": + utils.register_plugin_calls(inject) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/kernel.py b/os_xenapi/dom0/etc/xapi.d/plugins/kernel.py new file mode 100644 index 0000000..925abf2 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/kernel.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +"""Handle the manipulation of kernel images.""" + +import errno +import os +import shutil + +import XenAPIPlugin + +import dom0_pluginlib + + +dom0_pluginlib.configure_logging('kernel') +logging = dom0_pluginlib.logging +exists = dom0_pluginlib.exists +optional = dom0_pluginlib.optional +with_vdi_in_dom0 = dom0_pluginlib.with_vdi_in_dom0 + + +KERNEL_DIR = '/boot/guest' + + +def _copy_vdi(dest, copy_args): + vdi_uuid = copy_args['vdi_uuid'] + vdi_size = copy_args['vdi_size'] + cached_image = copy_args['cached-image'] + + logging.debug("copying kernel/ramdisk file from %s to /boot/guest/%s", + dest, vdi_uuid) + filename = KERNEL_DIR + '/' + vdi_uuid + + # Make sure KERNEL_DIR exists, otherwise create it + if not os.path.isdir(KERNEL_DIR): + logging.debug("Creating directory %s", KERNEL_DIR) + os.makedirs(KERNEL_DIR) + + # Read data from /dev/ and write into a file on /boot/guest + of = open(filename, 'wb') + f = open(dest, 'rb') + + # Copy only vdi_size bytes + data = f.read(vdi_size) + of.write(data) + + if cached_image: + # Create a cache file. If caching is enabled, kernel images do not have + # to be fetched from glance. + cached_image = KERNEL_DIR + '/' + cached_image + logging.debug("copying kernel/ramdisk file from %s to /boot/guest/%s", + dest, cached_image) + cache_file = open(cached_image, 'wb') + cache_file.write(data) + cache_file.close() + logging.debug("Done. Filename: %s", cached_image) + + f.close() + of.close() + logging.debug("Done. Filename: %s", filename) + return filename + + +def copy_vdi(session, args): + vdi = exists(args, 'vdi-ref') + size = exists(args, 'image-size') + cached_image = optional(args, 'cached-image') + + # Use the uuid as a filename + vdi_uuid = session.xenapi.VDI.get_uuid(vdi) + copy_args = {'vdi_uuid': vdi_uuid, + 'vdi_size': int(size), + 'cached-image': cached_image} + + filename = with_vdi_in_dom0(session, vdi, False, + lambda dev: + _copy_vdi('/dev/%s' % dev, copy_args)) + return filename + + +def create_kernel_ramdisk(session, args): + # Creates a copy of the kernel/ramdisk image if it is present in the + # cache. If the image is not present in the cache, it does nothing. + cached_image = exists(args, 'cached-image') + image_uuid = exists(args, 'new-image-uuid') + cached_image_filename = KERNEL_DIR + '/' + cached_image + filename = KERNEL_DIR + '/' + image_uuid + + if os.path.isfile(cached_image_filename): + shutil.copyfile(cached_image_filename, filename) + logging.debug("Done. Filename: %s", filename) + else: + filename = "" + logging.debug("Cached kernel/ramdisk image not found") + return filename + + +def _remove_file(filepath): + try: + os.remove(filepath) + except OSError, exc: # noqa + if exc.errno != errno.ENOENT: + raise + + +def remove_kernel_ramdisk(session, args): + """Removes kernel and/or ramdisk from dom0's file system.""" + kernel_file = optional(args, 'kernel-file') + ramdisk_file = optional(args, 'ramdisk-file') + if kernel_file: + _remove_file(kernel_file) + if ramdisk_file: + _remove_file(ramdisk_file) + return "ok" + + +if __name__ == '__main__': + XenAPIPlugin.dispatch({'copy_vdi': copy_vdi, + 'create_kernel_ramdisk': create_kernel_ramdisk, + 'remove_kernel_ramdisk': remove_kernel_ramdisk}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/migration.py b/os_xenapi/dom0/etc/xapi.d/plugins/migration.py new file mode 100644 index 0000000..5ede92b --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/migration.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python + +# Copyright 2010 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +""" +XenAPI Plugin for transferring data between host nodes +""" +import utils + +import dom0_pluginlib + + +dom0_pluginlib.configure_logging('migration') +logging = dom0_pluginlib.logging + + +def move_vhds_into_sr(session, instance_uuid, sr_path, uuid_stack): + """Moves the VHDs from their copied location to the SR.""" + staging_path = "/images/instance%s" % instance_uuid + imported_vhds = utils.import_vhds(sr_path, staging_path, uuid_stack) + utils.cleanup_staging_area(staging_path) + return imported_vhds + + +def _rsync_vhds(instance_uuid, host, staging_path, user="root"): + if not staging_path.endswith('/'): + staging_path += '/' + + dest_path = '/images/instance%s/' % (instance_uuid) + + ip_cmd = ["/sbin/ip", "addr", "show"] + output = utils.run_command(ip_cmd) + if ' %s/' % host in output: + # If copying to localhost, don't use SSH + rsync_cmd = ["/usr/bin/rsync", "-av", "--progress", + staging_path, dest_path] + else: + ssh_cmd = 'ssh -o StrictHostKeyChecking=no' + rsync_cmd = ["/usr/bin/rsync", "-av", "--progress", "-e", ssh_cmd, + staging_path, '%s@%s:%s' % (user, host, dest_path)] + + # NOTE(hillad): rsync's progress is carriage returned, requiring + # universal_newlines for real-time output. + + rsync_proc = utils.make_subprocess(rsync_cmd, stdout=True, stderr=True, + universal_newlines=True) + while True: + rsync_progress = rsync_proc.stdout.readline() + if not rsync_progress: + break + logging.debug("[%s] %s" % (instance_uuid, rsync_progress)) + + utils.finish_subprocess(rsync_proc, rsync_cmd) + + +def transfer_vhd(session, instance_uuid, host, vdi_uuid, sr_path, seq_num): + """Rsyncs a VHD to an adjacent host.""" + staging_path = utils.make_staging_area(sr_path) + try: + utils.prepare_staging_area(sr_path, staging_path, [vdi_uuid], + seq_num=seq_num) + _rsync_vhds(instance_uuid, host, staging_path) + finally: + utils.cleanup_staging_area(staging_path) + + +if __name__ == '__main__': + utils.register_plugin_calls(move_vhds_into_sr, transfer_vhd) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/netwrap.py b/os_xenapi/dom0/etc/xapi.d/plugins/netwrap.py new file mode 100644 index 0000000..616637e --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/netwrap.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +# Copyright 2012 OpenStack Foundation +# Copyright 2012 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# +# XenAPI plugin for executing network commands (ovs, iptables, etc) on dom0 +# + +import gettext +gettext.install('neutron', unicode=1) +try: + import json +except ImportError: + import simplejson as json +import subprocess + +import XenAPIPlugin + + +ALLOWED_CMDS = [ + 'ip', + 'ipset', + 'iptables-save', + 'iptables-restore', + 'ip6tables-save', + 'ip6tables-restore', + 'sysctl', + # NOTE(yamamoto): of_interface=native doesn't use ovs-ofctl + 'ovs-ofctl', + 'ovs-vsctl', + 'ovsdb-client', + 'conntrack', + ] + + +class PluginError(Exception): + """Base Exception class for all plugin errors.""" + def __init__(self, *args): + Exception.__init__(self, *args) + + +def _run_command(cmd, cmd_input): + """Abstracts out the basics of issuing system commands. + + If the command returns anything in stderr, a PluginError is raised with + that information. Otherwise, the output from stdout is returned + """ + pipe = subprocess.PIPE + proc = subprocess.Popen(cmd, shell=False, stdin=pipe, stdout=pipe, + stderr=pipe, close_fds=True) + (out, err) = proc.communicate(cmd_input) + return proc.returncode, out, err + + +def run_command(session, args): + cmd = json.loads(args.get('cmd')) + if cmd and cmd[0] not in ALLOWED_CMDS: + msg = _("Dom0 execution of '%s' is not permitted") % cmd[0] + raise PluginError(msg) + returncode, out, err = _run_command( + cmd, json.loads(args.get('cmd_input', 'null'))) + if not err: + err = "" + if not out: + out = "" + # This runs in Dom0, will return to neutron-ovs-agent in compute node + result = {'returncode': returncode, + 'out': out, + 'err': err} + return json.dumps(result) + + +if __name__ == "__main__": + XenAPIPlugin.dispatch({"run_command": run_command}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/partition_utils.py b/os_xenapi/dom0/etc/xapi.d/plugins/partition_utils.py new file mode 100644 index 0000000..9a1434b --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/partition_utils.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# Copyright (c) 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +import logging +import os +import time + +import dom0_pluginlib as pluginlib +import utils + +pluginlib.configure_logging("disk_utils") + + +def wait_for_dev(session, dev_path, max_seconds): + for i in range(0, max_seconds): + if os.path.exists(dev_path): + return dev_path + time.sleep(1) + + return "" + + +def make_partition(session, dev, partition_start, partition_end): + dev_path = utils.make_dev_path(dev) + + if partition_end != "-": + raise pluginlib.PluginError("Can only create unbounded partitions") + + utils.run_command(['sfdisk', '-uS', dev_path], + '%s,;\n' % (partition_start)) + + +def _mkfs(fs, path, label): + """Format a file or block device + + :param fs: Filesystem type (only 'swap', 'ext3' supported) + :param path: Path to file or block device to format + :param label: Volume label to use + """ + if fs == 'swap': + args = ['mkswap'] + elif fs == 'ext3': + args = ['mkfs', '-t', fs] + # add -F to force no interactive execute on non-block device. + args.extend(['-F']) + if label: + args.extend(['-L', label]) + else: + raise pluginlib.PluginError("Partition type %s not supported" % fs) + args.append(path) + utils.run_command(args) + + +def mkfs(session, dev, partnum, fs_type, fs_label): + dev_path = utils.make_dev_path(dev) + + out = utils.run_command(['kpartx', '-avspp', dev_path]) + try: + logging.info('kpartx output: %s' % out) + mapperdir = os.path.join('/dev', 'mapper') + dev_base = os.path.basename(dev) + partition_path = os.path.join(mapperdir, "%sp%s" % (dev_base, partnum)) + _mkfs(fs_type, partition_path, fs_label) + finally: + # Always remove partitions otherwise we can't unplug the VBD + utils.run_command(['kpartx', '-dvspp', dev_path]) + +if __name__ == "__main__": + utils.register_plugin_calls(wait_for_dev, + make_partition, + mkfs) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/utils.py b/os_xenapi/dom0/etc/xapi.d/plugins/utils.py new file mode 100644 index 0000000..fe52ede --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/utils.py @@ -0,0 +1,518 @@ +# Copyright (c) 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +"""Various utilities used by XenServer plugins.""" + +try: + import cPickle as pickle +except ImportError: + import pickle + +import errno +import logging +import os +import shutil +import signal +import subprocess +import tempfile + +import XenAPIPlugin + +LOG = logging.getLogger(__name__) +CHUNK_SIZE = 8192 + + +class CommandNotFound(Exception): + pass + + +def delete_if_exists(path): + try: + os.unlink(path) + except OSError, e: # noqa + if e.errno == errno.ENOENT: + LOG.warning("'%s' was already deleted, skipping delete", path) + else: + raise + + +def _link(src, dst): + LOG.info("Hard-linking file '%s' -> '%s'", src, dst) + os.link(src, dst) + + +def _rename(src, dst): + LOG.info("Renaming file '%s' -> '%s'", src, dst) + try: + os.rename(src, dst) + except OSError, e: # noqa + if e.errno == errno.EXDEV: + LOG.error("Invalid cross-device link. Perhaps %s and %s should " + "be symlinked on the same filesystem?", src, dst) + raise + + +def make_subprocess(cmdline, stdout=False, stderr=False, stdin=False, + universal_newlines=False, close_fds=True, env=None): + """Make a subprocess according to the given command-line string""" + LOG.info("Running cmd '%s'", " ".join(cmdline)) + kwargs = {} + kwargs['stdout'] = stdout and subprocess.PIPE or None + kwargs['stderr'] = stderr and subprocess.PIPE or None + kwargs['stdin'] = stdin and subprocess.PIPE or None + kwargs['universal_newlines'] = universal_newlines + kwargs['close_fds'] = close_fds + kwargs['env'] = env + try: + proc = subprocess.Popen(cmdline, **kwargs) + except OSError, e: # noqa + if e.errno == errno.ENOENT: + raise CommandNotFound + else: + raise + return proc + + +class SubprocessException(Exception): + def __init__(self, cmdline, ret, out, err): + Exception.__init__(self, "'%s' returned non-zero exit code: " + "retcode=%i, out='%s', stderr='%s'" + % (cmdline, ret, out, err)) + self.cmdline = cmdline + self.ret = ret + self.out = out + self.err = err + + +def finish_subprocess(proc, cmdline, cmd_input=None, ok_exit_codes=None): + """Ensure that the process returned a zero exit code indicating success""" + if ok_exit_codes is None: + ok_exit_codes = [0] + out, err = proc.communicate(cmd_input) + + ret = proc.returncode + if ret not in ok_exit_codes: + LOG.error("Command '%(cmdline)s' with process id '%(pid)s' expected " + "return code in '%(ok)s' but got '%(rc)s': %(err)s", + {'cmdline': cmdline, 'pid': proc.pid, 'ok': ok_exit_codes, + 'rc': ret, 'err': err}) + raise SubprocessException(' '.join(cmdline), ret, out, err) + return out + + +def run_command(cmd, cmd_input=None, ok_exit_codes=None): + """Abstracts out the basics of issuing system commands. + + If the command returns anything in stderr, an exception is raised with + that information. Otherwise, the output from stdout is returned. + + cmd_input is passed to the process on standard input. + """ + proc = make_subprocess(cmd, stdout=True, stderr=True, stdin=True, + close_fds=True) + return finish_subprocess(proc, cmd, cmd_input=cmd_input, + ok_exit_codes=ok_exit_codes) + + +def try_kill_process(proc): + """Sends the given process the SIGKILL signal.""" + pid = proc.pid + LOG.info("Killing process %s", pid) + try: + os.kill(pid, signal.SIGKILL) + except Exception: + LOG.exception("Failed to kill %s", pid) + + +def make_staging_area(sr_path): + """The staging area is a place we temporarily store and manipulate VHDs. + + The use of the staging area is different for upload and download: + + Download + ======== + + When we download the tarball, the VHDs contained within will have names + like "snap.vhd" and "image.vhd". We need to assign UUIDs to them before + moving them into the SR. However, since 'image.vhd' may be a base_copy, we + need to link it to 'snap.vhd' (using vhd-util modify) before moving both + into the SR (otherwise the SR.scan will cause 'image.vhd' to be deleted). + The staging area gives us a place to perform these operations before they + are moved to the SR, scanned, and then registered with XenServer. + + Upload + ====== + + On upload, we want to rename the VHDs to reflect what they are, 'snap.vhd' + in the case of the snapshot VHD, and 'image.vhd' in the case of the + base_copy. The staging area provides a directory in which we can create + hard-links to rename the VHDs without affecting what's in the SR. + + NOTE + ==== + + The staging area is created as a subdirectory within the SR in order to + guarantee that it resides within the same filesystem and therefore permit + hard-linking and cheap file moves. + """ + staging_path = tempfile.mkdtemp(dir=sr_path) + return staging_path + + +def cleanup_staging_area(staging_path): + """Remove staging area directory + + On upload, the staging area contains hard-links to the VHDs in the SR; + it's safe to remove the staging-area because the SR will keep the link + count > 0 (so the VHDs in the SR will not be deleted). + """ + if os.path.exists(staging_path): + shutil.rmtree(staging_path) + + +def _handle_old_style_images(staging_path): + """Rename files to conform to new image format, if needed. + + Old-Style: + + snap.vhd -> image.vhd -> base.vhd + + New-Style: + + 0.vhd -> 1.vhd -> ... (n-1).vhd + + The New-Style format has the benefit of being able to support a VDI chain + of arbitrary length. + """ + file_num = 0 + for filename in ('snap.vhd', 'image.vhd', 'base.vhd'): + path = os.path.join(staging_path, filename) + if os.path.exists(path): + _rename(path, os.path.join(staging_path, "%d.vhd" % file_num)) + file_num += 1 + + # Rename any format of name to 0.vhd when there is only single one + contents = os.listdir(staging_path) + if len(contents) == 1: + filename = contents[0] + if filename != '0.vhd' and filename.endswith('.vhd'): + _rename( + os.path.join(staging_path, filename), + os.path.join(staging_path, '0.vhd')) + + +def _assert_vhd_not_hidden(path): + """Sanity check to ensure that only appropriate VHDs are marked as hidden. + + If this flag is incorrectly set, then when we move the VHD into the SR, it + will be deleted out from under us. + """ + query_cmd = ["vhd-util", "query", "-n", path, "-f"] + out = run_command(query_cmd) + + for line in out.splitlines(): + if line.lower().startswith('hidden'): + value = line.split(':')[1].strip() + if value == "1": + raise Exception( + "VHD %s is marked as hidden without child" % path) + + +def _vhd_util_check(vdi_path): + check_cmd = ["vhd-util", "check", "-n", vdi_path, "-p"] + out = run_command(check_cmd, ok_exit_codes=[0, 22]) + first_line = out.splitlines()[0].strip() + return out, first_line + + +def _validate_vhd(vdi_path): + """This checks for several errors in the VHD structure. + + Most notably, it checks that the timestamp in the footer is correct, but + may pick up other errors also. + + This check ensures that the timestamps listed in the VHD footer aren't in + the future. This can occur during a migration if the clocks on the two + Dom0's are out-of-sync. This would corrupt the SR if it were imported, so + generate an exception to bail. + """ + out, first_line = _vhd_util_check(vdi_path) + + if 'invalid' in first_line: + LOG.warning("VHD invalid, attempting repair.") + repair_cmd = ["vhd-util", "repair", "-n", vdi_path] + run_command(repair_cmd) + out, first_line = _vhd_util_check(vdi_path) + + if 'invalid' in first_line: + if 'footer' in first_line: + part = 'footer' + elif 'header' in first_line: + part = 'header' + else: + part = 'setting' + + details = first_line.split(':', 1) + if len(details) == 2: + details = details[1] + else: + details = first_line + + extra = '' + if 'timestamp' in first_line: + extra = (" ensure source and destination host machines have " + "time set correctly") + + LOG.info("VDI Error details: %s", out) + + raise Exception( + "VDI '%(vdi_path)s' has an invalid %(part)s: '%(details)s'" + "%(extra)s" % {'vdi_path': vdi_path, 'part': part, + 'details': details, 'extra': extra}) + + LOG.info("VDI is valid: %s", vdi_path) + + +def _validate_vdi_chain(vdi_path): + """Check VDI chain + + This check ensures that the parent pointers on the VHDs are valid + before we move the VDI chain to the SR. This is *very* important + because a bad parent pointer will corrupt the SR causing a cascade of + failures. + """ + def get_parent_path(path): + query_cmd = ["vhd-util", "query", "-n", path, "-p"] + out = run_command(query_cmd, ok_exit_codes=[0, 22]) + first_line = out.splitlines()[0].strip() + + if first_line.endswith(".vhd"): + return first_line + elif 'has no parent' in first_line: + return None + elif 'query failed' in first_line: + raise Exception("VDI '%s' not present which breaks" + " the VDI chain, bailing out" % path) + else: + raise Exception("Unexpected output '%s' from vhd-util" % out) + + cur_path = vdi_path + while cur_path: + _validate_vhd(cur_path) + cur_path = get_parent_path(cur_path) + + +def _validate_sequenced_vhds(staging_path): + # This check ensures that the VHDs in the staging area are sequenced + # properly from 0 to n-1 with no gaps. + seq_num = 0 + filenames = os.listdir(staging_path) + for filename in filenames: + if not filename.endswith('.vhd'): + continue + + # Ignore legacy swap embedded in the image, generated on-the-fly now + if filename == "swap.vhd": + continue + + vhd_path = os.path.join(staging_path, "%d.vhd" % seq_num) + if not os.path.exists(vhd_path): + raise Exception("Corrupt image. Expected seq number: %d. Files: %s" + % (seq_num, filenames)) + + seq_num += 1 + + +def import_vhds(sr_path, staging_path, uuid_stack): + """Move VHDs from staging area into the SR. + + The staging area is necessary because we need to perform some fixups + (assigning UUIDs, relinking the VHD chain) before moving into the SR, + otherwise the SR manager process could potentially delete the VHDs out from + under us. + + Returns: A dict of imported VHDs: + + {'root': {'uuid': 'ffff-aaaa'}} + """ + _handle_old_style_images(staging_path) + _validate_sequenced_vhds(staging_path) + + files_to_move = [] + + # Collect sequenced VHDs and assign UUIDs to them + seq_num = 0 + while True: + orig_vhd_path = os.path.join(staging_path, "%d.vhd" % seq_num) + if not os.path.exists(orig_vhd_path): + break + + # Rename (0, 1 .. N).vhd -> aaaa-bbbb-cccc-dddd.vhd + vhd_uuid = uuid_stack.pop() + vhd_path = os.path.join(staging_path, "%s.vhd" % vhd_uuid) + _rename(orig_vhd_path, vhd_path) + + if seq_num == 0: + leaf_vhd_path = vhd_path + leaf_vhd_uuid = vhd_uuid + + files_to_move.append(vhd_path) + seq_num += 1 + + # Re-link VHDs, in reverse order, from base-copy -> leaf + parent_path = None + for vhd_path in reversed(files_to_move): + if parent_path: + # Link to parent + modify_cmd = ["vhd-util", "modify", "-n", vhd_path, + "-p", parent_path] + run_command(modify_cmd) + + parent_path = vhd_path + + # Sanity check the leaf VHD + _assert_vhd_not_hidden(leaf_vhd_path) + _validate_vdi_chain(leaf_vhd_path) + + # Move files into SR + for orig_path in files_to_move: + new_path = os.path.join(sr_path, os.path.basename(orig_path)) + _rename(orig_path, new_path) + + imported_vhds = dict(root=dict(uuid=leaf_vhd_uuid)) + return imported_vhds + + +def prepare_staging_area(sr_path, staging_path, vdi_uuids, seq_num=0): + """Hard-link VHDs into staging area.""" + for vdi_uuid in vdi_uuids: + source = os.path.join(sr_path, "%s.vhd" % vdi_uuid) + link_name = os.path.join(staging_path, "%d.vhd" % seq_num) + _link(source, link_name) + seq_num += 1 + + +def create_tarball(fileobj, path, callback=None, compression_level=None): + """Create a tarball from a given path. + + :param fileobj: a file-like object holding the tarball byte-stream. + If None, then only the callback will be used. + :param path: path to create tarball from + :param callback: optional callback to call on each chunk written + :param compression_level: compression level, e.g., 9 for gzip -9. + """ + tar_cmd = ["tar", "-zc", "--directory=%s" % path, "."] + env = os.environ.copy() + if compression_level and 1 <= compression_level <= 9: + env["GZIP"] = "-%d" % compression_level + tar_proc = make_subprocess(tar_cmd, stdout=True, stderr=True, env=env) + + try: + while True: + chunk = tar_proc.stdout.read(CHUNK_SIZE) + if chunk == '': + break + + if callback: + callback(chunk) + + if fileobj: + fileobj.write(chunk) + except Exception: + try_kill_process(tar_proc) + raise + + finish_subprocess(tar_proc, tar_cmd) + + +def extract_tarball(fileobj, path, callback=None): + """Extract a tarball to a given path. + + :param fileobj: a file-like object holding the tarball byte-stream + :param path: path to extract tarball into + :param callback: optional callback to call on each chunk read + """ + tar_cmd = ["tar", "-zx", "--directory=%s" % path] + tar_proc = make_subprocess(tar_cmd, stderr=True, stdin=True) + + try: + while True: + chunk = fileobj.read(CHUNK_SIZE) + if chunk == '': + break + + if callback: + callback(chunk) + + tar_proc.stdin.write(chunk) + + # NOTE(tpownall): If we do not poll for the tar process exit + # code when tar has exited pre maturely there is the chance + # that tar will become a defunct zombie child under glance plugin + # and re parented under init forever waiting on the stdin pipe to + # close. Polling for the exit code allows us to break the pipe. + returncode = tar_proc.poll() + tar_pid = tar_proc.pid + if returncode is not None: + LOG.error("tar extract with process id '%(pid)s' " + "exited early with '%(rc)s'", + {'pid': tar_pid, 'rc': returncode}) + raise SubprocessException( + ' '.join(tar_cmd), returncode, "", "") + + except SubprocessException: + # no need to kill already dead process + raise + except Exception: + LOG.exception("Failed while sending data to tar pid: %s", tar_pid) + try_kill_process(tar_proc) + raise + + finish_subprocess(tar_proc, tar_cmd) + + +def make_dev_path(dev, partition=None, base='/dev'): + """Return a path to a particular device. + + >>> make_dev_path('xvdc') + /dev/xvdc + + >>> make_dev_path('xvdc', 1) + /dev/xvdc1 + """ + path = os.path.join(base, dev) + if partition: + path += str(partition) + return path + + +def _handle_serialization(func): + def wrapped(session, params): + params = pickle.loads(params['params']) + rv = func(session, *params['args'], **params['kwargs']) + return pickle.dumps(rv) + return wrapped + + +def register_plugin_calls(*funcs): + """Wrapper around XenAPIPlugin.dispatch which handles pickle serialization. + + """ + wrapped_dict = {} + for func in funcs: + wrapped_dict[func.__name__] = _handle_serialization(func) + XenAPIPlugin.dispatch(wrapped_dict) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/workarounds.py b/os_xenapi/dom0/etc/xapi.d/plugins/workarounds.py new file mode 100644 index 0000000..07c550e --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/workarounds.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +"""Handle the uploading and downloading of images via Glance.""" + +import os +import shutil + +import utils + +import dom0_pluginlib + + +dom0_pluginlib.configure_logging('workarounds') + + +def _copy_vdis(sr_path, staging_path, vdi_uuids): + seq_num = 0 + for vdi_uuid in vdi_uuids: + src = os.path.join(sr_path, "%s.vhd" % vdi_uuid) + dst = os.path.join(staging_path, "%d.vhd" % seq_num) + shutil.copyfile(src, dst) + seq_num += 1 + + +def safe_copy_vdis(session, sr_path, vdi_uuids, uuid_stack): + staging_path = utils.make_staging_area(sr_path) + try: + _copy_vdis(sr_path, staging_path, vdi_uuids) + return utils.import_vhds(sr_path, staging_path, uuid_stack) + finally: + utils.cleanup_staging_area(staging_path) + + +if __name__ == '__main__': + utils.register_plugin_calls(safe_copy_vdis) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/xenhost.py b/os_xenapi/dom0/etc/xapi.d/plugins/xenhost.py new file mode 100644 index 0000000..2af160a --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/xenhost.py @@ -0,0 +1,622 @@ +#!/usr/bin/env python + +# Copyright 2011 OpenStack Foundation +# Copyright 2011 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true + +# +# XenAPI plugin for host operations +# + +try: + import json +except ImportError: + import simplejson as json +import logging +import re +import sys +import time + +import utils + +import dom0_pluginlib as pluginlib +import XenAPI +import XenAPIPlugin + +try: + import xmlrpclib +except ImportError: + import six.moves.xmlrpc_client as xmlrpclib + + +pluginlib.configure_logging("xenhost") + + +host_data_pattern = re.compile(r"\s*(\S+) \([^\)]+\) *: ?(.*)") +config_file_path = "/usr/etc/xenhost.conf" +DEFAULT_TRIES = 23 +DEFAULT_SLEEP = 10 + + +def jsonify(fnc): + def wrapper(*args, **kwargs): + return json.dumps(fnc(*args, **kwargs)) + return wrapper + + +class TimeoutError(StandardError): + pass + + +def _run_command(cmd, cmd_input=None): + """Wrap utils.run_command to raise PluginError on failure""" + try: + return utils.run_command(cmd, cmd_input=cmd_input) + except utils.SubprocessException, e: # noqa + raise pluginlib.PluginError(e.err) + + +def _resume_compute(session, compute_ref, compute_uuid): + """Resume compute node on slave host after pool join. + + This has to happen regardless of the success or failure of the join + operation. + """ + try: + # session is valid if the join operation has failed + session.xenapi.VM.start(compute_ref, False, True) + except XenAPI.Failure: + # if session is invalid, e.g. xapi has restarted, then the pool + # join has been successful, wait for xapi to become alive again + for c in range(0, DEFAULT_TRIES): + try: + _run_command(["xe", "vm-start", "uuid=%s" % compute_uuid]) + return + except pluginlib.PluginError: + logging.exception('Waited %d seconds for the slave to ' + 'become available.' % (c * DEFAULT_SLEEP)) + time.sleep(DEFAULT_SLEEP) + raise pluginlib.PluginError('Unrecoverable error: the host has ' + 'not come back for more than %d seconds' + % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1))) + + +@jsonify +def set_host_enabled(self, arg_dict): + """Sets this host's ability to accept new instances. + + It will otherwise continue to operate normally. + """ + enabled = arg_dict.get("enabled") + if enabled is None: + raise pluginlib.PluginError( + "Missing 'enabled' argument to set_host_enabled") + + host_uuid = arg_dict['host_uuid'] + if enabled == "true": + result = _run_command(["xe", "host-enable", "uuid=%s" % host_uuid]) + elif enabled == "false": + result = _run_command(["xe", "host-disable", "uuid=%s" % host_uuid]) + else: + raise pluginlib.PluginError("Illegal enabled status: %s") % enabled + # Should be empty string + if result: + raise pluginlib.PluginError(result) + # Return the current enabled status + cmd = ["xe", "host-param-get", "uuid=%s" % host_uuid, "param-name=enabled"] + host_enabled = _run_command(cmd) + if host_enabled == "true": + status = "enabled" + else: + status = "disabled" + return {"status": status} + + +def _write_config_dict(dct): + conf_file = file(config_file_path, "w") + json.dump(dct, conf_file) + conf_file.close() + + +def _get_config_dict(): + """Returns a dict containing the key/values in the config file. + + If the file doesn't exist, it is created, and an empty dict + is returned. + """ + try: + conf_file = file(config_file_path) + config_dct = json.load(conf_file) + conf_file.close() + except IOError: + # File doesn't exist + config_dct = {} + # Create the file + _write_config_dict(config_dct) + return config_dct + + +@jsonify +def get_config(self, arg_dict): + """Return the value stored for the specified key, or None if no match.""" + conf = _get_config_dict() + params = arg_dict["params"] + try: + dct = json.loads(params) + except Exception: + dct = params + key = dct["key"] + ret = conf.get(key) + if ret is None: + # Can't jsonify None + return "None" + return ret + + +@jsonify +def set_config(self, arg_dict): + """Write the specified key/value pair, overwriting any existing value.""" + conf = _get_config_dict() + params = arg_dict["params"] + try: + dct = json.loads(params) + except Exception: + dct = params + key = dct["key"] + val = dct["value"] + if val is None: + # Delete the key, if present + conf.pop(key, None) + else: + conf.update({key: val}) + _write_config_dict(conf) + + +def iptables_config(session, args): + # command should be either save or restore + logging.debug("iptables_config:enter") + logging.debug("iptables_config: args=%s", args) + cmd_args = pluginlib.exists(args, 'cmd_args') + logging.debug("iptables_config: cmd_args=%s", cmd_args) + process_input = pluginlib.optional(args, 'process_input') + logging.debug("iptables_config: process_input=%s", process_input) + cmd = json.loads(cmd_args) + cmd = map(str, cmd) + + # either execute iptable-save or iptables-restore + # command must be only one of these two + # process_input must be used only with iptables-restore + if len(cmd) > 0 and cmd[0] in ('iptables-save', + 'iptables-restore', + 'ip6tables-save', + 'ip6tables-restore'): + result = _run_command(cmd, process_input) + ret_str = json.dumps(dict(out=result, err='')) + logging.debug("iptables_config:exit") + return ret_str + # else don't do anything and return an error + else: + raise pluginlib.PluginError("Invalid iptables command") + + +def _ovs_add_patch_port(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + port_name = pluginlib.exists(args, 'port_name') + peer_port_name = pluginlib.exists(args, 'peer_port_name') + cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port', + port_name, '--', 'add-port', bridge_name, port_name, + '--', 'set', 'interface', port_name, + 'type=patch', 'options:peer=%s' % peer_port_name] + return _run_command(cmd_args) + + +def _ovs_del_port(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + port_name = pluginlib.exists(args, 'port_name') + cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port', + bridge_name, port_name] + return _run_command(cmd_args) + + +def _ovs_del_br(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + cmd_args = ['ovs-vsctl', '--', '--if-exists', + 'del-br', bridge_name] + return _run_command(cmd_args) + + +def _ovs_set_if_external_id(args): + interface = pluginlib.exists(args, 'interface') + extneral_id = pluginlib.exists(args, 'extneral_id') + value = pluginlib.exists(args, 'value') + cmd_args = ['ovs-vsctl', 'set', 'Interface', interface, + 'external-ids:%s=%s' % (extneral_id, value)] + return _run_command(cmd_args) + + +def _ovs_add_port(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + port_name = pluginlib.exists(args, 'port_name') + cmd_args = ['ovs-vsctl', '--', '--if-exists', 'del-port', port_name, + '--', 'add-port', bridge_name, port_name] + return _run_command(cmd_args) + + +def _ip_link_get_dev(args): + device_name = pluginlib.exists(args, 'device_name') + cmd_args = ['ip', 'link', 'show', device_name] + return _run_command(cmd_args) + + +def _ip_link_del_dev(args): + device_name = pluginlib.exists(args, 'device_name') + cmd_args = ['ip', 'link', 'delete', device_name] + return _run_command(cmd_args) + + +def _ip_link_add_veth_pair(args): + dev1_name = pluginlib.exists(args, 'dev1_name') + dev2_name = pluginlib.exists(args, 'dev2_name') + cmd_args = ['ip', 'link', 'add', dev1_name, 'type', 'veth', 'peer', + 'name', dev2_name] + return _run_command(cmd_args) + + +def _ip_link_set_dev(args): + device_name = pluginlib.exists(args, 'device_name') + option = pluginlib.exists(args, 'option') + cmd_args = ['ip', 'link', 'set', device_name, option] + return _run_command(cmd_args) + + +def _ip_link_set_promisc(args): + device_name = pluginlib.exists(args, 'device_name') + option = pluginlib.exists(args, 'option') + cmd_args = ['ip', 'link', 'set', device_name, 'promisc', option] + return _run_command(cmd_args) + + +def _brctl_add_br(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + cmd_args = ['brctl', 'addbr', bridge_name] + return _run_command(cmd_args) + + +def _brctl_del_br(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + cmd_args = ['brctl', 'delbr', bridge_name] + return _run_command(cmd_args) + + +def _brctl_set_fd(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + fd = pluginlib.exists(args, 'fd') + cmd_args = ['brctl', 'setfd', bridge_name, fd] + return _run_command(cmd_args) + + +def _brctl_set_stp(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + option = pluginlib.exists(args, 'option') + cmd_args = ['brctl', 'stp', bridge_name, option] + return _run_command(cmd_args) + + +def _brctl_add_if(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + if_name = pluginlib.exists(args, 'interface_name') + cmd_args = ['brctl', 'addif', bridge_name, if_name] + return _run_command(cmd_args) + + +def _brctl_del_if(args): + bridge_name = pluginlib.exists(args, 'bridge_name') + if_name = pluginlib.exists(args, 'interface_name') + cmd_args = ['brctl', 'delif', bridge_name, if_name] + return _run_command(cmd_args) + + +ALLOWED_NETWORK_CMDS = { + # allowed cmds to config OVS bridge + 'ovs_add_patch_port': _ovs_add_patch_port, + 'ovs_add_port': _ovs_add_port, + 'ovs_del_port': _ovs_del_port, + 'ovs_del_br': _ovs_del_br, + 'ovs_set_if_external_id': _ovs_set_if_external_id, + 'ip_link_add_veth_pair': _ip_link_add_veth_pair, + 'ip_link_del_dev': _ip_link_del_dev, + 'ip_link_get_dev': _ip_link_get_dev, + 'ip_link_set_dev': _ip_link_set_dev, + 'ip_link_set_promisc': _ip_link_set_promisc, + 'brctl_add_br': _brctl_add_br, + 'brctl_add_if': _brctl_add_if, + 'brctl_del_br': _brctl_del_br, + 'brctl_del_if': _brctl_del_if, + 'brctl_set_fd': _brctl_set_fd, + 'brctl_set_stp': _brctl_set_stp + } + + +def network_config(session, args): + """network config functions""" + cmd = pluginlib.exists(args, 'cmd') + if not isinstance(cmd, basestring): + msg = "invalid command '%s'" % str(cmd) + raise pluginlib.PluginError(msg) + return + if cmd not in ALLOWED_NETWORK_CMDS: + msg = "Dom0 execution of '%s' is not permitted" % cmd + raise pluginlib.PluginError(msg) + return + cmd_args = pluginlib.exists(args, 'args') + return ALLOWED_NETWORK_CMDS[cmd](cmd_args) + + +def _power_action(action, arg_dict): + # Host must be disabled first + host_uuid = arg_dict['host_uuid'] + result = _run_command(["xe", "host-disable", "uuid=%s" % host_uuid]) + if result: + raise pluginlib.PluginError(result) + # All running VMs must be shutdown + result = _run_command(["xe", "vm-shutdown", "--multiple", + "resident-on=%s" % host_uuid]) + if result: + raise pluginlib.PluginError(result) + cmds = {"reboot": "host-reboot", + "startup": "host-power-on", + "shutdown": "host-shutdown"} + result = _run_command(["xe", cmds[action], "uuid=%s" % host_uuid]) + # Should be empty string + if result: + raise pluginlib.PluginError(result) + return {"power_action": action} + + +@jsonify +def host_reboot(self, arg_dict): + """Reboots the host.""" + return _power_action("reboot", arg_dict) + + +@jsonify +def host_shutdown(self, arg_dict): + """Reboots the host.""" + return _power_action("shutdown", arg_dict) + + +@jsonify +def host_start(self, arg_dict): + """Starts the host. + + Currently not feasible, since the host runs on the same machine as + Xen. + """ + return _power_action("startup", arg_dict) + + +@jsonify +def host_join(self, arg_dict): + """Join a remote host into a pool. + + The pool's master is the host where the plugin is called from. The + following constraints apply: + + - The host must have no VMs running, except nova-compute, which + will be shut down (and restarted upon pool-join) automatically, + - The host must have no shared storage currently set up, + - The host must have the same license of the master, + - The host must have the same supplemental packs as the master. + """ + session = XenAPI.Session(arg_dict.get("url")) + session.login_with_password(arg_dict.get("user"), + arg_dict.get("password")) + compute_ref = session.xenapi.VM.get_by_uuid(arg_dict.get('compute_uuid')) + session.xenapi.VM.clean_shutdown(compute_ref) + try: + if arg_dict.get("force"): + session.xenapi.pool.join(arg_dict.get("master_addr"), + arg_dict.get("master_user"), + arg_dict.get("master_pass")) + else: + session.xenapi.pool.join_force(arg_dict.get("master_addr"), + arg_dict.get("master_user"), + arg_dict.get("master_pass")) + finally: + _resume_compute(session, compute_ref, arg_dict.get("compute_uuid")) + + +@jsonify +def host_data(self, arg_dict): + # Runs the commands on the xenstore host to return the current status + # information. + host_uuid = arg_dict['host_uuid'] + resp = _run_command(["xe", "host-param-list", "uuid=%s" % host_uuid]) + parsed_data = parse_response(resp) + # We have the raw dict of values. Extract those that we need, + # and convert the data types as needed. + ret_dict = cleanup(parsed_data) + # Add any config settings + config = _get_config_dict() + ret_dict.update(config) + return ret_dict + + +def parse_response(resp): + data = {} + for ln in resp.splitlines(): + if not ln: + continue + mtch = host_data_pattern.match(ln.strip()) + try: + k, v = mtch.groups() + data[k] = v + except AttributeError: + # Not a valid line; skip it + continue + return data + + +@jsonify +def host_uptime(self, arg_dict): + """Returns the result of the uptime command on the xenhost.""" + return {"uptime": _run_command(['uptime'])} + + +def cleanup(dct): + # Take the raw KV pairs returned and translate them into the + # appropriate types, discarding any we don't need. + def safe_int(val): + # Integer values will either be string versions of numbers, + # or empty strings. Convert the latter to nulls. + try: + return int(val) + except ValueError: + return None + + def strip_kv(ln): + return [val.strip() for val in ln.split(":", 1)] + + out = {} + +# sbs = dct.get("supported-bootloaders", "") +# out["host_supported-bootloaders"] = sbs.split("; ") +# out["host_suspend-image-sr-uuid"] = dct.get("suspend-image-sr-uuid", "") +# out["host_crash-dump-sr-uuid"] = dct.get("crash-dump-sr-uuid", "") +# out["host_local-cache-sr"] = dct.get("local-cache-sr", "") + out["enabled"] = dct.get("enabled", "true") == "true" + out["host_memory"] = omm = {} + omm["total"] = safe_int(dct.get("memory-total", "")) + omm["overhead"] = safe_int(dct.get("memory-overhead", "")) + omm["free"] = safe_int(dct.get("memory-free", "")) + omm["free-computed"] = safe_int(dct.get("memory-free-computed", "")) + +# out["host_API-version"] = avv = {} +# avv["vendor"] = dct.get("API-version-vendor", "") +# avv["major"] = safe_int(dct.get("API-version-major", "")) +# avv["minor"] = safe_int(dct.get("API-version-minor", "")) + + out["enabled"] = dct.get("enabled", True) + out["host_uuid"] = dct.get("uuid", None) + out["host_name-label"] = dct.get("name-label", "") + out["host_name-description"] = dct.get("name-description", "") +# out["host_host-metrics-live"] = dct.get( +# "host-metrics-live", "false") == "true" + out["host_hostname"] = dct.get("hostname", "") + out["host_ip_address"] = dct.get("address", "") + oc = dct.get("other-config", "") + out["host_other-config"] = ocd = {} + if oc: + for oc_fld in oc.split("; "): + ock, ocv = strip_kv(oc_fld) + ocd[ock] = ocv + + capabilities = dct.get("capabilities", "") + out["host_capabilities"] = capabilities.replace(";", "").split() +# out["host_allowed-operations"] = dct.get( +# "allowed-operations", "").split("; ") +# lsrv = dct.get("license-server", "") +# out["host_license-server"] = ols = {} +# if lsrv: +# for lspart in lsrv.split("; "): +# lsk, lsv = lspart.split(": ") +# if lsk == "port": +# ols[lsk] = safe_int(lsv) +# else: +# ols[lsk] = lsv +# sv = dct.get("software-version", "") +# out["host_software-version"] = osv = {} +# if sv: +# for svln in sv.split("; "): +# svk, svv = strip_kv(svln) +# osv[svk] = svv + cpuinf = dct.get("cpu_info", "") + out["host_cpu_info"] = ocp = {} + if cpuinf: + for cpln in cpuinf.split("; "): + cpk, cpv = strip_kv(cpln) + if cpk in ("cpu_count", "family", "model", "stepping"): + ocp[cpk] = safe_int(cpv) + else: + ocp[cpk] = cpv +# out["host_edition"] = dct.get("edition", "") +# out["host_external-auth-service-name"] = dct.get( +# "external-auth-service-name", "") + return out + + +def query_gc(session, sr_uuid, vdi_uuid): + result = _run_command(["/opt/xensource/sm/cleanup.py", + "-q", "-u", sr_uuid]) + # Example output: "Currently running: True" + return result[19:].strip() == "True" + + +def get_pci_device_details(session): + """Returns a string that is a list of pci devices with details. + + This string is obtained by running the command lspci. With -vmm option, + it dumps PCI device data in machine readable form. This verbose format + display a sequence of records separated by a blank line. We will also + use option "-n" to get vendor_id and device_id as numeric values and + the "-k" option to get the kernel driver used if any. + """ + return _run_command(["lspci", "-vmmnk"]) + + +def get_pci_type(session, pci_device): + """Returns the type of the PCI device (type-PCI, type-VF or type-PF). + + pci-device -- The address of the pci device + """ + # We need to add the domain if it is missing + if pci_device.count(':') == 1: + pci_device = "0000:" + pci_device + output = _run_command(["ls", "/sys/bus/pci/devices/" + pci_device + "/"]) + + if "physfn" in output: + return "type-VF" + if "virtfn" in output: + return "type-PF" + return "type-PCI" + + +if __name__ == "__main__": + # Support both serialized and non-serialized plugin approaches + _, methodname = xmlrpclib.loads(sys.argv[1]) + if methodname in ['query_gc', 'get_pci_device_details', 'get_pci_type', + 'network_config']: + utils.register_plugin_calls(query_gc, + get_pci_device_details, + get_pci_type, + network_config) + + XenAPIPlugin.dispatch( + {"host_data": host_data, + "set_host_enabled": set_host_enabled, + "host_shutdown": host_shutdown, + "host_reboot": host_reboot, + "host_start": host_start, + "host_join": host_join, + "get_config": get_config, + "set_config": set_config, + "iptables_config": iptables_config, + "host_uptime": host_uptime}) diff --git a/os_xenapi/dom0/etc/xapi.d/plugins/xenstore.py b/os_xenapi/dom0/etc/xapi.d/plugins/xenstore.py new file mode 100644 index 0000000..40cf3e9 --- /dev/null +++ b/os_xenapi/dom0/etc/xapi.d/plugins/xenstore.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python + +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack Foundation +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE: XenServer still only supports Python 2.4 in it's dom0 userspace +# which means the Nova xenapi plugins must use only Python 2.4 features + +# +# XenAPI plugin for reading/writing information to xenstore +# + +try: + import json +except ImportError: + import simplejson as json + +import utils # noqa + +import XenAPIPlugin # noqa + +import dom0_pluginlib as pluginlib # noqa +pluginlib.configure_logging("xenstore") + + +class XenstoreError(pluginlib.PluginError): + """Errors that occur when calling xenstore-* through subprocesses.""" + + def __init__(self, cmd, return_code, stderr, stdout): + msg = "cmd: %s; returncode: %d; stderr: %s; stdout: %s" + msg = msg % (cmd, return_code, stderr, stdout) + self.cmd = cmd + self.return_code = return_code + self.stderr = stderr + self.stdout = stdout + pluginlib.PluginError.__init__(self, msg) + + +def jsonify(fnc): + def wrapper(*args, **kwargs): + ret = fnc(*args, **kwargs) + try: + json.loads(ret) + except ValueError: + # Value should already be JSON-encoded, but some operations + # may write raw sting values; this will catch those and + # properly encode them. + ret = json.dumps(ret) + return ret + return wrapper + + +def record_exists(arg_dict): + """Returns whether or not the given record exists. + + The record path is determined from the given path and dom_id in the + arg_dict. + """ + cmd = ["xenstore-exists", "/local/domain/%(dom_id)s/%(path)s" % arg_dict] + try: + _run_command(cmd) + return True + except XenstoreError, e: # noqa + if e.stderr == '': + # if stderr was empty, this just means the path did not exist + return False + # otherwise there was a real problem + raise + + +@jsonify +def read_record(self, arg_dict): + """Returns the value stored at the given path for the given dom_id. + + These must be encoded as key/value pairs in arg_dict. You can + optionally include a key 'ignore_missing_path'; if this is present + and boolean True, attempting to read a non-existent path will return + the string 'None' instead of raising an exception. + """ + cmd = ["xenstore-read", "/local/domain/%(dom_id)s/%(path)s" % arg_dict] + try: + result = _run_command(cmd) + return result.strip() + except XenstoreError, e: # noqa + if not arg_dict.get("ignore_missing_path", False): + raise + if not record_exists(arg_dict): + return "None" + # Just try again in case the agent write won the race against + # the record_exists check. If this fails again, it will likely raise + # an equally meaningful XenstoreError as the one we just caught + result = _run_command(cmd) + return result.strip() + + +@jsonify +def write_record(self, arg_dict): + """Writes to xenstore at the specified path. + + If there is information already stored in that location, it is overwritten. + As in read_record, the dom_id and path must be specified in the arg_dict; + additionally, you must specify a 'value' key, whose value must be a string. + Typically, you can json-ify more complex values and store the json output. + """ + cmd = ["xenstore-write", + "/local/domain/%(dom_id)s/%(path)s" % arg_dict, + arg_dict["value"]] + _run_command(cmd) + return arg_dict["value"] + + +@jsonify +def list_records(self, arg_dict): + """Returns all stored data at or below the given path for the given dom_id. + + The data is returned as a json-ified dict, with the + path as the key and the stored value as the value. If the path + doesn't exist, an empty dict is returned. + """ + dirpath = "/local/domain/%(dom_id)s/%(path)s" % arg_dict + cmd = ["xenstore-ls", dirpath.rstrip("/")] + try: + recs = _run_command(cmd) + except XenstoreError, e: # noqa + if not record_exists(arg_dict): + return {} + # Just try again in case the path was created in between + # the "ls" and the existence check. If this fails again, it will + # likely raise an equally meaningful XenstoreError + recs = _run_command(cmd) + base_path = arg_dict["path"] + paths = _paths_from_ls(recs) + ret = {} + for path in paths: + if base_path: + arg_dict["path"] = "%s/%s" % (base_path, path) + else: + arg_dict["path"] = path + rec = read_record(self, arg_dict) + try: + val = json.loads(rec) + except ValueError: + val = rec + ret[path] = val + return ret + + +@jsonify +def delete_record(self, arg_dict): + """Just like it sounds: + + it removes the record for the specified VM and the specified path from + xenstore. + """ + cmd = ["xenstore-rm", "/local/domain/%(dom_id)s/%(path)s" % arg_dict] + try: + return _run_command(cmd) + except XenstoreError, e: # noqa + if 'could not remove path' in e.stderr: + # Entry already gone. We're good to go. + return '' + raise + + +def _paths_from_ls(recs): + """The xenstore-ls command returns a listing that isn't terribly useful. + + This method cleans that up into a dict with each path + as the key, and the associated string as the value. + """ + last_nm = "" + level = 0 + path = [] + ret = [] + for ln in recs.splitlines(): + nm, val = ln.rstrip().split(" = ") + barename = nm.lstrip() + this_level = len(nm) - len(barename) + if this_level == 0: + ret.append(barename) + level = 0 + path = [] + elif this_level == level: + # child of same parent + ret.append("%s/%s" % ("/".join(path), barename)) + elif this_level > level: + path.append(last_nm) + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + elif this_level < level: + path = path[:this_level] + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + last_nm = barename + return ret + + +def _run_command(cmd): + """Wrap utils.run_command to raise XenstoreError on failure""" + try: + return utils.run_command(cmd) + except utils.SubprocessException, e: # noqa + raise XenstoreError(e.cmdline, e.ret, e.err, e.out) + +if __name__ == "__main__": + XenAPIPlugin.dispatch( + {"read_record": read_record, + "write_record": write_record, + "list_records": list_records, + "delete_record": delete_record}) diff --git a/os_xenapi/tests/plugins/__init__.py b/os_xenapi/tests/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/os_xenapi/tests/plugins/plugin_test.py b/os_xenapi/tests/plugins/plugin_test.py new file mode 100644 index 0000000..67e283d --- /dev/null +++ b/os_xenapi/tests/plugins/plugin_test.py @@ -0,0 +1,68 @@ +# Copyright (c) 2016 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import imp +import os +import sys + +import mock + +from os_xenapi.client import session +from os_xenapi.tests import base + +# both XenAPI and XenAPIPlugin may not exist +# in unit test environment. +sys.modules['XenAPI'] = mock.Mock() +sys.modules['XenAPIPlugin'] = mock.Mock() + + +class PluginTestBase(base.TestCase): + def setUp(self): + super(PluginTestBase, self).setUp() + self.session = mock.Mock() + session.apply_session_helpers(self.session) + + def mock_patch_object(self, target, attribute, return_val=None): + # utility function to mock object's attribute + patcher = mock.patch.object(target, attribute, return_value=return_val) + mock_one = patcher.start() + self.addCleanup(patcher.stop) + return mock_one + + def _get_plugin_path(self): + current_path = os.path.realpath(__file__) + rel_path = os.path.join(current_path, + "../../../dom0/etc/xapi.d/plugins") + plugin_path = os.path.abspath(rel_path) + return plugin_path + + def load_plugin(self, file_name): + # XAPI plugins run in a py24 environment and may be not compatible with + # py34's syntax. In order to prevent unit test scanning the source file + # under py34 environment, the plugins will be imported with this + # function at run time. + + plugin_path = self._get_plugin_path() + + # add plugin path into search path. + if plugin_path not in sys.path: + sys.path.append(plugin_path) + + # be sure not to create c files next to the plugins + sys.dont_write_bytecode = True + + name = file_name.split('.')[0] + path = os.path.join(plugin_path, file_name) + return imp.load_source(name, path) diff --git a/os_xenapi/tests/plugins/test_bandwidth.py b/os_xenapi/tests/plugins/test_bandwidth.py new file mode 100644 index 0000000..9891ed3 --- /dev/null +++ b/os_xenapi/tests/plugins/test_bandwidth.py @@ -0,0 +1,49 @@ +# Copyright (c) 2016 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from os_xenapi.tests.plugins import plugin_test + + +class BandwidthTestCase(plugin_test.PluginTestBase): + def setUp(self): + super(BandwidthTestCase, self).setUp() + self.pluginlib = self.load_plugin("dom0_pluginlib.py") + + # Prevent any logging to syslog + self.mock_patch_object(self.pluginlib, + 'configure_logging') + + self.bandwidth = self.load_plugin("bandwidth.py") + + def test_get_bandwitdth_from_proc(self): + fake_data = [ + 'Inter-| Receive | Transmit', + 'if|bw_in i1 i2 i3 i4 i5 i6 i7|bw_out o1 o2 o3 o4 o5 o6 o7', + 'xenbr1: 1 0 0 0 0 0 0 0 11 0 0 0 0 0 0 0', + 'vif2.0: 2 0 0 0 0 0 0 0 12 0 0 0 0 0 0 0', + 'vif2.1: 3 0 0 0 0 0 0 0 13 0 0 0 0 0 0 0', + 'vifabcd1234-c: 4 0 0 0 0 0 0 0 14 0 0 0 0 0 0 0\n'] + expect_devmap = {'2': {'1': {'bw_in': 13, 'bw_out': 3}, + '0': {'bw_in': 12, 'bw_out': 2}}} + + mock_read_proc_net = self.mock_patch_object( + self.bandwidth, + '_read_proc_net', + return_val=fake_data) + + devmap = self.bandwidth._get_bandwitdth_from_proc() + + self.assertTrue(mock_read_proc_net.called) + self.assertEqual(devmap, expect_devmap) diff --git a/os_xenapi/tests/plugins/test_dom0_plugin_version.py b/os_xenapi/tests/plugins/test_dom0_plugin_version.py new file mode 100644 index 0000000..21efd12 --- /dev/null +++ b/os_xenapi/tests/plugins/test_dom0_plugin_version.py @@ -0,0 +1,28 @@ +# Copyright (c) 2016 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from os_xenapi.tests.plugins import plugin_test + + +class Dom0PluginVersion(plugin_test.PluginTestBase): + def setUp(self): + super(Dom0PluginVersion, self).setUp() + self.dom0_plugin_version = self.load_plugin('dom0_plugin_version.py') + + def test_dom0_plugin_version(self): + session = 'fake_session' + expected_value = self.dom0_plugin_version.PLUGIN_VERSION + return_value = self.dom0_plugin_version.get_version(session) + self.assertEqual(expected_value, return_value) diff --git a/os_xenapi/tests/plugins/test_dom0_pluginlib.py b/os_xenapi/tests/plugins/test_dom0_pluginlib.py new file mode 100644 index 0000000..8d69794 --- /dev/null +++ b/os_xenapi/tests/plugins/test_dom0_pluginlib.py @@ -0,0 +1,151 @@ +# Copyright (c) 2016 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from os_xenapi.tests.plugins import plugin_test + + +class FakeUnplugException(Exception): + def __init__(self, details): + self.details = details + + +class PluginlibDom0(plugin_test.PluginTestBase): + def setUp(self): + super(PluginlibDom0, self).setUp() + self.dom0_pluginlib = self.load_plugin("dom0_pluginlib.py") + + def test_configure_logging(self): + name = 'fake_name' + mock_Logger_setLevel = self.mock_patch_object( + self.dom0_pluginlib.logging.Logger, 'setLevel') + mock_sysh_setLevel = self.mock_patch_object( + self.dom0_pluginlib.logging.handlers.SysLogHandler, 'setLevel') + mock_Formatter = self.mock_patch_object( + self.dom0_pluginlib.logging, 'Formatter') + mock_sysh_setFormatter = self.mock_patch_object( + self.dom0_pluginlib.logging.handlers.SysLogHandler, 'setFormatter') + mock_Logger_addHandler = self.mock_patch_object( + self.dom0_pluginlib.logging.Logger, 'addHandler') + + self.dom0_pluginlib.configure_logging(name) + + self.assertTrue(mock_Logger_setLevel.called) + self.assertTrue(mock_sysh_setLevel.called) + self.assertTrue(mock_Formatter.called) + self.assertTrue(mock_sysh_setFormatter.called) + self.assertTrue(mock_Logger_addHandler.called) + + def test_exists_ok(self): + fake_args = {'k1': 'v1'} + self.assertEqual('v1', self.dom0_pluginlib.exists(fake_args, 'k1')) + + def test_exists_exception(self): + fake_args = {'k1': 'v1'} + self.assertRaises(self.dom0_pluginlib.ArgumentError, + self.dom0_pluginlib.exists, + fake_args, + 'no_key') + + def test_optional_exist(self): + fake_args = {'k1': 'v1'} + self.assertEqual('v1', + self.dom0_pluginlib.optional(fake_args, 'k1')) + + def test_optional_none(self): + fake_args = {'k1': 'v1'} + self.assertIsNone(self.dom0_pluginlib.optional(fake_args, + 'no_key')) + + def test_get_domain_0(self): + mock_get_this_host = self.mock_patch_object( + self.session.xenapi.session, + 'get_this_host', + return_val='fake_host_ref') + mock_get_vm_records = self.mock_patch_object( + self.session.xenapi.VM, + 'get_all_records_where', + return_val={"fake_vm_ref": "fake_value"}) + + ret_value = self.dom0_pluginlib._get_domain_0(self.session) + + self.assertTrue(mock_get_this_host.called) + self.assertTrue(mock_get_vm_records.called) + self.assertEqual('fake_vm_ref', ret_value) + + def test_with_vdi_in_dom0(self): + self.mock_patch_object( + self.dom0_pluginlib, + '_get_domain_0', + return_val='fake_dom0_ref') + mock_vbd_create = self.mock_patch_object( + self.session.xenapi.VBD, + 'create', + return_val='fake_vbd_ref') + mock_vbd_plug = self.mock_patch_object( + self.session.xenapi.VBD, + 'plug') + self.mock_patch_object( + self.session.xenapi.VBD, + 'get_device', + return_val='fake_device_xvda') + mock_vbd_unplug_with_retry = self.mock_patch_object( + self.dom0_pluginlib, + '_vbd_unplug_with_retry') + mock_vbd_destroy = self.mock_patch_object( + self.session.xenapi.VBD, + 'destroy') + + def handle_function(vbd): + # the fake vbd handle function + self.assertEqual(vbd, 'fake_device_xvda') + self.assertTrue(mock_vbd_plug.called) + self.assertFalse(mock_vbd_unplug_with_retry.called) + return 'function_called' + + fake_vdi = 'fake_vdi' + return_value = self.dom0_pluginlib.with_vdi_in_dom0( + self.session, fake_vdi, False, handle_function) + + self.assertEqual('function_called', return_value) + self.assertTrue(mock_vbd_plug.called) + self.assertTrue(mock_vbd_unplug_with_retry.called) + self.assertTrue(mock_vbd_destroy.called) + args, kwargs = mock_vbd_create.call_args + self.assertEqual('fake_dom0_ref', args[0]['VM']) + self.assertEqual('RW', args[0]['mode']) + + def test_vbd_unplug_with_retry_success_at_first_time(self): + self.dom0_pluginlib._vbd_unplug_with_retry(self.session, + 'fake_vbd_ref') + self.assertEqual(1, self.session.xenapi.VBD.unplug.call_count) + + def test_vbd_unplug_with_retry_detached_already(self): + error = FakeUnplugException(['DEVICE_ALREADY_DETACHED']) + + self.session.xenapi.VBD.unplug.side_effect = error + self.dom0_pluginlib.XenAPI.Failure = FakeUnplugException + self.dom0_pluginlib._vbd_unplug_with_retry(self.session, + 'fake_vbd_ref') + self.assertEqual(1, self.session.xenapi.VBD.unplug.call_count) + + def test_vbd_unplug_with_retry_success_at_second_time(self): + side_effects = [FakeUnplugException(['DEVICE_DETACH_REJECTED']), + None] + + self.session.xenapi.VBD.unplug.side_effect = side_effects + self.dom0_pluginlib.XenAPI.Failure = FakeUnplugException + self.dom0_pluginlib._vbd_unplug_with_retry(self.session, + 'fake_vbd_ref') + self.assertEqual(2, self.session.xenapi.VBD.unplug.call_count) diff --git a/os_xenapi/tests/plugins/test_partition_utils.py b/os_xenapi/tests/plugins/test_partition_utils.py new file mode 100644 index 0000000..647e06e --- /dev/null +++ b/os_xenapi/tests/plugins/test_partition_utils.py @@ -0,0 +1,109 @@ +# Copyright (c) 2016 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from os_xenapi.client import exception +from os_xenapi.tests.plugins import plugin_test + + +class PartitionUtils(plugin_test.PluginTestBase): + def setUp(self): + super(PartitionUtils, self).setUp() + self.pluginlib = self.load_plugin("dom0_pluginlib.py") + + # Prevent any logging to syslog + self.mock_patch_object(self.pluginlib, + 'configure_logging') + + self.partition_utils = self.load_plugin("partition_utils.py") + + def test_wait_for_dev_ok(self): + mock_sleep = self.mock_patch_object(self.partition_utils.time, + 'sleep') + mock_exists = self.mock_patch_object(self.partition_utils.os.path, + 'exists') + mock_exists.side_effect = [False, True] + ret = self.partition_utils.wait_for_dev('session', '/fake', 2) + + self.assertEqual(1, mock_sleep.call_count) + self.assertEqual(ret, "/fake") + + def test_wait_for_dev_timeout(self): + mock_sleep = self.mock_patch_object(self.partition_utils.time, + 'sleep') + mock_exists = self.mock_patch_object(self.partition_utils.os.path, + 'exists') + mock_exists.side_effect = [False, False, True] + ret = self.partition_utils.wait_for_dev('session', '/fake', 2) + + self.assertEqual(2, mock_sleep.call_count) + self.assertEqual(ret, "") + + def test_mkfs_removes_partitions_ok(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + mock__mkfs = self.mock_patch_object(self.partition_utils, '_mkfs') + + self.partition_utils.mkfs('session', 'fakedev', '1', 'ext3', 'label') + mock__mkfs.assert_called_with('ext3', '/dev/mapper/fakedevp1', + 'label') + expected_calls = [mock.call(['kpartx', '-avspp', '/dev/fakedev'])] + expected_calls.append(mock.call(['kpartx', '-dvspp', '/dev/fakedev'])) + mock_run.assert_has_calls(expected_calls) + + def test_mkfs_removes_partitions_exc(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + mock__mkfs = self.mock_patch_object(self.partition_utils, '_mkfs') + mock__mkfs.side_effect = exception.OsXenApiException( + message="partition failed") + + self.assertRaises(exception.OsXenApiException, + self.partition_utils.mkfs, + 'session', 'fakedev', '1', 'ext3', 'label') + expected_calls = [mock.call(['kpartx', '-avspp', '/dev/fakedev'])] + expected_calls.append(mock.call(['kpartx', '-dvspp', '/dev/fakedev'])) + mock_run.assert_has_calls(expected_calls) + + def test_mkfs_ext3_no_label(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + + self.partition_utils._mkfs('ext3', '/dev/sda1', None) + mock_run.assert_called_with(['mkfs', '-t', 'ext3', '-F', '/dev/sda1']) + + def test_mkfs_ext3(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + + self.partition_utils._mkfs('ext3', '/dev/sda1', 'label') + mock_run.assert_called_with(['mkfs', '-t', 'ext3', '-F', '-L', + 'label', '/dev/sda1']) + + def test_mkfs_swap(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + + self.partition_utils._mkfs('swap', '/dev/sda1', 'ignored') + mock_run.assert_called_with(['mkswap', '/dev/sda1']) + + def test_make_partition(self): + mock_run = self.mock_patch_object(self.partition_utils.utils, + 'run_command') + + self.partition_utils.make_partition('session', 'dev', 'start', '-') + + mock_run.assert_called_with(['sfdisk', '-uS', '/dev/dev'], 'start,;\n')