deb-python-hplefthandclient/test/HPELeftHandMockServer_ssh.py

184 lines
5.2 KiB
Python
Executable File

# (c) Copyright 2015 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Test SSH server."""
import argparse
import logging
import os
import shlex
import socket
import sys
import threading
import paramiko
paramiko.util.log_to_file('paramiko_server.log')
class CliParseException(Exception):
pass
class CliArgumentParser(argparse.ArgumentParser):
def error(self, message):
usage = super(CliArgumentParser, self).format_help()
full_message = "%s\r\n%s" % (message, usage)
raise CliParseException(full_message)
def parse_args(self, *args):
return super(CliArgumentParser, self).parse_args(args[1:])
class Cli(object):
def __init__(self):
self.log_name = 'paramiko.LeftHandCLI'
self.logger = paramiko.util.get_logger(self.log_name)
self.fpgs = {}
self.vfss = {}
def do_cli_other(self, *args):
msg = 'FAIL! Mock SSH CLI does not know how to "%s".' % ' '.join(args)
self.logger.log(logging.ERROR, msg)
return msg
def do_cli_exit(self, *args):
self.logger.log(logging.INFO, "quiting... g'bye")
return ''
def do_cli_quit(self, *args):
self.logger.log(logging.INFO, "quiting... g'bye")
return ''
def process_command(self, cmd):
self.logger.log(logging.INFO, cmd)
if cmd is None:
print("returnNone")
return ''
args = shlex.split(cmd)
if args:
method = getattr(self, 'do_cli_' + args[0], self.do_cli_other)
try:
return method(*args)
except Exception as cmd_exception:
return str(cmd_exception)
else:
return ''
class ParamikoServer(paramiko.ServerInterface):
def __init__(self):
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
def check_auth_none(self, username):
return paramiko.AUTH_SUCCESSFUL
def check_auth_password(self, username, password):
return paramiko.AUTH_SUCCESSFUL
def check_auth_publickey(self, username, key):
return paramiko.AUTH_SUCCESSFUL
def get_allowed_auths(self, username):
return 'password,publickey,none'
def check_channel_shell_request(self, c):
self.event.set()
return True
def check_channel_pty_request(self, c, term, width, height, pixelwidth,
pixelheight, modes):
return True
if __name__ == "__main__":
if len(sys.argv) > 1:
port = int(sys.argv[1])
else:
port = 2200
key_file = os.path.expanduser('~/.ssh/id_rsa')
host_key = paramiko.RSAKey(filename=key_file)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', int(port)))
s.listen(60)
print("Listening for SSH client connections...")
connection, address = s.accept()
transport = None
channel = None
try:
transport = paramiko.Transport(connection)
transport.load_server_moduli()
transport.add_server_key(host_key)
server = ParamikoServer()
transport.start_server(server=server)
cliProcessor = Cli()
while True:
channel = transport.accept(60)
if channel is None:
print("Failed to get SSH channel.")
sys.exit(1)
print("Connected")
server.event.wait(10)
if not server.event.isSet():
print("No shell set")
sys.exit(1)
fio = channel.makefile('rU')
commands = []
command = None
while not (command == 'exit' or command == 'quit'):
command = fio.readline().strip('\r\n')
commands.append(command)
to_send = '\r\n'.join(commands)
channel.send(to_send)
output = ['']
prompt = "FAKE-LeftHand-CLI cli% "
for cmd in commands:
output.append('%s%s' % (prompt, cmd))
result = cliProcessor.process_command(cmd)
if result is not None:
output.append(result)
output_to_send = '\r\n'.join(output)
channel.send(output_to_send)
channel.close()
print("Disconnected")
finally:
if channel:
channel.close()
if transport:
try:
transport.close()
print("transport closed")
except Exception as e:
print("transport close exception %s" % e)
pass