Merge "Fix safe path checks"

This commit is contained in:
Zuul 2018-03-12 17:04:57 +00:00 committed by Gerrit Code Review
commit 035e034233
10 changed files with 57 additions and 47 deletions

View File

@ -20,14 +20,13 @@ assemble = paths._import_ansible_action_plugin("assemble")
class ActionModule(assemble.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,14 +20,13 @@ copy = paths._import_ansible_action_plugin("copy")
class ActionModule(copy.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and source and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,14 +20,21 @@ include_vars = paths._import_ansible_action_plugin("include_vars")
class ActionModule(include_vars.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source_dir = self._task.args.get('dir', None)
source_file = self._task.args.get('file', None)
for fileloc in (source_dir, source_file):
if fileloc and not paths._is_safe_path(fileloc):
return paths._fail_dict(fileloc)
# This is the handling for source_dir. The source_file is handled by
# the _find_needle override.
if source_dir:
self._set_args()
self._set_root_dir()
if not paths._is_safe_path(self.source_dir):
return paths._fail_dict(self.source_dir)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,13 +20,12 @@ patch = paths._import_ansible_action_plugin("patch")
class ActionModule(patch.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,17 +20,13 @@ script = paths._import_ansible_action_plugin("script")
class ActionModule(script.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
# the script name is the first item in the raw params, so we split it
# out now so we know the file name we need to transfer to the remote,
# and everything else is an argument to the script which we need later
# to append to the remote command
parts = self._task.args.get('_raw_params', '').strip().split()
source = parts[0]
if not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,12 +20,12 @@ template = paths._import_ansible_action_plugin("template")
class ActionModule(template.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
if not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,13 +20,12 @@ unarchive = paths._import_ansible_action_plugin("unarchive")
class ActionModule(unarchive.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,13 +20,12 @@ win_copy = paths._import_ansible_action_plugin("win_copy")
class ActionModule(win_copy.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -20,13 +20,12 @@ win_template = paths._import_ansible_action_plugin("win_template")
class ActionModule(win_template.ActionModule):
def _find_needle(self, dirname, needle):
return paths._safe_find_needle(
super(ActionModule, self), dirname, needle)
def run(self, tmp=None, task_vars=None):
if not paths._is_official_module(self):
return paths._fail_module_dict(self._task.action)
source = self._task.args.get('src', None)
remote_src = self._task.args.get('remote_src', False)
if not remote_src and not paths._is_safe_path(source):
return paths._fail_dict(source)
return super(ActionModule, self).run(tmp, task_vars)

View File

@ -22,8 +22,21 @@ import ansible.plugins.action
import ansible.plugins.lookup
def _safe_find_needle(super, dirname, needle):
result = super._find_needle(dirname, needle)
if not _is_safe_path(result):
fail_dict = _fail_dict(_full_path(result))
raise AnsibleError("{msg}. Invalid path: {path}".format(
msg=fail_dict['msg'], path=fail_dict['path']))
return result
def _full_path(path):
return os.path.realpath(os.path.abspath(os.path.expanduser(path)))
def _is_safe_path(path):
full_path = os.path.realpath(os.path.abspath(os.path.expanduser(path)))
full_path = _full_path(path)
if not full_path.startswith(os.path.abspath(os.path.expanduser('~'))):
return False
return True