# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Discover and lookup command plugins. """ import inspect import logging import stevedore LOG = logging.getLogger(__name__) def _get_commands_by_partial_name(args, commands): n = len(args) candidates = [] for command_name in commands: command_parts = command_name.split() if len(command_parts) != n: continue if all(command_parts[i].startswith(args[i]) for i in range(n)): candidates.append(command_name) return candidates class EntryPointWrapper(object): """Wrap up a command class already imported to make it look like a plugin.""" def __init__(self, name, command_class): self.name = name self.command_class = command_class def load(self, require=False): return self.command_class class CommandManager(object): """Discovers commands and handles lookup based on argv data. :param namespace: String containing the entrypoint namespace for the plugins to be loaded. For example, ``'cliff.formatter.list'``. :param convert_underscores: Whether cliff should convert underscores to spaces in entry_point commands. """ def __init__(self, namespace, convert_underscores=True): self.commands = {} self._legacy = {} self.namespace = namespace self.convert_underscores = convert_underscores self.group_list = [] self._load_commands() def _load_commands(self): # NOTE(jamielennox): kept for compatibility. if self.namespace: self.load_commands(self.namespace) def load_commands(self, namespace): """Load all the commands from an entrypoint""" self.group_list.append(namespace) for ep in stevedore.ExtensionManager(namespace): LOG.debug('found command %r', ep.name) cmd_name = ( ep.name.replace('_', ' ') if self.convert_underscores else ep.name ) self.commands[cmd_name] = ep.entry_point return def __iter__(self): return iter(self.commands.items()) def add_command(self, name, command_class): self.commands[name] = EntryPointWrapper(name, command_class) def add_legacy_command(self, old_name, new_name): """Map an old command name to the new name. :param old_name: The old command name. :type old_name: str :param new_name: The new command name. :type new_name: str """ self._legacy[old_name] = new_name def find_command(self, argv): """Given an argument list, find a command and return the processor and any remaining arguments. """ start = self._get_last_possible_command_index(argv) for i in range(start, 0, -1): name = ' '.join(argv[:i]) search_args = argv[i:] # The legacy command handling may modify name, so remember # the value we actually found in argv so we can return it. return_name = name # Convert the legacy command name to its new name. if name in self._legacy: name = self._legacy[name] found = None if name in self.commands: found = name else: candidates = _get_commands_by_partial_name( argv[:i], self.commands ) if len(candidates) == 1: found = candidates[0] if found: cmd_ep = self.commands[found] if hasattr(cmd_ep, 'resolve'): cmd_factory = cmd_ep.resolve() else: # NOTE(dhellmann): Some fake classes don't take # require as an argument. Yay? arg_spec = inspect.getfullargspec(cmd_ep.load) if 'require' in arg_spec[0]: cmd_factory = cmd_ep.load(require=False) else: cmd_factory = cmd_ep.load() return (cmd_factory, return_name, search_args) else: raise ValueError('Unknown command %r' % (argv,)) def _get_last_possible_command_index(self, argv): """Returns the index after the last argument in argv that can be a command word """ for i, arg in enumerate(argv): if arg.startswith('-'): return i return len(argv) def add_command_group(self, group=None): """Adds another group of command entrypoints""" if group: self.load_commands(group) def get_command_groups(self): """Returns a list of the loaded command groups""" return self.group_list def get_command_names(self, group=None): """Returns a list of commands loaded for the specified group""" group_list = [] if group is not None: for ep in stevedore.ExtensionManager(group): cmd_name = ( ep.name.replace('_', ' ') if self.convert_underscores else ep.name ) group_list.append(cmd_name) return group_list return list(self.commands.keys())