Copy S3 server implementation from Nova

Copy S3 server implementation and unit tests from Nova.
Add fileutils from oslo-incubator and refresh incubation modules.

The upgrade script doesn't copy bucket contents because the Nova S3 server
is not intended to be a permanent, full-functionality object store; it is
meant to be used temporarily, for the CreateImage operation only.

Change-Id: I7d38b0a8e014dfff8238e7134d837d1074e4dd95
Feodor Tersin 2015-03-27 19:46:10 +03:00
parent 35a876a922
commit dc08136037
18 changed files with 823 additions and 23 deletions

1
.gitignore vendored

@@ -6,3 +6,4 @@ ec2_api.egg-info
.tox
.testrepository
/functional_tests.conf
/buckets


@@ -25,6 +25,7 @@ The services afterwards can be started as binaries:
/usr/bin/ec2-api
/usr/bin/ec2-api-metadata
/usr/bin/ec2-api-s3
or set up as Linux services.
@@ -53,6 +54,12 @@ for Neutron
then restart neutron-metadata service.
The S3 server is intended only to support EC2 operations that require an S3
server (e.g. CreateImage) in OpenStack deployments without regular object
storage. It must not be used as a substitute for an all-purpose object
storage server.
Do not start it if the deployment has its own object storage or uses a public
one (e.g. AWS S3).
Usage
=====
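As a usage sketch (not part of the commit itself): a client session against
this built-in server, written with the boto S3 API that the unit tests below
also use. The host, port, bucket, and key names are illustrative.

    from boto.s3 import connection as s3

    # Credentials are not verified by this trivial server; any values work.
    conn = s3.S3Connection(aws_access_key_id='fake',
                           aws_secret_access_key='fake',
                           host='127.0.0.1', port=3334, is_secure=False,
                           calling_format=s3.OrdinaryCallingFormat())
    bucket = conn.create_bucket('imagebucket')
    key = bucket.new_key('part-00000')
    key.set_contents_from_string('image data')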


@@ -25,10 +25,13 @@ EC2API_DIR=$DEST/ec2-api
EC2API_CONF_DIR=${EC2API_CONF_DIR:-/etc/ec2api}
EC2API_CONF_FILE=${EC2API_CONF_DIR}/ec2api.conf
EC2API_DEBUG=${EC2API_DEBUG:-True}
EC2API_STATE_PATH=${EC2API_STATE_PATH:=$DATA_DIR/ec2api}
EC2API_SERVICE_HOST=${EC2API_SERVICE_HOST:-$SERVICE_HOST}
EC2API_SERVICE_PORT=${EC2API_SERVICE_PORT:-8788}
EC2API_SERVICE_PROTOCOL=${EC2API_SERVICE_PROTOCOL:-$SERVICE_PROTOCOL}
EC2API_S3_SERVICE_HOST=${EC2API_S3_SERVICE_HOST:-$EC2API_SERVICE_HOST}
EC2API_S3_SERVICE_PORT=${EC2API_S3_SERVICE_PORT:-3334}
EC2API_RABBIT_VHOST=${EC2API_RABBIT_VHOST:-''}
@@ -160,6 +163,7 @@ function configure_ec2api {
iniset $EC2API_CONF_FILE DEFAULT debug $EC2API_DEBUG
iniset $EC2API_CONF_FILE DEFAULT use_syslog $SYSLOG
iniset $EC2API_CONF_FILE DEFAULT state_path $EC2API_STATE_PATH
# ec2api Api Configuration
@@ -183,8 +187,13 @@ function configure_ec2api {
iniset $EC2API_CONF_FILE DEFAULT keystone_url "http://${KEYSTONE_AUTH_HOST}:35357/v2.0"
iniset $EC2API_CONF_FILE DEFAULT region_list "$REGION_NAME"
- iniset $EC2API_CONF_FILE DEFAULT s3_port "$S3_SERVICE_PORT"
- iniset $EC2API_CONF_FILE DEFAULT s3_host "$SERVICE_HOST"
+ if is_service_enabled swift3; then
+ iniset $EC2API_CONF_FILE DEFAULT s3_port "$S3_SERVICE_PORT"
+ iniset $EC2API_CONF_FILE DEFAULT s3_host "$SERVICE_HOST"
+ else
+ iniset $EC2API_CONF_FILE DEFAULT s3_port "$EC2API_S3_SERVICE_PORT"
+ iniset $EC2API_CONF_FILE DEFAULT s3_host "$EC2API_S3_SERVICE_HOST"
+ fi
configure_ec2api_rpc_backend
@@ -227,6 +236,7 @@ function install_ec2api() {
function start_ec2api() {
screen_it ec2-api "cd $EC2API_DIR && $EC2API_BIN_DIR/ec2-api --config-file $EC2API_CONF_DIR/ec2api.conf"
screen_it ec2-api-metadata "cd $EC2API_DIR && $EC2API_BIN_DIR/ec2-api-metadata --config-file $EC2API_CONF_DIR/ec2api.conf"
screen_it ec2-api-s3 "cd $EC2API_DIR && $EC2API_BIN_DIR/ec2-api-s3 --config-file $EC2API_CONF_DIR/ec2api.conf"
}
@@ -235,6 +245,7 @@ function stop_ec2api() {
# Kill the ec2api screen windows
screen -S $SCREEN_NAME -p ec2-api -X kill
screen -S $SCREEN_NAME -p ec2-api-metadata -X kill
screen -S $SCREEN_NAME -p ec2-api-s3 -X kill
}
function cleanup_ec2api() {


@@ -3,6 +3,7 @@
# we have to add ec2-api to enabled services for screen_it to work
enable_service ec2-api
enable_service ec2-api-metadata
enable_service ec2-api-s3
# we have to use Nova client supported Nova microversions,
# but related changes are not done in the client release.


@@ -52,7 +52,7 @@ s3_opts = [
help='Hostname or IP for OpenStack to use when accessing '
'the S3 api'),
cfg.IntOpt('s3_port',
- default=3333,
+ default=3334,
help='Port used when accessing the S3 api'),
cfg.BoolOpt('s3_use_ssl',
default=False,

38
ec2api/cmd/api_s3.py Normal file

@@ -0,0 +1,38 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Daemon for ec2api objectstore. Supports S3 API."""
import sys
from oslo_log import log as logging
from ec2api import config
from ec2api.s3 import s3server
from ec2api import service
def main():
config.parse_args(sys.argv)
logging.setup(config.CONF, "ec2api")
server = s3server.get_wsgi_server()
service.serve(server)
service.wait()
if __name__ == '__main__':
main()
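The server can also be driven programmatically, as the unit tests below do; a
minimal sketch following the daemon above and the start()/stop()/port usage
in the tests:

    import sys

    from ec2api import config
    from ec2api.s3 import s3server

    config.parse_args(sys.argv)
    server = s3server.get_wsgi_server()
    server.start()                     # binds to s3_listen:s3_listen_port
    # ... issue S3 requests against server.port ...
    server.stop()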


@@ -0,0 +1,149 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import logging
import os
import stat
import tempfile
from oslo_utils import excutils
LOG = logging.getLogger(__name__)
_FILE_CACHE = {}
DEFAULT_MODE = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
def ensure_tree(path, mode=DEFAULT_MODE):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
:param mode: Directory creation permissions
"""
try:
os.makedirs(path, mode)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise
def read_cached_file(filename, force_reload=False):
"""Read from a file if it has been modified.
:param force_reload: Whether to reload the file.
:returns: A tuple with a boolean specifying if the data is fresh
or not.
"""
global _FILE_CACHE
if force_reload:
delete_cached_file(filename)
reloaded = False
mtime = os.path.getmtime(filename)
cache_info = _FILE_CACHE.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
LOG.debug("Reloading cached file %s" % filename)
with open(filename) as fap:
cache_info['data'] = fap.read()
cache_info['mtime'] = mtime
reloaded = True
return (reloaded, cache_info['data'])
def delete_cached_file(filename):
"""Delete cached file if present.
:param filename: filename to delete
"""
global _FILE_CACHE
if filename in _FILE_CACHE:
del _FILE_CACHE[filename]
def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
:param remove: Optional function to remove passed path
"""
try:
remove(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
:param remove: Optional function to remove passed path
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
remove(path)
def file_open(*args, **kwargs):
"""Open file
see built-in open() documentation for more details
Note: The reason this is kept in a separate module is to easily
be able to provide a stub module that doesn't alter system
state at all (for unit tests)
"""
return open(*args, **kwargs)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
"""Create temporary file or use existing file.
This util is needed for creating temporary file with
specified content, suffix and prefix. If path is not None,
it will be used for writing content. If the path doesn't
exist it'll be created.
:param content: content for temporary file.
:param path: same as parameter 'dir' for mkstemp
:param suffix: same as parameter 'suffix' for mkstemp
:param prefix: same as parameter 'prefix' for mkstemp
For example: it can be used in database tests for creating
configuration files.
"""
if path:
ensure_tree(path)
(fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
try:
os.write(fd, content)
finally:
os.close(fd)
return path
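A short sketch of how these helpers compose (paths and content are
illustrative):

    from ec2api.openstack.common import fileutils

    # write_to_tempfile creates any missing directories via ensure_tree
    path = fileutils.write_to_tempfile('[DEFAULT]\n', path='/tmp/demo',
                                       suffix='.conf')
    with fileutils.remove_path_on_error(path):
        # the first read populates the cache; later reads reuse it until
        # the file's mtime changes
        reloaded, data = fileutils.read_cached_file(path)
    fileutils.delete_if_exists(path)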


@@ -199,22 +199,26 @@ class ServiceWrapper(object):
class ProcessLauncher(object):
- def __init__(self, wait_interval=0.01):
- """Constructor.
+ _signal_handlers_set = set()
+ @classmethod
+ def _handle_class_signals(cls, *args, **kwargs):
+ for handler in cls._signal_handlers_set:
+ handler(*args, **kwargs)
+ def __init__(self):
+ """Constructor."""
- :param wait_interval: The interval to sleep for between checks
- of child process exit.
- """
self.children = {}
self.sigcaught = None
self.running = True
- self.wait_interval = wait_interval
rfd, self.writepipe = os.pipe()
self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
self.handle_signal()
def handle_signal(self):
- _set_signals_handler(self._handle_signal)
+ self._signal_handlers_set.add(self._handle_signal)
+ _set_signals_handler(self._handle_class_signals)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
@@ -333,8 +337,8 @@ class ProcessLauncher(object):
def _wait_child(self):
try:
- # Don't block if no child processes have exited
- pid, status = os.waitpid(0, os.WNOHANG)
+ # Block while any of child processes have exited
+ pid, status = os.waitpid(0, 0)
if not pid:
return None
except OSError as exc:
@@ -363,10 +367,6 @@
while self.running:
wrap = self._wait_child()
if not wrap:
- # Yield to other threads if no children have exited
- # Sleep for a short time to avoid excessive CPU usage
- # (see bug #1095346)
- eventlet.greenthread.sleep(self.wait_interval)
continue
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)
@@ -391,8 +391,14 @@
if not _is_sighup_and_daemon(self.sigcaught):
break
+ cfg.CONF.reload_config_files()
+ for service in set(
+ [wrap.service for wrap in self.children.values()]):
+ service.reset()
for pid in self.children:
os.kill(pid, signal.SIGHUP)
self.running = True
self.sigcaught = None
except eventlet.greenlet.GreenletExit:
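The _wait_child change above swaps a non-blocking poll for a blocking wait; a
standalone sketch of the two styles (standard library only, not the launcher
code itself):

    import os

    def wait_child_polling():
        # Old style: returns (0, 0) immediately when no child has exited,
        # so the caller had to sleep between checks (the removed
        # wait_interval logic).
        pid, status = os.waitpid(0, os.WNOHANG)
        return (pid, status) if pid else None

    def wait_child_blocking():
        # New style: blocks until some child exits; raises OSError if
        # there are no children at all.
        return os.waitpid(0, 0)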

24
ec2api/s3/__init__.py Normal file

@@ -0,0 +1,24 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`ec2api.s3` -- S3-type object store
=====================================================
.. automodule:: ec2api.s3
:platform: Unix
:synopsis: Currently a trivial file-based system, getting extended w/ swift.
"""

372
ec2api/s3/s3server.py Normal file

@@ -0,0 +1,372 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010 OpenStack Foundation
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of an S3-like storage server based on local files.
Useful to test features that will eventually run on S3, or if you want to
run something locally that was once running on S3.
We don't support all the features of S3, but it does work with the
standard S3 client for the most basic semantics. To use the standard
S3 client with this module::
c = S3.AWSAuthConnection("", "", server="localhost", port=8888,
is_secure=False)
c.create_bucket("mybucket")
c.put("mybucket", "mykey", "a value")
print c.get("mybucket", "mykey").body
"""
import bisect
import datetime
import os.path
import urllib
from oslo_config import cfg
import routes
import six
import webob
from ec2api.openstack.common import fileutils
from ec2api import paths
from ec2api import utils
from ec2api import wsgi
s3_opts = [
cfg.StrOpt('buckets_path',
default=paths.state_path_def('buckets'),
help='Path to S3 buckets'),
cfg.StrOpt('s3_listen',
default="0.0.0.0",
help='IP address for S3 API to listen'),
cfg.IntOpt('s3_listen_port',
default=3334,
help='Port for S3 API to listen'),
]
CONF = cfg.CONF
CONF.register_opts(s3_opts)
def get_wsgi_server():
return wsgi.Server("S3 Objectstore",
S3Application(CONF.buckets_path),
port=CONF.s3_listen_port,
host=CONF.s3_listen)
class S3Application(wsgi.Router):
"""Implementation of an S3-like storage server based on local files.
If bucket depth is given, we break files up into multiple directories
to prevent hitting file system limits for number of files in each
directory. 1 means one level of directories, 2 means 2, etc.
"""
def __init__(self, root_directory, bucket_depth=0, mapper=None):
if mapper is None:
mapper = routes.Mapper()
mapper.connect(
'/',
controller=lambda *a, **kw: RootHandler(self)(*a, **kw))
mapper.connect(
'/{bucket}/{object_name}',
controller=lambda *a, **kw: ObjectHandler(self)(*a, **kw))
mapper.connect(
'/{bucket_name}/',
controller=lambda *a, **kw: BucketHandler(self)(*a, **kw))
self.directory = os.path.abspath(root_directory)
fileutils.ensure_tree(self.directory)
self.bucket_depth = bucket_depth
super(S3Application, self).__init__(mapper)
class BaseRequestHandler(object):
"""Base class emulating Tornado's web framework pattern in WSGI.
This is a direct port of Tornado's implementation, so some key decisions
about how the code interacts have already been chosen.
The two most common ways of designing web frameworks can be
classified as async object-oriented and sync functional.
Tornado's is on the OO side because a response is built up in and using
the shared state of an object and one of the object's methods will
eventually trigger the "finishing" of the response asynchronously.
Most WSGI stuff is in the functional side, we pass a request object to
every call down a chain and the eventual return value will be a response.
Part of the function of the routing code in S3Application as well as the
code in BaseRequestHandler's __call__ method is to merge those two styles
together enough that the Tornado code can work without extensive
modifications.
To do that it needs to give the Tornado-style code clean objects that it
can modify the state of for each request that is processed, so we use a
very simple factory lambda to create new state for each request, that's
the stuff in the router, and when we let the Tornado code modify that
object to handle the request, then we return the response it generated.
This wouldn't work the same if Tornado was being more async'y and doing
other callbacks throughout the process, but since Tornado is being
relatively simple here we can be satisfied that the response will be
complete by the end of the get/post method.
"""
def __init__(self, application):
self.application = application
@webob.dec.wsgify
def __call__(self, request):
method = request.method.lower()
f = getattr(self, method, self.invalid)
self.request = request
self.response = webob.Response()
params = request.environ['wsgiorg.routing_args'][1]
del params['controller']
f(**params)
return self.response
def get_argument(self, arg, default):
return self.request.params.get(arg, default)
def set_header(self, header, value):
self.response.headers[header] = value
def set_status(self, status_code):
self.response.status = status_code
def set_404(self):
self.render_xml({"Error": {
"Code": "NoSuchKey",
"Message": "The resource you requested does not exist"
}})
self.set_status(404)
def finish(self, body=''):
self.response.body = utils.utf8(body)
def invalid(self, **kwargs):
pass
def render_xml(self, value):
assert isinstance(value, dict) and len(value) == 1
self.set_header("Content-Type", "application/xml; charset=UTF-8")
name = value.keys()[0]
parts = []
parts.append('<' + utils.utf8(name) +
' xmlns="http://doc.s3.amazonaws.com/2006-03-01">')
self._render_parts(value.values()[0], parts)
parts.append('</' + utils.utf8(name) + '>')
self.finish('<?xml version="1.0" encoding="UTF-8"?>\n' +
''.join(parts))
def _render_parts(self, value, parts=None):
if not parts:
parts = []
if isinstance(value, six.string_types):
parts.append(utils.xhtml_escape(value))
elif isinstance(value, int) or isinstance(value, long):
parts.append(str(value))
elif isinstance(value, datetime.datetime):
parts.append(value.strftime("%Y-%m-%dT%H:%M:%S.000Z"))
elif isinstance(value, dict):
for name, subvalue in value.iteritems():
if not isinstance(subvalue, list):
subvalue = [subvalue]
for subsubvalue in subvalue:
parts.append('<' + utils.utf8(name) + '>')
self._render_parts(subsubvalue, parts)
parts.append('</' + utils.utf8(name) + '>')
else:
raise Exception("Unknown S3 value type %r", value)
def _object_path(self, bucket, object_name):
if self.application.bucket_depth < 1:
return os.path.abspath(os.path.join(
self.application.directory, bucket, object_name))
name_hash = utils.get_hash_str(object_name)
path = os.path.abspath(os.path.join(
self.application.directory, bucket))
for i in range(self.application.bucket_depth):
path = os.path.join(path, name_hash[:2 * (i + 1)])
return os.path.join(path, object_name)
class RootHandler(BaseRequestHandler):
def get(self):
names = os.listdir(self.application.directory)
buckets = []
for name in names:
path = os.path.join(self.application.directory, name)
info = os.stat(path)
buckets.append({
"Name": name,
"CreationDate": datetime.datetime.utcfromtimestamp(
info.st_ctime),
})
self.render_xml({"ListAllMyBucketsResult": {
"Buckets": {"Bucket": buckets},
}})
class BucketHandler(BaseRequestHandler):
def get(self, bucket_name):
prefix = self.get_argument("prefix", u"")
marker = self.get_argument("marker", u"")
max_keys = int(self.get_argument("max-keys", 50000))
path = os.path.abspath(os.path.join(self.application.directory,
bucket_name))
terse = int(self.get_argument("terse", 0))
if (not path.startswith(self.application.directory) or
not os.path.isdir(path)):
self.set_404()
return
object_names = []
for root, _dirs, files in os.walk(path):
for file_name in files:
object_names.append(os.path.join(root, file_name))
skip = len(path) + 1
for i in range(self.application.bucket_depth):
skip += 2 * (i + 1) + 1
object_names = [n[skip:] for n in object_names]
object_names.sort()
contents = []
start_pos = 0
if marker:
start_pos = bisect.bisect_right(object_names, marker, start_pos)
if prefix:
start_pos = bisect.bisect_left(object_names, prefix, start_pos)
truncated = False
for object_name in object_names[start_pos:]:
if not object_name.startswith(prefix):
break
if len(contents) >= max_keys:
truncated = True
break
object_path = self._object_path(bucket_name, object_name)
c = {"Key": object_name}
if not terse:
info = os.stat(object_path)
c.update({
"LastModified": datetime.datetime.utcfromtimestamp(
info.st_mtime),
"Size": info.st_size,
})
contents.append(c)
marker = object_name
self.render_xml({"ListBucketResult": {
"Name": bucket_name,
"Prefix": prefix,
"Marker": marker,
"MaxKeys": max_keys,
"IsTruncated": truncated,
"Contents": contents,
}})
def put(self, bucket_name):
path = os.path.abspath(os.path.join(
self.application.directory, bucket_name))
if (not path.startswith(self.application.directory) or
os.path.exists(path)):
self.set_status(403)
return
fileutils.ensure_tree(path)
self.finish()
def delete(self, bucket_name):
path = os.path.abspath(os.path.join(
self.application.directory, bucket_name))
if (not path.startswith(self.application.directory) or
not os.path.isdir(path)):
self.set_404()
return
if len(os.listdir(path)) > 0:
self.set_status(403)
return
os.rmdir(path)
self.set_status(204)
self.finish()
def head(self, bucket_name):
path = os.path.abspath(os.path.join(self.application.directory,
bucket_name))
if (not path.startswith(self.application.directory) or
not os.path.isdir(path)):
self.set_404()
return
self.set_status(200)
self.finish()
class ObjectHandler(BaseRequestHandler):
def get(self, bucket, object_name):
object_name = urllib.unquote(object_name)
path = self._object_path(bucket, object_name)
if (not path.startswith(self.application.directory) or
not os.path.isfile(path)):
self.set_404()
return
info = os.stat(path)
self.set_header("Content-Type", "application/unknown")
self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(
info.st_mtime))
object_file = open(path, "r")
try:
self.finish(object_file.read())
finally:
object_file.close()
def put(self, bucket, object_name):
object_name = urllib.unquote(object_name)
bucket_dir = os.path.abspath(os.path.join(
self.application.directory, bucket))
if (not bucket_dir.startswith(self.application.directory) or
not os.path.isdir(bucket_dir)):
self.set_404()
return
path = self._object_path(bucket, object_name)
if not path.startswith(bucket_dir) or os.path.isdir(path):
self.set_status(403)
return
directory = os.path.dirname(path)
fileutils.ensure_tree(directory)
object_file = open(path, "w")
object_file.write(self.request.body)
object_file.close()
self.set_header('ETag',
'"%s"' % utils.get_hash_str(self.request.body))
self.finish()
def delete(self, bucket, object_name):
object_name = urllib.unquote(object_name)
path = self._object_path(bucket, object_name)
if (not path.startswith(self.application.directory) or
not os.path.isfile(path)):
self.set_404()
return
os.unlink(path)
self.set_status(204)
self.finish()
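For illustration, _object_path shards keys under md5-prefix directories when
bucket_depth > 0; a standalone sketch of the layout it produces (the helper
name and example paths are illustrative):

    import hashlib
    import os

    def object_path(root, bucket, name, depth=2):
        # Mirrors S3Application._object_path: nest objects under
        # hash-prefix directories to bound per-directory file counts.
        name_hash = hashlib.md5(name).hexdigest()
        path = os.path.join(root, bucket)
        for i in range(depth):
            path = os.path.join(path, name_hash[:2 * (i + 1)])
        return os.path.join(path, name)

    # object_path('/var/lib/ec2api/buckets', 'b', 'mykey') gives roughly
    # /var/lib/ec2api/buckets/b/<h2>/<h4>/mykey, where <h2> and <h4> are
    # the first 2 and 4 hex digits of the key's md5 hash.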


@@ -24,3 +24,8 @@
# The code below enables nosetests to work with i18n _() blocks
import __builtin__
setattr(__builtin__, '_', lambda x: x)
# NOTE(ft): this is required by test_s3.S3APITestCase to switch execution
# between test and server threads
import eventlet
eventlet.monkey_patch(socket=True)


@@ -30,7 +30,7 @@ class ProxyTestCase(test_base.BaseTestCase):
def setUp(self):
super(ProxyTestCase, self).setUp()
self.handler = metadata.MetadataRequestHandler()
- conf = config_fixture.Config()
+ conf = self.useFixture(config_fixture.Config())
conf.config(group='metadata',
nova_metadata_ip='9.9.9.9',
nova_metadata_port=8775,


@@ -0,0 +1,135 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unittests for S3 objectstore clone.
"""
import boto
from boto import exception as boto_exception
from boto.s3 import connection as s3
import fixtures
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslotest import base as test_base
from ec2api.s3 import s3server
CONF = cfg.CONF
class S3APITestCase(test_base.BaseTestCase):
"""Test objectstore through S3 API."""
def setUp(self):
"""Setup users, projects, and start a test server."""
super(S3APITestCase, self).setUp()
tempdir = self.useFixture(fixtures.TempDir())
conf = self.useFixture(config_fixture.Config())
conf.config(buckets_path=tempdir.path,
s3_listen='127.0.0.1',
s3_listen_port=0)
self.server = s3server.get_wsgi_server()
# NOTE(ft): this requires eventlet.monkey_patch, which is called in
# tests/unit/__init__.py. Remove it from there if these tests are
# changed to no longer run the server
self.server.start()
self.addCleanup(self.server.stop)
if not boto.config.has_section('Boto'):
boto.config.add_section('Boto')
boto.config.set('Boto', 'num_retries', '0')
conn = s3.S3Connection(aws_access_key_id='fake',
aws_secret_access_key='fake',
host=CONF.s3_listen,
port=self.server.port,
is_secure=False,
calling_format=s3.OrdinaryCallingFormat())
self.conn = conn
def get_http_connection(*args):
"""Get a new S3 connection, don't attempt to reuse connections."""
return self.conn.new_http_connection(*args)
self.conn.get_http_connection = get_http_connection
def _ensure_no_buckets(self, buckets):
self.assertEqual(len(buckets), 0, "Bucket list was not empty")
return True
def _ensure_one_bucket(self, buckets, name):
self.assertEqual(len(buckets), 1,
"Bucket list didn't have exactly one element in it")
self.assertEqual(buckets[0].name, name, "Wrong name")
return True
def test_list_buckets(self):
# Make sure we are starting with no buckets.
self._ensure_no_buckets(self.conn.get_all_buckets())
def test_create_and_delete_bucket(self):
# Test bucket creation and deletion.
bucket_name = 'testbucket'
self.conn.create_bucket(bucket_name)
self._ensure_one_bucket(self.conn.get_all_buckets(), bucket_name)
self.conn.delete_bucket(bucket_name)
self._ensure_no_buckets(self.conn.get_all_buckets())
def test_create_bucket_and_key_and_delete_key_again(self):
# Test key operations on buckets.
bucket_name = 'testbucket'
key_name = 'somekey'
key_contents = 'somekey'
b = self.conn.create_bucket(bucket_name)
k = b.new_key(key_name)
k.set_contents_from_string(key_contents)
bucket = self.conn.get_bucket(bucket_name)
# make sure the contents are correct
key = bucket.get_key(key_name)
self.assertEqual(key.get_contents_as_string(), key_contents,
"Bad contents")
# delete the key
key.delete()
self._ensure_no_buckets(bucket.get_all_keys())
def test_unknown_bucket(self):
# NOTE(unicell): Since Boto v2.25.0, the underlying implementation
# of get_bucket method changed from GET to HEAD.
#
# Prior to v2.25.0, the default validate=True fetched a list of keys in
# the bucket and raised S3ResponseError. As a side effect of switching
# to a HEAD request, the get_bucket call now produces a less informative
# error message.
#
# To keep the original semantics, an additional get_all_keys call is
# suggested by the Boto documentation. This case tests both the
# validate=False and validate=True cases for completeness.
#
# http://docs.pythonboto.org/en/latest/releasenotes/v2.25.0.html
# http://docs.pythonboto.org/en/latest/s3_tut.html#accessing-a-bucket
bucket_name = 'falalala'
self.assertRaises(boto_exception.S3ResponseError,
self.conn.get_bucket,
bucket_name)
bucket = self.conn.get_bucket(bucket_name, validate=False)
self.assertRaises(boto_exception.S3ResponseError,
bucket.get_all_keys,
maxkeys=0)


@@ -16,10 +16,12 @@
"""Utilities and helper functions."""
import contextlib
import hashlib
import hmac
import shutil
import socket
import tempfile
from xml.sax import saxutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -71,6 +73,10 @@ def tempdir(**kwargs):
LOG.error(_('Could not remove tmpdir: %s'), str(e))
def get_hash_str(base_str):
"""returns string that represents hash of base_str (in hex format)."""
return hashlib.md5(base_str).hexdigest()
if hasattr(hmac, 'compare_digest'):
constant_time_compare = hmac.compare_digest
else:
@@ -87,3 +93,23 @@ else:
for x, y in zip(first, second):
result |= ord(x) ^ ord(y)
return result == 0
def xhtml_escape(value):
"""Escapes a string so it is valid within XML or XHTML.
"""
return saxutils.escape(value, {'"': '&quot;', "'": '&apos;'})
def utf8(value):
"""Try to turn a string into utf-8 if possible.
Code is directly from the utf8 function in
http://github.com/facebook/tornado/blob/master/tornado/escape.py
"""
if isinstance(value, unicode):
return value.encode('utf-8')
assert isinstance(value, str)
return value
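A quick sketch of the new helpers in use (Python 2 semantics, matching the
module):

    from ec2api import utils

    utils.utf8(u'bucket')          # -> 'bucket' as a UTF-8 encoded str
    utils.xhtml_escape('a & "b"')  # -> 'a &amp; &quot;b&quot;'
    utils.get_hash_str('mykey')    # -> hex md5 digest of the argument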


@@ -215,7 +215,7 @@
#s3_host=$my_ip
# Port used when accessing the S3 api (integer value)
-#s3_port=3333
+#s3_port=3334
# Whether to use SSL when talking to S3 (boolean value)
#s3_use_ssl=false
@@ -243,6 +243,22 @@
#external_network=<None>
#
# Options defined in ec2api.s3.s3server
#
# Path to S3 buckets (string value)
#buckets_path=$state_path/buckets
# IP address for S3 API to listen (string value)
#s3_listen=0.0.0.0
# Port for S3 API to listen (integer value)
#s3_listen_port=3334
[None]
#
# Options defined in ec2api.openstack.common.eventlet_backdoor
#


@@ -282,14 +282,22 @@ iniset $CONF_FILE DEFAULT admin_password $SERVICE_PASSWORD
iniset $CONF_FILE DEFAULT admin_tenant_name $SERVICE_TENANT
if [[ -f "$NOVA_CONF" ]]; then
- copynovaopt s3_host
- copynovaopt s3_port
- copynovaopt s3_affix_tenant
- copynovaopt s3_use_ssl
+ # NOTE(ft): use swift instead internal s3 server if enabled
+ if [[ -n $(keystone catalog --service object-store) ]] &&
+ [[ -n $(keystone catalog --service s3) ]]; then
+ copynovaopt s3_host
+ copynovaopt s3_port
+ copynovaopt s3_affix_tenant
+ copynovaopt s3_use_ssl
+ fi
copynovaopt cert_topic
copynovaopt rabbit_hosts
copynovaopt rabbit_password
# TODO(ft): it's necessary to support other available messaging implementations
nova_state_path=$(iniget $NOVA_CONF DEFAULT state_path)
root_state_path=$(dirname $nova_state_path)
iniset $CONF_FILE DEFAULT state_path ${root_state_path}/ec2api
fi
#init cache dir


@@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
- modules=eventlet_backdoor,local,service
+ modules=eventlet_backdoor,fileutils,local,service
# The base module to hold the copy of openstack.common
base=ec2api


@@ -31,6 +31,7 @@ console_scripts =
ec2-api=ec2api.cmd.api:main
ec2-api-manage=ec2api.cmd.manage:main
ec2-api-metadata=ec2api.cmd.api_metadata:main
ec2-api-s3=ec2api.cmd.api_s3:main
[build_sphinx]
all_files = 1