Add Sync mechanism to NEO

Change-Id: I89bdd873633b9e0b456477bc9260ec0e0b57ce25
Moshe Levi 2016-09-08 11:25:17 +03:00
parent 9eab83fbb3
commit c5ecaee06c
8 changed files with 893 additions and 235 deletions

View File

@@ -25,3 +25,34 @@
# This is an optional parameter, default value is 10 seconds.
# Example: timeout = 15
# timeout =
# (IntOpt) Timeout in seconds after which the sync thread fires off
# another run through the journal database.
#
# sync_timeout = 10
# Example: sync_timeout = 10
# (IntOpt) Number of times to retry a journal transaction before
# marking it 'failed'. To disable the retry limit, set the value to -1.
#
# retry_count = -1
# Example: retry_count = 5
# (IntOpt) Journal maintenance operations interval in seconds.
#
# maintenance_interval = 300
# Example: maintenance_interval = 30
# (IntOpt) Time to keep completed rows in seconds.
# Completed rows retention will be checked every maintenance_interval by the
# cleanup thread.
# To disable deletion of completed rows, set the value to -1.
#
# completed_rows_retention = 600
# Example: completed_rows_retention = 30
# (IntOpt) Timeout in seconds to wait before marking a processing
# row back to pending state.
#
# processing_timeout = 100
# Example: processing_timeout = 200

View File

@@ -115,7 +115,7 @@ def update_db_row_job_id(session, row, job_id):
def update_pending_db_row_retry(session, row, retry_count):
if row.retry_count >= retry_count:
if row.retry_count >= retry_count and retry_count != -1:
update_db_row_state(session, row, sdn_const.FAILED)
else:
row.retry_count += 1
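
To make the retry semantics concrete, here is a minimal illustrative sketch (not part of the commit; FakeRow and should_fail are hypothetical stand-ins) of how the condition above behaves when the configured retry_count is -1:

# illustrative sketch only
class FakeRow(object):
    retry_count = 0

def should_fail(row, retry_count):
    # -1 disables the limit, so the row is never marked FAILED
    return row.retry_count >= retry_count and retry_count != -1

row = FakeRow()
for _ in range(5):
    if should_fail(row, -1):
        break
    row.retry_count += 1
assert row.retry_count == 5  # with retry_count=-1 the row just keeps retrying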

View File

@@ -0,0 +1,231 @@
# Copyright 2016 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import threading
from neutron.db import api as neutron_db_api
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from six.moves import html_parser
from networking_mlnx._i18n import _LI, _LE, _LW
from networking_mlnx.db import db
from networking_mlnx.journal import dependency_validations
from networking_mlnx.plugins.ml2.drivers.sdn import client
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
from networking_mlnx.plugins.ml2.drivers.sdn import exceptions as sdn_exc
from networking_mlnx.plugins.ml2.drivers.sdn import utils as sdn_utils
LOG = logging.getLogger(__name__)
def call_thread_on_end(func):
def new_func(obj, *args, **kwargs):
return_value = func(obj, *args, **kwargs)
obj.journal.set_sync_event()
return return_value
return new_func
def record(db_session, object_type, object_uuid, operation, data,
context=None):
db.create_pending_row(db_session, object_type, object_uuid, operation,
data)
class SdnJournalThread(object):
"""Thread worker for the SDN Journal Database."""
def __init__(self):
self.client = client.SdnRestClient.create_client()
self._sync_timeout = cfg.CONF.sdn.sync_timeout
self._row_retry_count = cfg.CONF.sdn.retry_count
self.event = threading.Event()
self.lock = threading.Lock()
self._sync_thread = self.start_sync_thread()
self._start_sync_timer()
def start_sync_thread(self):
# Start the sync thread
LOG.debug("Starting a new sync thread")
sync_thread = threading.Thread(
name='sync',
target=self.run_sync_thread)
sync_thread.start()
return sync_thread
def set_sync_event(self):
# Prevent race when starting the timer
with self.lock:
LOG.debug("Resetting thread timer")
self._timer.cancel()
self._start_sync_timer()
self.event.set()
def _start_sync_timer(self):
self._timer = threading.Timer(self._sync_timeout,
self.set_sync_event)
self._timer.start()
def run_sync_thread(self, exit_after_run=False):
while True:
try:
self.event.wait()
self.event.clear()
session = neutron_db_api.get_session()
self._sync_pending_rows(session, exit_after_run)
self._sync_progress_rows(session)
LOG.debug("Clearing sync thread event")
if exit_after_run:
# Permanently waiting thread model breaks unit tests
# Adding this arg to exit here only for unit tests
break
except Exception:
# Catch exceptions to protect the thread while running
LOG.exception(_LE("Error on run_sync_thread"))
def _sync_pending_rows(self, session, exit_after_run):
while True:
LOG.debug("sync_pending_rows operation walking database")
row = db.get_oldest_pending_db_row_with_lock(session)
if not row:
LOG.debug("No rows to sync")
break
# Validate the operation
valid = dependency_validations.validate(session, row)
if not valid:
LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a "
"valid operation yet, skipping for now"),
{'operation': row.operation,
'type': row.object_type,
'uuid': row.object_uuid})
# Set row back to pending.
db.update_db_row_state(session, row, sdn_const.PENDING)
if exit_after_run:
break
continue
LOG.info(_LI("Syncing %(operation)s %(type)s %(uuid)s"),
{'operation': row.operation, 'type': row.object_type,
'uuid': row.object_uuid})
# Sync this row to NEO
urlpath = sdn_utils.strings_to_url(row.object_type)
if row.operation != sdn_const.POST:
urlpath = sdn_utils.strings_to_url(urlpath, row.object_uuid)
try:
client_operation_method = (
getattr(self.client, row.operation.lower()))
response = (
client_operation_method(
urlpath, jsonutils.loads(row.data)))
if response.status_code == requests.codes.not_implemented:
db.update_db_row_state(session, row, sdn_const.COMPLETED)
else:
# update in progress and job_id
job_id = None
try:
try:
job_id = response.json()
except ValueError:
# Note(moshele): workaround for NEO,
# because for POST port it returns HTML
# and not JSON
parser = html_parser.HTMLParser()
parser.feed(response.text)
parser.handle_starttag('a', [])
url = parser.get_starttag_text()
match = re.match(
r'<a href="([a-zA-Z0-9\/]+)">', url)
if match:
job_id = match.group(1)
except Exception as e:
LOG.error(_LE("Failed to extract job_id %s"), e)
if job_id:
db.update_db_row_job_id(
session, row, job_id=job_id)
db.update_db_row_state(
session, row, sdn_const.MONITORING)
else:
LOG.warning(_LW("object %s has join id is NULL"),
row.object_uuid)
except sdn_exc.SDNConnectionError:
# Don't raise the retry count, just log an error
LOG.error(_LE("Cannot connect to the NEO Controller"))
db.update_pending_db_row_retry(session, row,
self._row_retry_count)
# Break out of the loop and retry with the next
# timer interval
break
def _sync_progress_rows(self, session):
# 1. get all in-progress (monitoring) jobs
# 2. get their status from NEO
# 3. update the row state if completed/failed
LOG.debug("sync_progress_rows operation walking database")
rows = db.get_all_monitoring_db_row_by_oldest(session)
if not rows:
LOG.debug("No rows to sync")
return
for row in rows:
try:
if row.job_id is None:
LOG.warning(_LW("object %s has join id is NULL"),
row.object_uuid)
continue
response = self.client.get(row.job_id.strip("/"))
if response:
try:
job_status = response.json().get('Status')
if job_status == 'Completed':
db.update_db_row_state(
session, row, sdn_const.COMPLETED)
continue
elif job_status in ("Pending", "Running"):
LOG.debug("NEO Job id %(job_id)s is %(status)s "
"continue monitoring",
{'job_id': row.job_id,
'status': job_status})
continue
else:
LOG.error(_LE("NEO Job id %(job_id)s, failed with"
" %(status)s"),
{'job_id': row.job_id,
'status': job_status})
db.update_db_row_state(
session, row, sdn_const.PENDING)
except (ValueError, AttributeError):
LOG.error(_LE("failed to extract response for job "
"id %s"), row.job_id)
else:
LOG.error(_LE("NEO Job id %(job_id)s, failed with "
"%(status)s"),
{'job_id': row.job_id, 'status': job_status})
db.update_db_row_state(session, row, sdn_const.PENDING)
except sdn_exc.SDNConnectionError:
# Don't raise the retry count, just log an error
LOG.error(_LE("Cannot connect to the NEO Controller"))
db.update_db_row_state(session, row, sdn_const.PENDING)
# Break out of the loop and retry with the next
# timer interval
break
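
For reference, a rough usage sketch of the journal flow above, assuming the Neutron DB and the [sdn] options are already set up; the network id and dict below are illustrative values only:

from neutron.db import api as neutron_db_api
from networking_mlnx.journal import journal
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const

net_id = 'c13bba05-eb07-45ba-ace2-765706b2d701'  # illustrative
net_data = {'id': net_id, 'name': 'net1'}        # illustrative

session = neutron_db_api.get_session()
# 1. record the operation as a pending journal row
journal.record(session, sdn_const.NETWORK, net_id, sdn_const.POST, net_data)
# 2. wake the sync thread; it walks pending rows, calls NEO over REST and
#    moves each row to MONITORING and later COMPLETED/FAILED
sync_thread = journal.SdnJournalThread()
sync_thread.set_sync_event()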

View File

@@ -0,0 +1,74 @@
# Copyright 2016 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import api as neutron_db_api
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from networking_mlnx._i18n import _LE
from networking_mlnx._i18n import _LI
from networking_mlnx.db import db
LOG = logging.getLogger(__name__)
class MaintenanceThread(object):
def __init__(self):
self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
self.maintenance_interval = cfg.CONF.sdn.maintenance_interval
self.maintenance_ops = []
def start(self):
self.timer.start(self.maintenance_interval, stop_on_exception=False)
def _execute_op(self, operation, session):
op_details = operation.__name__
if operation.__doc__:
op_details += " (%s)" % operation.__doc__
try:
LOG.info(_LI("Starting maintenance operation %s."), op_details)
db.update_maintenance_operation(session, operation=operation)
operation(session=session)
LOG.info(_LI("Finished maintenance operation %s."), op_details)
except Exception:
LOG.exception(_LE("Failed during maintenance operation %s."),
op_details)
def execute_ops(self):
LOG.info(_LI("Starting journal maintenance run."))
session = neutron_db_api.get_session()
if not db.lock_maintenance(session):
LOG.info(_LI("Maintenance already running, aborting."))
return
try:
for operation in self.maintenance_ops:
self._execute_op(operation, session)
finally:
db.update_maintenance_operation(session, operation=None)
db.unlock_maintenance(session)
LOG.info(_LI("Finished journal maintenance run."))
def register_operation(self, f):
"""Register a function to be run by the maintenance thread.
:param f: Function to call when the thread runs. The function will
receive a DB session to use for DB operations.
"""
self.maintenance_ops.append(f)
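
A minimal sketch of how a maintenance operation can be registered (purge_old_rows is a hypothetical callable, shown only to illustrate the register_operation contract; the mechanism driver below registers the JournalCleanup operations in the same way):

def purge_old_rows(session):
    """Example maintenance op; receives a DB session to use."""
    pass

thread = MaintenanceThread()
thread.register_operation(purge_old_rows)
thread.start()  # runs every registered op each maintenance_interval seconds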

View File

@@ -36,4 +36,22 @@ sdn_opts = [
help=_("HTTP timeout in seconds."),
default=10
),
cfg.IntOpt('sync_timeout', default=10,
help=_("Sync thread timeout in seconds.")),
cfg.IntOpt('retry_count', default=-1,
help=_("Number of times to retry a row "
"before failing."
"To disable retry count value should be -1")),
cfg.IntOpt('maintenance_interval', default=300,
help=_("Journal maintenance operations interval "
"in seconds.")),
cfg.IntOpt('completed_rows_retention', default=600,
help=_("Time to keep completed rows in seconds."
"Completed rows retention will be checked every "
"maintenance_interval by the cleanup thread."
"To disable completed rows deletion "
"value should be -1")),
cfg.IntOpt('processing_timeout', default=100,
help=_("Time in seconds to wait before a "
"processing row is marked back to pending.")),
]
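
Assuming these options are registered under the 'sdn' group (as the cfg.CONF.sdn.* references elsewhere in this change imply), the journal and maintenance threads read them roughly like this (sketch only):

from oslo_config import cfg

sync_timeout = cfg.CONF.sdn.sync_timeout          # seconds between forced sync runs
retry_count = cfg.CONF.sdn.retry_count            # -1 disables the retry limit
maintenance_interval = cfg.CONF.sdn.maintenance_interval
retention = cfg.CONF.sdn.completed_rows_retention  # -1 keeps completed rows forever
processing_timeout = cfg.CONF.sdn.processing_timeout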

View File

@@ -15,15 +15,18 @@
import functools
from neutron.common import constants as neutron_const
from neutron.db import api as db_api
from neutron.objects.qos import policy as policy_object
from neutron.plugins.common import constants
from neutron.plugins.ml2 import driver_api as api
from oslo_log import log
from networking_mlnx._i18n import _LE
from networking_mlnx.journal import cleanup
from networking_mlnx.journal import journal
from networking_mlnx.journal import maintenance
from networking_mlnx.plugins.ml2.drivers.sdn import client
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
from networking_mlnx.plugins.ml2.drivers.sdn import utils as sdn_utils
LOG = log.getLogger(__name__)
@@ -73,63 +76,100 @@ class SDNMechanismDriver(api.MechanismDriver):
def initialize(self):
self.client = client.SdnRestClient.create_client()
self.journal = journal.SdnJournalThread()
self._start_maintenance_thread()
def _start_maintenance_thread(self):
# start the maintenance thread and register all the maintenance
# operations:
# (1) JournalCleanup - Delete completed rows from journal
# (2) CleanupProcessing - Mark orphaned processing rows to pending
cleanup_obj = cleanup.JournalCleanup()
self._maintenance_thread = maintenance.MaintenanceThread()
self._maintenance_thread.register_operation(
cleanup_obj.delete_completed_rows)
self._maintenance_thread.register_operation(
cleanup_obj.cleanup_processing_rows)
self._maintenance_thread.start()
@staticmethod
def _record_in_journal(context, object_type, operation, data=None):
if data is None:
data = context.current
journal.record(context._plugin_context.session, object_type,
context.current['id'], operation, data)
@context_validator(sdn_const.NETWORK)
@error_handler
def create_network_postcommit(self, context):
network_dic = context._network
def create_network_precommit(self, context):
network_dic = context.current
network_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, network_dic['id']))
self.client.post(urlpath=sdn_const.NETWORK, data=network_dic)
@context_validator(sdn_const.NETWORK)
@error_handler
def update_network_postcommit(self, context):
network_dic = context._network
network_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, network_dic['id']))
urlpath = sdn_utils.strings_to_url(sdn_const.NETWORK,
network_dic['id'])
self.client.put(urlpath=urlpath, data=network_dic)
@context_validator(sdn_const.NETWORK)
@error_handler
def delete_network_postcommit(self, context):
network_dic = context._network
network_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, network_dic['id']))
urlpath = sdn_utils.strings_to_url(sdn_const.NETWORK,
network_dic['id'])
self.client.delete(urlpath=urlpath, data=network_dic)
@context_validator(sdn_const.PORT)
@error_handler
def update_port_postcommit(self, context):
port_dic = context._port
port_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, port_dic['network_id']))
urlpath = sdn_utils.strings_to_url(sdn_const.PORT,
port_dic['id'])
self.client.put(urlpath=urlpath, data=port_dic)
@context_validator(sdn_const.PORT)
@error_handler
def delete_port_postcommit(self, context):
port_dic = context._port
port_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, port_dic['network_id']))
urlpath = sdn_utils.strings_to_url(sdn_const.PORT,
port_dic['id'])
self.client.delete(urlpath=urlpath, data=port_dic)
SDNMechanismDriver._record_in_journal(
context, sdn_const.NETWORK, sdn_const.POST, network_dic)
@context_validator()
@error_handler
def bind_port(self, context):
port_dic = context._port
port_dic = context.current
if self._is_send_bind_port(port_dic):
port_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, port_dic['network_id']))
self.client.post(urlpath=sdn_const.PORT, data=port_dic)
SDNMechanismDriver._record_in_journal(
context, sdn_const.PORT, sdn_const.POST, port_dic)
@context_validator(sdn_const.NETWORK)
@error_handler
def update_network_precommit(self, context):
network_dic = context.current
network_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, network_dic['id']))
SDNMechanismDriver._record_in_journal(
context, sdn_const.NETWORK, sdn_const.PUT, network_dic)
def update_port_precommit(self, context):
port_dic = context.current
port_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, port_dic['network_id']))
SDNMechanismDriver._record_in_journal(
context, sdn_const.PORT, sdn_const.PUT, port_dic)
@context_validator(sdn_const.NETWORK)
@error_handler
def delete_network_precommit(self, context):
network_dic = context.current
network_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, network_dic['id']))
SDNMechanismDriver._record_in_journal(
context, sdn_const.NETWORK, sdn_const.DELETE, data=network_dic)
@context_validator(sdn_const.PORT)
@error_handler
def delete_port_precommit(self, context):
port_dic = context.current
port_dic[NETWORK_QOS_POLICY] = (
self._get_network_qos_policy(context, port_dic['network_id']))
SDNMechanismDriver._record_in_journal(
context, sdn_const.PORT, sdn_const.DELETE, port_dic)
@journal.call_thread_on_end
def sync_from_callback(self, operation, res_type, res_id, resource_dict):
object_type = res_type.singular
object_uuid = (resource_dict[object_type]['id']
if operation == sdn_const.POST else res_id)
if resource_dict is not None:
resource_dict = resource_dict[object_type]
journal.record(db_api.get_session(), object_type, object_uuid,
operation, resource_dict)
def _postcommit(self, context):
self.journal.set_sync_event()
create_network_postcommit = _postcommit
update_network_postcommit = _postcommit
update_port_postcommit = _postcommit
delete_network_postcommit = _postcommit
delete_port_postcommit = _postcommit
def _is_send_bind_port(self, port_context):
"""Verify that bind port is occur in compute context

View File

@@ -0,0 +1,92 @@
# Copyright 2016 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import threading
from neutron.db import api as neutron_db_api
from neutron.tests.unit import testlib_api
from networking_mlnx.db.models import sdn_maintenance_db
from networking_mlnx.journal import maintenance
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
class MaintenanceThreadTestCase(testlib_api.SqlTestCaseLight):
def setUp(self):
super(MaintenanceThreadTestCase, self).setUp()
self.db_session = neutron_db_api.get_session()
row = sdn_maintenance_db.SdnMaintenance(state=sdn_const.PENDING)
self.db_session.add(row)
self.db_session.flush()
self.thread = maintenance.MaintenanceThread()
self.thread.maintenance_interval = 0.01
def test__execute_op_no_exception(self):
with mock.patch.object(maintenance, 'LOG') as mock_log:
operation = mock.MagicMock()
operation.__name__ = "test"
self.thread._execute_op(operation, self.db_session)
self.assertTrue(operation.called)
self.assertTrue(mock_log.info.called)
self.assertFalse(mock_log.exception.called)
def test__execute_op_with_exception(self):
with mock.patch.object(maintenance, 'LOG') as mock_log:
operation = mock.MagicMock(side_effect=Exception())
operation.__name__ = "test"
self.thread._execute_op(operation, self.db_session)
self.assertTrue(mock_log.exception.called)
def test_thread_works(self):
callback_event = threading.Event()
count = [0]
def callback_op(**kwargs):
count[0] += 1
# The following should be true on the second call, so we're making
# sure that the thread runs more than once.
if count[0] > 1:
callback_event.set()
self.thread.register_operation(callback_op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))
def test_thread_continues_after_exception(self):
exception_event = threading.Event()
callback_event = threading.Event()
def exception_op(**kwargs):
if not exception_event.is_set():
exception_event.set()
raise Exception()
def callback_op(**kwargs):
callback_event.set()
for op in [exception_op, callback_op]:
self.thread.register_operation(op)
self.thread.start()
# Make sure the callback event was called and not timed out
self.assertTrue(callback_event.wait(timeout=5))

View File

@@ -11,22 +11,28 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import mock
import requests
from neutron.common import constants as neutron_const
from neutron.db import api as neutron_db_api
from neutron.plugins.common import constants
from neutron.plugins.ml2 import config as config
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2 import config
from neutron.plugins.ml2 import plugin
from neutron.tests import base
from neutron.tests.unit.plugins.ml2 import test_plugin
from neutron.tests.unit import testlib_api
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from networking_mlnx.db import db
from networking_mlnx.journal import cleanup
from networking_mlnx.journal import journal
from networking_mlnx.plugins.ml2.drivers.sdn import client
from networking_mlnx.plugins.ml2.drivers.sdn import constants as sdn_const
from networking_mlnx.plugins.ml2.drivers.sdn import sdn_mech_driver
from networking_mlnx.plugins.ml2.drivers.sdn import utils as sdn_utils
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
SEG_ID = 4L
@@ -34,25 +40,35 @@ DEVICE_OWNER_COMPUTE = 'compute:None'
MECHANISM_DRIVER_NAME = 'mlnx_sdn_assist'
class SDNTestCase(test_plugin.Ml2PluginV2TestCase):
_mechanism_drivers = ['logger', MECHANISM_DRIVER_NAME]
class SdnConfigBase(test_plugin.Ml2PluginV2TestCase):
def setUp(self):
super(SdnConfigBase, self).setUp()
config.cfg.CONF.set_override('mechanism_drivers',
['logger', MECHANISM_DRIVER_NAME],
'ml2')
config.cfg.CONF.set_override('url', 'http://127.0.0.1/neo',
sdn_const.GROUP_OPT)
config.cfg.CONF.set_override('username', 'admin', sdn_const.GROUP_OPT)
config.cfg.CONF.set_override('password', 'admin', sdn_const.GROUP_OPT)
super(SDNTestCase, self).setUp()
self.mech = sdn_mech_driver.SDNMechanismDriver()
sdn_mech_driver.SDNMechanismDriver._send_http_request = (
self.check_send_http_request)
def check_send_http_request(self, urlpath, data, method):
class SdnTestCase(SdnConfigBase):
def setUp(self):
super(SdnTestCase, self).setUp()
self.mech = sdn_mech_driver.SDNMechanismDriver()
mock.patch.object(journal.SdnJournalThread,
'start_sync_thread').start()
self.mock_request = mock.patch.object(client.SdnRestClient,
'request').start()
self.mock_request.side_effect = self.check_request
def check_request(self, method, urlpath, obj):
self.assertFalse(urlpath.startswith("http://"))
class SDNMechanismConfigTests(testlib_api.SqlTestCase):
class SdnMechanismConfigTests(testlib_api.SqlTestCase):
def _set_config(self, url='http://127.0.0.1/neo',
username='admin',
@@ -82,47 +98,64 @@ class SDNMechanismConfigTests(testlib_api.SqlTestCase):
def test_missing_password_raises_exception(self):
self._test_missing_config(password=None)
class SdnMechanismTestBasicGet(test_plugin.TestMl2BasicGet,
SdnTestCase):
pass
class SdnMechanismTestNetworksV2(test_plugin.TestMl2NetworksV2,
SdnTestCase):
pass
class SdnMechanismTestPortsV2(test_plugin.TestMl2PortsV2,
SdnTestCase):
pass
class DataMatcher(object):
def __init__(self, context, object_type):
self._data = context.__dict__["_" + object_type.lower()]
self._data = jsonutils.dumps(self._data, indent=2)
def __eq__(self, data):
return data == self._data
return jsonutils.loads(data) == self._data
def __repr__(self):
return self._data
return jsonutils.dumps(self._data)
class SDNDriverTestCase(base.BaseTestCase):
class SdnDriverTestCase(SdnConfigBase):
OPERATION_MAPPING = {
sdn_const.PUT: 'update',
sdn_const.DELETE: 'delete',
sdn_const.POST: 'create',
}
def setUp(self):
super(SDNDriverTestCase, self).setUp()
config.cfg.CONF.set_override('mechanism_drivers',
['logger', MECHANISM_DRIVER_NAME], 'ml2')
config.cfg.CONF.set_override('url', 'http://127.0.0.1/neo',
sdn_const.GROUP_OPT)
config.cfg.CONF.set_override('username', 'admin', sdn_const.GROUP_OPT)
config.cfg.CONF.set_override('password', 'admin', sdn_const.GROUP_OPT)
super(SdnDriverTestCase, self).setUp()
self.db_session = neutron_db_api.get_session()
self.mech = sdn_mech_driver.SDNMechanismDriver()
self.mock_sync_thread = mock.patch.object(
journal.SdnJournalThread, 'start_sync_thread').start()
self.mech.initialize()
self.thread = journal.SdnJournalThread()
self.addCleanup(self._db_cleanup)
def _get_segments_list(self, seg_id=SEG_ID, net_type=constants.TYPE_VLAN):
return [{'segmentation_id': seg_id,
'physical_network': u'physnet1',
'id': u'72770a8a-e9b4-46da-8f0a-ffbd6a7fa3de',
'id': u'c13bba05-eb07-45ba-ace2-765706b2d701',
'network_type': net_type}]
def _get_mock_network_operation_context(self):
current = {"provider:segmentation_id": SEG_ID,
'id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
'id': 'c13bba05-eb07-45ba-ace2-765706b2d701',
'name': 'net1',
'provider:network_type': 'vlan',
'network_qos_policy': None}
context = mock.Mock(current=current, _network=current,
_segments=self._get_segments_list())
context._plugin_context.session = neutron_db_api.get_session()
return context
def _get_mock_port_operation_context(self):
@@ -144,15 +177,24 @@ class SDNDriverTestCase(base.BaseTestCase):
context = mock.Mock(current=current, _port=current,
_network_context=network_context)
context._plugin_context.session = neutron_db_api.get_session()
return context
def _get_mock_bind_operation_context(self,
device_owner=DEVICE_OWNER_COMPUTE):
current = {'device_owner': device_owner,
def _get_mock_bind_operation_context(self):
current = {'binding:host_id': 'r-ufm177',
'binding:profile': {u'pci_slot': u'0000:02:00.4',
u'physical_network': u'physnet1',
u'pci_vendor_info': u'15b3:1004'},
'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839',
'binding:vnic_type': 'direct',
'mac_address': '12:34:56:78:21:b6',
'name': 'port_test1',
'device_owner': DEVICE_OWNER_COMPUTE,
'network_id': 'c13bba05-eb07-45ba-ace2-765706b2d701',
'network_qos_policy': None}
context = mock.Mock(current=current, _port=current,
segments_to_bind=self._get_segments_list())
context._plugin_context.session = neutron_db_api.get_session()
return context
def _get_mock_operation_context(self, object_type):
@@ -173,17 +215,6 @@ class SDNDriverTestCase(base.BaseTestCase):
503: '503 Server Error: Service Unavailable',
}
@classmethod
def _get_mock_request_response(cls, status_code):
response = mock.Mock(status_code=status_code)
if (status_code < requests.codes.bad_request and
status_code != requests.codes.not_implemented):
response.raise_for_status = mock.Mock()
else:
mock.Mock(side_effect=requests.exceptions.HTTPError(
cls._status_code_msgs[status_code]))
return response
def _get_http_request_codes(self):
for err_code in (requests.codes.ok,
requests.codes.created,
@@ -197,175 +228,316 @@ class SDNDriverTestCase(base.BaseTestCase):
requests.codes.service_unavailable):
yield err_code
def _test_no_operation(self, method, context, status_code,
*args, **kwargs):
request_response = self._get_mock_request_response(status_code)
with mock.patch('requests.Session.request',
return_value=request_response) as mock_method:
method(context)
self.assertFalse(mock_method.called)
def _db_cleanup(self):
rows = db.get_all_db_rows(self.db_session)
for row in rows:
db.delete_row(self.db_session, row=row)
def _test_operation_with(self, method, context, status_code,
*args, **kwargs):
request_response = self._get_mock_request_response(status_code)
@classmethod
def _get_mock_request_response(cls, status_code, job_url):
response = mock.Mock(status_code=status_code)
if status_code < 400:
response.raise_for_status = mock.Mock()
response.json = mock.Mock(
side_effect=[job_url, {"Status": "Completed"}])
else:
response.raise_for_status = mock.Mock(
side_effect=requests.exceptions.HTTPError(
cls._status_code_msgs[status_code]))
return response
def _test_operation(self, method, status_code, expected_calls,
*args, **kwargs):
job_url = 'app/jobs/' + uuidutils.generate_uuid()
urlpath = sdn_utils.strings_to_url(
cfg.CONF.sdn.url, job_url)
request_response = self._get_mock_request_response(
status_code, job_url)
if expected_calls == 4 and status_code < 400:
job_url2 = 'app/jobs/' + uuidutils.generate_uuid()
urlpath2 = sdn_utils.strings_to_url(
cfg.CONF.sdn.url, job_url2)
request_response.json = mock.Mock(
side_effect=[job_url, job_url2,
{"Status": "Completed"}, {"Status": "Completed"}])
with mock.patch('requests.Session.request',
return_value=request_response) as mock_method:
method(context)
method(exit_after_run=True)
login_args = mock.call(
sdn_const.POST, mock.ANY,
headers=sdn_const.LOGIN_HTTP_HEADER,
data=mock.ANY, timeout=config.cfg.CONF.sdn.timeout)
job_get_args = mock.call(
sdn_const.GET, data=None,
headers=sdn_const.JSON_HTTP_HEADER,
url=urlpath, timeout=config.cfg.CONF.sdn.timeout)
if status_code < 400:
operation_args = mock.call(
headers=sdn_const.JSON_HTTP_HEADER,
timeout=config.cfg.CONF.sdn.timeout, *args, **kwargs)
self.assertEqual(login_args, mock_method.mock_calls[0])
self.assertEqual(operation_args, mock_method.mock_calls[1])
if expected_calls:
operation_args = mock.call(
headers=sdn_const.JSON_HTTP_HEADER,
timeout=config.cfg.CONF.sdn.timeout, *args, **kwargs)
if expected_calls == 4:
urlpath2 = sdn_utils.strings_to_url(
cfg.CONF.sdn.url, job_url2)
job_get_args2 = mock.call(
sdn_const.GET, data=None,
headers=sdn_const.JSON_HTTP_HEADER,
url=urlpath2, timeout=config.cfg.CONF.sdn.timeout)
self.assertEqual(
login_args, mock_method.mock_calls[4])
self.assertEqual(
job_get_args, mock_method.mock_calls[5])
self.assertEqual(
login_args, mock_method.mock_calls[6])
self.assertEqual(
job_get_args2, mock_method.mock_calls[7])
else:
self.assertEqual(
login_args, mock_method.mock_calls[0])
self.assertEqual(
operation_args, mock_method.mock_calls[1])
self.assertEqual(
login_args, mock_method.mock_calls[2])
self.assertEqual(
job_get_args, mock_method.mock_calls[3])
def _test_create_resource_postcommit(self, object_type, status_code):
method = getattr(self.mech, 'create_%s_postcommit' %
object_type.lower())
context = self._get_mock_operation_context(object_type)
url = (
'%s/%s/%s' % (config.cfg.CONF.sdn.url,
config.cfg.CONF.sdn.domain,
object_type))
kwargs = {'url': url, 'data': DataMatcher(context, object_type)}
self._test_operation_with(method, context, status_code,
sdn_const.POST, **kwargs)
# each request is preceded by a login call, so the call count is doubled
self.assertEqual(expected_calls * 2, mock_method.call_count)
def _test_update_resource_postcommit(self, object_type, status_code):
method = getattr(self.mech, 'update_%s_postcommit' %
object_type.lower())
context = self._get_mock_operation_context(object_type)
url = (
'%s/%s/%s/%s' % (config.cfg.CONF.sdn.url,
config.cfg.CONF.sdn.domain,
object_type,
context.current['id']))
kwargs = {'url': url, 'data': DataMatcher(context, object_type)}
self._test_operation_with(method, context, status_code,
sdn_const.PUT, **kwargs)
def _test_delete_resource_postcommit(self, object_type, status_code):
method = getattr(self.mech, 'delete_%s_postcommit' %
object_type.lower())
context = self._get_mock_operation_context(object_type)
url = (
'%s/%s/%s/%s' % (config.cfg.CONF.sdn.url,
config.cfg.CONF.sdn.domain,
object_type,
context.current['id']))
kwargs = {'url': url, 'data': DataMatcher(context, object_type)}
self._test_operation_with(method, context, status_code,
sdn_const.DELETE, **kwargs)
def _test_bind_port(self, status_code, context, assert_called=True):
method = getattr(self.mech, 'bind_port')
object_type = sdn_const.PORT
url = (
'%s/%s/%s' % (config.cfg.CONF.sdn.url,
config.cfg.CONF.sdn.domain,
object_type))
kwargs = {'url': url, 'data': DataMatcher(context, object_type)}
if assert_called:
self._test_operation_with(method, context, status_code,
sdn_const.POST, **kwargs)
def _call_operation_object(self, operation, object_type):
if object_type == sdn_const.PORT and operation == sdn_const.POST:
context = self._get_mock_bind_operation_context()
method = getattr(self.mech, 'bind_port')
else:
self._test_no_operation(method, context, status_code,
sdn_const.POST, **kwargs)
context = self._get_mock_operation_context(object_type)
operation = self.OPERATION_MAPPING[operation]
object_type = object_type.lower()
method = getattr(self.mech, '%s_%s_precommit' % (operation,
object_type))
method(context)
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_create_network_postcommit(self, *args):
for status_code in self._get_http_request_codes():
self._test_create_resource_postcommit(sdn_const.NETWORK,
status_code,
)
def _test_operation_object(self, operation, object_type):
self._call_operation_object(operation, object_type)
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_update_port_postcommit(self, *args):
for status_code in self._get_http_request_codes():
self._test_update_resource_postcommit(sdn_const.PORT,
status_code,
)
context = self._get_mock_operation_context(object_type)
row = db.get_oldest_pending_db_row_with_lock(self.db_session)
self.assertEqual(operation, row['operation'])
self.assertEqual(object_type, row['object_type'])
self.assertEqual(context.current['id'], row['object_uuid'])
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_update_network_postcommit(self, *args):
for status_code in self._get_http_request_codes():
self._test_update_resource_postcommit(sdn_const.NETWORK,
status_code,
)
def _test_thread_processing(self, operation, object_type,
expected_calls=2):
status_codes = {sdn_const.POST: requests.codes.created,
sdn_const.PUT: requests.codes.ok,
sdn_const.DELETE: requests.codes.no_content}
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_delete_network_postcommit(self, *args):
for status_code in self._get_http_request_codes():
self._test_delete_resource_postcommit(sdn_const.NETWORK,
status_code,
)
http_request = operation
status_code = status_codes[operation]
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_delete_port_postcommit(self, *args):
for status_code in self._get_http_request_codes():
self._test_delete_resource_postcommit(sdn_const.PORT,
status_code,
)
self._call_operation_object(operation, object_type)
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_bind_port_compute(self, *args):
"""Bind port to VM
if object_type == sdn_const.PORT and operation == sdn_const.POST:
context = self._get_mock_bind_operation_context()
else:
context = self._get_mock_operation_context(object_type)
SDN MD will call this kind of bind only.
The identity of this call is in the port context.
The device_owner should be: "compute:None"
"""
context = self._get_mock_bind_operation_context()
for status_code in self._get_http_request_codes():
self._test_bind_port(status_code, context)
url_object_type = object_type.replace('_', '-')
url = '%s/%s/%s' % (config.cfg.CONF.sdn.url,
config.cfg.CONF.sdn.domain,
url_object_type)
if operation in (sdn_const.PUT, sdn_const.DELETE):
uuid = context.current['id']
url = '%s/%s' % (url, uuid)
kwargs = {'url': url, 'data': DataMatcher(context, object_type)}
with mock.patch.object(self.thread.event, 'wait',
return_value=False):
self._test_operation(self.thread.run_sync_thread, status_code,
expected_calls, http_request, **kwargs)
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_bind_port_dhcp(self, *args):
"""Bind port dhcp context
def _test_object_type(self, object_type):
# Add and process create request.
self._test_thread_processing(sdn_const.POST, object_type)
rows = db.get_all_db_rows_by_state(self.db_session,
sdn_const.COMPLETED)
self.assertEqual(1, len(rows))
Binding a network port occurs when a port is bound to a DHCP agent.
SDN MD will filter out such calls.
"""
context = self._get_mock_bind_operation_context(
neutron_const.DEVICE_OWNER_DHCP)
for status_code in self._get_http_request_codes():
self._test_bind_port(status_code, context)
# Add and process update request. Adds to database.
self._test_thread_processing(sdn_const.PUT, object_type)
rows = db.get_all_db_rows_by_state(self.db_session,
sdn_const.COMPLETED)
self.assertEqual(2, len(rows))
@mock.patch('neutron.objects.qos.policy.QosPolicy.get_network_policy',
return_value=None)
def test_bind_port_router(self, *args):
"""Bind port router context
# Add and process delete request. Adds to database.
self._test_thread_processing(sdn_const.DELETE, object_type)
rows = db.get_all_db_rows_by_state(self.db_session,
sdn_const.COMPLETED)
self.assertEqual(3, len(rows))
Binding a network port occurs when a port is bound to a router.
SDN MD will filter out such calls.
"""
context = self._get_mock_bind_operation_context(
neutron_const.DEVICE_OWNER_ROUTER_GW)
for status_code in self._get_http_request_codes():
self._test_bind_port(status_code, context, assert_called=False)
def _test_object_type_pending_network(self, object_type):
# Create a network (creates db row in pending state).
self._call_operation_object(sdn_const.POST,
sdn_const.NETWORK)
def test_check_segment(self):
"""Validate the check_segment call."""
segment = {'api.NETWORK_TYPE': ""}
segment[api.NETWORK_TYPE] = constants.TYPE_VLAN
self.assertTrue(self.mech.check_segment(segment))
segment[api.NETWORK_TYPE] = constants.TYPE_FLAT
self.assertTrue(self.mech.check_segment(segment))
# Validate a network type not currently supported
segment[api.NETWORK_TYPE] = constants.TYPE_LOCAL
self.assertFalse(self.mech.check_segment(segment))
segment[api.NETWORK_TYPE] = constants.TYPE_GRE
self.assertFalse(self.mech.check_segment(segment))
segment[api.NETWORK_TYPE] = constants.TYPE_VXLAN
self.assertFalse(self.mech.check_segment(segment))
# Create object_type database row and process. This results in both
# the object_type and network rows being processed.
self._test_thread_processing(sdn_const.POST, object_type,
expected_calls=4)
# Verify both rows are now marked as completed.
rows = db.get_all_db_rows_by_state(self.db_session,
sdn_const.COMPLETED)
self.assertEqual(2, len(rows))
def _test_object_type_processing_network(self, object_type):
self._test_object_operation_pending_another_object_operation(
object_type, sdn_const.POST, sdn_const.NETWORK,
sdn_const.POST)
def _test_object_operation_pending_object_operation(
self, object_type, operation, pending_operation):
self._test_object_operation_pending_another_object_operation(
object_type, operation, object_type, pending_operation)
def _test_object_operation_pending_another_object_operation(
self, object_type, operation, pending_type, pending_operation):
# Create the object_type (creates db row in pending state).
self._call_operation_object(pending_operation,
pending_type)
# Get pending row and mark as processing so that
# this row will not be processed by journal thread.
row = db.get_all_db_rows_by_state(self.db_session, sdn_const.PENDING)
db.update_db_row_state(self.db_session, row[0], sdn_const.PROCESSING)
# Create the object_type database row and process.
# Verify that object request is not processed because the
# dependent object operation has not been marked as 'completed'.
self._test_thread_processing(operation,
object_type,
expected_calls=0)
# Verify that all rows are still in the database.
rows = db.get_all_db_rows_by_state(self.db_session,
sdn_const.PROCESSING)
self.assertEqual(1, len(rows))
rows = db.get_all_db_rows_by_state(self.db_session, sdn_const.PENDING)
self.assertEqual(1, len(rows))
def _test_parent_delete_pending_child_delete(self, parent, child):
self._test_object_operation_pending_another_object_operation(
parent, sdn_const.DELETE, child, sdn_const.DELETE)
def _test_cleanup_processing_rows(self, last_retried, expected_state):
# Create a dummy network (creates db row in pending state).
self._call_operation_object(sdn_const.POST,
sdn_const.NETWORK)
# Get pending row and mark as processing and update
# the last_retried time
row = db.get_all_db_rows_by_state(self.db_session,
sdn_const.PENDING)[0]
row.last_retried = last_retried
db.update_db_row_state(self.db_session, row, sdn_const.PROCESSING)
# Test if the cleanup marks this in the desired state
# based on the last_retried timestamp
cleanup.JournalCleanup().cleanup_processing_rows(self.db_session)
# Verify that the Db row is in the desired state
rows = db.get_all_db_rows_by_state(self.db_session, expected_state)
self.assertEqual(1, len(rows))
def test_driver(self):
for operation in (sdn_const.POST, sdn_const.PUT,
sdn_const.DELETE):
for object_type in (sdn_const.NETWORK, sdn_const.PORT):
self._test_operation_object(operation, object_type)
def test_network(self):
self._test_object_type(sdn_const.NETWORK)
def test_network_update_pending_network_create(self):
self._test_object_operation_pending_object_operation(
sdn_const.NETWORK, sdn_const.PUT, sdn_const.POST)
def test_network_delete_pending_network_create(self):
self._test_object_operation_pending_object_operation(
sdn_const.NETWORK, sdn_const.DELETE, sdn_const.POST)
def test_network_delete_pending_network_update(self):
self._test_object_operation_pending_object_operation(
sdn_const.NETWORK, sdn_const.DELETE, sdn_const.PUT)
def test_network_delete_pending_port_delete(self):
self._test_parent_delete_pending_child_delete(
sdn_const.NETWORK, sdn_const.PORT)
def test_port1(self):
self._test_object_type(sdn_const.PORT)
def test_port_update_pending_port_create(self):
self._test_object_operation_pending_object_operation(
sdn_const.PORT, sdn_const.PUT, sdn_const.POST)
def test_port_delete_pending_port_create(self):
self._test_object_operation_pending_object_operation(
sdn_const.PORT, sdn_const.DELETE, sdn_const.POST)
def test_port_delete_pending_port_update(self):
self._test_object_operation_pending_object_operation(
sdn_const.PORT, sdn_const.DELETE, sdn_const.PUT)
def test_port_pending_network(self):
self._test_object_type_pending_network(sdn_const.PORT)
def test_port_processing_network(self):
self._test_object_type_processing_network(sdn_const.PORT)
def test_cleanup_processing_rows_time_not_expired(self):
self._test_cleanup_processing_rows(datetime.datetime.utcnow(),
sdn_const.PROCESSING)
def test_cleanup_processing_rows_time_expired(self):
old_time = datetime.datetime.utcnow() - datetime.timedelta(hours=24)
self._test_cleanup_processing_rows(old_time, sdn_const.PENDING)
def test_thread_call(self):
"""Verify that the sync thread method is called."""
# Create any object that would spin up the sync thread via the
# decorator call_thread_on_end() used by all the event handlers.
self._call_operation_object(sdn_const.POST,
sdn_const.NETWORK)
# Verify that the thread call was made.
self.assertTrue(self.mock_sync_thread.called)
def _decrease_row_created_time(self, row):
row.created_at -= datetime.timedelta(hours=1)
self.db_session.merge(row)
self.db_session.flush()
def test_sync_multiple_updates(self):
# add 2 updates
for i in range(2):
self._call_operation_object(sdn_const.PUT,
sdn_const.NETWORK)
# get the last update row
last_row = db.get_all_db_rows(self.db_session)[-1]
# change the last update created time
self._decrease_row_created_time(last_row)
# create 1 more operation to trigger the sync thread
# verify that there are no calls to NEO controller, because the
# first row was not valid (exit_after_run = true)
self._test_thread_processing(sdn_const.PUT,
sdn_const.NETWORK, expected_calls=0)
# validate that all the rows are in 'pending' state
# first row should be set back to 'pending' because it was not valid
rows = db.get_all_db_rows_by_state(self.db_session, sdn_const.PENDING)
self.assertEqual(3, len(rows))