new conductor module
Change-Id: Ie1305c7174b5cf80afb08ad2c4ad841685202841
This commit is contained in:
parent
53c01d49f4
commit
d5d4cef5ab
|
@ -1 +1,3 @@
|
|||
.tox
|
||||
.idea
|
||||
iotronic.egg-info
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
|
@ -19,24 +16,14 @@
|
|||
Iotronic Conductor
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from iotronic.common import service as iotronic_service
|
||||
from iotronic.openstack.common import service
|
||||
|
||||
from iotronic.conductor.manager import ConductorManager
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
if __name__ == '__main__':
|
||||
|
||||
iotronic_service.prepare_service(sys.argv)
|
||||
mgr = iotronic_service.RPCService(CONF.host,
|
||||
'iotronic.conductor.manager',
|
||||
'ConductorManager')
|
||||
|
||||
launcher = service.launch(mgr)
|
||||
launcher.wait()
|
||||
cond=ConductorManager('iotronic')
|
||||
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ function build_install {
|
|||
rm -rf dist
|
||||
}
|
||||
case "$1" in
|
||||
iotronic)
|
||||
conductor)
|
||||
build_install
|
||||
systemctl restart httpd
|
||||
cp bin/iotronic-conductor /usr/bin/
|
||||
|
@ -27,6 +27,6 @@ case "$1" in
|
|||
;;
|
||||
|
||||
*)
|
||||
echo $"Usage: $0 {iotronic|wamp-agent|all}"
|
||||
echo $"Usage: $0 {conductor|wamp-agent|all}"
|
||||
exit 1
|
||||
esac
|
|
@ -100,7 +100,6 @@ class NodeCollection(collection.Collection):
|
|||
|
||||
|
||||
class NodesController(rest.RestController):
|
||||
|
||||
invalid_sort_key_list = ['properties']
|
||||
|
||||
def _get_nodes_collection(self, marker, limit, sort_key, sort_dir,
|
||||
|
@ -188,11 +187,11 @@ class NodesController(rest.RestController):
|
|||
|
||||
new_Node = objects.Node(pecan.request.context,
|
||||
**Node.as_dict())
|
||||
new_Node.create()
|
||||
|
||||
new_Location = objects.Location(pecan.request.context,
|
||||
**Node.location[0].as_dict())
|
||||
new_Location.node_id = new_Node.id
|
||||
new_Location.create()
|
||||
|
||||
new_Node = pecan.request.rpcapi.create_node(pecan.request.context,
|
||||
new_Node, new_Location)
|
||||
|
||||
return Node.convert_with_locates(new_Node)
|
||||
|
|
|
@ -35,7 +35,6 @@ __all__ = [
|
|||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
# print CONF.transport_url
|
||||
TRANSPORT = None
|
||||
NOTIFIER = None
|
||||
|
||||
|
@ -120,7 +119,6 @@ class RequestContextSerializer(messaging.Serializer):
|
|||
|
||||
|
||||
def get_transport_url(url_str=None):
|
||||
# LOG.info('yoooooooooooo')
|
||||
return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES)
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic import objects
|
||||
from iotronic.objects import base as objects_base
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
|
||||
|
||||
class ConductorEndpoint(object):
|
||||
def echo(self, ctx, data):
|
||||
LOG.info("ECHO: %s" % data)
|
||||
return data
|
||||
|
||||
def registration(self, ctx, token, session_num):
|
||||
LOG.debug('Receved registration from %s with session %s',
|
||||
token, session_num)
|
||||
try:
|
||||
node = objects.Node.get_by_code({}, token)
|
||||
except Exception:
|
||||
return exception.NodeNotFound(node=token)
|
||||
try:
|
||||
old_session = objects.SessionWP(
|
||||
{}).get_session_by_node_uuid(node.uuid, valid=True)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
except Exception:
|
||||
LOG.debug('valid session for %s Not found', node.uuid)
|
||||
|
||||
session = objects.SessionWP({})
|
||||
session.node_id = node.id
|
||||
session.node_uuid = node.uuid
|
||||
session.session_id = session_num
|
||||
session.create()
|
||||
session.save()
|
||||
|
||||
return
|
||||
|
||||
def destroy_node(self, ctx, node_id):
|
||||
LOG.debug('Destroying node with id %s',
|
||||
node_id)
|
||||
node = objects.Node.get_by_uuid(ctx, node_id)
|
||||
node.destroy()
|
||||
return {}
|
||||
|
||||
def create_node(self, ctx, node_obj, location_obj):
|
||||
new_node = serializer.deserialize_entity(ctx, node_obj)
|
||||
LOG.debug('Creating node %s',
|
||||
new_node.name)
|
||||
new_location = serializer.deserialize_entity(ctx, location_obj)
|
||||
new_node.create()
|
||||
new_location.node_id = new_node.id
|
||||
new_location.create()
|
||||
|
||||
return serializer.serialize_entity(ctx, new_node)
|
|
@ -1,9 +1,5 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2013 International Business Machines Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
|
@ -16,124 +12,51 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from eventlet import greenpool
|
||||
import inspect
|
||||
|
||||
from iotronic.db import api as dbapi
|
||||
|
||||
from iotronic.common import exception
|
||||
|
||||
from iotronic.common.i18n import _LC
|
||||
from iotronic.common.i18n import _LI
|
||||
from iotronic.common.i18n import _LW
|
||||
|
||||
from iotronic.conductor import task_manager
|
||||
|
||||
from iotronic.openstack.common import periodic_task
|
||||
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from iotronic.conductor import endpoints as endp
|
||||
from iotronic.db import api as dbapi
|
||||
from oslo_config import cfg
|
||||
from oslo_db import exception as db_exception
|
||||
from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
from oslo_utils import excutils
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import time
|
||||
|
||||
import threading
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
MANAGER_TOPIC = 'iotronic.conductor_manager'
|
||||
WORKER_SPAWN_lOCK = "conductor_worker_spawn"
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
conductor_opts = [
|
||||
cfg.StrOpt('api_url',
|
||||
help=('URL of Iotronic API service. If not set iotronic can '
|
||||
help='URL of Iotronic API service. If not set iotronic can '
|
||||
'get the current value from the keystone service '
|
||||
'catalog.')),
|
||||
cfg.IntOpt('heartbeat_interval',
|
||||
default=10,
|
||||
help='Seconds between conductor heart beats.'),
|
||||
'catalog.'),
|
||||
cfg.IntOpt('heartbeat_timeout',
|
||||
default=60,
|
||||
help='Maximum time (in seconds) since the last check-in '
|
||||
'of a conductor. A conductor is considered inactive '
|
||||
'when this time has been exceeded.'),
|
||||
cfg.IntOpt('periodic_max_workers',
|
||||
default=8,
|
||||
help='Maximum number of worker threads that can be started '
|
||||
'simultaneously by a periodic task. Should be less '
|
||||
'than RPC thread pool size.'),
|
||||
cfg.IntOpt('workers_pool_size',
|
||||
default=100,
|
||||
help='The size of the workers greenthread pool.'),
|
||||
cfg.IntOpt('node_locked_retry_attempts',
|
||||
default=3,
|
||||
help='Number of attempts to grab a node lock.'),
|
||||
cfg.IntOpt('node_locked_retry_interval',
|
||||
default=1,
|
||||
help='Seconds to sleep between node lock attempts.'),
|
||||
cfg.BoolOpt('send_sensor_data',
|
||||
default=False,
|
||||
help='Enable sending sensor data message via the '
|
||||
'notification bus'),
|
||||
cfg.IntOpt('send_sensor_data_interval',
|
||||
default=600,
|
||||
help='Seconds between conductor sending sensor data message'
|
||||
' to ceilometer via the notification bus.'),
|
||||
cfg.ListOpt('send_sensor_data_types',
|
||||
default=['ALL'],
|
||||
help='List of comma separated meter types which need to be'
|
||||
' sent to Ceilometer. The default value, "ALL", is a '
|
||||
'special value meaning send all the sensor data.'),
|
||||
cfg.IntOpt('sync_local_state_interval',
|
||||
default=180,
|
||||
help='When conductors join or leave the cluster, existing '
|
||||
'conductors may need to update any persistent '
|
||||
'local state as nodes are moved around the cluster. '
|
||||
'This option controls how often, in seconds, each '
|
||||
'conductor will check for nodes that it should '
|
||||
'"take over". Set it to a negative value to disable '
|
||||
'the check entirely.'),
|
||||
cfg.BoolOpt('configdrive_use_swift',
|
||||
default=False,
|
||||
help='Whether to upload the config drive to Swift.'),
|
||||
cfg.IntOpt('inspect_timeout',
|
||||
default=1800,
|
||||
help='Timeout (seconds) for waiting for node inspection. '
|
||||
'0 - unlimited.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(conductor_opts, 'conductor')
|
||||
|
||||
|
||||
class ConductorManager(periodic_task.PeriodicTasks):
|
||||
"""Iotronic Conductor manager main class."""
|
||||
|
||||
# sync with rpcapi.ConductorAPI's.
|
||||
class ConductorManager(object):
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
target = messaging.Target(version=RPC_API_VERSION)
|
||||
def __init__(self, host):
|
||||
logging.register_options(CONF)
|
||||
CONF(project='iotronic')
|
||||
logging.setup(CONF, "iotronic-conductor")
|
||||
|
||||
def __init__(self, host, topic):
|
||||
super(ConductorManager, self).__init__()
|
||||
if not host:
|
||||
host = CONF.host
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
|
||||
def init_host(self):
|
||||
self.topic = MANAGER_TOPIC
|
||||
self.dbapi = dbapi.get_instance()
|
||||
|
||||
self._keepalive_evt = threading.Event()
|
||||
"""Event for the keepalive thread."""
|
||||
|
||||
self._worker_pool = greenpool.GreenPool(
|
||||
size=CONF.conductor.workers_pool_size)
|
||||
"""GreenPool of background workers for performing tasks async."""
|
||||
|
||||
try:
|
||||
# Register this conductor with the cluster
|
||||
cdr = self.dbapi.register_conductor(
|
||||
{'hostname': self.host})
|
||||
except exception.ConductorAlreadyRegistered:
|
||||
|
@ -145,29 +68,27 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
|||
update_existing=True)
|
||||
self.conductor = cdr
|
||||
|
||||
# Spawn a dedicated greenthread for the keepalive
|
||||
transport = oslo_messaging.get_transport(cfg.CONF)
|
||||
target = oslo_messaging.Target(topic=self.topic, server=self.host,
|
||||
version=self.RPC_API_VERSION)
|
||||
endpoints = [
|
||||
endp.ConductorEndpoint(),
|
||||
]
|
||||
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
|
||||
executor='threading')
|
||||
try:
|
||||
self._spawn_worker(self._conductor_service_record_keepalive)
|
||||
LOG.info(_LI('Successfully started conductor with hostname '
|
||||
'%(hostname)s.'),
|
||||
{'hostname': self.host})
|
||||
except exception.NoFreeConductorWorker:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.critical(_LC('Failed to start keepalive'))
|
||||
self.del_host()
|
||||
server.start()
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
print("Stopping server")
|
||||
|
||||
def _collect_periodic_tasks(self, obj):
|
||||
for n, method in inspect.getmembers(obj, inspect.ismethod):
|
||||
if getattr(method, '_periodic_enabled', False):
|
||||
self.add_periodic_task(method)
|
||||
server.stop()
|
||||
# server.wait()
|
||||
|
||||
def del_host(self, deregister=True):
|
||||
self._keepalive_evt.set()
|
||||
if deregister:
|
||||
try:
|
||||
# Inform the cluster that this conductor is shutting down.
|
||||
# Note that rebalancing will not occur immediately, but when
|
||||
# the periodic sync takes place.
|
||||
self.dbapi.unregister_conductor(self.host)
|
||||
LOG.info(_LI('Successfully stopped conductor with hostname '
|
||||
'%(hostname)s.'),
|
||||
|
@ -178,85 +99,3 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
|||
LOG.info(_LI('Not deregistering conductor with hostname '
|
||||
'%(hostname)s.'),
|
||||
{'hostname': self.host})
|
||||
# Waiting here to give workers the chance to finish. This has the
|
||||
# benefit of releasing locks workers placed on nodes, as well as
|
||||
# having work complete normally.
|
||||
self._worker_pool.waitall()
|
||||
|
||||
def periodic_tasks(self, context, raise_on_error=False):
|
||||
"""Periodic tasks are run at pre-specified interval."""
|
||||
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
|
||||
|
||||
@lockutils.synchronized(WORKER_SPAWN_lOCK, 'iotronic-')
|
||||
def _spawn_worker(self, func, *args, **kwargs):
|
||||
"""Create a greenthread to run func(*args, **kwargs).
|
||||
|
||||
Spawns a greenthread if there are free slots in pool, otherwise raises
|
||||
exception. Execution control returns immediately to the caller.
|
||||
|
||||
:returns: GreenThread object.
|
||||
:raises: NoFreeConductorWorker if worker pool is currently full.
|
||||
|
||||
"""
|
||||
if self._worker_pool.free():
|
||||
return self._worker_pool.spawn(func, *args, **kwargs)
|
||||
else:
|
||||
raise exception.NoFreeConductorWorker()
|
||||
|
||||
def _conductor_service_record_keepalive(self):
|
||||
while not self._keepalive_evt.is_set():
|
||||
try:
|
||||
self.dbapi.touch_conductor(self.host)
|
||||
except db_exception.DBConnectionError:
|
||||
LOG.warning(_LW('Conductor could not connect to database '
|
||||
'while heartbeating.'))
|
||||
self._keepalive_evt.wait(CONF.conductor.heartbeat_interval)
|
||||
|
||||
@messaging.expected_exceptions(exception.InvalidParameterValue,
|
||||
exception.MissingParameterValue,
|
||||
exception.NodeLocked)
|
||||
def update_node(self, context, node_obj):
|
||||
"""Update a node with the supplied data.
|
||||
|
||||
This method is the main "hub" for PUT and PATCH requests in the API.
|
||||
|
||||
:param context: an admin context
|
||||
:param node_obj: a changed (but not saved) node object.
|
||||
|
||||
"""
|
||||
node_id = node_obj.uuid
|
||||
LOG.debug("RPC update_node called for node %s." % node_id)
|
||||
|
||||
with task_manager.acquire(context, node_id, shared=False):
|
||||
node_obj.save()
|
||||
|
||||
return node_obj
|
||||
|
||||
@messaging.expected_exceptions(exception.NodeLocked,
|
||||
exception.NodeNotConnected)
|
||||
def destroy_node(self, context, node_id):
|
||||
"""Delete a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: NodeNotConnected if the node is not connected.
|
||||
|
||||
"""
|
||||
""" REMOVE ASAP
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
node = task.node
|
||||
r = WampResponse()
|
||||
r.clearConfig()
|
||||
response = self.wamp.rpc_call(
|
||||
'stack4things.' + node.uuid + '.configure',
|
||||
r.getResponse())
|
||||
if response['result'] == 0:
|
||||
node.destroy()
|
||||
LOG.info(_LI('Successfully deleted node %(node)s.'),
|
||||
{'node': node.uuid})
|
||||
else:
|
||||
raise exception.NodeNotConnected(node=node.uuid)
|
||||
"""
|
||||
pass
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
|
@ -17,16 +14,13 @@
|
|||
"""
|
||||
Client side of the conductor RPC API.
|
||||
"""
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from iotronic.common import hash_ring
|
||||
from iotronic.common import rpc
|
||||
from iotronic.conductor import manager
|
||||
from iotronic.objects import base as objects_base
|
||||
|
||||
from iotronic.objects import base
|
||||
from oslo_log import log as logging
|
||||
LOG = logging.getLogger('object')
|
||||
import oslo_messaging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConductorAPI(object):
|
||||
|
@ -36,7 +30,6 @@ class ConductorAPI(object):
|
|||
| 1.0 - Initial version.
|
||||
"""
|
||||
|
||||
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=None):
|
||||
|
@ -45,14 +38,47 @@ class ConductorAPI(object):
|
|||
if self.topic is None:
|
||||
self.topic = manager.MANAGER_TOPIC
|
||||
|
||||
target = messaging.Target(topic=self.topic,
|
||||
target = oslo_messaging.Target(topic=self.topic,
|
||||
version='1.0')
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
serializer = base.IotronicObjectSerializer()
|
||||
self.client = rpc.get_client(target,
|
||||
version_cap=self.RPC_API_VERSION,
|
||||
serializer=serializer)
|
||||
# NOTE(deva): this is going to be buggy
|
||||
self.ring_manager = hash_ring.HashRingManager()
|
||||
|
||||
def echo(self, context, data, topic=None):
|
||||
"""Test
|
||||
|
||||
:param context: request context.
|
||||
:param data: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'echo', data=data)
|
||||
|
||||
def registration(self, context, token, session_num, topic=None):
|
||||
"""Registration of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param token: token used for the first registration
|
||||
:param session_num: wamp session number
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'registration',
|
||||
token=token, session_num=session_num)
|
||||
|
||||
def create_node(self, context, node_obj, location_obj, topic=None):
|
||||
"""Add a node on the cloud
|
||||
|
||||
:param context: request context.
|
||||
:param node_obj: a changed (but not saved) node object.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: created node object
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'create_node',
|
||||
node_obj=node_obj, location_obj=location_obj)
|
||||
|
||||
def update_node(self, context, node_obj, topic=None):
|
||||
"""Synchronously, have a conductor update the node's information.
|
||||
|
@ -72,7 +98,7 @@ class ConductorAPI(object):
|
|||
:returns: updated node object, including all fields.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.1')
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'update_node', node_obj=node_obj)
|
||||
|
||||
def destroy_node(self, context, node_id, topic=None):
|
||||
|
|
|
@ -1,341 +0,0 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A context manager to perform a series of tasks on a set of resources.
|
||||
|
||||
:class:`TaskManager` is a context manager, created on-demand to allow
|
||||
synchronized access to a node and its resources.
|
||||
|
||||
The :class:`TaskManager` will, by default, acquire an exclusive lock on
|
||||
a node for the duration that the TaskManager instance exists. You may
|
||||
create a TaskManager instance without locking by passing "shared=True"
|
||||
when creating it, but certain operations on the resources held by such
|
||||
an instance of TaskManager will not be possible. Requiring this exclusive
|
||||
lock guards against parallel operations interfering with each other.
|
||||
|
||||
A shared lock is useful when performing non-interfering operations,
|
||||
such as validating the driver interfaces.
|
||||
|
||||
An exclusive lock is stored in the database to coordinate between
|
||||
:class:`iotronic.iotconductor.manager` instances,
|
||||
that are typically deployed on different hosts.
|
||||
|
||||
:class:`TaskManager` methods, as well as driver methods, may be decorated to
|
||||
determine whether their invocation requires an exclusive lock.
|
||||
|
||||
The TaskManager instance exposes certain node resources and properties as
|
||||
attributes that you may access:
|
||||
|
||||
task.context
|
||||
The context passed to TaskManager()
|
||||
task.shared
|
||||
False if Node is locked, True if it is not locked. (The
|
||||
'shared' kwarg arg of TaskManager())
|
||||
task.node
|
||||
The Node object
|
||||
|
||||
If you need to execute task-requiring code in a background thread, the
|
||||
TaskManager instance provides an interface to handle this for you, making
|
||||
sure to release resources when the thread finishes (successfully or if
|
||||
an exception occurs). Common use of this is within the Manager like so:
|
||||
|
||||
::
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
<do some work>
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.node_power_action, task, new_state)
|
||||
|
||||
All exceptions that occur in the current GreenThread as part of the
|
||||
spawn handling are re-raised. You can specify a hook to execute custom
|
||||
code when such exceptions occur. For example, the hook is a more elegant
|
||||
solution than wrapping the "with task_manager.acquire()" with a
|
||||
try..exception block. (Note that this hook does not handle exceptions
|
||||
raised in the background thread.):
|
||||
|
||||
::
|
||||
|
||||
def on_error(e):
|
||||
if isinstance(e, Exception):
|
||||
...
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
<do some work>
|
||||
task.set_spawn_error_hook(on_error)
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.node_power_action, task, new_state)
|
||||
|
||||
"""
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
import retrying
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic import objects
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def require_exclusive_lock(f):
|
||||
"""Decorator to require an exclusive lock.
|
||||
|
||||
Decorated functions must take a :class:`TaskManager` as the first
|
||||
parameter. Decorated class methods should take a :class:`TaskManager`
|
||||
as the first parameter after "self".
|
||||
|
||||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
task = args[0] if isinstance(args[0], TaskManager) else args[1]
|
||||
if task.shared:
|
||||
raise exception.ExclusiveLockRequired()
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def acquire(context, node_id, shared=False, driver_name=None):
|
||||
"""Shortcut for acquiring a lock on a Node.
|
||||
|
||||
:param context: Request context.
|
||||
:param node_id: ID or UUID of node to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: Name of Driver. Default: None.
|
||||
:returns: An instance of :class:`TaskManager`.
|
||||
|
||||
"""
|
||||
return TaskManager(context, node_id, shared=shared,
|
||||
driver_name=driver_name)
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
"""Context manager for tasks.
|
||||
|
||||
This class wraps the locking, driver loading, and acquisition
|
||||
of related resources (eg, Node and Ports) when beginning a unit of work.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, context, node_id, shared=False, driver_name=None):
|
||||
"""Create a new TaskManager.
|
||||
|
||||
Acquire a lock on a node. The lock can be either shared or
|
||||
exclusive. Shared locks may be used for read-only or
|
||||
non-disruptive actions only, and must be considerate to what
|
||||
other threads may be doing on the same node at the same time.
|
||||
|
||||
:param context: request context
|
||||
:param node_id: ID or UUID of node to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: The name of the driver to load, if different
|
||||
from the Node's current driver.
|
||||
:raises: DriverNotFound
|
||||
:raises: NodeNotFound
|
||||
:raises: NodeLocked
|
||||
|
||||
"""
|
||||
|
||||
self._spawn_method = None
|
||||
self._on_error_method = None
|
||||
|
||||
self.context = context
|
||||
self.node = None
|
||||
self.shared = shared
|
||||
|
||||
self.fsm = states.machine.copy()
|
||||
|
||||
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
||||
# some of that pain by retrying our lock attempts. The retrying
|
||||
# module expects a wait_fixed value in milliseconds.
|
||||
@retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
||||
stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts,
|
||||
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
||||
def reserve_node():
|
||||
LOG.debug("Attempting to reserve node %(node)s",
|
||||
{'node': node_id})
|
||||
self.node = objects.Node.reserve(context, CONF.host, node_id)
|
||||
|
||||
try:
|
||||
"""
|
||||
if not self.shared:
|
||||
reserve_node()
|
||||
else:
|
||||
"""
|
||||
self.node = objects.Node.get(context, node_id)
|
||||
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.release_resources()
|
||||
|
||||
def spawn_after(self, _spawn_method, *args, **kwargs):
|
||||
"""Call this to spawn a thread to complete the task.
|
||||
|
||||
The specified method will be called when the TaskManager instance
|
||||
exits.
|
||||
|
||||
:param _spawn_method: a method that returns a GreenThread object
|
||||
:param args: args passed to the method.
|
||||
:param kwargs: additional kwargs passed to the method.
|
||||
|
||||
"""
|
||||
self._spawn_method = _spawn_method
|
||||
self._spawn_args = args
|
||||
self._spawn_kwargs = kwargs
|
||||
|
||||
def set_spawn_error_hook(self, _on_error_method, *args, **kwargs):
|
||||
"""Create a hook to handle exceptions when spawning a task.
|
||||
|
||||
Create a hook that gets called upon an exception being raised
|
||||
from spawning a background thread to do a task.
|
||||
|
||||
:param _on_error_method: a callable object, it's first parameter
|
||||
should accept the Exception object that was raised.
|
||||
:param args: additional args passed to the callable object.
|
||||
:param kwargs: additional kwargs passed to the callable object.
|
||||
|
||||
"""
|
||||
self._on_error_method = _on_error_method
|
||||
self._on_error_args = args
|
||||
self._on_error_kwargs = kwargs
|
||||
|
||||
def release_resources(self):
|
||||
"""Unlock a node and release resources.
|
||||
|
||||
If an exclusive lock is held, unlock the node. Reset attributes
|
||||
to make it clear that this instance of TaskManager should no
|
||||
longer be accessed.
|
||||
"""
|
||||
pass # don't need it at the moment
|
||||
"""
|
||||
if not self.shared:
|
||||
try:
|
||||
if self.node:
|
||||
objects.Node.release(self.context, CONF.host, self.node.id)
|
||||
except exception.NodeNotFound:
|
||||
# squelch the exception if the node was deleted
|
||||
# within the task's context.
|
||||
pass
|
||||
self.node = None
|
||||
self.driver = None
|
||||
self.ports = None
|
||||
self.fsm = None
|
||||
"""
|
||||
|
||||
def _thread_release_resources(self, t):
|
||||
"""Thread.link() callback to release resources."""
|
||||
self.release_resources()
|
||||
|
||||
def process_event(self, event, callback=None, call_args=None,
|
||||
call_kwargs=None, err_handler=None):
|
||||
"""Process the given event for the task's current state.
|
||||
|
||||
:param event: the name of the event to process
|
||||
:param callback: optional callback to invoke upon event transition
|
||||
:param call_args: optional \*args to pass to the callback method
|
||||
:param call_kwargs: optional \**kwargs to pass to the callback method
|
||||
:param err_handler: optional error handler to invoke if the
|
||||
callback fails, eg. because there are no workers available
|
||||
(err_handler should accept arguments node, prev_prov_state, and
|
||||
prev_target_state)
|
||||
:raises: InvalidState if the event is not allowed by the associated
|
||||
state machine
|
||||
"""
|
||||
# Advance the state model for the given event. Note that this doesn't
|
||||
# alter the node in any way. This may raise InvalidState, if this event
|
||||
# is not allowed in the current state.
|
||||
self.fsm.process_event(event)
|
||||
|
||||
# stash current states in the error handler if callback is set,
|
||||
# in case we fail to get a worker from the pool
|
||||
if err_handler and callback:
|
||||
self.set_spawn_error_hook(err_handler, self.node,
|
||||
self.node.provision_state,
|
||||
self.node.target_provision_state)
|
||||
|
||||
self.node.provision_state = self.fsm.current_state
|
||||
self.node.target_provision_state = self.fsm.target_state
|
||||
|
||||
# set up the async worker
|
||||
if callback:
|
||||
# clear the error if we're going to start work in a callback
|
||||
self.node.last_error = None
|
||||
if call_args is None:
|
||||
call_args = ()
|
||||
if call_kwargs is None:
|
||||
call_kwargs = {}
|
||||
self.spawn_after(callback, *call_args, **call_kwargs)
|
||||
|
||||
# publish the state transition by saving the Node
|
||||
self.node.save()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is None and self._spawn_method is not None:
|
||||
# Spawn a worker to complete the task
|
||||
# The linked callback below will be called whenever:
|
||||
# - background task finished with no errors.
|
||||
# - background task has crashed with exception.
|
||||
# - callback was added after the background task has
|
||||
# finished or crashed. While eventlet currently doesn't
|
||||
# schedule the new thread until the current thread blocks
|
||||
# for some reason, this is true.
|
||||
# All of the above are asserted in tests such that we'll
|
||||
# catch if eventlet ever changes this behavior.
|
||||
thread = None
|
||||
try:
|
||||
thread = self._spawn_method(*self._spawn_args,
|
||||
**self._spawn_kwargs)
|
||||
|
||||
# NOTE(comstud): Trying to use a lambda here causes
|
||||
# the callback to not occur for some reason. This
|
||||
# also makes it easier to test.
|
||||
thread.link(self._thread_release_resources)
|
||||
# Don't unlock! The unlock will occur when the
|
||||
# thread finshes.
|
||||
return
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
try:
|
||||
# Execute the on_error hook if set
|
||||
if self._on_error_method:
|
||||
self._on_error_method(e, *self._on_error_args,
|
||||
**self._on_error_kwargs)
|
||||
except Exception:
|
||||
LOG.warning(_LW("Task's on_error hook failed to "
|
||||
"call %(method)s on node %(node)s"),
|
||||
{'method': self._on_error_method.__name__,
|
||||
'node': self.node.uuid})
|
||||
|
||||
if thread is not None:
|
||||
# This means the link() failed for some
|
||||
# reason. Nuke the thread.
|
||||
thread.cancel()
|
||||
self.release_resources()
|
||||
self.release_resources()
|
|
@ -1,160 +0,0 @@
|
|||
# coding=utf-8
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _
|
||||
from iotronic.common.i18n import _LI
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic.conductor import task_manager
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_set_boot_device(task, device, persistent=False):
|
||||
"""Set the boot device for a node.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
:param device: Boot device. Values are vendor-specific.
|
||||
:param persistent: Whether to set next-boot, or make the change
|
||||
permanent. Default: False.
|
||||
:raises: InvalidParameterValue if the validation of the
|
||||
ManagementInterface fails.
|
||||
|
||||
"""
|
||||
if getattr(task.driver, 'management', None):
|
||||
task.driver.management.validate(task)
|
||||
task.driver.management.set_boot_device(task,
|
||||
device=device,
|
||||
persistent=persistent)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_power_action(task, new_state):
|
||||
"""Change power state or reset for a node.
|
||||
|
||||
Perform the requested power action if the transition is required.
|
||||
|
||||
:param task: a TaskManager instance containing the node to act on.
|
||||
:param new_state: Any power state from iotronic.common.states. If the
|
||||
state is 'REBOOT' then a reboot will be attempted, otherwise
|
||||
the node power state is directly set to 'state'.
|
||||
:raises: InvalidParameterValue when the wrong state is specified
|
||||
or the wrong driver info is specified.
|
||||
:raises: other exceptions by the node's power driver if something
|
||||
wrong occurred during the power action.
|
||||
|
||||
"""
|
||||
node = task.node
|
||||
target_state = states.POWER_ON if new_state == states.REBOOT else new_state
|
||||
|
||||
if new_state != states.REBOOT:
|
||||
try:
|
||||
curr_state = task.driver.power.get_power_state(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': new_state, 'error': e}
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
if curr_state == new_state:
|
||||
# Neither the iotronic service nor the hardware has erred. The
|
||||
# node is, for some reason, already in the requested state,
|
||||
# though we don't know why. eg, perhaps the user previously
|
||||
# requested the node POWER_ON, the network delayed those IPMI
|
||||
# packets, and they are trying again -- but the node finally
|
||||
# responds to the first request, and so the second request
|
||||
# gets to this check and stops.
|
||||
# This isn't an error, so we'll clear last_error field
|
||||
# (from previous operation), log a warning, and return.
|
||||
node['last_error'] = None
|
||||
# NOTE(dtantsur): under rare conditions we can get out of sync here
|
||||
node['power_state'] = new_state
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
LOG.warn(_LW("Not going to change_node_power_state because "
|
||||
"current state = requested state = '%(state)s'."),
|
||||
{'state': curr_state})
|
||||
return
|
||||
|
||||
if curr_state == states.ERROR:
|
||||
# be optimistic and continue action
|
||||
LOG.warn(_LW("Driver returns ERROR power state for node %s."),
|
||||
node.uuid)
|
||||
|
||||
# Set the target_power_state and clear any last_error, if we're
|
||||
# starting a new operation. This will expose to other processes
|
||||
# and clients that work is in progress.
|
||||
if node['target_power_state'] != target_state:
|
||||
node['target_power_state'] = target_state
|
||||
node['last_error'] = None
|
||||
node.save()
|
||||
|
||||
# take power action
|
||||
try:
|
||||
if new_state != states.REBOOT:
|
||||
task.driver.power.set_power_state(task, new_state)
|
||||
else:
|
||||
task.driver.power.reboot(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': target_state, 'error': e}
|
||||
else:
|
||||
# success!
|
||||
node['power_state'] = target_state
|
||||
LOG.info(_LI('Successfully set node %(node)s power state to '
|
||||
'%(state)s.'),
|
||||
{'node': node.uuid, 'state': target_state})
|
||||
finally:
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def cleanup_after_timeout(task):
|
||||
"""Cleanup deploy task after timeout.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
"""
|
||||
node = task.node
|
||||
msg = (_('Timeout reached while waiting for callback for node %s')
|
||||
% node.uuid)
|
||||
node.last_error = msg
|
||||
LOG.error(msg)
|
||||
node.save()
|
||||
|
||||
error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
|
||||
' %(error)s')
|
||||
try:
|
||||
task.driver.deploy.clean_up(task)
|
||||
except exception.IotronicException as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = msg
|
||||
node.save()
|
||||
except Exception as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = _('Deploy timed out, but an unhandled exception was '
|
||||
'encountered while aborting. More info may be '
|
||||
'found in the log file.')
|
||||
node.save()
|
|
@ -18,13 +18,12 @@ from autobahn.twisted import websocket
|
|||
from autobahn.wamp import types
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import threading
|
||||
from threading import Thread
|
||||
from twisted.internet.protocol import ReconnectingClientFactory
|
||||
from twisted.internet import reactor
|
||||
|
||||
import oslo_messaging
|
||||
import threading
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -42,10 +41,10 @@ CONF.register_opts(wamp_opts, 'wamp')
|
|||
|
||||
shared_result = {}
|
||||
wamp_session_caller = None
|
||||
AGENT_UUID = None
|
||||
|
||||
|
||||
def wamp_request(e, kwarg, session):
|
||||
|
||||
id = threading.current_thread().ident
|
||||
shared_result[id] = {}
|
||||
shared_result[id]['result'] = None
|
||||
|
@ -71,7 +70,6 @@ def wamp_request(e, kwarg, session):
|
|||
|
||||
# OSLO ENDPOINT
|
||||
class WampEndpoint(object):
|
||||
|
||||
def __init__(self, wamp_session, agent_uuid):
|
||||
self.wamp_session = wamp_session
|
||||
setattr(self, agent_uuid + '.s4t_invoke_wamp', self.s4t_invoke_wamp)
|
||||
|
@ -93,9 +91,8 @@ class WampEndpoint(object):
|
|||
|
||||
|
||||
class WampFrontend(wamp.ApplicationSession):
|
||||
|
||||
def onJoin(self, details):
|
||||
global wamp_session_caller
|
||||
global wamp_session_caller, AGENT_UUID
|
||||
wamp_session_caller = self
|
||||
import iotronic.wamp.registerd_functions as fun
|
||||
|
||||
|
@ -103,7 +100,8 @@ class WampFrontend(wamp.ApplicationSession):
|
|||
self.subscribe(fun.board_on_join, 'wamp.session.on_join')
|
||||
|
||||
try:
|
||||
self.register(fun.register_board, u'register_board')
|
||||
self.register(fun.registration, u'stack4things.register')
|
||||
self.register(fun.echo, AGENT_UUID + u'.stack4things.echo')
|
||||
LOG.info("procedure registered")
|
||||
except Exception as e:
|
||||
LOG.error("could not register procedure: {0}".format(e))
|
||||
|
@ -114,10 +112,8 @@ class WampFrontend(wamp.ApplicationSession):
|
|||
LOG.info("disconnected")
|
||||
|
||||
|
||||
class WampClientFactory(
|
||||
websocket.WampWebSocketClientFactory,
|
||||
class WampClientFactory(websocket.WampWebSocketClientFactory,
|
||||
ReconnectingClientFactory):
|
||||
|
||||
maxDelay = 30
|
||||
|
||||
def clientConnectionFailed(self, connector, reason):
|
||||
|
@ -134,7 +130,6 @@ class WampClientFactory(
|
|||
|
||||
|
||||
class RPCServer(Thread):
|
||||
|
||||
def __init__(self, agent_uuid):
|
||||
|
||||
# AMQP CONFIG
|
||||
|
@ -165,7 +160,6 @@ class RPCServer(Thread):
|
|||
|
||||
|
||||
class WampManager(object):
|
||||
|
||||
def __init__(self):
|
||||
component_config = types.ComponentConfig(
|
||||
realm=unicode(CONF.wamp.wamp_realm))
|
||||
|
@ -190,13 +184,14 @@ class WampManager(object):
|
|||
|
||||
|
||||
class WampAgent(object):
|
||||
|
||||
def __init__(self):
|
||||
logging.register_options(CONF)
|
||||
CONF(project='iotronic')
|
||||
logging.setup(CONF, "iotronic-wamp-agent")
|
||||
|
||||
agent_uuid = 'agent'
|
||||
global AGENT_UUID
|
||||
AGENT_UUID = agent_uuid
|
||||
|
||||
r = RPCServer(agent_uuid)
|
||||
w = WampManager()
|
||||
|
|
|
@ -13,17 +13,52 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from iotronic.common import rpc
|
||||
from iotronic.conductor import rpcapi
|
||||
from iotronic import objects
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF(project='iotronic')
|
||||
|
||||
def register_board():
|
||||
return 'hello!\n'
|
||||
rpc.init(CONF)
|
||||
|
||||
topic = 'iotronic.conductor_manager'
|
||||
c = rpcapi.ConductorAPI(topic)
|
||||
|
||||
|
||||
class cont(object):
|
||||
def to_dict(self):
|
||||
return {}
|
||||
|
||||
|
||||
ctxt = cont()
|
||||
|
||||
|
||||
def echo(data):
|
||||
LOG.info("ECHO: %s" % data)
|
||||
return data
|
||||
|
||||
|
||||
def board_on_leave(session_id):
|
||||
LOG.debug('A node with %s disconnectd', session_id)
|
||||
try:
|
||||
old_session = objects.SessionWP({}).get_by_session_id({}, session_id)
|
||||
old_session.valid = False
|
||||
old_session.save()
|
||||
LOG.debug('Session %s deleted', session_id)
|
||||
except Exception:
|
||||
LOG.debug('session %s not found', session_id)
|
||||
|
||||
|
||||
def registration(data):
|
||||
token = data[0]
|
||||
session = data[1]
|
||||
return c.registration(ctxt, token, session)
|
||||
|
||||
|
||||
def board_on_join(session_id):
|
||||
|
|
Loading…
Reference in New Issue