328 lines
12 KiB
Python
Executable File
328 lines
12 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# NOTE: XenServer still only supports Python 2.4 in its dom0 userspace
|
|
# which means the Nova xenapi plugins must use only Python 2.4 features
|
|
|
|
# TODO(sfinucan): Resolve all 'noqa' items once the above is no longer true
|
|
|
|
"""Download images via BitTorrent."""
|
|
|
|
import errno
|
|
import inspect
|
|
import os
|
|
import random
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
|
|
import libtorrent
|
|
import urllib2
|
|
|
|
import utils
|
|
|
|
import pluginlib_nova
|
|
|
|
|
|
# Set up plugin logging under the 'bittorrent' name and expose the logging
# module that pluginlib_nova provides.
pluginlib_nova.configure_logging('bittorrent')
logging = pluginlib_nova.logging

# One mebibyte in bytes. Taken from units since we don't pull down the full
# library.
Mi = 1024 * 1024

# Default cache directories; the TORRENT_CACHE and SEED_CACHE environment
# variables take precedence when set.
DEFAULT_TORRENT_CACHE = '/images/torrents'
DEFAULT_SEED_CACHE = '/images/seeds'

# File name of the seeder script; it is resolved relative to this plugin's
# own directory and also used to recognise seeder processes in /proc.
SEEDER_PROCESS = '_bittorrent_seeder.py'

# Fallback libtorrent session settings, used when the matching DEFAULT_*
# environment variable is not set:
#   MMA    - mixed_mode_algorithm
#   MORQ   - max_out_request_queue
#   MQDB   - max_queued_disk_bytes
#   MQDBLW - max_queued_disk_bytes_low_watermark
DEFAULT_MMA = int(libtorrent.bandwidth_mixed_algo_t.prefer_tcp)
DEFAULT_MORQ = 400
DEFAULT_MQDB = 8 * Mi
DEFAULT_MQDBLW = 0
|
|
|
|
|
|
def _make_torrent_cache():
    """Return the torrent cache directory, creating it if it is missing.

    The location is read from the TORRENT_CACHE environment variable and
    falls back to DEFAULT_TORRENT_CACHE.

    :returns: path of the torrent cache directory
    :raises OSError: if the directory does not exist and cannot be created
    """
    torrent_cache_path = os.environ.get(
        'TORRENT_CACHE', DEFAULT_TORRENT_CACHE)

    if not os.path.exists(torrent_cache_path):
        try:
            # os.mkdir (not makedirs) on purpose: a missing parent directory
            # must keep failing rather than being silently created.
            os.mkdir(torrent_cache_path)
        except OSError:
            # A concurrent invocation may have created the directory between
            # the existence check and the mkdir; that is not an error.
            if not os.path.isdir(torrent_cache_path):
                raise

    return torrent_cache_path
|
|
|
|
|
|
def _fetch_torrent_file(torrent_cache_path, image_id, torrent_url):
    """Return the cached .torrent file for an image, downloading if needed.

    The file is stored as ``<image_id>.torrent`` inside the torrent cache.
    When it is not already present it is fetched from `torrent_url`.

    :param torrent_cache_path: directory holding cached .torrent files
    :param image_id: image identifier, used as the file's base name
    :param torrent_url: URL the .torrent file is downloaded from
    :returns: path of the .torrent file in the cache
    """
    torrent_path = os.path.join(
        torrent_cache_path, image_id + '.torrent')

    if not os.path.exists(torrent_path):
        logging.info("Downloading %s" % torrent_url)

        # Write contents to temporary path to ensure we don't have partially
        # completed files in the cache.
        temp_directory = tempfile.mkdtemp(dir=torrent_cache_path)
        try:
            temp_path = os.path.join(
                temp_directory, os.path.basename(torrent_path))
            temp_file = open(temp_path, 'wb')
            try:
                remote_torrent_file = urllib2.urlopen(torrent_url)
                try:
                    shutil.copyfileobj(remote_torrent_file, temp_file)
                finally:
                    # Release the HTTP connection even if the copy fails.
                    remote_torrent_file.close()
            finally:
                temp_file.close()

            # The temporary directory lives inside the cache directory, so
            # this rename stays within one directory tree.
            os.rename(temp_path, torrent_path)
        finally:
            shutil.rmtree(temp_directory)

    return torrent_path
|
|
|
|
|
|
def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed):
    """Delete any torrent files that haven't been accessed recently.

    :param torrent_cache_path: directory holding cached .torrent files
    :param torrent_max_last_accessed: maximum age, in seconds since the last
        access, an entry may reach before it is deleted; a false value
        (0 or None) disables reaping
    """
    if not torrent_max_last_accessed:
        logging.debug("Reaping old torrent files disabled, skipping...")
        return

    logging.debug("Preparing to reap old torrent files,"
                  " torrent_max_last_accessed=%d" % torrent_max_last_accessed)

    for fname in os.listdir(torrent_cache_path):
        torrent_path = os.path.join(torrent_cache_path, fname)
        try:
            accessed_at = os.path.getatime(torrent_path)
        except OSError:
            # The entry can vanish between listdir() and the stat, e.g. when
            # another invocation removes its temporary download directory.
            # Anything that still exists is a real error.
            if os.path.exists(torrent_path):
                raise
            continue

        last_accessed = time.time() - accessed_at
        if last_accessed > torrent_max_last_accessed:
            logging.debug("Reaping '%s', last_accessed=%d" % (
                torrent_path, last_accessed))
            utils.delete_if_exists(torrent_path)
|
|
|
|
|
|
def _int_from_env(name, default):
    """Return environment variable `name` as an int, or `default` if unset."""
    # Environment values are always strings; the session settings need ints.
    return int(os.environ.get(name, default))


def _download(torrent_path, save_as_path, torrent_listen_port_start,
              torrent_listen_port_end, torrent_download_stall_cutoff):
    """Download the torrent's payload and block until it is complete.

    Session settings default to the DEFAULT_M* module constants and can be
    overridden through the DEFAULT_MIXED_MODE_ALGORITHM,
    DEFAULT_MAX_OUT_REQUEST_QUEUE, DEFAULT_MAX_QUEUED_DISK_BYTES and
    DEFAULT_MAX_QUEUED_DISK_BYTES_LOW_WATERMARK environment variables.

    :param torrent_path: path of the .torrent file to download from
    :param save_as_path: directory the payload is saved into
    :param torrent_listen_port_start: first port of the listen range
    :param torrent_listen_port_end: last port of the listen range
    :param torrent_download_stall_cutoff: seconds without any change in
        progress after which the download is abandoned
    :raises ValueError: if an environment override is not an integer
    :raises Exception: if the download stalls for longer than the cutoff
    """
    session = libtorrent.session()
    session.listen_on(torrent_listen_port_start, torrent_listen_port_end)

    session_opts = {
        'mixed_mode_algorithm': _int_from_env(
            'DEFAULT_MIXED_MODE_ALGORITHM', DEFAULT_MMA),
        'max_queued_disk_bytes': _int_from_env(
            'DEFAULT_MAX_QUEUED_DISK_BYTES', DEFAULT_MQDB),
        'max_out_request_queue': _int_from_env(
            'DEFAULT_MAX_OUT_REQUEST_QUEUE', DEFAULT_MORQ),
        'max_queued_disk_bytes_low_watermark': _int_from_env(
            'DEFAULT_MAX_QUEUED_DISK_BYTES_LOW_WATERMARK', DEFAULT_MQDBLW),
    }
    session.set_settings(session_opts)

    # Read the metadata with an explicit close instead of leaking the handle.
    torrent_file = open(torrent_path, 'rb')
    try:
        metadata = torrent_file.read()
    finally:
        torrent_file.close()
    info = libtorrent.torrent_info(libtorrent.bdecode(metadata))

    torrent = session.add_torrent(
        info, save_as_path,
        storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)

    try:
        last_progress = 0
        last_progress_updated = time.time()

        log_time = 0
        while not torrent.is_seed():
            s = torrent.status()

            progress = s.progress * 100

            # Any change in progress resets the stall timer.
            if progress != last_progress:
                last_progress = progress
                last_progress_updated = time.time()

            stall_duration = time.time() - last_progress_updated
            if stall_duration > torrent_download_stall_cutoff:
                logging.error(
                    "Download stalled: stall_duration=%d,"
                    " torrent_download_stall_cutoff=%d" % (
                        stall_duration, torrent_download_stall_cutoff))
                raise Exception("Bittorrent download stall detected, bailing!")

            # Poll once a second, but only log every tenth iteration.
            log_time += 1
            if log_time % 10 == 0:
                # Divide by a float so the rates keep their decimal under
                # Python 2 integer division.
                logging.debug(
                    '%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d)'
                    ' %s %s' % (progress, s.download_rate / 1000.0,
                                s.upload_rate / 1000.0, s.num_peers, s.state,
                                torrent_path))
            time.sleep(1)
    finally:
        # Always detach the torrent from the session, on success or failure.
        session.remove_torrent(torrent)

    logging.debug("Download of '%s' finished" % torrent_path)
|
|
|
|
|
|
def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
                 torrent_max_seeder_processes_per_host):
    """Decide whether the freshly downloaded image should be seeded.

    Seeding is skipped when it is disabled (false `torrent_seed_duration`),
    when `seed_path` already exists, when a random draw exceeds
    `torrent_seed_chance`, or when the number of running seeder processes
    has reached a non-negative `torrent_max_seeder_processes_per_host`.

    :returns: True if the image should be seeded, False otherwise
    """
    if not torrent_seed_duration:
        logging.debug("Seeding disabled, skipping...")
        return False

    if os.path.exists(seed_path):
        logging.debug("Seed is already present, skipping....")
        return False

    roll = random.random()
    if roll > torrent_seed_chance:
        logging.debug("%.2f > %.2f, seeding randomly skipping..." % (
            roll, torrent_seed_chance))
        return False

    # Count the running seeders by draining the generator.
    running_seeders = 0
    for _unused_cmdline in _active_seeder_processes():
        running_seeders += 1

    # A negative limit means "no limit".
    limit = torrent_max_seeder_processes_per_host
    at_capacity = (limit >= 0 and running_seeders >= limit)
    if at_capacity:
        logging.debug("max number of seeder processes for this host reached"
                      " (%d), skipping..." % limit)
        return False

    return True
|
|
|
|
|
|
def _seed(torrent_path, seed_cache_path, torrent_seed_duration,
          torrent_listen_port_start, torrent_listen_port_end):
    """Run the seeder script for a torrent.

    The script named by SEEDER_PROCESS is looked up in the directory that
    contains this plugin file and is handed the remaining arguments, each
    converted to a string, through utils.run_command.
    """
    this_file = inspect.getabsfile(inspect.currentframe())
    seeder_path = os.path.join(os.path.dirname(this_file), SEEDER_PROCESS)

    command_parts = [seeder_path, torrent_path, seed_cache_path,
                     torrent_seed_duration, torrent_listen_port_start,
                     torrent_listen_port_end]
    seed_cmd = [str(part) for part in command_parts]
    utils.run_command(seed_cmd)
|
|
|
|
|
|
def _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
                    torrent_seed_duration, torrent_seed_chance,
                    torrent_listen_port_start, torrent_listen_port_end,
                    torrent_max_seeder_processes_per_host):
    """Seed the downloaded tarball, or delete it when seeding is skipped.

    If _should_seed() approves, the tarball is moved into the seed cache
    under its own file name and the seeder is started for it; otherwise the
    tarball is removed.
    """
    seed_path = os.path.join(seed_cache_path, os.path.basename(tarball_path))

    wanted = _should_seed(seed_path, torrent_seed_duration,
                          torrent_seed_chance,
                          torrent_max_seeder_processes_per_host)
    if not wanted:
        utils.delete_if_exists(tarball_path)
        return

    logging.debug("Preparing to seed '%s' for %d secs" % (
        seed_path, torrent_seed_duration))
    utils._rename(tarball_path, seed_path)

    # Daemonize and seed the image
    _seed(torrent_path, seed_cache_path, torrent_seed_duration,
          torrent_listen_port_start, torrent_listen_port_end)
|
|
|
|
|
|
def _extract_tarball(tarball_path, staging_path):
    """Unpack the tarball at `tarball_path` into `staging_path`.

    The tarball is opened in binary mode and always closed again, whether
    or not extraction succeeds.
    """
    archive = open(tarball_path, 'rb')
    try:
        utils.extract_tarball(archive, staging_path)
    finally:
        archive.close()
|
|
|
|
|
|
def _active_seeder_processes():
|
|
"""Yields command-line of active seeder processes.
|
|
|
|
Roughly equivalent to performing ps | grep _bittorrent_seeder
|
|
"""
|
|
pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
|
|
for pid in pids:
|
|
try:
|
|
cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
|
|
except IOError, e: # noqa
|
|
if e.errno != errno.ENOENT:
|
|
raise
|
|
|
|
if SEEDER_PROCESS in cmdline:
|
|
yield cmdline
|
|
|
|
|
|
def _reap_finished_seeds(seed_cache_path):
    """Delete any cached seeds where the seeder process has died.

    A cached seed counts as still in use when its file name appears in the
    command line of any active seeder process; every other entry in
    `seed_cache_path` is deleted.
    """
    logging.debug("Preparing to reap finished seeds")

    cached_seeds = [os.path.join(seed_cache_path, fname)
                    for fname in os.listdir(seed_cache_path)]
    # Start with every cached seed as a candidate and strike off the ones a
    # running seeder still refers to.
    unclaimed = dict.fromkeys(cached_seeds)

    for cmdline in _active_seeder_processes():
        claimed = [candidate for candidate in unclaimed
                   if os.path.basename(candidate) in cmdline]
        for candidate in claimed:
            del unclaimed[candidate]

    for seed_path in unclaimed:
        logging.debug("Reaping cached seed '%s'" % seed_path)
        utils.delete_if_exists(seed_path)
|
|
|
|
|
|
def _make_seed_cache():
    """Return the seed cache directory, creating it if it is missing.

    The location is read from the SEED_CACHE environment variable and falls
    back to DEFAULT_SEED_CACHE.

    :returns: path of the seed cache directory
    :raises OSError: if the directory does not exist and cannot be created
    """
    seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE)
    if not os.path.exists(seed_cache_path):
        try:
            # os.mkdir (not makedirs) on purpose: a missing parent directory
            # must keep failing rather than being silently created.
            os.mkdir(seed_cache_path)
        except OSError:
            # A concurrent invocation may have created the directory between
            # the existence check and the mkdir; that is not an error.
            if not os.path.isdir(seed_cache_path):
                raise
    return seed_cache_path
|
|
|
|
|
|
def download_vhd(session, image_id, torrent_url, torrent_seed_duration,
                 torrent_seed_chance, torrent_max_last_accessed,
                 torrent_listen_port_start, torrent_listen_port_end,
                 torrent_download_stall_cutoff, uuid_stack, sr_path,
                 torrent_max_seeder_processes_per_host):
    """Download an image from BitTorrent, unbundle it, and then deposit the
    VHDs into the storage repository

    Steps, in order: ensure both caches exist, reap finished seeds and old
    torrent files, fetch the .torrent file, download and extract the
    tarball in a staging area, import the VHDs, then seed or delete the
    tarball. The staging area is cleaned up whether or not this succeeds.

    `session` is accepted but not used by this function.

    :returns: the value returned by utils.import_vhds
    """
    seed_cache_path = _make_seed_cache()
    torrent_cache_path = _make_torrent_cache()

    # Housekeeping
    _reap_finished_seeds(seed_cache_path)
    _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed)

    torrent_path = _fetch_torrent_file(
        torrent_cache_path, image_id, torrent_url)

    # The tarball is expected under the torrent file's name with the
    # '.torrent' part removed.
    torrent_basename = os.path.basename(torrent_path)
    tarball_filename = torrent_basename.replace('.torrent', '')

    staging_path = utils.make_staging_area(sr_path)
    try:
        tarball_path = os.path.join(staging_path, tarball_filename)

        # Download tarball into staging area
        _download(torrent_path, staging_path, torrent_listen_port_start,
                  torrent_listen_port_end, torrent_download_stall_cutoff)

        # Extract the tarball into the staging area
        _extract_tarball(tarball_path, staging_path)

        # Move the VHDs from the staging area into the storage repository
        imported_vdis = utils.import_vhds(sr_path, staging_path, uuid_stack)

        # Seed image for others in the swarm
        _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
                        torrent_seed_duration, torrent_seed_chance,
                        torrent_listen_port_start, torrent_listen_port_end,
                        torrent_max_seeder_processes_per_host)
    finally:
        utils.cleanup_staging_area(staging_path)

    return imported_vdis
|
|
|
|
|
|
if __name__ == "__main__":
    # When run as a script, hand download_vhd to the plugin call registry.
    utils.register_plugin_calls(download_vhd)
|