Fixes limitation of nwa create tenant

There are two limitations.
The one is on RPC and the other is on DB field size.

The RPC limitation is that create_tenant_rpc_server did not return.
create_tenant_rpc_server calls server.start(), but it did not return
until the thread of callback is finished. Therefore, spawn an another
thread for handling callback.

The DB field size limitation is due to the size of information reported
in PluginReportStateAPI.  nwa agent reports a list of tenant IDs.  When
the number of active tenants becomes more than about 60, the size of
information exceeds the column size.  To reduce the size of information,
the number of tenants is now reported.

Change-Id: I332bff124a708eb41d272d95395bae8c7a028a69
Closes-Bug: #1569628
This commit is contained in:
Shinji YANAGIDA 2016-04-13 10:00:34 +09:00
parent 1e8bf7394e
commit e51f435ba5
6 changed files with 163 additions and 28 deletions

View File

@ -128,7 +128,7 @@ class NECNWANeutronAgent(object):
def _report_state(self):
try:
queues = self.server_manager.get_rpc_server_topics()
self.agent_state['configurations']['tenant_queues'] = queues
self.agent_state['configurations']['tenant_queues'] = len(queues)
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)

View File

@ -12,13 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging.rpc.server import get_rpc_server
from oslo_messaging.target import Target
from networking_nec._i18n import _LW
from networking_nec._i18n import _LW, _LI, _LE
from networking_nec.nwa.common import constants as nwa_const
LOG = logging.getLogger(__name__)
@ -28,10 +30,12 @@ class ServerManager(object):
rpc_servers = {}
def __init__(self, topic, agent_top):
def __init__(self, topic, agent_top, size=1000):
super(ServerManager, self).__init__()
self.topic = topic
self.agent_top = agent_top
self.greenpool_size = size
self.greenpool = eventlet.greenpool.GreenPool(self.greenpool_size)
def get_rpc_server_topics(self):
return [v['topic'] for v in self.rpc_servers.values()]
@ -66,14 +70,32 @@ class ServerManager(object):
self.agent_top.endpoints,
'blocking', serializer
)
LOG.debug("RPCServer create: topic=%s", topic)
if self.greenpool.free() < 1:
self.greenpool_size += nwa_const.NWA_GREENPOOL_ADD_SIZE
self.greenpool.resize(self.greenpool_size)
LOG.info(_LI('RPCServer greenpool resize %s'), self.greenpool_size)
def server_start():
while True:
try:
LOG.debug('RPCServer thread %d start %s',
(self.greenpool.running(), server))
server.start()
LOG.debug('RPCServer thread end %s', server)
break
except Exception as e:
LOG.exception(_LE('RPCServer thread start failed: %s'), e)
self.rpc_servers[tid] = {
'thread': self.greenpool.spawn(server_start),
'server': server,
'topic': topic
}
LOG.debug("RPCServer create: topic=%s", topic)
self.rpc_servers[tid]['server'].start()
eventlet.sleep(0)
LOG.info(_LI('RPCServer started: %(topic)s server=%(server)s'),
{'topic': topic, 'server': server})
ret['result'] = 'SUCCESS'
ret['tenant_id'] = tid
@ -86,14 +108,23 @@ class ServerManager(object):
LOG.warning(_LW("rpc server not found. tid=%s"), tid)
return {'result': 'FAILED'}
LOG.debug('RPCServer delete: stop %s', tid)
self.rpc_servers[tid]['server'].stop()
LOG.debug('RPCServer delete: wait %s', tid)
self.rpc_servers[tid]['server'].wait()
LOG.debug('RPCServer delete: pop %s', tid)
self.rpc_servers.pop(tid)
LOG.debug('RPCServer delete: sleep %s', tid)
eventlet.sleep(0)
ret = {
'result': 'SUCCESS',
'tenant_id': tid
}
LOG.debug("RPCServer delete: %s", ret)
LOG.debug("RPCServer deleted: %s", ret)
return ret

View File

@ -19,3 +19,6 @@ NWA_AGENT_TOPIC = 'nwa_agent'
NWA_AGENT_TYPE = 'NEC NWA Agent'
NWA_FIREWALL_PLUGIN = 'NECNWAFWaaS'
# an incremental size if the remaining size is zero.
NWA_GREENPOOL_ADD_SIZE = 32

View File

@ -117,26 +117,21 @@ class TenantBindingServerRpcCallback(object):
return {'status': 'FAILED'}
def update_tenant_rpc_servers(self, rpc_context, **kwargs):
ret = {'servers': []}
servers = kwargs.get('servers')
plugin = manager.NeutronManager.get_plugin()
session = db_api.get_session()
with session.begin(subtransactions=True):
queues = necnwa_api.get_nwa_tenant_queues(session)
for queue in queues:
tenant_ids = [server['tenant_id'] for server in servers]
if queue.tenant_id in tenant_ids:
LOG.info(_LI("RPC Server active(tid=%s)"),
queue.tenant_id)
continue
else:
# create rpc server for tenant
LOG.debug("create_server: tid=%s", queue.tenant_id)
plugin.nwa_rpc.create_server(
rpc_context, queue.tenant_id
)
ret['servers'].append({'tenant_id': queue.tenant_id})
return ret
q_tids = [q.tenant_id
for q in necnwa_api.get_nwa_tenant_queues(session)]
plugin = manager.NeutronManager.get_plugin()
tenant_ids = [server['tenant_id'] for server in kwargs['servers']]
ret = []
for tenant_id in set(q_tids) - set(tenant_ids):
LOG.info(_LI("RPCServer only db tid=%s, send create_server"),
tenant_id)
plugin.nwa_rpc.create_server(rpc_context, tenant_id)
ret.append({'tenant_id': tenant_id})
for tenant_id in set(tenant_ids) - set(q_tids):
LOG.info(_LI("RPCServer only agent tid=%s, send delete_server"),
tenant_id)
plugin.nwa_rpc.delete_server(rpc_context, tenant_id)
return {'servers': ret}

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
from networking_nec.nwa.agent import server_manager
from networking_nec.nwa.common import constants as nwa_const
from networking_nec.tests.unit.nwa.agent import base
@ -27,6 +30,27 @@ class TestAgentServerManager(base.TestNWAAgentBase):
self.assertEqual(rd['result'], 'SUCCESS')
self.assertEqual(rd['tenant_id'], tenant_id)
def _wait_greenpool_resized(self, *args, **kwargs):
class Server(object):
def start(self):
eventlet.sleep(5)
return Server()
@mock.patch('oslo_messaging.server.MessageHandlingServer')
def test_create_tenant_rpc_server_greenpool_resize(self, mhs):
poolsize = 3
manager = server_manager.ServerManager(self.agent.topic, self.agent,
size=poolsize)
mhs.side_effect = self._wait_greenpool_resized
for i in range(poolsize + 1):
tenant_id = 'T-%d' % i
rd = manager.create_tenant_rpc_server(tenant_id)
self.assertIsInstance(rd, dict)
self.assertEqual(rd['result'], 'SUCCESS')
self.assertEqual(rd['tenant_id'], tenant_id)
self.assertEqual(manager.greenpool_size,
poolsize + nwa_const.NWA_GREENPOOL_ADD_SIZE)
@mock.patch('oslo_messaging.rpc.server.get_rpc_server')
@mock.patch('networking_nec.nwa.agent.nwa_agent')
@mock.patch('neutron.common.rpc.Connection')

View File

@ -0,0 +1,82 @@
# Copyright 2016 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.tests import base
from networking_nec.nwa.l2.rpc import tenant_binding_callback as tenant_cb
class TestTenantBindingServerRpcCallback(base.BaseTestCase):
def setUp(self):
super(TestTenantBindingServerRpcCallback, self).setUp()
self.rpc_context = mock.MagicMock()
self.callback = tenant_cb.TenantBindingServerRpcCallback()
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_both_empty(self, gntq, gs, gp):
gntq.return_value = []
kwargs = {'servers': []}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_both_equal(self, gntq, plugin, gs):
q1 = mock.MagicMock()
q1.tenant_id = 'T-1'
gntq.return_value = [q1]
kwargs = {'servers': [{'tenant_id': 'T-1'}]}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 0)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 0)
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_create(self, gntq, gp, gs):
q1 = mock.MagicMock()
q1.tenant_id = 'T-2'
gntq.return_value = [q1]
plugin = mock.MagicMock()
gp.return_value = plugin
plugin.nwa_rpc.create_server = mock.MagicMock()
kwargs = {'servers': []}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': [{'tenant_id': 'T-2'}]})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 1)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 0)
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_delete(self, gntq, gp, gs):
gntq.return_value = []
plugin = mock.MagicMock()
gp.return_value = plugin
plugin.nwa_rpc.create_server = mock.MagicMock()
kwargs = {'servers': [{'tenant_id': 'T-1'}]}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 0)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 1)