Add -p option to download command.

Allow the ability to download a subset of containers (--all with -p) or
a subset of objects within a container (container name with -p).

This patch also includes a drive-by fix for "download --all" which would
not actually download any objects (for me, at least) because the object
queue got filled with "stop" messages before the container workers had
run long enough to put work in the object queue.  Doh!

I also closed up a few holes where an (unexpected, obviously) Exception
could cause the process to hang because non-daemon threads still
existed.

Change-Id: I71c6935c60282b5353badc2dfce8a935d47e3bb7
This commit is contained in:
Darrell Bishop 2013-06-26 11:41:29 -07:00
parent 1c86d62fde
commit f022aac0cf
1 changed file with 151 additions and 130 deletions

281
bin/swift
View File

@ -25,7 +25,7 @@ from os.path import basename, dirname, getmtime, getsize, isdir, join
from Queue import Queue
from random import shuffle
from sys import argv, exc_info, exit, stderr, stdout
from threading import enumerate as threading_enumerate, Thread
from threading import Thread
from time import sleep, time, gmtime, strftime
from traceback import format_exception
from urllib import quote, unquote
@ -84,16 +84,6 @@ class StopWorkerThreadSignal(object):
pass
def shutdown_worker_threads(queue, thread_list):
for thread in [t for t in thread_list if t.isAlive()]:
queue.put(StopWorkerThreadSignal())
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
class QueueFunctionThread(Thread):
def __init__(self, queue, func, *args, **kwargs):
@ -128,6 +118,24 @@ class QueueFunctionThread(Thread):
self.exc_infos.append(exc_info())
def shutdown_worker_threads(queue, thread_list):
"""
Takes a job queue and a list of associated QueueFunctionThread objects,
puts a StopWorkerThreadSignal object into the queue, and waits for the
queue to flush.
"""
for thread in [t for t in thread_list if t.isAlive()]:
queue.put(StopWorkerThreadSignal())
while any(map(QueueFunctionThread.is_alive, thread_list)):
sleep(0.05)
def immediate_exit(signum, frame):
stderr.write(" Aborted\n")
os_exit(2)
st_delete_help = '''
delete [options] --all OR delete container [options] [object] [object] ...
Deletes everything in the account (with --all), or everything in a
@ -261,47 +269,52 @@ def st_delete(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
conn = create_connection()
try:
marker = ''
while True:
containers = \
[c['name'] for c in conn.get_account(marker=marker)[1]]
if not containers:
break
for container in containers:
container_queue.put(container)
marker = containers[-1]
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
try:
if not args:
conn = create_connection()
try:
marker = ''
while True:
containers = [
c['name'] for c in conn.get_account(marker=marker)[1]]
if not containers:
break
for container in containers:
container_queue.put(container)
marker = containers[-1]
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might ' \
'have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0])
conn = create_connection()
_delete_container(args[0], conn)
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
st_download_help = '''
download --all OR download container [options] [object] [object] ...
Downloads everything in the account (with --all), or everything in a
container, or a list of objects depending on the args given. For a single
object download, you may use the -o [--output] <filename> option to
redirect the output to a specific file or if "-" then just redirect to
stdout.'''.strip('\n')
download --all [options] OR download container [options] [object] [object] ...
Downloads everything in the account (with --all), or everything in all
containers in the account matching a prefix (with --all and -p [--prefix]),
or everything in a container, or a subset of a container with -p
[--prefix], or a list of objects depending on the args given. -p or
--prefix is an option that will only download items beginning with that
prefix. For a single object download, you may use the -o [--output]
<filename> option to redirect the output to a specific file or if "-" then
just redirect to stdout.'''.strip('\n')
def st_download(parser, args, print_queue, error_queue):
@ -313,6 +326,9 @@ def st_download(parser, args, print_queue, error_queue):
'-m', '--marker', dest='marker',
default='', help='Marker to use when starting a container or '
'account download')
parser.add_option(
'-p', '--prefix', dest='prefix',
help='Will only download items beginning with the prefix')
parser.add_option(
'-o', '--output', dest='out_file', help='For a single '
'file download, stream the output to an alternate location ')
@ -426,12 +442,14 @@ def st_download(parser, args, print_queue, error_queue):
container_queue = Queue(10000)
def _download_container(container, conn):
def _download_container(container, conn, prefix=None):
try:
marker = options.marker
while True:
objects = [o['name'] for o in
conn.get_container(container, marker=marker)[1]]
objects = [
o['name'] for o in
conn.get_container(container, marker=marker,
prefix=prefix)[1]]
if not objects:
break
marker = objects[-1]
@ -455,42 +473,50 @@ def st_download(parser, args, print_queue, error_queue):
for _junk in xrange(options.container_threads)]
for thread in container_threads:
thread.start()
if not args:
conn = create_connection()
try:
marker = options.marker
while True:
containers = [c['name']
for c in conn.get_account(marker=marker)[1]]
if not containers:
break
marker = containers[-1]
shuffle(containers)
for container in containers:
container_queue.put(container)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, 'WARNING: / in container name; you might have ' \
'meant %r instead of %r.' % \
(args[0].replace('/', ' ', 1), args[0])
_download_container(args[0], create_connection())
else:
if len(args) == 2:
obj = args[1]
object_queue.put((args[0], obj, options.out_file))
# We mustn't let the main thread die with an exception while non-daemonic
# threads exist or the process will hang and ignore Ctrl-C. So we catch
# anything and tidy up the threads in a finally block.
try:
if not args:
# --all case
conn = create_connection()
try:
marker = options.marker
while True:
containers = [
c['name'] for c in conn.get_account(
marker=marker, prefix=options.prefix)[1]]
if not containers:
break
marker = containers[-1]
shuffle(containers)
for container in containers:
container_queue.put(container)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
elif len(args) == 1:
if '/' in args[0]:
print >> stderr, ('WARNING: / in container name; you might '
'have meant %r instead of %r.' % (
args[0].replace('/', ' ', 1), args[0]))
_download_container(args[0], create_connection(),
options.prefix)
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
if len(args) == 2:
obj = args[1]
object_queue.put((args[0], obj, options.out_file))
else:
for obj in args[1:]:
object_queue.put((args[0], obj))
finally:
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(container_queue, container_threads)
put_errors_from_threads(container_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
def prt_bytes(bytes, human_flag):
@ -546,7 +572,7 @@ def st_list(parser, args, print_queue, error_queue):
parser.add_option(
'-d', '--delimiter', dest='delimiter',
help='Will roll up items with the given delimiter'
' (see Cloud Files general documentation for what this means)')
' (see OpenStack Swift API documentation for what this means)')
(options, args) = parse_args(parser, args)
args = args[1:]
if options.delimiter and not args:
@ -971,34 +997,37 @@ def st_upload(parser, args, print_queue, error_queue):
for _junk in xrange(options.segment_threads)]
for thread in segment_threads:
thread.start()
segment = 0
segment_start = 0
while segment_start < full_size:
segment_size = int(options.segment_size)
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
if options.use_slo:
segment_name = '%s/slo/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options.segment_size, segment)
else:
segment_name = '%s/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options.segment_size, segment)
segment_queue.put(
{'path': path, 'obj': segment_name,
'segment_start': segment_start,
'segment_size': segment_size,
'segment_index': segment,
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
shutdown_worker_threads(segment_queue, segment_threads)
if put_errors_from_threads(segment_threads, error_queue):
raise ClientException(
'Aborting manifest creation '
'because not all segments could be uploaded. %s/%s'
% (container, obj))
try:
segment = 0
segment_start = 0
while segment_start < full_size:
segment_size = int(options.segment_size)
if segment_start + segment_size > full_size:
segment_size = full_size - segment_start
if options.use_slo:
segment_name = '%s/slo/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options.segment_size, segment)
else:
segment_name = '%s/%s/%s/%s/%08d' % (
obj, put_headers['x-object-meta-mtime'],
full_size, options.segment_size, segment)
segment_queue.put(
{'path': path, 'obj': segment_name,
'segment_start': segment_start,
'segment_size': segment_size,
'segment_index': segment,
'log_line': '%s segment %s' % (obj, segment)})
segment += 1
segment_start += segment_size
finally:
shutdown_worker_threads(segment_queue, segment_threads)
if put_errors_from_threads(segment_threads,
error_queue):
raise ClientException(
'Aborting manifest creation '
'because not all segments could be uploaded. '
'%s/%s' % (container, obj))
if options.use_slo:
slo_segments = []
for thread in segment_threads:
@ -1118,19 +1147,20 @@ def st_upload(parser, args, print_queue, error_queue):
except Exception as err:
error_queue.put(
'Error trying to create container %r: %s' % (args[0], err))
try:
for arg in args[1:]:
if isdir(arg):
_upload_dir(arg)
else:
object_queue.put({'path': arg})
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
except ClientException as err:
if err.http_status != 404:
raise
error_queue.put('Account not found')
finally:
shutdown_worker_threads(object_queue, object_threads)
put_errors_from_threads(object_threads, error_queue)
def split_headers(options, prefix='', error_queue=None):
@ -1364,7 +1394,7 @@ Examples:
print item
print_thread = QueueFunctionThread(print_queue, _print)
print_thread.setDaemon(True)
print_thread.start()
error_count = 0
error_queue = Queue(10000)
@ -1377,7 +1407,7 @@ Examples:
print >> stderr, item
error_thread = QueueFunctionThread(error_queue, _error)
error_thread.setDaemon(True)
error_thread.start()
parser.usage = globals()['st_%s_help' % args[0]]
try:
@ -1385,18 +1415,9 @@ Examples:
error_queue)
except (ClientException, HTTPException, socket.error) as err:
error_queue.put(str(err))
# Let other threads start working, now start print and error thread,
# this is to prevent the main thread shutdown two thread prematurely
print_thread.start()
error_thread.start()
# If not all the worker threads have finished, then the main thread
# has to wait. Only when there are main, error and print thread left
# the main thread can proceed to finish up.
while (len(threading_enumerate()) > 3 or not error_queue.empty() or
not print_queue.empty()):
sleep(0.5)
finally:
shutdown_worker_threads(print_queue, [print_thread])
shutdown_worker_threads(error_queue, [error_thread])
if error_count:
exit(1)