Add BaseFtpStorage class fot freezer
The patch add BaseFtpStorage class fot freezer Story: #2004332 Task: #27917 Change-Id: I5501d41dfcb19f9dac32e54770b5df5a9c14b3ab
This commit is contained in:
parent
42dd05768d
commit
75ca834282
|
@ -15,27 +15,303 @@ limitations under the License.
|
|||
|
||||
"""
|
||||
|
||||
# import errno
|
||||
# import ftplib
|
||||
# import os
|
||||
# import shutil
|
||||
# import socket
|
||||
# import tempfile
|
||||
from oslo_log import log
|
||||
import errno
|
||||
import ftplib
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import tempfile
|
||||
|
||||
from freezer.storage import fslike
|
||||
# from freezer.utils import utils
|
||||
from freezer.utils import utils
|
||||
from oslo_log import log
|
||||
|
||||
CHUNK_SIZE = 32768
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class FtpStorage(fslike.FsLikeStorage):
|
||||
class BaseFtpStorage(fslike.FsLikeStorage):
|
||||
"""
|
||||
:type ftp: paramiko.SFTPClient
|
||||
:type ftp: ftplib
|
||||
"""
|
||||
_type = 'ftpbase'
|
||||
|
||||
def __init__(self, storage_path, remote_pwd,
|
||||
remote_username, remote_ip, port, max_segment_size):
|
||||
"""
|
||||
:param storage_path: directory of storage
|
||||
:type storage_path: str
|
||||
:return:
|
||||
"""
|
||||
self.remote_username = remote_username
|
||||
self.remote_pwd = remote_pwd
|
||||
self.remote_ip = remote_ip
|
||||
self.port = port
|
||||
self.ftp = None
|
||||
self._validate()
|
||||
self.init()
|
||||
super(BaseFtpStorage, self).__init__(
|
||||
storage_path=storage_path,
|
||||
max_segment_size=max_segment_size)
|
||||
|
||||
def _validate(self):
|
||||
"""
|
||||
Validates if all parameters required to ssh are available.
|
||||
:return: True or raises ValueError
|
||||
"""
|
||||
if not self.remote_ip:
|
||||
raise ValueError('Please provide --ftp-host value.')
|
||||
elif not self.remote_username:
|
||||
raise ValueError('Please provide --ftp-username value.')
|
||||
elif not self.remote_pwd:
|
||||
raise ValueError('Please provide remote password.'
|
||||
'--ftp-password argument.')
|
||||
return True
|
||||
|
||||
def init(self):
|
||||
pass
|
||||
|
||||
def _create_tempdir(self):
|
||||
try:
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
LOG.info("****mkdir****")
|
||||
except Exception:
|
||||
LOG.error("Unable to create a tmp directory")
|
||||
raise
|
||||
return tmpdir
|
||||
|
||||
def rmtree(self, path):
|
||||
LOG.info("ftp rmtree path=%s" % path)
|
||||
files = []
|
||||
self.ftp.dir(path, files.append)
|
||||
LOG.info('rm files=%s' % files)
|
||||
for f in files:
|
||||
attr = f.split()[0]
|
||||
file_name = f.split()[-1]
|
||||
filepath = utils.path_join(path, file_name)
|
||||
if attr.startswith('d'):
|
||||
self.rmtree(filepath)
|
||||
else:
|
||||
self.ftp.delete(filepath)
|
||||
self.ftp.rmd(path)
|
||||
|
||||
def create_dirs(self, path):
|
||||
"""Change to this directory, recursively making new folders if needed.
|
||||
Returns True if any folders were created."""
|
||||
LOG.info("ftp create_dirs path=%s" % path)
|
||||
if path == '/':
|
||||
# absolute path so change directory to root
|
||||
self.ftp.cwd('/')
|
||||
return
|
||||
if path == '':
|
||||
# top-level relative directory must exist
|
||||
return
|
||||
try:
|
||||
self.ftp.cwd(path) # sub-directory exists
|
||||
except ftplib.all_errors:
|
||||
# LOG.info("ftp create dirs failed %s" % e)
|
||||
dirname, basename = os.path.split(path.rstrip('/'))
|
||||
LOG.info("ftp create_dirs dirname=%s basename=%s"
|
||||
% (dirname, basename))
|
||||
self.create_dirs(dirname) # make parent directories
|
||||
self.ftp.mkd(basename) # sub-directory missing, so created it
|
||||
self.ftp.cwd(basename)
|
||||
return True
|
||||
|
||||
def write_backup(self, rich_queue, backup):
|
||||
"""
|
||||
Stores backup in storage
|
||||
:type rich_queue: freezer.streaming.RichQueue
|
||||
:type backup: freezer.storage.base.Backup
|
||||
"""
|
||||
try:
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
except Exception:
|
||||
LOG.error("Unable to create a tmp directory")
|
||||
raise
|
||||
|
||||
try:
|
||||
data_meta = utils.path_join(tmpdir, "data_meta")
|
||||
LOG.info("ftp write data_meta %s" % data_meta)
|
||||
backup = backup.copy(storage=self)
|
||||
path = backup.data_path
|
||||
self.create_dirs(path.rsplit('/', 1)[0])
|
||||
|
||||
with open(data_meta, mode='wb') as b_file:
|
||||
for message in rich_queue.get_messages():
|
||||
b_file.write(message)
|
||||
|
||||
self.put_file(data_meta, path)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def get_file(self, from_path, to_path):
|
||||
LOG.info("ftp get_file from_path=%s to_path=%s" % (from_path, to_path))
|
||||
try:
|
||||
dir = self.ftp.pwd()
|
||||
LOG.info("ftp get file dir %s" % dir)
|
||||
except (ftplib.all_errors, socket.error) as e:
|
||||
LOG.info("ftp get file failed %s try again" % e)
|
||||
self.init()
|
||||
try:
|
||||
file = open(to_path, 'wb')
|
||||
msg = self.ftp.retrbinary('RETR ' + from_path,
|
||||
file.write, 8192)
|
||||
# 226
|
||||
LOG.info("FTP GET %s, ret=%s" % (from_path, msg))
|
||||
except ftplib.all_errors as e:
|
||||
file.close()
|
||||
self.ftp.quit()
|
||||
LOG.info("ftp get file error %s" % e)
|
||||
raise e
|
||||
file.close()
|
||||
|
||||
def put_file(self, from_path, to_path):
|
||||
LOG.info("ftp put_file from_path=%s to_path=%s" % (from_path, to_path))
|
||||
try:
|
||||
dir = self.ftp.pwd()
|
||||
LOG.info("ftp put file dir %s" % dir)
|
||||
except (ftplib.all_errors, socket.error) as e:
|
||||
LOG.info("ftp put file failed %s try again" % e)
|
||||
self.init()
|
||||
try:
|
||||
file = open(from_path, 'rb')
|
||||
msg = self.ftp.storbinary('STOR ' + to_path,
|
||||
file, 8192)
|
||||
# 226
|
||||
LOG.info("FTP PUT %s, ret=%s" % (from_path, msg))
|
||||
except ftplib.all_errors as e:
|
||||
self.ftp.quit()
|
||||
LOG.info("ftp put file error %s" % e)
|
||||
raise e
|
||||
file.close()
|
||||
|
||||
def listdir(self, directory):
|
||||
LOG.info("ftp listdir directory=%s" % directory)
|
||||
try:
|
||||
# paramiko SFTPClient.listdir_attr returns
|
||||
# directories in arbitarary order, so we should
|
||||
# sort results of this command
|
||||
ret = self.ftp.cwd(directory)
|
||||
LOG.info('ftp listdir cwd ret=%s' % ret)
|
||||
res = self.ftp.nlst()
|
||||
LOG.info('ftp listdir res=%s' % res)
|
||||
return sorted(res)
|
||||
except IOError as e:
|
||||
LOG.info("ftp listdir error %s" % e)
|
||||
if e.errno == errno.ENOENT:
|
||||
return list()
|
||||
else:
|
||||
raise
|
||||
|
||||
def open(self, path, mode):
|
||||
pass
|
||||
|
||||
def read_metadata_file(self, path):
|
||||
# files = self.ftp.mlsd(path) # 3.3 support
|
||||
LOG.info("ftp read_metadta_file path=%s" % path)
|
||||
tmpdir = self._create_tempdir()
|
||||
try:
|
||||
data_down = utils.path_join(tmpdir, "data_down")
|
||||
LOG.info("read metada datadown=%s" % data_down)
|
||||
self.get_file(path, data_down)
|
||||
file_size = self.ftp.size(path)
|
||||
data = ""
|
||||
received_size = 0
|
||||
with open(data_down, 'r') as reader:
|
||||
reader.prefetch(file_size)
|
||||
chunk = reader.read(CHUNK_SIZE)
|
||||
while chunk:
|
||||
received_size += len(chunk)
|
||||
data += chunk
|
||||
chunk = reader.read(CHUNK_SIZE)
|
||||
if file_size != received_size:
|
||||
raise IOError('Size mismatch: expected {} received {}'
|
||||
.format(file_size, received_size))
|
||||
return data
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def backup_blocks(self, backup):
|
||||
LOG.info("ftp backup_blocks ")
|
||||
self.init()
|
||||
# should recreate ssh for new process
|
||||
tmpdir = self._create_tempdir()
|
||||
try:
|
||||
data = utils.path_join(tmpdir, "data")
|
||||
LOG.info("backup_blocksa datadown=%s" % data)
|
||||
self.get_file(backup.data_path, data)
|
||||
with open(data, 'rb') as backup_file:
|
||||
while True:
|
||||
chunk = backup_file.read(self.max_segment_size)
|
||||
if chunk == '':
|
||||
break
|
||||
if len(chunk):
|
||||
yield chunk
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
def add_stream(self, stream, package_name, headers=None):
|
||||
"""
|
||||
:param stream: data
|
||||
:param package_name: path
|
||||
:param headers: backup metadata information
|
||||
:return:
|
||||
"""
|
||||
tmpdir = self._create_tempdir()
|
||||
try:
|
||||
split = package_name.rsplit('/', 1)
|
||||
# create backup_basedir
|
||||
backup_basedir = "{0}/{1}".format(self.storage_path,
|
||||
package_name)
|
||||
self.create_dirs(backup_basedir)
|
||||
# define backup_data_name
|
||||
backup_basepath = "{0}/{1}".format(backup_basedir,
|
||||
split[0])
|
||||
backup_metadata = "%s/metadata" % backup_basedir
|
||||
# write backup to backup_basepath
|
||||
data_backup = utils.path_join(tmpdir, "data_backup")
|
||||
with open(data_backup, 'wb') as backup_file:
|
||||
for el in stream:
|
||||
backup_file.write(el)
|
||||
self.put_file(data_backup, backup_basepath)
|
||||
# write data matadata to backup_metadata
|
||||
metadata = utils.path_join(tmpdir, "metadata")
|
||||
with open(metadata, 'wb') as backup_meta:
|
||||
backup_meta.write(json.dumps(headers))
|
||||
self.put_file(metadata, backup_metadata)
|
||||
finally:
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
class FtpStorage(BaseFtpStorage):
|
||||
"""
|
||||
:type ftp: ftplib.FTP()
|
||||
"""
|
||||
_type = 'ftp'
|
||||
|
||||
def __init__(self, storage_path, remote_pwd,
|
||||
remote_username, remote_ip, port, max_segment_size):
|
||||
"""
|
||||
:param storage_path: directory of storage
|
||||
:type storage_path: str
|
||||
:return:
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class FtpsStorage(BaseFtpStorage):
|
||||
"""
|
||||
:type ftps: ftplib.FTP_TLS()
|
||||
"""
|
||||
_type = 'ftps'
|
||||
|
||||
def __init__(self, storage_path, remote_pwd,
|
||||
remote_username, remote_ip, port, max_segment_size):
|
||||
"""
|
||||
:param storage_path: directory of storage
|
||||
:type storage_path: str
|
||||
:return:
|
||||
"""
|
||||
pass
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
"""
|
||||
(c) Copyright 2018 ZTE Corporation.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
"""
|
||||
|
||||
# import errno
|
||||
# import ftplib
|
||||
# import os
|
||||
# import shutil
|
||||
# import socket
|
||||
# import tempfile
|
||||
from oslo_log import log
|
||||
|
||||
from freezer.storage import fslike
|
||||
# from freezer.utils import utils
|
||||
|
||||
CHUNK_SIZE = 32768
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class FtpsStorage(fslike.FsLikeStorage):
|
||||
"""
|
||||
:type ftps: paramiko.SFTPClient
|
||||
"""
|
||||
_type = 'ftps'
|
||||
|
||||
def __init__(self, storage_path, remote_pwd,
|
||||
remote_username, remote_ip, port, max_segment_size):
|
||||
pass
|
Loading…
Reference in New Issue