diff --git a/oslo_rootwrap/client.py b/oslo_rootwrap/client.py index b19d97f..ecf730f 100644 --- a/oslo_rootwrap/client.py +++ b/oslo_rootwrap/client.py @@ -47,11 +47,16 @@ class Client(object): def __init__(self, rootwrap_daemon_cmd): self._start_command = rootwrap_daemon_cmd self._initialized = False + self._need_restart = False self._mutex = threading.Lock() self._manager = None self._proxy = None self._process = None self._finalize = None + # This is for eventlet compatibility. multiprocessing stores + # daemon connection in ForkAwareLocal, so this won't be + # needed with the threading module. + self._exec_sem = threading.Lock() def _initialize(self): if self._process is not None and self._process.poll() is not None: @@ -119,20 +124,40 @@ class Client(object): self._proxy = None self._initialized = False self._initialize() + self._need_restart = False return self._proxy - def execute(self, cmd, stdin=None): - self._ensure_initialized() - proxy = self._proxy - retry = False + def _run_one_command(self, proxy, cmd, stdin): + """Wrap proxy.run_one_command, setting _need_restart on an exception. + + Usually it should be enough to drain stale data on socket + rather than to restart, but we cannot do draining easily. + """ try: + _need_restart = True res = proxy.run_one_command(cmd, stdin) - except (EOFError, IOError): - retry = True - # res can be None if we received final None sent by dying server thread - # instead of response to our request. Process is most likely to be dead - # at this point. - if retry or res is None: - proxy = self._restart(proxy) - res = proxy.run_one_command(cmd, stdin) + _need_restart = False + return res + finally: + if _need_restart: + self._need_restart = True + + def execute(self, cmd, stdin=None): + with self._exec_sem: + self._ensure_initialized() + proxy = self._proxy + retry = False + if self._need_restart: + proxy = self._restart(proxy) + try: + res = self._run_one_command(proxy, cmd, stdin) + except (EOFError, IOError): + retry = True + # res can be None if we received final None sent by dying + # server thread instead of response to our + # request. Process is most likely to be dead at this + # point. + if retry or res is None: + proxy = self._restart(proxy) + res = self._run_one_command(proxy, cmd, stdin) return res diff --git a/oslo_rootwrap/tests/test_functional_eventlet.py b/oslo_rootwrap/tests/test_functional_eventlet.py index eafef8e..1fe6337 100644 --- a/oslo_rootwrap/tests/test_functional_eventlet.py +++ b/oslo_rootwrap/tests/test_functional_eventlet.py @@ -25,3 +25,35 @@ if os.environ.get('TEST_EVENTLET', False): def assert_unpatched(self): # This test case is specifically for eventlet testing pass + + def _thread_worker(self, seconds, msg): + code, out, err = self.execute( + ['sh', '-c', 'sleep %d; echo %s' % (seconds, msg)]) + # Ignore trailing newline + self.assertEqual(msg, out.rstrip()) + + def _thread_worker_timeout(self, seconds, msg, timeout): + with eventlet.Timeout(timeout): + try: + self._thread_worker(seconds, msg) + except eventlet.Timeout: + pass + + def test_eventlet_threads(self): + """Check eventlet compatibility. + + The multiprocessing module is not eventlet friendly and + must be protected against eventlet thread switching and its + timeout exceptions. + """ + th = [] + # 10 was not enough for some reason. + for i in range(15): + th.append( + eventlet.spawn(self._thread_worker, i % 3, 'abc%d' % i)) + for i in [5, 17, 20, 25]: + th.append( + eventlet.spawn(self._thread_worker_timeout, 2, + 'timeout%d' % i, i)) + for thread in th: + thread.wait()