Merge "Load JSON/YAML string to structure for datasource action execution"
This commit is contained in:
commit
1742852553
|
@ -18,10 +18,12 @@ from __future__ import division
|
|||
from __future__ import absolute_import
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import datetime
|
||||
from functools import cmp_to_key
|
||||
from functools import reduce
|
||||
import hashlib
|
||||
import inspect
|
||||
import json
|
||||
import time
|
||||
|
||||
|
@ -29,6 +31,7 @@ import eventlet
|
|||
from oslo_log import log as logging
|
||||
from oslo_utils import strutils
|
||||
import six
|
||||
import yaml
|
||||
|
||||
from congress.datasources import datasource_utils as ds_utils
|
||||
from congress.db import db_ds_table_data
|
||||
|
@ -1533,6 +1536,7 @@ class ExecutionDriver(object):
|
|||
self._leader_node_id = None
|
||||
# defined in DataService class
|
||||
self.heartbeat_callbacks['check_leader'] = self._check_leader_heartbeat
|
||||
self.method_structured_args = {}
|
||||
|
||||
def _check_leader_heartbeat(self):
|
||||
"""Vacate leader if heartbeat lost"""
|
||||
|
@ -1563,16 +1567,19 @@ class ExecutionDriver(object):
|
|||
except Exception as e:
|
||||
LOG.exception(str(e))
|
||||
|
||||
def add_executable_client_methods(self, client, api_prefix):
|
||||
def add_executable_client_methods(self, client, api_prefix, exclude=None):
|
||||
"""Inspect client to get supported builtin methods
|
||||
|
||||
param client: the datasource driver client
|
||||
param api_prefix: the filter used to filter methods
|
||||
"""
|
||||
if exclude is None:
|
||||
exclude = []
|
||||
builtin = ds_utils.inspect_methods(client, api_prefix)
|
||||
for method in builtin:
|
||||
self.add_executable_method(method['name'], method['args'],
|
||||
method['desc'])
|
||||
if method['name'] not in exclude:
|
||||
self.add_executable_method(method['name'], method['args'],
|
||||
method['desc'])
|
||||
|
||||
def add_executable_method(self, method_name, method_args, method_desc=""):
|
||||
"""Add executable method information.
|
||||
|
@ -1603,14 +1610,56 @@ class ExecutionDriver(object):
|
|||
action, positional_args, named_args)
|
||||
try:
|
||||
method = self._get_method(client, action)
|
||||
# Note(thread-safety): blocking call (potentially)
|
||||
method(*positional_args, **named_args)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
raise exception.CongressException(
|
||||
"driver %s tries to execute %s on arguments %s but "
|
||||
"the method isn't accepted as an executable method."
|
||||
% (self.name, action, action_args))
|
||||
# if some arguments are structures, load json/yaml string into struct
|
||||
try:
|
||||
structured_args = self.method_structured_args.get(action)
|
||||
if structured_args is not None:
|
||||
positional_args = copy.deepcopy(positional_args)
|
||||
named_args = copy.deepcopy(named_args)
|
||||
# compute which positional args to load str->struct.
|
||||
if 'positional' not in structured_args:
|
||||
if inspect.ismethod(method):
|
||||
method_args = inspect.getargspec(method).args[1:]
|
||||
else: # function or staticmethod without special 1st arg
|
||||
method_args = inspect.getargspec(method).args
|
||||
structured_positional_args = []
|
||||
for (index, arg) in enumerate(method_args):
|
||||
if arg in structured_args['named']:
|
||||
structured_positional_args.append(index)
|
||||
structured_args['positional'] = frozenset(
|
||||
structured_positional_args)
|
||||
# load selected named args
|
||||
for arg_name in named_args:
|
||||
if arg_name in structured_args['named']:
|
||||
named_args[arg_name] = yaml.load(
|
||||
named_args[arg_name])
|
||||
# load selected positional args
|
||||
for (index, arg) in enumerate(positional_args):
|
||||
if index in structured_args['positional']:
|
||||
positional_args[index] = yaml.load(arg)
|
||||
except yaml.parser.ParserError as e:
|
||||
LOG.exception(e)
|
||||
raise exception.CongressException(
|
||||
"driver %s tries to execute %s on arguments %s but "
|
||||
"loading of JSON/YAML in designated arguments failed due to "
|
||||
"invalid format."
|
||||
% (self.name, action, action_args))
|
||||
|
||||
# Note(thread-safety): blocking call (potentially)
|
||||
try:
|
||||
return method(*positional_args, **named_args)
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
raise exception.CongressException(
|
||||
"driver %s tries to execute %s on arguments %s but "
|
||||
"the method raised an exception."
|
||||
% (self.name, action, action_args))
|
||||
|
||||
def get_actions(self):
|
||||
"""Return all supported actions of a datasource driver.
|
||||
|
|
|
@ -360,6 +360,93 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
|
|||
self.creds = args
|
||||
session = ds_utils.get_keystone_session(self.creds)
|
||||
self.neutron = neutronclient.v2_0.client.Client(session=session)
|
||||
|
||||
# specify the arg name for all method structured args
|
||||
self.method_structured_args = {
|
||||
'add_bgp_speaker_to_dragent': {'named': frozenset(['body'])},
|
||||
'add_gateway_router': {'named': frozenset(['body'])},
|
||||
'add_interface_router': {'named': frozenset(['body'])},
|
||||
'add_network_to_bgp_speaker': {'named': frozenset(['body'])},
|
||||
'add_network_to_dhcp_agent': {'named': frozenset(['body'])},
|
||||
'add_peer_to_bgp_speaker': {'named': frozenset(['body'])},
|
||||
'add_router_to_l3_agent': {'named': frozenset(['body'])},
|
||||
'associate_flavor': {'named': frozenset(['body'])},
|
||||
'associate_health_monitor': {'named': frozenset(['body'])},
|
||||
'connect_network_gateway': {'named': frozenset(['body'])},
|
||||
'create_address_scope': {'named': frozenset(['body'])},
|
||||
'create_bandwidth_limit_rule': {'named': frozenset(['body'])},
|
||||
'create_bgp_peer': {'named': frozenset(['body'])},
|
||||
'create_bgp_speaker': {'named': frozenset(['body'])},
|
||||
'create_bgpvpn': {'named': frozenset(['body'])},
|
||||
'create_bgpvpn_network_assoc': {'named': frozenset(['body'])},
|
||||
'create_bgpvpn_port_assoc': {'named': frozenset(['body'])},
|
||||
'create_bgpvpn_router_assoc': {'named': frozenset(['body'])},
|
||||
'create_dscp_marking_rule': {'named': frozenset(['body'])},
|
||||
'create_endpoint_group': {'named': frozenset(['body'])},
|
||||
'create_ext': {'named': frozenset(['body'])},
|
||||
'create_firewall': {'named': frozenset(['body'])},
|
||||
'create_firewall_policy': {'named': frozenset(['body'])},
|
||||
'create_firewall_rule': {'named': frozenset(['body'])},
|
||||
'create_flavor': {'named': frozenset(['body'])},
|
||||
'create_floatingip': {'named': frozenset(['body'])},
|
||||
'create_fwaas_firewall_group': {'named': frozenset(['body'])},
|
||||
'create_fwaas_firewall_policy': {'named': frozenset(['body'])},
|
||||
'create_fwaas_firewall_rule': {'named': frozenset(['body'])},
|
||||
'create_gateway_device': {'named': frozenset(['body'])},
|
||||
'create_health_monitor': {'named': frozenset(['body'])},
|
||||
'create_ikepolicy': {'named': frozenset(['body'])},
|
||||
'create_ipsec_site_connection': {'named': frozenset(['body'])},
|
||||
'create_ipsecpolicy': {'named': frozenset(['body'])},
|
||||
'create_lbaas_healthmonitor': {'named': frozenset(['body'])},
|
||||
'create_lbaas_l7policy': {'named': frozenset(['body'])},
|
||||
'create_lbaas_l7rule': {'named': frozenset(['body'])},
|
||||
'create_lbaas_member': {'named': frozenset(['body'])},
|
||||
'create_lbaas_pool': {'named': frozenset(['body'])},
|
||||
'create_listener': {'named': frozenset(['body'])},
|
||||
'create_loadbalancer': {'named': frozenset(['body'])},
|
||||
'create_member': {'named': frozenset(['body'])},
|
||||
'create_metering_label': {'named': frozenset(['body'])},
|
||||
'create_metering_label_rule': {'named': frozenset(['body'])},
|
||||
'create_minimum_bandwidth_rule': {'named': frozenset(['body'])},
|
||||
'create_network': {'named': frozenset(['body'])},
|
||||
'create_network_gateway': {'named': frozenset(['body'])},
|
||||
'create_network_log': {'named': frozenset(['body'])},
|
||||
'create_pool': {'named': frozenset(['body'])},
|
||||
'create_port': {'named': frozenset(['body'])},
|
||||
'create_qos_policy': {'named': frozenset(['body'])},
|
||||
'create_qos_queue': {'named': frozenset(['body'])},
|
||||
'create_rbac_policy': {'named': frozenset(['body'])},
|
||||
'create_router': {'named': frozenset(['body'])},
|
||||
'create_security_group': {'named': frozenset(['body'])},
|
||||
'create_security_group_rule': {'named': frozenset(['body'])},
|
||||
'create_service_profile': {'named': frozenset(['body'])},
|
||||
'create_sfc_flow_classifier': {'named': frozenset(['body'])},
|
||||
'create_sfc_port_chain': {'named': frozenset(['body'])},
|
||||
'create_sfc_port_pair': {'named': frozenset(['body'])},
|
||||
'create_sfc_port_pair_group': {'named': frozenset(['body'])},
|
||||
'create_sfc_service_graph': {'named': frozenset(['body'])},
|
||||
'create_subnet': {'named': frozenset(['body'])},
|
||||
'create_subnetpool': {'named': frozenset(['body'])},
|
||||
'create_trunk': {'named': frozenset(['body'])},
|
||||
'create_vip': {'named': frozenset(['body'])},
|
||||
'create_vpnservice': {'named': frozenset(['body'])},
|
||||
'disconnect_network_gateway': {'named': frozenset(['body'])},
|
||||
'firewall_policy_insert_rule': {'named': frozenset(['body'])},
|
||||
'firewall_policy_remove_rule': {'named': frozenset(['body'])},
|
||||
'insert_rule_fwaas_firewall_policy': {
|
||||
'named': frozenset(['body'])},
|
||||
'remove_interface_router': {'named': frozenset(['body'])},
|
||||
'remove_network_from_bgp_speaker': {'named': frozenset(['body'])},
|
||||
'remove_peer_from_bgp_speaker': {'named': frozenset(['body'])},
|
||||
'remove_rule_fwaas_firewall_policy': {
|
||||
'named': frozenset(['body'])},
|
||||
'replace_tag': {'named': frozenset(['body'])},
|
||||
'retry_request': {'named': frozenset(['body'])},
|
||||
'show_minimum_bandwidth_rule': {'named': frozenset(['body'])},
|
||||
'trunk_add_subports': {'named': frozenset(['body'])},
|
||||
'trunk_remove_subports': {'named': frozenset(['body'])},
|
||||
}
|
||||
|
||||
self.add_executable_method('update_resource_attrs',
|
||||
[{'name': 'resource_type',
|
||||
'description': 'resource type (e.g. ' +
|
||||
|
@ -395,8 +482,43 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
|
|||
"Detach a security group to port (WARNING: "
|
||||
"may overwrite concurrent changes to "
|
||||
"port's security groups list.")
|
||||
|
||||
# add action methods from client, but exclude 'update_*' because those
|
||||
# are covered by the update_resource_attr method.
|
||||
exclude_methods = ['update_address_scope', 'update_agent',
|
||||
'update_bandwidth_limit_rule', 'update_bgp_peer',
|
||||
'update_bgp_speaker', 'update_bgpvpn',
|
||||
'update_bgpvpn_network_assoc',
|
||||
'update_bgpvpn_port_assoc',
|
||||
'update_bgpvpn_router_assoc',
|
||||
'update_dscp_marking_rule', 'update_endpoint_group',
|
||||
'update_ext', 'update_firewall',
|
||||
'update_firewall_policy', 'update_firewall_rule',
|
||||
'update_flavor', 'update_floatingip',
|
||||
'update_fwaas_firewall_group',
|
||||
'update_fwaas_firewall_policy',
|
||||
'update_fwaas_firewall_rule',
|
||||
'update_gateway_device', 'update_health_monitor',
|
||||
'update_ikepolicy', 'update_ipsec_site_connection',
|
||||
'update_ipsecpolicy', 'update_lbaas_healthmonitor',
|
||||
'update_lbaas_l7policy', 'update_lbaas_l7rule',
|
||||
'update_lbaas_member', 'update_lbaas_pool',
|
||||
'update_listener', 'update_loadbalancer',
|
||||
'update_member', 'update_minimum_bandwidth_rule',
|
||||
'update_network', 'update_network_gateway',
|
||||
'update_network_log', 'update_pool', 'update_port',
|
||||
'update_qos_policy', 'update_quota',
|
||||
'update_rbac_policy', 'update_router',
|
||||
'update_security_group', 'update_service_profile',
|
||||
'update_sfc_flow_classifier',
|
||||
'update_sfc_port_chain', 'update_sfc_port_pair',
|
||||
'update_sfc_port_pair_group',
|
||||
'update_sfc_service_graph', 'update_subnet',
|
||||
'update_subnetpool', 'update_trunk', 'update_vip',
|
||||
'update_vpnservice']
|
||||
self.add_executable_client_methods(self.neutron,
|
||||
'neutronclient.v2_0.client')
|
||||
'neutronclient.v2_0.client',
|
||||
exclude_methods)
|
||||
self.initialize_update_methods()
|
||||
self._init_end_start_poll()
|
||||
|
||||
|
|
|
@ -1955,8 +1955,11 @@ class TestExecutionDriver(base.TestCase):
|
|||
"""
|
||||
def __init__(self):
|
||||
# A variable defined in datasource_driver
|
||||
self.name = 'TestExecutionDriver'
|
||||
self.heartbeat_callbacks = {}
|
||||
super(TestExecutionDriver.ExtendedExecutionDriver, self).__init__()
|
||||
self.method_structured_args = {
|
||||
'action_struct': {'named': frozenset(['arg2', 'arg3'])}}
|
||||
|
||||
def setUp(self):
|
||||
super(TestExecutionDriver, self).setUp()
|
||||
|
@ -1999,6 +2002,27 @@ class TestExecutionDriver(base.TestCase):
|
|||
# the api
|
||||
self.exec_driver._execute_api(nova_client, "action", arg)
|
||||
|
||||
def test_execute_api_structured_args(self):
|
||||
class NovaClient(object):
|
||||
def action_struct(self, arg1, arg2, arg3):
|
||||
return (arg1, arg2, arg3)
|
||||
|
||||
nova_client = NovaClient()
|
||||
arg = {"positional": ["value1", '{"value2":2}'],
|
||||
"named": {"arg3": '{"value3":3}'}}
|
||||
# it will raise exception if the method _execute_api failed to location
|
||||
# the api
|
||||
self.assertEqual(
|
||||
("value1", {"value2": 2}, {"value3": 3}),
|
||||
self.exec_driver._execute_api(nova_client, "action_struct", arg))
|
||||
|
||||
arg = {"positional": ["value1", "}{invalid"],
|
||||
"named": {"arg3": '{"value3":3}'}}
|
||||
|
||||
self.assertRaisesRegexp(
|
||||
exception.CongressException, '.*invalid format.*',
|
||||
self.exec_driver._execute_api, nova_client, "action_struct", arg)
|
||||
|
||||
def test_get_actions_order_by_name(self):
|
||||
mock_methods = {'funcA': mock.MagicMock(),
|
||||
'funcH': mock.MagicMock(),
|
||||
|
|
|
@ -24,6 +24,16 @@ rules:
|
|||
server_security_zone(server_id, tag) :-
|
||||
nova:tags(server_id=server_id, tag=tag),
|
||||
security_zone_tags(tag)
|
||||
-
|
||||
comment: "Create special security group for each security zone"
|
||||
rule: >
|
||||
execute[neutronv2:create_security_group(json)] :-
|
||||
zone_missing_sg(zone),
|
||||
admin_project_id(project_id),
|
||||
builtin:concat('{"security_group": {"name": "', zone, j1),
|
||||
builtin:concat(j1, '", "project_id": "', j2),
|
||||
builtin:concat(j2, project_id, j3),
|
||||
builtin:concat(j3, '", "description": "security group for security zone"}}', json)
|
||||
-
|
||||
comment: "Show error if security group missing for security zone"
|
||||
rule: >
|
||||
|
|
|
@ -107,7 +107,7 @@ python-novaclient==9.1.0
|
|||
python-subunit==1.0.0
|
||||
python-swiftclient==3.2.0
|
||||
pytz==2018.3
|
||||
PyYAML==3.12
|
||||
PyYAML==3.10.0
|
||||
reno==2.5.0
|
||||
repoze.lru==0.7
|
||||
requests-mock==1.2.0
|
||||
|
|
|
@ -43,3 +43,4 @@ oslo.middleware>=3.31.0 # Apache-2.0
|
|||
oslo.vmware>=2.17.0 # Apache-2.0
|
||||
oslo.log>=3.36.0 # Apache-2.0
|
||||
WebOb>=1.7.1 # MIT
|
||||
PyYAML>=3.10.0 # MIT
|
Loading…
Reference in New Issue