From 75ca834282cf9968f048bae42e28442bde97983d Mon Sep 17 00:00:00 2001 From: gecong1973 Date: Sun, 25 Nov 2018 17:18:37 -0800 Subject: [PATCH] Add BaseFtpStorage class fot freezer The patch add BaseFtpStorage class fot freezer Story: #2004332 Task: #27917 Change-Id: I5501d41dfcb19f9dac32e54770b5df5a9c14b3ab --- freezer/storage/ftp.py | 296 ++++++++++++++++++++++++++++++++++++++-- freezer/storage/ftps.py | 41 ------ 2 files changed, 286 insertions(+), 51 deletions(-) delete mode 100644 freezer/storage/ftps.py diff --git a/freezer/storage/ftp.py b/freezer/storage/ftp.py index fbaa6445..609a17e9 100644 --- a/freezer/storage/ftp.py +++ b/freezer/storage/ftp.py @@ -15,27 +15,303 @@ limitations under the License. """ -# import errno -# import ftplib -# import os -# import shutil -# import socket -# import tempfile -from oslo_log import log +import errno +import ftplib +import json +import os +import shutil +import socket +import tempfile from freezer.storage import fslike -# from freezer.utils import utils +from freezer.utils import utils +from oslo_log import log CHUNK_SIZE = 32768 LOG = log.getLogger(__name__) -class FtpStorage(fslike.FsLikeStorage): +class BaseFtpStorage(fslike.FsLikeStorage): """ - :type ftp: paramiko.SFTPClient + :type ftp: ftplib + """ + _type = 'ftpbase' + + def __init__(self, storage_path, remote_pwd, + remote_username, remote_ip, port, max_segment_size): + """ + :param storage_path: directory of storage + :type storage_path: str + :return: + """ + self.remote_username = remote_username + self.remote_pwd = remote_pwd + self.remote_ip = remote_ip + self.port = port + self.ftp = None + self._validate() + self.init() + super(BaseFtpStorage, self).__init__( + storage_path=storage_path, + max_segment_size=max_segment_size) + + def _validate(self): + """ + Validates if all parameters required to ssh are available. + :return: True or raises ValueError + """ + if not self.remote_ip: + raise ValueError('Please provide --ftp-host value.') + elif not self.remote_username: + raise ValueError('Please provide --ftp-username value.') + elif not self.remote_pwd: + raise ValueError('Please provide remote password.' + '--ftp-password argument.') + return True + + def init(self): + pass + + def _create_tempdir(self): + try: + tmpdir = tempfile.mkdtemp() + LOG.info("****mkdir****") + except Exception: + LOG.error("Unable to create a tmp directory") + raise + return tmpdir + + def rmtree(self, path): + LOG.info("ftp rmtree path=%s" % path) + files = [] + self.ftp.dir(path, files.append) + LOG.info('rm files=%s' % files) + for f in files: + attr = f.split()[0] + file_name = f.split()[-1] + filepath = utils.path_join(path, file_name) + if attr.startswith('d'): + self.rmtree(filepath) + else: + self.ftp.delete(filepath) + self.ftp.rmd(path) + + def create_dirs(self, path): + """Change to this directory, recursively making new folders if needed. + Returns True if any folders were created.""" + LOG.info("ftp create_dirs path=%s" % path) + if path == '/': + # absolute path so change directory to root + self.ftp.cwd('/') + return + if path == '': + # top-level relative directory must exist + return + try: + self.ftp.cwd(path) # sub-directory exists + except ftplib.all_errors: + # LOG.info("ftp create dirs failed %s" % e) + dirname, basename = os.path.split(path.rstrip('/')) + LOG.info("ftp create_dirs dirname=%s basename=%s" + % (dirname, basename)) + self.create_dirs(dirname) # make parent directories + self.ftp.mkd(basename) # sub-directory missing, so created it + self.ftp.cwd(basename) + return True + + def write_backup(self, rich_queue, backup): + """ + Stores backup in storage + :type rich_queue: freezer.streaming.RichQueue + :type backup: freezer.storage.base.Backup + """ + try: + tmpdir = tempfile.mkdtemp() + except Exception: + LOG.error("Unable to create a tmp directory") + raise + + try: + data_meta = utils.path_join(tmpdir, "data_meta") + LOG.info("ftp write data_meta %s" % data_meta) + backup = backup.copy(storage=self) + path = backup.data_path + self.create_dirs(path.rsplit('/', 1)[0]) + + with open(data_meta, mode='wb') as b_file: + for message in rich_queue.get_messages(): + b_file.write(message) + + self.put_file(data_meta, path) + finally: + shutil.rmtree(tmpdir) + + def get_file(self, from_path, to_path): + LOG.info("ftp get_file from_path=%s to_path=%s" % (from_path, to_path)) + try: + dir = self.ftp.pwd() + LOG.info("ftp get file dir %s" % dir) + except (ftplib.all_errors, socket.error) as e: + LOG.info("ftp get file failed %s try again" % e) + self.init() + try: + file = open(to_path, 'wb') + msg = self.ftp.retrbinary('RETR ' + from_path, + file.write, 8192) + # 226 + LOG.info("FTP GET %s, ret=%s" % (from_path, msg)) + except ftplib.all_errors as e: + file.close() + self.ftp.quit() + LOG.info("ftp get file error %s" % e) + raise e + file.close() + + def put_file(self, from_path, to_path): + LOG.info("ftp put_file from_path=%s to_path=%s" % (from_path, to_path)) + try: + dir = self.ftp.pwd() + LOG.info("ftp put file dir %s" % dir) + except (ftplib.all_errors, socket.error) as e: + LOG.info("ftp put file failed %s try again" % e) + self.init() + try: + file = open(from_path, 'rb') + msg = self.ftp.storbinary('STOR ' + to_path, + file, 8192) + # 226 + LOG.info("FTP PUT %s, ret=%s" % (from_path, msg)) + except ftplib.all_errors as e: + self.ftp.quit() + LOG.info("ftp put file error %s" % e) + raise e + file.close() + + def listdir(self, directory): + LOG.info("ftp listdir directory=%s" % directory) + try: + # paramiko SFTPClient.listdir_attr returns + # directories in arbitarary order, so we should + # sort results of this command + ret = self.ftp.cwd(directory) + LOG.info('ftp listdir cwd ret=%s' % ret) + res = self.ftp.nlst() + LOG.info('ftp listdir res=%s' % res) + return sorted(res) + except IOError as e: + LOG.info("ftp listdir error %s" % e) + if e.errno == errno.ENOENT: + return list() + else: + raise + + def open(self, path, mode): + pass + + def read_metadata_file(self, path): + # files = self.ftp.mlsd(path) # 3.3 support + LOG.info("ftp read_metadta_file path=%s" % path) + tmpdir = self._create_tempdir() + try: + data_down = utils.path_join(tmpdir, "data_down") + LOG.info("read metada datadown=%s" % data_down) + self.get_file(path, data_down) + file_size = self.ftp.size(path) + data = "" + received_size = 0 + with open(data_down, 'r') as reader: + reader.prefetch(file_size) + chunk = reader.read(CHUNK_SIZE) + while chunk: + received_size += len(chunk) + data += chunk + chunk = reader.read(CHUNK_SIZE) + if file_size != received_size: + raise IOError('Size mismatch: expected {} received {}' + .format(file_size, received_size)) + return data + finally: + shutil.rmtree(tmpdir) + + def backup_blocks(self, backup): + LOG.info("ftp backup_blocks ") + self.init() + # should recreate ssh for new process + tmpdir = self._create_tempdir() + try: + data = utils.path_join(tmpdir, "data") + LOG.info("backup_blocksa datadown=%s" % data) + self.get_file(backup.data_path, data) + with open(data, 'rb') as backup_file: + while True: + chunk = backup_file.read(self.max_segment_size) + if chunk == '': + break + if len(chunk): + yield chunk + finally: + shutil.rmtree(tmpdir) + + def add_stream(self, stream, package_name, headers=None): + """ + :param stream: data + :param package_name: path + :param headers: backup metadata information + :return: + """ + tmpdir = self._create_tempdir() + try: + split = package_name.rsplit('/', 1) + # create backup_basedir + backup_basedir = "{0}/{1}".format(self.storage_path, + package_name) + self.create_dirs(backup_basedir) + # define backup_data_name + backup_basepath = "{0}/{1}".format(backup_basedir, + split[0]) + backup_metadata = "%s/metadata" % backup_basedir + # write backup to backup_basepath + data_backup = utils.path_join(tmpdir, "data_backup") + with open(data_backup, 'wb') as backup_file: + for el in stream: + backup_file.write(el) + self.put_file(data_backup, backup_basepath) + # write data matadata to backup_metadata + metadata = utils.path_join(tmpdir, "metadata") + with open(metadata, 'wb') as backup_meta: + backup_meta.write(json.dumps(headers)) + self.put_file(metadata, backup_metadata) + finally: + shutil.rmtree(tmpdir) + + +class FtpStorage(BaseFtpStorage): + """ + :type ftp: ftplib.FTP() """ _type = 'ftp' def __init__(self, storage_path, remote_pwd, remote_username, remote_ip, port, max_segment_size): + """ + :param storage_path: directory of storage + :type storage_path: str + :return: + """ + pass + + +class FtpsStorage(BaseFtpStorage): + """ + :type ftps: ftplib.FTP_TLS() + """ + _type = 'ftps' + + def __init__(self, storage_path, remote_pwd, + remote_username, remote_ip, port, max_segment_size): + """ + :param storage_path: directory of storage + :type storage_path: str + :return: + """ pass diff --git a/freezer/storage/ftps.py b/freezer/storage/ftps.py deleted file mode 100644 index 1a6aeca4..00000000 --- a/freezer/storage/ftps.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -(c) Copyright 2018 ZTE Corporation. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -# import errno -# import ftplib -# import os -# import shutil -# import socket -# import tempfile -from oslo_log import log - -from freezer.storage import fslike -# from freezer.utils import utils - -CHUNK_SIZE = 32768 -LOG = log.getLogger(__name__) - - -class FtpsStorage(fslike.FsLikeStorage): - """ - :type ftps: paramiko.SFTPClient - """ - _type = 'ftps' - - def __init__(self, storage_path, remote_pwd, - remote_username, remote_ip, port, max_segment_size): - pass