Merge "Add async executor"

This commit is contained in:
Zuul 2018-04-04 16:28:32 +00:00 committed by Gerrit Code Review
commit 47777b216a
9 changed files with 254 additions and 3 deletions

View File

@ -4,9 +4,11 @@
pbr!=2.1.0,>=2.0.0 # Apache-2.0
aniso8601==1.2.0
click==6.6
eventlet>=0.18.2,!=0.18.3,!=0.20.1,<0.21.0 # MIT
Flask!=0.11,<1.0,>=0.10 # BSD
Flask-Cors==3.0.2
Flask-RESTful>=0.3.5 # BSD
futurist>=1.2.0 # Apache-2.0
itsdangerous==0.24
jsonschema<3.0.0,>=2.6.0 # MIT
Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause)

View File

@ -19,7 +19,11 @@ import flask_cors
import flask_restful
from six.moves import http_client
# Note: setup app needs to be called before the valence imports
# for config options initialization to take place.
from valence.api import app as flaskapp
app = flaskapp.get_app()
import valence.api.root as api_root
import valence.api.v1.devices as v1_devices
import valence.api.v1.flavors as v1_flavors

View File

@ -15,13 +15,16 @@
import logging
import eventlet
eventlet.monkey_patch(os=False)
import gunicorn.app.base
from valence.api.route import app as application
from valence.common import async
import valence.conf
from valence.controller import pooled_devices
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
@ -42,6 +45,24 @@ class StandaloneApplication(gunicorn.app.base.BaseApplication):
return self.application
def start_periodic_tasks(server):
"""Starts asynchronous periodic sync on app startup
If enabled in configuration this function will start periodic sync
of pooled resources in background.
"""
if CONF.podm.enable_periodic_sync:
async.start_periodic_worker([(
pooled_devices.PooledDevices.synchronize_devices, None, None)])
return
def on_server_exit(server):
"""Performs cleanup tasks. """
async.stop_periodic_tasks()
def main():
options = {
'bind': '%s:%s' % (CONF.api.bind_host, CONF.api.bind_port),
@ -50,6 +71,9 @@ def main():
'workers': CONF.api.workers,
'loglevel': CONF.api.log_level,
'errorlog': CONF.api.log_file,
'worker_class': 'eventlet',
'when_ready': start_periodic_tasks,
'on_exit': on_server_exit
}
LOG.info(("Valence Server on http://%(host)s:%(port)s"),
{'host': CONF.api.bind_host, 'port': CONF.api.bind_port})

113
valence/common/async.py Normal file
View File

@ -0,0 +1,113 @@
# Copyright (c) 2017 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import futurist
from valence.common import exception
LOG = logging.getLogger(__name__)
_executor = None
_periodics_worker = None
def executor():
global _executor
if not _executor:
_executor = futurist.GreenThreadPoolExecutor(max_workers=10)
return _executor
def start_periodic_worker(callables):
"""Starts periodic execution of function passed in callables
To enable this:
1. Pass callables in following format
[(func, (arg1, arg2), {}),
(func2, (arg1, arg2), {}),]
2. Decorate func as follow:
@periodics.periodic(spacing=2, enabled=True)
def func():
pass
:param callables: pass functions in this to execute periodically
:returns: Future object
"""
global _periodics_worker
_periodics_worker = futurist.periodics.PeriodicWorker(
callables=callables,
executor_factory=futurist.periodics.ExistingExecutor(executor()))
task_worker = executor().submit(_periodics_worker.start)
task_worker.add_done_callback(_handle_exceptions)
def stop_periodic_tasks():
"""Stops all periodic tasks and cleanup resources. """
global _periodics_worker
if _periodics_worker is not None:
try:
_periodics_worker.stop()
_periodics_worker.wait()
except Exception:
LOG.exception("Exception occurred when stopping periodic workers")
_periodics_worker = None
if _executor and _executor.alive:
_executor.shutdown(wait=True)
LOG.debug("periodic tasks stopped successfully")
def _spawn_worker(func, *args, **kwargs):
"""Creates a greenthread to run func(*args, **kwargs).
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
:returns: Future object.
:raises: NoFreeWorker if worker pool is currently full.
"""
try:
return executor().submit(func, *args, **kwargs)
except futurist.RejectedSubmission:
raise exception.ValenceException("No free worker available")
def async(func):
"""Start a job in new background thread.
To start a async job, decorate the function as follows:
Example:
@async.async
def test():
pass
"""
def wrapper(*args, **kwargs):
LOG.info("starting async thread for function %s", func.__name__)
future = _spawn_worker(func, *args, **kwargs)
future.add_done_callback(_handle_exceptions)
return wrapper
def _handle_exceptions(fut):
try:
fut.result()
except Exception:
msg = 'Unexpected exception in background thread'
LOG.exception(msg)

View File

@ -42,6 +42,14 @@ podm_opts = [
default='/redfish/v1/',
help=_('The URL extension that specifies the '
'Redfish API version that valence will interact with')),
cfg.BoolOpt('enable_periodic_sync',
default=False,
help=_('To enable periodic task to automatically sync'
'resources of podmanager with DB.')),
cfg.IntOpt('sync_interval',
default=30,
help=_('Time interval(in seconds) after which devices will be'
'synced periodically.')),
]

View File

@ -14,13 +14,16 @@
import logging
from valence.common import async
from valence.common import exception
from valence.common import utils
import valence.conf
from valence.controller import nodes
from valence.controller import pooled_devices
from valence.db import api as db_api
from valence.podmanagers import manager
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
@ -63,8 +66,7 @@ def create_podmanager(values):
values['status'] = mng.podm.get_status()
podm = db_api.Connection.create_podmanager(values).as_dict()
# updates all devices corresponding to this podm in DB
# TODO(Akhil): Make this as asynchronous action
pooled_devices.PooledDevices.update_device_info(podm['uuid'])
update_podm_resources_to_db(podm['uuid'])
return podm
@ -89,3 +91,17 @@ def delete_podmanager(uuid):
db_api.Connection.delete_device(device['uuid'])
return db_api.Connection.delete_podmanager(uuid)
@async.async
def update_podm_resources_to_db(podm_id):
"""Starts asynchronous one_time sync
As set in configuration this function will sync pooled resources
one time if background periodic sync is disabled.
:param podm_id: to asynchronously sync devices of particular podm
"""
if not CONF.podm.enable_periodic_sync:
pooled_devices.PooledDevices.synchronize_devices(podm_id)
return

View File

@ -14,10 +14,14 @@
import logging
from futurist import periodics
from valence.common import exception
import valence.conf
from valence.db import api as db_api
from valence.podmanagers import manager
CONF = valence.conf.CONF
LOG = logging.getLogger(__name__)
@ -50,6 +54,8 @@ class PooledDevices(object):
return db_api.Connection.get_device_by_uuid(device_id).as_dict()
@classmethod
@periodics.periodic(spacing=CONF.podm.sync_interval, enabled=True,
run_immediately=True)
def synchronize_devices(cls, podm_id=None):
"""Sync devices connected to podmanager(s)

View File

@ -0,0 +1,51 @@
# Copyright (c) 2018 NEC, Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import futurist
import mock
from valence.common import async
from valence.common import exception
class AsyncTestCase(unittest.TestCase):
def setUp(self):
super(AsyncTestCase, self).setUp()
self.executor = mock.Mock(spec=futurist.GreenThreadPoolExecutor)
self.periodics = mock.Mock(spec=futurist.periodics.PeriodicWorker)
async._executor = self.executor
async._periodics_worker = self.periodics
def test__spawn_worker(self):
async._spawn_worker('fake', 1, foo='bar')
self.executor.submit.assert_called_once_with('fake', 1, foo='bar')
def test__spawn_worker_none_free(self):
self.executor.submit.side_effect = futurist.RejectedSubmission()
self.assertRaises(exception.ValenceException,
async._spawn_worker, 'fake')
def test_start_periodic_tasks(self):
fake_callable = mock.MagicMock()
async.start_periodic_worker([(fake_callable, None, None)])
self.executor.submit.assert_called_once_with(
async._periodics_worker.start)
def test_stop_periodic_tasks(self):
async.stop_periodic_tasks()
self.periodics.stop.assert_called()
self.periodics.wait.assert_called()
self.executor.shutdown.assert_called()

View File

@ -17,6 +17,7 @@ import mock
from valence.common.exception import BadRequest
from valence.controller import podmanagers
from valence.podmanagers import podm_base
class TestPodManagers(unittest.TestCase):
@ -104,3 +105,29 @@ class TestPodManagers(unittest.TestCase):
podmanagers.update_podmanager('fake-podm-id', values)
mock_db_update.assert_called_once_with('fake-podm-id', result_values)
@mock.patch('valence.redfish.sushy.sushy_instance.RedfishInstance')
@mock.patch('valence.controller.podmanagers.update_podm_resources_to_db')
@mock.patch('valence.db.api.Connection.create_podmanager')
@mock.patch('valence.podmanagers.manager.Manager')
@mock.patch('valence.controller.podmanagers._check_creation')
def test_create_podmanager(self, mock_creation, mock_mng, mock_db_create,
mock_resource_update, mock_sushy):
values = {"name": "podm_name", "url": "https://10.240.212.123",
"driver": "redfishv1", "status": None,
"authentication": [{
"type": "basic",
"auth_items": {"username": "xxxxxxx",
"password": "xxxxxxx"}}]}
mock_creation.return_value = values
mock_mng.podm.return_value = podm_base.PodManagerBase(
'fake', 'fake-pass', 'http://fake-url')
podmanagers.create_podmanager('fake-values')
mock_db_create.assert_called_once_with(values)
mock_resource_update.assert_called()
@mock.patch('valence.common.async._spawn_worker')
def test_update_podm_resources_to_db(self, mock_worker):
mock_worker.return_value = mock.MagicMock()
podmanagers.update_podm_resources_to_db('fake-podm-id')
mock_worker.assert_called()