Share single ZooKeeper object among workers

Sharing a single ZooKeeper object among the various worker threads
effectively lets us use a single zookeeper client/connection. Since
the client itself uses 3 different threads for communication with
the zookeeper cluster, this should reduce the number of threads,
CPU load, and active connections.

Change-Id: I8f15e3e9c7de7c3e74f8e691241b041a6e11ec91
This commit is contained in:
David Shrewsbury 2016-12-15 09:54:12 -05:00
parent 1c43e54ce9
commit 5ba3663adb
3 changed files with 65 additions and 60 deletions

View File

@@ -108,34 +108,28 @@ class DibImageFile(object):
class BaseWorker(threading.Thread):
def __init__(self, config_path, interval):
def __init__(self, config_path, interval, zk):
super(BaseWorker, self).__init__()
self.log = logging.getLogger("nodepool.builder.BaseWorker")
self.daemon = True
self._running = False
self._config = None
self._config_path = config_path
self._zk = None
self._zk = zk
self._hostname = socket.gethostname()
self._statsd = stats.get_client()
self._interval = interval
def _checkForZooKeeperChanges(self, new_config):
'''
Connect to ZooKeeper cluster.
Check config for ZooKeeper cluster changes.
Makes the initial connection to the ZooKeeper cluster. If the defined
set of ZooKeeper servers changes, the connection will be reestablished
using the new server set.
If the defined set of ZooKeeper servers changes, the connection
will use the new server set.
'''
if self._zk is None:
self.log.debug("Connecting to ZooKeeper servers")
self._zk = zk.ZooKeeper()
self._zk.connect(new_config.zookeeper_servers.values())
elif self._config.zookeeper_servers != new_config.zookeeper_servers:
if self._config.zookeeper_servers != new_config.zookeeper_servers:
self.log.debug("Detected ZooKeeper server changes")
self._zk.disconnect()
self._zk.connect(new_config.zookeeper_servers.values())
self._zk.resetHosts(new_config.zookeeper_servers.values())
@property
def running(self):
@@ -151,8 +145,8 @@ class CleanupWorker(BaseWorker):
and any local DIB builds.
'''
def __init__(self, name, config_path, interval):
super(CleanupWorker, self).__init__(config_path, interval)
def __init__(self, name, config_path, interval, zk):
super(CleanupWorker, self).__init__(config_path, interval, zk)
self.log = logging.getLogger("nodepool.builder.CleanupWorker.%s" % name)
self.name = 'CleanupWorker.%s' % name
@@ -486,9 +480,6 @@ class CleanupWorker(BaseWorker):
time.sleep(self._interval)
if self._zk:
self._zk.disconnect()
provider_manager.ProviderManager.stopProviders(self._config)
def _run(self):
@@ -496,6 +487,9 @@ class CleanupWorker(BaseWorker):
Body of run method for exception handling purposes.
'''
new_config = nodepool_config.loadConfig(self._config_path)
if not self._config:
self._config = new_config
self._checkForZooKeeperChanges(new_config)
provider_manager.ProviderManager.reconfigure(self._config, new_config,
use_taskmanager=False)
@@ -505,8 +499,8 @@
class BuildWorker(BaseWorker):
def __init__(self, name, config_path, interval, dib_cmd):
super(BuildWorker, self).__init__(config_path, interval)
def __init__(self, name, config_path, interval, zk, dib_cmd):
super(BuildWorker, self).__init__(config_path, interval, zk)
self.log = logging.getLogger("nodepool.builder.BuildWorker.%s" % name)
self.name = 'BuildWorker.%s' % name
self.dib_cmd = dib_cmd
@@ -783,15 +777,15 @@ class BuildWorker(BaseWorker):
time.sleep(self._interval)
if self._zk:
self._zk.disconnect()
def _run(self):
'''
Body of run method for exception handling purposes.
'''
# NOTE: For the first iteration, we expect self._config to be None
new_config = nodepool_config.loadConfig(self._config_path)
if not self._config:
self._config = new_config
self._checkForZooKeeperChanges(new_config)
self._config = new_config
@@ -800,8 +794,8 @@
class UploadWorker(BaseWorker):
def __init__(self, name, config_path, interval):
super(UploadWorker, self).__init__(config_path, interval)
def __init__(self, name, config_path, interval, zk):
super(UploadWorker, self).__init__(config_path, interval, zk)
self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name)
self.name = 'UploadWorker.%s' % name
@@ -997,9 +991,6 @@ class UploadWorker(BaseWorker):
time.sleep(self._interval)
if self._zk:
self._zk.disconnect()
provider_manager.ProviderManager.stopProviders(self._config)
def _run(self):
@@ -1007,6 +998,9 @@ class UploadWorker(BaseWorker):
Body of run method for exception handling purposes.
'''
new_config = nodepool_config.loadConfig(self._config_path)
if not self._config:
self._config = new_config
self._checkForZooKeeperChanges(new_config)
provider_manager.ProviderManager.reconfigure(self._config, new_config,
use_taskmanager=False)
@@ -1045,6 +1039,7 @@ class NodePoolBuilder(object):
self.build_interval = 10
self.upload_interval = 10
self.dib_cmd = 'disk-image-create'
self.zk = None
# This lock is needed because the run() method is started in a
# separate thread of control, which can return before the scheduler
@@ -1083,22 +1078,27 @@ class NodePoolBuilder(object):
self._config = self._getAndValidateConfig()
self._running = True
# All worker threads share a single ZooKeeper instance/connection.
self.zk = zk.ZooKeeper()
self.zk.connect(self._config.zookeeper_servers.values())
self.log.debug('Starting listener for build jobs')
# Create build and upload worker objects
for i in range(self._num_builders):
w = BuildWorker(
i, self._config_path, self.build_interval, self.dib_cmd)
w = BuildWorker(i, self._config_path, self.build_interval,
self.zk, self.dib_cmd)
w.start()
self._build_workers.append(w)
for i in range(self._num_uploaders):
w = UploadWorker(i, self._config_path, self.upload_interval)
w = UploadWorker(i, self._config_path, self.upload_interval,
self.zk)
w.start()
self._upload_workers.append(w)
self._janitor = CleanupWorker(0, self._config_path,
self.cleanup_interval)
self.cleanup_interval, self.zk)
self._janitor.start()
# Wait until all threads are running. Otherwise, we have a race
@@ -1138,6 +1138,9 @@ class NodePoolBuilder(object):
):
worker.join()
self.log.debug('Terminating ZooKeeper connection')
self.zk.disconnect()
self.log.debug('Stopping providers')
provider_manager.ProviderManager.stopProviders(self._config)
self.log.debug('Finished stopping')

View File

@@ -143,39 +143,31 @@ class ChrootedKazooFixture(fixtures.Fixture):
for x in range(8))
rand_test_path = '%s_%s' % (random_bits, os.getpid())
self.chroot_path = "/nodepool_test/%s" % rand_test_path
self.zookeeper_chroot = "/nodepool_test/%s" % rand_test_path
# Ensure the chroot path exists and clean up an pre-existing znodes.
# Ensure the chroot path exists and clean up any pre-existing znodes.
_tmp_client = kazoo.client.KazooClient(
hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
_tmp_client.start()
if _tmp_client.exists(self.chroot_path):
_tmp_client.delete(self.chroot_path, recursive=True)
if _tmp_client.exists(self.zookeeper_chroot):
_tmp_client.delete(self.zookeeper_chroot, recursive=True)
_tmp_client.ensure_path(self.chroot_path)
_tmp_client.ensure_path(self.zookeeper_chroot)
_tmp_client.stop()
# Create a chroot'ed client
self.zkclient = kazoo.client.KazooClient(
hosts='%s:%s%s' % (self.zookeeper_host,
self.zookeeper_port,
self.chroot_path)
)
self.zkclient.start()
_tmp_client.close()
self.addCleanup(self._cleanup)
def _cleanup(self):
'''Stop the client and remove the chroot path.'''
self.zkclient.stop()
'''Remove the chroot path.'''
# Need a non-chroot'ed client to remove the chroot path
_tmp_client = kazoo.client.KazooClient(
hosts='%s:%s' % (self.zookeeper_host, self.zookeeper_port))
_tmp_client.start()
_tmp_client.delete(self.chroot_path, recursive=True)
_tmp_client.delete(self.zookeeper_chroot, recursive=True)
_tmp_client.stop()
_tmp_client.close()
class GearmanClient(gear.Client):
@@ -580,9 +572,13 @@ class DBTestCase(BaseTestCase):
kz_fxtr = self.useFixture(ChrootedKazooFixture(
self.zookeeper_host,
self.zookeeper_port))
self.zkclient = kz_fxtr.zkclient
self.zk = zk.ZooKeeper(self.zkclient)
self.zookeeper_chroot = kz_fxtr.chroot_path
self.zookeeper_chroot = kz_fxtr.zookeeper_chroot
self.zk = zk.ZooKeeper()
host = zk.ZooKeeperConnectionConfig(
self.zookeeper_host, self.zookeeper_port, self.zookeeper_chroot
)
self.zk.connect([host])
self.addCleanup(self.zk.disconnect)
def printZKTree(self, node):
def join(a, b):

View File

@@ -291,23 +291,17 @@ class ZooKeeper(object):
Most API calls reference an image name only, as the path for the znode
for that image is calculated automatically. And image names are assumed
to be unique.
If you will have multiple threads needing this API, each thread should
instantiate their own ZooKeeper object. It should not be shared.
'''
log = logging.getLogger("nodepool.zk.ZooKeeper")
IMAGE_ROOT = "/nodepool/images"
def __init__(self, client=None):
def __init__(self):
'''
Initialize the ZooKeeper object.
:param client: A pre-connected client. Optionally, you may choose
to use the connect() call.
'''
self.client = client
self.client = None
self._became_lost = False
#========================================================================
@@ -468,6 +462,18 @@ class ZooKeeper(object):
self.client.close()
self.client = None
def resetHosts(self, host_list):
'''
Reset the ZooKeeper cluster connection host list.
:param list host_list: A list of
:py:class:`~nodepool.zk.ZooKeeperConnectionConfig` objects
(one per server) defining the ZooKeeper cluster servers.
'''
if self.client is not None:
hosts = buildZooKeeperHosts(host_list)
self.client.set_hosts(hosts=hosts)
@contextmanager
def imageBuildLock(self, image, blocking=True, timeout=None):
'''