cliff/cliff/app.py

458 lines
16 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Application base class.
"""
import inspect
import locale
import logging
import logging.handlers
import os
import sys
from cliff import _argparse
from . import complete
from . import help
from . import utils
# Attach a do-nothing handler to the package logger so that the library
# itself never decides where (or whether) its log records are emitted;
# that choice is left to the application.
logging.getLogger('cliff').addHandler(logging.NullHandler())

# Exit statuses reported when the program stops because of a signal.
# The convention is 128 + the signal number (SIGINT is 2, SIGPIPE is 13).
_SIGINT_EXIT = 128 + 2
_SIGPIPE_EXIT = 128 + 13
class App:
    """Application base class.

    :param description: one-liner explaining the program purpose
    :paramtype description: str
    :param version: application version number
    :paramtype version: str
    :param command_manager: plugin loader
    :paramtype command_manager: cliff.commandmanager.CommandManager
    :param stdin: Standard input stream
    :paramtype stdin: readable I/O stream
    :param stdout: Standard output stream
    :paramtype stdout: writable I/O stream
    :param stderr: Standard error output stream
    :paramtype stderr: writable I/O stream
    :param interactive_app_factory: callable to create an
                                    interactive application
    :paramtype interactive_app_factory: cliff.interactive.InteractiveApp
    :param deferred_help: when True, the global ``--help`` flag is only
                          recorded while parsing; help is printed after
                          :meth:`initialize_app` has run (or handed to the
                          ``help`` subcommand), which lets subcommands
                          accept ``--help``
    :paramtype deferred_help: bool
    """

    # Program name derived from the script that was launched.
    NAME = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    if NAME == '__main__':
        # The script is a ``__main__`` module: fall back to the name of
        # the directory that contains it.
        NAME = os.path.split(os.path.dirname(sys.argv[0]))[-1]
    LOG = logging.getLogger(NAME)

    CONSOLE_MESSAGE_FORMAT = '%(message)s'
    LOG_FILE_MESSAGE_FORMAT = (
        '[%(asctime)s] %(levelname)-8s %(name)s %(message)s'
    )
    DEFAULT_VERBOSE_LEVEL = 1
    DEFAULT_OUTPUT_ENCODING = 'utf-8'

    def __init__(
        self,
        description,
        version,
        command_manager,
        stdin=None,
        stdout=None,
        stderr=None,
        interactive_app_factory=None,
        deferred_help=False,
    ):
        """Initialize the application.

        Registers the built-in ``help`` and ``complete`` commands with
        the command manager, selects the I/O streams and builds the
        global option parser.
        """
        self.command_manager = command_manager
        self.command_manager.add_command('help', help.HelpCommand)
        self.command_manager.add_command('complete', complete.CompleteCommand)
        self._set_streams(stdin, stdout, stderr)
        self.interactive_app_factory = interactive_app_factory
        # Must be set before build_option_parser(), which reads it to
        # decide how ``--help`` is registered.
        self.deferred_help = deferred_help
        self.parser = self.build_option_parser(description, version)
        self.interactive_mode = False
        self.interpreter = None

    def _set_streams(self, stdin, stdout, stderr):
        """Select the I/O streams used by the application.

        A stream supplied by the caller is used unmodified, on the
        assumption that the caller has taken all responsibility for it
        (including its encoding). Any stream left as ``None`` defaults
        to the matching ``sys`` stream.

        :param stdin: input stream, or None for ``sys.stdin``
        :param stdout: output stream, or None for ``sys.stdout``
        :param stderr: error stream, or None for ``sys.stderr``
        """
        try:
            locale.setlocale(locale.LC_ALL, '')
        except locale.Error:
            # Best effort only: an unsupported locale setting must not
            # prevent the application from starting.
            pass

        # Python 3 already wraps the default streams in a codec chosen
        # from the locale, so the defaults need no extra wrapping here.
        #
        # Compare against None rather than relying on truthiness: a
        # caller-supplied stream object may legitimately evaluate as
        # false (for example when it defines __len__ and is empty) and
        # must not be silently replaced by the sys.* stream.
        self.stdin = sys.stdin if stdin is None else stdin
        self.stdout = sys.stdout if stdout is None else stdout
        self.stderr = sys.stderr if stderr is None else stderr

    def build_option_parser(self, description, version, argparse_kwargs=None):
        """Return an argparse option parser for this application.

        Subclasses may override this method to extend
        the parser with more global options.

        :param description: full description of the application
        :paramtype description: str
        :param version: version number for the application
        :paramtype version: str
        :param argparse_kwargs: extra keyword arguments passed to the
                                ArgumentParser constructor
        :paramtype argparse_kwargs: dict
        :returns: the configured parser
        """
        argparse_kwargs = argparse_kwargs or {}
        parser = _argparse.ArgumentParser(
            description=description, add_help=False, **argparse_kwargs
        )
        parser.add_argument(
            '--version',
            action='version',
            # self.NAME (not App.NAME) so that a subclass overriding NAME
            # reports its own name, as the other messages already do.
            version='{0} {1}'.format(self.NAME, version),
        )

        # -v and -q write to the same destination and exclude each other.
        verbose_group = parser.add_mutually_exclusive_group()
        verbose_group.add_argument(
            '-v',
            '--verbose',
            action='count',
            dest='verbose_level',
            default=self.DEFAULT_VERBOSE_LEVEL,
            help='Increase verbosity of output. Can be repeated.',
        )
        verbose_group.add_argument(
            '-q',
            '--quiet',
            action='store_const',
            dest='verbose_level',
            const=0,
            help='Suppress output except warnings and errors.',
        )
        parser.add_argument(
            '--log-file',
            action='store',
            default=None,
            help='Specify a file to log output. Disabled by default.',
        )
        if self.deferred_help:
            # Only record that help was requested; run() and
            # print_help_if_requested() act on the flag later.
            parser.add_argument(
                '-h',
                '--help',
                dest='deferred_help',
                action='store_true',
                help="Show help message and exit.",
            )
        else:
            parser.add_argument(
                '-h',
                '--help',
                action=help.HelpAction,
                nargs=0,
                # tricky: the application instance is handed to the
                # action through the argparse "default" slot.
                default=self,
                help="Show help message and exit.",
            )
        parser.add_argument(
            '--debug',
            default=False,
            action='store_true',
            help='Show tracebacks on errors.',
        )
        return parser

    def configure_logging(self):
        """Create logging handlers for any log output.

        The root logger is set to DEBUG and the handlers do the
        filtering: an optional file handler (when ``--log-file`` was
        given) and a console handler on ``self.stderr`` whose level is
        derived from the verbosity option.
        """
        root_logger = logging.getLogger('')
        root_logger.setLevel(logging.DEBUG)

        # Set up logging to a file
        if self.options.log_file:
            file_handler = logging.FileHandler(
                filename=self.options.log_file,
            )
            file_handler.setFormatter(
                logging.Formatter(self.LOG_FILE_MESSAGE_FORMAT)
            )
            root_logger.addHandler(file_handler)

        # Always send higher-level messages to the console via stderr
        console = logging.StreamHandler(self.stderr)
        # 0 comes from --quiet, 1 is the default, each -v adds one; any
        # level not listed here is treated as DEBUG.
        console_level = {
            0: logging.WARNING,
            1: logging.INFO,
            2: logging.DEBUG,
        }.get(self.options.verbose_level, logging.DEBUG)
        console.setLevel(console_level)
        console.setFormatter(logging.Formatter(self.CONSOLE_MESSAGE_FORMAT))
        root_logger.addHandler(console)

    def print_help_if_requested(self):
        """Print help and exits if deferred help is enabled and requested.

        '--help' shows the help message and exits:

        * without calling initialize_app if not self.deferred_help (default),
        * after initialize_app call if self.deferred_help,
        * during initialize_app call if self.deferred_help and subclass calls
          explicitly this method in initialize_app.
        """
        if self.deferred_help and self.options.deferred_help:
            action = help.HelpAction(None, None, default=self)
            action(self.parser, self.options, None, None)

    def run(self, argv):
        """Equivalent to the main program for the application.

        :param argv: input arguments and options
        :paramtype argv: list of str
        :returns: the exit status: the result of the subcommand or of the
                  interactive session, 1 when start-up fails without
                  ``--debug``, or 128 + signal number when interrupted
                  or when the output pipe is closed
        """
        try:
            self.options, remainder = self.parser.parse_known_args(argv)
            self.configure_logging()
            # No arguments left after the global options means there is
            # no subcommand to run: start the interactive shell instead.
            self.interactive_mode = not remainder
            if self.deferred_help and self.options.deferred_help and remainder:
                # When help is requested and `remainder` has any values
                # disable `deferred_help` and instead allow the help
                # subcommand to handle the request during
                # run_subcommand(). This turns "app foo bar --help" into
                # "app help foo bar". However, when `remainder` is empty
                # use print_help_if_requested() to allow for an early
                # exit.
                # Disabling `deferred_help` here also ensures that
                # print_help_if_requested will not fire if called by a
                # subclass during its initialize_app().
                self.options.deferred_help = False
                remainder.insert(0, "help")
            self.initialize_app(remainder)
            self.print_help_if_requested()
        except BrokenPipeError:
            return _SIGPIPE_EXIT
        except Exception as err:
            # If parsing itself failed there are no options yet; in that
            # case behave as if --debug had been given.
            debug = self.options.debug if hasattr(self, 'options') else True
            if debug:
                self.LOG.exception(err)
                raise
            self.LOG.error(err)
            return 1
        except KeyboardInterrupt:
            return _SIGINT_EXIT

        if self.interactive_mode:
            return self.interact()
        try:
            return self.run_subcommand(remainder)
        except BrokenPipeError:
            return _SIGPIPE_EXIT
        except KeyboardInterrupt:
            return _SIGINT_EXIT

    # FIXME(dhellmann): Consider moving these command handling methods
    # to a separate class.
    def initialize_app(self, argv):
        """Hook for subclasses to take global initialization action
        after the arguments are parsed but before a command is run.
        Invoked only once, even in interactive mode.

        :param argv: List of arguments, including the subcommand to run.
                     Empty for interactive mode.
        """

    def prepare_to_run_command(self, cmd):
        """Perform any preliminary work needed to run a command.

        :param cmd: command processor being invoked
        :paramtype cmd: cliff.command.Command
        """

    def clean_up(self, cmd, result, err):
        """Hook run after a command is done to shutdown the app.

        :param cmd: command processor being invoked
        :paramtype cmd: cliff.command.Command
        :param result: return value of cmd
        :paramtype result: int
        :param err: exception or None
        :paramtype err: Exception
        """

    def interact(self):
        """Start the interactive command loop and return its result.

        Uses ``interactive_app_factory`` when one was supplied,
        otherwise :class:`cliff.interactive.InteractiveApp`. The created
        interpreter is kept in ``self.interpreter``.
        """
        # Defer importing .interactive as cmd2 is a slow import
        from .interactive import InteractiveApp

        if self.interactive_app_factory is None:
            self.interactive_app_factory = InteractiveApp
        self.interpreter = self.interactive_app_factory(
            self,
            self.command_manager,
            self.stdin,
            self.stdout,
        )
        return self.interpreter.cmdloop()

    def get_fuzzy_matches(self, cmd):
        """Return fuzzy matches of an unknown command.

        Every command name that starts with ``cmd`` is returned. They
        are followed by the commands whose first word is at the smallest
        edit distance from ``cmd``.

        :param cmd: the command word typed by the user
        :paramtype cmd: str
        :returns: candidate command names, best matches first
        """
        sep = ' ' if self.command_manager.convert_underscores else '_'
        all_cmds = [entry[0] for entry in self.command_manager]

        dist = []
        for candidate in sorted(all_cmds):
            # Give prefix match a very good score
            if candidate.startswith(cmd):
                dist.append((0, candidate))
                continue
            # Levenshtein distance against the first word only; the +1
            # keeps every non-prefix candidate strictly behind the
            # prefix matches, which score 0.
            prefix = candidate.split(sep)[0]
            score = utils.damerau_levenshtein(cmd, prefix, utils.COST) + 1
            dist.append((score, candidate))

        matches = []
        match_distance = 0
        for distance, candidate in sorted(dist):
            if distance > match_distance:
                if match_distance:
                    # we copied all items with minimum distance, we are done
                    break
                # we copied all items with distance=0,
                # now we match all candidates at the minimum distance
                match_distance = distance
            matches.append(candidate)
        return matches

    def run_subcommand(self, argv):
        """Look up and run the subcommand named by ``argv``.

        :param argv: the subcommand words followed by its arguments
        :paramtype argv: list of str
        :returns: the value returned by the command, 2 when the command
                  is unknown, 0 when help was shown, 1 when the command
                  failed with an exception
        :raises ValueError: for an unknown command without suggestions
                            when ``--debug`` is set
        """
        try:
            subcommand = self.command_manager.find_command(argv)
        except ValueError as err:
            # If there was no exact match, try to find a fuzzy match.
            # An empty argv has no word to match against; indexing it
            # would replace the ValueError with an unrelated IndexError.
            fuzzy_matches = self.get_fuzzy_matches(argv[0]) if argv else []
            if fuzzy_matches:
                # Slice instead of indexing so an empty NAME is safe, and
                # test against a tuple because '' is "in" every string.
                starts_with_vowel = (
                    self.NAME[:1].lower() in ('a', 'e', 'i', 'o', 'u')
                )
                article = 'an' if starts_with_vowel else 'a'
                self.stdout.write(
                    "%s: '%s' is not %s %s command. "
                    "See '%s --help'.\n"
                    % (
                        self.NAME,
                        ' '.join(argv),
                        article,
                        self.NAME,
                        self.NAME,
                    )
                )
                self.stdout.write('Did you mean one of these?\n')
                for match in fuzzy_matches:
                    self.stdout.write(' %s\n' % match)
            else:
                if self.options.debug:
                    raise
                self.LOG.error(err)
            return 2

        cmd_factory, cmd_name, sub_argv = subcommand
        kwargs = {}
        # Only pass cmd_name to command classes whose constructor
        # declares that parameter.
        if 'cmd_name' in inspect.getfullargspec(cmd_factory.__init__).args:
            kwargs['cmd_name'] = cmd_name
        cmd = cmd_factory(self, self.options, **kwargs)

        result = 1
        err = None
        try:
            self.prepare_to_run_command(cmd)
            full_name = (
                cmd_name
                if self.interactive_mode
                else ' '.join([self.NAME, cmd_name])
            )
            cmd_parser = cmd.get_parser(full_name)
            try:
                parsed_args = cmd_parser.parse_args(sub_argv)
            except SystemExit as ex:
                if self.interactive_mode:
                    # Defer importing cmd2 as it is a slow import
                    import cmd2

                    # Convert the exit request into the cmd2 exception
                    # instead of letting SystemExit propagate.
                    raise cmd2.exceptions.Cmd2ArgparseError from ex
                raise
            result = cmd.run(parsed_args)
        except BrokenPipeError as err1:
            result = _SIGPIPE_EXIT
            err = err1
            raise
        except help.HelpExit:
            result = 0
        except Exception as err1:
            err = err1
            if self.options.debug:
                self.LOG.exception(err)
            else:
                self.LOG.error(err)
        except KeyboardInterrupt as err1:
            result = _SIGINT_EXIT
            err = err1
            raise
        finally:
            # clean_up() always runs; a failure inside it is reported
            # but never replaces the command's own outcome.
            try:
                self.clean_up(cmd, result, err)
            except Exception as err2:
                if self.options.debug:
                    self.LOG.exception(err2)
                else:
                    self.LOG.error('Could not clean up: %s', err2)
            # Drop the local reference to the exception once the hook
            # has seen it.
            del err
        return result