Load JSON/YAML string to structure for datasource action execution

Action execution allows using policy to trigger data source client
methods. But some of these methods require a python structure as input,
which cannot be constructed by the congress policy language.

With this patch, the policy language can construct JSON/YAML strings,
which get loaded as python structure before passing to the data source
client methods.

Because there is no automated way to know which arguments to which
methods are non-scalar structures, the functionality requires a
data source driver to specify that information in
self.method_structured_args.

The patch also other related changes:
- specifies the desired structured arguments for the
neutronv2 drivers
- omits the neutron client update_* made redundant by the special
update_resource_attr action.
- updates a policy library to make use of the feature

Depends-On: I34d1a392d4539ede01666002cfa301c21f9cd4bd

Change-Id: I33860ffdcda3e0bc67e488ff2b35bba57241cf02
This commit is contained in:
Eric K 2018-07-20 18:31:49 -07:00 committed by Eric Kao
parent 5c7d690902
commit 24268428e0
6 changed files with 213 additions and 7 deletions

View File

@ -18,10 +18,12 @@ from __future__ import division
from __future__ import absolute_import
import collections
import copy
import datetime
from functools import cmp_to_key
from functools import reduce
import hashlib
import inspect
import json
import time
@ -29,6 +31,7 @@ import eventlet
from oslo_log import log as logging
from oslo_utils import strutils
import six
import yaml
from congress.datasources import datasource_utils as ds_utils
from congress.db import db_ds_table_data
@ -1460,6 +1463,7 @@ class ExecutionDriver(object):
self._leader_node_id = None
# defined in DataService class
self.heartbeat_callbacks['check_leader'] = self._check_leader_heartbeat
self.method_structured_args = {}
def _check_leader_heartbeat(self):
"""Vacate leader if heartbeat lost"""
@ -1490,16 +1494,19 @@ class ExecutionDriver(object):
except Exception as e:
LOG.exception(str(e))
def add_executable_client_methods(self, client, api_prefix):
def add_executable_client_methods(self, client, api_prefix, exclude=None):
"""Inspect client to get supported builtin methods
param client: the datasource driver client
param api_prefix: the filter used to filter methods
"""
if exclude is None:
exclude = []
builtin = ds_utils.inspect_methods(client, api_prefix)
for method in builtin:
self.add_executable_method(method['name'], method['args'],
method['desc'])
if method['name'] not in exclude:
self.add_executable_method(method['name'], method['args'],
method['desc'])
def add_executable_method(self, method_name, method_args, method_desc=""):
"""Add executable method information.
@ -1530,14 +1537,56 @@ class ExecutionDriver(object):
action, positional_args, named_args)
try:
method = self._get_method(client, action)
# Note(thread-safety): blocking call (potentially)
method(*positional_args, **named_args)
except Exception as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"the method isn't accepted as an executable method."
% (self.name, action, action_args))
# if some arguments are structures, load json/yaml string into struct
try:
structured_args = self.method_structured_args.get(action)
if structured_args is not None:
positional_args = copy.deepcopy(positional_args)
named_args = copy.deepcopy(named_args)
# compute which positional args to load str->struct.
if 'positional' not in structured_args:
if inspect.ismethod(method):
method_args = inspect.getargspec(method).args[1:]
else: # function or staticmethod without special 1st arg
method_args = inspect.getargspec(method).args
structured_positional_args = []
for (index, arg) in enumerate(method_args):
if arg in structured_args['named']:
structured_positional_args.append(index)
structured_args['positional'] = frozenset(
structured_positional_args)
# load selected named args
for arg_name in named_args:
if arg_name in structured_args['named']:
named_args[arg_name] = yaml.load(
named_args[arg_name])
# load selected positional args
for (index, arg) in enumerate(positional_args):
if index in structured_args['positional']:
positional_args[index] = yaml.load(arg)
except yaml.parser.ParserError as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"loading of JSON/YAML in designated arguments failed due to "
"invalid format."
% (self.name, action, action_args))
# Note(thread-safety): blocking call (potentially)
try:
return method(*positional_args, **named_args)
except Exception as e:
LOG.exception(e)
raise exception.CongressException(
"driver %s tries to execute %s on arguments %s but "
"the method raised an exception."
% (self.name, action, action_args))
def get_actions(self):
"""Return all supported actions of a datasource driver.

View File

@ -337,6 +337,93 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
self.creds = args
session = ds_utils.get_keystone_session(self.creds)
self.neutron = neutronclient.v2_0.client.Client(session=session)
# specify the arg name for all method structured args
self.method_structured_args = {
'add_bgp_speaker_to_dragent': {'named': frozenset(['body'])},
'add_gateway_router': {'named': frozenset(['body'])},
'add_interface_router': {'named': frozenset(['body'])},
'add_network_to_bgp_speaker': {'named': frozenset(['body'])},
'add_network_to_dhcp_agent': {'named': frozenset(['body'])},
'add_peer_to_bgp_speaker': {'named': frozenset(['body'])},
'add_router_to_l3_agent': {'named': frozenset(['body'])},
'associate_flavor': {'named': frozenset(['body'])},
'associate_health_monitor': {'named': frozenset(['body'])},
'connect_network_gateway': {'named': frozenset(['body'])},
'create_address_scope': {'named': frozenset(['body'])},
'create_bandwidth_limit_rule': {'named': frozenset(['body'])},
'create_bgp_peer': {'named': frozenset(['body'])},
'create_bgp_speaker': {'named': frozenset(['body'])},
'create_bgpvpn': {'named': frozenset(['body'])},
'create_bgpvpn_network_assoc': {'named': frozenset(['body'])},
'create_bgpvpn_port_assoc': {'named': frozenset(['body'])},
'create_bgpvpn_router_assoc': {'named': frozenset(['body'])},
'create_dscp_marking_rule': {'named': frozenset(['body'])},
'create_endpoint_group': {'named': frozenset(['body'])},
'create_ext': {'named': frozenset(['body'])},
'create_firewall': {'named': frozenset(['body'])},
'create_firewall_policy': {'named': frozenset(['body'])},
'create_firewall_rule': {'named': frozenset(['body'])},
'create_flavor': {'named': frozenset(['body'])},
'create_floatingip': {'named': frozenset(['body'])},
'create_fwaas_firewall_group': {'named': frozenset(['body'])},
'create_fwaas_firewall_policy': {'named': frozenset(['body'])},
'create_fwaas_firewall_rule': {'named': frozenset(['body'])},
'create_gateway_device': {'named': frozenset(['body'])},
'create_health_monitor': {'named': frozenset(['body'])},
'create_ikepolicy': {'named': frozenset(['body'])},
'create_ipsec_site_connection': {'named': frozenset(['body'])},
'create_ipsecpolicy': {'named': frozenset(['body'])},
'create_lbaas_healthmonitor': {'named': frozenset(['body'])},
'create_lbaas_l7policy': {'named': frozenset(['body'])},
'create_lbaas_l7rule': {'named': frozenset(['body'])},
'create_lbaas_member': {'named': frozenset(['body'])},
'create_lbaas_pool': {'named': frozenset(['body'])},
'create_listener': {'named': frozenset(['body'])},
'create_loadbalancer': {'named': frozenset(['body'])},
'create_member': {'named': frozenset(['body'])},
'create_metering_label': {'named': frozenset(['body'])},
'create_metering_label_rule': {'named': frozenset(['body'])},
'create_minimum_bandwidth_rule': {'named': frozenset(['body'])},
'create_network': {'named': frozenset(['body'])},
'create_network_gateway': {'named': frozenset(['body'])},
'create_network_log': {'named': frozenset(['body'])},
'create_pool': {'named': frozenset(['body'])},
'create_port': {'named': frozenset(['body'])},
'create_qos_policy': {'named': frozenset(['body'])},
'create_qos_queue': {'named': frozenset(['body'])},
'create_rbac_policy': {'named': frozenset(['body'])},
'create_router': {'named': frozenset(['body'])},
'create_security_group': {'named': frozenset(['body'])},
'create_security_group_rule': {'named': frozenset(['body'])},
'create_service_profile': {'named': frozenset(['body'])},
'create_sfc_flow_classifier': {'named': frozenset(['body'])},
'create_sfc_port_chain': {'named': frozenset(['body'])},
'create_sfc_port_pair': {'named': frozenset(['body'])},
'create_sfc_port_pair_group': {'named': frozenset(['body'])},
'create_sfc_service_graph': {'named': frozenset(['body'])},
'create_subnet': {'named': frozenset(['body'])},
'create_subnetpool': {'named': frozenset(['body'])},
'create_trunk': {'named': frozenset(['body'])},
'create_vip': {'named': frozenset(['body'])},
'create_vpnservice': {'named': frozenset(['body'])},
'disconnect_network_gateway': {'named': frozenset(['body'])},
'firewall_policy_insert_rule': {'named': frozenset(['body'])},
'firewall_policy_remove_rule': {'named': frozenset(['body'])},
'insert_rule_fwaas_firewall_policy': {
'named': frozenset(['body'])},
'remove_interface_router': {'named': frozenset(['body'])},
'remove_network_from_bgp_speaker': {'named': frozenset(['body'])},
'remove_peer_from_bgp_speaker': {'named': frozenset(['body'])},
'remove_rule_fwaas_firewall_policy': {
'named': frozenset(['body'])},
'replace_tag': {'named': frozenset(['body'])},
'retry_request': {'named': frozenset(['body'])},
'show_minimum_bandwidth_rule': {'named': frozenset(['body'])},
'trunk_add_subports': {'named': frozenset(['body'])},
'trunk_remove_subports': {'named': frozenset(['body'])},
}
self.add_executable_method('update_resource_attrs',
[{'name': 'resource_type',
'description': 'resource type (e.g. ' +
@ -372,8 +459,43 @@ class NeutronV2Driver(datasource_driver.PollingDataSourceDriver,
"Detach a security group to port (WARNING: "
"may overwrite concurrent changes to "
"port's security groups list.")
# add action methods from client, but exclude 'update_*' because those
# are covered by the update_resource_attr method.
exclude_methods = ['update_address_scope', 'update_agent',
'update_bandwidth_limit_rule', 'update_bgp_peer',
'update_bgp_speaker', 'update_bgpvpn',
'update_bgpvpn_network_assoc',
'update_bgpvpn_port_assoc',
'update_bgpvpn_router_assoc',
'update_dscp_marking_rule', 'update_endpoint_group',
'update_ext', 'update_firewall',
'update_firewall_policy', 'update_firewall_rule',
'update_flavor', 'update_floatingip',
'update_fwaas_firewall_group',
'update_fwaas_firewall_policy',
'update_fwaas_firewall_rule',
'update_gateway_device', 'update_health_monitor',
'update_ikepolicy', 'update_ipsec_site_connection',
'update_ipsecpolicy', 'update_lbaas_healthmonitor',
'update_lbaas_l7policy', 'update_lbaas_l7rule',
'update_lbaas_member', 'update_lbaas_pool',
'update_listener', 'update_loadbalancer',
'update_member', 'update_minimum_bandwidth_rule',
'update_network', 'update_network_gateway',
'update_network_log', 'update_pool', 'update_port',
'update_qos_policy', 'update_quota',
'update_rbac_policy', 'update_router',
'update_security_group', 'update_service_profile',
'update_sfc_flow_classifier',
'update_sfc_port_chain', 'update_sfc_port_pair',
'update_sfc_port_pair_group',
'update_sfc_service_graph', 'update_subnet',
'update_subnetpool', 'update_trunk', 'update_vip',
'update_vpnservice']
self.add_executable_client_methods(self.neutron,
'neutronclient.v2_0.client')
'neutronclient.v2_0.client',
exclude_methods)
self.initialize_update_methods()
self._init_end_start_poll()

View File

@ -1908,8 +1908,11 @@ class TestExecutionDriver(base.TestCase):
"""
def __init__(self):
# A variable defined in datasource_driver
self.name = 'TestExecutionDriver'
self.heartbeat_callbacks = {}
super(TestExecutionDriver.ExtendedExecutionDriver, self).__init__()
self.method_structured_args = {
'action_struct': {'named': frozenset(['arg2', 'arg3'])}}
def setUp(self):
super(TestExecutionDriver, self).setUp()
@ -1952,6 +1955,27 @@ class TestExecutionDriver(base.TestCase):
# the api
self.exec_driver._execute_api(nova_client, "action", arg)
def test_execute_api_structured_args(self):
class NovaClient(object):
def action_struct(self, arg1, arg2, arg3):
return (arg1, arg2, arg3)
nova_client = NovaClient()
arg = {"positional": ["value1", '{"value2":2}'],
"named": {"arg3": '{"value3":3}'}}
# it will raise exception if the method _execute_api failed to location
# the api
self.assertEqual(
("value1", {"value2": 2}, {"value3": 3}),
self.exec_driver._execute_api(nova_client, "action_struct", arg))
arg = {"positional": ["value1", "}{invalid"],
"named": {"arg3": '{"value3":3}'}}
self.assertRaisesRegexp(
exception.CongressException, '.*invalid format.*',
self.exec_driver._execute_api, nova_client, "action_struct", arg)
def test_get_actions_order_by_name(self):
mock_methods = {'funcA': mock.MagicMock(),
'funcH': mock.MagicMock(),

View File

@ -24,6 +24,16 @@ rules:
server_security_zone(server_id, tag) :-
nova:tags(server_id=server_id, tag=tag),
security_zone_tags(tag)
-
comment: "Create special security group for each security zone"
rule: >
execute[neutronv2:create_security_group(json)] :-
zone_missing_sg(zone),
admin_project_id(project_id),
builtin:concat('{"security_group": {"name": "', zone, j1),
builtin:concat(j1, '", "project_id": "', j2),
builtin:concat(j2, project_id, j3),
builtin:concat(j3, '", "description": "security group for security zone"}}', json)
-
comment: "Show error if security group missing for security zone"
rule: >

View File

@ -107,7 +107,7 @@ python-novaclient==9.1.0
python-subunit==1.0.0
python-swiftclient==3.2.0
pytz==2018.3
PyYAML==3.12
PyYAML==3.10.0
reno==2.5.0
repoze.lru==0.7
requests-mock==1.2.0

View File

@ -42,3 +42,4 @@ oslo.middleware>=3.31.0 # Apache-2.0
oslo.vmware>=2.17.0 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
WebOb>=1.7.1 # MIT
PyYAML>=3.10.0 # MIT