Add module to SSH to nodes
This commit is contained in:
parent
d26ab49c58
commit
bb039fce7e
|
@ -26,3 +26,5 @@ PATCHES = [
|
|||
CWD = os.path.dirname(__file__) # FIXME
|
||||
FUEL_CACHE = "/tmp/octane/deployment" # TODO: we shouldn't need this
|
||||
PUPPET_DIR = "/etc/puppet/2014.2.2-6.1/modules"
|
||||
|
||||
SSH_KEYS = ['/root/.ssh/id_rsa', '/root/.ssh/bootstrap.rsa']
|
||||
|
|
|
@ -0,0 +1,126 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import io
|
||||
import logging
|
||||
import pipes
|
||||
import threading
|
||||
|
||||
import paramiko
|
||||
from paramiko import channel
|
||||
|
||||
from octane import magic_consts
|
||||
from octane.util import subprocess
|
||||
|
||||
_CLIENTS = {}
|
||||
_CLIENTS_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def _get_client(node):
|
||||
node_id = node.data['id']
|
||||
try:
|
||||
return _CLIENTS[node_id]
|
||||
except KeyError:
|
||||
with _CLIENTS_LOCK:
|
||||
try:
|
||||
return _CLIENTS[node_id]
|
||||
except KeyError:
|
||||
client = _new_client(node.data['ip'])
|
||||
_CLIENTS[node_id] = client
|
||||
return client
|
||||
|
||||
|
||||
def _new_client(ip):
|
||||
client = paramiko.SSHClient()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
client.connect(ip, key_filename=magic_consts.SSH_KEYS)
|
||||
return client
|
||||
|
||||
|
||||
class ChannelFile(io.IOBase, channel.ChannelFile):
|
||||
pass
|
||||
|
||||
|
||||
class ChannelStderrFile(io.IOBase, channel.ChannelStderrFile):
|
||||
pass
|
||||
|
||||
|
||||
class _LogPipe(subprocess._BaseLogPipe):
|
||||
def __init__(self, level, pipe):
|
||||
super(_LogPipe, self).__init__(level)
|
||||
self._pipe = pipe
|
||||
|
||||
def pipe(self):
|
||||
return self._pipe
|
||||
|
||||
|
||||
class SSHPopen(subprocess.BasePopen):
|
||||
def __init__(self, name, cmd, popen_kwargs):
|
||||
self.node = popen_kwargs.pop('node')
|
||||
super(SSHPopen, self).__init__(name, cmd, popen_kwargs)
|
||||
self._channel = _get_client(self.node).get_transport().open_session()
|
||||
self._channel.exec_command(" ".join(map(pipes.quote, cmd)))
|
||||
self.name = "%s[at node-%d]" % (self.name, self.node.data['id'])
|
||||
if 'stdin' not in self.popen_kwargs:
|
||||
self.close_stdin()
|
||||
else:
|
||||
self.stdin = ChannelFile(self._channel, 'wb')
|
||||
stdout = ChannelFile(self._channel, 'rb')
|
||||
if 'stdout' not in self.popen_kwargs:
|
||||
self._pipe_stdout = _LogPipe(logging.INFO, stdout)
|
||||
self._pipe_stdout.start(self.name + " stdout")
|
||||
else:
|
||||
self._pipe_stdout = None
|
||||
self.stdout = stdout
|
||||
stderr = ChannelStderrFile(self._channel, 'rb')
|
||||
if 'stderr' not in self.popen_kwargs:
|
||||
self._pipe_stderr = _LogPipe(logging.ERROR, stderr)
|
||||
self._pipe_stderr.start(self.name + " stderr")
|
||||
else:
|
||||
self._pipe_stderr = None
|
||||
self.stderr = stderr
|
||||
|
||||
def poll(self):
|
||||
if self._channel.exit_status_ready():
|
||||
return self._channel.recv_exit_status()
|
||||
else:
|
||||
return None
|
||||
|
||||
def wait(self):
|
||||
return self._channel.recv_exit_status()
|
||||
|
||||
def terminate(self):
|
||||
self._channel.close()
|
||||
|
||||
def close_stdin(self):
|
||||
self._channel.shutdown_write()
|
||||
|
||||
def communicate(self):
|
||||
if self.stdin:
|
||||
self.close_stdin()
|
||||
if self.stdout:
|
||||
stdout = self.stdout.read()
|
||||
else:
|
||||
stdout = None
|
||||
if self.stderr:
|
||||
stderr = self.stderr.read()
|
||||
else:
|
||||
stderr = None
|
||||
return stdout, stderr
|
||||
|
||||
|
||||
def popen(cmd, **kwargs):
|
||||
return subprocess.popen(cmd, popen_class=SSHPopen, **kwargs)
|
||||
|
||||
|
||||
def call(cmd, **kwargs):
|
||||
return subprocess.call(cmd, popen_class=SSHPopen, **kwargs)
|
|
@ -83,6 +83,9 @@ class BasePopen(object):
|
|||
def terminate(self):
|
||||
raise NotImplementedError("terminate")
|
||||
|
||||
def close_stdin(self):
|
||||
raise NotImplementedError("close_stdin")
|
||||
|
||||
def communicate(self):
|
||||
raise NotImplementedError("communicate")
|
||||
|
||||
|
@ -135,6 +138,9 @@ class LocalPopen(BasePopen):
|
|||
def terminate(self):
|
||||
return self._popen_obj.terminate()
|
||||
|
||||
def close_stdin(self):
|
||||
self._popen_obj.stdin.close()
|
||||
|
||||
def communicate(self):
|
||||
return self._popen_obj.communicate()
|
||||
|
||||
|
@ -142,7 +148,8 @@ class LocalPopen(BasePopen):
|
|||
@contextlib.contextmanager
|
||||
def popen(cmd, **kwargs):
|
||||
name = kwargs.pop('name', cmd[0])
|
||||
proc = LocalPopen(name, cmd, kwargs)
|
||||
popen_class = kwargs.pop('popen_class', LocalPopen)
|
||||
proc = popen_class(name, cmd, kwargs)
|
||||
LOG.info('Started process %s: %s', proc.name,
|
||||
" ".join(map(pipes.quote, cmd)))
|
||||
try:
|
||||
|
@ -156,7 +163,7 @@ def popen(cmd, **kwargs):
|
|||
LOG.error("Process %s finished with return value %s", name, rv)
|
||||
raise
|
||||
if 'stdin' in kwargs:
|
||||
proc.stdin.close()
|
||||
proc.close_stdin()
|
||||
try:
|
||||
rv = proc.wait()
|
||||
except Exception:
|
||||
|
|
|
@ -4,3 +4,4 @@
|
|||
pbr>=0.6,!=0.7,<1.0
|
||||
python-fuelclient>=6.1 # it needs specific version of argparse
|
||||
cliff>=1.7.0,<=1.9.0
|
||||
paramiko==1.13.0
|
||||
|
|
Loading…
Reference in New Issue