Merge "Load JSON/YAML string to structure for datasource action execution"

This commit is contained in:
Zuul 2018-07-26 19:46:08 +00:00 committed by Gerrit Code Review
commit 1742852553
6 changed files with 213 additions and 7 deletions

View File

@ -18,10 +18,12 @@ from __future__ import division
from __future__ import absolute_import
import collections
import copy
import datetime
from functools import cmp_to_key
from functools import reduce
import hashlib
import inspect
import json
import time
@ -29,6 +31,7 @@ import eventlet
from oslo_log import log as logging
from oslo_utils import strutils
import six
import yaml
from congress.datasources import datasource_utils as ds_utils
from congress.db import db_ds_table_data
@ -1533,6 +1536,7 @@ class ExecutionDriver(object):
self._leader_node_id = None
# defined in DataService class
self.heartbeat_callbacks['check_leader'] = self._check_leader_heartbeat
self.method_structured_args = {}
def _check_leader_heartbeat(self):
"""Vacate leader if heartbeat lost"""
@ -1563,16 +1567,19 @@ class ExecutionDriver(object):
except Exception as e:
LOG.exception(str(e))
def add_executable_client_methods(self, client, api_prefix):
def add_executable_client_methods(self, client, api_prefix, exclude=None):
"""Inspect client to get supported builtin methods
param client: the datasource driver client
param api_prefix: the filter used to filter methods
"""
if exclude is None:
exclude = []
builtin = ds_utils.inspect_methods(client, api_prefix)
for method in builtin:
self.add_executable_method(method['name'], method['args'],
method['desc'])
if method['name'] not in exclude:
self.add_executable_method(method['name'], method['args'],
method['desc'])
def add_executable_method(self, method_name, method_args, method_desc=""):
"""Add executable method information.
@ -1603,14 +1610,56 @@ class ExecutionDriver(object):
action, positional_args, named_args)
try:
method = self._get_method(client, action)
# Note(thread-safety): blocking call (potentially)
method(*positional_args, **named_args)
except Exception as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"the method isn't accepted as an executable method."
% (self.name, action, action_args))
# if some arguments are structures, load json/yaml string into struct
try:
structured_args = self.method_structured_args.get(action)
if structured_args is not None:
positional_args = copy.deepcopy(positional_args)
named_args = copy.deepcopy(named_args)
# compute which positional args to load str->struct.
if 'positional' not in structured_args:
if inspect.ismethod(method):
method_args = inspect.getargspec(method).args[1:]
else: # function or staticmethod without special 1st arg
method_args = inspect.getargspec(method).args
structured_positional_args = []
for (index, arg) in enumerate(method_args):
if arg in structured_args['named']:
structured_positional_args.append(index)
structured_args['positional'] = frozenset(
structured_positional_args)
# load selected named args
for arg_name in named_args:
if arg_name in structured_args['named']:
named_args[arg_name] = yaml.load(
named_args[arg_name])
# load selected positional args
for (index, arg) in enumerate(positional_args):
if index in structured_args['positional']:
positional_args[index] = yaml.load(arg)
except yaml.parser.ParserError as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"loading of JSON/YAML in designated arguments failed due to "
"invalid format."
% (self.name, action, action_args))
# Note(thread-safety): blocking call (potentially)
try:
return method(*positional_args, **named_args)
except Exception as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"the method raised an exception."
% (self.name, action, action_args))
def get_actions(self):
"""Return all supported actions of a datasource driver.

View File

@ -360,6 +360,93 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
self.creds = args
session = ds_utils.get_keystone_session(self.creds)
self.neutron = neutronclient.v2_0.client.Client(session=session)
# specify the arg name for all method structured args
self.method_structured_args = {
'add_bgp_speaker_to_dragent': {'named': frozenset(['body'])},
'add_gateway_router': {'named': frozenset(['body'])},
'add_interface_router': {'named': frozenset(['body'])},
'add_network_to_bgp_speaker': {'named': frozenset(['body'])},
'add_network_to_dhcp_agent': {'named': frozenset(['body'])},
'add_peer_to_bgp_speaker': {'named': frozenset(['body'])},
'add_router_to_l3_agent': {'named': frozenset(['body'])},
'associate_flavor': {'named': frozenset(['body'])},
'associate_health_monitor': {'named': frozenset(['body'])},
'connect_network_gateway': {'named': frozenset(['body'])},
'create_address_scope': {'named': frozenset(['body'])},
'create_bandwidth_limit_rule': {'named': frozenset(['body'])},
'create_bgp_peer': {'named': frozenset(['body'])},
'create_bgp_speaker': {'named': frozenset(['body'])},
'create_bgpvpn': {'named': frozenset(['body'])},
'create_bgpvpn_network_assoc': {'named': frozenset(['body'])},
'create_bgpvpn_port_assoc': {'named': frozenset(['body'])},
'create_bgpvpn_router_assoc': {'named': frozenset(['body'])},
'create_dscp_marking_rule': {'named': frozenset(['body'])},
'create_endpoint_group': {'named': frozenset(['body'])},
'create_ext': {'named': frozenset(['body'])},
'create_firewall': {'named': frozenset(['body'])},
'create_firewall_policy': {'named': frozenset(['body'])},
'create_firewall_rule': {'named': frozenset(['body'])},
'create_flavor': {'named': frozenset(['body'])},
'create_floatingip': {'named': frozenset(['body'])},
'create_fwaas_firewall_group': {'named': frozenset(['body'])},
'create_fwaas_firewall_policy': {'named': frozenset(['body'])},
'create_fwaas_firewall_rule': {'named': frozenset(['body'])},
'create_gateway_device': {'named': frozenset(['body'])},
'create_health_monitor': {'named': frozenset(['body'])},
'create_ikepolicy': {'named': frozenset(['body'])},
'create_ipsec_site_connection': {'named': frozenset(['body'])},
'create_ipsecpolicy': {'named': frozenset(['body'])},
'create_lbaas_healthmonitor': {'named': frozenset(['body'])},
'create_lbaas_l7policy': {'named': frozenset(['body'])},
'create_lbaas_l7rule': {'named': frozenset(['body'])},
'create_lbaas_member': {'named': frozenset(['body'])},
'create_lbaas_pool': {'named': frozenset(['body'])},
'create_listener': {'named': frozenset(['body'])},
'create_loadbalancer': {'named': frozenset(['body'])},
'create_member': {'named': frozenset(['body'])},
'create_metering_label': {'named': frozenset(['body'])},
'create_metering_label_rule': {'named': frozenset(['body'])},
'create_minimum_bandwidth_rule': {'named': frozenset(['body'])},
'create_network': {'named': frozenset(['body'])},
'create_network_gateway': {'named': frozenset(['body'])},
'create_network_log': {'named': frozenset(['body'])},
'create_pool': {'named': frozenset(['body'])},
'create_port': {'named': frozenset(['body'])},
'create_qos_policy': {'named': frozenset(['body'])},
'create_qos_queue': {'named': frozenset(['body'])},
'create_rbac_policy': {'named': frozenset(['body'])},
'create_router': {'named': frozenset(['body'])},
'create_security_group': {'named': frozenset(['body'])},
'create_security_group_rule': {'named': frozenset(['body'])},
'create_service_profile': {'named': frozenset(['body'])},
'create_sfc_flow_classifier': {'named': frozenset(['body'])},
'create_sfc_port_chain': {'named': frozenset(['body'])},
'create_sfc_port_pair': {'named': frozenset(['body'])},
'create_sfc_port_pair_group': {'named': frozenset(['body'])},
'create_sfc_service_graph': {'named': frozenset(['body'])},
'create_subnet': {'named': frozenset(['body'])},
'create_subnetpool': {'named': frozenset(['body'])},
'create_trunk': {'named': frozenset(['body'])},
'create_vip': {'named': frozenset(['body'])},
'create_vpnservice': {'named': frozenset(['body'])},
'disconnect_network_gateway': {'named': frozenset(['body'])},
'firewall_policy_insert_rule': {'named': frozenset(['body'])},
'firewall_policy_remove_rule': {'named': frozenset(['body'])},
'insert_rule_fwaas_firewall_policy': {
'named': frozenset(['body'])},
'remove_interface_router': {'named': frozenset(['body'])},
'remove_network_from_bgp_speaker': {'named': frozenset(['body'])},
'remove_peer_from_bgp_speaker': {'named': frozenset(['body'])},
'remove_rule_fwaas_firewall_policy': {
'named': frozenset(['body'])},
'replace_tag': {'named': frozenset(['body'])},
'retry_request': {'named': frozenset(['body'])},
'show_minimum_bandwidth_rule': {'named': frozenset(['body'])},
'trunk_add_subports': {'named': frozenset(['body'])},
'trunk_remove_subports': {'named': frozenset(['body'])},
}
self.add_executable_method('update_resource_attrs',
[{'name': 'resource_type',
'description': 'resource type (e.g. ' +
@ -395,8 +482,43 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
"Detach a security group to port (WARNING: "
"may overwrite concurrent changes to "
"port's security groups list.")
# add action methods from client, but exclude 'update_*' because those
# are covered by the update_resource_attr method.
exclude_methods = ['update_address_scope', 'update_agent',
'update_bandwidth_limit_rule', 'update_bgp_peer',
'update_bgp_speaker', 'update_bgpvpn',
'update_bgpvpn_network_assoc',
'update_bgpvpn_port_assoc',
'update_bgpvpn_router_assoc',
'update_dscp_marking_rule', 'update_endpoint_group',
'update_ext', 'update_firewall',
'update_firewall_policy', 'update_firewall_rule',
'update_flavor', 'update_floatingip',
'update_fwaas_firewall_group',
'update_fwaas_firewall_policy',
'update_fwaas_firewall_rule',
'update_gateway_device', 'update_health_monitor',
'update_ikepolicy', 'update_ipsec_site_connection',
'update_ipsecpolicy', 'update_lbaas_healthmonitor',
'update_lbaas_l7policy', 'update_lbaas_l7rule',
'update_lbaas_member', 'update_lbaas_pool',
'update_listener', 'update_loadbalancer',
'update_member', 'update_minimum_bandwidth_rule',
'update_network', 'update_network_gateway',
'update_network_log', 'update_pool', 'update_port',
'update_qos_policy', 'update_quota',
'update_rbac_policy', 'update_router',
'update_security_group', 'update_service_profile',
'update_sfc_flow_classifier',
'update_sfc_port_chain', 'update_sfc_port_pair',
'update_sfc_port_pair_group',
'update_sfc_service_graph', 'update_subnet',
'update_subnetpool', 'update_trunk', 'update_vip',
'update_vpnservice']
self.add_executable_client_methods(self.neutron,
'neutronclient.v2_0.client')
'neutronclient.v2_0.client',
exclude_methods)
self.initialize_update_methods()
self._init_end_start_poll()

View File

@ -1955,8 +1955,11 @@ class TestExecutionDriver(base.TestCase):
"""
def __init__(self):
# A variable defined in datasource_driver
self.name = 'TestExecutionDriver'
self.heartbeat_callbacks = {}
super(TestExecutionDriver.ExtendedExecutionDriver, self).__init__()
self.method_structured_args = {
'action_struct': {'named': frozenset(['arg2', 'arg3'])}}
def setUp(self):
super(TestExecutionDriver, self).setUp()
@ -1999,6 +2002,27 @@ class TestExecutionDriver(base.TestCase):
# the api
self.exec_driver._execute_api(nova_client, "action", arg)
def test_execute_api_structured_args(self):
class NovaClient(object):
def action_struct(self, arg1, arg2, arg3):
return (arg1, arg2, arg3)
nova_client = NovaClient()
arg = {"positional": ["value1", '{"value2":2}'],
"named": {"arg3": '{"value3":3}'}}
# it will raise exception if the method _execute_api failed to location
# the api
self.assertEqual(
("value1", {"value2": 2}, {"value3": 3}),
self.exec_driver._execute_api(nova_client, "action_struct", arg))
arg = {"positional": ["value1", "}{invalid"],
"named": {"arg3": '{"value3":3}'}}
self.assertRaisesRegexp(
exception.CongressException, '.*invalid format.*',
self.exec_driver._execute_api, nova_client, "action_struct", arg)
def test_get_actions_order_by_name(self):
mock_methods = {'funcA': mock.MagicMock(),
'funcH': mock.MagicMock(),

View File

@ -24,6 +24,16 @@ rules:
server_security_zone(server_id, tag) :-
nova:tags(server_id=server_id, tag=tag),
security_zone_tags(tag)
-
comment: "Create special security group for each security zone"
rule: >
execute[neutronv2:create_security_group(json)] :-
zone_missing_sg(zone),
admin_project_id(project_id),
builtin:concat('{"security_group": {"name": "', zone, j1),
builtin:concat(j1, '", "project_id": "', j2),
builtin:concat(j2, project_id, j3),
builtin:concat(j3, '", "description": "security group for security zone"}}', json)
-
comment: "Show error if security group missing for security zone"
rule: >

View File

@ -107,7 +107,7 @@ python-novaclient==9.1.0
python-subunit==1.0.0
python-swiftclient==3.2.0
pytz==2018.3
PyYAML==3.12
PyYAML==3.10.0
reno==2.5.0
repoze.lru==0.7
requests-mock==1.2.0

View File

@ -43,3 +43,4 @@ oslo.middleware>=3.31.0 # Apache-2.0
oslo.vmware>=2.17.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
WebOb>=1.7.1 # MIT
PyYAML>=3.10.0 # MIT