Import upstream version 1.0.0

commit fe067413b1
Michael Barton 2010-07-08 01:37:44 +00:00, committed by Monty Taylor
41 changed files with 9069 additions and 0 deletions

15
PKG-INFO Normal file

@@ -0,0 +1,15 @@
Metadata-Version: 1.0
Name: swift
Version: 1.0.0-1
Summary: Swift
Home-page: https://launchpad.net/swift
Author: OpenStack, LLC.
Author-email: UNKNOWN
License: Apache License (2.0)
Description: UNKNOWN
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: POSIX :: Linux
Classifier: Programming Language :: Python :: 2.6
Classifier: Environment :: No Input/Output (Daemon)

17
README Normal file

@@ -0,0 +1,17 @@
Swift
-----

A distributed object store that was originally developed as the basis for
Rackspace's Cloud Files.

To build documentation run `make html` in the /doc folder, and then browse to
/doc/build/html/index.html.

The best place to get started is the "SAIO - Swift All In One", which will walk
you through setting up a development cluster of Swift in a VM.

For more information, visit us at http://launchpad.net/swift, or come hang out
on our IRC channel, #openstack on freenode.

--
Swift Development Team

1276
bin/st.py Executable file

File diff suppressed because it is too large

351
bin/swift-account-audit.py Executable file

@@ -0,0 +1,351 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from urllib import quote
from hashlib import md5
import getopt
from itertools import chain
import simplejson
from eventlet.greenpool import GreenPool
from eventlet.event import Event
from swift.common.ring import Ring
from swift.common.utils import split_path
from swift.common.bufferedhttp import http_connect
usage = """
Usage!
%(cmd)s [options] [url 1] [url 2] ...
-c [concurrency] Set the concurrency, default 50
-r [ring dir] Ring locations, default /etc/swift
-e [filename] File for writing a list of inconsistent urls
-d Also download files and verify md5
You can also feed a list of urls to the script through stdin.
Examples!
%(cmd)s SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076
%(cmd)s SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076/container/object
%(cmd)s -e errors.txt SOSO_88ad0b83-b2c5-4fa1-b2d6-60c597202076/container
%(cmd)s < errors.txt
%(cmd)s -c 25 -d < errors.txt
""" % {'cmd': sys.argv[0]}
class Auditor(object):
def __init__(self, swift_dir='/etc/swift', concurrency=50, deep=False,
error_file=None):
self.pool = GreenPool(concurrency)
self.object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
self.container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
self.account_ring = Ring(os.path.join(swift_dir, 'account.ring.gz'))
self.deep = deep
self.error_file = error_file
# zero out stats
self.accounts_checked = self.account_exceptions = \
self.account_not_found = self.account_container_mismatch = \
self.account_object_mismatch = self.objects_checked = \
self.object_exceptions = self.object_not_found = \
self.object_checksum_mismatch = self.containers_checked = \
self.container_exceptions = self.container_count_mismatch = \
self.container_not_found = self.container_obj_mismatch = 0
self.list_cache = {}
self.in_progress = {}
def audit_object(self, account, container, name):
path = '/%s/%s/%s' % (quote(account), quote(container), quote(name))
part, nodes = self.object_ring.get_nodes(account, container, name)
container_listing = self.audit_container(account, container)
consistent = True
if name not in container_listing:
print " Object %s missing in container listing!" % path
consistent = False
hash = None
else:
hash = container_listing[name]['hash']
etags = []
for node in nodes:
try:
if self.deep:
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'GET', path, {})
resp = conn.getresponse()
calc_hash = md5()
chunk = True
while chunk:
chunk = resp.read(8192)
calc_hash.update(chunk)
calc_hash = calc_hash.hexdigest()
if resp.status // 100 != 2:
self.object_not_found += 1
consistent = False
print ' Bad status GETting object "%s" on %s/%s' \
% (path, node['ip'], node['device'])
continue
if resp.getheader('ETag').strip('"') != calc_hash:
self.object_checksum_mismatch += 1
consistent = False
print ' MD5 doesn\'t match etag for "%s" on %s/%s' \
% (path, node['ip'], node['device'])
etags.append(resp.getheader('ETag'))
else:
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'HEAD', path, {})
resp = conn.getresponse()
if resp.status // 100 != 2:
self.object_not_found += 1
consistent = False
print ' Bad status HEADing object "%s" on %s/%s' \
% (path, node['ip'], node['device'])
continue
etags.append(resp.getheader('ETag'))
except Exception:
self.object_exceptions += 1
consistent = False
print ' Exception fetching object "%s" on %s/%s' \
% (path, node['ip'], node['device'])
continue
if not etags:
consistent = False
print " Failed fo fetch object %s at all!" % path
elif hash:
for etag in etags:
if etag.strip('"') != hash:
consistent = False
self.object_checksum_mismatch += 1
print ' ETag mismatch for "%s" on %s/%s' \
% (path, node['ip'], node['device'])
if not consistent and self.error_file:
print >>open(self.error_file, 'a'), path
self.objects_checked += 1
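# audit_container and audit_account below memoize their listings in
# self.list_cache and coalesce concurrent callers with an eventlet Event,
# so the many audit_object greenthreads spawned above share a single
# listing fetch per container or account.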
def audit_container(self, account, name, recurse=False):
if (account, name) in self.in_progress:
self.in_progress[(account, name)].wait()
if (account, name) in self.list_cache:
return self.list_cache[(account, name)]
self.in_progress[(account, name)] = Event()
print 'Auditing container "%s"...' % name
path = '/%s/%s' % (quote(account), quote(name))
account_listing = self.audit_account(account)
consistent = True
if name not in account_listing:
consistent = False
print " Container %s not in account listing!" % path
part, nodes = self.container_ring.get_nodes(account, name)
rec_d = {}
responses = {}
for node in nodes:
marker = ''
results = True
while results:
node_id = node['id']
try:
conn = http_connect(node['ip'], node['port'], node['device'],
part, 'GET', path, {},
'format=json&marker=%s' % quote(marker))
resp = conn.getresponse()
if resp.status // 100 != 2:
self.container_not_found += 1
consistent = False
print ' Bad status GETting container "%s" on %s/%s' % \
(path, node['ip'], node['device'])
break
if node['id'] not in responses:
responses[node['id']] = dict(resp.getheaders())
results = simplejson.loads(resp.read())
except Exception:
self.container_exceptions += 1
consistent = False
print ' Exception GETting container "%s" on %s/%s' % \
(path, node['ip'], node['device'])
break
if results:
marker = results[-1]['name']
for obj in results:
obj_name = obj['name']
if obj_name not in rec_d:
rec_d[obj_name] = obj
if obj['last_modified'] != rec_d[obj_name]['last_modified']:
self.container_obj_mismatch += 1
consistent = False
print " Different versions of %s/%s in container dbs." % \
(quote(name), quote(obj['name']))
if obj['last_modified'] > rec_d[obj_name]['last_modified']:
rec_d[obj_name] = obj
obj_counts = [int(header['x-container-object-count'])
for header in responses.values()]
if not obj_counts:
consistent = False
print " Failed to fetch container %s at all!" % path
else:
if len(set(obj_counts)) != 1:
self.container_count_mismatch += 1
consistent = False
print " Container databases don't agree on number of objects."
print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts))
self.containers_checked += 1
self.list_cache[(account, name)] = rec_d
self.in_progress[(account, name)].send(True)
del self.in_progress[(account, name)]
if recurse:
for obj in rec_d.keys():
self.pool.spawn_n(self.audit_object, account, name, obj)
if not consistent and self.error_file:
print >>open(self.error_file, 'a'), path
return rec_d
def audit_account(self, account, recurse=False):
if account in self.in_progress:
self.in_progress[account].wait()
if account in self.list_cache:
return self.list_cache[account]
self.in_progress[account] = Event()
print "Auditing account %s..." % account
consistent = True
path = '/%s' % account
part, nodes = self.account_ring.get_nodes(account)
responses = {}
for node in nodes:
marker = ''
results = True
while results:
node_id = node['id']
try:
conn = http_connect(node['ip'], node['port'],
node['device'], part, 'GET', path, {},
'format=json&marker=%s' % quote(marker))
resp = conn.getresponse()
if resp.status // 100 != 2:
self.account_not_found += 1
consistent = False
print " Bad status GETting account %(ip)s:%(device)s" \
% node
break
results = simplejson.loads(resp.read())
except Exception:
self.account_exceptions += 1
consistent = False
print " Exception GETting account %(ip)s:%(device)s" % node
break
if node_id not in responses:
responses[node_id] = [dict(resp.getheaders()), []]
responses[node_id][1].extend(results)
if results:
marker = results[-1]['name']
headers = [resp[0] for resp in responses.values()]
cont_counts = [int(header['x-account-container-count'])
for header in headers]
if len(set(cont_counts)) != 1:
self.account_container_mismatch += 1
consistent = False
print " Account databases don't agree on number of containers."
print " Max: %s, Min: %s" % (max(cont_counts), min(cont_counts))
obj_counts = [int(header['x-account-object-count'])
for header in headers]
if len(set(obj_counts)) != 1:
self.account_object_mismatch += 1
consistent = False
print " Account databases don't agree on number of objects."
print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts))
containers = set()
for resp in responses.values():
containers.update(container['name'] for container in resp[1])
self.list_cache[account] = containers
self.in_progress[account].send(True)
del self.in_progress[account]
self.accounts_checked += 1
if recurse:
for container in containers:
self.pool.spawn_n(self.audit_container, account, container, True)
if not consistent and self.error_file:
print >>open(self.error_file, 'a'), path
return containers
def audit(self, account, container=None, obj=None):
if obj and container:
self.pool.spawn_n(self.audit_object, account, container, obj)
elif container:
self.pool.spawn_n(self.audit_container, account, container, True)
else:
self.pool.spawn_n(self.audit_account, account, True)
def wait(self):
self.pool.waitall()
def print_stats(self):
print
print " Accounts checked: %d" % self.accounts_checked
if self.account_not_found:
print " Missing Replicas: %d" % self.account_not_found
if self.account_exceptions:
print " Exceptions: %d" % self.account_exceptions
if self.account_container_mismatch:
print " Cntainer mismatch: %d" % self.account_container_mismatch
if self.account_object_mismatch:
print " Object mismatch: %d" % self.account_object_mismatch
print
print "Containers checked: %d" % self.containers_checked
if self.container_not_found:
print " Missing Replicas: %d" % self.container_not_found
if self.container_exceptions:
print " Exceptions: %d" % self.container_exceptions
if self.container_count_mismatch:
print " Count mismatch: %d" % self.container_count_mismatch
if self.container_obj_mismatch:
print " Obj mismatch: %d" % self.container_obj_mismatch
print
print " Objects checked: %d" % self.objects_checked
if self.object_not_found:
print " Missing Replicas: %d" % self.object_not_found
if self.object_exceptions:
print " Exceptions: %d" % self.object_exceptions
if self.object_checksum_mismatch:
print " MD5 Mismatch: %d" % self.object_checksum_mismatch
if __name__ == '__main__':
try:
optlist, args = getopt.getopt(sys.argv[1:], 'c:r:e:d')
except getopt.GetoptError, err:
print str(err)
print usage
sys.exit(2)
if not args and os.isatty(sys.stdin.fileno()):
print usage
sys.exit()
opts = dict(optlist)
options = {
'concurrency': int(opts.get('-c', 50)),
'error_file': opts.get('-e', None),
'swift_dir': opts.get('-r', '/etc/swift'),
'deep': '-d' in opts,
}
auditor = Auditor(**options)
if not os.isatty(sys.stdin.fileno()):
args = chain(args, sys.stdin)
for path in args:
path = '/' + path.rstrip('\r\n').lstrip('/')
auditor.audit(*split_path(path, 1, 3, True))
auditor.wait()
auditor.print_stats()
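For reference, a short sketch of how paths fed in on stdin map to audit calls; the account/container/object names here are hypothetical, and split_path is assumed to break '/account[/container[/object]]' into its segments as swift.common.utils defines it:

from swift.common.utils import split_path

# '/AUTH_test/photos/cat.jpg' -> account, container and object: audit_object
# '/AUTH_test/photos'         -> account and container: audit_container
# '/AUTH_test'                -> account only: audit_account
for raw in ('AUTH_test/photos/cat.jpg', 'AUTH_test/photos', 'AUTH_test'):
    path = '/' + raw.rstrip('\r\n').lstrip('/')
    print split_path(path, 1, 3, True)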

69
bin/swift-account-auditor.py Executable file

@@ -0,0 +1,69 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.account.auditor import AccountAuditor
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: account-auditor CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('account-server'))
if c.has_section('account-auditor'):
auditor_conf = dict(c.items('account-auditor'))
else:
print "Unable to find account-auditor config section in %s." % \
sys.argv[1]
sys.exit(1)
logger = utils.get_logger(auditor_conf, 'account-auditor')
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
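# SIGTERM handler: ignore any further SIGTERMs, then signal the whole
# process group so any forked children exit along with this daemon.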
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
auditor = AccountAuditor(server_conf, auditor_conf)
if once:
auditor.audit_once()
else:
auditor.audit_forever()
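Every daemon launcher in this commit repeats this ConfigParser pattern; below is a minimal sketch of a config file it would accept. The section names come from the code above; the keys are illustrative (only the [account-server] and [account-auditor] sections, plus an optional 'user' key, are actually required here):

from ConfigParser import ConfigParser
from StringIO import StringIO

sample = StringIO("""
[account-server]
user = swift

[account-auditor]
log_name = account-auditor
""")
c = ConfigParser()
c.readfp(sample)
print dict(c.items('account-server'))    # {'user': 'swift'}
print dict(c.items('account-auditor'))   # {'log_name': 'account-auditor'}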

69
bin/swift-account-reaper.py Executable file

@@ -0,0 +1,69 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.account.reaper import AccountReaper
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: account-reaper CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('account-server'))
if c.has_section('account-reaper'):
reaper_conf = dict(c.items('account-reaper'))
else:
print "Unable to find account-reaper config section in %s." % \
sys.argv[1]
sys.exit(1)
logger = utils.get_logger(reaper_conf, 'account-reaper')
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
reaper = AccountReaper(server_conf, reaper_conf)
if once:
reaper.reap_once()
else:
reaper.reap_forever()

57
bin/swift-account-replicator.py Executable file

@@ -0,0 +1,57 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from ConfigParser import ConfigParser
import getopt
from swift.account import server as account_server
from swift.common import db, db_replicator, utils
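# db_replicator.Replicator is parameterized entirely by class attributes:
# each *-replicator binary only binds a server type, ring file, broker
# class, data directory, and default port (compare ContainerReplicator
# further down in this commit).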
class AccountReplicator(db_replicator.Replicator):
server_type = 'account'
ring_file = 'account.ring.gz'
brokerclass = db.AccountBroker
datadir = account_server.DATADIR
default_port = 6002
if __name__ == '__main__':
optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
if not args:
print "Usage: account-replicator <--once> CONFIG_FILE [once]"
sys.exit()
c = ConfigParser()
if not c.read(args[0]):
print "Unable to read config file."
sys.exit(1)
once = len(args) > 1 and args[1] == 'once'
server_conf = dict(c.items('account-server'))
if c.has_section('account-replicator'):
replicator_conf = dict(c.items('account-replicator'))
else:
print "Unable to find account-replicator config section in %s." % \
args[0]
sys.exit(1)
utils.drop_privileges(server_conf.get('user', 'swift'))
if once or '--once' in [opt[0] for opt in optlist]:
AccountReplicator(server_conf, replicator_conf).replicate_once()
else:
AccountReplicator(server_conf, replicator_conf).replicate_forever()

30
bin/swift-account-server.py Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import sys
from swift.common.wsgi import run_wsgi
from swift.account.server import AccountController
if __name__ == '__main__':
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
conf = dict(c.items('account-server'))
run_wsgi(AccountController, conf, default_port=6002)

45
bin/swift-auth-create-account.py Executable file

@@ -0,0 +1,45 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
from sys import argv, exit
from swift.common.bufferedhttp import http_connect_raw as http_connect
if __name__ == '__main__':
f = '/etc/swift/auth-server.conf'
if len(argv) == 5:
f = argv[4]
elif len(argv) != 4:
exit('Syntax: %s <new_account> <new_user> <new_password> [conf_file]' %
argv[0])
new_account = argv[1]
new_user = argv[2]
new_password = argv[3]
c = ConfigParser()
if not c.read(f):
exit('Unable to read conf file: %s' % f)
conf = dict(c.items('auth-server'))
host = conf.get('bind_ip', '127.0.0.1')
port = int(conf.get('bind_port', 11000))
path = '/account/%s/%s' % (new_account, new_user)
conn = http_connect(host, port, 'PUT', path, {'x-auth-key':new_password})
resp = conn.getresponse()
if resp.status == 204:
print resp.getheader('x-storage-url')
else:
print 'Account creation failed. (%d)' % resp.status
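A hypothetical invocation (the account, user, and key are illustrative); on a 204 the script prints the new account's x-storage-url:
swift-auth-create-account.py test tester testing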

40
bin/swift-auth-recreate-accounts.py Executable file

@@ -0,0 +1,40 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
from sys import argv, exit
from swift.common.bufferedhttp import http_connect_raw as http_connect
if __name__ == '__main__':
f = '/etc/swift/auth-server.conf'
if len(argv) == 2:
f = argv[1]
elif len(argv) != 1:
exit('Syntax: %s [conf_file]' % argv[0])
c = ConfigParser()
if not c.read(f):
exit('Unable to read conf file: %s' % f)
conf = dict(c.items('auth-server'))
host = conf.get('bind_ip', '127.0.0.1')
port = int(conf.get('bind_port', 11000))
path = '/recreate_accounts'
conn = http_connect(host, port, 'POST', path)
resp = conn.getresponse()
if resp.status == 200:
print resp.read()
else:
print 'Recreating accounts failed. (%d)' % resp.status

30
bin/swift-auth-server.py Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import sys
from swift.common.wsgi import run_wsgi
from swift.auth.server import AuthController
if __name__ == '__main__':
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
conf = dict(c.items('auth-server'))
run_wsgi(AuthController, conf, default_port=11000)

69
bin/swift-container-auditor.py Executable file

@@ -0,0 +1,69 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.container.auditor import ContainerAuditor
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: container-auditor CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('container-server'))
if c.has_section('container-auditor'):
auditor_conf = dict(c.items('container-auditor'))
else:
print "Unable to find container-auditor config section in %s." % \
sys.argv[1]
sys.exit(1)
logger = utils.get_logger(auditor_conf, 'container-auditor')
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
auditor = ContainerAuditor(server_conf, auditor_conf)
if once:
auditor.audit_once()
else:
auditor.audit_forever()

57
bin/swift-container-replicator.py Executable file

@@ -0,0 +1,57 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from ConfigParser import ConfigParser
import getopt
from swift.container import server as container_server
from swift.common import db, db_replicator, utils
class ContainerReplicator(db_replicator.Replicator):
server_type = 'container'
ring_file = 'container.ring.gz'
brokerclass = db.ContainerBroker
datadir = container_server.DATADIR
default_port = 6001
if __name__ == '__main__':
optlist, args = getopt.getopt(sys.argv[1:], '', ['once'])
if not args:
print "Usage: container-replicator <--once> CONFIG_FILE [once]"
sys.exit()
c = ConfigParser()
if not c.read(args[0]):
print "Unable to read config file."
sys.exit(1)
once = len(args) > 1 and args[1] == 'once'
server_conf = dict(c.items('container-server'))
if c.has_section('container-replicator'):
replicator_conf = dict(c.items('container-replicator'))
else:
print "Unable to find container-replicator config section in %s." % \
args[0]
sys.exit(1)
utils.drop_privileges(server_conf.get('user', 'swift'))
if once or '--once' in [opt[0] for opt in optlist]:
ContainerReplicator(server_conf, replicator_conf).replicate_once()
else:
ContainerReplicator(server_conf, replicator_conf).replicate_forever()

30
bin/swift-container-server.py Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import sys
from swift.common.wsgi import run_wsgi
from swift.container.server import ContainerController
if __name__ == '__main__':
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
conf = dict(c.items('container-server'))
run_wsgi(ContainerController, conf, default_port=6001)

63
bin/swift-container-updater.py Executable file

@@ -0,0 +1,63 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.container.updater import ContainerUpdater
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: container-updater CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('container-server'))
if c.has_section('container-updater'):
updater_conf = dict(c.items('container-updater'))
else:
print "Unable to find container-updater config section in %s." % \
sys.argv[1]
sys.exit(1)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
updater = ContainerUpdater(server_conf, updater_conf)
if once:
updater.update_once_single_threaded()
else:
updater.update_forever()

125
bin/swift-drive-audit.py Executable file

@@ -0,0 +1,125 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import os
import re
import subprocess
import sys
from ConfigParser import ConfigParser
from swift.common.utils import get_logger
# To search for more types of errors, add the regex to the list below
error_re = [
'error.*(sd[a-z])',
'(sd[a-z]).*error',
]
def get_devices(device_dir, logger):
devices = []
for line in open('/proc/mounts').readlines():
data = line.strip().split()
block_device = data[0]
mount_point = data[1]
if mount_point.startswith(device_dir):
device = {}
device['mount_point'] = mount_point
device['block_device'] = block_device
try:
device_num = os.stat(block_device).st_rdev
except OSError, e:
# If we can't stat the device, then something weird is going on
logger.error("Error: Could not stat %s!" %
block_device)
continue
device['major'] = str(os.major(device_num))
device['minor'] = str(os.minor(device_num))
devices.append(device)
for line in open('/proc/partitions').readlines()[2:]:
major,minor,blocks,kernel_device = line.strip().split()
device = [d for d in devices
if d['major'] == major and d['minor'] == minor]
if device:
device[0]['kernel_device'] = kernel_device
return devices
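# The two passes above join /proc/mounts (mount point -> block device ->
# major/minor) with /proc/partitions (major/minor -> kernel device name,
# e.g. sdb1), since the kernel log errors matched below refer only to
# kernel device names.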
def get_errors(minutes):
errors = {}
start_time = datetime.datetime.now() - datetime.timedelta(minutes=minutes)
for line in open('/var/log/kern.log'):
if '[ 0.000000]' in line:
# Ignore anything before the last boot
errors = {}
continue
log_time_string = '%s %s' % (start_time.year,' '.join(line.split()[:3]))
log_time = datetime.datetime.strptime(
log_time_string,'%Y %b %d %H:%M:%S')
if log_time > start_time:
for err in error_re:
for device in re.findall(err,line):
errors[device] = errors.get(device,0) + 1
return errors
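# Note: kern.log timestamps carry no year, so the current year from
# start_time is assumed; lines that straddle a year boundary will be
# mis-dated.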
def comment_fstab(mount_point):
with open('/etc/fstab', 'r') as fstab:
with open('/etc/fstab.new', 'w') as new_fstab:
for line in fstab:
parts = line.split()
if len(parts) > 2 and line.split()[1] == mount_point:
new_fstab.write('#' + line)
else:
new_fstab.write(line)
os.rename('/etc/fstab.new', '/etc/fstab')
if __name__ == '__main__':
c = ConfigParser()
try:
conf_path = sys.argv[1]
except IndexError:
print "Usage: %s CONF_FILE" % sys.argv[0].split('/')[-1]
sys.exit(1)
if not c.read(conf_path):
print "Unable to read config file %s" % conf_path
sys.exit(1)
conf = dict(c.items('drive-audit'))
device_dir = conf.get('device_dir', '/srv/node')
minutes = int(conf.get('minutes', 60))
error_limit = int(conf.get('error_limit', 1))
logger = get_logger(conf, 'drive-audit')
devices = get_devices(device_dir, logger)
logger.debug("Devices found: %s" % str(devices))
if not devices:
logger.error("Error: No devices found!")
errors = get_errors(minutes)
logger.debug("Errors found: %s" % str(errors))
unmounts = 0
for kernel_device,count in errors.items():
if count >= error_limit:
device = [d for d in devices
if d['kernel_device'].startswith(kernel_device)]
if device:
mount_point = device[0]['mount_point']
if mount_point.startswith('/srv/node'):
logger.info("Unmounting %s with %d errors" %
(mount_point, count))
subprocess.call(['umount','-fl',mount_point])
logger.info("Commenting out %s from /etc/fstab" %
(mount_point))
comment_fstab(mount_point)
unmounts += 1
if unmounts == 0:
logger.info("No drives were unmounted")

87
bin/swift-get-nodes.py Executable file

@@ -0,0 +1,87 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import urllib
from swift.common.ring import Ring
from swift.common.utils import hash_path
if len(sys.argv) < 3 or len(sys.argv) > 5:
print 'Usage: %s <ring.gz> <account> [<container>] [<object>]' % sys.argv[0]
print 'Shows the nodes responsible for the item specified.'
print 'Example:'
print ' $ %s /etc/swift/account.ring.gz MyAccount' % sys.argv[0]
print ' Partition 5743883'
print ' Hash 96ae332a60b58910784e4417a03e1ad0'
print ' 10.1.1.7:8000 sdd1'
print ' 10.1.9.2:8000 sdb1'
print ' 10.1.5.5:8000 sdf1'
sys.exit(1)
ring = None
account = None
container = None
obj = None
if len(sys.argv) > 4: ring,account,container,obj = sys.argv[1:5]
elif len(sys.argv) > 3: ring,account,container = sys.argv[1:4]
elif len(sys.argv) > 2: ring,account = sys.argv[1:3]
print '\nAccount \t%s' % account
print 'Container\t%s' % container
print 'Object \t%s\n' % obj
if obj:
hash_str = hash_path(account,container,obj)
part, nodes = Ring(ring).get_nodes(account,container,obj)
for node in nodes:
print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
print '\nPartition\t%s' % part
print 'Hash \t%s\n' % hash_str
for node in nodes:
acct_cont_obj = "%s/%s/%s" % (account, container, obj)
print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part,urllib.quote(acct_cont_obj))
print "\n"
for node in nodes:
print 'ssh %s "ls -lah /srv/node/%s/objects/%s/%s/%s/"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str)
elif container:
hash_str = hash_path(account,container)
part, nodes = Ring(ring).get_nodes(account,container)
for node in nodes:
print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
print '\nPartition %s' % part
print 'Hash %s\n' % hash_str
for node in nodes:
acct_cont = "%s/%s" % (account,container)
print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part,urllib.quote(acct_cont))
print "\n"
for node in nodes:
print 'ssh %s "ls -lah /srv/node/%s/containers/%s/%s/%s/%s.db"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str,hash_str)
elif account:
hash_str = hash_path(account)
part, nodes = Ring(ring).get_nodes(account)
for node in nodes:
print 'Server:Port Device\t%s:%s %s' % (node['ip'], node['port'], node['device'])
print '\nPartition %s' % part
print 'Hash %s\n' % hash_str
for node in nodes:
print 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' % (node['ip'],node['port'],node['device'],part, urllib.quote(account))
print "\n"
for node in nodes:
print 'ssh %s "ls -lah /srv/node/%s/accounts/%s/%s/%s/%s.db"' % (node['ip'],node['device'],part,hash_str[-3:],hash_str,hash_str)
print "\n\n"

181
bin/swift-init.py Executable file

@@ -0,0 +1,181 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import errno
import glob
import os
import resource
import signal
import sys
import time
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-server', 'container-updater',
'object-auditor', 'object-server', 'object-replicator', 'object-updater',
'proxy-server', 'account-replicator', 'auth-server', 'account-reaper']
GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',
'object-server', 'proxy-server', 'auth-server']
MAX_DESCRIPTORS = 32768
MAX_MEMORY = (1024 * 1024 * 1024) * 2 # 2 GB
_, server, command = sys.argv
if server == 'all':
servers = ALL_SERVERS
else:
if '-' not in server:
server = '%s-server' % server
servers = [server]
command = command.lower()
def pid_files(server):
if os.path.exists('/var/run/swift/%s.pid' % server):
pid_files = ['/var/run/swift/%s.pid' % server]
else:
pid_files = glob.glob('/var/run/swift/%s/*.pid' % server)
for pid_file in pid_files:
pid = int(open(pid_file).read().strip())
yield pid_file, pid
def do_start(server, once=False):
server_type = '-'.join(server.split('-')[:-1])
for pid_file, pid in pid_files(server):
if os.path.exists('/proc/%s' % pid):
print "%s appears to already be running: %s" % (server, pid_file)
return
else:
print "Removing stale pid file %s" % pid_file
os.unlink(pid_file)
try:
resource.setrlimit(resource.RLIMIT_NOFILE,
(MAX_DESCRIPTORS, MAX_DESCRIPTORS))
resource.setrlimit(resource.RLIMIT_DATA,
(MAX_MEMORY, MAX_MEMORY))
except ValueError:
print "Unable to increase file descriptor limit. Running as non-root?"
os.environ['PYTHON_EGG_CACHE'] = '/tmp'
def launch(ini_file, pid_file):
pid = os.fork()
if pid == 0:
os.setsid()
with open(os.devnull, 'r+b') as nullfile:
for desc in (0, 1, 2): # close stdio
try:
os.dup2(nullfile.fileno(), desc)
except OSError:
pass
try:
if once:
os.execl('/usr/bin/swift-%s' % server, server,
ini_file, 'once')
else:
os.execl('/usr/bin/swift-%s' % server, server, ini_file)
except OSError:
print 'unable to launch %s' % server
sys.exit(0)
else:
fp = open(pid_file, 'w')
fp.write('%d\n' % pid)
fp.close()
try:
os.mkdir('/var/run/swift')
except OSError, err:
if err.errno == errno.EACCES:
sys.exit('Unable to create /var/run/swift. Running as non-root?')
elif err.errno != errno.EEXIST:
raise
if os.path.exists('/etc/swift/%s-server.conf' % server_type):
if once:
print 'Running %s once' % server
else:
print 'Starting %s' % server
launch('/etc/swift/%s-server.conf' % server_type,
'/var/run/swift/%s.pid' % server)
else:
try:
os.mkdir('/var/run/swift/%s' % server)
except OSError, err:
if err.errno == errno.EACCES:
sys.exit(
'Unable to create /var/run/swift/%s. Running as non-root?' % server)
elif err.errno != errno.EEXIST:
raise
if once:
print 'Running %ss once' % server
else:
print 'Starting %ss' % server
for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):
launch(ini_file, '/var/run/swift/%s/%d.pid' % (server, num))
def do_stop(server, graceful=False):
if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:
sig = signal.SIGHUP
else:
sig = signal.SIGTERM
did_anything = False
pfiles = pid_files(server)
for pid_file, pid in pfiles:
did_anything = True
try:
print 'Stopping %s pid: %s signal: %s' % (server, pid, sig)
os.kill(pid, sig)
except OSError:
print "Process %d not running" % pid
try:
os.unlink(pid_file)
except OSError:
pass
for pid_file, pid in pfiles:
for _ in xrange(150): # 15 seconds
if not os.path.exists('/proc/%s' % pid):
break
time.sleep(0.1)
else:
print 'Waited 15 seconds for pid %s (%s) to die; giving up' % \
(pid, pid_file)
if not did_anything:
print 'No %s running' % server
if command == 'start':
for server in servers:
do_start(server)
if command == 'stop':
for server in servers:
do_stop(server)
if command == 'shutdown':
for server in servers:
do_stop(server, graceful=True)
if command == 'restart':
for server in servers:
do_stop(server)
for server in servers:
do_start(server)
if command == 'reload':
for server in servers:
do_stop(server, graceful=True)
do_start(server)
if command == 'once':
for server in servers:
do_start(server, once=True)
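Invocation is swift-init.py <server|all> <command>, where command is one of start, stop, shutdown, restart, reload, or once; a few illustrative examples (a bare type like 'proxy' gets '-server' appended by the code above):
swift-init.py object-auditor start
swift-init.py proxy reload
swift-init.py all restart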

69
bin/swift-object-auditor.py Executable file

@@ -0,0 +1,69 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.obj.auditor import ObjectAuditor
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: object-auditor CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('object-server'))
if c.has_section('object-auditor'):
auditor_conf = dict(c.items('object-auditor'))
else:
print "Unable to find object-auditor config section in %s." % \
sys.argv[1]
sys.exit(1)
logger = utils.get_logger(auditor_conf, 'object-auditor')
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = utils.LoggerFileObject(logger)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
auditor = ObjectAuditor(server_conf, auditor_conf)
if once:
auditor.audit_once()
else:
auditor.audit_forever()

92
bin/swift-object-info.py Executable file

@@ -0,0 +1,92 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import cPickle as pickle
from datetime import datetime
from hashlib import md5
from swift.common.ring import Ring
from swift.obj.server import read_metadata
from swift.common.utils import hash_path
if __name__ == '__main__':
if len(sys.argv) <= 1:
print "Usage: %s OBJECT_FILE" % sys.argv[0]
sys.exit(1)
try:
ring = Ring('/etc/swift/object.ring.gz')
except:
ring = None
datafile = sys.argv[1]
fp = open(datafile, 'rb')
metadata = read_metadata(fp)
path = metadata.pop('name','')
content_type = metadata.pop('Content-Type','')
ts = metadata.pop('X-Timestamp','')
etag = metadata.pop('ETag','')
length = metadata.pop('Content-Length','')
if path:
print 'Path: %s' % path
account, container, obj = path.split('/',3)[1:]
print ' Account: %s' % account
print ' Container: %s' % container
print ' Object: %s' % obj
obj_hash = hash_path(account, container, obj)
print ' Object hash: %s' % obj_hash
if ring is not None:
print 'Ring locations:'
part, nodes = ring.get_nodes(account, container, obj)
for node in nodes:
print (' %s:%s - /srv/node/%s/objects/%s/%s/%s/%s.data' %
(node['ip'], node['port'], node['device'], part,
obj_hash[-3:], obj_hash, ts))
else:
print 'Path: Not found in metadata'
if content_type:
print 'Content-Type: %s' % content_type
else:
print 'Content-Type: Not found in metadata'
if ts:
print 'Timestamp: %s (%s)' % (datetime.fromtimestamp(float(ts)), ts)
else:
print 'Timestamp: Not found in metadata'
h = md5()
file_len = 0
while True:
data = fp.read(64*1024)
if not data:
break
h.update(data)
file_len += len(data)
h = h.hexdigest()
if etag:
if h == etag:
print 'ETag: %s (valid)' % etag
else:
print "Etag: %s doesn't match file hash of %s!" % (etag, h)
else:
print 'ETag: Not found in metadata'
if length:
if file_len == int(length):
print 'Content-Length: %s (valid)' % length
else:
print "Content-Length: %s doesn't match file length of %s" % (
length, file_len)
else:
print 'Content-Length: Not found in metadata'
print 'User Metadata: %s' % metadata
fp.close()
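A purely illustrative invocation against an on-disk object file (the device, partition, hash, and timestamp below are made up; the directory layout matches the ring-location paths printed by swift-get-nodes above):
swift-object-info.py /srv/node/sdb1/objects/12345/27e/d41d8cd98f00b204e9800998ecf8427e/1278518400.00000.data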

30
bin/swift-object-server.py Executable file

@@ -0,0 +1,30 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import sys
from swift.common.wsgi import run_wsgi
from swift.obj.server import ObjectController
if __name__ == '__main__':
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
conf = dict(c.items('object-server'))
run_wsgi(ObjectController, conf, default_port=6000)

64
bin/swift-object-updater.py Executable file

@@ -0,0 +1,64 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import sys
from ConfigParser import ConfigParser
from swift.obj.updater import ObjectUpdater
from swift.common import utils
if __name__ == '__main__':
if len(sys.argv) < 2:
print "Usage: object-updater CONFIG_FILE [once]"
sys.exit()
once = len(sys.argv) > 2 and sys.argv[2] == 'once'
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
server_conf = dict(c.items('object-server'))
if c.has_section('object-updater'):
updater_conf = dict(c.items('object-updater'))
else:
print "Unable to find object-updater config section in %s." % \
sys.argv[1]
sys.exit(1)
utils.drop_privileges(server_conf.get('user', 'swift'))
try:
os.setsid()
except OSError:
pass
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
sys.exit()
signal.signal(signal.SIGTERM, kill_children)
updater = ObjectUpdater(server_conf, updater_conf)
if once:
updater.update_once_single_threaded()
else:
updater.update_forever()

45
bin/swift-proxy-server.py Executable file

@@ -0,0 +1,45 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser
import os
import sys
from swift.common.wsgi import run_wsgi
from swift.common.auth import DevAuthMiddleware
from swift.common.memcached import MemcacheRing
from swift.common.utils import get_logger
from swift.proxy.server import Application
if __name__ == '__main__':
c = ConfigParser()
if not c.read(sys.argv[1]):
print "Unable to read config file."
sys.exit(1)
conf = dict(c.items('proxy-server'))
swift_dir = conf.get('swift_dir', '/etc/swift')
c = ConfigParser()
c.read(os.path.join(swift_dir, 'auth-server.conf'))
auth_conf = dict(c.items('auth-server'))
memcache = MemcacheRing([s.strip() for s in
conf.get('memcache_servers', '127.0.0.1:11211').split(',')
if s.strip()])
logger = get_logger(conf, 'proxy')
app = Application(conf, memcache, logger)
# Wrap the app with auth
app = DevAuthMiddleware(app, auth_conf, memcache, logger)
run_wsgi(app, conf, logger=logger, default_port=80)

558
bin/swift-ring-builder.py Executable file

@@ -0,0 +1,558 @@
#!/usr/bin/python -uO
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cPickle as pickle
from errno import EEXIST
from gzip import GzipFile
from os import mkdir
from os.path import basename, dirname, exists, join as pathjoin
from sys import argv, exit
from time import time
from swift.common.ring import RingBuilder
MAJOR_VERSION = 1
MINOR_VERSION = 1
EXIT_RING_CHANGED = 0
EXIT_RING_UNCHANGED = 1
EXIT_ERROR = 2
def search_devs(builder, search_value):
# d<device_id>z<zone>-<ip>:<port>/<device_name>_<meta>
orig_search_value = search_value
match = []
if search_value.startswith('d'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
i += 1
match.append(('id', int(search_value[1:i])))
search_value = search_value[i:]
if search_value.startswith('z'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
i += 1
match.append(('zone', int(search_value[1:i])))
search_value = search_value[i:]
if search_value.startswith('-'):
search_value = search_value[1:]
if len(search_value) and search_value[0].isdigit():
i = 1
while i < len(search_value) and search_value[i] in '0123456789.':
i += 1
match.append(('ip', search_value[:i]))
search_value = search_value[i:]
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
i += 1
match.append(('port', int(search_value[1:i])))
search_value = search_value[i:]
if search_value.startswith('/'):
i = 1
while i < len(search_value) and search_value[i] != '_':
i += 1
match.append(('device', search_value[1:i]))
search_value = search_value[i:]
if search_value.startswith('_'):
match.append(('meta', search_value[1:]))
search_value = ''
if search_value:
raise ValueError('Invalid <search-value>: %s' % repr(orig_search_value))
devs = []
for dev in builder.devs:
if not dev:
continue
matched = True
for key, value in match:
if key == 'meta':
if value not in dev.get(key):
matched = False
elif dev.get(key) != value:
matched = False
if matched:
devs.append(dev)
return devs
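# Worked example: 'd74z1-1.2.3.4:5678/sdb1_snet' parses into the match
# list [('id', 74), ('zone', 1), ('ip', '1.2.3.4'), ('port', 5678),
# ('device', 'sdb1'), ('meta', 'snet')]; a device must satisfy every
# pair to be returned (meta matches by substring).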
SEARCH_VALUE_HELP = '''
The <search-value> can be of the form:
d<device_id>z<zone>-<ip>:<port>/<device_name>_<meta>
Any part is optional, but you must include at least one part.
Examples:
d74 Matches the device id 74
z1 Matches devices in zone 1
z1-1.2.3.4 Matches devices in zone 1 with the ip 1.2.3.4
1.2.3.4 Matches devices in any zone with the ip 1.2.3.4
z1:5678 Matches devices in zone 1 using port 5678
:5678 Matches devices that use port 5678
/sdb1 Matches devices with the device name sdb1
_shiny Matches devices with shiny in the meta data
_"snet: 5.6.7.8" Matches devices with snet: 5.6.7.8 in the meta data
Most specific example:
d74z1-1.2.3.4:5678/sdb1_"snet: 5.6.7.8"
Nerd explanation:
All items require their single character prefix except the ip, in which
case the - is optional unless the device id or zone is also included.
'''.strip()
CREATE_HELP = '''
ring_builder <builder_file> create <part_power> <replicas> <min_part_hours>
Creates <builder_file> with 2^<part_power> partitions and <replicas>.
<min_part_hours> is number of hours to restrict moving a partition more
than once.
'''.strip()
SEARCH_HELP = '''
ring_builder <builder_file> search <search-value>
Shows information about matching devices.
%(SEARCH_VALUE_HELP)s
'''.strip() % globals()
ADD_HELP = '''
ring_builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta> <wght>
Adds a device to the ring with the given information. No partitions will be
assigned to the new device until after running 'rebalance'. This is so you
can make multiple device changes and rebalance them all just once.
'''.strip()
SET_WEIGHT_HELP = '''
ring_builder <builder_file> set_weight <search-value> <weight>
Resets the device's weight. No partitions will be reassigned to or from the
device until after running 'rebalance'. This is so you can make multiple
device changes and rebalance them all just once.
%(SEARCH_VALUE_HELP)s
'''.strip() % globals()
SET_INFO_HELP = '''
ring_builder <builder_file> set_info <search-value>
<ip>:<port>/<device_name>_<meta>
Resets the device's information. This information isn't used to assign
partitions, so you can use 'write_ring' afterward to rewrite the current
ring with the newer device information. Any of the parts are optional
in the final <ip>:<port>/<device_name>_<meta> parameter; just give what you
want to change. For instance set_info d74 _"snet: 5.6.7.8" would just
update the meta data for device id 74.
%(SEARCH_VALUE_HELP)s
'''.strip() % globals()
REMOVE_HELP = '''
ring_builder <builder_file> remove <search-value>
Removes the device(s) from the ring. This should normally just be used for
a device that has failed. For a device you wish to decommission, it's best
to set its weight to 0, wait for it to drain all its data, then use this
remove command. This will not take effect until after running 'rebalance'.
This is so you can make multiple device changes and rebalance them all just
once.
%(SEARCH_VALUE_HELP)s
'''.strip() % globals()
SET_MIN_PART_HOURS_HELP = '''
ring_builder <builder_file> set_min_part_hours <hours>
Changes the <min_part_hours> to the given <hours>. This should be set to
however long a full replication/update cycle takes. We're working on a way
to determine this more easily than scanning logs.
'''.strip()
if __name__ == '__main__':
if len(argv) < 2:
print '''
ring_builder %(MAJOR_VERSION)s.%(MINOR_VERSION)s
%(CREATE_HELP)s
ring_builder <builder_file>
Shows information about the ring and the devices within.
%(SEARCH_HELP)s
%(ADD_HELP)s
%(SET_WEIGHT_HELP)s
%(SET_INFO_HELP)s
%(REMOVE_HELP)s
ring_builder <builder_file> rebalance
Attempts to rebalance the ring by reassigning partitions that haven't been
recently reassigned.
ring_builder <builder_file> validate
Just runs the validation routines on the ring.
ring_builder <builder_file> write_ring
Just rewrites the distributable ring file. This is done automatically after
a successful rebalance, so really this is only useful after one or more
'set_info' calls when no rebalance is needed but you want to send out the
new device information.
%(SET_MIN_PART_HOURS_HELP)s
Quick list: create search add set_weight set_info remove rebalance write_ring
set_min_part_hours
Exit codes: 0 = ring changed, 1 = ring did not change, 2 = error
'''.strip() % globals()
exit(EXIT_RING_UNCHANGED)
if exists(argv[1]):
builder = pickle.load(open(argv[1], 'rb'))
for dev in builder.devs:
if dev and 'meta' not in dev:
dev['meta'] = ''
elif len(argv) < 3 or argv[2] != 'create':
print 'Ring Builder file does not exist: %s' % argv[1]
exit(EXIT_ERROR)
elif argv[2] == 'create':
if len(argv) < 6:
print CREATE_HELP
exit(EXIT_RING_UNCHANGED)
builder = RingBuilder(int(argv[3]), int(argv[4]), int(argv[5]))
backup_dir = pathjoin(dirname(argv[1]), 'backups')
try:
mkdir(backup_dir)
except OSError, err:
if err.errno != EEXIST:
raise
pickle.dump(builder, open(pathjoin(backup_dir,
'%d.' % time() + basename(argv[1])), 'wb'), protocol=2)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
backup_dir = pathjoin(dirname(argv[1]), 'backups')
try:
mkdir(backup_dir)
except OSError, err:
if err.errno != EEXIST:
raise
ring_file = argv[1]
if ring_file.endswith('.builder'):
ring_file = ring_file[:-len('.builder')]
ring_file += '.ring.gz'
if len(argv) == 2:
print '%s, build version %d' % (argv[1], builder.version)
zones = 0
balance = 0
if builder.devs:
zones = len(set(d['zone'] for d in builder.devs if d is not None))
balance = builder.get_balance()
print '%d partitions, %d replicas, %d zones, %d devices, %.02f ' \
'balance' % (builder.parts, builder.replicas, zones,
len([d for d in builder.devs if d]), balance)
print 'The minimum number of hours before a partition can be ' \
'reassigned is %s' % builder.min_part_hours
if builder.devs:
print 'Devices: id zone ip address port name ' \
'weight partitions balance meta'
weighted_parts = builder.parts * builder.replicas / \
sum(d['weight'] for d in builder.devs if d is not None)
for dev in builder.devs:
if dev is None:
continue
if not dev['weight']:
if dev['parts']:
balance = 999.99
else:
balance = 0
else:
balance = 100.0 * dev['parts'] / \
(dev['weight'] * weighted_parts) - 100.0
print ' %5d %5d %15s %5d %9s %6.02f %10s %7.02f %s' % \
(dev['id'], dev['zone'], dev['ip'], dev['port'],
dev['device'], dev['weight'], dev['parts'], balance,
dev['meta'])
exit(EXIT_RING_UNCHANGED)
if argv[2] == 'search':
if len(argv) < 4:
print SEARCH_HELP
exit(EXIT_RING_UNCHANGED)
devs = search_devs(builder, argv[3])
if not devs:
print 'No matching devices found'
exit(EXIT_ERROR)
print 'Devices: id zone ip address port name ' \
'weight partitions balance meta'
weighted_parts = builder.parts * builder.replicas / \
sum(d['weight'] for d in builder.devs if d is not None)
for dev in devs:
if not dev['weight']:
if dev['parts']:
balance = 999.99
else:
balance = 0
else:
balance = 100.0 * dev['parts'] / \
(dev['weight'] * weighted_parts) - 100.0
print ' %5d %5d %15s %5d %9s %6.02f %10s %7.02f %s' % \
(dev['id'], dev['zone'], dev['ip'], dev['port'],
dev['device'], dev['weight'], dev['parts'], balance,
dev['meta'])
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'add':
# add z<zone>-<ip>:<port>/<device_name>_<meta> <wght>
if len(argv) < 5:
print ADD_HELP
exit(EXIT_RING_UNCHANGED)
if not argv[3].startswith('z'):
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
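        # Walk the add value one field at a time: z<zone> digits first, then
        # -<ip>, :<port>, /<device_name>, and an optional _<meta> tail.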
i = 1
while i < len(argv[3]) and argv[3][i].isdigit():
i += 1
zone = int(argv[3][1:i])
rest = argv[3][i:]
if not rest.startswith('-'):
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i] in '0123456789.':
i += 1
ip = rest[1:i]
rest = rest[i:]
if not rest.startswith(':'):
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i].isdigit():
i += 1
port = int(rest[1:i])
rest = rest[i:]
if not rest.startswith('/'):
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i] != '_':
i += 1
device_name = rest[1:i]
rest = rest[i:]
meta = ''
if rest.startswith('_'):
meta = rest[1:]
weight = float(argv[4])
for dev in builder.devs:
if dev is None:
continue
if dev['ip'] == ip and dev['port'] == port and \
dev['device'] == device_name:
print 'Device %d already uses %s:%d/%s.' % \
(dev['id'], dev['ip'], dev['port'], dev['device'])
exit(EXIT_ERROR)
next_dev_id = 0
if builder.devs:
next_dev_id = max(d['id'] for d in builder.devs if d) + 1
builder.add_dev({'id': next_dev_id, 'zone': zone, 'ip': ip,
'port': port, 'device': device_name, 'weight': weight,
'meta': meta})
print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'set_weight':
if len(argv) != 5:
print SET_WEIGHT_HELP
exit(EXIT_RING_UNCHANGED)
devs = search_devs(builder, argv[3])
weight = float(argv[4])
if not devs:
print 'No matching devices found'
exit(EXIT_ERROR)
if len(devs) > 1:
print 'Matched more than one device:'
for dev in devs:
print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
'"%(meta)s"' % dev
if raw_input('Are you sure you want to update the weight for '
'these %s devices? (y/N) ' % len(devs)) != 'y':
print 'Aborting device modifications'
exit(EXIT_ERROR)
for dev in devs:
builder.set_dev_weight(dev['id'], weight)
print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
'weight set to %(weight)s' % dev
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'set_info':
if len(argv) != 5:
print SET_INFO_HELP
exit(EXIT_RING_UNCHANGED)
devs = search_devs(builder, argv[3])
change_value = argv[4]
change = []
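        # Parse whichever pieces of <ip>:<port>/<device_name>_<meta> were
        # supplied, in order, collecting (key, new_value) pairs for just the
        # parts being changed.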
if len(change_value) and change_value[0].isdigit():
i = 1
while i < len(change_value) and change_value[i] in '0123456789.':
i += 1
change.append(('ip', change_value[:i]))
change_value = change_value[i:]
if change_value.startswith(':'):
i = 1
while i < len(change_value) and change_value[i].isdigit():
i += 1
change.append(('port', int(change_value[1:i])))
change_value = change_value[i:]
if change_value.startswith('/'):
i = 1
while i < len(change_value) and change_value[i] != '_':
i += 1
change.append(('device', change_value[1:i]))
change_value = change_value[i:]
if change_value.startswith('_'):
change.append(('meta', change_value[1:]))
change_value = ''
if change_value or not change:
raise ValueError('Invalid set info change value: %s' %
repr(argv[4]))
if not devs:
print 'No matching devices found'
exit(EXIT_ERROR)
if len(devs) > 1:
print 'Matched more than one device:'
for dev in devs:
print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
'"%(meta)s"' % dev
if raw_input('Are you sure you want to update the info for '
'these %s devices? (y/N) ' % len(devs)) != 'y':
print 'Aborting device modifications'
exit(EXIT_ERROR)
for dev in devs:
orig_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
test_dev = dict(dev)
for key, value in change:
test_dev[key] = value
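            # Refuse the change if the new ip:port/device identity would
            # collide with a different device already in the builder.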
for check_dev in builder.devs:
if not check_dev or check_dev['id'] == test_dev['id']:
continue
if check_dev['ip'] == test_dev['ip'] and \
check_dev['port'] == test_dev['port'] and \
check_dev['device'] == test_dev['device']:
print 'Device %d already uses %s:%d/%s.' % \
(check_dev['id'], check_dev['ip'], check_dev['port'],
check_dev['device'])
exit(EXIT_ERROR)
for key, value in change:
dev[key] = value
new_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
print 'Device %s is now %s' % (orig_dev_string, new_dev_string)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'remove':
if len(argv) < 4:
print REMOVE_HELP
exit(EXIT_RING_UNCHANGED)
devs = search_devs(builder, argv[3])
if not devs:
print 'No matching devices found'
exit(EXIT_ERROR)
if len(devs) > 1:
print 'Matched more than one device:'
for dev in devs:
print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
'"%(meta)s"' % dev
if raw_input('Are you sure you want to remove these %s devices? '
'(y/N) ' % len(devs)) != 'y':
print 'Aborting device removals'
exit(EXIT_ERROR)
for dev in devs:
builder.remove_dev(dev['id'])
print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
'marked for removal and will be removed next rebalance.' % dev
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'rebalance':
devs_changed = builder.devs_changed
last_balance = builder.get_balance()
parts, balance = builder.rebalance()
if not parts:
print 'No partitions could be reassigned.'
print 'Either none need to be or none can be due to ' \
'min_part_hours [%s].' % builder.min_part_hours
exit(EXIT_RING_UNCHANGED)
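        # If no devices changed and the balance barely moved, keep the old
        # ring rather than rewriting everything for a sub-1% difference.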
if not devs_changed and abs(last_balance - balance) < 1:
print 'Cowardly refusing to save rebalance as it did not change ' \
'at least 1%.'
exit(EXIT_RING_UNCHANGED)
builder.validate()
print 'Reassigned %d (%.02f%%) partitions. Balance is now %.02f.' % \
(parts, 100.0 * parts / builder.parts, balance)
if balance > 5:
print '-' * 79
print 'NOTE: Balance of %.02f indicates you should push this ' % \
balance
print ' ring, wait at least %d hours, and rebalance/repush.' \
% builder.min_part_hours
print '-' * 79
ts = time()
pickle.dump(builder.get_ring(),
GzipFile(pathjoin(backup_dir, '%d.' % ts +
basename(ring_file)), 'wb'), protocol=2)
pickle.dump(builder, open(pathjoin(backup_dir,
'%d.' % ts + basename(argv[1])), 'wb'), protocol=2)
pickle.dump(builder.get_ring(), GzipFile(ring_file, 'wb'), protocol=2)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
elif argv[2] == 'validate':
builder.validate()
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'write_ring':
pickle.dump(builder.get_ring(),
GzipFile(pathjoin(backup_dir, '%d.' % time() +
basename(ring_file)), 'wb'), protocol=2)
pickle.dump(builder.get_ring(), GzipFile(ring_file, 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
elif argv[2] == 'pretend_min_part_hours_passed':
builder.pretend_min_part_hours_passed()
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
elif argv[2] == 'set_min_part_hours':
if len(argv) < 4:
print SET_MIN_PART_HOURS_HELP
exit(EXIT_RING_UNCHANGED)
builder.change_min_part_hours(int(argv[3]))
print 'The minimum number of hours before a partition can be ' \
'reassigned is now set to %s' % argv[3]
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
print 'Unknown command: %s' % argv[2]
exit(EXIT_ERROR)

197
bin/swift-stats-populate.py Executable file
View File

@ -0,0 +1,197 @@
#!/usr/bin/python -u
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import traceback
from ConfigParser import ConfigParser
from optparse import OptionParser
from sys import exit, argv
from time import time
from uuid import uuid4
from eventlet import GreenPool, patcher, sleep
from eventlet.pools import Pool
from swift.common.client import Connection, get_auth
from swift.common.ring import Ring
from swift.common.utils import compute_eta, get_time_units
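# This script reads a [stats] section from /etc/swift/stats.conf (or from the
# conf file given as the first argument). A minimal sketch of that config,
# with illustrative values only:
#   [stats]
#   auth_url = http://127.0.0.1:11000/v1.0
#   auth_user = test:tester
#   auth_key = testing
#   dispersion_coverage = 1
#   big_container_count = 1000000
#   concurrency = 50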
def put_container(connpool, container, report):
global retries_done
try:
with connpool.item() as conn:
conn.put_container(container)
retries_done += conn.attempts - 1
if report:
report(True)
except:
if report:
report(False)
raise
def put_object(connpool, container, obj, report):
global retries_done
try:
with connpool.item() as conn:
conn.put_object(container, obj, obj, metadata={'stats': obj})
retries_done += conn.attempts - 1
if report:
report(True)
except:
if report:
report(False)
raise
def report(success):
global begun, created, item_type, next_report, need_to_create, retries_done
if not success:
traceback.print_exc()
exit('Gave up due to error(s).')
created += 1
if time() < next_report:
return
next_report = time() + 5
eta, eta_unit = compute_eta(begun, created, need_to_create)
print '\r\x1B[KCreating %s: %d of %d, %d%s left, %d retries' % (item_type,
created, need_to_create, round(eta), eta_unit, retries_done),
if __name__ == '__main__':
global begun, created, item_type, next_report, need_to_create, retries_done
patcher.monkey_patch()
parser = OptionParser()
parser.add_option('-d', '--dispersion', action='store_true',
dest='dispersion', default=False,
help='Run the dispersion population')
parser.add_option('-p', '--performance', action='store_true',
dest='performance', default=False,
help='Run the performance population')
args = argv[1:]
if not args:
args.append('-h')
(options, args) = parser.parse_args(args)
conf_file = '/etc/swift/stats.conf'
if args:
conf_file = args[0]
c = ConfigParser()
if not c.read(conf_file):
exit('Unable to read config file: %s' % conf_file)
conf = dict(c.items('stats'))
swift_dir = conf.get('swift_dir', '/etc/swift')
dispersion_coverage = int(conf.get('dispersion_coverage', 1))
big_container_count = int(conf.get('big_container_count', 1000000))
retries = int(conf.get('retries', 5))
concurrency = int(conf.get('concurrency', 50))
coropool = GreenPool(size=concurrency)
retries_done = 0
url, token = get_auth(conf['auth_url'], conf['auth_user'],
conf['auth_key'])
account = url.rsplit('/', 1)[1]
connpool = Pool(max_size=concurrency)
connpool.create = lambda: Connection(conf['auth_url'],
conf['auth_user'], conf['auth_key'],
retries=retries,
preauthurl=url, preauthtoken=token)
if options.dispersion:
container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
parts_left = \
dict((x, x) for x in xrange(container_ring.partition_count))
item_type = 'containers'
created = 0
retries_done = 0
need_to_create = need_to_queue = \
dispersion_coverage / 100.0 * container_ring.partition_count
begun = next_report = time()
next_report += 2
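        # Generate random container names until PUTs have been queued for the
        # target fraction of distinct partitions; a name that hashes to an
        # already-covered partition is simply skipped.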
while need_to_queue >= 1:
container = 'stats_container_dispersion_%s' % uuid4()
part, _ = container_ring.get_nodes(account, container)
if part in parts_left:
coropool.spawn(put_container, connpool, container, report)
sleep()
del parts_left[part]
need_to_queue -= 1
coropool.waitall()
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KCreated %d containers for dispersion reporting, ' \
'%d%s, %d retries' % \
(need_to_create, round(elapsed), elapsed_unit, retries_done)
container = 'stats_objects'
put_container(connpool, container, None)
object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
parts_left = dict((x, x) for x in xrange(object_ring.partition_count))
item_type = 'objects'
created = 0
retries_done = 0
need_to_create = need_to_queue = \
dispersion_coverage / 100.0 * object_ring.partition_count
begun = next_report = time()
next_report += 2
while need_to_queue >= 1:
obj = 'stats_object_dispersion_%s' % uuid4()
part, _ = object_ring.get_nodes(account, container, obj)
if part in parts_left:
coropool.spawn(put_object, connpool, container, obj, report)
sleep()
del parts_left[part]
need_to_queue -= 1
coropool.waitall()
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KCreated %d objects for dispersion reporting, ' \
'%d%s, %d retries' % \
(need_to_create, round(elapsed), elapsed_unit, retries_done)
if options.performance:
container = 'big_container'
put_container(connpool, container, None)
item_type = 'objects'
created = 0
retries_done = 0
need_to_create = need_to_queue = big_container_count
begun = next_report = time()
next_report += 2
segments = ['00']
for x in xrange(big_container_count):
obj = '%s/%02x' % ('/'.join(segments), x)
coropool.spawn(put_object, connpool, container, obj, report)
sleep()
need_to_queue -= 1
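            # Advance the pseudo-directory path like an odometer: bump the
            # current segment, rolling it back to '00' and carrying into (or
            # appending) the next segment when it overflows.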
i = 0
while True:
nxt = int(segments[i], 16) + 1
if nxt < 10005:
segments[i] = '%02x' % nxt
break
else:
segments[i] = '00'
i += 1
if len(segments) <= i:
segments.append('00')
break
coropool.waitall()
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KCreated %d objects for performance reporting, ' \
'%d%s, %d retries' % \
(need_to_create, round(elapsed), elapsed_unit, retries_done)

942
bin/swift-stats-report.py Executable file
View File

@ -0,0 +1,942 @@
#!/usr/bin/python -u
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import os
import socket
from ConfigParser import ConfigParser
from httplib import HTTPException
from optparse import OptionParser
from sys import argv, exit, stderr
from time import time
from uuid import uuid4
from eventlet import GreenPool, hubs, patcher, sleep, Timeout
from eventlet.pools import Pool
from swift.common import direct_client
from swift.common.client import ClientException, Connection, get_auth
from swift.common.ring import Ring
from swift.common.utils import compute_eta, get_time_units
unmounted = []
def get_error_log(prefix):
def error_log(msg_or_exc):
global unmounted
if hasattr(msg_or_exc, 'http_status') and \
msg_or_exc.http_status == 507:
            identifier = '%s:%s/%s' % (msg_or_exc.http_host,
                msg_or_exc.http_port, msg_or_exc.http_device)
            if identifier not in unmounted:
                unmounted.append(identifier)
print >>stderr, 'ERROR: %s:%s/%s is unmounted -- This will ' \
'cause replicas designated for that device to be ' \
'considered missing until resolved or the ring is ' \
'updated.' % (msg_or_exc.http_host, msg_or_exc.http_port,
msg_or_exc.http_device)
if not hasattr(msg_or_exc, 'http_status') or \
msg_or_exc.http_status not in (404, 507):
print >>stderr, 'ERROR: %s: %s' % (prefix, msg_or_exc)
return error_log
def audit(coropool, connpool, account, container_ring, object_ring, options):
begun = time()
with connpool.item() as conn:
estimated_items = [conn.head_account()[0]]
items_completed = [0]
retries_done = [0]
containers_missing_replicas = {}
objects_missing_replicas = {}
next_report = [time() + 2]
def report():
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = \
compute_eta(begun, items_completed[0], estimated_items[0])
print '\r\x1B[KAuditing items: %d of %d, %d%s left, %d ' \
'retries' % (items_completed[0], estimated_items[0],
round(eta), eta_unit, retries_done[0]),
def direct_container(container, part, nodes):
estimated_objects = 0
for node in nodes:
found = False
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
try:
attempts, info = direct_client.retry(
direct_client.direct_head_container, node,
part, account, container,
error_log=error_log,
retries=options.retries)
retries_done[0] += attempts - 1
found = True
if not estimated_objects:
estimated_objects = info[0]
except ClientException, err:
if err.http_status not in (404, 507):
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
except (Exception, Timeout), err:
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
if not found:
if container in containers_missing_replicas:
containers_missing_replicas[container].append(node)
else:
containers_missing_replicas[container] = [node]
estimated_items[0] += estimated_objects
items_completed[0] += 1
report()
def direct_object(container, obj, part, nodes):
for node in nodes:
found = False
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
try:
attempts, _ = direct_client.retry(
direct_client.direct_head_object, node, part,
account, container, obj, error_log=error_log,
retries=options.retries)
retries_done[0] += attempts - 1
found = True
except ClientException, err:
if err.http_status not in (404, 507):
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
except (Exception, Timeout), err:
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
if not found:
opath = '/%s/%s' % (container, obj)
if opath in objects_missing_replicas:
objects_missing_replicas[opath].append(node)
else:
objects_missing_replicas[opath] = [node]
items_completed[0] += 1
report()
cmarker = ''
while True:
with connpool.item() as conn:
containers = [c['name'] for c in conn.get_account(marker=cmarker)]
if not containers:
break
cmarker = containers[-1]
for container in containers:
part, nodes = container_ring.get_nodes(account, container)
coropool.spawn(direct_container, container, part, nodes)
for container in containers:
omarker = ''
while True:
with connpool.item() as conn:
objects = [o['name'] for o in
conn.get_container(container, marker=omarker)]
if not objects:
break
omarker = objects[-1]
for obj in objects:
part, nodes = object_ring.get_nodes(account, container, obj)
coropool.spawn(direct_object, container, obj, part, nodes)
coropool.waitall()
print '\r\x1B[K\r',
if not containers_missing_replicas and not objects_missing_replicas:
print 'No missing items.'
return
if containers_missing_replicas:
print 'Containers Missing'
print '-' * 78
for container in sorted(containers_missing_replicas.keys()):
part, _ = container_ring.get_nodes(account, container)
for node in containers_missing_replicas[container]:
print 'http://%s:%s/%s/%s/%s/%s' % (node['ip'], node['port'],
node['device'], part, account, container)
if objects_missing_replicas:
if containers_missing_replicas:
print
print 'Objects Missing'
print '-' * 78
for opath in sorted(objects_missing_replicas.keys()):
_, container, obj = opath.split('/', 2)
part, _ = object_ring.get_nodes(account, container, obj)
for node in objects_missing_replicas[opath]:
print 'http://%s:%s/%s/%s/%s/%s/%s' % (node['ip'],
node['port'], node['device'], part, account, container,
obj)
def container_dispersion_report(coropool, connpool, account, container_ring,
options):
""" Returns (number of containers listed, number of distinct partitions,
number of container copies found) """
with connpool.item() as conn:
containers = [c['name'] for c in
conn.get_account(prefix='stats_container_dispersion_',
full_listing=True)]
containers_listed = len(containers)
if not containers_listed:
print >>stderr, 'No containers to query. Has stats-populate been run?'
return 0
retries_done = [0]
containers_queried = [0]
container_copies_found = [0, 0, 0, 0]
begun = time()
next_report = [time() + 2]
def direct(container, part, nodes):
found_count = 0
for node in nodes:
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
try:
attempts, _ = direct_client.retry(
direct_client.direct_head_container, node,
part, account, container, error_log=error_log,
retries=options.retries)
retries_done[0] += attempts - 1
found_count += 1
except ClientException, err:
if err.http_status not in (404, 507):
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
except (Exception, Timeout), err:
error_log('Giving up on /%s/%s/%s: %s' % (part, account,
container, err))
container_copies_found[found_count] += 1
containers_queried[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, containers_queried[0],
containers_listed)
print '\r\x1B[KQuerying containers: %d of %d, %d%s left, %d ' \
'retries' % (containers_queried[0], containers_listed,
round(eta), eta_unit, retries_done[0]),
container_parts = {}
for container in containers:
part, nodes = container_ring.get_nodes(account, container)
if part not in container_parts:
container_parts[part] = part
coropool.spawn(direct, container, part, nodes)
coropool.waitall()
distinct_partitions = len(container_parts)
copies_expected = distinct_partitions * container_ring.replica_count
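    # container_copies_found[n] counts sampled containers for which exactly n
    # replicas responded, so sum(n * count) is the total copies located.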
copies_found = sum(a * b for a, b in enumerate(container_copies_found))
value = 100.0 * copies_found / copies_expected
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KQueried %d containers for dispersion reporting, ' \
'%d%s, %d retries' % (containers_listed, round(elapsed),
elapsed_unit, retries_done[0])
if containers_listed - distinct_partitions:
print 'There were %d overlapping partitions' % (
containers_listed - distinct_partitions)
if container_copies_found[2]:
print 'There were %d partitions missing one copy.' % \
container_copies_found[2]
if container_copies_found[1]:
print '! There were %d partitions missing two copies.' % \
container_copies_found[1]
if container_copies_found[0]:
print '!!! There were %d partitions missing all copies.' % \
container_copies_found[0]
print '%.02f%% of container copies found (%d of %d)' % (
value, copies_found, copies_expected)
print 'Sample represents %.02f%% of the container partition space' % (
100.0 * distinct_partitions / container_ring.partition_count)
return value
def object_dispersion_report(coropool, connpool, account, object_ring, options):
""" Returns (number of objects listed, number of distinct partitions,
number of object copies found) """
container = 'stats_objects'
with connpool.item() as conn:
try:
objects = [o['name'] for o in conn.get_container(container,
prefix='stats_object_dispersion_', full_listing=True)]
except ClientException, err:
if err.http_status != 404:
raise
print >>stderr, 'No objects to query. Has stats-populate been run?'
return 0
objects_listed = len(objects)
if not objects_listed:
print >>stderr, 'No objects to query. Has stats-populate been run?'
return 0
retries_done = [0]
objects_queried = [0]
object_copies_found = [0, 0, 0, 0]
begun = time()
next_report = [time() + 2]
def direct(obj, part, nodes):
found_count = 0
for node in nodes:
error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node)
try:
attempts, _ = direct_client.retry(
direct_client.direct_head_object, node, part,
account, container, obj, error_log=error_log,
retries=options.retries)
retries_done[0] += attempts - 1
found_count += 1
except ClientException, err:
if err.http_status not in (404, 507):
error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
container, obj, err))
except (Exception, Timeout), err:
error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account,
container, obj, err))
object_copies_found[found_count] += 1
objects_queried[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, objects_queried[0],
objects_listed)
print '\r\x1B[KQuerying objects: %d of %d, %d%s left, %d ' \
'retries' % (objects_queried[0], objects_listed, round(eta),
eta_unit, retries_done[0]),
object_parts = {}
for obj in objects:
part, nodes = object_ring.get_nodes(account, container, obj)
if part not in object_parts:
object_parts[part] = part
coropool.spawn(direct, obj, part, nodes)
coropool.waitall()
distinct_partitions = len(object_parts)
copies_expected = distinct_partitions * object_ring.replica_count
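    # object_copies_found[n] counts sampled objects for which exactly n
    # replicas responded, so sum(n * count) is the total copies located.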
copies_found = sum(a * b for a, b in enumerate(object_copies_found))
value = 100.0 * copies_found / copies_expected
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KQueried %d objects for dispersion reporting, ' \
'%d%s, %d retries' % (objects_listed, round(elapsed),
elapsed_unit, retries_done[0])
if objects_listed - distinct_partitions:
print 'There were %d overlapping partitions' % (
objects_listed - distinct_partitions)
if object_copies_found[2]:
print 'There were %d partitions missing one copy.' % \
object_copies_found[2]
if object_copies_found[1]:
print '! There were %d partitions missing two copies.' % \
object_copies_found[1]
if object_copies_found[0]:
print '!!! There were %d partitions missing all copies.' % \
object_copies_found[0]
print '%.02f%% of object copies found (%d of %d)' % (
value, copies_found, copies_expected)
print 'Sample represents %.02f%% of the object partition space' % (
100.0 * distinct_partitions / object_ring.partition_count)
return value
def container_put_report(coropool, connpool, count, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
def put(container):
with connpool.item() as conn:
try:
conn.put_container(container)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KCreating containers: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for x in xrange(count):
coropool.spawn(put, 'stats_container_put_%02x' % x)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / count
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KCreated %d containers for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def container_head_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
containers = [c['name'] for c in
conn.get_account(prefix='stats_container_put_',
full_listing=True)]
count = len(containers)
def head(container):
with connpool.item() as conn:
try:
conn.head_container(container)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KHeading containers: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for container in containers:
coropool.spawn(head, container)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(containers)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KHeaded %d containers for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def container_get_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
containers = [c['name'] for c in
conn.get_account(prefix='stats_container_put_',
full_listing=True)]
count = len(containers)
def get(container):
with connpool.item() as conn:
try:
conn.get_container(container)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KListing containers: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for container in containers:
coropool.spawn(get, container)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(containers)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
        print '\r\x1B[KListed %d containers for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def container_standard_listing_report(coropool, connpool, options):
begun = time()
if options.verbose:
print 'Listing big_container',
with connpool.item() as conn:
try:
value = len(conn.get_container('big_container', full_listing=True))
except ClientException, err:
if err.http_status != 404:
raise
print >>stderr, \
"big_container doesn't exist. Has stats-populate been run?"
return 0
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\rGot %d objects (standard listing) in big_container, %d%s' % \
(value, elapsed, elapsed_unit)
return value
def container_prefix_listing_report(coropool, connpool, options):
begun = time()
if options.verbose:
print 'Prefix-listing big_container',
value = 0
with connpool.item() as conn:
try:
for x in xrange(256):
value += len(conn.get_container('big_container',
prefix=('%02x' % x), full_listing=True))
except ClientException, err:
if err.http_status != 404:
raise
print >>stderr, \
"big_container doesn't exist. Has stats-populate been run?"
return 0
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\rGot %d objects (prefix listing) in big_container, %d%s' % \
(value, elapsed, elapsed_unit)
return value
def container_prefix_delimiter_listing_report(coropool, connpool, options):
begun = time()
if options.verbose:
print 'Prefix-delimiter-listing big_container',
value = [0]
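    # Walk the container like a directory tree: count each delimiter-listing
    # page, then recurse concurrently into (at most 10) discovered subdirs.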
def list(prefix=None):
marker = None
while True:
try:
with connpool.item() as conn:
listing = conn.get_container('big_container',
marker=marker, prefix=prefix, delimiter='/')
except ClientException, err:
if err.http_status != 404:
raise
print >>stderr, "big_container doesn't exist. " \
"Has stats-populate been run?"
return 0
if not len(listing):
break
marker = listing[-1].get('name', listing[-1].get('subdir'))
value[0] += len(listing)
subdirs = []
i = 0
# Capping the subdirs we'll list per dir to 10
while len(subdirs) < 10 and i < len(listing):
if 'subdir' in listing[i]:
subdirs.append(listing[i]['subdir'])
i += 1
del listing
for subdir in subdirs:
coropool.spawn(list, subdir)
sleep()
coropool.spawn(list)
coropool.waitall()
value = value[0]
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\rGot %d objects/subdirs in big_container, %d%s' % (value,
elapsed, elapsed_unit)
return value
def container_delete_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
containers = [c['name'] for c in
conn.get_account(prefix='stats_container_put_',
full_listing=True)]
count = len(containers)
def delete(container):
with connpool.item() as conn:
try:
conn.delete_container(container)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KDeleting containers: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for container in containers:
coropool.spawn(delete, container)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(containers)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
        print '\r\x1B[KDeleted %d containers for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def object_put_report(coropool, connpool, count, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
def put(obj):
with connpool.item() as conn:
try:
conn.put_object('stats_object_put', obj, '')
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KCreating objects: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
with connpool.item() as conn:
conn.put_container('stats_object_put')
for x in xrange(count):
coropool.spawn(put, 'stats_object_put_%02x' % x)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / count
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KCreated %d objects for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def object_head_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
objects = [o['name'] for o in conn.get_container('stats_object_put',
prefix='stats_object_put_', full_listing=True)]
count = len(objects)
def head(obj):
with connpool.item() as conn:
try:
conn.head_object('stats_object_put', obj)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KHeading objects: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for obj in objects:
coropool.spawn(head, obj)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(objects)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KHeaded %d objects for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def object_get_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
objects = [o['name'] for o in conn.get_container('stats_object_put',
prefix='stats_object_put_', full_listing=True)]
count = len(objects)
def get(obj):
with connpool.item() as conn:
try:
conn.get_object('stats_object_put', obj)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KRetrieving objects: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for obj in objects:
coropool.spawn(get, obj)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(objects)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KRetrieved %d objects for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
def object_delete_report(coropool, connpool, options):
successes = [0]
failures = [0]
retries_done = [0]
begun = time()
next_report = [time() + 2]
with connpool.item() as conn:
objects = [o['name'] for o in conn.get_container('stats_object_put',
prefix='stats_object_put_', full_listing=True)]
count = len(objects)
def delete(obj):
with connpool.item() as conn:
try:
conn.delete_object('stats_object_put', obj)
successes[0] += 1
except (Exception, Timeout):
failures[0] += 1
if options.verbose and time() >= next_report[0]:
next_report[0] = time() + 5
eta, eta_unit = compute_eta(begun, successes[0] + failures[0],
count)
print '\r\x1B[KDeleting objects: %d of %d, %d%s left, %d ' \
'retries' % (successes[0] + failures[0], count, eta,
eta_unit, retries_done[0]),
for obj in objects:
coropool.spawn(delete, obj)
coropool.waitall()
successes = successes[0]
failures = failures[0]
value = 100.0 * successes / len(objects)
if options.verbose:
elapsed, elapsed_unit = get_time_units(time() - begun)
print '\r\x1B[KDeleted %d objects for performance reporting, ' \
'%d%s, %d retries' % (count, round(elapsed), elapsed_unit,
retries_done[0])
print '%d succeeded, %d failed, %.02f%% success rate' % (
successes, failures, value)
return value
if __name__ == '__main__':
patcher.monkey_patch()
hubs.get_hub().debug_exceptions = False
parser = OptionParser(usage='''
Usage: %prog [options] [conf_file]
[conf_file] defaults to /etc/swift/stats.conf'''.strip())
parser.add_option('-a', '--audit', action='store_true',
dest='audit', default=False,
help='Run the audit checks')
parser.add_option('-d', '--dispersion', action='store_true',
dest='dispersion', default=False,
help='Run the dispersion reports')
parser.add_option('-o', '--output', dest='csv_output',
default=None,
help='Override where the CSV report is written '
'(default from conf file); the keyword None will '
'suppress the CSV report')
parser.add_option('-p', '--performance', action='store_true',
dest='performance', default=False,
help='Run the performance reports')
parser.add_option('-q', '--quiet', action='store_false', dest='verbose',
default=True, help='Suppress status output')
parser.add_option('-r', '--retries', dest='retries',
default=None,
help='Override retry attempts (default from conf file)')
args = argv[1:]
if not args:
args.append('-h')
(options, args) = parser.parse_args(args)
conf_file = '/etc/swift/stats.conf'
if args:
conf_file = args.pop(0)
c = ConfigParser()
if not c.read(conf_file):
exit('Unable to read config file: %s' % conf_file)
conf = dict(c.items('stats'))
swift_dir = conf.get('swift_dir', '/etc/swift')
dispersion_coverage = int(conf.get('dispersion_coverage', 1))
container_put_count = int(conf.get('container_put_count', 1000))
object_put_count = int(conf.get('object_put_count', 1000))
concurrency = int(conf.get('concurrency', 50))
if options.retries:
options.retries = int(options.retries)
else:
options.retries = int(conf.get('retries', 5))
    if options.csv_output:
        csv_output = options.csv_output
    else:
        csv_output = conf.get('csv_output', '/etc/swift/stats.csv')
coropool = GreenPool(size=concurrency)
url, token = get_auth(conf['auth_url'], conf['auth_user'],
conf['auth_key'])
account = url.rsplit('/', 1)[1]
connpool = Pool(max_size=concurrency)
connpool.create = lambda: Connection(conf['auth_url'],
conf['auth_user'], conf['auth_key'],
retries=options.retries, preauthurl=url,
preauthtoken=token)
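    # One flat row of metrics per run, written as a CSV line at the end; the
    # R_* constants below are the column indexes into this list.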
report = [time(), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0]
(R_TIMESTAMP, R_CDR_TIME, R_CDR_VALUE, R_ODR_TIME, R_ODR_VALUE,
R_CPUT_TIME, R_CPUT_RATE, R_CHEAD_TIME, R_CHEAD_RATE, R_CGET_TIME,
R_CGET_RATE, R_CDELETE_TIME, R_CDELETE_RATE, R_CLSTANDARD_TIME,
R_CLPREFIX_TIME, R_CLPREDELIM_TIME, R_OPUT_TIME, R_OPUT_RATE, R_OHEAD_TIME,
R_OHEAD_RATE, R_OGET_TIME, R_OGET_RATE, R_ODELETE_TIME, R_ODELETE_RATE) = \
xrange(len(report))
container_ring = Ring(os.path.join(swift_dir, 'container.ring.gz'))
object_ring = Ring(os.path.join(swift_dir, 'object.ring.gz'))
if options.audit:
audit(coropool, connpool, account, container_ring, object_ring, options)
if options.verbose and (options.dispersion or options.performance):
print
if options.dispersion:
begin = time()
report[R_CDR_VALUE] = container_dispersion_report(coropool, connpool,
account, container_ring, options)
report[R_CDR_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_ODR_VALUE] = object_dispersion_report(coropool, connpool,
account, object_ring, options)
report[R_ODR_TIME] = time() - begin
if options.verbose and options.performance:
print
if options.performance:
begin = time()
report[R_CPUT_RATE] = container_put_report(coropool, connpool,
container_put_count, options)
report[R_CPUT_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_CHEAD_RATE] = \
container_head_report(coropool, connpool, options)
report[R_CHEAD_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_CGET_RATE] = container_get_report(coropool, connpool, options)
report[R_CGET_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_CDELETE_RATE] = \
container_delete_report(coropool, connpool, options)
report[R_CDELETE_TIME] = time() - begin
if options.verbose:
print
begin = time()
container_standard_listing_report(coropool, connpool, options)
report[R_CLSTANDARD_TIME] = time() - begin
if options.verbose:
print
begin = time()
container_prefix_listing_report(coropool, connpool, options)
report[R_CLPREFIX_TIME] = time() - begin
if options.verbose:
print
begin = time()
container_prefix_delimiter_listing_report(coropool, connpool, options)
report[R_CLPREDELIM_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_OPUT_RATE] = \
object_put_report(coropool, connpool, object_put_count, options)
report[R_OPUT_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_OHEAD_RATE] = object_head_report(coropool, connpool, options)
report[R_OHEAD_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_OGET_RATE] = object_get_report(coropool, connpool, options)
report[R_OGET_TIME] = time() - begin
if options.verbose:
print
begin = time()
report[R_ODELETE_RATE] = \
object_delete_report(coropool, connpool, options)
report[R_ODELETE_TIME] = time() - begin
if options.csv_output != 'None':
try:
if not os.path.exists(csv_output):
f = open(csv_output, 'wb')
f.write('Timestamp,'
'Container Dispersion Report Time,'
'Container Dispersion Report Value,'
'Object Dispersion Report Time,'
'Object Dispersion Report Value,'
'Container PUT Report Time,'
'Container PUT Report Success Rate,'
'Container HEAD Report Time,'
'Container HEAD Report Success Rate,'
'Container GET Report Time,'
                        'Container GET Report Success Rate,'
'Container DELETE Report Time,'
'Container DELETE Report Success Rate,'
'Container Standard Listing Time,'
'Container Prefix Listing Time,'
'Container Prefix Delimiter Listing Time,'
'Object PUT Report Time,'
'Object PUT Report Success Rate,'
'Object HEAD Report Time,'
'Object HEAD Report Success Rate,'
'Object GET Report Time,'
                        'Object GET Report Success Rate,'
'Object DELETE Report Time,'
'Object DELETE Report Success Rate\r\n')
csv = csv.writer(f)
else:
csv = csv.writer(open(csv_output, 'ab'))
csv.writerow(report)
except Exception, err:
print >>stderr, 'Could not write CSV report:', err

48
setup.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from distutils.core import setup
setup(
name='swift',
version='1.0.0-1',
description='Swift',
license='Apache License (2.0)',
author='OpenStack, LLC.',
url='https://launchpad.net/swift',
packages=['swift', 'swift.common'],
classifiers=[
'Development Status :: 4 - Beta',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 2.6',
'Environment :: No Input/Output (Daemon)',
],
scripts=['bin/st.py', 'bin/swift-account-auditor.py',
'bin/swift-account-audit.py', 'bin/swift-account-reaper.py',
'bin/swift-account-replicator.py', 'bin/swift-account-server.py',
'bin/swift-auth-create-account.py',
'bin/swift-auth-recreate-accounts.py', 'bin/swift-auth-server.py',
'bin/swift-container-auditor.py',
'bin/swift-container-replicator.py',
'bin/swift-container-server.py', 'bin/swift-container-updater.py',
'bin/swift-drive-audit.py', 'bin/swift-get-nodes.py',
'bin/swift-init.py', 'bin/swift-object-auditor.py',
'bin/swift-object-info.py', 'bin/swift-object-server.py',
'bin/swift-object-updater.py', 'bin/swift-proxy-server.py',
'bin/swift-ring-builder.py', 'bin/swift-stats-populate.py',
'bin/swift-stats-report.py']
)

0
swift/__init__.py Normal file
View File

6
swift/common/__init__.py Normal file
View File

@ -0,0 +1,6 @@
""" Code common to all of Swift. """
ACCOUNT_LISTING_LIMIT = 10000
CONTAINER_LISTING_LIMIT = 10000
FILE_SIZE_LIMIT = 5368709122

98
swift/common/auth.py Normal file
View File

@ -0,0 +1,98 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ConfigParser import ConfigParser, NoOptionError
import os
import time
from webob.request import Request
from webob.exc import HTTPUnauthorized, HTTPPreconditionFailed
from eventlet.timeout import Timeout
from swift.common.utils import split_path
from swift.common.bufferedhttp import http_connect_raw as http_connect
class DevAuthMiddleware(object):
"""
Auth Middleware that uses the dev auth server
"""
def __init__(self, app, conf, memcache_client, logger):
self.app = app
self.memcache_client = memcache_client
self.logger = logger
self.conf = conf
self.auth_host = conf.get('bind_ip', '127.0.0.1')
self.auth_port = int(conf.get('bind_port', 11000))
self.timeout = int(conf.get('node_timeout', 10))
def __call__(self, env, start_response):
req = Request(env)
if req.path != '/healthcheck':
if 'x-storage-token' in req.headers and \
'x-auth-token' not in req.headers:
req.headers['x-auth-token'] = req.headers['x-storage-token']
version, account, container, obj = split_path(req.path, 1, 4, True)
if account is None:
return HTTPPreconditionFailed(request=req, body='Bad URL')(
env, start_response)
if not req.headers.get('x-auth-token'):
return HTTPPreconditionFailed(request=req,
body='Missing Auth Token')(env, start_response)
if not self.auth(account, req.headers['x-auth-token']):
return HTTPUnauthorized(request=req)(env, start_response)
# If we get here, then things should be good.
return self.app(env, start_response)
def auth(self, account, token):
"""
        Dev authorization implementation
:param account: account name
:param token: auth token
:returns: True if authorization is successful, False otherwise
"""
key = 'auth/%s/%s' % (account, token)
now = time.time()
cached_auth_data = self.memcache_client.get(key)
if cached_auth_data:
start, expiration = cached_auth_data
if now - start <= expiration:
return True
try:
with Timeout(self.timeout):
conn = http_connect(self.auth_host, self.auth_port, 'GET',
'/token/%s/%s' % (account, token))
resp = conn.getresponse()
resp.read()
conn.close()
if resp.status == 204:
validated = float(resp.getheader('x-auth-ttl'))
else:
validated = False
except:
self.logger.exception('ERROR with auth')
return False
if not validated:
return False
else:
val = (now, validated)
self.memcache_client.set(key, val, timeout=validated)
return True

158
swift/common/bufferedhttp.py Normal file
View File

@ -0,0 +1,158 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Monkey Patch httplib.HTTPResponse to buffer reads of headers. This can improve
performance when making large numbers of small HTTP requests. This module
also provides helper functions to make HTTP connections using
BufferedHTTPResponse.
.. warning::
If you use this, be sure that the libraries you are using do not access
the socket directly (xmlrpclib, I'm looking at you :/), and instead
make all calls through httplib.
"""
from urllib import quote
import logging
import time
from eventlet.green.httplib import HTTPConnection, HTTPResponse, _UNKNOWN, \
CONTINUE, HTTPMessage
class BufferedHTTPResponse(HTTPResponse):
"""HTTPResponse class that buffers reading of headers"""
def __init__(self, sock, debuglevel=0, strict=0,
method=None): # pragma: no cover
self.sock = sock
self.fp = sock.makefile('rb')
self.debuglevel = debuglevel
self.strict = strict
self._method = method
self.msg = None
# from the Status-Line of the response
self.version = _UNKNOWN # HTTP-Version
self.status = _UNKNOWN # Status-Code
self.reason = _UNKNOWN # Reason-Phrase
self.chunked = _UNKNOWN # is "chunked" being used?
self.chunk_left = _UNKNOWN # bytes left to read in current chunk
self.length = _UNKNOWN # number of bytes left in response
self.will_close = _UNKNOWN # conn will close at end of response
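    # expect_response reads the interim status line after a request sent with
    # "Expect: 100-continue"; any status other than 100 is cached and handed
    # to begin() to be parsed as the final response.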
def expect_response(self):
self.fp = self.sock.makefile('rb', 0)
version, status, reason = self._read_status()
if status != CONTINUE:
self._read_status = lambda: (version, status, reason)
self.begin()
else:
self.status = status
self.reason = reason.strip()
self.version = 11
self.msg = HTTPMessage(self.fp, 0)
self.msg.fp = None
class BufferedHTTPConnection(HTTPConnection):
"""HTTPConnection class that uses BufferedHTTPResponse"""
response_class = BufferedHTTPResponse
def connect(self):
self._connected_time = time.time()
return HTTPConnection.connect(self)
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
self._method = method
self._path = url
self._txn_id = '-'
return HTTPConnection.putrequest(self, method, url, skip_host,
skip_accept_encoding)
def putheader(self, header, value):
if header.lower() == 'x-cf-trans-id':
self._txn_id = value
return HTTPConnection.putheader(self, header, value)
def getexpect(self):
response = BufferedHTTPResponse(self.sock, strict=self.strict,
method=self._method)
response.expect_response()
return response
def getresponse(self):
response = HTTPConnection.getresponse(self)
logging.debug("HTTP PERF: %.5f seconds to %s %s:%s %s (%s)" %
(time.time() - self._connected_time, self._method, self.host,
self.port, self._path, self._txn_id))
return response
def http_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
"""
    Helper function to create an HTTPConnection object that is buffered
for backend Swift services.
:param ipaddr: IPv4 address to connect to
:param port: port to connect to
:param device: device of the node to query
:param partition: partition on the device
:param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
:param path: request path
:param headers: dictionary of headers
:param query_string: request query string
:returns: HTTPConnection object
"""
conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
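    # The /<device>/<partition> prefix is urlencoded together with the
    # caller's path so names with special characters survive the request line.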
path = quote('/' + device + '/' + str(partition) + path)
if query_string:
path += '?' + query_string
conn.path = path
conn.putrequest(method, path)
if headers:
for header, value in headers.iteritems():
conn.putheader(header, value)
conn.endheaders()
return conn
def http_connect_raw(ipaddr, port, method, path, headers=None,
query_string=None):
"""
    Helper function to create an HTTPConnection object that is buffered.
:param ipaddr: IPv4 address to connect to
:param port: port to connect to
:param method: HTTP method to request ('GET', 'PUT', 'POST', etc.)
:param path: request path
:param headers: dictionary of headers
:param query_string: request query string
:returns: HTTPConnection object
"""
conn = BufferedHTTPConnection('%s:%s' % (ipaddr, port))
if query_string:
path += '?' + query_string
conn.path = path
conn.putrequest(method, path)
if headers:
for header, value in headers.iteritems():
conn.putheader(header, value)
conn.endheaders()
return conn

718
swift/common/client.py Normal file
View File

@ -0,0 +1,718 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cloud Files client library used internally
"""
import socket
from cStringIO import StringIO
from httplib import HTTPConnection, HTTPException, HTTPSConnection
from re import compile, DOTALL
from tokenize import generate_tokens, STRING, NAME, OP
from urllib import quote as _quote, unquote
from urlparse import urlparse, urlunparse
try:
from eventlet import sleep
except ImportError:
from time import sleep
def quote(value, safe='/'):
"""
Patched version of urllib.quote that encodes utf8 strings before quoting
"""
if isinstance(value, unicode):
value = value.encode('utf8')
return _quote(value, safe)
# look for a real json parser first
try:
# simplejson is popular and pretty good
from simplejson import loads as json_loads
except ImportError:
try:
# 2.6 will have a json module in the stdlib
from json import loads as json_loads
except ImportError:
# fall back on local parser otherwise
comments = compile(r'/\*.*\*/|//[^\r\n]*', DOTALL)
def json_loads(string):
'''
Fairly competent json parser exploiting the python tokenizer and
eval(). -- From python-cloudfiles
_loads(serialized_json) -> object
'''
try:
res = []
consts = {'true': True, 'false': False, 'null': None}
string = '(' + comments.sub('', string) + ')'
for type, val, _, _, _ in \
generate_tokens(StringIO(string).readline):
if (type == OP and val not in '[]{}:,()-') or \
(type == NAME and val not in consts):
raise AttributeError()
elif type == STRING:
res.append('u')
res.append(val.replace('\\/', '/'))
else:
res.append(val)
return eval(''.join(res), {}, consts)
except:
raise AttributeError()
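# Illustrative example of the fallback parser above (not part of the
# original module); it accepts the subset of JSON the servers emit:
#
#   >>> json_loads('[1, "two", true, null]')
#   [1, u'two', True, None]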
class ClientException(Exception):
def __init__(self, msg, http_scheme='', http_host='', http_port='',
http_path='', http_query='', http_status=0, http_reason='',
http_device=''):
Exception.__init__(self, msg)
self.msg = msg
self.http_scheme = http_scheme
self.http_host = http_host
self.http_port = http_port
self.http_path = http_path
self.http_query = http_query
self.http_status = http_status
self.http_reason = http_reason
self.http_device = http_device
def __str__(self):
a = self.msg
b = ''
if self.http_scheme:
b += '%s://' % self.http_scheme
if self.http_host:
b += self.http_host
if self.http_port:
b += ':%s' % self.http_port
if self.http_path:
b += self.http_path
if self.http_query:
b += '?%s' % self.http_query
if self.http_status:
if b:
b = '%s %s' % (b, self.http_status)
else:
b = str(self.http_status)
if self.http_reason:
if b:
b = '%s %s' % (b, self.http_reason)
else:
b = '- %s' % self.http_reason
if self.http_device:
if b:
b = '%s: device %s' % (b, self.http_device)
else:
b = 'device %s' % self.http_device
return b and '%s: %s' % (a, b) or a
def http_connection(url):
"""
Make an HTTPConnection or HTTPSConnection
:param url: url to connect to
:returns: tuple of (parsed url, connection object)
:raises ClientException: Unable to handle protocol scheme
"""
parsed = urlparse(url)
if parsed.scheme == 'http':
conn = HTTPConnection(parsed.netloc)
elif parsed.scheme == 'https':
conn = HTTPSConnection(parsed.netloc)
else:
raise ClientException('Cannot handle protocol scheme %s for url %s' %
(parsed.scheme, repr(url)))
return parsed, conn
def get_auth(url, user, key, snet=False):
"""
Get authentication credentials
:param url: authentication URL
:param user: user to auth as
:param key: key or password for auth
:param snet: use SERVICENET internal network, default is False
:returns: tuple of (storage URL, storage token)
:raises ClientException: HTTP GET request to auth URL failed
"""
parsed, conn = http_connection(url)
conn.request('GET', parsed.path, '',
{'X-Auth-User': user, 'X-Auth-Key': key})
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Auth GET failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port,
http_path=parsed.path, http_status=resp.status,
http_reason=resp.reason)
url = resp.getheader('x-storage-url')
if snet:
parsed = list(urlparse(url))
# Second item in the list is the netloc
parsed[1] = 'snet-' + parsed[1]
url = urlunparse(parsed)
return url, resp.getheader('x-storage-token',
resp.getheader('x-auth-token'))
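# Illustrative usage sketch (not part of the original module); the auth URL
# and credentials are placeholders for a local devauth server:
#
#   url, token = get_auth('http://127.0.0.1:11000/v1.0',
#                         'test:tester', 'testing')
#   # url and token can now be passed to the get_/head_/put_ helpers below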
def get_account(url, token, marker=None, limit=None, prefix=None,
http_conn=None, full_listing=False):
"""
Get a listing of containers for the account.
:param url: storage URL
:param token: auth token
:param marker: marker query
:param limit: limit query
:param prefix: prefix query
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param full_listing: if True, return a full listing, else returns a max
of 10000 listings
:returns: a list of containers
:raises ClientException: HTTP GET request failed
"""
if not http_conn:
http_conn = http_connection(url)
if full_listing:
rv = []
listing = get_account(url, token, marker, limit, prefix, http_conn)
while listing:
rv.extend(listing)
marker = listing[-1]['name']
listing = get_account(url, token, marker, limit, prefix, http_conn)
return rv
parsed, conn = http_conn
qs = 'format=json'
if marker:
qs += '&marker=%s' % quote(marker)
if limit:
qs += '&limit=%d' % limit
if prefix:
qs += '&prefix=%s' % quote(prefix)
conn.request('GET', '%s?%s' % (parsed.path, qs), '',
{'X-Auth-Token': token})
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
resp.read()
raise ClientException('Account GET failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port,
http_path=parsed.path, http_query=qs, http_status=resp.status,
http_reason=resp.reason)
if resp.status == 204:
resp.read()
return []
return json_loads(resp.read())
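# Illustrative usage sketch (not part of the original module): list every
# container in the account, letting full_listing page past 10000 entries:
#
#   for container in get_account(url, token, full_listing=True):
#       print container['name'], container['count'], container['bytes']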
def head_account(url, token, http_conn=None):
"""
Get account stats.
:param url: storage URL
:param token: auth token
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:returns: a tuple of (container count, object count, bytes used)
:raises ClientException: HTTP HEAD request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Account HEAD failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port,
http_path=parsed.path, http_status=resp.status,
http_reason=resp.reason)
return int(resp.getheader('x-account-container-count', 0)), \
int(resp.getheader('x-account-object-count', 0)), \
int(resp.getheader('x-account-bytes-used', 0))
def get_container(url, token, container, marker=None, limit=None,
prefix=None, delimiter=None, http_conn=None,
full_listing=False):
"""
Get a listing of objects for the container.
:param url: storage URL
:param token: auth token
:param container: container name to get a listing for
:param marker: marker query
:param limit: limit query
:param prefix: prefix query
:param delimiter: string to delimit the queries on
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param full_listing: if True, return a full listing, else returns a max
of 10000 listings
:returns: a list of objects
:raises ClientException: HTTP GET request failed
"""
if not http_conn:
http_conn = http_connection(url)
if full_listing:
rv = []
listing = get_container(url, token, container, marker, limit, prefix,
delimiter, http_conn)
while listing:
rv.extend(listing)
if not delimiter:
marker = listing[-1]['name']
else:
marker = listing[-1].get('name', listing[-1].get('subdir'))
listing = get_container(url, token, container, marker, limit,
prefix, delimiter, http_conn)
return rv
parsed, conn = http_conn
path = '%s/%s' % (parsed.path, quote(container))
qs = 'format=json'
if marker:
qs += '&marker=%s' % quote(marker)
if limit:
qs += '&limit=%d' % limit
if prefix:
qs += '&prefix=%s' % quote(prefix)
if delimiter:
qs += '&delimiter=%s' % quote(delimiter)
conn.request('GET', '%s?%s' % (path, qs), '', {'X-Auth-Token': token})
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
resp.read()
raise ClientException('Container GET failed',
http_scheme=parsed.scheme, http_host=conn.host,
http_port=conn.port, http_path=path, http_query=qs,
http_status=resp.status, http_reason=resp.reason)
if resp.status == 204:
resp.read()
return []
return json_loads(resp.read())
def head_container(url, token, container, http_conn=None):
"""
Get container stats.
:param url: storage URL
:param token: auth token
:param container: container name to get stats for
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:returns: a tuple of (object count, bytes used)
:raises ClientException: HTTP HEAD request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s' % (parsed.path, quote(container))
conn.request('HEAD', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Container HEAD failed',
http_scheme=parsed.scheme, http_host=conn.host,
http_port=conn.port, http_path=path, http_status=resp.status,
http_reason=resp.reason)
return int(resp.getheader('x-container-object-count', 0)), \
int(resp.getheader('x-container-bytes-used', 0))
def put_container(url, token, container, http_conn=None):
"""
Create a container
:param url: storage URL
:param token: auth token
:param container: container name to create
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:raises ClientException: HTTP PUT request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s' % (parsed.path, quote(container))
conn.request('PUT', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Container PUT failed',
http_scheme=parsed.scheme, http_host=conn.host,
http_port=conn.port, http_path=path, http_status=resp.status,
http_reason=resp.reason)
def delete_container(url, token, container, http_conn=None):
"""
Delete a container
:param url: storage URL
:param token: auth token
:param container: container name to delete
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:raises ClientException: HTTP DELETE request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s' % (parsed.path, quote(container))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Container DELETE failed',
http_scheme=parsed.scheme, http_host=conn.host,
http_port=conn.port, http_path=path, http_status=resp.status,
http_reason=resp.reason)
def get_object(url, token, container, name, http_conn=None,
resp_chunk_size=None):
"""
Get an object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to get
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param resp_chunk_size: if defined, chunk size of data to read
:returns: a tuple of (content type, content length, last modified, etag,
metadata dictionary, object body)
:raises ClientException: HTTP GET request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('GET', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
resp.read()
raise ClientException('Object GET failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
metadata = {}
for key, value in resp.getheaders():
if key.lower().startswith('x-object-meta-'):
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
if resp_chunk_size:
def _object_body():
buf = resp.read(resp_chunk_size)
while buf:
yield buf
buf = resp.read(resp_chunk_size)
object_body = _object_body()
else:
object_body = resp.read()
return resp.getheader('content-type'), \
int(resp.getheader('content-length', 0)), \
resp.getheader('last-modified'), \
resp.getheader('etag').strip('"'), \
metadata, \
object_body
def head_object(url, token, container, name, http_conn=None):
"""
Get object info
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to get info for
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:returns: a tuple of (content type, content length, last modified, etag,
dictionary of metadata)
:raises ClientException: HTTP HEAD request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('HEAD', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Object HEAD failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
metadata = {}
for key, value in resp.getheaders():
if key.lower().startswith('x-object-meta-'):
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
return resp.getheader('content-type'), \
int(resp.getheader('content-length', 0)), \
resp.getheader('last-modified'), \
resp.getheader('etag').strip('"'), \
metadata
def put_object(url, token, container, name, contents, metadata={},
content_length=None, etag=None, chunk_size=65536,
content_type=None, http_conn=None):
"""
Put an object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to put
:param contents: file like object to read object data from
:param metadata: dictionary of object metadata
:param content_length: value to send as content-length header
:param etag: etag of contents
:param chunk_size: chunk size of data to write
:param content_type: value to send as content-type header
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:returns: etag from server response
:raises ClientException: HTTP PUT request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
headers = {'X-Auth-Token': token}
for key, value in metadata.iteritems():
headers['X-Object-Meta-%s' % quote(key)] = quote(value)
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
headers['Content-Length'] = str(content_length)
if content_type is not None:
headers['Content-Type'] = content_type
if not contents:
headers['Content-Length'] = '0'
if hasattr(contents, 'read'):
conn.putrequest('PUT', path)
for header, value in headers.iteritems():
conn.putheader(header, value)
if not content_length:
conn.putheader('Transfer-Encoding', 'chunked')
conn.endheaders()
chunk = contents.read(chunk_size)
while chunk:
if not content_length:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
else:
conn.send(chunk)
chunk = contents.read(chunk_size)
if not content_length:
conn.send('0\r\n\r\n')
else:
conn.request('PUT', path, contents, headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
return resp.getheader('etag').strip('"')
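# Illustrative usage sketch (not part of the original module): with a
# file-like object and no content_length, the data is streamed using
# chunked transfer encoding as implemented above. Paths are placeholders.
#
#   fp = open('/tmp/example.dat', 'rb')
#   etag = put_object(url, token, 'backups', 'example.dat', fp,
#                     content_type='application/octet-stream')
#   fp.close()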
def post_object(url, token, container, name, metadata, http_conn=None):
"""
Change object metadata
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to change
:param metadata: dictionary of object metadata
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:raises ClientException: HTTP POST request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
headers = {'X-Auth-Token': token}
for key, value in metadata.iteritems():
headers['X-Object-Meta-%s' % quote(key)] = quote(value)
conn.request('POST', path, '', headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Object POST failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
def delete_object(url, token, container, name, http_conn=None):
"""
Delete object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to delete
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:raises ClientException: HTTP DELETE request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException('Object DELETE failed',
http_scheme=parsed.scheme, http_host=conn.host,
http_port=conn.port, http_path=path, http_status=resp.status,
http_reason=resp.reason)
class Connection(object):
"""Convenience class to make requests that will also retry the request"""
def __init__(self, authurl, user, key, retries=5, preauthurl=None,
preauthtoken=None, snet=False):
"""
:param authurl: authentication URL
:param user: user name to authenticate as
:param key: key/password to authenticate with
:param retries: Number of times to retry the request before failing
:param preauthurl: storage URL (if you have already authenticated)
:param preauthtoken: authentication token (if you have already
authenticated)
:param snet: use SERVICENET internal network, default is False
"""
self.authurl = authurl
self.user = user
self.key = key
self.retries = retries
self.http_conn = None
self.url = preauthurl
self.token = preauthtoken
self.attempts = 0
self.snet = snet
def _retry(self, func, *args, **kwargs):
kwargs['http_conn'] = self.http_conn
self.attempts = 0
backoff = 1
while self.attempts <= self.retries:
self.attempts += 1
try:
if not self.url or not self.token:
self.url, self.token = \
get_auth(self.authurl, self.user, self.key, snet=self.snet)
self.http_conn = None
if not self.http_conn:
self.http_conn = http_connection(self.url)
kwargs['http_conn'] = self.http_conn
rv = func(self.url, self.token, *args, **kwargs)
return rv
except (socket.error, HTTPException):
if self.attempts > self.retries:
raise
self.http_conn = None
except ClientException, err:
if self.attempts > self.retries:
raise
if err.http_status == 401:
self.url = self.token = None
if self.attempts > 1:
raise
elif 500 <= err.http_status <= 599:
pass
else:
raise
sleep(backoff)
backoff *= 2
def head_account(self):
"""Wrapper for head_account"""
return self._retry(head_account)
def get_account(self, marker=None, limit=None, prefix=None,
full_listing=False):
"""Wrapper for get_account"""
# TODO: With full_listing=True this will restart the entire listing
# with each retry. Need to make a better version that just retries
# where it left off.
return self._retry(get_account, marker=marker, limit=limit,
prefix=prefix, full_listing=full_listing)
def head_container(self, container):
"""Wrapper for head_container"""
return self._retry(head_container, container)
def get_container(self, container, marker=None, limit=None, prefix=None,
delimiter=None, full_listing=False):
"""Wrapper for get_container"""
# TODO: With full_listing=True this will restart the entire listing
# with each retry. Need to make a better version that just retries
# where it left off.
return self._retry(get_container, container, marker=marker,
limit=limit, prefix=prefix, delimiter=delimiter,
full_listing=full_listing)
def put_container(self, container):
"""Wrapper for put_container"""
return self._retry(put_container, container)
def delete_container(self, container):
"""Wrapper for delete_container"""
return self._retry(delete_container, container)
def head_object(self, container, obj):
"""Wrapper for head_object"""
return self._retry(head_object, container, obj)
def get_object(self, container, obj, resp_chunk_size=None):
"""Wrapper for get_object"""
return self._retry(get_object, container, obj,
resp_chunk_size=resp_chunk_size)
def put_object(self, container, obj, contents, metadata={},
content_length=None, etag=None, chunk_size=65536,
content_type=None):
"""Wrapper for put_object"""
return self._retry(put_object, container, obj, contents,
metadata=metadata, content_length=content_length, etag=etag,
chunk_size=chunk_size, content_type=content_type)
def post_object(self, container, obj, metadata):
"""Wrapper for post_object"""
return self._retry(post_object, container, obj, metadata)
def delete_object(self, container, obj):
"""Wrapper for delete_object"""
return self._retry(delete_object, container, obj)
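# Illustrative usage sketch (not part of the original module): Connection
# retries with exponential backoff and re-authenticates once on a 401, so
# callers never manage tokens directly. Credentials are placeholders.
#
#   conn = Connection('http://127.0.0.1:11000/v1.0', 'test:tester',
#                     'testing')
#   conn.put_container('photos')
#   conn.put_object('photos', 'pic1.jpg', open('pic1.jpg', 'rb'),
#                   content_type='image/jpeg')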

152
swift/common/constraints.py Normal file
View File

@ -0,0 +1,152 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
from webob.exc import HTTPBadRequest, HTTPLengthRequired, \
HTTPRequestEntityTooLarge
#: Max file size allowed for objects
MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
#: Max length of the name of a key for metadata
MAX_META_NAME_LENGTH = 128
#: Max length of the value of a key for metadata
MAX_META_VALUE_LENGTH = 256
#: Max number of metadata items
MAX_META_COUNT = 90
#: Max overall size of metadata
MAX_META_OVERALL_SIZE = 4096
#: Max object name length
MAX_OBJECT_NAME_LENGTH = 1024
def check_metadata(req):
"""
Check metadata sent for objects in the request headers.
:param req: request object
:returns: HTTPBadRequest response for bad metadata, otherwise None
"""
meta_count = 0
meta_size = 0
for key, value in req.headers.iteritems():
if not key.lower().startswith('x-object-meta-'):
continue
key = key[len('x-object-meta-'):]
if not key:
return HTTPBadRequest(body='Metadata name cannot be empty',
request=req, content_type='text/plain')
meta_count += 1
meta_size += len(key) + len(value)
if len(key) > MAX_META_NAME_LENGTH:
return HTTPBadRequest(
body='Metadata name too long; max %d'
% MAX_META_NAME_LENGTH,
request=req, content_type='text/plain')
elif len(value) > MAX_META_VALUE_LENGTH:
return HTTPBadRequest(
body='Metadata value too long; max %d'
% MAX_META_VALUE_LENGTH,
request=req, content_type='text/plain')
elif meta_count > MAX_META_COUNT:
return HTTPBadRequest(
body='Too many metadata items; max %d' % MAX_META_COUNT,
request=req, content_type='text/plain')
elif meta_size > MAX_META_OVERALL_SIZE:
return HTTPBadRequest(
body='Total metadata too large; max %d'
% MAX_META_OVERALL_SIZE,
request=req, content_type='text/plain')
return None
def check_object_creation(req, object_name):
"""
Check to ensure that everything is alright about an object to be created.
:param req: HTTP request object
:param object_name: name of object to be created
:returns: HTTPRequestEntityTooLarge if the object is too large,
HTTPLengthRequired if the content-length header is missing and the
request is not chunked, HTTPBadRequest for a missing or bad
content-type header or bad metadata, otherwise None
"""
if req.content_length and req.content_length > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(body='Your request is too large.',
request=req, content_type='text/plain')
if req.content_length is None and \
req.headers.get('transfer-encoding') != 'chunked':
return HTTPLengthRequired(request=req)
if len(object_name) > MAX_OBJECT_NAME_LENGTH:
return HTTPBadRequest(body='Object name length of %d longer than %d' %
(len(object_name), MAX_OBJECT_NAME_LENGTH), request=req,
content_type='text/plain')
if 'Content-Type' not in req.headers:
return HTTPBadRequest(request=req, content_type='text/plain',
body='No content type')
if not check_xml_encodable(req.headers['Content-Type']):
return HTTPBadRequest(request=req, body='Invalid Content-Type',
content_type='text/plain')
return check_metadata(req)
def check_mount(root, drive):
"""
Verify that the path to the device is a mount point and mounted. This
allows us to fast fail on drives that have been unmounted because of
issues, and also prevents us from accidentally filling up the root partition.
:param root: base path where the devices are mounted
:param drive: drive name to be checked
:returns: True if it is a valid mounted device, False otherwise
"""
if not drive.isalnum():
return False
path = os.path.join(root, drive)
return os.path.exists(path) and os.path.ismount(path)
def check_float(string):
"""
Helper function for checking if a string can be converted to a float.
:param string: string to be verified as a float
:returns: True if the string can be converted to a float, False otherwise
"""
try:
float(string)
return True
except ValueError:
return False
_invalid_xml = re.compile(ur'[^\x09\x0a\x0d\x20-\uD7FF\uE000-\uFFFD%s-%s]' %
(unichr(0x10000), unichr(0x10FFFF)))
def check_xml_encodable(string):
"""
Validate if a string can be encoded in xml.
:param string: string to be validated
:returns: True if the string can be encoded in xml, False otherwise
"""
try:
return not _invalid_xml.search(string.decode('UTF-8'))
except UnicodeDecodeError:
return False
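# Illustrative examples of the standalone checks (not part of the original
# module):
#
#   >>> check_float('1278374522.63')
#   True
#   >>> check_float('not-a-number')
#   False
#   >>> check_xml_encodable('plain ascii')
#   True
#   >>> check_xml_encodable('bad \x00 byte')
#   False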

1463
swift/common/db.py Normal file

File diff suppressed because it is too large Load Diff

526
swift/common/db_replicator.py Normal file
View File

@ -0,0 +1,526 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import sys
import os
import random
import math
import time
import shutil
from eventlet import GreenPool, sleep, Timeout
from eventlet.green import subprocess
import simplejson
from webob import Response
from webob.exc import HTTPNotFound, HTTPNoContent, HTTPAccepted, \
HTTPInsufficientStorage, HTTPBadRequest
from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, unlink_older_than, LoggerFileObject
from swift.common import ring
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted, ConnectionTimeout
def quarantine_db(object_file, server_type):
"""
In the case that a corrupt file is found, move it to a quarantined area to
allow replication to fix it.
:param object_file: path to corrupt file
:param server_type: type of file that is corrupt
('container' or 'account')
"""
object_dir = os.path.dirname(object_file)
quarantine_dir = os.path.abspath(os.path.join(object_dir, '..',
'..', '..', '..', 'quarantined', server_type + 's',
os.path.basename(object_dir)))
renamer(object_dir, quarantine_dir)
class ReplConnection(BufferedHTTPConnection):
"""
Helper to simplify POSTing to a remote server.
"""
def __init__(self, node, partition, hash_, logger):
""
self.logger = logger
self.node = node
BufferedHTTPConnection.__init__(self, '%(ip)s:%(port)s' % node)
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
def post(self, *args):
"""
Make an HTTP POST request
:param args: list of json-encodable objects
:returns: httplib response object
"""
try:
body = simplejson.dumps(args)
self.request('POST', self.path, body,
{'Content-Type': 'application/json'})
response = self.getresponse()
response.data = response.read()
return response
except:
self.logger.exception(
'ERROR reading HTTP response from %s' % self.node)
return None
class Replicator(object):
"""
Implements the logic for directing db replication.
"""
def __init__(self, server_conf, replicator_conf):
self.logger = \
get_logger(replicator_conf, '%s-replicator' % self.server_type)
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = LoggerFileObject(self.logger)
self.root = server_conf.get('devices', '/srv/node')
self.mount_check = server_conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.port = int(server_conf.get('bind_port', self.default_port))
concurrency = int(replicator_conf.get('concurrency', 8))
self.cpool = GreenPool(size=concurrency)
swift_dir = server_conf.get('swift_dir', '/etc/swift')
self.ring = ring.Ring(os.path.join(swift_dir, self.ring_file))
self.per_diff = int(replicator_conf.get('per_diff', 1000))
self.run_pause = int(replicator_conf.get('run_pause', 30))
self.vm_test_mode = replicator_conf.get(
'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
self.node_timeout = int(replicator_conf.get('node_timeout', 10))
self.conn_timeout = float(replicator_conf.get('conn_timeout', 0.5))
self.reclaim_age = float(replicator_conf.get('reclaim_age', 86400 * 7))
self._zero_stats()
def _zero_stats(self):
"""Zero out the stats."""
self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
'remove': 0, 'empty': 0, 'remote_merge': 0,
'start': time.time()}
def _report_stats(self):
"""Report the current stats to the logs."""
self.logger.info(
'Attempted to replicate %d dbs in %.5f seconds (%.5f/s)'
% (self.stats['attempted'], time.time() - self.stats['start'],
self.stats['attempted'] /
(time.time() - self.stats['start'] + 0.0000001)))
self.logger.info('Removed %(remove)d dbs' % self.stats)
self.logger.info('%(success)s successes, %(failure)s failures'
% self.stats)
self.logger.info(' '.join(['%s:%s' % item for item in
self.stats.items() if item[0] in
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl', 'empty')]))
def _rsync_file(self, db_file, remote_file, whole_file=True):
"""
Sync a single file using rsync. Used by _rsync_db to handle syncing.
:param db_file: file to be synced
:param remote_file: remote location to sync the DB file to
:param whole_file: if True, uses rsync's --whole-file flag
:returns: True if the sync was successful, False otherwise
"""
popen_args = ['rsync', '--quiet', '--no-motd',
'--timeout=%s' % int(math.ceil(self.node_timeout)),
'--contimeout=%s' % int(math.ceil(self.conn_timeout))]
if whole_file:
popen_args.append('--whole-file')
popen_args.extend([db_file, remote_file])
proc = subprocess.Popen(popen_args)
proc.communicate()
if proc.returncode != 0:
self.logger.error('ERROR rsync failed with %s: %s' %
(proc.returncode, popen_args))
return proc.returncode == 0
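# Illustrative example (not part of the original module): with the default
# node_timeout=10 and conn_timeout=0.5, the command assembled above is
# roughly (host and paths invented):
#
#   rsync --quiet --no-motd --timeout=10 --contimeout=1 --whole-file \
#       /srv/node/sda1/containers/.../<hash>.db \
#       10.0.0.2::container/sdb1/tmp/<local_id>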
def _rsync_db(self, broker, device, http, local_id,
post_method='complete_rsync', post_timeout=None):
"""
Sync a whole db using rsync.
:param broker: DB broker object of DB to be synced
:param device: device to sync to
:param http: ReplConnection object
:param local_id: unique ID of the local database replica
:param post_method: remote operation to perform after rsync
:param post_timeout: timeout to wait in seconds
"""
if self.vm_test_mode:
remote_file = '%s::%s%s/%s/tmp/%s' % (device['ip'],
self.server_type, device['port'], device['device'],
local_id)
else:
remote_file = '%s::%s/%s/tmp/%s' % (device['ip'],
self.server_type, device['device'], local_id)
mtime = os.path.getmtime(broker.db_file)
if not self._rsync_file(broker.db_file, remote_file):
return False
# perform block-level sync if the db was modified during the first sync
if os.path.exists(broker.db_file + '-journal') or \
os.path.getmtime(broker.db_file) > mtime:
# grab a lock so nobody else can modify it
with broker.lock():
if not self._rsync_file(broker.db_file, remote_file, False):
return False
with Timeout(post_timeout or self.node_timeout):
response = http.post(post_method, local_id)
return response and response.status >= 200 and response.status < 300
def _usync_db(self, point, broker, http, remote_id, local_id):
"""
Sync a db by sending all records since the last sync.
:param point: synchronization high water mark between the replicas
:param broker: database broker object
:param http: ReplConnection object for the remote server
:param remote_id: database id for the remote replica
:param local_id: database id for the local replica
:returns: boolean indicating completion and success
"""
self.stats['diff'] += 1
self.logger.debug('Syncing chunks with %s', http.host)
sync_table = broker.get_syncs()
objects = broker.get_items_since(point, self.per_diff)
while len(objects):
with Timeout(self.node_timeout):
response = http.post('merge_items', objects, local_id)
if not response or response.status >= 300 or response.status < 200:
if response:
self.logger.error('ERROR Bad response %s from %s' %
(response.status, http.host))
return False
point = objects[-1]['ROWID']
objects = broker.get_items_since(point, self.per_diff)
with Timeout(self.node_timeout):
response = http.post('merge_syncs', sync_table)
if response and response.status >= 200 and response.status < 300:
broker.merge_syncs([{'remote_id': remote_id,
'sync_point': point}], incoming=False)
return True
return False
def _in_sync(self, rinfo, info, broker, local_sync):
"""
Determine whether or not two replicas of a database are considered
to be in sync.
:param rinfo: remote database info
:param info: local database info
:param broker: database broker object
:param local_sync: cached last sync point between replicas
:returns: boolean indicating whether or not the replicas are in sync
"""
if max(rinfo['point'], local_sync) >= info['max_row']:
self.stats['no_change'] += 1
return True
if rinfo['hash'] == info['hash']:
self.stats['hashmatch'] += 1
broker.merge_syncs([{'remote_id': rinfo['id'],
'sync_point': rinfo['point']}], incoming=False)
return True
def _http_connect(self, node, partition, db_file):
"""
Make an http_connection using ReplConnection
:param node: node dictionary from the ring
:param partition: partition to send in the url
:param db_file: DB file
:returns: ReplConnection object
"""
return ReplConnection(node, partition,
os.path.basename(db_file).split('.', 1)[0], self.logger)
def _repl_to_node(self, node, broker, partition, info):
"""
Replicate a database to a node.
:param node: node dictionary from the ring to be replicated to
:param broker: DB broker for the DB to be replicated
:param partition: partition on the node to replicate to
:param info: DB info as a dictionary of {'max_row', 'hash', 'id',
'created_at', 'put_timestamp', 'delete_timestamp'}
:returns: True if successful, False otherwise
"""
with ConnectionTimeout(self.conn_timeout):
http = self._http_connect(node, partition, broker.db_file)
if not http:
self.logger.error(
'ERROR Unable to connect to remote server: %s' % node)
return False
with Timeout(self.node_timeout):
response = http.post('sync', info['max_row'], info['hash'],
info['id'], info['created_at'], info['put_timestamp'],
info['delete_timestamp'])
if not response:
return False
elif response.status == HTTPNotFound.code: # completely missing, rsync
self.stats['rsync'] += 1
return self._rsync_db(broker, node, http, info['id'])
elif response.status == HTTPInsufficientStorage.code:
raise DriveNotMounted()
elif response.status >= 200 and response.status < 300:
rinfo = simplejson.loads(response.data)
local_sync = broker.get_sync(rinfo['id'], incoming=False)
if self._in_sync(rinfo, info, broker, local_sync):
return True
# if the remote copy has less than half the local rows,
# rsync the db over and have the remote merge it.
if rinfo['max_row'] / float(info['max_row']) < 0.5:
self.stats['remote_merge'] += 1
return self._rsync_db(broker, node, http, info['id'],
post_method='rsync_then_merge',
post_timeout=(info['count'] / 2000))
# else send diffs over to the remote server
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
def _replicate_object(self, partition, object_file, node_id):
"""
Replicate the db, choosing method based on whether or not it
already exists on peers.
:param partition: partition to be replicated to
:param object_file: DB file name to be replicated
:param node_id: node id of the node to be replicated to
"""
self.logger.debug('Replicating db %s' % object_file)
self.stats['attempted'] += 1
try:
broker = self.brokerclass(object_file, pending_timeout=30)
broker.reclaim(time.time() - self.reclaim_age,
time.time() - (self.reclaim_age * 2))
info = broker.get_replication_info()
except Exception, e:
if 'no such table' in str(e):
self.logger.error('Quarantining DB %s' % object_file)
quarantine_db(broker.db_file, broker.db_type)
else:
self.logger.exception('ERROR reading db %s' % object_file)
self.stats['failure'] += 1
return
# The db is considered deleted if the delete_timestamp value is greater
# than the put_timestamp, and there are no objects.
delete_timestamp = 0
try:
delete_timestamp = float(info['delete_timestamp'])
except ValueError:
pass
put_timestamp = 0
try:
put_timestamp = float(info['put_timestamp'])
except ValueError:
pass
if delete_timestamp < (time.time() - self.reclaim_age) and \
delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
with lock_parent_directory(object_file):
shutil.rmtree(os.path.dirname(object_file), True)
self.stats['remove'] += 1
return
responses = []
nodes = self.ring.get_part_nodes(int(partition))
shouldbehere = bool([n for n in nodes if n['id'] == node_id])
repl_nodes = [n for n in nodes if n['id'] != node_id]
more_nodes = self.ring.get_more_nodes(int(partition))
for node in repl_nodes:
success = False
try:
success = self._repl_to_node(node, broker, partition, info)
except DriveNotMounted:
repl_nodes.append(more_nodes.next())
self.logger.error('ERROR Remote drive not mounted %s' % node)
except:
self.logger.exception('ERROR syncing %s with node %s' %
(object_file, node))
self.stats['success' if success else 'failure'] += 1
responses.append(success)
if not shouldbehere and all(responses):
# If the db shouldn't be on this node and has been successfully
# synced to all of its peers, it can be removed.
with lock_parent_directory(object_file):
shutil.rmtree(os.path.dirname(object_file), True)
self.stats['remove'] += 1
def roundrobin_datadirs(self, datadirs):
"""
Generator to walk the data dirs in a round robin manner, evenly
hitting each device on the system.
:param datadirs: a list of paths to walk
"""
def walk_datadir(datadir, node_id):
partitions = os.listdir(datadir)
random.shuffle(partitions)
for partition in partitions:
part_dir = os.path.join(datadir, partition)
for root, dirs, files in os.walk(part_dir, topdown=False):
for fname in (f for f in files if f.endswith('.db')):
object_file = os.path.join(root, fname)
yield (partition, object_file, node_id)
its = [walk_datadir(datadir, node_id) for datadir, node_id in datadirs]
while its:
for it in list(its):  # copy so removal below can't skip an iterator
try:
yield it.next()
except StopIteration:
its.remove(it)
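# Illustrative reduction of the same round-robin shape (not part of the
# original module): with two inputs the generator interleaves them instead
# of draining one device before starting the next:
#
#   its = [iter([1, 2]), iter(['a', 'b', 'c'])]
#   # yields 1, 'a', 2, 'b', 'c'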
def replicate_once(self):
"""Run a replication pass once."""
self._zero_stats()
dirs = []
ips = whataremyips()
if not ips:
self.logger.error('ERROR Failed to get my own IPs?')
return
for node in self.ring.devs:
if node and node['ip'] in ips and node['port'] == self.port:
if self.mount_check and not os.path.ismount(
os.path.join(self.root, node['device'])):
self.logger.warn(
'Skipping %(device)s as it is not mounted' % node)
continue
unlink_older_than(
os.path.join(self.root, node['device'], 'tmp'),
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
dirs.append((datadir, node['id']))
self.logger.info('Beginning replication run')
for part, object_file, node_id in self.roundrobin_datadirs(dirs):
self.cpool.spawn_n(
self._replicate_object, part, object_file, node_id)
self.cpool.waitall()
self.logger.info('Replication run OVER')
self._report_stats()
def replicate_forever(self):
"""
Replicate dbs under the given root in an infinite loop.
"""
while True:
try:
self.replicate_once()
except:
self.logger.exception('ERROR trying to replicate')
sleep(self.run_pause)
class ReplicatorRpc(object):
"""Handle Replication RPC calls. TODO: redbo document please :)"""
def __init__(self, root, datadir, broker_class, mount_check=True):
self.root = root
self.datadir = datadir
self.broker_class = broker_class
self.mount_check = mount_check
def dispatch(self, post_args, args):
if not hasattr(args, 'pop'):
return HTTPBadRequest(body='Invalid object type')
op = args.pop(0)
drive, partition, hsh = post_args
if self.mount_check and \
not os.path.ismount(os.path.join(self.root, drive)):
return Response(status='507 %s is not mounted' % drive)
db_file = os.path.join(self.root, drive,
storage_directory(self.datadir, partition, hsh), hsh + '.db')
if op == 'rsync_then_merge':
return self.rsync_then_merge(drive, db_file, args)
elif op == 'complete_rsync':
return self.complete_rsync(drive, db_file, args)
else:
# someone might be about to rsync a db to us,
# make sure there's a tmp dir to receive it.
mkdirs(os.path.join(self.root, drive, 'tmp'))
if not os.path.exists(db_file):
return HTTPNotFound()
return getattr(self, op)(self.broker_class(db_file), args)
def sync(self, broker, args):
(remote_sync, hash_, id_, created_at, put_timestamp,
delete_timestamp) = args
try:
info = broker.get_replication_info()
except Exception, e:
if 'no such table' in str(e):
# TODO find a real logger
print "Quarantining DB %s" % broker.db_file
quarantine_db(broker.db_file, broker.db_type)
return HTTPNotFound()
raise
if info['put_timestamp'] != put_timestamp or \
info['created_at'] != created_at or \
info['delete_timestamp'] != delete_timestamp:
broker.merge_timestamps(
created_at, put_timestamp, delete_timestamp)
info['point'] = broker.get_sync(id_)
if hash_ == info['hash'] and info['point'] < remote_sync:
broker.merge_syncs([{'remote_id': id_,
'sync_point': remote_sync}])
info['point'] = remote_sync
return Response(simplejson.dumps(info))
def merge_syncs(self, broker, args):
broker.merge_syncs(args[0])
return HTTPAccepted()
def merge_items(self, broker, args):
broker.merge_items(args[0], args[1])
return HTTPAccepted()
def complete_rsync(self, drive, db_file, args):
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
if os.path.exists(db_file):
return HTTPNotFound()
if not os.path.exists(old_filename):
return HTTPNotFound()
broker = self.broker_class(old_filename)
broker.newid(args[0])
renamer(old_filename, db_file)
return HTTPNoContent()
def rsync_then_merge(self, drive, db_file, args):
old_filename = os.path.join(self.root, drive, 'tmp', args[0])
if not os.path.exists(db_file) or not os.path.exists(old_filename):
return HTTPNotFound()
new_broker = self.broker_class(old_filename)
existing_broker = self.broker_class(db_file)
point = -1
objects = existing_broker.get_items_since(point, 1000)
while len(objects):
new_broker.merge_items(objects)
point = objects[-1]['ROWID']
objects = existing_broker.get_items_since(point, 1000)
sleep()
new_broker.newid(args[0])
renamer(old_filename, db_file)
return HTTPNoContent()

303
swift/common/direct_client.py Normal file
View File

@ -0,0 +1,303 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Internal client library for making calls directly to the servers rather than
through the proxy.
"""
import socket
from httplib import HTTPException
from time import time
from urllib import quote as _quote, unquote
from eventlet import sleep, Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.client import ClientException, json_loads
from swift.common.utils import normalize_timestamp
def quote(value, safe='/'):
if isinstance(value, unicode):
value = value.encode('utf8')
return _quote(value, safe)
def direct_head_container(node, part, account, container, conn_timeout=5,
response_timeout=15):
"""
Request container information directly from the container server.
:param node: node dictionary from the ring
:param part: partition the container is on
:param account: account name
:param container: container name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: tuple of (object count, bytes used)
"""
path = '/%s/%s' % (account, container)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'HEAD', path)
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException(
'Container server %s:%s direct HEAD %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
return int(resp.getheader('x-container-object-count')), \
int(resp.getheader('x-container-bytes-used'))
def direct_get_container(node, part, account, container, marker=None,
limit=None, prefix=None, delimiter=None,
conn_timeout=5, response_timeout=15):
"""
Get container listings directly from the container server.
:param node: node dictionary from the ring
:param part: partition the container is on
:param account: account name
:param container: container name
:param marker: marker query
:param limit: query limit
:param prefix: prefix query
:param delimiter: delimiter for the query
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: list of objects
"""
path = '/%s/%s' % (account, container)
qs = 'format=json'
if marker:
qs += '&marker=%s' % quote(marker)
if limit:
qs += '&limit=%d' % limit
if prefix:
qs += '&prefix=%s' % quote(prefix)
if delimiter:
qs += '&delimiter=%s' % quote(delimiter)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'GET', path, query_string=qs)
with Timeout(response_timeout):
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
resp.read()
raise ClientException(
'Container server %s:%s direct GET %s gave status %s' % (node['ip'],
node['port'], repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
if resp.status == 204:
resp.read()
return []
return json_loads(resp.read())
def direct_delete_container(node, part, account, container, conn_timeout=5,
response_timeout=15, headers=None):
"""
Delete a container directly from the container server.
:param node: node dictionary from the ring
:param part: partition the container is on
:param account: account name
:param container: container name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: response from server
"""
path = '/%s/%s' % (account, container)
headers = dict(headers or {})  # don't mutate a shared default dict
headers['X-Timestamp'] = normalize_timestamp(time())
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers)
with Timeout(response_timeout):
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
raise ClientException(
'Container server %s:%s direct DELETE %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
return resp
def direct_head_object(node, part, account, container, obj, conn_timeout=5,
response_timeout=15):
"""
Request object information directly from the object server.
:param node: node dictionary from the ring
:param part: partition the object is on
:param account: account name
:param container: container name
:param obj: object name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: tuple of (content-type, object size, last modified timestamp,
etag, metadata dictionary)
"""
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'HEAD', path)
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
raise ClientException(
'Object server %s:%s direct HEAD %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
metadata = {}
for key, value in resp.getheaders():
if key.lower().startswith('x-object-meta-'):
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
return resp.getheader('content-type'), \
int(resp.getheader('content-length')), \
resp.getheader('last-modified'), \
resp.getheader('etag').strip('"'), \
metadata
def direct_get_object(node, part, account, container, obj, conn_timeout=5,
response_timeout=15):
"""
Get object directly from the object server.
:param node: node dictionary from the ring
:param part: partition the object is on
:param account: account name
:param container: container name
:param obj: object name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: a tuple of (content type, content length, last modified, etag,
metadata dictionary, object body)
"""
path = '/%s/%s/%s' % (account, container, obj)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'GET', path)
with Timeout(response_timeout):
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
raise ClientException(
'Object server %s:%s direct GET %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
metadata = {}
for key, value in resp.getheaders():
if key.lower().startswith('x-object-meta-'):
metadata[unquote(key[len('x-object-meta-'):])] = unquote(value)
return (resp.getheader('content-type'),
int(resp.getheader('content-length')),
resp.getheader('last-modified'),
resp.getheader('etag').strip('"'),
metadata,
resp.read())
def direct_delete_object(node, part, account, container, obj,
conn_timeout=5, response_timeout=15, headers=None):
"""
Delete object directly from the object server.
:param node: node dictionary from the ring
:param part: partition the object is on
:param account: account name
:param container: container name
:param obj: object name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: response from server
"""
path = '/%s/%s/%s' % (account, container, obj)
headers = dict(headers or {})  # don't mutate a shared default dict
headers['X-Timestamp'] = normalize_timestamp(time())
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path, headers)
with Timeout(response_timeout):
resp = conn.getresponse()
if resp.status < 200 or resp.status >= 300:
raise ClientException(
'Object server %s:%s direct DELETE %s gave status %s' %
(node['ip'], node['port'],
repr('/%s/%s%s' % (node['device'], part, path)),
resp.status),
http_host=node['ip'], http_port=node['port'],
http_device=node['device'], http_status=resp.status,
http_reason=resp.reason)
return resp
def retry(func, *args, **kwargs):
"""
Helper function to retry a given function a number of times.
:param func: callable to be called
:param retries: number of retries
:param error_log: logger for errors
:param args: arguments to send to func
:param kwargs: keyword arguments to send to func (if retries or
error_log are sent, they will be deleted from kwargs
before sending on to func)
:returns: result of func
"""
retries = 5
if 'retries' in kwargs:
retries = kwargs['retries']
del kwargs['retries']
error_log = None
if 'error_log' in kwargs:
error_log = kwargs['error_log']
del kwargs['error_log']
attempts = 0
backoff = 1
while attempts <= retries:
attempts += 1
try:
return attempts, func(*args, **kwargs)
except (socket.error, HTTPException, Timeout), err:
if error_log:
error_log(err)
if attempts > retries:
raise
except ClientException, err:
if error_log:
error_log(err)
if attempts > retries or err.http_status < 500 or \
err.http_status == 507 or err.http_status > 599:
raise
sleep(backoff)
backoff *= 2
# Shouldn't actually get down here, but just in case.
if args and 'ip' in args[0]:
raise ClientException('Raise too many retries',
http_host=args[0]['ip'], http_port=args[0]['port'],
http_device=args[0]['device'])
else:
raise ClientException('Raise too many retries')
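# Illustrative usage sketch (not part of the original module): retry a
# direct HEAD against a node dictionary from the ring; all values below
# are placeholders.
#
#   node = {'ip': '10.0.0.1', 'port': 6001, 'device': 'sdb1'}
#   attempts, (objs, bytes_used) = retry(direct_head_container, node, 312,
#                                        'AUTH_test', 'photos', retries=3)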

35
swift/common/exceptions.py Normal file
View File

@ -0,0 +1,35 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from eventlet import TimeoutError
class MessageTimeout(TimeoutError):
def __init__(self, seconds=None, msg=None):
TimeoutError.__init__(self, seconds=seconds)
self.msg = msg
def __str__(self):
return '%s: %s' % (TimeoutError.__str__(self), self.msg)
class AuditException(Exception): pass
class AuthException(Exception): pass
class ChunkReadTimeout(TimeoutError): pass
class ChunkWriteTimeout(TimeoutError): pass
class ConnectionTimeout(TimeoutError): pass
class DriveNotMounted(Exception): pass
class LockTimeout(MessageTimeout): pass

28
swift/common/healthcheck.py Normal file
View File

@ -0,0 +1,28 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from webob import Response
class HealthCheckController(object):
"""Basic controller used for monitoring."""
def __init__(self, *args, **kwargs):
pass
@classmethod
def GET(cls, req):
return Response(request=req, body="OK", content_type="text/plain")
healthcheck = HealthCheckController.GET

272
swift/common/memcached.py Normal file
View File

@ -0,0 +1,272 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Lucid comes with memcached: v1.4.2. Protocol documentation for that
version is at:
http://github.com/memcached/memcached/blob/1.4.2/doc/protocol.txt
"""
import cPickle as pickle
import logging
import socket
import time
from bisect import bisect
from hashlib import md5
CONN_TIMEOUT = 0.3
IO_TIMEOUT = 2.0
PICKLE_FLAG = 1
NODE_WEIGHT = 50
PICKLE_PROTOCOL = 2
TRY_COUNT = 3
# if ERROR_LIMIT_COUNT errors occur in ERROR_LIMIT_TIME seconds, the server
# will be considered failed for ERROR_LIMIT_DURATION seconds.
ERROR_LIMIT_COUNT = 10
ERROR_LIMIT_TIME = 60
ERROR_LIMIT_DURATION = 300
def md5hash(key):
return md5(key).hexdigest()
class MemcacheRing(object):
"""
Simple, consistent-hashed memcache client.
"""
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
io_timeout=IO_TIMEOUT, tries=TRY_COUNT):
self._ring = {}
self._errors = dict(((serv, []) for serv in servers))
self._error_limited = dict(((serv, 0) for serv in servers))
for server in sorted(servers):
for i in xrange(NODE_WEIGHT):
self._ring[md5hash('%s-%s' % (server, i))] = server
self._tries = tries if tries <= len(servers) else len(servers)
self._sorted = sorted(self._ring.keys())
self._client_cache = dict(((server, []) for server in servers))
self._connect_timeout = connect_timeout
self._io_timeout = io_timeout
def _exception_occurred(self, server, e, action='talking'):
if isinstance(e, socket.timeout):
logging.error("Timeout %s to memcached: %s" % (action, server))
else:
logging.exception("Error %s to memcached: %s" % (action, server))
now = time.time()
        self._errors[server].append(now)
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._errors[server] = [err for err in self._errors[server]
if err > now - ERROR_LIMIT_TIME]
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
self._error_limited[server] = now + ERROR_LIMIT_DURATION
logging.error('Error limiting server %s' % server)
def _get_conns(self, key):
"""
Retrieves a server conn from the pool, or connects a new one.
Chooses the server based on a consistent hash of "key".
"""
pos = bisect(self._sorted, key)
served = []
while len(served) < self._tries:
pos = (pos + 1) % len(self._sorted)
server = self._ring[self._sorted[pos]]
if server in served:
continue
served.append(server)
if self._error_limited[server] > time.time():
continue
try:
fp, sock = self._client_cache[server].pop()
yield server, fp, sock
except IndexError:
try:
host, port = server.split(':')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(self._connect_timeout)
sock.connect((host, int(port)))
sock.settimeout(self._io_timeout)
yield server, sock.makefile(), sock
except Exception, e:
self._exception_occurred(server, e, 'connecting')
def _return_conn(self, server, fp, sock):
""" Returns a server connection to the pool """
self._client_cache[server].append((fp, sock))
def set(self, key, value, serialize=True, timeout=0):
"""
Set a key/value pair in memcache
:param key: key
:param value: value
:param serialize: if True, value is pickled before sending to memcache
:param timeout: ttl in memcache
"""
key = md5hash(key)
if timeout > 0:
timeout += time.time()
flags = 0
if serialize:
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' % \
(key, flags, timeout, len(value), value))
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def get(self, key):
"""
Gets the object specified by key. It will also unpickle the object
before returning if it is pickled in memcache.
:param key: key
:returns: value of the key in memcache
"""
key = md5hash(key)
value = None
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('get %s\r\n' % key)
line = fp.readline().strip().split()
while line[0].upper() != 'END':
if line[0].upper() == 'VALUE' and line[1] == key:
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
value = pickle.loads(value)
fp.readline()
line = fp.readline().strip().split()
self._return_conn(server, fp, sock)
return value
except Exception, e:
self._exception_occurred(server, e)
def incr(self, key, delta=1, timeout=0):
"""
Increments a key which has a numeric value by delta.
If the key can't be found, it's added as delta.
:param key: key
:param delta: amount to add to the value of key (or set as the value
if the key is not found)
:param timeout: ttl in memcache
"""
key = md5hash(key)
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('incr %s %s\r\n' % (key, delta))
line = fp.readline().strip().split()
if line[0].upper() == 'NOT_FOUND':
line[0] = str(delta)
sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' % \
(key, 0, timeout, len(line[0]), line[0]))
ret = int(line[0].strip())
self._return_conn(server, fp, sock)
return ret
except Exception, e:
self._exception_occurred(server, e)
def delete(self, key):
"""
Deletes a key/value pair from memcache.
:param key: key to be deleted
"""
key = md5hash(key)
for (server, fp, sock) in self._get_conns(key):
try:
sock.sendall('delete %s noreply\r\n' % key)
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def set_multi(self, mapping, server_key, serialize=True, timeout=0):
"""
Sets multiple key/value pairs in memcache.
        :param mapping: dictionary of keys and values to be set in memcache
        :param server_key: key to use in determining which server in the ring
is used
:param serialize: if True, value is pickled before sending to memcache
:param timeout: ttl for memcache
"""
server_key = md5hash(server_key)
if timeout > 0:
timeout += time.time()
msg = ''
for key, value in mapping.iteritems():
key = md5hash(key)
flags = 0
if serialize:
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
msg += ('set %s %d %d %s noreply\r\n%s\r\n' %
(key, flags, timeout, len(value), value))
for (server, fp, sock) in self._get_conns(server_key):
try:
sock.sendall(msg)
self._return_conn(server, fp, sock)
return
except Exception, e:
self._exception_occurred(server, e)
def get_multi(self, keys, server_key):
"""
Gets multiple values from memcache for the given keys.
:param keys: keys for values to be retrieved from memcache
        :param server_key: key to use in determining which server in the ring
is used
:returns: list of values
"""
server_key = md5hash(server_key)
keys = [md5hash(key) for key in keys]
for (server, fp, sock) in self._get_conns(server_key):
try:
sock.sendall('get %s\r\n' % ' '.join(keys))
line = fp.readline().strip().split()
responses = {}
while line[0].upper() != 'END':
if line[0].upper() == 'VALUE':
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
value = pickle.loads(value)
responses[line[1]] = value
fp.readline()
line = fp.readline().strip().split()
values = []
for key in keys:
if key in responses:
values.append(responses[key])
else:
values.append(None)
self._return_conn(server, fp, sock)
return values
except Exception, e:
self._exception_occurred(server, e)
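# Usage sketch (illustrative; assumes a memcached server at 127.0.0.1:11211):
#
#     >>> mc = MemcacheRing(['127.0.0.1:11211'])
#     >>> mc.set('key', {'some': 'value'})   # pickled before sending
#     >>> mc.get('key')
#     {'some': 'value'}
#     >>> mc.incr('hits')   # key is absent, so it is added as the delta
#     1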

490
swift/common/utils.py Normal file
View File

@ -0,0 +1,490 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Miscellaneous utility functions for use with Swift."""
import errno
import fcntl
import os
import pwd
import signal
import sys
import time
import mimetools
from hashlib import md5
from random import shuffle
from urllib import quote
from contextlib import contextmanager
import ctypes
import ctypes.util
import struct
import eventlet
from eventlet import greenio, GreenPool, sleep, Timeout, listen
from eventlet.green import socket, subprocess, ssl, thread, threading
from swift.common.exceptions import LockTimeout, MessageTimeout
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
import logging
logging.thread = eventlet.green.thread
logging.threading = eventlet.green.threading
logging._lock = logging.threading.RLock()
libc = ctypes.CDLL(ctypes.util.find_library('c'))
sys_fallocate = libc.fallocate
posix_fadvise = libc.posix_fadvise
# Used by hash_path to offer a bit more security when generating hashes for
# paths. It simply appends this value to all paths; guessing the hash a path
# will end up with would also require knowing this suffix.
HASH_PATH_SUFFIX = os.environ.get('SWIFT_HASH_PATH_SUFFIX', 'endcap')
def get_param(req, name, default=None):
"""
    Get parameters from an HTTP request, ensuring proper handling of UTF-8
    encoding.
:param req: Webob request object
:param name: parameter name
:param default: result to return if the parameter is not found
:returns: HTTP request parameter value
"""
value = req.str_params.get(name, default)
if value:
value.decode('utf8') # Ensure UTF8ness
return value
def fallocate(fd, size):
"""
    Pre-allocate disk space for a file.
:param fd: file descriptor
:param size: size to allocate (in bytes)
"""
if size > 0:
# 1 means "FALLOC_FL_KEEP_SIZE", which means it pre-allocates invisibly
ret = sys_fallocate(fd, 1, 0, ctypes.c_uint64(size))
# XXX: in (not very thorough) testing, errno always seems to be 0?
err = ctypes.get_errno()
if ret and err not in (0, errno.ENOSYS):
raise OSError(err, 'Unable to fallocate(%s)' % size)
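# Usage sketch (illustrative; the path is hypothetical): reserve space up
# front so a large write fails early if the disk is full.
#
#     >>> fd = os.open('/srv/node/sda1/tmp/upload', os.O_CREAT | os.O_WRONLY)
#     >>> fallocate(fd, 1024 * 1024)   # pre-allocate 1 MiB, size unchanged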
def drop_buffer_cache(fd, offset, length):
"""
Drop 'buffer' cache for the given range of the given file.
:param fd: file descriptor
:param offset: start offset
:param length: length
"""
# 4 means "POSIX_FADV_DONTNEED"
ret = posix_fadvise(fd, ctypes.c_uint64(offset), ctypes.c_uint64(length), 4)
if ret != 0:
print "posix_fadvise(%s, %s, %s, 4) -> %s" % (fd, offset, length, ret)
def normalize_timestamp(timestamp):
"""
Format a timestamp (string or numeric) into a standardized
xxxxxxxxxx.xxxxx format.
:param timestamp: unix timestamp
:returns: normalized timestamp as a string
"""
return "%016.05f" % (float(timestamp))
def mkdirs(path):
"""
Ensures the path is a directory or makes it if not. Errors if the path
exists but is a file or on permissions failure.
:param path: path to create
"""
if not os.path.isdir(path):
try:
os.makedirs(path)
except OSError, err:
if err.errno != errno.EEXIST or not os.path.isdir(path):
raise
def renamer(old, new): # pragma: no cover
"""
Attempt to fix^H^H^Hhide race conditions like empty object directories
being removed by backend processes during uploads, by retrying.
:param old: old path to be renamed
:param new: new path to be renamed to
"""
try:
mkdirs(os.path.dirname(new))
os.rename(old, new)
except OSError:
mkdirs(os.path.dirname(new))
os.rename(old, new)
def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
"""
Validate and split the given HTTP request path.
**Examples**::
['a'] = split_path('/a')
['a', None] = split_path('/a', 1, 2)
['a', 'c'] = split_path('/a/c', 1, 2)
['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True)
:param path: HTTP Request path to be split
:param minsegs: Minimum number of segments to be extracted
:param maxsegs: Maximum number of segments to be extracted
:param rest_with_last: If True, trailing data will be returned as part
of last segment. If False, and there is
trailing data, raises ValueError.
    :returns: list of segments with a length of maxsegs (non-existent
segments will return as None)
"""
if not maxsegs:
maxsegs = minsegs
if minsegs > maxsegs:
raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs))
if rest_with_last:
segs = path.split('/', maxsegs)
minsegs += 1
maxsegs += 1
count = len(segs)
if segs[0] or count < minsegs or count > maxsegs or \
'' in segs[1:minsegs]:
raise ValueError('Invalid path: %s' % quote(path))
else:
minsegs += 1
maxsegs += 1
segs = path.split('/', maxsegs)
count = len(segs)
if segs[0] or count < minsegs or count > maxsegs + 1 or \
'' in segs[1:minsegs] or (count == maxsegs + 1 and segs[maxsegs]):
raise ValueError('Invalid path: %s' % quote(path))
segs = segs[1:maxsegs]
segs.extend([None] * (maxsegs - 1 - len(segs)))
return segs
class NullLogger(object):
"""A no-op logger for eventlet wsgi."""
def write(self, *args):
#"Logs" the args to nowhere
pass
class LoggerFileObject(object):
def __init__(self, logger):
self.logger = logger
def write(self, value):
value = value.strip()
if value:
if 'Connection reset by peer' in value:
self.logger.error('STDOUT: Connection reset by peer')
else:
self.logger.error('STDOUT: %s' % value)
def writelines(self, values):
self.logger.error('STDOUT: %s' % '#012'.join(values))
def close(self):
pass
def flush(self):
pass
def __iter__(self):
return self
def next(self):
raise IOError(errno.EBADF, 'Bad file descriptor')
def read(self, size=-1):
raise IOError(errno.EBADF, 'Bad file descriptor')
def readline(self, size=-1):
raise IOError(errno.EBADF, 'Bad file descriptor')
def tell(self):
return 0
def xreadlines(self):
return self
def drop_privileges(user):
"""
    Sets the userid and groupid of the current process to those of the
    given user.
    :param user: name of the user to change to
"""
user = pwd.getpwnam(user)
os.setgid(user[3])
os.setuid(user[2])
class NamedLogger(object):
"""Cheesy version of the LoggerAdapter available in Python 3"""
def __init__(self, logger, server):
self.logger = logger
self.server = server
for proxied_method in ('debug', 'info', 'log', 'warn', 'warning',
'error', 'critical'):
setattr(self, proxied_method,
self._proxy(getattr(logger, proxied_method)))
def _proxy(self, logger_meth):
def _inner_proxy(msg, *args, **kwargs):
msg = '%s %s' % (self.server, msg)
logger_meth(msg, *args, **kwargs)
return _inner_proxy
def getEffectiveLevel(self):
return self.logger.getEffectiveLevel()
def exception(self, msg, *args):
_, exc, _ = sys.exc_info()
call = self.logger.error
emsg = ''
if isinstance(exc, OSError):
if exc.errno in (errno.EIO, errno.ENOSPC):
emsg = str(exc)
else:
call = self.logger.exception
elif isinstance(exc, socket.error):
if exc.errno == errno.ECONNREFUSED:
emsg = 'Connection refused'
elif exc.errno == errno.EHOSTUNREACH:
emsg = 'Host unreachable'
else:
call = self.logger.exception
elif isinstance(exc, eventlet.Timeout):
emsg = exc.__class__.__name__
if hasattr(exc, 'seconds'):
emsg += ' (%ss)' % exc.seconds
if isinstance(exc, MessageTimeout):
if exc.msg:
emsg += ' %s' % exc.msg
else:
call = self.logger.exception
call('%s %s: %s' % (self.server, msg, emsg), *args)
def get_logger(conf, name):
"""
Get the current system logger using config settings.
**Log config and defaults**::
log_facility = LOG_LOCAL0
log_level = INFO
:param conf: Configuration dict to read settings from
:param name: Name of the logger
"""
root_logger = logging.getLogger()
if hasattr(get_logger, 'handler') and get_logger.handler:
root_logger.removeHandler(get_logger.handler)
get_logger.handler = None
if conf is None:
root_logger.setLevel(logging.INFO)
return NamedLogger(root_logger, name)
get_logger.handler = SysLogHandler(address='/dev/log',
facility=getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
SysLogHandler.LOG_LOCAL0))
root_logger.addHandler(get_logger.handler)
root_logger.setLevel(
getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO))
return NamedLogger(root_logger, name)
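# Usage sketch (illustrative; the conf values are hypothetical):
#
#     >>> logger = get_logger({'log_level': 'DEBUG'}, 'object-server')
#     >>> logger.info('ready')   # sent to syslog as "object-server ready"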
def whataremyips():
"""
    Get the machine's IP addresses by parsing ifconfig output.
    :returns: list of strings of IPv4 addresses
"""
proc = subprocess.Popen(['/sbin/ifconfig'], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
ret_val = proc.wait()
results = proc.stdout.read().split('\n')
return [x.split(':')[1].split()[0] for x in results if 'inet addr' in x]
def storage_directory(datadir, partition, hash):
"""
Get the storage directory
:param datadir: Base data directory
:param partition: Partition
:param hash: Account, container or object hash
:returns: Storage directory
"""
return os.path.join(datadir, partition, hash[-3:], hash)
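# Example (illustrative hash value): the last three characters of the hash
# become an intermediate directory, capping entries per partition dir.
#
#     >>> storage_directory('objects', '1024',
#     ...                   'a83efa63c7b8fd2ef1633d2a5f473386')
#     'objects/1024/386/a83efa63c7b8fd2ef1633d2a5f473386'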
def hash_path(account, container=None, object=None, raw_digest=False):
"""
    Get the canonical hash for an account/container/object
:param account: Account
:param container: Container
:param object: Object
:param raw_digest: If True, return the raw version rather than a hex digest
:returns: hash string
"""
if object and not container:
raise ValueError('container is required if object is provided')
paths = [account]
if container:
paths.append(container)
if object:
paths.append(object)
if raw_digest:
return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).digest()
else:
return md5('/' + '/'.join(paths) + HASH_PATH_SUFFIX).hexdigest()
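# Illustrative check (assumes SWIFT_HASH_PATH_SUFFIX is unset, so the
# default 'endcap' suffix applies):
#
#     >>> hash_path('a', 'c', 'o') == md5('/a/c/o' + 'endcap').hexdigest()
#     True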
@contextmanager
def lock_path(directory, timeout=10):
"""
Context manager that acquires a lock on a directory. This will block until
the lock can be acquired, or the timeout time has expired (whichever occurs
first).
:param directory: directory to be locked
:param timeout: timeout (in seconds)
"""
mkdirs(directory)
fd = os.open(directory, os.O_RDONLY)
try:
with LockTimeout(timeout, directory):
while True:
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError, err:
if err.errno != errno.EAGAIN:
raise
sleep(0.01)
yield True
finally:
os.close(fd)
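# Usage sketch (illustrative; the directory is hypothetical):
#
#     >>> with lock_path('/srv/node/sda1/tmp'):
#     ...     pass  # other flock holders are excluded until this exits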
def lock_parent_directory(filename, timeout=10):
"""
Context manager that acquires a lock on the parent directory of the given
file path. This will block until the lock can be acquired, or the timeout
time has expired (whichever occurs first).
    :param filename: path to a file; the parent directory of this file will
                     be locked
    :param timeout: timeout (in seconds)
    """
    return lock_path(os.path.dirname(filename), timeout)
def get_time_units(time_amount):
"""
    Get a normalized length of time in the largest unit of time (hours,
    minutes, or seconds).
    :param time_amount: length of time in seconds
    :returns: A tuple of (length of time, unit of time) where unit of time is
one of ('h', 'm', 's')
"""
time_unit = 's'
if time_amount > 60:
time_amount /= 60
time_unit = 'm'
if time_amount > 60:
time_amount /= 60
time_unit = 'h'
return time_amount, time_unit
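# Worked examples (Python 2 integer division truncates):
#
#     >>> get_time_units(30)
#     (30, 's')
#     >>> get_time_units(150)
#     (2, 'm')
#     >>> get_time_units(7200)
#     (2, 'h')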
def compute_eta(start_time, current_value, final_value):
"""
    Compute an ETA. Now if only we could also have a progress bar...
:param start_time: Unix timestamp when the operation began
:param current_value: Current value
:param final_value: Final value
:returns: ETA as a tuple of (length of time, unit of time) where unit of
time is one of ('h', 'm', 's')
"""
elapsed = time.time() - start_time
completion = (float(current_value) / final_value) or 0.00001
return get_time_units(1.0 / completion * elapsed - elapsed)
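# Worked example: 25 of 100 items done after 100 seconds of work gives a
# completion of 0.25, so the remaining time is 1/0.25 * 100 - 100 = 300s:
#
#     >>> compute_eta(time.time() - 100, 25, 100)   # roughly (5.0, 'm')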
def iter_devices_partitions(devices_dir, item_type):
"""
    Iterate over partitions across all devices.
:param devices_dir: Path to devices
:param item_type: One of 'accounts', 'containers', or 'objects'
:returns: Each iteration returns a tuple of (device, partition)
"""
devices = os.listdir(devices_dir)
shuffle(devices)
devices_partitions = []
for device in devices:
partitions = os.listdir(os.path.join(devices_dir, device, item_type))
shuffle(partitions)
devices_partitions.append((device, iter(partitions)))
yielded = True
while yielded:
yielded = False
for device, partitions in devices_partitions:
try:
yield device, partitions.next()
yielded = True
except StopIteration:
pass
def unlink_older_than(path, mtime):
"""
    Remove any file in a given path that was last modified before mtime.
    :param path: path to remove file from
    :param mtime: timestamp of oldest file to keep
"""
if os.path.exists(path):
for fname in os.listdir(path):
fpath = os.path.join(path, fname)
try:
if os.path.getmtime(fpath) < mtime:
os.unlink(fpath)
except OSError:
pass
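# Usage sketch (illustrative; the path is hypothetical): prune temp files
# older than one day.
#
#     >>> unlink_older_than('/srv/node/sda1/tmp', time.time() - 86400)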

164
swift/common/wsgi.py Normal file
View File

@ -0,0 +1,164 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""WSGI tools for use with swift."""
import errno
import os
import signal
import sys
import time
import mimetools
import eventlet
from eventlet import greenio, GreenPool, sleep, wsgi, listen
# Hook to ensure connection resets don't blow up our servers.
# Remove with next release of Eventlet that has it in the set already.
from errno import ECONNRESET
wsgi.ACCEPT_ERRNO.add(ECONNRESET)
from eventlet.green import socket, ssl
from swift.common.utils import get_logger, drop_privileges, \
LoggerFileObject, NullLogger
def monkey_patch_mimetools():
"""
mimetools.Message defaults content-type to "text/plain"
This changes it to default to None, so we can detect missing headers.
"""
orig_parsetype = mimetools.Message.parsetype
def parsetype(self):
if not self.typeheader:
self.type = None
self.maintype = None
self.subtype = None
self.plisttext = ''
else:
orig_parsetype(self)
mimetools.Message.parsetype = parsetype
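# Illustrative effect of the patch (sketch): a message with no headers now
# reports no type instead of the "text/plain" default.
#
#     >>> from StringIO import StringIO
#     >>> monkey_patch_mimetools()
#     >>> mimetools.Message(StringIO('\r\n')).type is None
#     True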
# We might be able to pull pieces of this out to test, but right now it seems
# like more work than it's worth.
def run_wsgi(app, conf, *args, **kwargs): # pragma: no cover
"""
Loads common settings from conf, then instantiates app and runs
the server using the specified number of workers.
:param app: WSGI callable
:param conf: Configuration dictionary
"""
if 'logger' in kwargs:
logger = kwargs['logger']
else:
logger = get_logger(conf, app.log_name)
# log uncaught exceptions
sys.excepthook = lambda *exc_info: \
logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
sys.stdout = sys.stderr = LoggerFileObject(logger)
try:
os.setsid()
except OSError:
        no_cover = True  # pass; setsid() fails if already a process group leader
bind_addr = (conf.get('bind_ip', '0.0.0.0'),
int(conf.get('bind_port', kwargs.get('default_port', 8080))))
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = listen(bind_addr)
if 'cert_file' in conf:
sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
keyfile=conf['key_file'])
except socket.error, err:
if err.args[0] != errno.EADDRINUSE:
raise
sleep(0.1)
if not sock:
raise Exception('Could not bind to %s:%s after trying for 30 seconds' %
bind_addr)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
worker_count = int(conf.get('workers', '1'))
drop_privileges(conf.get('user', 'swift'))
if isinstance(app, type):
# Instantiate app if it hasn't been already
app = app(conf, *args)
def run_server():
wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
eventlet.hubs.use_hub('poll')
eventlet.patcher.monkey_patch(all=False, socket=True)
monkey_patch_mimetools()
pool = GreenPool(size=1024)
try:
wsgi.server(sock, app, NullLogger(), custom_pool=pool)
except socket.error, err:
if err[0] != errno.EINVAL:
raise
pool.waitall()
# Useful for profiling [no forks].
if worker_count == 0:
run_server()
return
def kill_children(*args):
"""Kills the entire process group."""
logger.error('SIGTERM received')
signal.signal(signal.SIGTERM, signal.SIG_IGN)
running[0] = False
os.killpg(0, signal.SIGTERM)
def hup(*args):
"""Shuts down the server, but allows running requests to complete"""
logger.error('SIGHUP received')
signal.signal(signal.SIGHUP, signal.SIG_IGN)
running[0] = False
running = [True]
signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup)
children = []
while running[0]:
while len(children) < worker_count:
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
run_server()
logger.info('Child %d exiting normally' % os.getpid())
return
else:
logger.info('Started child %s' % pid)
children.append(pid)
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
logger.error('Removing dead child %s' % pid)
children.remove(pid)
except OSError, err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
greenio.shutdown_safe(sock)
sock.close()
logger.info('Exited')
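# Usage sketch (illustrative; MyApp and conf are hypothetical): a server
# entry point hands its WSGI app class and parsed config to run_wsgi, which
# binds the socket, drops privileges, and forks the configured workers.
#
#     >>> run_wsgi(MyApp, conf, default_port=8080)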