Merge "Fixes limitation of nwa create tenant"

This commit is contained in:
Jenkins 2016-04-20 19:36:39 +00:00 committed by Gerrit Code Review
commit 395ad4c03b
6 changed files with 163 additions and 28 deletions

View File

@ -128,7 +128,7 @@ class NECNWANeutronAgent(object):
def _report_state(self):
try:
queues = self.server_manager.get_rpc_server_topics()
self.agent_state['configurations']['tenant_queues'] = queues
self.agent_state['configurations']['tenant_queues'] = len(queues)
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)

View File

@ -12,13 +12,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
from oslo_messaging.rpc.server import get_rpc_server
from oslo_messaging.target import Target
from networking_nec._i18n import _LW
from networking_nec._i18n import _LW, _LI, _LE
from networking_nec.nwa.common import constants as nwa_const
LOG = logging.getLogger(__name__)
@ -28,10 +30,12 @@ class ServerManager(object):
rpc_servers = {}
def __init__(self, topic, agent_top):
def __init__(self, topic, agent_top, size=1000):
super(ServerManager, self).__init__()
self.topic = topic
self.agent_top = agent_top
self.greenpool_size = size
self.greenpool = eventlet.greenpool.GreenPool(self.greenpool_size)
def get_rpc_server_topics(self):
return [v['topic'] for v in self.rpc_servers.values()]
@ -66,14 +70,32 @@ class ServerManager(object):
self.agent_top.endpoints,
'blocking', serializer
)
LOG.debug("RPCServer create: topic=%s", topic)
if self.greenpool.free() < 1:
self.greenpool_size += nwa_const.NWA_GREENPOOL_ADD_SIZE
self.greenpool.resize(self.greenpool_size)
LOG.info(_LI('RPCServer greenpool resize %s'), self.greenpool_size)
def server_start():
while True:
try:
LOG.debug('RPCServer thread %d start %s',
(self.greenpool.running(), server))
server.start()
LOG.debug('RPCServer thread end %s', server)
break
except Exception as e:
LOG.exception(_LE('RPCServer thread start failed: %s'), e)
self.rpc_servers[tid] = {
'thread': self.greenpool.spawn(server_start),
'server': server,
'topic': topic
}
LOG.debug("RPCServer create: topic=%s", topic)
self.rpc_servers[tid]['server'].start()
eventlet.sleep(0)
LOG.info(_LI('RPCServer started: %(topic)s server=%(server)s'),
{'topic': topic, 'server': server})
ret['result'] = 'SUCCESS'
ret['tenant_id'] = tid
@ -86,14 +108,23 @@ class ServerManager(object):
LOG.warning(_LW("rpc server not found. tid=%s"), tid)
return {'result': 'FAILED'}
LOG.debug('RPCServer delete: stop %s', tid)
self.rpc_servers[tid]['server'].stop()
LOG.debug('RPCServer delete: wait %s', tid)
self.rpc_servers[tid]['server'].wait()
LOG.debug('RPCServer delete: pop %s', tid)
self.rpc_servers.pop(tid)
LOG.debug('RPCServer delete: sleep %s', tid)
eventlet.sleep(0)
ret = {
'result': 'SUCCESS',
'tenant_id': tid
}
LOG.debug("RPCServer delete: %s", ret)
LOG.debug("RPCServer deleted: %s", ret)
return ret

View File

@ -19,3 +19,6 @@ NWA_AGENT_TOPIC = 'nwa_agent'
NWA_AGENT_TYPE = 'NEC NWA Agent'
NWA_FIREWALL_PLUGIN = 'NECNWAFWaaS'
# an incremental size if the remaining size is zero.
NWA_GREENPOOL_ADD_SIZE = 32

View File

@ -117,26 +117,21 @@ class TenantBindingServerRpcCallback(object):
return {'status': 'FAILED'}
def update_tenant_rpc_servers(self, rpc_context, **kwargs):
ret = {'servers': []}
servers = kwargs.get('servers')
plugin = manager.NeutronManager.get_plugin()
session = db_api.get_session()
with session.begin(subtransactions=True):
queues = necnwa_api.get_nwa_tenant_queues(session)
for queue in queues:
tenant_ids = [server['tenant_id'] for server in servers]
if queue.tenant_id in tenant_ids:
LOG.info(_LI("RPC Server active(tid=%s)"),
queue.tenant_id)
continue
else:
# create rpc server for tenant
LOG.debug("create_server: tid=%s", queue.tenant_id)
plugin.nwa_rpc.create_server(
rpc_context, queue.tenant_id
)
ret['servers'].append({'tenant_id': queue.tenant_id})
return ret
q_tids = [q.tenant_id
for q in necnwa_api.get_nwa_tenant_queues(session)]
plugin = manager.NeutronManager.get_plugin()
tenant_ids = [server['tenant_id'] for server in kwargs['servers']]
ret = []
for tenant_id in set(q_tids) - set(tenant_ids):
LOG.info(_LI("RPCServer only db tid=%s, send create_server"),
tenant_id)
plugin.nwa_rpc.create_server(rpc_context, tenant_id)
ret.append({'tenant_id': tenant_id})
for tenant_id in set(tenant_ids) - set(q_tids):
LOG.info(_LI("RPCServer only agent tid=%s, send delete_server"),
tenant_id)
plugin.nwa_rpc.delete_server(rpc_context, tenant_id)
return {'servers': ret}

View File

@ -12,8 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import mock
from networking_nec.nwa.agent import server_manager
from networking_nec.nwa.common import constants as nwa_const
from networking_nec.tests.unit.nwa.agent import base
@ -27,6 +30,27 @@ class TestAgentServerManager(base.TestNWAAgentBase):
self.assertEqual(rd['result'], 'SUCCESS')
self.assertEqual(rd['tenant_id'], tenant_id)
def _wait_greenpool_resized(self, *args, **kwargs):
class Server(object):
def start(self):
eventlet.sleep(5)
return Server()
@mock.patch('oslo_messaging.server.MessageHandlingServer')
def test_create_tenant_rpc_server_greenpool_resize(self, mhs):
poolsize = 3
manager = server_manager.ServerManager(self.agent.topic, self.agent,
size=poolsize)
mhs.side_effect = self._wait_greenpool_resized
for i in range(poolsize + 1):
tenant_id = 'T-%d' % i
rd = manager.create_tenant_rpc_server(tenant_id)
self.assertIsInstance(rd, dict)
self.assertEqual(rd['result'], 'SUCCESS')
self.assertEqual(rd['tenant_id'], tenant_id)
self.assertEqual(manager.greenpool_size,
poolsize + nwa_const.NWA_GREENPOOL_ADD_SIZE)
@mock.patch('oslo_messaging.rpc.server.get_rpc_server')
@mock.patch('networking_nec.nwa.agent.nwa_agent')
@mock.patch('neutron.common.rpc.Connection')

View File

@ -0,0 +1,82 @@
# Copyright 2016 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.tests import base
from networking_nec.nwa.l2.rpc import tenant_binding_callback as tenant_cb
class TestTenantBindingServerRpcCallback(base.BaseTestCase):
def setUp(self):
super(TestTenantBindingServerRpcCallback, self).setUp()
self.rpc_context = mock.MagicMock()
self.callback = tenant_cb.TenantBindingServerRpcCallback()
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_both_empty(self, gntq, gs, gp):
gntq.return_value = []
kwargs = {'servers': []}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_both_equal(self, gntq, plugin, gs):
q1 = mock.MagicMock()
q1.tenant_id = 'T-1'
gntq.return_value = [q1]
kwargs = {'servers': [{'tenant_id': 'T-1'}]}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 0)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 0)
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_create(self, gntq, gp, gs):
q1 = mock.MagicMock()
q1.tenant_id = 'T-2'
gntq.return_value = [q1]
plugin = mock.MagicMock()
gp.return_value = plugin
plugin.nwa_rpc.create_server = mock.MagicMock()
kwargs = {'servers': []}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': [{'tenant_id': 'T-2'}]})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 1)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 0)
@mock.patch('neutron.db.api.get_session')
@mock.patch('neutron.manager.NeutronManager.get_plugin')
@mock.patch('networking_nec.nwa.l2.db_api.get_nwa_tenant_queues')
def test_update_tenant_rpc_servers_delete(self, gntq, gp, gs):
gntq.return_value = []
plugin = mock.MagicMock()
gp.return_value = plugin
plugin.nwa_rpc.create_server = mock.MagicMock()
kwargs = {'servers': [{'tenant_id': 'T-1'}]}
rd = self.callback.update_tenant_rpc_servers(self.rpc_context,
**kwargs)
self.assertEqual(rd, {'servers': []})
self.assertEqual(plugin.nwa_rpc.create_server.call_count, 0)
self.assertEqual(plugin.nwa_rpc.delete_server.call_count, 1)