Add Ceph backend for Swift

Christian Schwede 2014-05-09 18:59:40 +00:00
parent 3a88166efa
commit 411d6f9574
7 changed files with 1215 additions and 0 deletions

README.md (new file, 20 lines)

@@ -0,0 +1,20 @@
Ceph backend for OpenStack Swift
================================
Installation
------------
1. Install the rados object server:
sudo python setup.py install
2. Modify your object-server.conf to use the new object server:
[app:object-server]
use = egg:swift_ceph_backend#rados_object
3. Set the user and pool for Ceph in the [DEFAULT] section:
[DEFAULT]
rados_user = swift
rados_pool = swift
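
Putting steps 2 and 3 together, a minimal object-server.conf might look like
the sketch below. The rados_ceph_conf option is also read by the object server
in this backend and should point at your cluster's Ceph configuration file;
the path shown is only an example. Restart the object server after changing
the configuration so the new backend is loaded:

    [DEFAULT]
    rados_ceph_conf = /etc/ceph/ceph.conf
    rados_user = swift
    rados_pool = swift

    [app:object-server]
    use = egg:swift_ceph_backend#rados_object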

setup.py (new file, 18 lines)

@@ -0,0 +1,18 @@
from setuptools import setup
setup(
name = 'swift-ceph-backend',
version = '0.1',
description = 'Ceph backend for OpenStack Swift',
license = 'Apache License (2.0)',
packages = ['swift_ceph_backend'],
classifiers = [
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 2.6',
'Environment :: No Input/Output (Daemon)'],
install_requires = ['swift', ],
entry_points = {
'paste.app_factory': ['rados_object = swift_ceph_backend.rados_server:app_factory'],
},
)
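# Illustrative check (not part of setup.py): after `sudo python setup.py
# install`, the paste.deploy entry point declared above can be resolved with
# the standard pkg_resources API; the factory it returns is
# swift_ceph_backend.rados_server.app_factory.
#
#     import pkg_resources
#     factory = pkg_resources.load_entry_point(
#         'swift-ceph-backend', 'paste.app_factory', 'rados_object')
#     print(factory)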

@@ -0,0 +1,471 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Babu Shanmugam <anbu@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import cPickle as pickle
from contextlib import contextmanager
from hashlib import md5
from eventlet import Timeout
import time
from swift.common.utils import normalize_timestamp
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileDeleted, DiskFileNotOpen, DiskFileNoSpace, \
DiskFileError
from swift.common.swob import multi_range_iterator
from swift.obj.diskfile import METADATA_KEY
class RadosFileSystem(object):
def __init__(self, ceph_conf, rados_user, rados_pool, **kwargs):
self._conf = ceph_conf
self._user = rados_user
self._pool = rados_pool
self._rados = None
self._ioctx = None
try:
import rados
except ImportError:
rados = None
self.RADOS = kwargs.get('rados', rados)
atexit.register(self._shutdown)
def _get_rados(self, fs):
if self._rados is None:
self._rados = fs.RADOS.Rados(conffile=fs._conf, rados_id=fs._user)
self._rados.connect()
return self._rados
def _shutdown(self):
if self._rados:
self._rados.shutdown()
class _radosfs(object):
def __init__(self, fs, _get_rados):
self._rados = _get_rados(fs)
self._fs = fs
self._pool = fs._pool
self._ioctx = self._rados.open_ioctx(self._pool)
def close(self):
self._ioctx.close()
def del_object(self, obj):
try:
self._ioctx.remove_object(obj)
finally:
pass
def get_metadata(self, obj):
ret = None
try:
ret = pickle.loads(self._ioctx.get_xattr(obj, METADATA_KEY))
finally:
return ret
def put_metadata(self, obj, metadata):
# Pickle the metadata object and set it on an xattr
self._ioctx.set_xattr(obj, METADATA_KEY,
pickle.dumps(metadata))
def put_object(self, obj, metadata):
self._ioctx.aio_flush()
self.put_metadata(obj, metadata)
def size(self, obj):
size = 0
try:
(size, ts) = self._ioctx.stat(obj)
finally:
return size
def create(self, obj, size):
try:
self._ioctx.trunc(obj, size)
except self._fs.RADOS.NoSpace:
raise DiskFileNoSpace()
def write(self, obj, offset, data):
try:
return self._ioctx.write(obj, data, offset)
except self._fs.RADOS.NoSpace:
raise DiskFileNoSpace()
except Exception:
raise DiskFileError()
def read(self, obj, off):
return self._ioctx.read(obj, offset=off)
def quarantine(self, obj):
# There is no way for the swift-recon monitor to get information
# about the quarantined object, so it is better to clean it up
self.del_object(obj)
def open(self):
return self._radosfs(self, self._get_rados)
def get_diskfile(self, device, partition, account,
container, obj, **kwargs):
return DiskFile(self, device, partition, account,
container, obj)
class DiskFileWriter(object):
"""
.. note::
RADOS based alternative pluggable on-disk backend implementation.
Encapsulation of the write context for servicing PUT REST API
requests. Serves as the context manager object for DiskFile's create()
method.
:param fs: internal file system object to use
:param name: standard object name
"""
def __init__(self, fs, name):
self._fs = fs
self._name = name
self._write_offset = 0
def write(self, chunk):
"""
Write a chunk of data.
:param chunk: the chunk of data to write as a string object
"""
written = 0
while written != len(chunk):
written += self._fs.write(self._name,
self._write_offset + written,
chunk[written:])
self._write_offset += len(chunk)
return self._write_offset
def put(self, metadata):
"""
Flush all the writes so far and set the metadata
:param metadata: dictionary of metadata to be written
"""
metadata['name'] = self._name
self._fs.put_object(self._name, metadata)
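# Usage sketch (illustrative only, not executed by this module): servicing a
# PUT through this writer, assuming `df` is a DiskFile for the target object
# and `body` is the request body as a string:
#
#     with df.create(size=len(body)) as writer:
#         writer.write(body)          # loops until rados accepts every byte
#         writer.put({'X-Timestamp': normalize_timestamp(time.time()),
#                     'Content-Length': str(len(body)),
#                     'ETag': md5(body).hexdigest()})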
class DiskFileReader(object):
"""
.. note::
RADOS based alternative pluggable on-disk backend implementation.
Encapsulation of the read context for servicing GET REST API
requests. Serves as the context manager object for DiskFile's reader()
method.
:param fs: internal filesystem object
:param name: object name
:param obj_size: on-disk size of object in bytes
:param etag: MD5 hash of object from metadata
:param iter_hook: called when __iter__ returns a chunk
"""
def __init__(self, fs, name, obj_size, etag, iter_hook=None):
self._fs = fs.open()
self._name = name
self._obj_size = obj_size
self._etag = etag
self._iter_hook = iter_hook
#
self._iter_etag = None
self._bytes_read = 0
self._read_offset = 0
self._started_at_0 = False
self._read_to_eof = False
self._suppress_file_closing = False
def __iter__(self):
try:
if self._read_offset == 0:
self._started_at_0 = True
self._read_to_eof = False
self._iter_etag = md5()
while True:
chunk = self._fs.read(self._name, self._read_offset)
if chunk:
if self._iter_etag:
self._iter_etag.update(chunk)
self._read_offset += len(chunk)
yield chunk
if self._iter_hook:
self._iter_hook()
else:
self._read_to_eof = True
break
finally:
if not self._suppress_file_closing:
self.close()
def app_iter_range(self, start, stop):
self._read_offset = start
if stop is not None:
length = stop - start
else:
length = None
try:
self._suppress_file_closing = True
for chunk in self:
if length is not None:
length -= len(chunk)
if length < 0:
# Chop off the extra:
yield chunk[:length]
break
yield chunk
finally:
self._suppress_file_closing = False
try:
self.close()
except DiskFileQuarantined:
pass
def app_iter_ranges(self, ranges, content_type, boundary, size):
if not ranges:
yield ''
else:
try:
self._suppress_file_closing = True
for chunk in multi_range_iterator(
ranges, content_type, boundary, size,
self.app_iter_range):
yield chunk
finally:
self._suppress_file_closing = False
try:
self.close()
except DiskFileQuarantined:
pass
def _quarantine(self, msg):
self._fs.quarantine(self._name)
def _handle_close_quarantine(self):
if self._read_offset != self._obj_size:
self._quarantine(
"Bytes read: %d, does not match metadata: %d" % (
self._read_offset, self._obj_size))
elif self._iter_etag and \
self._etag != self._iter_etag.hexdigest():
self._quarantine(
"ETag %s and file's md5 %s do not match" % (
self._etag, self._iter_etag.hexdigest()))
def close(self):
"""
Close the file. Will handle quarantining file if necessary.
"""
try:
    # run the cross checks while the rados ioctx is still usable
    if self._started_at_0 and self._read_to_eof:
        self._handle_close_quarantine()
except (Exception, Timeout):
    pass
finally:
    self._fs.close()
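# Usage sketch (illustrative only): servicing a GET through this reader,
# assuming `df` is an already opened DiskFile and `resp_write` stands in for
# whatever consumes the response body:
#
#     reader = df.reader()
#     for chunk in reader:            # close() runs in the finally clause and
#         resp_write(chunk)           # quarantines on length/ETag mismatch
#
#     # or only a byte range, e.g. the first kilobyte:
#     for chunk in reader.app_iter_range(0, 1024):
#         resp_write(chunk)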
class DiskFile(object):
"""
.. note::
RADOS based alternative pluggable on-disk backend implementation.
Manage object files in RADOS filesystem
:param account: account name for the object
:param container: container name for the object
:param obj: object name for the object
:param iter_hook: called when __iter__ returns a chunk
:param keep_cache: caller's preference for keeping data read in the cache
"""
def __init__(self, fs, device, partition, account, container, obj):
self._name = '/' + '/'.join((device, partition, account,
container, obj))
self._metadata = None
self._fs = fs
def open(self):
"""
Open the file and read the metadata.
This method must populate the _metadata attribute.
:raises DiskFileCollision: on name mis-match with metadata
:raises DiskFileDeleted: if it does not exist, or a tombstone is
present
:raises DiskFileQuarantined: if while reading metadata of the file
some data did not pass cross checks
"""
self._fs_inst = self._fs.open()
self._metadata = self._fs_inst.get_metadata(self._name)
if self._metadata is None:
raise DiskFileDeleted()
self._verify_data_file()
self._metadata = self._metadata or {}
return self
def __enter__(self):
if self._metadata is None:
raise DiskFileNotOpen()
return self
def __exit__(self, t, v, tb):
self._fs_inst.close()
def _quarantine(self, msg):
self._fs_inst.quarantine(self._name)
raise DiskFileQuarantined(msg)
def _verify_data_file(self):
"""
Verify the metadata's name value matches what we think the object is
named.
:raises DiskFileCollision: if the metadata stored name does not match
the referenced name of the file
:raises DiskFileNotExist: if the object has expired
:raises DiskFileQuarantined: if data inconsistencies were detected
between the metadata and the file-system
metadata
"""
try:
mname = self._metadata['name']
except KeyError:
self._quarantine("missing name metadata")
else:
if mname != self._name:
raise DiskFileCollision('Client path does not match path '
'stored in object metadata')
try:
x_delete_at = int(self._metadata['X-Delete-At'])
except KeyError:
pass
except ValueError:
# Quarantine, the x-delete-at key is present but not an
# integer.
self._quarantine(
"bad metadata x-delete-at value %s" % (
self._metadata['X-Delete-At']))
else:
if x_delete_at <= time.time():
raise DiskFileNotExist('Expired')
try:
metadata_size = int(self._metadata['Content-Length'])
except KeyError:
self._quarantine(
"missing content-length in metadata")
except ValueError:
# Quarantine, the content-length key is present but not an
# integer.
self._quarantine(
"bad metadata content-length value %s" % (
self._metadata['Content-Length']))
obj_size = self._fs_inst.size(self._name)
if obj_size != metadata_size:
self._quarantine(
"metadata content-length %s does"
" not match actual object size %s" % (
metadata_size, obj_size))
def get_metadata(self):
"""
Provide the metadata for an object as a dictionary.
:returns: object's metadata dictionary
"""
if self._metadata is None:
raise DiskFileNotOpen()
return self._metadata
def read_metadata(self):
"""
Return the metadata for an object.
:returns: metadata dictionary for an object
"""
with self.open():
return self.get_metadata()
def reader(self, iter_hook=None, keep_cache=False):
"""
Return a swift.common.swob.Response class compatible "app_iter"
object. The responsibility of closing the open file is passed to the
DiskFileReader object.
:param iter_hook: called when __iter__ yields a chunk
:param keep_cache: caller's preference for keeping data read in the cache; not used by this backend
"""
if self._metadata is None:
raise DiskFileNotOpen()
dr = DiskFileReader(self._fs, self._name,
int(self._metadata['Content-Length']),
self._metadata['ETag'],
iter_hook=iter_hook)
return dr
@contextmanager
def create(self, size=None):
"""
Context manager to create a file. The RADOS object is optionally
pre-allocated to the requested size, and a DiskFileWriter object is
yielded to encapsulate the write state.
:param size: optional initial size of file to explicitly allocate on
disk
:raises DiskFileNoSpace: if a size is specified and allocation fails
"""
fs_inst = None
try:
fs_inst = self._fs.open()
if size is not None:
fs_inst.create(self._name, size)
yield DiskFileWriter(fs_inst, self._name)
finally:
if fs_inst is not None:
fs_inst.close()
def write_metadata(self, metadata):
"""
Write a block of metadata to an object.
"""
with self.create() as writer:
writer.put(metadata)
def delete(self, timestamp):
"""
Perform a delete for the given object in the given container under the
given account.
:param timestamp: timestamp to compare with each file
"""
fs_inst = None
try:
timestamp = normalize_timestamp(timestamp)
fs_inst = self._fs.open()
md = fs_inst.get_metadata(self._name)
if md and md['X-Timestamp'] < timestamp:
fs_inst.del_object(self._name)
finally:
if fs_inst is not None:
fs_inst.close()
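# Standalone usage sketch (illustrative only, assuming a reachable Ceph
# cluster, a 'swift' keyring user, a 'swift' pool and /etc/ceph/ceph.conf;
# adjust the names to your deployment):
#
#     fs = RadosFileSystem('/etc/ceph/ceph.conf', 'swift', 'swift')
#     df = fs.get_diskfile('sda1', '0', 'AUTH_test', 'cont', 'obj')
#     with df.open():
#         print(df.get_metadata())
#     df.delete(normalize_timestamp(time.time()))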

@@ -0,0 +1,108 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Babu Shanmugam <anbu@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Rados Object Server for Swift """
from __future__ import with_statement
import os
from swift import gettext_ as _
from eventlet import Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import is_success
from swift.obj.rados_diskfile import RadosFileSystem
from swift.obj import server
class ObjectController(server.ObjectController):
"""
Implements the WSGI application for the Swift Rados Object Server.
"""
def setup(self, conf):
ceph_conf = conf.get("rados_ceph_conf", None)
rados_user = conf.get("rados_user", None)
rados_pool = conf.get("rados_pool", None)
self._filesystem = RadosFileSystem(ceph_conf, rados_user, rados_pool)
def get_diskfile(self, device, partition, account, container, obj,
**kwargs):
"""
Utility method for instantiating a DiskFile object supporting a given
REST API.
"""
return self._filesystem.get_diskfile(device, partition, account,
container, obj, **kwargs)
def async_update(self, op, account, container, obj, host, partition,
contdevice, headers_out, objdevice):
"""
Sends or saves an async update.
:param op: operation performed (ex: 'PUT', or 'DELETE')
:param account: account name for the object
:param container: container name for the object
:param obj: object name
:param host: host that the container is on
:param partition: partition that the container is on
:param contdevice: device name that the container is on
:param headers_out: dictionary of headers to send in the container
request
:param objdevice: device name that the object is in
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
if all([host, partition, contdevice]):
try:
with ConnectionTimeout(self.conn_timeout):
ip, port = host.rsplit(':', 1)
conn = http_connect(ip, port, contdevice, partition, op,
full_path, headers_out)
with Timeout(self.node_timeout):
response = conn.getresponse()
response.read()
if is_success(response.status):
return
else:
self.logger.error(_(
'ERROR Container update failed: %(status)d '
'response from %(ip)s:%(port)s/%(dev)s'),
{'status': response.status, 'ip': ip, 'port': port,
'dev': contdevice})
except (Exception, Timeout):
self.logger.exception(_(
'ERROR container update failed with '
'%(ip)s:%(port)s/%(dev)s'),
{'ip': ip, 'port': port, 'dev': contdevice})
# FIXME: For now don't handle async updates
def REPLICATE(self, request):
"""
Handle REPLICATE requests for the Swift Object Server. This is used
by the object replicator to get hashes for directories.
"""
pass
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI object server apps"""
conf = global_conf.copy()
conf.update(local_conf)
return ObjectController(conf)
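# Wiring sketch (illustrative only): paste.deploy resolves
# egg:swift_ceph_backend#rados_object to app_factory() above and calls it with
# the merged configuration, roughly equivalent to:
#
#     app = app_factory({'rados_ceph_conf': '/etc/ceph/ceph.conf',
#                        'rados_user': 'swift',
#                        'rados_pool': 'swift'})
#
# setup() then builds a single RadosFileSystem from those three options and
# every request is served through the DiskFile objects it hands out.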

@@ -0,0 +1,465 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
#
# Author: Babu Shanmugam <anbu@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import call
import cPickle as pickle
import unittest
from hashlib import md5
from swift.common.exceptions import DiskFileQuarantined, \
DiskFileCollision, DiskFileNotOpen
from swift.obj.rados_diskfile import RadosFileSystem
from swift.obj.rados_diskfile import METADATA_KEY
class TestRadosDiskFile(unittest.TestCase):
def setUp(self):
super(TestRadosDiskFile, self).setUp()
self.mock_rados = mock.MagicMock(name='rados')
self.Rados = self.mock_rados.Rados.return_value
self.ioctx = self.Rados.open_ioctx.return_value
self.rados_ceph_conf = 'xxx-ceph.conf'
self.rados_name = 'xxx-rados-name'
self.rados_pool = 'xxx-rados-pool'
self.device = 'device'
self.partition = '0'
self.account = 'account'
self.container = 'container'
self.obj_name = 'myobject'
self.rdf = RadosFileSystem(self.rados_ceph_conf,
self.rados_name,
self.rados_pool,
rados=self.mock_rados)
self.df = self.rdf.get_diskfile(self.device,
self.partition,
self.account,
self.container,
self.obj_name)
def tearDown(self):
super(TestRadosDiskFile, self).tearDown()
self.mock_rados.reset_mock()
self.Rados.reset_mock()
self.ioctx.reset_mock()
del self.rdf
del self.df
def _obj_name(self):
return '/' + '/'.join((self.device, self.partition,
self.account, self.container, self.obj_name))
def _assert_if_rados_not_opened(self):
self.mock_rados.Rados.assert_called_once_with(
conffile=self.rados_ceph_conf, rados_id=self.rados_name)
self.Rados.connect.assert_called_once_with()
self.Rados.open_ioctx.assert_called_once_with(self.rados_pool)
def _assert_if_rados_not_closed(self):
self.ioctx.close.assert_called_once_with()
def _assert_if_rados_opened(self):
assert((self.mock_rados.Rados.call_count == 0) and
(self.Rados.connect.call_count == 0) and
(self.Rados.open_ioctx.call_count == 0))
def _assert_if_rados_closed(self):
assert((self.ioctx.close.call_count == 0) and
(self.Rados.shutdown.call_count == 0))
def _assert_if_rados_opened_closed(self):
assert((self.mock_rados.Rados.call_count > 0) and
(self.Rados.connect.call_count > 0) and
(self.Rados.open_ioctx.call_count > 0))
assert((self.Rados.connect.call_count > 0) and
(self.Rados.open_ioctx.call_count ==
self.ioctx.close.call_count))
def test_df_open_1(self):
meta = {'name': self._obj_name(), 'Content-Length': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
with self.df.open():
pass
self._assert_if_rados_not_opened()
self.ioctx.get_xattr.assert_called_once_with(self._obj_name(),
METADATA_KEY)
self._assert_if_rados_not_closed()
def test_df_open_invalid_name(self):
meta = {'name': 'invalid', 'Content-Length': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
try:
self.df.open()
except DiskFileCollision:
success = True
except Exception:
pass
finally:
assert(success)
def test_df_open_invalid_content_length(self):
meta = {'name': self._obj_name(), 'Content-Length': 100}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
try:
self.df.open()
except DiskFileQuarantined:
success = True
except Exception:
pass
finally:
assert(success)
def test_df_notopen_check(self):
success = False
try:
with self.df:
pass
except DiskFileNotOpen:
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened()
self._assert_if_rados_closed()
def test_df_notopen_get_metadata(self):
success = False
try:
self.df.get_metadata()
except DiskFileNotOpen:
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened()
self._assert_if_rados_closed()
def test_df_get_metadata(self):
meta = {'name': self._obj_name(), 'Content-Length': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
ret_meta = None
try:
with self.df.open():
ret_meta = self.df.get_metadata()
success = True
except Exception:
pass
finally:
assert(success)
assert(ret_meta == meta)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_read_metadata(self):
meta = {'name': self._obj_name(), 'Content-Length': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
ret_meta = None
try:
ret_meta = self.df.read_metadata()
success = True
except Exception:
pass
finally:
assert(success)
assert(ret_meta == meta)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_notopen_reader(self):
success = False
try:
self.df.reader()
except DiskFileNotOpen:
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened()
self._assert_if_rados_closed()
def test_df_open_reader_1(self):
meta = {'name': self._obj_name(), 'Content-Length': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
try:
with self.df.open():
self.df.reader()
except KeyError:
success = True
pass
finally:
assert(success)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_open_reader_2(self):
meta = {'name': self._obj_name(), 'Content-Length': 0, 'ETag': ''}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
try:
with self.df.open():
rdr = self.df.reader()
rdr.close()
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened_closed()
def test_df_reader_iter_invalid_cont_len(self):
etag = md5()
fcont = '123456789'
etag.update(fcont)
meta = {'name': self._obj_name(), 'Content-Length': len(fcont),
'ETag': etag.hexdigest()}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (len(fcont), 0)
success = False
try:
with self.df.open():
rdr = self.df.reader()
num_chunks = 0
self.ioctx.read.return_value = fcont
for chunk in rdr:
num_chunks += 1
assert(chunk == fcont)
if num_chunks == 3:
self.ioctx.read.return_value = None
assert(num_chunks == 3)
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened_closed()
# check read calls
call_list = [call.read(self._obj_name(), offset=0),
call.read(self._obj_name(), offset=len(fcont)),
call.read(self._obj_name(), offset=(2 * len(fcont))),
call.read(self._obj_name(), offset=(3 * len(fcont)))]
self.ioctx.assert_has_calls(call_list)
self.ioctx.remove_object.assert_called_once_with(self._obj_name())
def test_df_reader_iter_invalid_etag(self):
etag = md5()
fcont = '123456789'
etag.update(fcont)
meta = {'name': self._obj_name(), 'Content-Length': (3 * len(fcont)),
'ETag': etag.hexdigest()}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = ((len(fcont) * 3), 0)
success = False
try:
with self.df.open():
rdr = self.df.reader()
num_chunks = 0
self.ioctx.read.return_value = fcont
for chunk in rdr:
num_chunks += 1
assert(chunk == fcont)
if num_chunks == 3:
self.ioctx.read.return_value = None
assert(num_chunks == 3)
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened_closed()
# check read calls
call_list = [call.read(self._obj_name(), offset=0),
call.read(self._obj_name(), offset=len(fcont)),
call.read(self._obj_name(), offset=(2 * len(fcont))),
call.read(self._obj_name(), offset=(3 * len(fcont)))]
self.ioctx.assert_has_calls(call_list)
self.ioctx.remove_object.assert_called_once_with(self._obj_name())
def test_df_reader_iter_all_ok(self):
etag = md5()
fcont = '123456789'
etag.update(fcont)
etag.update(fcont)
etag.update(fcont)
meta = {'name': self._obj_name(), 'Content-Length': (3 * len(fcont)),
'ETag': etag.hexdigest()}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = ((3 * len(fcont)), 0)
success = False
try:
with self.df.open():
rdr = self.df.reader()
num_chunks = 0
self.ioctx.read.return_value = fcont
for chunk in rdr:
num_chunks += 1
assert(chunk == fcont)
if num_chunks == 3:
self.ioctx.read.return_value = None
assert(num_chunks == 3)
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened_closed()
# check read calls
call_list = [call.read(self._obj_name(), offset=0),
call.read(self._obj_name(), offset=len(fcont)),
call.read(self._obj_name(), offset=(2 * len(fcont))),
call.read(self._obj_name(), offset=(3 * len(fcont)))]
self.ioctx.assert_has_calls(call_list)
# if everything is perfect, the object will not be deleted
assert(self.ioctx.remove_object.call_count == 0)
def test_df_reader_iter_range(self):
etag = md5()
fcont = '0123456789'
etag.update(fcont)
meta = {'name': self._obj_name(), 'Content-Length': len(fcont),
'ETag': etag.hexdigest()}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (len(fcont), 0)
success = False
try:
with self.df.open():
rdr = self.df.reader()
num_chunks = 0
def ioctx_read(obj_name, length=8192, offset=0):
assert(obj_name == self._obj_name())
return fcont[offset:]
self.ioctx.read = ioctx_read
for chunk in rdr.app_iter_range(1, 8):
num_chunks += 1
assert(chunk == '1234567')
assert(num_chunks == 1)
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_opened_closed()
assert(self.ioctx.remove_object.call_count == 0)
def test_df_writer_1(self):
with self.df.create():
pass
assert(self.ioctx.trunc.call_count == 0)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
with self.df.create(500):
pass
self.ioctx.trunc.assert_called_once_with(self._obj_name(), 500)
def test_df_writer_write(self):
fcont = '0123456789'
writes = []
def ioctx_write(obj, data, offset):
writes.append((data, offset))
return 2
self.ioctx.write = ioctx_write
with self.df.create() as writer:
assert(writer.write(fcont) == len(fcont))
check_list = [
(fcont, 0),
(fcont[2:], 2),
(fcont[4:], 4),
(fcont[6:], 6),
(fcont[8:], 8)]
assert(writes == check_list)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_writer_put(self):
meta = {'Content-Length': 0,
'ETag': ''}
with self.df.create() as writer:
writer.put(meta)
old_metadata = pickle.dumps(meta)
ca = self.ioctx.set_xattr.call_args
check_1 = call(self._obj_name(), METADATA_KEY, old_metadata)
assert(ca == check_1)
assert(meta['name'] == self._obj_name())
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_write_metadata(self):
meta = {'Content-Length': 0,
'ETag': ''}
self.df.write_metadata(meta)
old_metadata = pickle.dumps(meta)
ca = self.ioctx.set_xattr.call_args
check_1 = call(self._obj_name(), METADATA_KEY, old_metadata)
assert(ca == check_1)
assert(meta['name'] == self._obj_name())
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
def test_df_delete(self):
meta = {'name': self._obj_name(), 'Content-Length': 0,
'X-Timestamp': 0}
self.ioctx.get_xattr.return_value = pickle.dumps(meta)
self.ioctx.stat.return_value = (0, 0)
success = False
try:
self.df.delete(1)
success = True
except Exception:
pass
finally:
assert(success)
self._assert_if_rados_not_opened()
self._assert_if_rados_not_closed()
self.ioctx.remove_object.assert_called_once_with(self._obj_name())

Binary file not shown.

tests/test_rados_server.py (new file, executable, 133 lines)

@@ -0,0 +1,133 @@
# Copyright (c) 2010-2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import mock
MOCK_RADOS = mock.Mock(name='rados')
MOCK_RADOS.__name__ = 'rados'
sys.modules['rados'] = MOCK_RADOS
import cStringIO
import unittest
from test.unit.proxy import test_server
from test.unit.proxy.test_server import teardown
from swift.obj import rados_server
class ObjectNotFound(Exception):
pass
MOCK_RADOS.ObjectNotFound = ObjectNotFound
class MockIoctx(object):
def __init__(self):
self._objs = {}
def get_xattr(self, key, attr_name):
if self._objs.get(key) is None:
raise MOCK_RADOS.ObjectNotFound
o = self._obj(key, None, None)
return o['attr']
def set_xattr(self, key, attr_name, attr):
self._obj(key, None, attr)
def _obj(self, name, size, attr=None):
o = self._objs.get(name)
if o is None:
fd = cStringIO.StringIO()
if attr is None:
attr = ''
o = self._objs[name] = {'size': size, 'fd': fd, 'attr': attr}
else:
if size is not None:
o['size'] = size
if attr is not None:
o['attr'] = attr
return o
def stat(self, key):
obj = self._obj(key, None)
return (obj['fd'].tell(), 0)
def trunc(self, key, size):
self._obj(key, size)
def write(self, key, data, offset=0):
o = self._obj(key, None)
fd = o['fd']
if offset < fd.tell():
fd.seek(offset, os.SEEK_SET)
fd.write(data)
return len(data)
def read(self, key, length=8192, offset=0):
o = self._obj(key, None)
fd = o['fd']
fd.seek(offset, os.SEEK_SET)
return fd.read(length)
def aio_flush(self):
pass
def close(self):
pass
def remove_object(self, key):
del self._objs[key]
def setup():
mock_rados_Rados = mock.MagicMock()
MOCK_RADOS.Rados.return_value = mock_rados_Rados
mock_rados_Rados.open_ioctx.return_value = MockIoctx()
test_server.do_setup(rados_server)
class TestController(test_server.TestController):
pass
class TestProxyServer(test_server.TestProxyServer):
pass
class TestObjectController(test_server.TestObjectController):
pass
class TestContainerController(test_server.TestContainerController):
pass
class TestAccountController(test_server.TestAccountController):
pass
class TestAccountControllerFakeGetResponse(
test_server.TestAccountControllerFakeGetResponse):
pass
if __name__ == '__main__':
setup()
try:
unittest.main()
finally:
teardown()