Ensure locks can not be created outside of the root file driver directory

Change-Id: I33870e03141b4a577415bbc7d625237b1d86b513
This commit is contained in:
Joshua Harlow 2015-06-04 09:08:28 -07:00 committed by Joshua Harlow
parent c727093e01
commit e44b35dc48
3 changed files with 40 additions and 2 deletions

View File

@ -15,7 +15,6 @@
# under the License.
import logging
import os
import threading
import fasteners
@ -24,6 +23,7 @@ from oslo_utils import timeutils
import tooz
from tooz import coordination
from tooz import locking
from tooz import utils
LOG = logging.getLogger(__name__)
@ -89,7 +89,7 @@ class FileDriver(coordination.CoordinationDriver):
self._lockdir = parsed_url.path
def get_lock(self, name):
path = os.path.abspath(os.path.join(self._lockdir, name.decode()))
path = utils.safe_abs_path(self._lockdir, name.decode())
return locking.SharedWeakLockHelper(self._lockdir, FileLock, path)
@staticmethod

View File

@ -15,6 +15,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
import six
from testtools import testcase
@ -22,6 +24,24 @@ from testtools import testcase
from tooz import utils
class TestUtilsSafePath(testcase.TestCase):
base = tempfile.gettempdir()
def test_join(self):
self.assertEqual(os.path.join(self.base, 'b'),
utils.safe_abs_path(self.base, "b"))
self.assertEqual(os.path.join(self.base, 'b', 'c'),
utils.safe_abs_path(self.base, "b", 'c'))
self.assertEqual(self.base,
utils.safe_abs_path(self.base, "b", 'c', '../..'))
def test_unsafe_join(self):
self.assertRaises(ValueError, utils.safe_abs_path,
self.base, "../b")
self.assertRaises(ValueError, utils.safe_abs_path,
self.base, "b", 'c', '../../../')
class TestUtilsCollapse(testcase.TestCase):
def test_bad_type(self):

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import msgpack
from oslo_serialization import msgpackutils
import six
@ -21,6 +23,22 @@ import six
from tooz import coordination
def safe_abs_path(rooted_at, *pieces):
# Avoids the following junk...
#
# >>> import os
# >>> os.path.join("/b", "..")
# '/b/..'
# >>> os.path.abspath(os.path.join("/b", ".."))
# '/'
path = os.path.abspath(os.path.join(rooted_at, *pieces))
if not path.startswith(rooted_at):
raise ValueError("Unable to create path that is outside of"
" parent directory '%s' using segments %s"
% (rooted_at, list(pieces)))
return path
def collapse(config, exclude=None, item_selector=None):
"""Collapses config with keys and **list/tuple** values.