diff --git a/.gitignore b/.gitignore index 172bf57..e77993f 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ .tox +.idea +iotronic.egg-info diff --git a/bin/iotronic-conductor b/bin/iotronic-conductor index cd2f571..bf7fcdd 100755 --- a/bin/iotronic-conductor +++ b/bin/iotronic-conductor @@ -1,8 +1,5 @@ #!/usr/bin/env python -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -19,24 +16,14 @@ Iotronic Conductor """ -import sys - from oslo_config import cfg -from iotronic.common import service as iotronic_service -from iotronic.openstack.common import service - +from iotronic.conductor.manager import ConductorManager CONF = cfg.CONF if __name__ == '__main__': - - iotronic_service.prepare_service(sys.argv) - mgr = iotronic_service.RPCService(CONF.host, - 'iotronic.conductor.manager', - 'ConductorManager') - - launcher = service.launch(mgr) - launcher.wait() + + cond=ConductorManager('iotronic') diff --git a/install.sh b/install.sh index 77f757b..2cad777 100755 --- a/install.sh +++ b/install.sh @@ -8,7 +8,7 @@ function build_install { rm -rf dist } case "$1" in - iotronic) + conductor) build_install systemctl restart httpd cp bin/iotronic-conductor /usr/bin/ @@ -27,6 +27,6 @@ case "$1" in ;; *) - echo $"Usage: $0 {iotronic|wamp-agent|all}" + echo $"Usage: $0 {conductor|wamp-agent|all}" exit 1 -esac \ No newline at end of file +esac diff --git a/iotronic/api/controllers/v1/node.py b/iotronic/api/controllers/v1/node.py index 0ee2e66..12f8114 100644 --- a/iotronic/api/controllers/v1/node.py +++ b/iotronic/api/controllers/v1/node.py @@ -100,7 +100,6 @@ class NodeCollection(collection.Collection): class NodesController(rest.RestController): - invalid_sort_key_list = ['properties'] def _get_nodes_collection(self, marker, limit, sort_key, sort_dir, @@ -188,11 +187,11 @@ class 
NodesController(rest.RestController): new_Node = objects.Node(pecan.request.context, **Node.as_dict()) - new_Node.create() new_Location = objects.Location(pecan.request.context, **Node.location[0].as_dict()) - new_Location.node_id = new_Node.id - new_Location.create() + + new_Node = pecan.request.rpcapi.create_node(pecan.request.context, + new_Node, new_Location) return Node.convert_with_locates(new_Node) diff --git a/iotronic/common/rpc.py b/iotronic/common/rpc.py index 3be54cb..7a7b8ac 100644 --- a/iotronic/common/rpc.py +++ b/iotronic/common/rpc.py @@ -35,7 +35,6 @@ __all__ = [ ] CONF = cfg.CONF -# print CONF.transport_url TRANSPORT = None NOTIFIER = None @@ -120,7 +119,6 @@ class RequestContextSerializer(messaging.Serializer): def get_transport_url(url_str=None): - # LOG.info('yoooooooooooo') return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES) diff --git a/iotronic/conductor/endpoints.py b/iotronic/conductor/endpoints.py new file mode 100644 index 0000000..fe22767 --- /dev/null +++ b/iotronic/conductor/endpoints.py @@ -0,0 +1,71 @@ +# coding=utf-8 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from iotronic.common import exception +from iotronic import objects +from iotronic.objects import base as objects_base +from oslo_log import log as logging + + +LOG = logging.getLogger(__name__) + +serializer = objects_base.IotronicObjectSerializer() + + +class ConductorEndpoint(object): + def echo(self, ctx, data): + LOG.info("ECHO: %s" % data) + return data + + def registration(self, ctx, token, session_num): + LOG.debug('Receved registration from %s with session %s', + token, session_num) + try: + node = objects.Node.get_by_code({}, token) + except Exception: + return exception.NodeNotFound(node=token) + try: + old_session = objects.SessionWP( + {}).get_session_by_node_uuid(node.uuid, valid=True) + old_session.valid = False + old_session.save() + except Exception: + LOG.debug('valid session for %s Not found', node.uuid) + + session = objects.SessionWP({}) + session.node_id = node.id + session.node_uuid = node.uuid + session.session_id = session_num + session.create() + session.save() + + return + + def destroy_node(self, ctx, node_id): + LOG.debug('Destroying node with id %s', + node_id) + node = objects.Node.get_by_uuid(ctx, node_id) + node.destroy() + return {} + + def create_node(self, ctx, node_obj, location_obj): + new_node = serializer.deserialize_entity(ctx, node_obj) + LOG.debug('Creating node %s', + new_node.name) + new_location = serializer.deserialize_entity(ctx, location_obj) + new_node.create() + new_location.node_id = new_node.id + new_location.create() + + return serializer.serialize_entity(ctx, new_node) diff --git a/iotronic/conductor/manager.py b/iotronic/conductor/manager.py index 94aa5f3..a30f7d7 100644 --- a/iotronic/conductor/manager.py +++ b/iotronic/conductor/manager.py @@ -1,9 +1,5 @@ # coding=utf-8 -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# Copyright 2013 International Business Machines Corporation -# All Rights Reserved. 
-# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -16,124 +12,51 @@ # License for the specific language governing permissions and limitations # under the License. -from eventlet import greenpool -import inspect - -from iotronic.db import api as dbapi - from iotronic.common import exception - -from iotronic.common.i18n import _LC from iotronic.common.i18n import _LI from iotronic.common.i18n import _LW - -from iotronic.conductor import task_manager - -from iotronic.openstack.common import periodic_task - - -from oslo_concurrency import lockutils +from iotronic.conductor import endpoints as endp +from iotronic.db import api as dbapi from oslo_config import cfg -from oslo_db import exception as db_exception -from oslo_log import log -import oslo_messaging as messaging -from oslo_utils import excutils +from oslo_log import log as logging +import oslo_messaging +import time -import threading +LOG = logging.getLogger(__name__) MANAGER_TOPIC = 'iotronic.conductor_manager' -WORKER_SPAWN_lOCK = "conductor_worker_spawn" - -LOG = log.getLogger(__name__) conductor_opts = [ cfg.StrOpt('api_url', - help=('URL of Iotronic API service. If not set iotronic can ' - 'get the current value from the keystone service ' - 'catalog.')), - cfg.IntOpt('heartbeat_interval', - default=10, - help='Seconds between conductor heart beats.'), + help='URL of Iotronic API service. If not set iotronic can ' + 'get the current value from the keystone service ' + 'catalog.'), cfg.IntOpt('heartbeat_timeout', default=60, help='Maximum time (in seconds) since the last check-in ' 'of a conductor. A conductor is considered inactive ' 'when this time has been exceeded.'), - cfg.IntOpt('periodic_max_workers', - default=8, - help='Maximum number of worker threads that can be started ' - 'simultaneously by a periodic task. 
Should be less ' - 'than RPC thread pool size.'), - cfg.IntOpt('workers_pool_size', - default=100, - help='The size of the workers greenthread pool.'), - cfg.IntOpt('node_locked_retry_attempts', - default=3, - help='Number of attempts to grab a node lock.'), - cfg.IntOpt('node_locked_retry_interval', - default=1, - help='Seconds to sleep between node lock attempts.'), - cfg.BoolOpt('send_sensor_data', - default=False, - help='Enable sending sensor data message via the ' - 'notification bus'), - cfg.IntOpt('send_sensor_data_interval', - default=600, - help='Seconds between conductor sending sensor data message' - ' to ceilometer via the notification bus.'), - cfg.ListOpt('send_sensor_data_types', - default=['ALL'], - help='List of comma separated meter types which need to be' - ' sent to Ceilometer. The default value, "ALL", is a ' - 'special value meaning send all the sensor data.'), - cfg.IntOpt('sync_local_state_interval', - default=180, - help='When conductors join or leave the cluster, existing ' - 'conductors may need to update any persistent ' - 'local state as nodes are moved around the cluster. ' - 'This option controls how often, in seconds, each ' - 'conductor will check for nodes that it should ' - '"take over". Set it to a negative value to disable ' - 'the check entirely.'), - cfg.BoolOpt('configdrive_use_swift', - default=False, - help='Whether to upload the config drive to Swift.'), - cfg.IntOpt('inspect_timeout', - default=1800, - help='Timeout (seconds) for waiting for node inspection. ' - '0 - unlimited.'), ] + CONF = cfg.CONF CONF.register_opts(conductor_opts, 'conductor') -class ConductorManager(periodic_task.PeriodicTasks): - """Iotronic Conductor manager main class.""" - - # sync with rpcapi.ConductorAPI's. 
+class ConductorManager(object): RPC_API_VERSION = '1.0' - target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, host): + logging.register_options(CONF) + CONF(project='iotronic') + logging.setup(CONF, "iotronic-conductor") - def __init__(self, host, topic): - super(ConductorManager, self).__init__() if not host: host = CONF.host self.host = host - self.topic = topic - - def init_host(self): + self.topic = MANAGER_TOPIC self.dbapi = dbapi.get_instance() - self._keepalive_evt = threading.Event() - """Event for the keepalive thread.""" - - self._worker_pool = greenpool.GreenPool( - size=CONF.conductor.workers_pool_size) - """GreenPool of background workers for performing tasks async.""" - try: - # Register this conductor with the cluster cdr = self.dbapi.register_conductor( {'hostname': self.host}) except exception.ConductorAlreadyRegistered: @@ -145,29 +68,27 @@ class ConductorManager(periodic_task.PeriodicTasks): update_existing=True) self.conductor = cdr - # Spawn a dedicated greenthread for the keepalive + transport = oslo_messaging.get_transport(cfg.CONF) + target = oslo_messaging.Target(topic=self.topic, server=self.host, + version=self.RPC_API_VERSION) + endpoints = [ + endp.ConductorEndpoint(), + ] + server = oslo_messaging.get_rpc_server(transport, target, endpoints, + executor='threading') try: - self._spawn_worker(self._conductor_service_record_keepalive) - LOG.info(_LI('Successfully started conductor with hostname ' - '%(hostname)s.'), - {'hostname': self.host}) - except exception.NoFreeConductorWorker: - with excutils.save_and_reraise_exception(): - LOG.critical(_LC('Failed to start keepalive')) - self.del_host() + server.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + print("Stopping server") - def _collect_periodic_tasks(self, obj): - for n, method in inspect.getmembers(obj, inspect.ismethod): - if getattr(method, '_periodic_enabled', False): - self.add_periodic_task(method) + server.stop() + # server.wait() def 
del_host(self, deregister=True): - self._keepalive_evt.set() if deregister: try: - # Inform the cluster that this conductor is shutting down. - # Note that rebalancing will not occur immediately, but when - # the periodic sync takes place. self.dbapi.unregister_conductor(self.host) LOG.info(_LI('Successfully stopped conductor with hostname ' '%(hostname)s.'), @@ -178,85 +99,3 @@ class ConductorManager(periodic_task.PeriodicTasks): LOG.info(_LI('Not deregistering conductor with hostname ' '%(hostname)s.'), {'hostname': self.host}) - # Waiting here to give workers the chance to finish. This has the - # benefit of releasing locks workers placed on nodes, as well as - # having work complete normally. - self._worker_pool.waitall() - - def periodic_tasks(self, context, raise_on_error=False): - """Periodic tasks are run at pre-specified interval.""" - return self.run_periodic_tasks(context, raise_on_error=raise_on_error) - - @lockutils.synchronized(WORKER_SPAWN_lOCK, 'iotronic-') - def _spawn_worker(self, func, *args, **kwargs): - """Create a greenthread to run func(*args, **kwargs). - - Spawns a greenthread if there are free slots in pool, otherwise raises - exception. Execution control returns immediately to the caller. - - :returns: GreenThread object. - :raises: NoFreeConductorWorker if worker pool is currently full. 
- - """ - if self._worker_pool.free(): - return self._worker_pool.spawn(func, *args, **kwargs) - else: - raise exception.NoFreeConductorWorker() - - def _conductor_service_record_keepalive(self): - while not self._keepalive_evt.is_set(): - try: - self.dbapi.touch_conductor(self.host) - except db_exception.DBConnectionError: - LOG.warning(_LW('Conductor could not connect to database ' - 'while heartbeating.')) - self._keepalive_evt.wait(CONF.conductor.heartbeat_interval) - - @messaging.expected_exceptions(exception.InvalidParameterValue, - exception.MissingParameterValue, - exception.NodeLocked) - def update_node(self, context, node_obj): - """Update a node with the supplied data. - - This method is the main "hub" for PUT and PATCH requests in the API. - - :param context: an admin context - :param node_obj: a changed (but not saved) node object. - - """ - node_id = node_obj.uuid - LOG.debug("RPC update_node called for node %s." % node_id) - - with task_manager.acquire(context, node_id, shared=False): - node_obj.save() - - return node_obj - - @messaging.expected_exceptions(exception.NodeLocked, - exception.NodeNotConnected) - def destroy_node(self, context, node_id): - """Delete a node. - - :param context: request context. - :param node_id: node id or uuid. - :raises: NodeLocked if node is locked by another conductor. - :raises: NodeNotConnected if the node is not connected. - - """ - """ REMOVE ASAP - - with task_manager.acquire(context, node_id) as task: - node = task.node - r = WampResponse() - r.clearConfig() - response = self.wamp.rpc_call( - 'stack4things.' 
+ node.uuid + '.configure', - r.getResponse()) - if response['result'] == 0: - node.destroy() - LOG.info(_LI('Successfully deleted node %(node)s.'), - {'node': node.uuid}) - else: - raise exception.NodeNotConnected(node=node.uuid) - """ - pass diff --git a/iotronic/conductor/rpcapi.py b/iotronic/conductor/rpcapi.py index 593bd9a..dc1ce8a 100644 --- a/iotronic/conductor/rpcapi.py +++ b/iotronic/conductor/rpcapi.py @@ -1,8 +1,5 @@ # coding=utf-8 -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# All Rights Reserved. -# # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -17,16 +14,13 @@ """ Client side of the conductor RPC API. """ - -import oslo_messaging as messaging - -from iotronic.common import hash_ring from iotronic.common import rpc from iotronic.conductor import manager -from iotronic.objects import base as objects_base - +from iotronic.objects import base from oslo_log import log as logging -LOG = logging.getLogger('object') +import oslo_messaging + +LOG = logging.getLogger(__name__) class ConductorAPI(object): @@ -36,7 +30,6 @@ class ConductorAPI(object): | 1.0 - Initial version. """ - # NOTE(rloo): This must be in sync with manager.ConductorManager's. RPC_API_VERSION = '1.0' def __init__(self, topic=None): @@ -45,14 +38,47 @@ class ConductorAPI(object): if self.topic is None: self.topic = manager.MANAGER_TOPIC - target = messaging.Target(topic=self.topic, - version='1.0') - serializer = objects_base.IotronicObjectSerializer() + target = oslo_messaging.Target(topic=self.topic, + version='1.0') + serializer = base.IotronicObjectSerializer() self.client = rpc.get_client(target, version_cap=self.RPC_API_VERSION, serializer=serializer) - # NOTE(deva): this is going to be buggy - self.ring_manager = hash_ring.HashRingManager() + + def echo(self, context, data, topic=None): + """Test + + :param context: request context. 
+ :param data: payload to be echoed back. + :param topic: RPC topic. Defaults to self.topic. + """ + cctxt = self.client.prepare(topic=topic or self.topic, version='1.0') + return cctxt.call(context, 'echo', data=data) + + def registration(self, context, token, session_num, topic=None): + """Registration of a node. + + :param context: request context. + :param token: token used for the first registration + :param session_num: wamp session number + :param topic: RPC topic. Defaults to self.topic. + """ + cctxt = self.client.prepare(topic=topic or self.topic, version='1.0') + return cctxt.call(context, 'registration', + token=token, session_num=session_num) + + def create_node(self, context, node_obj, location_obj, topic=None): + """Add a node on the cloud + + :param context: request context. + :param node_obj: a new (not yet created) node object. + :param location_obj: location object to attach to the new node. + :param topic: RPC topic. Defaults to self.topic. + :returns: created node object + """ + cctxt = self.client.prepare(topic=topic or self.topic, version='1.0') + return cctxt.call(context, 'create_node', + node_obj=node_obj, location_obj=location_obj) def update_node(self, context, node_obj, topic=None): """Synchronously, have a conductor update the node's information. @@ -72,7 +98,7 @@ class ConductorAPI(object): :returns: updated node object, including all fields. """ - cctxt = self.client.prepare(topic=topic or self.topic, version='1.1') + cctxt = self.client.prepare(topic=topic or self.topic, version='1.0') return cctxt.call(context, 'update_node', node_obj=node_obj) def destroy_node(self, context, node_id, topic=None): diff --git a/iotronic/conductor/task_manager.py b/iotronic/conductor/task_manager.py deleted file mode 100644 index 7647857..0000000 --- a/iotronic/conductor/task_manager.py +++ /dev/null @@ -1,341 +0,0 @@ -# coding=utf-8 - -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# All Rights Reserved.
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A context manager to perform a series of tasks on a set of resources. - -:class:`TaskManager` is a context manager, created on-demand to allow -synchronized access to a node and its resources. - -The :class:`TaskManager` will, by default, acquire an exclusive lock on -a node for the duration that the TaskManager instance exists. You may -create a TaskManager instance without locking by passing "shared=True" -when creating it, but certain operations on the resources held by such -an instance of TaskManager will not be possible. Requiring this exclusive -lock guards against parallel operations interfering with each other. - -A shared lock is useful when performing non-interfering operations, -such as validating the driver interfaces. - -An exclusive lock is stored in the database to coordinate between -:class:`iotronic.iotconductor.manager` instances, -that are typically deployed on different hosts. - -:class:`TaskManager` methods, as well as driver methods, may be decorated to -determine whether their invocation requires an exclusive lock. - -The TaskManager instance exposes certain node resources and properties as -attributes that you may access: - - task.context - The context passed to TaskManager() - task.shared - False if Node is locked, True if it is not locked. 
(The - 'shared' kwarg arg of TaskManager()) - task.node - The Node object - -If you need to execute task-requiring code in a background thread, the -TaskManager instance provides an interface to handle this for you, making -sure to release resources when the thread finishes (successfully or if -an exception occurs). Common use of this is within the Manager like so: - -:: - - with task_manager.acquire(context, node_id) as task: - - task.spawn_after(self._spawn_worker, - utils.node_power_action, task, new_state) - -All exceptions that occur in the current GreenThread as part of the -spawn handling are re-raised. You can specify a hook to execute custom -code when such exceptions occur. For example, the hook is a more elegant -solution than wrapping the "with task_manager.acquire()" with a -try..exception block. (Note that this hook does not handle exceptions -raised in the background thread.): - -:: - - def on_error(e): - if isinstance(e, Exception): - ... - - with task_manager.acquire(context, node_id) as task: - - task.set_spawn_error_hook(on_error) - task.spawn_after(self._spawn_worker, - utils.node_power_action, task, new_state) - -""" - -import functools - -from oslo_config import cfg -from oslo_log import log as logging -from oslo_utils import excutils -import retrying - -from iotronic.common import exception -from iotronic.common.i18n import _LW -from iotronic.common import states -from iotronic import objects - -LOG = logging.getLogger(__name__) - -CONF = cfg.CONF - - -def require_exclusive_lock(f): - """Decorator to require an exclusive lock. - - Decorated functions must take a :class:`TaskManager` as the first - parameter. Decorated class methods should take a :class:`TaskManager` - as the first parameter after "self". 
- - """ - @functools.wraps(f) - def wrapper(*args, **kwargs): - task = args[0] if isinstance(args[0], TaskManager) else args[1] - if task.shared: - raise exception.ExclusiveLockRequired() - return f(*args, **kwargs) - return wrapper - - -def acquire(context, node_id, shared=False, driver_name=None): - """Shortcut for acquiring a lock on a Node. - - :param context: Request context. - :param node_id: ID or UUID of node to lock. - :param shared: Boolean indicating whether to take a shared or exclusive - lock. Default: False. - :param driver_name: Name of Driver. Default: None. - :returns: An instance of :class:`TaskManager`. - - """ - return TaskManager(context, node_id, shared=shared, - driver_name=driver_name) - - -class TaskManager(object): - """Context manager for tasks. - - This class wraps the locking, driver loading, and acquisition - of related resources (eg, Node and Ports) when beginning a unit of work. - - """ - - def __init__(self, context, node_id, shared=False, driver_name=None): - """Create a new TaskManager. - - Acquire a lock on a node. The lock can be either shared or - exclusive. Shared locks may be used for read-only or - non-disruptive actions only, and must be considerate to what - other threads may be doing on the same node at the same time. - - :param context: request context - :param node_id: ID or UUID of node to lock. - :param shared: Boolean indicating whether to take a shared or exclusive - lock. Default: False. - :param driver_name: The name of the driver to load, if different - from the Node's current driver. - :raises: DriverNotFound - :raises: NodeNotFound - :raises: NodeLocked - - """ - - self._spawn_method = None - self._on_error_method = None - - self.context = context - self.node = None - self.shared = shared - - self.fsm = states.machine.copy() - - # NodeLocked exceptions can be annoying. Let's try to alleviate - # some of that pain by retrying our lock attempts. The retrying - # module expects a wait_fixed value in milliseconds. 
- @retrying.retry( - retry_on_exception=lambda e: isinstance(e, exception.NodeLocked), - stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts, - wait_fixed=CONF.conductor.node_locked_retry_interval * 1000) - def reserve_node(): - LOG.debug("Attempting to reserve node %(node)s", - {'node': node_id}) - self.node = objects.Node.reserve(context, CONF.host, node_id) - - try: - """ - if not self.shared: - reserve_node() - else: - """ - self.node = objects.Node.get(context, node_id) - - except Exception: - with excutils.save_and_reraise_exception(): - self.release_resources() - - def spawn_after(self, _spawn_method, *args, **kwargs): - """Call this to spawn a thread to complete the task. - - The specified method will be called when the TaskManager instance - exits. - - :param _spawn_method: a method that returns a GreenThread object - :param args: args passed to the method. - :param kwargs: additional kwargs passed to the method. - - """ - self._spawn_method = _spawn_method - self._spawn_args = args - self._spawn_kwargs = kwargs - - def set_spawn_error_hook(self, _on_error_method, *args, **kwargs): - """Create a hook to handle exceptions when spawning a task. - - Create a hook that gets called upon an exception being raised - from spawning a background thread to do a task. - - :param _on_error_method: a callable object, it's first parameter - should accept the Exception object that was raised. - :param args: additional args passed to the callable object. - :param kwargs: additional kwargs passed to the callable object. - - """ - self._on_error_method = _on_error_method - self._on_error_args = args - self._on_error_kwargs = kwargs - - def release_resources(self): - """Unlock a node and release resources. - - If an exclusive lock is held, unlock the node. Reset attributes - to make it clear that this instance of TaskManager should no - longer be accessed. 
- """ - pass # don't need it at the moment - """ - if not self.shared: - try: - if self.node: - objects.Node.release(self.context, CONF.host, self.node.id) - except exception.NodeNotFound: - # squelch the exception if the node was deleted - # within the task's context. - pass - self.node = None - self.driver = None - self.ports = None - self.fsm = None - """ - - def _thread_release_resources(self, t): - """Thread.link() callback to release resources.""" - self.release_resources() - - def process_event(self, event, callback=None, call_args=None, - call_kwargs=None, err_handler=None): - """Process the given event for the task's current state. - - :param event: the name of the event to process - :param callback: optional callback to invoke upon event transition - :param call_args: optional \*args to pass to the callback method - :param call_kwargs: optional \**kwargs to pass to the callback method - :param err_handler: optional error handler to invoke if the - callback fails, eg. because there are no workers available - (err_handler should accept arguments node, prev_prov_state, and - prev_target_state) - :raises: InvalidState if the event is not allowed by the associated - state machine - """ - # Advance the state model for the given event. Note that this doesn't - # alter the node in any way. This may raise InvalidState, if this event - # is not allowed in the current state. 
- self.fsm.process_event(event) - - # stash current states in the error handler if callback is set, - # in case we fail to get a worker from the pool - if err_handler and callback: - self.set_spawn_error_hook(err_handler, self.node, - self.node.provision_state, - self.node.target_provision_state) - - self.node.provision_state = self.fsm.current_state - self.node.target_provision_state = self.fsm.target_state - - # set up the async worker - if callback: - # clear the error if we're going to start work in a callback - self.node.last_error = None - if call_args is None: - call_args = () - if call_kwargs is None: - call_kwargs = {} - self.spawn_after(callback, *call_args, **call_kwargs) - - # publish the state transition by saving the Node - self.node.save() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is None and self._spawn_method is not None: - # Spawn a worker to complete the task - # The linked callback below will be called whenever: - # - background task finished with no errors. - # - background task has crashed with exception. - # - callback was added after the background task has - # finished or crashed. While eventlet currently doesn't - # schedule the new thread until the current thread blocks - # for some reason, this is true. - # All of the above are asserted in tests such that we'll - # catch if eventlet ever changes this behavior. - thread = None - try: - thread = self._spawn_method(*self._spawn_args, - **self._spawn_kwargs) - - # NOTE(comstud): Trying to use a lambda here causes - # the callback to not occur for some reason. This - # also makes it easier to test. - thread.link(self._thread_release_resources) - # Don't unlock! The unlock will occur when the - # thread finshes. 
- return - except Exception as e: - with excutils.save_and_reraise_exception(): - try: - # Execute the on_error hook if set - if self._on_error_method: - self._on_error_method(e, *self._on_error_args, - **self._on_error_kwargs) - except Exception: - LOG.warning(_LW("Task's on_error hook failed to " - "call %(method)s on node %(node)s"), - {'method': self._on_error_method.__name__, - 'node': self.node.uuid}) - - if thread is not None: - # This means the link() failed for some - # reason. Nuke the thread. - thread.cancel() - self.release_resources() - self.release_resources() diff --git a/iotronic/conductor/utils.py b/iotronic/conductor/utils.py deleted file mode 100644 index 31a36ab..0000000 --- a/iotronic/conductor/utils.py +++ /dev/null @@ -1,160 +0,0 @@ -# coding=utf-8 - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import log -from oslo_utils import excutils - -from iotronic.common import exception -from iotronic.common.i18n import _ -from iotronic.common.i18n import _LI -from iotronic.common.i18n import _LW -from iotronic.common import states -from iotronic.conductor import task_manager - -LOG = log.getLogger(__name__) - - -@task_manager.require_exclusive_lock -def node_set_boot_device(task, device, persistent=False): - """Set the boot device for a node. - - :param task: a TaskManager instance. - :param device: Boot device. Values are vendor-specific. - :param persistent: Whether to set next-boot, or make the change - permanent. Default: False. 
- :raises: InvalidParameterValue if the validation of the - ManagementInterface fails. - - """ - if getattr(task.driver, 'management', None): - task.driver.management.validate(task) - task.driver.management.set_boot_device(task, - device=device, - persistent=persistent) - - -@task_manager.require_exclusive_lock -def node_power_action(task, new_state): - """Change power state or reset for a node. - - Perform the requested power action if the transition is required. - - :param task: a TaskManager instance containing the node to act on. - :param new_state: Any power state from iotronic.common.states. If the - state is 'REBOOT' then a reboot will be attempted, otherwise - the node power state is directly set to 'state'. - :raises: InvalidParameterValue when the wrong state is specified - or the wrong driver info is specified. - :raises: other exceptions by the node's power driver if something - wrong occurred during the power action. - - """ - node = task.node - target_state = states.POWER_ON if new_state == states.REBOOT else new_state - - if new_state != states.REBOOT: - try: - curr_state = task.driver.power.get_power_state(task) - except Exception as e: - with excutils.save_and_reraise_exception(): - node['last_error'] = _( - "Failed to change power state to '%(target)s'. " - "Error: %(error)s") % {'target': new_state, 'error': e} - node['target_power_state'] = states.NOSTATE - node.save() - - if curr_state == new_state: - # Neither the iotronic service nor the hardware has erred. The - # node is, for some reason, already in the requested state, - # though we don't know why. eg, perhaps the user previously - # requested the node POWER_ON, the network delayed those IPMI - # packets, and they are trying again -- but the node finally - # responds to the first request, and so the second request - # gets to this check and stops. - # This isn't an error, so we'll clear last_error field - # (from previous operation), log a warning, and return. 
- node['last_error'] = None - # NOTE(dtantsur): under rare conditions we can get out of sync here - node['power_state'] = new_state - node['target_power_state'] = states.NOSTATE - node.save() - LOG.warn(_LW("Not going to change_node_power_state because " - "current state = requested state = '%(state)s'."), - {'state': curr_state}) - return - - if curr_state == states.ERROR: - # be optimistic and continue action - LOG.warn(_LW("Driver returns ERROR power state for node %s."), - node.uuid) - - # Set the target_power_state and clear any last_error, if we're - # starting a new operation. This will expose to other processes - # and clients that work is in progress. - if node['target_power_state'] != target_state: - node['target_power_state'] = target_state - node['last_error'] = None - node.save() - - # take power action - try: - if new_state != states.REBOOT: - task.driver.power.set_power_state(task, new_state) - else: - task.driver.power.reboot(task) - except Exception as e: - with excutils.save_and_reraise_exception(): - node['last_error'] = _( - "Failed to change power state to '%(target)s'. " - "Error: %(error)s") % {'target': target_state, 'error': e} - else: - # success! - node['power_state'] = target_state - LOG.info(_LI('Successfully set node %(node)s power state to ' - '%(state)s.'), - {'node': node.uuid, 'state': target_state}) - finally: - node['target_power_state'] = states.NOSTATE - node.save() - - -@task_manager.require_exclusive_lock -def cleanup_after_timeout(task): - """Cleanup deploy task after timeout. - - :param task: a TaskManager instance. 
- """ - node = task.node - msg = (_('Timeout reached while waiting for callback for node %s') - % node.uuid) - node.last_error = msg - LOG.error(msg) - node.save() - - error_msg = _('Cleanup failed for node %(node)s after deploy timeout: ' - ' %(error)s') - try: - task.driver.deploy.clean_up(task) - except exception.IotronicException as e: - msg = error_msg % {'node': node.uuid, 'error': e} - LOG.error(msg) - node.last_error = msg - node.save() - except Exception as e: - msg = error_msg % {'node': node.uuid, 'error': e} - LOG.error(msg) - node.last_error = _('Deploy timed out, but an unhandled exception was ' - 'encountered while aborting. More info may be ' - 'found in the log file.') - node.save() diff --git a/iotronic/wamp/agent.py b/iotronic/wamp/agent.py index 06f43d8..d071c73 100644 --- a/iotronic/wamp/agent.py +++ b/iotronic/wamp/agent.py @@ -18,13 +18,12 @@ from autobahn.twisted import websocket from autobahn.wamp import types from oslo_config import cfg from oslo_log import log as logging +import oslo_messaging +import threading from threading import Thread from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet import reactor -import oslo_messaging -import threading - LOG = logging.getLogger(__name__) @@ -42,10 +41,10 @@ CONF.register_opts(wamp_opts, 'wamp') shared_result = {} wamp_session_caller = None +AGENT_UUID = None def wamp_request(e, kwarg, session): - id = threading.current_thread().ident shared_result[id] = {} shared_result[id]['result'] = None @@ -71,10 +70,9 @@ def wamp_request(e, kwarg, session): # OSLO ENDPOINT class WampEndpoint(object): - def __init__(self, wamp_session, agent_uuid): self.wamp_session = wamp_session - setattr(self, agent_uuid+'.s4t_invoke_wamp', self.s4t_invoke_wamp) + setattr(self, agent_uuid + '.s4t_invoke_wamp', self.s4t_invoke_wamp) def s4t_invoke_wamp(self, ctx, **kwarg): e = threading.Event() @@ -93,9 +91,8 @@ class WampEndpoint(object): class WampFrontend(wamp.ApplicationSession): - def 
onJoin(self, details): - global wamp_session_caller + global wamp_session_caller, AGENT_UUID wamp_session_caller = self import iotronic.wamp.registerd_functions as fun @@ -103,7 +100,8 @@ class WampFrontend(wamp.ApplicationSession): self.subscribe(fun.board_on_join, 'wamp.session.on_join') try: - self.register(fun.register_board, u'register_board') + self.register(fun.registration, u'stack4things.register') + self.register(fun.echo, AGENT_UUID + u'.stack4things.echo') LOG.info("procedure registered") except Exception as e: LOG.error("could not register procedure: {0}".format(e)) @@ -114,10 +112,8 @@ class WampFrontend(wamp.ApplicationSession): LOG.info("disconnected") -class WampClientFactory( - websocket.WampWebSocketClientFactory, - ReconnectingClientFactory): - +class WampClientFactory(websocket.WampWebSocketClientFactory, + ReconnectingClientFactory): maxDelay = 30 def clientConnectionFailed(self, connector, reason): @@ -134,7 +130,6 @@ class WampClientFactory( class RPCServer(Thread): - def __init__(self, agent_uuid): # AMQP CONFIG @@ -144,7 +139,7 @@ class RPCServer(Thread): Thread.__init__(self) transport = oslo_messaging.get_transport(CONF) - target = oslo_messaging.Target(topic=agent_uuid+'.s4t_invoke_wamp', + target = oslo_messaging.Target(topic=agent_uuid + '.s4t_invoke_wamp', server='server1') self.server = oslo_messaging.get_rpc_server(transport, @@ -165,7 +160,6 @@ class RPCServer(Thread): class WampManager(object): - def __init__(self): component_config = types.ComponentConfig( realm=unicode(CONF.wamp.wamp_realm)) @@ -190,13 +184,14 @@ class WampManager(object): class WampAgent(object): - def __init__(self): logging.register_options(CONF) CONF(project='iotronic') logging.setup(CONF, "iotronic-wamp-agent") agent_uuid = 'agent' + global AGENT_UUID + AGENT_UUID = agent_uuid r = RPCServer(agent_uuid) w = WampManager() diff --git a/iotronic/wamp/registerd_functions.py b/iotronic/wamp/registerd_functions.py index 8f85553..1f4ee4f 100644 --- 
a/iotronic/wamp/registerd_functions.py +++ b/iotronic/wamp/registerd_functions.py @@ -13,17 +13,52 @@ # License for the specific language governing permissions and limitations # under the License. - +from iotronic.common import rpc +from iotronic.conductor import rpcapi +from iotronic import objects +from oslo_config import cfg from oslo_log import log + + LOG = log.getLogger(__name__) +CONF = cfg.CONF +CONF(project='iotronic') -def register_board(): - return 'hello!\n' +rpc.init(CONF) + +topic = 'iotronic.conductor_manager' +c = rpcapi.ConductorAPI(topic) + + +class cont(object): + def to_dict(self): + return {} + + +ctxt = cont() + + +def echo(data): + LOG.info("ECHO: %s" % data) + return data def board_on_leave(session_id): LOG.debug('A node with %s disconnectd', session_id) + try: + old_session = objects.SessionWP({}).get_by_session_id({}, session_id) + old_session.valid = False + old_session.save() + LOG.debug('Session %s deleted', session_id) + except Exception: + LOG.debug('session %s not found', session_id) + + +def registration(data): + token = data[0] + session = data[1] + return c.registration(ctxt, token, session) def board_on_join(session_id):