Implement upgrade-ceph in Python
Change-Id: Id46d1fc6d016f5bab780c891bc343dbdcdd67a65
This commit is contained in:
parent
2ca66b0005
commit
0570c0d8e2
|
@ -0,0 +1,168 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import tarfile
|
||||
|
||||
from cliff import command as cmd
|
||||
from fuelclient.objects import environment as environment_obj
|
||||
|
||||
from octane import magic_consts
|
||||
from octane.util import env as env_util
|
||||
from octane.util import node as node_util
|
||||
from octane.util import ssh
|
||||
|
||||
|
||||
def short_hostname(hostname):
|
||||
return hostname.partition('.')[0]
|
||||
|
||||
|
||||
def remove_mask(ip_addr):
|
||||
return ip_addr.partition('/')[0]
|
||||
|
||||
|
||||
def replace_addresses(conf, hostnames, mgmt_ips):
|
||||
mon_initial_members = ' '.join(hostnames)
|
||||
mon_host = ' '.join(mgmt_ips)
|
||||
|
||||
conf = re.sub(r'\n(mon_initial_members\s+=\s+)[-.\w\s]*\n',
|
||||
"\n\g<1>{0}\n".format(mon_initial_members),
|
||||
conf)
|
||||
conf = re.sub(r'\n(mon_host\s+=\s+)[-.\w\s]*\n',
|
||||
"\n\g<1>{0}\n".format(mon_host),
|
||||
conf)
|
||||
return conf
|
||||
|
||||
|
||||
def get_fsid(conf):
|
||||
match = re.search(r'\nfsid\s+=\s+([-.\w]+)\s*\n', conf)
|
||||
if match is not None:
|
||||
return match.group(1)
|
||||
|
||||
|
||||
def replace_host(conf, hostname):
|
||||
conf = re.sub(r'\n(host\s+=\s+)[-.\w\s]*\n',
|
||||
"\n\g<1>{0}\n".format(hostname),
|
||||
conf)
|
||||
return conf
|
||||
|
||||
|
||||
def import_bootstrap_osd(node):
|
||||
ssh.call(['ceph', 'auth', 'import', '-i',
|
||||
'/root/ceph.bootstrap-osd.keyring'], node=node)
|
||||
ssh.call(['ceph', 'auth', 'caps', 'client.bootstrap-osd', 'mon',
|
||||
"'allow profile bootstrap-osd'"], node=node)
|
||||
|
||||
|
||||
def get_ceph_conf_filename(node):
|
||||
cmd = [
|
||||
'bash', '-c',
|
||||
'pgrep ceph-mon | xargs -I{} cat /proc/{}/cmdline',
|
||||
]
|
||||
cmdlines = ssh.call_output(cmd, node=node)
|
||||
if cmdlines:
|
||||
cmdline = cmdlines.split('\n')[0].split('\0')
|
||||
for i, value in enumerate(cmdline):
|
||||
if value == '-c' and i < len(cmdline):
|
||||
return cmdline[i + 1]
|
||||
return '/etc/ceph/ceph.conf'
|
||||
|
||||
|
||||
def ceph_set_new_mons(seed_env, filename, conf_filename, db_path):
|
||||
nodes = list(env_util.get_controllers(seed_env))
|
||||
hostnames = map(short_hostname, node_util.get_hostnames(nodes))
|
||||
mgmt_ips = map(remove_mask, node_util.get_ips('management', nodes))
|
||||
|
||||
with contextlib.closing(tarfile.open(filename)) as f:
|
||||
conf = f.extractfile(conf_filename).read()
|
||||
conf = replace_addresses(conf, hostnames, mgmt_ips)
|
||||
|
||||
fsid = get_fsid(conf)
|
||||
monmaptool_cmd = ['monmaptool', '--fsid', fsid, '--clobber', '--create']
|
||||
for node_hostname, node_ip in itertools.izip(hostnames, mgmt_ips):
|
||||
monmaptool_cmd += ['--add', node_hostname, node_ip]
|
||||
|
||||
for node, node_hostname in itertools.izip(nodes, hostnames):
|
||||
node_db_path = "/var/lib/ceph/mon/ceph-{0}".format(node_hostname)
|
||||
node_conf = replace_host(conf, node_hostname)
|
||||
try:
|
||||
ssh.call(['stop', 'ceph-mon', "id={0}".format(node_hostname)],
|
||||
node=node)
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
ssh.call(['rm', '-rf', node_db_path], node=node)
|
||||
node_util.untar_files(filename, node)
|
||||
sftp = ssh.sftp(node)
|
||||
with sftp.open(conf_filename, 'w') as f:
|
||||
f.write(node_conf)
|
||||
ssh.call(['mv', db_path, node_db_path], node=node)
|
||||
|
||||
sysvinit = os.path.join(node_db_path, 'sysvinit')
|
||||
try:
|
||||
sftp.remove(sysvinit)
|
||||
except IOError:
|
||||
pass
|
||||
upstart = os.path.join(node_db_path, 'upstart')
|
||||
sftp.open(upstart, 'w').close()
|
||||
|
||||
with ssh.tempdir(node) as tempdir:
|
||||
monmap_filename = os.path.join(tempdir, 'monmap')
|
||||
ssh.call(monmaptool_cmd + [monmap_filename], node=node)
|
||||
ssh.call(['ceph-mon', '-i', node_hostname, '--inject-monmap',
|
||||
monmap_filename], node=node)
|
||||
|
||||
for node, node_hostname in itertools.izip(nodes, hostnames):
|
||||
ssh.call(['start', 'ceph-mon', "id={0}".format(node_hostname)],
|
||||
node=node)
|
||||
import_bootstrap_osd(nodes[0])
|
||||
|
||||
|
||||
def extract_mon_conf_files(orig_env, tar_filename):
|
||||
controller = env_util.get_one_controller(orig_env)
|
||||
conf_filename = get_ceph_conf_filename(controller)
|
||||
conf_dir = os.path.dirname(conf_filename)
|
||||
hostname = short_hostname(
|
||||
node_util.get_hostname_remotely(controller))
|
||||
db_path = "/var/lib/ceph/mon/ceph-{0}".format(hostname)
|
||||
node_util.tar_files(tar_filename, controller, conf_dir, db_path)
|
||||
return conf_filename, db_path
|
||||
|
||||
|
||||
def upgrade_ceph(orig_id, seed_id):
|
||||
orig_env = environment_obj.Environment(orig_id)
|
||||
seed_env = environment_obj.Environment(seed_id)
|
||||
|
||||
tar_filename = os.path.join(magic_consts.FUEL_CACHE,
|
||||
"env-{0}-ceph.conf.tar.gz".format(orig_id))
|
||||
conf_filename, db_path = extract_mon_conf_files(orig_env, tar_filename)
|
||||
ceph_set_new_mons(seed_env, tar_filename, conf_filename, db_path)
|
||||
|
||||
|
||||
class UpgradeCephCommand(cmd.Command):
|
||||
"""update Ceph cluster configuration."""
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(UpgradeCephCommand, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'orig_id', type=int, metavar='ORIG_ID',
|
||||
help="ID of original environment")
|
||||
parser.add_argument(
|
||||
'seed_id', type=int, metavar='SEED_ID',
|
||||
help="ID of seed environment")
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
upgrade_ceph(parsed_args.orig_id, parsed_args.seed_id)
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
def test_parser(mocker, octane_app):
|
||||
m = mocker.patch('octane.commands.upgrade_ceph.upgrade_ceph')
|
||||
octane_app.run(["upgrade-ceph", "1", "2"])
|
||||
assert not octane_app.stdout.getvalue()
|
||||
assert not octane_app.stderr.getvalue()
|
||||
m.assert_called_once_with(1, 2)
|
|
@ -0,0 +1,98 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import io
|
||||
import mock
|
||||
import pytest
|
||||
|
||||
from octane.util import node as node_util
|
||||
from octane.util import ssh
|
||||
|
||||
|
||||
NODES = [
|
||||
{'fqdn': 'node-1',
|
||||
'network_data': [{'name': 'management', 'ip': '10.20.0.2'},
|
||||
{'name': 'public', 'ip': '172.167.0.2'}]},
|
||||
{'fqdn': 'node-2',
|
||||
'network_data': [{'name': 'management', 'ip': '10.20.0.3'},
|
||||
{'name': 'public', 'ip': '172.167.0.3'}]},
|
||||
{'fqdn': 'node-3',
|
||||
'network_data': [{'name': 'management', 'ip': '10.20.0.4'},
|
||||
{'name': 'public', 'ip': '172.167.0.4'}]},
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize('node_data,network_name,expected_ip', [
|
||||
(NODES[0], 'management', '10.20.0.2'),
|
||||
(NODES[0], 'storage', None),
|
||||
({'network_data': []}, 'management', None),
|
||||
])
|
||||
def test_get_ip(node_data, network_name, expected_ip):
|
||||
node = create_node(node_data)
|
||||
ip = node_util.get_ip(network_name, node)
|
||||
assert ip == expected_ip
|
||||
|
||||
|
||||
def create_node(data):
|
||||
return mock.Mock(data=data, spec_set=['data'])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nodes():
|
||||
return map(create_node, NODES)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("network_name,expected_ips", [
|
||||
('management', ['10.20.0.2', '10.20.0.3', '10.20.0.4']),
|
||||
('public', ['172.167.0.2', '172.167.0.3', '172.167.0.4']),
|
||||
])
|
||||
def test_get_ips(nodes, network_name, expected_ips):
|
||||
ips = node_util.get_ips(network_name, nodes)
|
||||
assert ips == expected_ips
|
||||
|
||||
|
||||
def test_get_hostnames(nodes):
|
||||
hostnames = node_util.get_hostnames(nodes)
|
||||
assert hostnames == ['node-1', 'node-2', 'node-3']
|
||||
|
||||
|
||||
def test_tar_files(node, mock_ssh_popen, mock_open):
|
||||
content = b'fake data\nin\nthe\narchive'
|
||||
|
||||
proc = mock_ssh_popen.return_value.__enter__.return_value
|
||||
proc.stdout = io.BytesIO(content)
|
||||
buf = io.BytesIO()
|
||||
mock_open.return_value.write.side_effect = buf.write
|
||||
|
||||
node_util.tar_files('filename', node, 'a.file', 'b.file')
|
||||
|
||||
mock_ssh_popen.assert_called_once_with(
|
||||
['tar', '-czvP', 'a.file', 'b.file'],
|
||||
stdout=ssh.PIPE, node=node)
|
||||
mock_open.assert_called_once_with('filename', 'wb')
|
||||
assert buf.getvalue() == content
|
||||
|
||||
|
||||
def test_untar_files(node, mock_ssh_popen, mock_open):
|
||||
content = b'fake data\nin\nthe\narchive'
|
||||
|
||||
proc = mock_ssh_popen.return_value.__enter__.return_value
|
||||
buf = io.BytesIO()
|
||||
proc.stdin.write = buf.write
|
||||
mock_open.return_value = io.BytesIO(content)
|
||||
|
||||
node_util.untar_files('filename', node)
|
||||
|
||||
mock_ssh_popen.assert_called_once_with(['tar', '-xzv', '-C', '/'],
|
||||
stdin=ssh.PIPE, node=node)
|
||||
mock_open.assert_called_once_with('filename', 'rb')
|
||||
assert buf.getvalue() == content
|
|
@ -10,7 +10,9 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import shutil
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
@ -33,6 +35,41 @@ def preserve_partition(node, partition):
|
|||
node.upload_node_attribute('disks', disks)
|
||||
|
||||
|
||||
def get_ip(network_name, node):
|
||||
for net in node.data['network_data']:
|
||||
if net['name'] == network_name:
|
||||
return net['ip']
|
||||
|
||||
|
||||
def get_ips(network_name, nodes):
|
||||
get_network_ip = functools.partial(get_ip, network_name)
|
||||
return map(get_network_ip, nodes)
|
||||
|
||||
|
||||
def get_hostnames(nodes):
|
||||
return [node.data['fqdn'] for node in nodes]
|
||||
|
||||
|
||||
def tar_files(filename, node, *files):
|
||||
cmd = ['tar', '-czvP']
|
||||
cmd.extend(files)
|
||||
with ssh.popen(cmd, stdout=ssh.PIPE, node=node) as proc:
|
||||
with open(filename, 'wb') as f:
|
||||
shutil.copyfileobj(proc.stdout, f)
|
||||
|
||||
|
||||
def untar_files(filename, node):
|
||||
cmd = ['tar', '-xzv', '-C', '/']
|
||||
with ssh.popen(cmd, stdin=ssh.PIPE, node=node) as proc:
|
||||
with open(filename, 'rb') as f:
|
||||
shutil.copyfileobj(f, proc.stdin)
|
||||
|
||||
|
||||
def get_hostname_remotely(node):
|
||||
hostname = ssh.call_output(['hostname'], node=node)
|
||||
return hostname[:-1]
|
||||
|
||||
|
||||
def reboot_nodes(nodes, timeout=600):
|
||||
old_clients = dict((node, ssh.get_client(node)) for node in nodes)
|
||||
for node in nodes:
|
||||
|
|
|
@ -33,6 +33,7 @@ octane =
|
|||
upgrade-env = octane.commands.upgrade_env:UpgradeEnvCommand
|
||||
upgrade-node = octane.commands.upgrade_node:UpgradeNodeCommand
|
||||
upgrade-db = octane.commands.upgrade_db:UpgradeDBCommand
|
||||
upgrade-ceph = octane.commands.upgrade_ceph:UpgradeCephCommand
|
||||
install-node = octane.commands.install_node:InstallNodeCommand
|
||||
upgrade-control = octane.commands.upgrade_controlplane:UpgradeControlPlaneCommand
|
||||
sync-networks = octane.commands.sync_networks:SyncNetworksCommand
|
||||
|
|
Loading…
Reference in New Issue