Wrapping ssh calls into subprocesses
Eventlet does not work properly with Paramiko when several connections are opened concurrently (see bug #1212341). The fix moves ssh calls from main code to subprocess to avoid the issue. Also changed: * added timeout to all remote operations * old SSH utilities were moved from remote.py to integration tests, because new ones can not be utilized there Fixes: bug #1212341 Change-Id: Ib89af3a3bbcb587af46dad3431d512a21d1ba826
This commit is contained in:
parent
8b2ea7ae27
commit
a57790da42
|
@ -0,0 +1,42 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import pickle
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
|
||||
def main():
|
||||
# NOTE(dmitryme): since we do not read stderr in the main process,
|
||||
# we need to flush it somewhere, otherwise both processes might
|
||||
# hang because of i/o buffer overflow.
|
||||
with open('/dev/null', 'w') as sys.stderr:
|
||||
while True:
|
||||
result = dict()
|
||||
|
||||
try:
|
||||
func = pickle.load(sys.stdin)
|
||||
args = pickle.load(sys.stdin)
|
||||
kwargs = pickle.load(sys.stdin)
|
||||
|
||||
result['output'] = func(*args, **kwargs)
|
||||
except BaseException as e:
|
||||
result['exception'] = e
|
||||
result['traceback'] = traceback.format_exc()
|
||||
|
||||
pickle.dump(result, sys.stdout)
|
||||
sys.stdout.flush()
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import re
|
||||
from savanna.openstack.common import log as logging
|
||||
from savanna.utils import remote
|
||||
from savanna.plugins.hdp import savannautils
|
||||
|
||||
AMBARI_RPM = 'http://s3.amazonaws.com/public-repo-1.hortonworks.com/' \
|
||||
'ambari/centos6/1.x/updates/1.2.5.17/ambari.repo'
|
||||
|
@ -29,14 +29,8 @@ class HadoopServer:
|
|||
def __init__(self, instance, node_group, ambari_rpm=None):
|
||||
self.instance = instance
|
||||
self.node_group = node_group
|
||||
self._ssh = self._connect_to_vm()
|
||||
self.ambari_rpm = ambari_rpm or AMBARI_RPM
|
||||
|
||||
def _connect_to_vm(self):
|
||||
LOG.info(
|
||||
'Connecting to VM: {0}'.format(self.instance.management_ip))
|
||||
return remote.get_remote(self.instance).ssh_connection()
|
||||
|
||||
def provision_ambari(self, ambari_info):
|
||||
self.install_rpms()
|
||||
if 'AMBARI_SERVER' in self.node_group.components:
|
||||
|
@ -45,32 +39,37 @@ class HadoopServer:
|
|||
if 'AMBARI_AGENT' in self.node_group.components:
|
||||
self._setup_and_start_ambari_agent(ambari_info.host.internal_ip)
|
||||
|
||||
def install_rpms(self):
|
||||
@savannautils.inject_remote('r')
|
||||
def install_rpms(self, r):
|
||||
LOG.info(
|
||||
"{0}: Installing rpm's ...".format(self.instance.hostname))
|
||||
|
||||
#TODO(jspeidel): based on image type, use correct command
|
||||
rpm_cmd = 'wget -nv %s -O /etc/yum.repos.d/ambari.repo' % \
|
||||
self.ambari_rpm
|
||||
self._execute_on_vm(rpm_cmd)
|
||||
self._execute_on_vm('yum -y install epel-release')
|
||||
r.execute_command(rpm_cmd)
|
||||
r.execute_command('yum -y install epel-release')
|
||||
|
||||
def _setup_and_start_ambari_server(self, port):
|
||||
@savannautils.inject_remote('r')
|
||||
def _setup_and_start_ambari_server(self, port, r):
|
||||
LOG.info(
|
||||
'{0}: Installing ambari-server ...'.format(self.instance.hostname))
|
||||
self._execute_on_vm('yum -y install ambari-server')
|
||||
r.execute_command('yum -y install ambari-server')
|
||||
|
||||
LOG.info('Running Ambari Server setup ...')
|
||||
self._execute_on_vm_interactive(
|
||||
r.execute_on_vm_interactive(
|
||||
'ambari-server setup', DefaultPromptMatcher(
|
||||
"Ambari Server 'setup' completed successfully", LOG))
|
||||
"Ambari Server 'setup' completed successfully"))
|
||||
|
||||
self._configure_ambari_server_api_port(port)
|
||||
|
||||
LOG.info('Starting Ambari ...')
|
||||
self._execute_on_vm('ambari-server start')
|
||||
# NOTE(dmitryme): Reading stdout from 'ambari-server start'
|
||||
# hangs ssh. Redirecting output to /dev/null fixes that
|
||||
r.execute_command('ambari-server start > /dev/null 2>&1')
|
||||
|
||||
def _configure_ambari_server_api_port(self, port):
|
||||
@savannautils.inject_remote('r')
|
||||
def _configure_ambari_server_api_port(self, port, r):
|
||||
# do nothing if port is not specified or is default
|
||||
if port is None or port == 8080:
|
||||
return
|
||||
|
@ -78,29 +77,31 @@ class HadoopServer:
|
|||
ambari_config_file = '/etc/ambari-server/conf/ambari.properties'
|
||||
LOG.debug('Configuring Ambari Server API port: {0}'.format(port))
|
||||
# read the current contents
|
||||
data = remote.read_file_from(self._ssh.open_sftp(), ambari_config_file)
|
||||
data = r.read_file_from(ambari_config_file)
|
||||
data = '{0}\nclient.api.port={1}\n'.format(data, port)
|
||||
|
||||
# write the file back
|
||||
remote.write_file_to(self._ssh.open_sftp(), ambari_config_file, data)
|
||||
r.write_file_to(ambari_config_file, data)
|
||||
|
||||
def _setup_and_start_ambari_agent(self, ambari_server_ip):
|
||||
@savannautils.inject_remote('r')
|
||||
def _setup_and_start_ambari_agent(self, ambari_server_ip, r):
|
||||
LOG.info(
|
||||
'{0}: Installing Ambari Agent ...'.format(self.instance.hostname))
|
||||
|
||||
self._execute_on_vm('yum -y install ambari-agent')
|
||||
r.execute_command('yum -y install ambari-agent')
|
||||
LOG.debug(
|
||||
'{0}: setting master-ip: {1} in ambari-agent.ini'.format(
|
||||
self.instance.hostname, ambari_server_ip))
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ambari-agent/conf/ambari-agent.ini', 'localhost',
|
||||
ambari_server_ip)
|
||||
|
||||
LOG.info(
|
||||
'{0}: Starting Ambari Agent ...'.format(self.instance.hostname))
|
||||
self._execute_on_vm('ambari-agent start')
|
||||
r.execute_command('ambari-agent start')
|
||||
|
||||
def _configure_ganglia(self, ganglia_server_ip):
|
||||
@savannautils.inject_remote('r')
|
||||
def _configure_ganglia(self, ganglia_server_ip, r):
|
||||
#TODO(John): the set of files to update is now dependent on which
|
||||
# components are deployed on a host
|
||||
#TODO(jspeidel): so we these calls should be based on configuration
|
||||
|
@ -112,42 +113,42 @@ class HadoopServer:
|
|||
#TODO(jspeidel): set MASTER_SLAVE for master where only one node is
|
||||
# deployed
|
||||
if self._is_ganglia_slave() or self._is_ganglia_master():
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.slave.conf',
|
||||
'host = {0}'.format(self.instance.hostname),
|
||||
'host = {0}'.format(ganglia_server_ip))
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.slave.conf',
|
||||
'host = {0}'.format(self.instance.hostname),
|
||||
'host = {0}'.format(ganglia_server_ip))
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.slave.conf',
|
||||
'host = {0}'.format(self.instance.hostname),
|
||||
'host = {0}'.format(ganglia_server_ip))
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPHBaseMaster/conf.d/gmond.slave.conf',
|
||||
'host = {0}'.format(self.instance.hostname),
|
||||
'host = {0}'.format(ganglia_server_ip))
|
||||
|
||||
#master config
|
||||
if self._is_ganglia_master():
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.master.conf',
|
||||
'bind = {0}'.format(self.instance.hostname), '')
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.master.conf',
|
||||
'bind = {0}'.format(self.instance.hostname), '')
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.master.conf',
|
||||
'bind = {0}'.format(self.instance.hostname), '')
|
||||
#TODO(jspeidel): appears only to be necessary if hbase is installed
|
||||
# self._replace_str_in_remote_file(self._ssh,
|
||||
# r.replace_remote_string(
|
||||
# '/etc/ganglia/hdp/HDPHBaseMaster/conf.d/gmond.master.conf',
|
||||
# 'bind = {0}'.format(
|
||||
# self.instance.fqdn), '')
|
||||
|
||||
# gangliaClusters.conf
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/usr/libexec/hdp/ganglia/gangliaClusters.conf',
|
||||
self.instance.fqdn, ganglia_server_ip)
|
||||
|
||||
|
@ -155,7 +156,7 @@ class HadoopServer:
|
|||
# configs that are used after restart
|
||||
# gangliaClusters.conf template
|
||||
#TODO(jspeidel): modify file where prop "ganglia_server_host" is set
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/var/lib/ambari-agent/puppet/modules/hdp-ganglia/templates'
|
||||
'/gangliaClusters.conf.erb',
|
||||
'<%=scope.function_hdp_host("ganglia_server_host")%>',
|
||||
|
@ -163,60 +164,24 @@ class HadoopServer:
|
|||
|
||||
# gmondLib.sh This script generates the master and slave configs
|
||||
#TODO(jspeidel): combine into one call. Pass map of old/new values
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/var/lib/ambari-agent/puppet/modules/hdp-ganglia/files/gmondLib'
|
||||
'.sh',
|
||||
'bind = ${gmondMasterIP}', '')
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/var/lib/ambari-agent/puppet/modules/hdp-ganglia/files/gmondLib'
|
||||
'.sh',
|
||||
'host = ${gmondMasterIP}', 'host = {0}'.format(ganglia_server_ip))
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/usr/libexec/hdp/ganglia/gmondLib.sh',
|
||||
'bind = ${gmondMasterIP}', '')
|
||||
self._replace_str_in_remote_file(
|
||||
r.replace_remote_string(
|
||||
'/usr/libexec/hdp/ganglia/gmondLib.sh',
|
||||
'host = ${gmondMasterIP}', 'host = {0}'.format(ganglia_server_ip))
|
||||
|
||||
def _replace_str_in_remote_file(self, filename, origStr, newStr):
|
||||
|
||||
remote.replace_remote_string(self._ssh, filename, origStr,
|
||||
newStr)
|
||||
|
||||
def _log(self, buf):
|
||||
LOG.debug(buf)
|
||||
|
||||
def _execute_on_vm_interactive(self, cmd, matcher):
|
||||
LOG.debug(
|
||||
"{0}: Executing interactive remote command '{1}'".format(
|
||||
self.instance.hostname, cmd))
|
||||
|
||||
buf = ''
|
||||
all_output = ''
|
||||
channel = self._ssh.invoke_shell()
|
||||
try:
|
||||
channel.send(cmd + '\n')
|
||||
while not matcher.is_eof(buf):
|
||||
buf += channel.recv(4096)
|
||||
response = matcher.get_response(buf)
|
||||
if response is not None:
|
||||
channel.send(response + '\n')
|
||||
all_output += buf
|
||||
buf = ''
|
||||
finally:
|
||||
channel.close()
|
||||
self._log(all_output)
|
||||
self._log(buf)
|
||||
|
||||
def _execute_on_vm(self, cmd):
|
||||
LOG.debug("{0}: Executing remote command '{1}'".format(
|
||||
self.instance.hostname, cmd))
|
||||
LOG.debug(
|
||||
'Executing using instance: id = {0}, hostname = {1}'.format(
|
||||
self.instance.instance_id,
|
||||
self.instance.hostname))
|
||||
remote.execute_command(self._ssh, cmd)
|
||||
|
||||
def _is_component_available(self, component):
|
||||
return component in self.node_group.components
|
||||
|
||||
|
@ -230,23 +195,17 @@ class HadoopServer:
|
|||
class DefaultPromptMatcher():
|
||||
prompt_pattern = re.compile('(.*\()(.)(\)\?\s*$)', re.DOTALL)
|
||||
|
||||
def __init__(self, terminal_token, logger):
|
||||
def __init__(self, terminal_token):
|
||||
self.eof_token = terminal_token
|
||||
self.logger = logger
|
||||
|
||||
def get_response(self, s):
|
||||
match = self.prompt_pattern.match(s)
|
||||
if match:
|
||||
response = match.group(2)
|
||||
LOG.debug(
|
||||
"Returning response '{0}' for prompt '{1}'".format(
|
||||
response, s.rstrip().rsplit('\n', 1)[-1]))
|
||||
return response
|
||||
else:
|
||||
return None
|
||||
|
||||
def is_eof(self, s):
|
||||
eof = self.eof_token in s
|
||||
if eof:
|
||||
LOG.debug('Returning eof = True')
|
||||
return eof
|
||||
|
|
|
@ -26,3 +26,15 @@ def get_node_processes(host):
|
|||
return host.node_processes
|
||||
else:
|
||||
return host.node_group.node_processes
|
||||
|
||||
|
||||
def inject_remote(param_name):
|
||||
def handle(func):
|
||||
def call(self, *args, **kwargs):
|
||||
with self.instance.remote as r:
|
||||
newkwargs = kwargs.copy()
|
||||
newkwargs[param_name] = r
|
||||
return func(self, *args, **newkwargs)
|
||||
|
||||
return call
|
||||
return handle
|
||||
|
|
|
@ -20,13 +20,15 @@ import telnetlib
|
|||
import time
|
||||
|
||||
import keystoneclient.v2_0
|
||||
import paramiko
|
||||
import requests
|
||||
import unittest2
|
||||
|
||||
from savanna import exceptions as savanna_ex
|
||||
import savanna.tests.integration.configs.parameters.common_parameters as param
|
||||
import savanna.tests.integration.configs.parameters.hdp_parameters as hdp_param
|
||||
import savanna.tests.integration.configs.parameters.vanilla_parameters as v_prm
|
||||
from savanna.utils import remote
|
||||
from savanna.utils import crypto
|
||||
|
||||
|
||||
def enable_test(test):
|
||||
|
@ -468,24 +470,72 @@ class ITestCase(unittest2.TestCase):
|
|||
data = data[object_type]
|
||||
return data['id']
|
||||
|
||||
def ssh_connection(self, host, node_username):
|
||||
return remote.setup_ssh_connection(host, node_username,
|
||||
open(param.PATH_TO_SSH).read())
|
||||
def _read_paramimko_stream(self, recv_func):
|
||||
result = ''
|
||||
buf = recv_func(1024)
|
||||
while buf != '':
|
||||
result += buf
|
||||
buf = recv_func(1024)
|
||||
|
||||
return result
|
||||
|
||||
def _execute_command(self, ssh, cmd, get_stderr=False,
|
||||
raise_when_error=True):
|
||||
chan = ssh.get_transport().open_session()
|
||||
chan.exec_command(cmd)
|
||||
|
||||
# todo(dmitryme): that could hang if stderr buffer overflows
|
||||
stdout = self._read_paramimko_stream(chan.recv)
|
||||
stderr = self._read_paramimko_stream(chan.recv_stderr)
|
||||
|
||||
ret_code = chan.recv_exit_status()
|
||||
|
||||
if ret_code and raise_when_error:
|
||||
raise savanna_ex.RemoteCommandException(cmd=cmd, ret_code=ret_code,
|
||||
stdout=stdout,
|
||||
stderr=stderr)
|
||||
|
||||
if get_stderr:
|
||||
return ret_code, stdout, stderr
|
||||
else:
|
||||
return ret_code, stdout
|
||||
|
||||
def _write_file_to(self, ssh, remote_file, data):
|
||||
fl = ssh.open_sftp().file(remote_file, 'w')
|
||||
fl.write(data)
|
||||
fl.close()
|
||||
|
||||
def _read_file_from(self, ssh, remote_file):
|
||||
fl = ssh.open_sftp().file(remote_file, 'r')
|
||||
data = fl.read()
|
||||
fl.close()
|
||||
return data
|
||||
|
||||
# -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|
||||
|
||||
def ssh_connection(self, host, username):
|
||||
private_key = open(param.PATH_TO_SSH).read()
|
||||
if type(private_key) in [str, unicode]:
|
||||
private_key = crypto.to_paramiko_private_key(private_key)
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(host, username=username, pkey=private_key)
|
||||
return ssh
|
||||
|
||||
def execute_command(self, host, cmd, node_username):
|
||||
with contextlib.closing(self.ssh_connection(host,
|
||||
node_username)) as ssh:
|
||||
return remote.execute_command(ssh, cmd)
|
||||
return self._execute_command(ssh, cmd)
|
||||
|
||||
def write_file_to(self, host, remote_file, data, node_username):
|
||||
with contextlib.closing(self.ssh_connection(host,
|
||||
node_username)) as ssh:
|
||||
return remote.write_file_to(ssh.open_sftp(), remote_file, data)
|
||||
return self._write_file_to(ssh, remote_file, data)
|
||||
|
||||
def read_file_from(self, host, remote_file, node_username):
|
||||
with contextlib.closing(self.ssh_connection(host,
|
||||
node_username)) as ssh:
|
||||
return remote.read_file_from(ssh.open_sftp(), remote_file)
|
||||
return self._read_file_from(ssh, remote_file)
|
||||
|
||||
def transfer_script_to_node(self, host, node_username,
|
||||
script='hadoop_test/hadoop_test_script.sh'):
|
||||
|
|
|
@ -24,10 +24,19 @@ from savanna.tests.unit import base as models_test_base
|
|||
|
||||
|
||||
class TestAttachVolume(models_test_base.DbTestCase):
|
||||
@mock.patch('savanna.utils.remote.BulkInstanceInteropHelper.close')
|
||||
@mock.patch('savanna.utils.remote.InstanceInteropHelper._get_conn_params')
|
||||
@mock.patch('savanna.utils.procutils.start_subprocess')
|
||||
@mock.patch('savanna.utils.procutils.run_in_subprocess')
|
||||
@mock.patch('savanna.utils.openstack.nova.get_node_group_image_username')
|
||||
@mock.patch(
|
||||
'savanna.utils.remote.BulkInstanceInteropHelper.execute_command')
|
||||
def test_mount_volume(self, p_ex_cmd):
|
||||
instance = r.InstanceResource({'instance_id': '123454321'})
|
||||
def test_mount_volume(self, p_ex_cmd, p_get_username,
|
||||
run_in_sub, start_sub, get_conn_params, p_close):
|
||||
p_get_username.return_value = 'root'
|
||||
|
||||
instance = r.InstanceResource({'instance_id': '123454321',
|
||||
'node_group': {}})
|
||||
|
||||
p_ex_cmd.return_value = (0, None)
|
||||
self.assertIsNone(volumes._mount_volume(instance, '123', '456'))
|
||||
|
@ -59,9 +68,13 @@ class TestAttachVolume(models_test_base.DbTestCase):
|
|||
p_delete.side_effect = RuntimeError
|
||||
self.assertRaises(RuntimeError, volumes.detach, cluster)
|
||||
|
||||
@mock.patch('savanna.utils.openstack.nova.get_node_group_image_username')
|
||||
@mock.patch('savanna.utils.remote.InstanceInteropHelper.execute_command')
|
||||
def test_get_free_device_path(self, p_ex_cmd):
|
||||
instance = r.InstanceResource({'instance_id': '123454321'})
|
||||
def test_get_free_device_path(self, p_ex_cmd, p_get_username):
|
||||
p_get_username.return_value = 'root'
|
||||
|
||||
instance = r.InstanceResource({'instance_id': '123454321',
|
||||
'node_group': {}})
|
||||
|
||||
stdout = """major minor #blocks name
|
||||
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
# Copyright (c) 2013 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import timeout as e_timeout
|
||||
|
||||
from savanna import context
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_sub_executable():
|
||||
return '%s/savanna-subprocess' % os.path.dirname(sys.argv[0])
|
||||
|
||||
|
||||
def start_subprocess():
|
||||
return subprocess.Popen((sys.executable, _get_sub_executable()),
|
||||
close_fds=True,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE)
|
||||
|
||||
|
||||
def run_in_subprocess(proc, func, args=(), kwargs={}):
|
||||
try:
|
||||
pickle.dump(func, proc.stdin)
|
||||
pickle.dump(args, proc.stdin)
|
||||
pickle.dump(kwargs, proc.stdin)
|
||||
proc.stdin.flush()
|
||||
|
||||
result = pickle.load(proc.stdout)
|
||||
|
||||
if 'exception' in result:
|
||||
raise result['exception']
|
||||
|
||||
return result['output']
|
||||
finally:
|
||||
# NOTE(dmitryme): in openstack/common/processutils.py it
|
||||
# is suggested to sleep a little between calls to multiprocessing.
|
||||
# That should allow it make some necessary cleanup
|
||||
context.sleep(0)
|
||||
|
||||
|
||||
def _finish(cleanup_func):
|
||||
cleanup_func()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
def shutdown_subprocess(proc, cleanup_func):
|
||||
try:
|
||||
with e_timeout.Timeout(5):
|
||||
# timeout would mean that our single-threaded subprocess
|
||||
# is hung on previous task which blocks _finish to complete
|
||||
run_in_subprocess(proc, _finish, (cleanup_func,))
|
||||
except BaseException:
|
||||
# exception could be caused by either timeout, or
|
||||
# successful shutdown, ignoring anyway
|
||||
pass
|
||||
finally:
|
||||
kill_subprocess(proc)
|
||||
|
||||
|
||||
def kill_subprocess(proc):
|
||||
try:
|
||||
proc.kill()
|
||||
except OSError:
|
||||
# could be caused by process already dead, so ignoring
|
||||
pass
|
|
@ -1,4 +1,5 @@
|
|||
# Copyright (c) 2013 Mirantis Inc.
|
||||
# Copyright (c) 2013 Hortonworks, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -13,164 +14,258 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
"""Helper methods for executing commands on nodes via SSH.
|
||||
|
||||
The main access point is method get_remote(instance), it returns
|
||||
InstanceInteropHelper object which does the actual work. See the
|
||||
class for the list of available methods.
|
||||
|
||||
It is a context manager, so it could be used with 'with' statement
|
||||
like that:
|
||||
with get_remote(instance) as r:
|
||||
r.execute_command(...)
|
||||
|
||||
Note that the module offloads the ssh calls to a child process.
|
||||
It was implemented that way because we found no way to run paramiko
|
||||
and eventlet together. The private high-level module methods are
|
||||
implementations which are run in a separate process.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
from eventlet import timeout as e_timeout
|
||||
import paramiko
|
||||
|
||||
from savanna import exceptions as ex
|
||||
from savanna.openstack.common import excutils
|
||||
from savanna.utils import crypto
|
||||
from savanna.utils.openstack import nova
|
||||
from savanna.utils import procutils
|
||||
|
||||
|
||||
def setup_ssh_connection(host, username, private_key):
|
||||
"""Setup SSH connection to the host using username and private key."""
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_ssh = None
|
||||
|
||||
|
||||
def _connect(host, username, private_key):
|
||||
global _ssh
|
||||
|
||||
if type(private_key) in [str, unicode]:
|
||||
private_key = crypto.to_paramiko_private_key(private_key)
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(host, username=username, pkey=private_key)
|
||||
|
||||
return ssh
|
||||
_ssh = paramiko.SSHClient()
|
||||
_ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
_ssh.connect(host, username=username, pkey=private_key)
|
||||
|
||||
|
||||
def execute_command(ssh_connection, cmd, get_stderr=False,
|
||||
raise_when_error=True):
|
||||
"""Execute specified command remotely using existing ssh connection.
|
||||
def _cleanup():
|
||||
global _ssh
|
||||
_ssh.close()
|
||||
|
||||
Return exit code, stdout data and stderr data of the executed command.
|
||||
"""
|
||||
chan = ssh_connection.get_transport().open_session()
|
||||
|
||||
def _read_paramimko_stream(recv_func):
|
||||
result = ''
|
||||
buf = recv_func(1024)
|
||||
while buf != '':
|
||||
result += buf
|
||||
buf = recv_func(1024)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _execute_command(cmd, get_stderr=False, raise_when_error=True):
|
||||
global _ssh
|
||||
|
||||
chan = _ssh.get_transport().open_session()
|
||||
chan.exec_command(cmd)
|
||||
|
||||
# todo(dmitryme): that could hang if stderr buffer overflows
|
||||
stdout = _read_paramimko_stream(chan.recv)
|
||||
stderr = _read_paramimko_stream(chan.recv_stderr)
|
||||
|
||||
ret_code = chan.recv_exit_status()
|
||||
|
||||
stdout = ''
|
||||
while chan.recv_ready():
|
||||
stdout += chan.recv(1024)
|
||||
|
||||
stderr = ''
|
||||
while chan.recv_stderr_ready():
|
||||
stderr += chan.recv_stderr(1024)
|
||||
|
||||
if ret_code and raise_when_error:
|
||||
raise ex.RemoteCommandException(cmd=cmd, ret_code=ret_code,
|
||||
stdout=stdout, stderr=stderr)
|
||||
|
||||
if get_stderr:
|
||||
return ret_code, stdout, stderr
|
||||
|
||||
else:
|
||||
return ret_code, stdout
|
||||
|
||||
|
||||
def write_file_to(sftp, remote_file, data):
|
||||
"""Create remote file using existing ssh connection and write the given
|
||||
data to it.
|
||||
"""
|
||||
def _write_file(sftp, remote_file, data):
|
||||
fl = sftp.file(remote_file, 'w')
|
||||
fl.write(data)
|
||||
fl.close()
|
||||
|
||||
|
||||
def write_files_to(sftp, files):
|
||||
"""Copy file->data dictionary in a single ssh connection.
|
||||
"""
|
||||
def _write_file_to(remote_file, data):
|
||||
global _ssh
|
||||
|
||||
_write_file(_ssh.open_sftp(), remote_file, data)
|
||||
|
||||
|
||||
def _write_files_to(files):
|
||||
global _ssh
|
||||
|
||||
sftp = _ssh.open_sftp()
|
||||
|
||||
for fl, data in files.iteritems():
|
||||
write_file_to(sftp, fl, data)
|
||||
_write_file(sftp, fl, data)
|
||||
|
||||
|
||||
def read_file_from(sftp, remote_file):
|
||||
"""Read remote file from the specified host and return given data."""
|
||||
fl = sftp.file(remote_file, 'r')
|
||||
def _read_file_from(remote_file):
|
||||
global _ssh
|
||||
|
||||
fl = _ssh.open_sftp().file(remote_file, 'r')
|
||||
data = fl.read()
|
||||
fl.close()
|
||||
return data
|
||||
|
||||
|
||||
def replace_remote_string(ssh_connection, remote_file, old_str, new_str):
|
||||
"""Replaces strings in remote file using sed command."""
|
||||
def _replace_remote_string(remote_file, old_str, new_str):
|
||||
old_str = old_str.replace("\'", "\''")
|
||||
new_str = new_str.replace("\'", "\''")
|
||||
cmd = "sudo sed -i 's,%s,%s,g' %s" % (old_str, new_str, remote_file)
|
||||
execute_command(ssh_connection, cmd)
|
||||
_execute_command(cmd)
|
||||
|
||||
|
||||
def _execute_on_vm_interactive(cmd, matcher):
|
||||
global _ssh
|
||||
|
||||
buf = ''
|
||||
|
||||
channel = _ssh.invoke_shell()
|
||||
try:
|
||||
channel.send(cmd + '\n')
|
||||
while not matcher.is_eof(buf):
|
||||
buf += channel.recv(4096)
|
||||
response = matcher.get_response(buf)
|
||||
if response is not None:
|
||||
channel.send(response + '\n')
|
||||
buf = ''
|
||||
finally:
|
||||
channel.close()
|
||||
|
||||
|
||||
class InstanceInteropHelper(object):
|
||||
def __init__(self, instance):
|
||||
self.instance = instance
|
||||
self.username = nova.get_node_group_image_username(
|
||||
self.instance.node_group)
|
||||
|
||||
def __enter__(self):
|
||||
self.bulk = BulkInstanceInteropHelper(self)
|
||||
self.bulk = BulkInstanceInteropHelper(self.instance, self.username)
|
||||
return self.bulk
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
self.bulk.close()
|
||||
|
||||
def ssh_connection(self):
|
||||
username = nova.get_node_group_image_username(self.instance.node_group)
|
||||
return setup_ssh_connection(
|
||||
self.instance.management_ip, username,
|
||||
self.instance.node_group.cluster.management_private_key)
|
||||
def _get_conn_params(self):
|
||||
return (self.instance.management_ip, self.username,
|
||||
self.instance.node_group.cluster.management_private_key)
|
||||
|
||||
def execute_command(self, cmd, get_stderr=False, raise_when_error=True):
|
||||
with contextlib.closing(self.ssh_connection()) as ssh:
|
||||
return execute_command(ssh, cmd, get_stderr, raise_when_error)
|
||||
def _run(self, func, *args, **kwargs):
|
||||
proc = procutils.start_subprocess()
|
||||
|
||||
def write_file_to(self, remote_file, data):
|
||||
with contextlib.closing(self.ssh_connection()) as ssh:
|
||||
return write_file_to(ssh.open_sftp(), remote_file, data)
|
||||
try:
|
||||
procutils.run_in_subprocess(proc, _connect,
|
||||
self._get_conn_params())
|
||||
return procutils.run_in_subprocess(proc, func, args, kwargs)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
procutils.shutdown_subprocess(proc, _cleanup)
|
||||
finally:
|
||||
procutils.shutdown_subprocess(proc, _cleanup)
|
||||
|
||||
def write_files_to(self, files):
|
||||
with contextlib.closing(self.ssh_connection()) as ssh:
|
||||
return write_files_to(ssh.open_sftp(), files)
|
||||
def _run_t(self, func, timeout, *args, **kwargs):
|
||||
start_time = time.time()
|
||||
try:
|
||||
with e_timeout.Timeout(timeout):
|
||||
return self._run(func, *args, **kwargs)
|
||||
finally:
|
||||
self._log_command('%s took %.1f seconds to complete' % (
|
||||
func.__name__, time.time() - start_time))
|
||||
|
||||
def read_file_from(self, remote_file):
|
||||
with contextlib.closing(self.ssh_connection()) as ssh:
|
||||
return read_file_from(ssh.open_sftp(), remote_file)
|
||||
def execute_command(self, cmd, get_stderr=False, raise_when_error=True,
|
||||
timeout=300):
|
||||
"""Execute specified command remotely using existing ssh connection.
|
||||
|
||||
def replace_remote_string(self, remote_file, old_str, new_str):
|
||||
with contextlib.closing(self.ssh_connection()) as ssh:
|
||||
replace_remote_string(ssh, remote_file, old_str, new_str)
|
||||
Return exit code, stdout data and stderr data of the executed command.
|
||||
"""
|
||||
self._log_command('Executing "%s"' % cmd)
|
||||
return self._run_t(_execute_command, timeout, cmd, get_stderr,
|
||||
raise_when_error)
|
||||
|
||||
def write_file_to(self, remote_file, data, timeout=120):
|
||||
"""Create remote file using existing ssh connection and write the given
|
||||
data to it.
|
||||
"""
|
||||
self._log_command('Writing file "%s"' % remote_file)
|
||||
self._run_t(_write_file_to, timeout, remote_file, data)
|
||||
|
||||
def write_files_to(self, files, timeout=120):
|
||||
"""Copy file->data dictionary in a single ssh connection.
|
||||
"""
|
||||
self._log_command('Writing files "%s"' % files.keys())
|
||||
self._run_t(_write_files_to, timeout, files)
|
||||
|
||||
def read_file_from(self, remote_file, timeout=120):
|
||||
"""Read remote file from the specified host and return given data."""
|
||||
self._log_command('Reading file "%s"' % remote_file)
|
||||
return self._run_t(_read_file_from, timeout, remote_file)
|
||||
|
||||
def replace_remote_string(self, remote_file, old_str, new_str,
|
||||
timeout=120):
|
||||
"""Replaces strings in remote file using sed command."""
|
||||
self._log_command('In file "%s" replacing string "%s" '
|
||||
'with "%s"' % (remote_file, old_str, new_str))
|
||||
self._run_t(_replace_remote_string, timeout, remote_file, old_str,
|
||||
new_str)
|
||||
|
||||
def execute_on_vm_interactive(self, cmd, matcher, timeout=1800):
|
||||
"""Runs given command and responds to prompts.
|
||||
|
||||
'cmd' is a command to execute.
|
||||
|
||||
'matcher' is an object which provides responses on command's
|
||||
prompts. It should have two methods implemented:
|
||||
* get_response(buf) - returns response on prompt if it is
|
||||
found in 'buf' string, which is a part of command output.
|
||||
If no prompt is found, the method should return None.
|
||||
* is_eof(buf) - returns True if current 'buf' indicates that
|
||||
the command is finished. False should be returned
|
||||
otherwise.
|
||||
"""
|
||||
self._log_command('Executing interactively "%s"' % cmd)
|
||||
self._run_t(_execute_on_vm_interactive, timeout, cmd, matcher)
|
||||
|
||||
def _log_command(self, str):
|
||||
LOG.debug('[%s] %s' % (self.instance.instance_name, str))
|
||||
|
||||
|
||||
def get_remote(instance):
|
||||
return InstanceInteropHelper(instance)
|
||||
|
||||
|
||||
class BulkInstanceInteropHelper(object):
|
||||
def __init__(self, helper):
|
||||
self.helper = helper
|
||||
self._ssh = None
|
||||
self._sftp = None
|
||||
class BulkInstanceInteropHelper(InstanceInteropHelper):
|
||||
def __init__(self, instance, username):
|
||||
self.instance = instance
|
||||
self.username = username
|
||||
self.proc = procutils.start_subprocess()
|
||||
try:
|
||||
procutils.run_in_subprocess(self.proc, _connect,
|
||||
self._get_conn_params())
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
procutils.shutdown_subprocess(self.proc, _cleanup)
|
||||
|
||||
def close(self):
|
||||
if self._sftp:
|
||||
self._sftp.close()
|
||||
if self._ssh:
|
||||
self._ssh.close()
|
||||
procutils.shutdown_subprocess(self.proc, _cleanup)
|
||||
|
||||
def ssh_connection(self):
|
||||
if not self._ssh:
|
||||
self._ssh = self.helper.ssh_connection()
|
||||
return self._ssh
|
||||
|
||||
def sftp_connection(self):
|
||||
if not self._sftp:
|
||||
self._sftp = self.ssh_connection().open_sftp()
|
||||
return self._sftp
|
||||
|
||||
def execute_command(self, cmd, get_stderr=False, raise_when_error=True):
|
||||
return execute_command(self.ssh_connection(), cmd, get_stderr,
|
||||
raise_when_error)
|
||||
|
||||
def write_file_to(self, remote_file, data):
|
||||
return write_file_to(self.sftp_connection(), remote_file, data)
|
||||
|
||||
def write_files_to(self, files):
|
||||
return write_files_to(self.sftp_connection(), files)
|
||||
|
||||
def read_file_from(self, remote_file):
|
||||
return read_file_from(self.sftp_connection(), remote_file)
|
||||
|
||||
def replace_remote_string(self, remote_file, old_str, new_str):
|
||||
replace_remote_string(self.ssh_connection(), remote_file,
|
||||
old_str, new_str)
|
||||
def _run(self, func, *args, **kwargs):
|
||||
return procutils.run_in_subprocess(self.proc, func, args, kwargs)
|
||||
|
|
Loading…
Reference in New Issue