Extend network commands to operate with inter-service connectivity

Actions 'plug' and 'unplug' are extended to allow operating with
the connectivity between services.

Command format:
plug|unplug <service> service ingress|egress to|from <other service> service

Example:
`unplug nova service egress to db service' - this blocks the outgoing
traffic from nova service processes to port of db service.

Change-Id: Ia9717a76597da6aad408520e45f41fb25d3e0d49
This commit is contained in:
Ilya Shakhat 2020-04-22 19:09:22 +04:00
parent 911d6ab20d
commit 1e70f233f7
6 changed files with 177 additions and 14 deletions

View File

@ -62,6 +62,8 @@ NODE_ACTIONS_PATTERN = '|'.join(NODE_ACTIONS)
PATTERNS = [
re.compile(r'(?P<action>%s)'
r'\s+(?P<service>\S+)\s+service'
r'(\s+(?P<direction>ingress|egress)\s+(to|from)'
r'\s+(?P<other_service>\S+)\s+service)?'
r'(\s+on(\s+(?P<node>\S+))?\s+nodes?)?'
r'(\s+for\s+(?P<duration>\d+)\s+seconds)?' %
SERVICE_ACTIONS_PATTERN),
@ -101,6 +103,7 @@ def execute(destructor, command):
network_name = groups.get('network')
target = groups.get('target')
duration = groups.get('duration')
direction = groups.get('direction')
if service_name:
service = destructor.get_service(name=service_name)
@ -116,6 +119,16 @@ def execute(destructor, command):
if duration:
kwargs['sec'] = int(duration)
if direction:
other_service_name = groups.get('other_service')
if other_service_name:
other_service = destructor.get_service(
name=other_service_name)
other_port = getattr(other_service, 'port', None)
if other_port:
kwargs['direction'] = direction
kwargs['other_port'] = other_port
fn = getattr(service, action)
fn(**kwargs)

View File

@ -82,18 +82,22 @@ class Service(base_driver.BaseDriver):
raise NotImplementedError
@public
def unplug(self, nodes=None):
def unplug(self, nodes=None, direction=None, other_port=None):
"""Unplug Service out of network on all nodes or on particular subset
:param nodes: NodesCollection
:param direction: str, traffic direction 'ingress' or 'egress'
:param other_port: int, port number which needs to be blocked
"""
raise NotImplementedError
@public
def plug(self, nodes=None):
def plug(self, nodes=None, direction=None, other_port=None):
"""Plug Service into network on all nodes or on particular subset
:param nodes: NodesCollection
:param direction: str, traffic direction 'ingress' or 'egress'
:param other_port: int, port number which needs to be allowed
"""
raise NotImplementedError

View File

@ -154,16 +154,28 @@ class ServiceAsProcess(service.Service):
self._run_task(nodes, task, 'Unfreeze')
@utils.require_variables('port')
def plug(self, nodes=None):
def plug(self, nodes=None, direction=None, other_port=None):
nodes = nodes if nodes is not None else self.get_nodes()
message = "Open port %d for" % self.port[1]
direction = self.port[2] if len(self.port) > 2 else 'ingress'
if other_port:
port = other_port
else:
# work with local service port
port = self.port
direction = self.port[2] if len(self.port) > 2 else 'ingress'
protocol = port[0]
port_number = port[1]
message = "Open %s traffic to %s port %d" % (
direction, protocol, port_number)
task = {
'iptables': {
'chain': 'INPUT' if direction == 'ingress' else 'OUTPUT',
'protocol': self.port[0],
'protocol': protocol,
'jump': 'DROP',
'destination_port': self.port[1],
'destination_port': '%d' % port_number,
'state': 'absent',
},
'become': 'yes',
@ -171,18 +183,31 @@ class ServiceAsProcess(service.Service):
self._run_task(nodes, task, message)
@utils.require_variables('port')
def unplug(self, nodes=None):
def unplug(self, nodes=None, direction=None, other_port=None):
nodes = nodes if nodes is not None else self.get_nodes()
message = "Close port %d for" % self.port[1]
direction = self.port[2] if len(self.port) > 2 else 'ingress'
if other_port:
port = other_port
else:
# work with local service port
port = self.port
direction = self.port[2] if len(self.port) > 2 else 'ingress'
protocol = port[0]
port_number = port[1]
message = "Block %s traffic to %s port %d" % (
direction, protocol, port_number)
task = {
'iptables': {
'chain': 'INPUT' if direction == 'ingress' else 'OUTPUT',
'protocol': self.port[0],
'protocol': protocol,
'jump': 'DROP',
'destination_port': self.port[1],
'destination_port': '%d' % port_number,
'action': 'insert',
'state': 'present',
'comment': 'Added by os-faults',
},
'become': 'yes',
}

View File

@ -29,7 +29,8 @@ class TestHumanAPI(test.TestCase):
self.destructor.get_service = mock.MagicMock(return_value=self.service)
@ddt.data(('restart', 'keystone'), ('kill', 'nova-api'))
@ddt.data(('restart', 'keystone'), ('kill', 'nova-api'),
('unplug', 'keystone'), ('plug', 'nova-api'))
@ddt.unpack
def test_service_action(self, action, service_name):
@ -67,7 +68,8 @@ class TestHumanAPI(test.TestCase):
self.destructor.get_service.assert_called_once_with(name=service_name)
getattr(self.service, action).assert_called_once_with(sec=t)
@ddt.data(('restart', 'keystone', 'node'), ('kill', 'nova-api', 'node'))
@ddt.data(('restart', 'keystone', 'node'), ('kill', 'nova-api', 'node'),
('unplug', 'keystone', 'node'))
@ddt.unpack
def test_service_action_on_fqdn_node(self, action, service_name, node):
@ -81,6 +83,29 @@ class TestHumanAPI(test.TestCase):
self.destructor.get_nodes.assert_called_once_with(fqdns=[node])
getattr(self.service, action).assert_called_once_with(nodes=nodes)
@ddt.data(('unplug', 'keystone', 'egress', 'db'),
('plug', 'nova', 'ingress', 'neutron'))
@ddt.unpack
def test_unplug_with_ref(self, action, service_name, direction,
other_service_name):
other_port = ('tcp', mock.Mock())
other_service = mock.Mock()
other_service.port = other_port
self.destructor.get_service.side_effect = [
self.service,
other_service
]
command = '%s %s service %s to %s service' % (
action, service_name, direction, other_service_name)
human.execute(self.destructor, command)
self.destructor.get_service.assert_has_calls(
[mock.call(name=service_name), mock.call(name=other_service_name)])
getattr(self.service, action).assert_called_once_with(
direction=direction, other_port=other_port)
@ddt.data(('reboot', 'keystone'), ('reset', 'nova-api'))
@ddt.unpack
def test_node_action_on_all_nodes(self, action, service_name):

View File

@ -0,0 +1,96 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
from os_faults.api.node_collection import Host
from os_faults.api.node_collection import NodeCollection
from os_faults.drivers.services.process import ServiceAsProcess
class TestServiceAsProcess(unittest.TestCase):
def setUp(self):
super(TestServiceAsProcess, self).setUp()
config = {
'grep': 'test-service',
'port': ['tcp', 8000],
}
self.cloud_management = mock.Mock()
self.hosts = [Host('10.1.1.10')]
self.service = ServiceAsProcess(
'test-service', config, mock.Mock(), self.cloud_management)
node_collection = NodeCollection(
cloud_management=self.cloud_management, hosts=self.hosts)
self.service.get_nodes = mock.Mock(return_value=node_collection)
def test_unplug(self):
# run the command
self.service.unplug()
# verify
expected_task = {
'iptables': {
'chain': 'INPUT',
'protocol': 'tcp',
'jump': 'DROP',
'destination_port': '8000',
'action': 'insert',
'state': 'present',
'comment': 'Added by os-faults',
},
'become': 'yes',
}
self.cloud_management.execute_on_cloud.assert_called_once_with(
self.hosts, expected_task)
def test_unplug_with_other_port(self):
# run the command
self.service.unplug(direction='egress', other_port=['udp', 10000])
# verify
expected_task = {
'iptables': {
'chain': 'OUTPUT',
'protocol': 'udp',
'jump': 'DROP',
'destination_port': '10000',
'action': 'insert',
'state': 'present',
'comment': 'Added by os-faults',
},
'become': 'yes',
}
self.cloud_management.execute_on_cloud.assert_called_once_with(
self.hosts, expected_task)
def test_plug(self):
# run the command
self.service.plug()
# verify
expected_task = {
'iptables': {
'chain': 'INPUT',
'protocol': 'tcp',
'jump': 'DROP',
'destination_port': '8000',
'state': 'absent',
},
'become': 'yes',
}
self.cloud_management.execute_on_cloud.assert_called_once_with(
self.hosts, expected_task)