diff --git a/swift/common/utils.py b/swift/common/utils.py
index 4c55299779..3add489a9d 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -48,9 +48,12 @@ import stat
 import datetime
 import eventlet
+import eventlet.debug
+import eventlet.greenthread
 import eventlet.semaphore
 from eventlet import GreenPool, sleep, Timeout, tpool
 from eventlet.green import socket, threading
+from eventlet.hubs import trampoline
 import eventlet.queue
 import netifaces
 import codecs
@@ -1850,17 +1853,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
     if udp_host:
         udp_port = int(conf.get('log_udp_port',
                                 logging.handlers.SYSLOG_UDP_PORT))
-        handler = SysLogHandler(address=(udp_host, udp_port),
-                                facility=facility)
+        handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
+                                          facility=facility)
     else:
         log_address = conf.get('log_address', '/dev/log')
         try:
-            handler = SysLogHandler(address=log_address, facility=facility)
+            handler = ThreadSafeSysLogHandler(address=log_address,
+                                              facility=facility)
         except socket.error as e:
             # Either /dev/log isn't a UNIX socket or it does not exist at all
             if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
                 raise
-            handler = SysLogHandler(facility=facility)
+            handler = ThreadSafeSysLogHandler(facility=facility)
     handler.setFormatter(formatter)
     logger.addHandler(handler)
     get_logger.handler4logger[logger] = handler
@@ -4221,3 +4225,123 @@ def md5_hash_for_file(fname):
         for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''):
             md5sum.update(block)
     return md5sum.hexdigest()
+
+
+class PipeMutex(object):
+    """
+    Mutex using a pipe. Works across both greenlets and real threads, even
+    at the same time.
+    """
+
+    def __init__(self):
+        self.rfd, self.wfd = os.pipe()
+
+        # You can't create a pipe in non-blocking mode; you must set it
+        # later.
+        rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
+        fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
+        os.write(self.wfd, b'-')  # start unlocked
+
+        self.owner = None
+        self.recursion_depth = 0
+
+        # Usually, it's an error to have multiple greenthreads all waiting
+        # to read the same file descriptor. It's often a sign of inadequate
+        # concurrency control; for example, if you have two greenthreads
+        # trying to use the same memcache connection, they'll end up writing
+        # interleaved garbage to the socket or stealing part of each other's
+        # responses.
+        #
+        # In this case, we have multiple greenthreads waiting on the same
+        # file descriptor by design. This lets greenthreads in real thread A
+        # wait with greenthreads in real thread B for the same mutex.
+        # Therefore, we must turn off eventlet's multiple-reader detection.
+        #
+        # It would be better to turn off multiple-reader detection for only
+        # our calls to trampoline(), but eventlet does not support that.
+        eventlet.debug.hub_prevent_multiple_readers(False)
+
+    def acquire(self, blocking=True):
+        """
+        Acquire the mutex.
+
+        If called with blocking=False, returns True if the mutex was
+        acquired and False if it wasn't. Otherwise, blocks until the mutex
+        is acquired and returns True.
+
+        This lock is recursive; the same greenthread may acquire it as many
+        times as it wants to, though it must then release it that many times
+        too.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner == current_greenthread_id:
+            self.recursion_depth += 1
+            return True
+
+        while True:
+            try:
+                # If there is a byte available, this will read it and remove
+                # it from the pipe. If not, this will raise OSError with
+                # errno=EAGAIN.
+                os.read(self.rfd, 1)
+                self.owner = current_greenthread_id
+                return True
+            except OSError as err:
+                if err.errno != errno.EAGAIN:
+                    raise
+
+                if not blocking:
+                    return False
+
+                # Tell eventlet to suspend the current greenthread until
+                # self.rfd becomes readable. This will happen when someone
+                # else writes to self.wfd.
+                trampoline(self.rfd, read=True)
+
+    def release(self):
+        """
+        Release the mutex.
+        """
+        current_greenthread_id = id(eventlet.greenthread.getcurrent())
+        if self.owner != current_greenthread_id:
+            raise RuntimeError("cannot release un-acquired lock")
+
+        if self.recursion_depth > 0:
+            self.recursion_depth -= 1
+            return
+
+        self.owner = None
+        os.write(self.wfd, b'X')
+
+    def close(self):
+        """
+        Close the mutex. This releases its file descriptors.
+
+        You can't use a mutex after it's been closed.
+        """
+        if self.wfd is not None:
+            os.close(self.rfd)
+            self.rfd = None
+            os.close(self.wfd)
+            self.wfd = None
+        self.owner = None
+        self.recursion_depth = 0
+
+    def __del__(self):
+        # We need this so we don't leak file descriptors. Otherwise, if you
+        # call get_logger() and don't explicitly dispose of it by calling
+        # logger.logger.handlers[0].lock.close() [1], the pipe file
+        # descriptors are leaked.
+        #
+        # This only really comes up in tests. Swift processes tend to call
+        # get_logger() once and then hang on to it until they exit, but the
+        # test suite calls get_logger() a lot.
+        #
+        # [1] and that's a completely ridiculous thing to expect callers to
+        # do, so nobody does it and that's okay.
+        self.close()
+
+
+class ThreadSafeSysLogHandler(SysLogHandler):
+    def createLock(self):
+        self.lock = PipeMutex()
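Context for the class above: logging.Handler serializes emit() through the lock
returned by createLock(), and the stock lock is a threading.RLock, which is only
safe for one kind of concurrency at a time. When greenthreads and real threads
(e.g. tpool workers) log through the same handler, a lock that blocks the OS
thread also stalls every other greenthread in it. PipeMutex instead parks
waiters on a pipe file descriptor, so greenthread waiters yield to the hub while
real-thread waiters wait in their own thread's hub. A minimal usage sketch,
assuming this patch is applied (PipeMutex as added to swift/common/utils.py
above); the names here are illustrative only:

```python
# Minimal sketch, assuming this patch is applied.
import eventlet
from swift.common.utils import PipeMutex

mutex = PipeMutex()
log = []

def worker(name):
    mutex.acquire()        # parks on the pipe fd via trampoline(), so the
    try:                   # hub keeps scheduling the other greenthreads
        log.append('%s in' % name)
        eventlet.sleep(0)  # yield while holding the mutex
        log.append('%s out' % name)
    finally:
        mutex.release()    # writes a byte back, waking one waiter

workers = [eventlet.spawn(worker, name) for name in 'abc']
for gt in workers:
    gt.wait()
print(log)     # each "X in" is immediately followed by its "X out"
mutex.close()  # a PipeMutex owns two fds; close() releases them
```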
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 48525aded4..04e603f06d 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -21,7 +21,9 @@ import ctypes
 import contextlib
 import errno
 import eventlet
+import eventlet.debug
 import eventlet.event
+import eventlet.patcher
 import functools
 import grp
 import logging
@@ -1454,7 +1456,7 @@ class TestUtils(unittest.TestCase):
                          'test1\ntest3\ntest4\ntest6\n')
 
     def test_get_logger_sysloghandler_plumbing(self):
-        orig_sysloghandler = utils.SysLogHandler
+        orig_sysloghandler = utils.ThreadSafeSysLogHandler
         syslog_handler_args = []
 
         def syslog_handler_catcher(*args, **kwargs):
@@ -1465,7 +1467,7 @@
         syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
 
         try:
-            utils.SysLogHandler = syslog_handler_catcher
+            utils.ThreadSafeSysLogHandler = syslog_handler_catcher
             utils.get_logger({
                 'log_facility': 'LOG_LOCAL3',
             }, 'server', log_route='server')
@@ -1515,7 +1517,7 @@
                          'facility': orig_sysloghandler.LOG_LOCAL0})],
                         syslog_handler_args)
         finally:
-            utils.SysLogHandler = orig_sysloghandler
+            utils.ThreadSafeSysLogHandler = orig_sysloghandler
 
     @reset_logger_state
     def test_clean_logger_exception(self):
@@ -6114,5 +6116,179 @@ class TestHashForFileFunction(unittest.TestCase):
             self.fail('Some data did not compute expected hash:\n' +
                       '\n'.join(failures))
 
+
+class TestPipeMutex(unittest.TestCase):
+    def setUp(self):
+        self.mutex = utils.PipeMutex()
+
+    def tearDown(self):
+        self.mutex.close()
+
+    def test_nonblocking(self):
+        evt_lock1 = eventlet.event.Event()
+        evt_lock2 = eventlet.event.Event()
+        evt_unlock = eventlet.event.Event()
+
+        def get_the_lock():
+            self.mutex.acquire()
+            evt_lock1.send('got the lock')
+            evt_lock2.wait()
+            self.mutex.release()
+            evt_unlock.send('released the lock')
+
+        eventlet.spawn(get_the_lock)
+        evt_lock1.wait()  # Now, the other greenthread has the lock.
+
+        self.assertFalse(self.mutex.acquire(blocking=False))
+        evt_lock2.send('please release the lock')
+        evt_unlock.wait()  # The other greenthread has released the lock.
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+    def test_recursive(self):
+        self.assertTrue(self.mutex.acquire(blocking=False))
+        self.assertTrue(self.mutex.acquire(blocking=False))
+
+        def try_acquire_lock():
+            return self.mutex.acquire(blocking=False)
+
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
+        self.mutex.release()
+        self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
+
+    def test_release_without_acquire(self):
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_too_many_releases(self):
+        self.mutex.acquire()
+        self.mutex.release()
+        self.assertRaises(RuntimeError, self.mutex.release)
+
+    def test_wrong_releaser(self):
+        self.mutex.acquire()
+        self.assertRaises(RuntimeError,
+                          eventlet.spawn(self.mutex.release).wait)
+
+    def test_blocking(self):
+        evt = eventlet.event.Event()
+
+        sequence = []
+
+        def coro1():
+            eventlet.sleep(0)  # let coro2 go
+
+            self.mutex.acquire()
+            sequence.append('coro1 acquire')
+            evt.send('go')
+            self.mutex.release()
+            sequence.append('coro1 release')
+
+        def coro2():
+            evt.wait()  # wait for coro1 to start us
+            self.mutex.acquire()
+            sequence.append('coro2 acquire')
+            self.mutex.release()
+            sequence.append('coro2 release')
+
+        c1 = eventlet.spawn(coro1)
+        c2 = eventlet.spawn(coro2)
+
+        c1.wait()
+        c2.wait()
+
+        self.assertEqual(sequence, [
+            'coro1 acquire',
+            'coro1 release',
+            'coro2 acquire',
+            'coro2 release'])
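The two tests below mix greenthreads with real OS threads. Because the test
suite may run with eventlet monkey-patching active, a plain threading.Thread
could itself be green, so the tests reach for
eventlet.patcher.original('threading') to get the unpatched stdlib module. A
standalone illustration of that eventlet API, independent of this patch:

```python
# Standalone illustration of eventlet.patcher.original(); this is
# standard eventlet API, independent of the patch above.
import eventlet
import eventlet.patcher
eventlet.monkey_patch()  # after this, plain `threading` is green

real_threading = eventlet.patcher.original('threading')

def hello():
    print('running in a real OS thread')

t = real_threading.Thread(target=hello)  # a genuine OS thread
t.start()
t.join()
```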
+
+    def test_blocking_tpool(self):
+        # Note: this test's success isn't a guarantee that the mutex is
+        # working. However, this test's failure means that the mutex is
+        # definitely broken.
+        sequence = []
+
+        def do_stuff():
+            n = 10
+            while n > 0:
+                self.mutex.acquire()
+                sequence.append("<")
+                eventlet.sleep(0.0001)
+                sequence.append(">")
+                self.mutex.release()
+                n -= 1
+
+        greenthread1 = eventlet.spawn(do_stuff)
+        greenthread2 = eventlet.spawn(do_stuff)
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=do_stuff)
+        real_thread2.start()
+
+        greenthread1.wait()
+        greenthread2.wait()
+        real_thread1.join()
+        real_thread2.join()
+
+        self.assertEqual(''.join(sequence), "<>" * 40)
+
+    def test_blocking_preserves_ownership(self):
+        pthread1_event = eventlet.patcher.original('threading').Event()
+        pthread2_event1 = eventlet.patcher.original('threading').Event()
+        pthread2_event2 = eventlet.patcher.original('threading').Event()
+        thread_id = []
+        owner = []
+
+        def pthread1():
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            owner.append(self.mutex.owner)
+            pthread2_event1.set()
+
+            orig_os_write = utils.os.write
+
+            def patched_os_write(*a, **kw):
+                try:
+                    return orig_os_write(*a, **kw)
+                finally:
+                    pthread1_event.wait()
+
+            with mock.patch.object(utils.os, 'write', patched_os_write):
+                self.mutex.release()
+            pthread2_event2.set()
+
+        def pthread2():
+            pthread2_event1.wait()  # ensure pthread1 acquires lock first
+            thread_id.append(id(eventlet.greenthread.getcurrent()))
+            self.mutex.acquire()
+            pthread1_event.set()
+            pthread2_event2.wait()
+            owner.append(self.mutex.owner)
+            self.mutex.release()
+
+        real_thread1 = eventlet.patcher.original('threading').Thread(
+            target=pthread1)
+        real_thread1.start()
+
+        real_thread2 = eventlet.patcher.original('threading').Thread(
+            target=pthread2)
+        real_thread2.start()
+
+        real_thread1.join()
+        real_thread2.join()
+        self.assertEqual(thread_id, owner)
+        self.assertIsNone(self.mutex.owner)
+
+    @classmethod
+    def tearDownClass(cls):
+        # PipeMutex turns this off when you instantiate one
+        eventlet.debug.hub_prevent_multiple_readers(True)
+
+
 if __name__ == '__main__':
     unittest.main()
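One closing note on the plumbing: no call sites need to change for the new lock
to take effect, because the standard library's logging.Handler.__init__ invokes
self.createLock(), and Handler.handle() wraps each emit() in acquire()/release()
of that lock. Overriding createLock() is therefore the whole integration. A
hedged sketch, assuming the patch is applied and that /dev/log exists (as on a
typical Linux host):

```python
# Hedged sketch: relies only on stdlib logging behavior plus the
# patch above; assumes /dev/log exists (typical on Linux).
import logging
from swift.common.utils import ThreadSafeSysLogHandler

handler = ThreadSafeSysLogHandler(address='/dev/log')
print(type(handler.lock).__name__)  # -> PipeMutex, set by createLock()

logger = logging.getLogger('demo')
logger.addHandler(handler)
logger.warning('emitted under a PipeMutex rather than an RLock')
```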