Refactor OvsdbSwitchApi

Current OvsdbSwitchApi provides ovs command sometimes, and outputs result
directly sometimes. It makes it hard to use. Also, OvsdbSwitchApi uses
neutron framework to execute ovs command sometimes, and uses idlutils
directly sometimes. Using idlutils directly will not reconnect ovsdb,
if timeout.

This patch does following things:
1) Create a dedicated ovsdb api, which is a sub class of OvsdbIdl from
neutron
2) Make OvsdbSwitchApi execute ovs command, and remove explicit execute()
call from consumers
3) Remove directly using idlutils in OvsdbSwitchApi
4) Remove obselete methods in OvsdbSwitchApi
5) Remove duplicated command class, which already have defination in
neutron

Change-Id: I7e9a7f9337c703673b7817ad47de1223c1b01ec0
Closes-Bug: #1622879
This commit is contained in:
Hong Hui Xiao 2016-09-21 05:18:43 +08:00
parent cf7369d5ce
commit 038b62b5da
3 changed files with 91 additions and 178 deletions

View File

@ -98,13 +98,12 @@ class DfLocalController(object):
str(cfg.CONF.df_ryu.of_listen_port))
is_controller_set = self.vswitch_api.check_controller(targets)
if not is_controller_set:
self.vswitch_api.set_controllers(self.integration_bridge,
[targets]).execute()
self.vswitch_api.set_controller(self.integration_bridge, [targets])
is_fail_mode_set = self.vswitch_api.check_controller_fail_mode(
'secure')
if not is_fail_mode_set:
self.vswitch_api.set_controller_fail_mode(
self.integration_bridge, 'secure').execute()
self.integration_bridge, 'secure')
self.open_flow_app.start()
self.db_sync_loop()
@ -165,14 +164,14 @@ class DfLocalController(object):
# Create tunnel port to this chassis
LOG.info(_LI("Adding tunnel to remote chassis = %s") %
chassis.__str__())
self.vswitch_api.add_tunnel_port(chassis).execute()
self.vswitch_api.add_tunnel_port(chassis)
def chassis_deleted(self, chassis_id):
LOG.info(_LI("Deleting tunnel to remote chassis = %s") % chassis_id)
tunnel_ports = self.vswitch_api.get_tunnel_ports()
for port in tunnel_ports:
if port.get_chassis_id() == chassis_id:
self.vswitch_api.delete_port(port).execute()
self.vswitch_api.delete_port(port)
return
def read_switches(self):
@ -413,7 +412,7 @@ class DfLocalController(object):
# Iterate all tunnel ports that needs to be deleted
for port in tunnel_ports.values():
self.vswitch_api.delete_port(port).execute()
self.vswitch_api.delete_port(port)
def port_mappings(self):
ports_to_remove = self.db_store.get_port_keys()

View File

@ -115,136 +115,113 @@ class DFConnection(connection.Connection):
self.thread.start()
class DFOvsdbApi(impl_idl.OvsdbIdl):
"""The command generator of OVS DB operation
This is a sub-class of OvsdbIdl, which is defined in neutron. The super
class OvsdbIdl has defined lots of command. Dragonflow can use
them. And Dragonflow can extend its own commands in this class.
"""
ovsdb_connection = None
def __init__(self, context, db_connection, timeout):
self.context = context
if DFOvsdbApi.ovsdb_connection is None:
DFOvsdbApi.ovsdb_connection = DFConnection(
db_connection,
timeout,
get_schema_helper(
db_connection,
tables=ovsdb_monitor_table_filter_default))
# Override the super class's attribute
impl_idl.OvsdbIdl.ovsdb_connection = DFOvsdbApi.ovsdb_connection
def start(self, nb_api):
DFOvsdbApi.ovsdb_connection.start(nb_api)
self.idl = DFOvsdbApi.ovsdb_connection.idl
def add_tunnel_port(self, chassis):
return AddTunnelPort(self, chassis)
def get_bridge_ports(self, bridge):
return GetBridgePorts(self, bridge)
def add_patch_port(self, bridge, port, remote_name):
return AddPatchPort(self, bridge, port, remote_name)
class OvsdbSwitchApi(api_vswitch.SwitchApi):
"""The interface of openvswitch
Consumers use this class to set openvswitch or get results from
openvswitch.
"""
def __init__(self, ip, protocol='tcp', port='6640', timeout=10):
super(OvsdbSwitchApi, self).__init__()
self.ip = ip
self.protocol = protocol
self.port = port
self.timeout = timeout
# NOTE: This has to be this name vsctl_timeout, as neutron will use
# this attribute to set the timeout of ovs db.
self.vsctl_timeout = timeout
self.ovsdb = None
self.idl = None
self.integration_bridge = cfg.CONF.df.integration_bridge
vlog.Vlog.init('dragonflow')
def initialize(self, nb_api):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
self.ovsdb = DFConnection(
db_connection,
self.timeout,
get_schema_helper(
db_connection,
tables=ovsdb_monitor_table_filter_default
),
)
self.ovsdb = DFOvsdbApi(self, db_connection, self.vsctl_timeout)
table = constants.OVS_INTERFACE
nb_api.db_change_callback(table, None, 'sync_started', None)
self.ovsdb.start(nb_api)
self.idl = self.ovsdb.idl
nb_api.db_change_callback(table, None, 'sync_finished', None)
@property
def _tables(self):
return self.idl.tables
def _db_get_val(self, table, record, column, check_error=False,
log_errors=True):
return self.ovsdb.db_get(table, record, column).execute(
check_error=check_error, log_errors=log_errors)
@property
def _ovs(self):
return list(self._tables['Open_vSwitch'].rows.values())[0]
def transaction(self, check_error=False, log_errors=True, **kwargs):
return impl_idl.NeutronOVSDBTransaction(self,
self.ovsdb,
self.timeout,
check_error, log_errors)
def del_controller(self, bridge):
return DelControllerCommand(self, bridge)
def set_controllers(self, bridge, targets):
return SetControllerCommand(self, bridge, targets)
def set_controller(self, bridge, targets):
self.ovsdb.set_controller(bridge, targets).execute()
def set_controller_fail_mode(self, bridge, fail_mode):
return SetControllerFailModeCommand(self, bridge, fail_mode)
self.ovsdb.set_fail_mode(bridge, fail_mode).execute()
def check_controller(self, target):
is_controller_set = False
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
# if controller is not set, len(controller) is 0
if br_int is not None and (
len(br_int.controller) > 0 and
br_int.controller[0].target == target):
is_controller_set = True
return is_controller_set
controllers = self.ovsdb.get_controller(
self.integration_bridge).execute()
return target in controllers
def check_controller_fail_mode(self, fail_mode):
is_fail_mode_set = False
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
# if fail_mode is not set, len(fail_mode) is 0
if br_int is not None and (
len(br_int.fail_mode) > 0 and
br_int.fail_mode[0] == fail_mode):
is_fail_mode_set = True
return is_fail_mode_set
return fail_mode == self._db_get_val('Bridge',
self.integration_bridge,
'fail_mode')
def get_tunnel_ports(self):
res = []
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
for port in br_int.ports:
ports = self.ovsdb.get_bridge_ports(self.integration_bridge).execute()
for port in ports:
if 'df-chassis-id' in port.external_ids:
chassis_id = port.external_ids['df-chassis-id']
res.append(OvsdbTunnelPort(port, chassis_id))
return res
def add_tunnel_port(self, chassis):
return AddTunnelPort(self, chassis)
self.ovsdb.add_tunnel_port(chassis).execute()
def delete_port(self, switch_port):
return DeleteSwitchPort(self, switch_port)
def get_local_port_ids(self):
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
port_ids = set()
for port in br_int.ports:
if port.name == self.integration_bridge:
continue
if 'df-chassis-id' in port.external_ids:
continue
for interface in port.interfaces:
if 'iface-id' in interface.external_ids:
port_ids.add(interface.external_ids['iface-id'])
return port_ids
def get_local_port_id_from_name(self, name):
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
for port in br_int.ports:
if port.name != name:
continue
for interface in port.interfaces:
if 'iface-id' in interface.external_ids:
return interface.external_ids['iface-id']
return None
self.ovsdb.del_port(switch_port.get_name(),
self.integration_bridge).execute()
def get_local_ports_to_ofport_mapping(self):
lport_to_ofport = {}
chassis_to_ofport = {}
br_int = idlutils.row_by_value(self.idl, 'Bridge', 'name',
self.integration_bridge)
for port in br_int.ports:
if port.name == self.integration_bridge:
continue
ports = self.ovsdb.get_bridge_ports(self.integration_bridge).execute()
for port in ports:
chassis_id = port.external_ids.get('df-chassis-id')
for interface in port.interfaces:
if interface.ofport is None:
@ -264,25 +241,19 @@ class OvsdbSwitchApi(api_vswitch.SwitchApi):
return chassis_to_ofport, lport_to_ofport
def create_patch_port(self, bridge, port, remote_name):
commands.AddBridgeCommand(self, bridge, True,
datapath_type='system').execute()
self.ovsdb.add_br(bridge, datapath_type='system').execute()
if not self.patch_port_exist(port):
AddPatchPort(self, bridge, port, remote_name).execute()
return self.get_patch_port_ofport(port)
def delete_patch_port(self, bridge, port):
if commands.BridgeExistsCommand(self, bridge).execute():
commands.DelPortCommand(self, port, bridge,
if_exists=True).execute()
self.ovsdb.add_patch_port(bridge, port, remote_name).execute()
return self.get_port_ofport(port)
def patch_port_exist(self, port):
cmd = commands.DbGetCommand(self, 'Interface', port, 'type')
return bool('patch' == cmd.execute(check_error=False,
log_errors=False))
return 'patch' == self._db_get_val('Interface', port, 'type',
check_error=False,
log_errors=False)
def get_patch_port_ofport(self, port):
cmd = commands.DbGetCommand(self, 'Interface', port, 'ofport')
return cmd.execute(check_error=False, log_errors=False)
def get_port_ofport(self, port):
return self._db_get_val('Interface', port, 'ofport',
check_error=False, log_errors=False)
class OvsdbSwitchPort(api_vswitch.SwitchPort):
@ -307,68 +278,6 @@ class OvsdbTunnelPort(OvsdbSwitchPort):
return self.chassis_id
class DelControllerCommand(commands.BaseCommand):
def __init__(self, api, bridge):
super(DelControllerCommand, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
br.controller = []
class SetControllerCommand(commands.BaseCommand):
def __init__(self, api, bridge, targets):
super(SetControllerCommand, self).__init__(api)
self.bridge = bridge
self.targets = targets
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
controllers = []
for target in self.targets:
controller = txn.insert(self.api.idl.tables['Controller'])
controller.target = target
controllers.append(controller)
br.verify('controller')
br.controller = controllers
class SetControllerFailModeCommand(commands.BaseCommand):
def __init__(self, api, bridge, fail_mode):
super(SetControllerFailModeCommand, self).__init__(api)
self.bridge = bridge
self.fail_mode = fail_mode
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
br.verify('fail_mode')
br.fail_mode = [self.fail_mode]
class DeleteSwitchPort(commands.BaseCommand):
def __init__(self, api, switch_port):
super(DeleteSwitchPort, self).__init__(api)
self.switch_port = switch_port
self.integration_bridge = cfg.CONF.df.integration_bridge
def run_idl(self, txn):
port = self.switch_port.port_row
bridge = idlutils.row_by_value(self.api.idl, 'Bridge',
'name', self.integration_bridge)
bridge.verify('ports')
ports = bridge.ports
ports.remove(port)
bridge.ports = ports
# Remote Port Interfaces
port.verify('interfaces')
for iface in port.interfaces:
self.api.idl.tables['Interface'].rows[iface.uuid].delete()
self.api.idl.tables['Port'].rows[port.uuid].delete()
class AddTunnelPort(commands.BaseCommand):
def __init__(self, api, chassis):
super(AddTunnelPort, self).__init__(api)
@ -404,6 +313,16 @@ class AddTunnelPort(commands.BaseCommand):
bridge.ports = ports
class GetBridgePorts(commands.BaseCommand):
def __init__(self, api, bridge):
super(GetBridgePorts, self).__init__(api)
self.bridge = bridge
def run_idl(self, txn):
br = idlutils.row_by_value(self.api.idl, 'Bridge', 'name', self.bridge)
self.result = [p for p in br.ports if p.name != self.bridge]
class DFIdl(idl.Idl):
def __init__(self, nb_api, remote, schema):
super(DFIdl, self).__init__(remote, schema)

View File

@ -17,7 +17,6 @@ import os_client_config
from oslo_config import cfg
from dragonflow.common import common_params
from dragonflow.common import utils as df_utils
from dragonflow.db import api_nb
from dragonflow.tests import base
from dragonflow.tests.common import app_testing_objects as test_objects
@ -56,12 +55,8 @@ class DFTestBase(base.BaseTestCase):
self.conf = cfg.CONF.df
self.integration_bridge = self.conf.integration_bridge
db_driver = df_utils.load_driver(
cfg.CONF.df.nb_db_class,
df_utils.DF_NB_DB_DRIVER_NAMESPACE)
self.nb_api = api_nb.NbApi(db_driver)
self.nb_api.initialize(db_ip=self.conf.remote_db_ip,
db_port=self.conf.remote_db_port)
self.nb_api = api_nb.NbApi.get_instance(False)
self.local_ip = self.conf.local_ip
self.__objects_to_close = []
if cfg.CONF.df.enable_selective_topology_distribution: