Avoid race condition using parallel fuel-devops env manipulations
If multiple processes are creating/erasing different fuel-devops
environments at the same time, there can be race condition during
libvirt object creation/deletion like:
'bridge virbr3 already exists' and others.
This patch allows to use a lock file to avoid such situation:
export DEVOPS_LOCK_FILE=/run/lock/devops_lock
If the environment variable DEVOPS_LOCK_FILE is not set, then
the lock file is not used (backward compatibility to avoid any
errors caused by using the lock file by default).
Change-Id: Id28f442959594aa5d6bb5c1d15e4f0880653070d
(cherry picked from commit cce44f4784
)
This commit is contained in:
parent
166f811d6b
commit
17b805acaf
|
@ -36,6 +36,7 @@ pip-delete-this-directory.txt
|
|||
htmlcov/
|
||||
.tox/
|
||||
.coverage
|
||||
cover/
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
|
|
|
@ -17,6 +17,12 @@ from __future__ import unicode_literals
|
|||
from functools import wraps
|
||||
from threading import Thread
|
||||
|
||||
import fasteners
|
||||
|
||||
from devops import error
|
||||
from devops import logger
|
||||
from devops import settings
|
||||
|
||||
|
||||
def threaded(name=None, started=False, daemon=False):
|
||||
"""Make function or method threaded with passing arguments
|
||||
|
@ -56,3 +62,41 @@ def threaded(name=None, started=False, daemon=False):
|
|||
return real_decorator(func)
|
||||
|
||||
return real_decorator
|
||||
|
||||
|
||||
def proc_lock(path=settings.DEVOPS_LOCK_FILE, timeout=300):
|
||||
"""Process lock based on fcntl.lockf
|
||||
|
||||
Avoid race condition between different processes which
|
||||
use fuel-devops at the same time during the resources
|
||||
creation/modification/erase.
|
||||
|
||||
:param path: str, path to the lock file
|
||||
:param timeout: int, timeout in second for waiting the lock file
|
||||
"""
|
||||
def real_decorator(func):
|
||||
@wraps(func)
|
||||
def wrapped(*args, **kwargs):
|
||||
acquired = False
|
||||
if path is not None:
|
||||
logger.debug('Acquiring lock file {0} for {1}'
|
||||
.format(path, func.__name__))
|
||||
lock = fasteners.InterProcessLock(path)
|
||||
acquired = lock.acquire(blocking=True,
|
||||
delay=5, timeout=timeout)
|
||||
logger.debug('Acquired the lock file {0} for {1}'
|
||||
.format(path, func.__name__))
|
||||
if not acquired:
|
||||
raise error.DevopsError(
|
||||
'Failed to aquire lock file in {0} sec'
|
||||
.format(timeout))
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
finally:
|
||||
if acquired:
|
||||
logger.debug('Releasing the lock file {0} for {1}'
|
||||
.format(path, func.__name__))
|
||||
lock.release()
|
||||
return result
|
||||
return wrapped
|
||||
return real_decorator
|
||||
|
|
|
@ -22,6 +22,7 @@ from paramiko import Agent
|
|||
from paramiko import RSAKey
|
||||
|
||||
from devops.error import DevopsEnvironmentError
|
||||
from devops.helpers import decorators
|
||||
from devops.helpers.helpers import get_file_size
|
||||
from devops.helpers.ssh_client import SSHAuth
|
||||
from devops.helpers.ssh_client import SSHClient
|
||||
|
@ -128,6 +129,7 @@ class Environment(DriverModel):
|
|||
def has_snapshot(self, name):
|
||||
return all(n.has_snapshot(name) for n in self.get_nodes())
|
||||
|
||||
@decorators.proc_lock()
|
||||
def define(self, skip=True):
|
||||
# 'skip' param is a temporary workaround.
|
||||
# It will be removed with introducing the new database schema
|
||||
|
@ -150,6 +152,7 @@ class Environment(DriverModel):
|
|||
for node in self.get_nodes():
|
||||
node.destroy(verbose=verbose)
|
||||
|
||||
@decorators.proc_lock()
|
||||
def erase(self):
|
||||
for node in self.get_nodes():
|
||||
node.erase()
|
||||
|
@ -173,6 +176,7 @@ class Environment(DriverModel):
|
|||
for node in self.get_nodes():
|
||||
node.resume(verbose)
|
||||
|
||||
@decorators.proc_lock()
|
||||
def snapshot(self, name=None, description=None, force=False):
|
||||
if name is None:
|
||||
name = str(int(time.time()))
|
||||
|
@ -180,6 +184,7 @@ class Environment(DriverModel):
|
|||
node.snapshot(name=name, description=description, force=force,
|
||||
external=settings.SNAPSHOTS_EXTERNAL)
|
||||
|
||||
@decorators.proc_lock()
|
||||
def revert(self, name=None, destroy=True, flag=True):
|
||||
if destroy:
|
||||
for node in self.get_nodes():
|
||||
|
@ -263,6 +268,7 @@ class Environment(DriverModel):
|
|||
return environment
|
||||
|
||||
@classmethod
|
||||
@decorators.proc_lock()
|
||||
def create_environment(cls, full_config):
|
||||
"""Create a new environment using full_config object
|
||||
|
||||
|
|
|
@ -52,6 +52,8 @@ DATABASES = {
|
|||
}
|
||||
}
|
||||
|
||||
DEVOPS_LOCK_FILE = os.environ.get('DEVOPS_LOCK_FILE', None)
|
||||
|
||||
KEYSTONE_CREDS = {'username': os.environ.get('KEYSTONE_USERNAME', 'admin'),
|
||||
'password': os.environ.get('KEYSTONE_PASSWORD', 'admin'),
|
||||
'tenant_name': os.environ.get('KEYSTONE_TENANT', 'admin')}
|
||||
|
|
|
@ -19,6 +19,8 @@ from unittest import TestCase
|
|||
|
||||
import mock
|
||||
|
||||
from devops import error
|
||||
from devops.helpers import decorators
|
||||
from devops.helpers.decorators import threaded
|
||||
|
||||
|
||||
|
@ -99,3 +101,75 @@ class ThreadedTest(TestCase):
|
|||
func_test(add=2, rlock=lock)
|
||||
with lock:
|
||||
self.assertEqual(data, [2])
|
||||
|
||||
|
||||
class TestProcLock(TestCase):
|
||||
|
||||
def patch(self, *args, **kwargs):
|
||||
patcher = mock.patch(*args, **kwargs)
|
||||
m = patcher.start()
|
||||
self.addCleanup(patcher.stop)
|
||||
return m
|
||||
|
||||
def setUp(self):
|
||||
self.sleep_mock = self.patch(
|
||||
'time.sleep')
|
||||
|
||||
def create_class_with_proc_lock(self, path, timeout):
|
||||
class MyClass(object):
|
||||
def __init__(self, method):
|
||||
self.m = method
|
||||
|
||||
@decorators.proc_lock(path=path, timeout=timeout)
|
||||
def method(self):
|
||||
return self.m()
|
||||
|
||||
return MyClass
|
||||
|
||||
@mock.patch('fasteners.InterProcessLock.acquire')
|
||||
@mock.patch('fasteners.InterProcessLock.release')
|
||||
def test_default_no_proc_lock(self, release, acquire):
|
||||
method_mock = mock.Mock()
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
MyClass = self.create_class_with_proc_lock(None, 10)
|
||||
c = MyClass(method_mock)
|
||||
|
||||
c.method()
|
||||
|
||||
acquire.assert_not_called()
|
||||
method_mock.assert_called_once()
|
||||
release.assert_not_called()
|
||||
|
||||
@mock.patch('fasteners.InterProcessLock.acquire')
|
||||
@mock.patch('fasteners.InterProcessLock.release')
|
||||
def test_passed_proc_lock(self, release, acquire):
|
||||
acquire.return_value = True
|
||||
method_mock = mock.Mock()
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
MyClass = self.create_class_with_proc_lock('/run/lock/devops_lock', 20)
|
||||
c = MyClass(method_mock)
|
||||
|
||||
c.method()
|
||||
|
||||
acquire.assert_called_once()
|
||||
method_mock.assert_called_once()
|
||||
release.assert_called_once()
|
||||
|
||||
@mock.patch('fasteners.InterProcessLock.acquire')
|
||||
@mock.patch('fasteners.InterProcessLock.release')
|
||||
def test_acquire_timeout(self, release, acquire):
|
||||
acquire.return_value = False
|
||||
method_mock = mock.Mock()
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
MyClass = self.create_class_with_proc_lock('/run/lock/devops_lock', 30)
|
||||
c = MyClass(method_mock)
|
||||
|
||||
with self.assertRaises(error.DevopsError):
|
||||
c.method()
|
||||
|
||||
acquire.assert_called_once()
|
||||
method_mock.assert_not_called()
|
||||
release.assert_not_called()
|
||||
|
|
Loading…
Reference in New Issue