Implemet flexvolume driver of Cinder

Change-Id: Ib74c53096c7ca33554fb33286083245917d13b54
This commit is contained in:
zengchen 2017-06-22 10:41:32 +08:00
parent fd75694333
commit 18e01ab9d8
12 changed files with 532 additions and 19 deletions

View File

@ -17,3 +17,7 @@ iscsiadm: CommandFilter, iscsiadm, root
sg_scan: CommandFilter, sg_scan, root
systool: CommandFilter, systool, root
cat: CommandFilter, cat, root
# server/cinder/bare_metal_host.py
rm: CommandFilter, rm, root
ln: CommandFilter, ln, root

View File

@ -26,6 +26,9 @@ flexvolume_driver_group = cfg.OptGroup(
help=_('Configuration options for FlexVolume driver'))
flexvolume_driver_opts = [
cfg.HostnameOpt('hostname',
help=_('Hostname of machine '
'on which FlexVolume driver runs.')),
cfg.HostAddressOpt('node_ip',
help=_('IP address of machine '
'on which FlexVolume driver runs.')),

View File

@ -47,14 +47,15 @@ VOLUME_DRIVER_CMD_OPT_ARG = (
ARG_RW,
ARG_SECRET,
ARG_FSGROUP,
ARG_MOUNTS_DIR
ARG_MOUNTS_DIR,
ARG_PV_OR_VOLUME_NAME,
) = (
"kubernetes.io/fsType",
"kubernetes.io/readwrite",
"kubernetes.io/secret",
"kubernetes.io/fsGroup",
"kubernetes.io/mountsDir",
"kubernetes.io/pvOrVolumeName",
)

View File

@ -18,10 +18,43 @@ class FuxiKubernetesException(Exception):
class InvalidVolumeDriverCmdParameter(FuxiKubernetesException):
def __init__(self, reason):
super(InvalidVolumeDriverCmdParameter, self).__init__(
"Invalid FlexVolume driver cmd parameter, reason:%s" % reason)
"Invalid FlexVolume driver cmd parameter, reason: %s" % reason)
class LoadVolumeDriverException(FuxiKubernetesException):
def __init__(self, reason):
super(LoadVolumeDriverException, self).__init__(
"Load volume driver failed, reason: %s" % reason)
class GetCinderVolumeException(FuxiKubernetesException):
def __init__(self, volume_id, reason):
super(GetCinderVolumeException, self).__init__(
"Get Cinder volume: (%s) failed, reason: %s" % (volume_id, reason))
class AttachCinderVolumeException(FuxiKubernetesException):
def __init__(self, volume_name, volume_id, reason):
super(AttachCinderVolumeException, self).__init__(
'Attach volume named: (%s) by Cinder volume: (%s) failed, '
'reason: %s' % (volume_name, volume_id, reason))
class DetachCinderVolumeException(FuxiKubernetesException):
def __init__(self, volume_name, volume_id, reason):
super(DetachCinderVolumeException, self).__init__(
'Detach volume nameed: (%s) which was attached from Cinder '
'volume: (%s) failed, reason: %s' % (
volume_name, volume_id, reason))
class NotSupportedCommand(FuxiKubernetesException):
def __init__(self):
super(NotSupportedCommand, self).__init__('Not supported')
class NotMatchedHost(FuxiKubernetesException):
def __init__(self, expect_host_name, actual_host_name):
super(NotMatchedHost, self).__init__(
'Expect running on: %s, but receive the host name: %s' % (
expect_host_name, actual_host_name))

View File

@ -58,7 +58,7 @@ class BaseVolumeDriver(object):
def mount_device(self, device_mount_path, device_path, **kwargs):
return self.default_result
def detach(self, device_path, host_name):
def detach(self, pv_or_volume_name, host_name):
return self.default_result
def wait_for_detach(self, device_path):
@ -73,6 +73,11 @@ class BaseVolumeDriver(object):
def unmount(self, mount_dir):
return self.default_result
def _generate_result(self, ret, info):
info['status'] = constants.STATUS_SUCCESS if ret else (
constants.STATUS_FAILURE)
return Result(**info)
def _request_server(self, api, data):
def _send_and_receive():
try:

View File

@ -15,15 +15,80 @@ from fuxi_kubernetes.flex_volume_drivers.drivers import base
class DriverCinder(base.BaseVolumeDriver):
# TODO(zengchen): implement it.
def __init__(self):
super(DriverCinder, self).__init__()
self._driver_name = constants.VOLUME_DRIVER_CINDER
def get_volume_name(self, **kwargs):
name = self._volumeid(kwargs)
if name:
return self._generate_result(True, {'volumeName': name})
return self._generate_result(
False, {'message': 'Can not get volume name'})
def is_attached(self, host_name, **kwargs):
return self._request_server(
constants.SERVER_API_IS_ATTACHED,
{'host_name': host_name,
'volume_id': kwargs.get(constants.CINDER_VOLUME_ATTR_VOLUME_ID)}
'volume_id': self._volumeid(kwargs)}
)
def attach(self, host_name, **kwargs):
return self._request_server(
constants.SERVER_API_ATTACH,
{'host_name': host_name,
'volume_id': self._volumeid(kwargs),
'pv_or_volume_name': kwargs.get(constants.ARG_PV_OR_VOLUME_NAME)}
)
def wait_for_attach(self, device_path, **kwargs):
params = {'device_path': device_path}
return self._request_server(
constants.SERVER_API_WAIT_FOR_ATTACH,
params
)
def mount_device(self, device_mount_path, device_path, **kwargs):
params = {
'device_mount_path': device_mount_path,
'device_path': device_path
}
return self._request_server(
constants.SERVER_API_MOUNT_DEVICE,
params
)
def detach(self, pv_or_volume_name, host_name):
return self._request_server(
constants.SERVER_API_DETACH,
{'pv_or_volume_name': pv_or_volume_name, 'host_name': host_name}
)
def wait_for_detach(self, device_path):
return self._request_server(
constants.SERVER_API_WAIT_FOR_DETACH,
{'device_path': device_path}
)
def unmount_device(self, device_mount_path):
return self._request_server(
constants.SERVER_API_UNMOUNT_DEVICE,
{'device_mount_path': device_mount_path}
)
def mount(self, mount_dir, **kwargs):
params = {'mount_dir': mount_dir}
return self._request_server(
constants.SERVER_API_MOUNT,
params
)
def unmount(self, mount_dir):
return self._request_server(
constants.SERVER_API_UNMOUNT,
{'mount_dir': mount_dir}
)
def _volumeid(self, params):
return params.get(constants.CINDER_VOLUME_ATTR_VOLUME_ID)

View File

@ -30,7 +30,7 @@ usage() {
err "\t$0 attach <json params> <nodename>"
err "\t$0 waitforattach <mount device> <json params>"
err "\t$0 mountdevice <mount dir> <mount device> <json params>"
err "\t$0 detach <mount device> <nodename>"
err "\t$0 detach <pv or volume name> <nodename>"
err "\t$0 waitfordetach <mount device>"
err "\t$0 unmountdevice <mount device>"
err "\t$0 mount <mount dir> <json params>"

View File

@ -0,0 +1,282 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import os
from oslo_log import log as logging
import re
from fuxi_kubernetes import exceptions
from fuxi_kubernetes.flex_volume_drivers.server import utils
LOG = logging.getLogger(__name__)
def check_host_name(f):
@functools.wraps(f)
def wrapper(self, host_name, *args, **kwargs):
if not host_name:
host_name = self.host_name
else:
host_name = host_name.lower()
if host_name != self.host_name:
raise exceptions.NotMatchedHost(self.host_name, host_name)
return f(self, host_name, *args, **kwargs)
return wrapper
def log_error(prefix, info, *args):
LOG.error('%s, %s' % (prefix, info), *args)
class BareMetalHost(object):
def __init__(self, cinder_client):
self._cinder_client = cinder_client
self._host_name = utils.get_local_hostname()
self._attached_volumes = {}
self._volume_link_dir = '/dev/disk/by-id/'
@property
def host_name(self):
return self._host_name
@check_host_name
def is_attached(self, host_name, volume):
for item in volume.attachments:
LOG.debug("Check whether the volume is attached on host:(%s), "
"check attachment:%s", host_name, str(item))
if host_name != item['host_name']:
continue
if os.path.exists(item['device']):
return True
return False
@check_host_name
def attach(self, host_name, volume, pv_or_volume_name):
_log_error = functools.partial(
log_error, 'Attach volume:(%s)' % pv_or_volume_name)
def _raise_except(reason):
raise exceptions.AttachCinderVolumeException(
pv_or_volume_name, volume_id, reason)
phase_init_connect = 0
phase_connect_volume = 1
phase_create_link = 2
phase_set_attachment = 3
def _rollback(phase):
try:
if phase >= phase_set_attachment:
utils.execute_cmd("rm", "-f", link_to_device)
if phase >= phase_create_link:
connector.disconnect_volume(conn_info['data'], None)
if phase >= phase_init_connect:
self._cinder_client.volumes.unreserve(volume)
except Exception as ex:
_log_error('try to roolback the operation of attach on phase '
'of (%d) failed, reason:%s', phase, str(ex))
volume_id = volume.id
if self._search_volume_ids(pv_or_volume_name):
_log_error('reduplicative pv/volume name:%s', pv_or_volume_name)
_raise_except(
'reduplicative pv/volume name:%s' % pv_or_volume_name)
# reserve volume
try:
self._cinder_client.volumes.reserve(volume)
except Exception as ex:
_log_error("reserve Cinder volume: (%s) failed, reason: %s",
volume_id, str(ex))
_raise_except('reserve volume failed')
# initialize connect
conn_info = None
try:
conn_info = self._cinder_client.volumes.initialize_connection(
volume_id, utils.brick_get_connector_properties())
except Exception as ex:
_rollback(phase_init_connect)
_log_error("initialize connection to Cinder volume:(%s) failed, "
"reason:%s", volume_id, str(ex))
_raise_except('initialize connection failed ')
# connect volume
connector = None
path = ''
try:
connector = utils.brick_get_connector(
conn_info['driver_volume_type'])
device_info = connector.connect_volume(conn_info['data'])
path = os.path.realpath(device_info['path'])
except Exception as ex:
_rollback(phase_connect_volume)
_log_error("connect to Cinder volume:(%s) failed, reason:%s",
volume_id, str(ex))
_raise_except('connect to Cinder volume failed')
# create soft link to device
link_to_device = self._link_to_device(volume.id, pv_or_volume_name)
try:
utils.execute_cmd('ln', '-s', path, link_to_device)
except Exception as ex:
_rollback(phase_create_link)
_log_error("create soft link:(%s) to device:(%s) failed, "
"reason:%s.", link_to_device, path, str(ex))
_raise_except("create soft link to device failed")
# set attachment
try:
self._cinder_client.volumes.attach(
volume=volume, instance_uuid=None, mountpoint=path,
host_name=host_name)
except Exception as ex:
_rollback(phase_set_attachment)
_log_error("set attachment info to Cinder volume:(%s) failed,",
volume_id)
_raise_except("set attachment info to Cinder failed,")
self._attached_volumes[pv_or_volume_name] = volume_id
return path
def wait_for_attach(self, device_path):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
def mount_device(self, device_mount_path, device_path):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
@check_host_name
def detach(self, host_name, pv_or_volume_name):
_log_error = functools.partial(
log_error, 'Detach volume:(%s)' % pv_or_volume_name)
def _raise_except(reason):
raise exceptions.DetachCinderVolumeException(
pv_or_volume_name, volume_id, reason)
volume_id = ''
volume_ids = self._search_volume_ids(pv_or_volume_name)
if not volume_ids or len(volume_ids) > 1:
info = 'can not find corresponding volume id'
_log_error(info)
_raise_except(info)
volume_id = volume_ids[0]
volume = None
try:
volume = self._cinder_client.volumes.get(volume_id)
except Exception as ex:
_log_error('get volume:(%s) from Cinder failed', volume_id)
_raise_except('get volume from Cinder failed')
# delete link to device
try:
link_to_device = self._link_to_device(volume_id, pv_or_volume_name)
if os.path.islink(link_to_device):
utils.execute_cmd('rm', '-f', link_to_device)
except Exception as ex:
_log_error("delete link:(%s) to device:(%s) failed, reason:%s",
link_to_device, os.path.realpath(link_to_device),
str(ex))
_raise_except('delete link to device failed')
# disconnect volume
conn_info = None
try:
conn_info = self._cinder_client.volumes.initialize_connection(
volume_id, utils.brick_get_connector_properties())
except Exception as ex:
_log_error("initialize connection to Cinder volume:(%s) failed, "
"reason:%s", volume_id, str(ex))
_raise_except('initialize connection failed')
try:
connector = utils.brick_get_connector(
conn_info['driver_volume_type'])
connector.disconnect_volume(conn_info['data'], None)
except Exception as ex:
_log_error("disconnect volume:(%s) failed, reason:%s",
volume_id, str(ex))
_raise_except('disconnect volume failed')
# delete attachment
attachment_id = None
for am in volume.attachments:
if am['host_name'] == host_name:
attachment_id = am['attachment_id']
break
else:
_log_error("does not find the attachment of wolume:(%s) which "
"is attached on host:(%s)", volume_id, host_name)
try:
if attachment_id:
self._cinder_client.volumes.detach(
volume_id, attachment_uuid=attachment_id)
except Exception as ex:
_log_error("delete attachment:(%s) from Cinder failed, reason:%s",
attachment_id, str(ex))
_raise_except('delete attachment info from Cinder failed,')
self._attached_volumes.pop(pv_or_volume_name, None)
def wait_for_detach(self, device_path):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
def unmount_device(self, device_mount_path):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
def mount(self, mount_dir):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
def unmount(self, mount_dir):
LOG.warn('Use Kubernetes\' default method instead')
raise exceptions.NotSupportedCommand()
def _search_volume_ids(self, pv_or_volume_name):
volume_id = self._attached_volumes.get(pv_or_volume_name)
if volume_id:
return [volume_id]
def _volume_id(f):
if f.find(flag) > 0 and os.path.islink(os.path.join(link_dir, f)):
strs = re.split(flag, f)
return strs[0] if len(strs) == 2 and strs[1] == '' else None
flag = "_%s" % pv_or_volume_name
link_dir = self._volume_link_dir
volume_ids = []
for f in os.listdir(link_dir):
volume_id = _volume_id(f)
if volume_id:
volume_ids.append(volume_id)
return volume_ids
def _link_to_device(self, volume_id, pv_or_volume_name):
return os.path.join(self._volume_link_dir,
"%s_%s" % (volume_id, pv_or_volume_name))

View File

@ -10,18 +10,63 @@
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import driver as import_driver
from stevedore import extension
from fuxi_kubernetes import exceptions
from fuxi_kubernetes.flex_volume_drivers.server import utils
class ServerCinder(object):
# TODO(zengchen): implement all the interface of driver, such as
# is_attached, attach, detach, mount, unmount etc.
def __init__(self, host_platform):
self._cinder_client = None
self._host = None
host_cls = import_driver.DriverManager(
'flex_volume_drivers.server.cinder.hosts',
host_platform).driver
self._cinder_client = utils.get_cinder_client()
self._host = host_cls(self._cinder_client)
def is_attached(self, volume_id, host_name, **kwargs):
return False
def is_attached(self, volume_id, host_name):
volume = self._get_volume(volume_id)
if not volume.attachments:
return False
return self._host.is_attached(host_name, volume)
def attach(self, volume_id, host_name, pv_or_volume_name):
return self._host.attach(host_name, self._get_volume(volume_id),
pv_or_volume_name)
def wait_for_attach(self, device_path):
return self._host.wait_for_attach(device_path)
def mount_device(self, device_mount_path, device_path):
self._host.mount_device(device_mount_path, device_path)
def detach(self, pv_or_volume_name, host_name):
self._host.detach(host_name, pv_or_volume_name)
def wait_for_detach(self, device_path):
self._host.wait_for_detach(device_path)
def unmount_device(self, device_mount_path):
self._host.unmount_device(device_mount_path)
def mount(self, mount_dir):
self._host.mount(mount_dir)
def unmount(self, mount_dir):
self._host.unmount(mount_dir)
def _get_volume(self, volume_id):
try:
return self._cinder_client.volumes.get(volume_id)
except Exception as ex:
raise exceptions.GetCinderVolumeException(volume_id, str(ex))
@classmethod
def is_support_host_platform(cls, host_platform):
return True
mgr = extension.ExtensionManager(
namespace='flex_volume_drivers.server.cinder.hosts',
)
return host_platform in [e.name for e in mgr]

View File

@ -50,7 +50,7 @@ def init_volume_drivers():
def api_wrapper(f):
def _response(ret, info):
if ret:
info['status'] = constants.STATUS_SUCCESS
info.setdefault('status', constants.STATUS_SUCCESS)
else:
info = {'status': constants.STATUS_FAILURE, 'message': info}
return flask.jsonify(base.Result(**info)())
@ -58,13 +58,16 @@ def api_wrapper(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
data = flask.request.get_json(force=True)
driver = APP.volume_drivers.get(data.get('driver'))
driver_name = data.pop('driver', '')
driver = APP.volume_drivers.get(driver_name)
if not driver:
return _response(
False, 'Unknow FlexVolume driver:%s' % data.get('driver'))
False, 'Unknow FlexVolume driver:(%s)' % driver_name)
try:
return _response(True, f(driver, data))
except exceptions.NotSupportedCommand:
return _response(True, {'status': constants.STATUS_NOT_SUPPORT})
except Exception as ex:
return _response(False, str(ex))
@ -73,5 +76,59 @@ def api_wrapper(f):
@APP.route(constants.SERVER_API_IS_ATTACHED, methods=['POST'])
@api_wrapper
def is_attached(driver=None, param=None):
return {'attached': driver.is_attached(**param)}
def is_attached(driver=None, params=None):
return {'attached': driver.is_attached(**params)}
@APP.route(constants.SERVER_API_ATTACH, methods=['POST'])
@api_wrapper
def attach(driver=None, params=None):
return {'device': driver.attach(**params)}
@APP.route(constants.SERVER_API_WAIT_FOR_ATTACH, methods=['POST'])
@api_wrapper
def wait_for_attach(driver=None, params=None):
return {'device': driver.wait_for_attach(**params)}
@APP.route(constants.SERVER_API_MOUNT_DEVICE, methods=['POST'])
@api_wrapper
def mount_device(driver=None, params=None):
driver.mount_device(**params)
return {}
@APP.route(constants.SERVER_API_DETACH, methods=['POST'])
@api_wrapper
def detach(driver=None, params=None):
driver.detach(**params)
return {}
@APP.route(constants.SERVER_API_WAIT_FOR_DETACH, methods=['POST'])
@api_wrapper
def wait_for_detach(driver=None, params=None):
driver.wait_for_detach(**params)
return {}
@APP.route(constants.SERVER_API_UNMOUNT_DEVICE, methods=['POST'])
@api_wrapper
def unmount_device(driver=None, params=None):
driver.unmount_device(**params)
return {}
@APP.route(constants.SERVER_API_MOUNT, methods=['POST'])
@api_wrapper
def mount(driver=None, params=None):
driver.mount(**params)
return {}
@APP.route(constants.SERVER_API_UNMOUNT, methods=['POST'])
@api_wrapper
def unmount(driver=None, params=None):
driver.unmount(**params)
return {}

View File

@ -13,6 +13,8 @@
from cinderclient import client as cinder_client
from kuryr.lib import utils as kuryr_utils
from os_brick.initiator import connector
from oslo_concurrency import processutils
import socket
from fuxi_kubernetes.common import config as local_config
@ -56,6 +58,19 @@ def brick_get_connector(protocol, driver=None, use_multipath=False,
*args, **kwargs)
def get_local_hostname():
host_name = local_config.CONF[
local_config.flexvolume_driver_group.name].host_name
if not host_name:
host_name = socket.gethostname()
return host_name.lower()
def execute_cmd(*cmd):
return processutils.execute(*cmd, run_as_root=True,
root_helper=get_root_helper())
def _get_keystone_session(conf_group, **kwargs):
auth_plugin = kuryr_utils.get_auth_plugin(conf_group)
session = kuryr_utils.get_keystone_session(conf_group, auth_plugin)

View File

@ -34,6 +34,9 @@ console_scripts =
flex_volume_drivers.server =
Cinder = fuxi_kubernetes.flex_volume_drivers.server.cinder.cinder:ServerCinder
flex_volume_drivers.server.cinder.hosts =
baremetal = fuxi_kubernetes.flex_volume_drivers.server.cinder.bare_metal_host:BareMetalHost
[build_sphinx]
source-dir = doc/source
build-dir = doc/build