# Copyright (C) 2011-2014 OpenStack Foundation # Copyright (C) 2017 Red Hat # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # # See the License for the specific language governing permissions and # limitations under the License. import abc import collections import inspect import importlib import logging import math import os from nodepool import zk from nodepool import exceptions class Drivers: """The Drivers plugin interface""" log = logging.getLogger("nodepool.driver.Drivers") drivers = {} drivers_paths = None @staticmethod def _load_class(driver_name, path, parent_class): """Return a driver class that implements the parent_class""" spec = importlib.util.spec_from_file_location(driver_name, path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) obj = inspect.getmembers( module, lambda x: inspect.isclass(x) and issubclass(x, parent_class) and x.__module__ == driver_name) error = None if len(obj) > 1: error = "multiple %s implementation" % parent_class if not obj: error = "no %s implementation found" % parent_class if error: Drivers.log.error("%s: %s", path, error) return False return obj[0][1] @staticmethod def load(drivers_paths=[]): """Load drivers""" if drivers_paths == Drivers.drivers_paths: # Already loaded return Drivers.drivers.clear() for drivers_path in drivers_paths + [os.path.dirname(__file__)]: drivers = os.listdir(drivers_path) for driver in drivers: driver_path = os.path.join(drivers_path, driver) if driver in Drivers.drivers: Drivers.log.warning("%s: duplicate driver", driver_path) continue if not os.path.isdir(driver_path) or \ "__init__.py" not in os.listdir(driver_path): continue driver_obj = Drivers._load_class( driver, os.path.join(driver_path, "__init__.py"), Driver) if not driver_obj: Drivers.log.error( "%s: skipping incorrect driver from __init__.py", driver_path) continue Drivers.drivers[driver] = driver_obj() Drivers.drivers_paths = drivers_paths @staticmethod def get(name): if not Drivers.drivers: Drivers.load() try: return Drivers.drivers[name] except KeyError: raise RuntimeError("%s: unknown driver" % name) class Driver(object, metaclass=abc.ABCMeta): """The Driver interface This is the main entrypoint for a Driver. A single instance of this will be created for each driver in the system and will persist for the lifetime of the process. The class or instance attribute **name** must be provided as a string. """ def reset(self): ''' Called before loading configuration to reset any global state ''' pass @abc.abstractmethod def getProviderConfig(self, provider): """Return a ProviderConfig instance :arg dict provider: The parsed provider configuration """ pass @abc.abstractmethod def getProvider(self, provider_config, use_taskmanager): """Return a Provider instance :arg dict provider_config: A ProviderConfig instance :arg bool use_taskmanager: Whether this provider should use a task manager (i.e., perform synchronous or asynchronous operations). """ pass class ProviderNotifications(object): """ Notification interface for :class:`.Provider` objects. This groups all notification messages bound for the Provider. The Provider class inherits from this by default. A Provider overrides the methods here if they want to handle the notification. """ def nodeDeletedNotification(self, node): """ Called after the ZooKeeper object for a node is deleted. :param Node node: Object describing the node just deleted. """ pass class Provider(ProviderNotifications, metaclass=abc.ABCMeta): """The Provider interface Drivers implement this interface to supply Providers. Each "provider" in the nodepool configuration corresponds to an instance of a class which implements this interface. If the configuration is changed, old provider instances will be stopped and new ones created as necessary. The class or instance attribute **name** must be provided as a string. """ @abc.abstractmethod def start(self, zk_conn): """Start this provider :param ZooKeeper zk_conn: A ZooKeeper connection object. This is called after each configuration change to allow the driver to perform initialization tasks and start background threads. The ZooKeeper connection object is provided if the Provider needs to interact with it. """ pass @abc.abstractmethod def stop(self): """Stop this provider Before shutdown or reconfiguration, this is called to signal to the driver that it will no longer be used. It should not begin any new tasks, but may allow currently running tasks to continue. """ pass @abc.abstractmethod def join(self): """Wait for provider to finish On shutdown, this is called after :py:meth:`~nodepool.driver.Provider.stop` and should return when the provider has completed all tasks. This may not be called on reconfiguration (so drivers should not rely on this always being called after stop). """ pass @abc.abstractmethod def labelReady(self, name): """Determine if a label is ready in this provider If the pre-requisites for this label are ready, return true. For example, if the label requires an image that is not present, this should return False. This method should not examine inventory or quota. In other words, it should return True if a request for the label would be expected to succeed with no resource contention, but False if is not possible to satisfy a request for the label. :param str name: The name of the label :returns: True if the label is ready in this provider, False otherwise. """ pass @abc.abstractmethod def cleanupNode(self, node_id): """Cleanup a node after use The driver may delete the node or return it to the pool. This may be called after the node was used, or as part of cleanup from an aborted launch attempt. :param str node_id: The id of the node """ pass @abc.abstractmethod def waitForNodeCleanup(self, node_id): """Wait for a node to be cleaned up When called, this will be called after :py:meth:`~nodepool.driver.Provider.cleanupNode`. This method should return after the node has been deleted or returned to the pool. :param str node_id: The id of the node """ pass @abc.abstractmethod def cleanupLeakedResources(self): """Clean up any leaked resources This is called periodically to give the provider a chance to clean up any resources which make have leaked. """ pass @abc.abstractmethod def getRequestHandler(self, poolworker, request): """Return a NodeRequestHandler for the supplied request """ pass @abc.abstractmethod def listNodes(self): # TODO: This is used by the launcher to find leaked instances # to delete (see _cleanupLeakedInstances). It assumes server # metadata. Instead, this should be folded into # cleanupLeakedResources so that drivers can figure out how to # determine their own leaked instances. pass class LabelRecorder(object): def __init__(self): self.data = [] def add(self, label, node_id): self.data.append({'label': label, 'node_id': node_id}) def labels(self): ''' Return labels in the order they were added. ''' labels = [] for d in self.data: labels.append(d['label']) return labels def pop(self, label): ''' Return the node ID for the (first found) requested label and remove it. ''' for d in self.data: if d['label'] == label: node_id = d['node_id'] break if not node_id: return None self.data.remove({'label': label, 'node_id': node_id}) return node_id def removeNode(self, id): ''' Remove the node with the specified ID. ''' for d in self.data: if d['node_id'] == id: self.data.remove(d) return class NodeRequestHandlerNotifications(object): """ Notification interface for :class:`.NodeRequestHandler` objects. This groups all notification messages bound for the NodeRequestHandler. The NodeRequestHandler class inherits from this by default. A request handler overrides the methods here if they want to handle the notification. """ def nodeReusedNotification(self, node): ''' Handler may implement this to be notified when a node is re-used. The OpenStack handler uses this to set the choozen_az. ''' pass class NodeRequestHandler(NodeRequestHandlerNotifications, metaclass=abc.ABCMeta): ''' Class to process a single nodeset request. The PoolWorker thread will instantiate a class of this type for each node request that it pulls from ZooKeeper. Subclasses are required to implement the launch method. ''' def __init__(self, pw, request): ''' :param PoolWorker pw: The parent PoolWorker object. :param NodeRequest request: The request to handle. ''' self.pw = pw self.request = request self.nodeset = [] self.done = False self.paused = False self.launcher_id = self.pw.launcher_id self._satisfied_types = LabelRecorder() self._failed_nodes = [] self._ready_nodes = [] def _setFromPoolWorker(self): ''' Set values that we pull from the parent PoolWorker. We don't do this in __init__ because this class is re-entrant and we want the updated values. ''' self.provider = self.pw.getProviderConfig() self.pool = self.pw.getPoolConfig() self.zk = self.pw.getZK() self.manager = self.pw.getProviderManager() @property def failed_nodes(self): return self._failed_nodes @property def ready_nodes(self): return self._ready_nodes def _invalidNodeTypes(self): ''' Return any node types that are invalid for this provider. :returns: A list of node type names that are invalid, or an empty list if all are valid. ''' invalid = [] valid = self.provider.getSupportedLabels(self.pool.name) for ntype in self.request.node_types: if ntype not in valid: invalid.append(ntype) return invalid def _waitForNodeSet(self): ''' Fill node set for the request. Obtain nodes for the request, pausing all new request handling for this provider until the node set can be filled. note:: This code is a bit racey in its calculation of the number of nodes in use for quota purposes. It is possible for multiple launchers to be doing this calculation at the same time. Since we currently have no locking mechanism around the "in use" calculation, if we are at the edge of the quota, one of the launchers could attempt to launch a new node after the other launcher has already started doing so. This would cause an expected failure from the underlying library, which is ok for now. ''' # Since this code can be called more than once for the same request, # we need to calculate the difference between our current node set # and what was requested. We cannot use set operations here since a # node type can appear more than once in the requested types. saved_types = collections.Counter(self._satisfied_types.labels()) requested_types = collections.Counter(self.request.node_types) diff = requested_types - saved_types needed_types = list(diff.elements()) ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) for ntype in needed_types: # First try to grab from the list of already available nodes. got_a_node = False if self.request.reuse and ntype in ready_nodes: for node in ready_nodes[ntype]: # Only interested in nodes from this provider and pool if node.provider != self.provider.name: continue if node.pool != self.pool.name: continue # Check this driver reuse requirements if not self.checkReusableNode(node): continue try: self.zk.lockNode(node, blocking=False) except exceptions.ZKLockException: # It's already locked so skip it. continue else: if self.paused: self.log.debug("Unpaused request %s", self.request) self.paused = False self.log.debug( "Locked existing node %s for request %s", node.id, self.request.id) got_a_node = True node.allocated_to = self.request.id self.zk.storeNode(node) self.nodeset.append(node) self._satisfied_types.add(ntype, node.id) # Notify driver handler about node re-use self.nodeReusedNotification(node) break # Could not grab an existing node, so launch a new one. if not got_a_node: # If we calculate that we're at capacity, pause until nodes # are released by Zuul and removed by the DeletedNodeWorker. if not self.hasRemainingQuota(ntype): self.log.info( "Not enough quota remaining to satisfy request %s", self.request.id) if not self.paused: self.log.debug( "Pausing request handling to satisfy request %s", self.request.id) self.paused = True self.zk.deleteOldestUnusedNode(self.provider.name, self.pool.name) return if self.paused: self.log.debug("Unpaused request %s", self.request) self.paused = False node = zk.Node() node.state = zk.INIT node.type = ntype node.provider = self.provider.name node.pool = self.pool.name node.launcher = self.launcher_id node.allocated_to = self.request.id self.setNodeMetadata(node) # Note: It should be safe (i.e., no race) to lock the node # *after* it is stored since nodes in INIT state are not # locked anywhere. self.zk.storeNode(node) self.zk.lockNode(node, blocking=False) self.log.debug("Locked building node %s for request %s", node.id, self.request.id) # Set state AFTER lock so that it isn't accidentally cleaned # up (unlocked BUILDING nodes will be deleted). node.state = zk.BUILDING self.zk.storeNode(node) self.nodeset.append(node) self._satisfied_types.add(ntype, node.id) self.launch(node) def _runHandler(self): ''' Main body for the node request handling. ''' self._setFromPoolWorker() if self.provider is None or self.pool is None: # If the config changed out from underneath us, we could now be # an invalid provider and should stop handling this request. raise Exception("Provider configuration missing") # We have the launcher_id attr after _setFromPoolWorker() is called. self.log = logging.getLogger( "nodepool.driver.NodeRequestHandler[%s]" % self.launcher_id) declined_reasons = [] invalid_types = self._invalidNodeTypes() if self.pool.max_servers <= 0: declined_reasons.append('pool is disabled by max_servers') elif invalid_types: declined_reasons.append('node type(s) [%s] not available' % ','.join(invalid_types)) elif not self.imagesAvailable(): declined_reasons.append('images are not available') elif not self.hasProviderQuota(self.request.node_types): declined_reasons.append('it would exceed quota') # TODO(tobiash): Maybe also calculate the quota prediction here and # backoff for some seconds if the used quota would be exceeded? # This way we could give another (free) provider the chance to take # this request earlier. # For min-ready requests, which do not re-use READY nodes, let's # decline if this provider is already at capacity. Otherwise, we # could end up wedged until another request frees up a node. if self.pool.max_servers is not None and \ self.request.requestor == "NodePool:min-ready": current_count = self.zk.countPoolNodes(self.provider.name, self.pool.name) # Use >= because dynamic config changes to max-servers can leave # us with more than max-servers. # TODO: handle this with the quota code if current_count >= self.pool.max_servers: declined_reasons.append("provider cannot satisfy min-ready") if declined_reasons: self.log.debug("Declining node request %s because %s", self.request.id, ', '.join(declined_reasons)) self.decline_request() self._declinedHandlerCleanup() return if self.paused: self.log.debug("Retrying node request %s", self.request.id) else: self.log.debug("Accepting node request %s", self.request.id) self.request.state = zk.PENDING self.zk.storeNodeRequest(self.request) self._waitForNodeSet() def _declinedHandlerCleanup(self): """ After declining a request, do necessary cleanup actions. """ self.unlockNodeSet(clear_allocation=True) # If conditions have changed for a paused request to now cause us # to decline it, we need to unpause so we don't keep trying it if self.paused: self.paused = False try: self.zk.storeNodeRequest(self.request) self.zk.unlockNodeRequest(self.request) except Exception: # If the request is gone for some reason, we need to make # sure that self.done still gets set. self.log.exception("Unable to modify missing request %s", self.request.id) self.done = True # --------------------------------------------------------------- # Public methods # --------------------------------------------------------------- def unlockNodeSet(self, clear_allocation=False): ''' Attempt unlocking all Nodes in the node set. :param bool clear_allocation: If true, clears the node allocated_to attribute. ''' for node in self.nodeset: if not node.lock: continue if clear_allocation: node.allocated_to = None self.zk.storeNode(node) try: self.zk.unlockNode(node) except Exception: self.log.exception("Error unlocking node:") self.log.debug("Unlocked node %s for request %s", node.id, self.request.id) self.nodeset = [] def decline_request(self): # Technically, this check to see if we've already declined it should # not be necessary. But if there is a bug (and there has been), we # want to make sure we don't continuously grow this array. if self.launcher_id not in self.request.declined_by: self.request.declined_by.append(self.launcher_id) launchers = set([x.id for x in self.zk.getRegisteredLaunchers()]) if launchers.issubset(set(self.request.declined_by)): # All launchers have declined it self.log.debug("Failing declined node request %s", self.request.id) self.request.state = zk.FAILED else: self.request.state = zk.REQUESTED def run(self): ''' Execute node request handling. This code is designed to be re-entrant. Because we can't always satisfy a request immediately (due to lack of provider resources), we need to be able to call run() repeatedly until the request can be fulfilled. The node set is saved and added to between calls. ''' try: self._runHandler() except Exception: self.log.exception( "Declining node request %s due to exception in " "NodeRequestHandler:", self.request.id) self.decline_request() self._declinedHandlerCleanup() def poll(self): if self.paused: return False if self.done: return True # Driver must implement this call if not self.launchesComplete(): return False # Launches are complete, so populate ready_nodes and failed_nodes. aborted_nodes = [] for node in self.nodeset.copy(): if node.state == zk.READY: self.ready_nodes.append(node) elif node.state == zk.ABORTED: # ABORTED is a transient error triggered by overquota. In order # to handle this gracefully don't count this as failed so the # node is relaunched within this provider. Unlock the node so # the DeletedNodeWorker cleans up the zNode. aborted_nodes.append(node) self.nodeset.remove(node) self.zk.unlockNode(node) else: self.failed_nodes.append(node) # If the request has been pulled, unallocate the node set so other # requests can use them. if not self.zk.getNodeRequest(self.request.id): self.log.info("Node request %s disappeared", self.request.id) for node in self.nodeset: node.allocated_to = None self.zk.storeNode(node) self.unlockNodeSet() try: self.zk.unlockNodeRequest(self.request) except exceptions.ZKLockException: # If the lock object is invalid that is "ok" since we no # longer have a request either. Just do our best, log and # move on. self.log.debug("Request lock invalid for node request %s " "when attempting to clean up the lock", self.request.id) return True if self.failed_nodes: self.log.debug("Declining node request %s because nodes failed", self.request.id) self.decline_request() elif aborted_nodes: # Because nodes are added to the satisfied types list before they # are ready we need to remove the aborted nodes again so they can # be created again. for node in aborted_nodes: self._satisfied_types.removeNode(node.id) self.paused = True return False else: # The assigned nodes must be added to the request in the order # in which they were requested. for requested_type in self.request.node_types: node_id = self._satisfied_types.pop(requested_type) self.request.nodes.append(node_id) self.log.debug("Fulfilled node request %s", self.request.id) self.request.state = zk.FULFILLED self.unlockNodeSet() self.zk.storeNodeRequest(self.request) self.zk.unlockNodeRequest(self.request) return True # --------------------------------------------------------------- # Driver Implementation # --------------------------------------------------------------- def hasProviderQuota(self, node_types): ''' Checks if a provider has enough quota to handle a list of nodes. This does not take our currently existing nodes into account. :param node_types: list of node types to check :return: True if the node list fits into the provider, False otherwise ''' return True def hasRemainingQuota(self, ntype): ''' Checks if the predicted quota is enough for an additional node of type ntype. :param ntype: node type for the quota check :return: True if there is enough quota, False otherwise ''' return True def checkReusableNode(self, node): ''' Handler may implement this to verify a node can be re-used. The OpenStack handler uses this to verify the node az is correct. ''' return True def setNodeMetadata(self, node): ''' Handler may implement this to store metadata before building the node. The OpenStack handler uses this to set az, cloud and region. ''' pass @property @abc.abstractmethod def alive_thread_count(self): ''' Return the number of active node launching threads in use by this request handler. This is used to limit request handling threads for a provider. This is an approximate, top-end number for alive threads, since some threads obviously may have finished by the time we finish the calculation. :returns: A count (integer) of active threads. ''' pass @abc.abstractmethod def imagesAvailable(self): ''' Handler needs to implement this to determines if the requested images in self.request.node_types are available for this provider. :returns: True if it is available, False otherwise. ''' pass @abc.abstractmethod def launch(self, node): ''' Handler needs to implement this to launch the node. ''' pass @abc.abstractmethod def launchesComplete(self): ''' Handler needs to implement this to check if all nodes in self.nodeset have completed the launch sequence.. This method will be called periodically to check on launch progress. :returns: True if all launches are complete (successfully or not), False otherwise. ''' pass class ConfigValue(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __eq__(self, other): pass def __ne__(self, other): return not self.__eq__(other) class ConfigPool(ConfigValue): def __init__(self): self.labels = {} self.max_servers = math.inf def __eq__(self, other): if isinstance(other, ConfigPool): return (self.labels == other.labels and self.max_servers == other.max_servers) return False class DriverConfig(ConfigValue): def __init__(self): self.name = None def __eq__(self, other): if isinstance(other, DriverConfig): return self.name == other.name return False class ProviderConfig(ConfigValue, metaclass=abc.ABCMeta): """The Provider config interface The class or instance attribute **name** must be provided as a string. """ def __init__(self, provider): self.name = provider['name'] self.provider = provider self.driver = DriverConfig() self.driver.name = provider.get('driver', 'openstack') self.max_concurrency = provider.get('max-concurrency', -1) def __eq__(self, other): if isinstance(other, ProviderConfig): return (self.name == other.name and self.provider == other.provider and self.driver == other.driver and self.max_concurrency == other.max_concurrency) return False def __repr__(self): return "" % self.name @property @abc.abstractmethod def pools(self): ''' Return a dict of ConfigPool-based objects, indexed by pool name. ''' pass @property @abc.abstractmethod def manage_images(self): ''' Return True if provider manages external images, False otherwise. ''' pass @abc.abstractmethod def load(self, newconfig): ''' Update this config object from the supplied parsed config ''' pass @abc.abstractmethod def getSchema(self): ''' Return a voluptuous schema for config validation ''' pass @abc.abstractmethod def getSupportedLabels(self, pool_name=None): ''' Return a set of label names supported by this provider. :param str pool_name: If provided, get labels for the given pool only. ''' pass