Add BaseFtpStorage class fot freezer

The patch add BaseFtpStorage class fot freezer

Story: #2004332
Task: #27917
Change-Id: I5501d41dfcb19f9dac32e54770b5df5a9c14b3ab
This commit is contained in:
gecong1973 2018-11-25 17:18:37 -08:00
parent 42dd05768d
commit 75ca834282
2 changed files with 286 additions and 51 deletions

View File

@ -15,27 +15,303 @@ limitations under the License.
"""
# import errno
# import ftplib
# import os
# import shutil
# import socket
# import tempfile
from oslo_log import log
import errno
import ftplib
import json
import os
import shutil
import socket
import tempfile
from freezer.storage import fslike
# from freezer.utils import utils
from freezer.utils import utils
from oslo_log import log
CHUNK_SIZE = 32768
LOG = log.getLogger(__name__)
class FtpStorage(fslike.FsLikeStorage):
class BaseFtpStorage(fslike.FsLikeStorage):
"""
:type ftp: paramiko.SFTPClient
:type ftp: ftplib
"""
_type = 'ftpbase'
def __init__(self, storage_path, remote_pwd,
remote_username, remote_ip, port, max_segment_size):
"""
:param storage_path: directory of storage
:type storage_path: str
:return:
"""
self.remote_username = remote_username
self.remote_pwd = remote_pwd
self.remote_ip = remote_ip
self.port = port
self.ftp = None
self._validate()
self.init()
super(BaseFtpStorage, self).__init__(
storage_path=storage_path,
max_segment_size=max_segment_size)
def _validate(self):
"""
Validates if all parameters required to ssh are available.
:return: True or raises ValueError
"""
if not self.remote_ip:
raise ValueError('Please provide --ftp-host value.')
elif not self.remote_username:
raise ValueError('Please provide --ftp-username value.')
elif not self.remote_pwd:
raise ValueError('Please provide remote password.'
'--ftp-password argument.')
return True
def init(self):
pass
def _create_tempdir(self):
try:
tmpdir = tempfile.mkdtemp()
LOG.info("****mkdir****")
except Exception:
LOG.error("Unable to create a tmp directory")
raise
return tmpdir
def rmtree(self, path):
LOG.info("ftp rmtree path=%s" % path)
files = []
self.ftp.dir(path, files.append)
LOG.info('rm files=%s' % files)
for f in files:
attr = f.split()[0]
file_name = f.split()[-1]
filepath = utils.path_join(path, file_name)
if attr.startswith('d'):
self.rmtree(filepath)
else:
self.ftp.delete(filepath)
self.ftp.rmd(path)
def create_dirs(self, path):
"""Change to this directory, recursively making new folders if needed.
Returns True if any folders were created."""
LOG.info("ftp create_dirs path=%s" % path)
if path == '/':
# absolute path so change directory to root
self.ftp.cwd('/')
return
if path == '':
# top-level relative directory must exist
return
try:
self.ftp.cwd(path) # sub-directory exists
except ftplib.all_errors:
# LOG.info("ftp create dirs failed %s" % e)
dirname, basename = os.path.split(path.rstrip('/'))
LOG.info("ftp create_dirs dirname=%s basename=%s"
% (dirname, basename))
self.create_dirs(dirname) # make parent directories
self.ftp.mkd(basename) # sub-directory missing, so created it
self.ftp.cwd(basename)
return True
def write_backup(self, rich_queue, backup):
"""
Stores backup in storage
:type rich_queue: freezer.streaming.RichQueue
:type backup: freezer.storage.base.Backup
"""
try:
tmpdir = tempfile.mkdtemp()
except Exception:
LOG.error("Unable to create a tmp directory")
raise
try:
data_meta = utils.path_join(tmpdir, "data_meta")
LOG.info("ftp write data_meta %s" % data_meta)
backup = backup.copy(storage=self)
path = backup.data_path
self.create_dirs(path.rsplit('/', 1)[0])
with open(data_meta, mode='wb') as b_file:
for message in rich_queue.get_messages():
b_file.write(message)
self.put_file(data_meta, path)
finally:
shutil.rmtree(tmpdir)
def get_file(self, from_path, to_path):
LOG.info("ftp get_file from_path=%s to_path=%s" % (from_path, to_path))
try:
dir = self.ftp.pwd()
LOG.info("ftp get file dir %s" % dir)
except (ftplib.all_errors, socket.error) as e:
LOG.info("ftp get file failed %s try again" % e)
self.init()
try:
file = open(to_path, 'wb')
msg = self.ftp.retrbinary('RETR ' + from_path,
file.write, 8192)
# 226
LOG.info("FTP GET %s, ret=%s" % (from_path, msg))
except ftplib.all_errors as e:
file.close()
self.ftp.quit()
LOG.info("ftp get file error %s" % e)
raise e
file.close()
def put_file(self, from_path, to_path):
LOG.info("ftp put_file from_path=%s to_path=%s" % (from_path, to_path))
try:
dir = self.ftp.pwd()
LOG.info("ftp put file dir %s" % dir)
except (ftplib.all_errors, socket.error) as e:
LOG.info("ftp put file failed %s try again" % e)
self.init()
try:
file = open(from_path, 'rb')
msg = self.ftp.storbinary('STOR ' + to_path,
file, 8192)
# 226
LOG.info("FTP PUT %s, ret=%s" % (from_path, msg))
except ftplib.all_errors as e:
self.ftp.quit()
LOG.info("ftp put file error %s" % e)
raise e
file.close()
def listdir(self, directory):
LOG.info("ftp listdir directory=%s" % directory)
try:
# paramiko SFTPClient.listdir_attr returns
# directories in arbitarary order, so we should
# sort results of this command
ret = self.ftp.cwd(directory)
LOG.info('ftp listdir cwd ret=%s' % ret)
res = self.ftp.nlst()
LOG.info('ftp listdir res=%s' % res)
return sorted(res)
except IOError as e:
LOG.info("ftp listdir error %s" % e)
if e.errno == errno.ENOENT:
return list()
else:
raise
def open(self, path, mode):
pass
def read_metadata_file(self, path):
# files = self.ftp.mlsd(path) # 3.3 support
LOG.info("ftp read_metadta_file path=%s" % path)
tmpdir = self._create_tempdir()
try:
data_down = utils.path_join(tmpdir, "data_down")
LOG.info("read metada datadown=%s" % data_down)
self.get_file(path, data_down)
file_size = self.ftp.size(path)
data = ""
received_size = 0
with open(data_down, 'r') as reader:
reader.prefetch(file_size)
chunk = reader.read(CHUNK_SIZE)
while chunk:
received_size += len(chunk)
data += chunk
chunk = reader.read(CHUNK_SIZE)
if file_size != received_size:
raise IOError('Size mismatch: expected {} received {}'
.format(file_size, received_size))
return data
finally:
shutil.rmtree(tmpdir)
def backup_blocks(self, backup):
LOG.info("ftp backup_blocks ")
self.init()
# should recreate ssh for new process
tmpdir = self._create_tempdir()
try:
data = utils.path_join(tmpdir, "data")
LOG.info("backup_blocksa datadown=%s" % data)
self.get_file(backup.data_path, data)
with open(data, 'rb') as backup_file:
while True:
chunk = backup_file.read(self.max_segment_size)
if chunk == '':
break
if len(chunk):
yield chunk
finally:
shutil.rmtree(tmpdir)
def add_stream(self, stream, package_name, headers=None):
"""
:param stream: data
:param package_name: path
:param headers: backup metadata information
:return:
"""
tmpdir = self._create_tempdir()
try:
split = package_name.rsplit('/', 1)
# create backup_basedir
backup_basedir = "{0}/{1}".format(self.storage_path,
package_name)
self.create_dirs(backup_basedir)
# define backup_data_name
backup_basepath = "{0}/{1}".format(backup_basedir,
split[0])
backup_metadata = "%s/metadata" % backup_basedir
# write backup to backup_basepath
data_backup = utils.path_join(tmpdir, "data_backup")
with open(data_backup, 'wb') as backup_file:
for el in stream:
backup_file.write(el)
self.put_file(data_backup, backup_basepath)
# write data matadata to backup_metadata
metadata = utils.path_join(tmpdir, "metadata")
with open(metadata, 'wb') as backup_meta:
backup_meta.write(json.dumps(headers))
self.put_file(metadata, backup_metadata)
finally:
shutil.rmtree(tmpdir)
class FtpStorage(BaseFtpStorage):
"""
:type ftp: ftplib.FTP()
"""
_type = 'ftp'
def __init__(self, storage_path, remote_pwd,
remote_username, remote_ip, port, max_segment_size):
"""
:param storage_path: directory of storage
:type storage_path: str
:return:
"""
pass
class FtpsStorage(BaseFtpStorage):
"""
:type ftps: ftplib.FTP_TLS()
"""
_type = 'ftps'
def __init__(self, storage_path, remote_pwd,
remote_username, remote_ip, port, max_segment_size):
"""
:param storage_path: directory of storage
:type storage_path: str
:return:
"""
pass

View File

@ -1,41 +0,0 @@
"""
(c) Copyright 2018 ZTE Corporation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# import errno
# import ftplib
# import os
# import shutil
# import socket
# import tempfile
from oslo_log import log
from freezer.storage import fslike
# from freezer.utils import utils
CHUNK_SIZE = 32768
LOG = log.getLogger(__name__)
class FtpsStorage(fslike.FsLikeStorage):
"""
:type ftps: paramiko.SFTPClient
"""
_type = 'ftps'
def __init__(self, storage_path, remote_pwd,
remote_username, remote_ip, port, max_segment_size):
pass