Implement stress injection

Add a new type of fault called "stress". When activated, this fault
produces load on the CPU, disk, memory or kernel of a node. The
functionality is implemented with the help of the stress-ng utility.

Node collection API is extended:
   def stress(self, target, duration=None)

Human API is extended, examples of commands:
 * stress cpu for 20 seconds on controller.domain.tld node
 * stress disk for 10 seconds on all nodes
 * stress memory for 60 seconds on all nodes with keystone service

Change-Id: I8ddb2292b8dd19f476e4a5071259d1a90cbaa37c
Note: 'stress-ng' is required to be installed on target nodes.
This commit is contained in:
Ilya Shakhat 2017-08-30 15:00:07 +02:00
parent 2c6f613055
commit 9cf6337d5e
4 changed files with 124 additions and 4 deletions

View File

@ -0,0 +1,44 @@
#!/usr/bin/python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ansible.module_utils.basic import * # noqa
# Maps a stress target name to the stress-ng option that starts the matching
# stressor.
# NOTE(review): per the stress-ng manual a worker count of 0 means "one
# worker per online CPU" -- confirm for the stress-ng version deployed.
STRESSORS_MAP = {
    'cpu': '--cpu 0',      # CPU load
    'disk': '--hdd 0',     # disk I/O load
    'memory': '--brk 0',   # memory load (presumably via brk(); see manual)
    'kernel': '--kill 0',  # kernel load (presumably via kill(); see manual)
    'all': '--all 0',      # every stressor; also the fallback used by main()
}
def main():
    """Ansible module entry point: put the node under load with stress-ng.

    Module parameters (both required):
      * target -- name of what to stress; looked up in STRESSORS_MAP. A name
        that is not in the map falls back to the 'all' stressor.
      * duration -- integer passed to stress-ng as ``--timeout <duration>s``.

    The command, its return code, stdout and stderr are reported back through
    ``exit_json``.
    """
    spec = {
        'target': {'required': True, 'type': 'str'},
        'duration': {'required': True, 'type': 'int'},
    }
    module = AnsibleModule(argument_spec=spec)
    params = module.params

    # Unknown targets are not rejected; they stress everything instead.
    stressor = STRESSORS_MAP.get(params['target'])
    if not stressor:
        stressor = STRESSORS_MAP['all']

    cmd = 'bash -c "stress-ng %s --timeout %ss"' % (
        stressor, params['duration'])
    # NOTE(review): check_rc=True should make Ansible fail the module on a
    # non-zero exit status -- confirm against the Ansible version in use.
    rc, stdout, stderr = module.run_command(cmd, check_rc=True)
    module.exit_json(cmd=cmd, rc=rc, stderr=stderr, stdout=stdout)


if __name__ == '__main__':
    main()

View File

@ -30,6 +30,8 @@ Human API understands commands like these (examples):
* unfreeze <service> service [on (random|one|single|<fqdn> node[s])]
* reboot [random|one|single|<fqdn>] node[s] [with <service> service]
* reset [random|one|single|<fqdn>] node[s] [with <service> service]
* stress (cpu|memory|disk|kernel) for <T> seconds on
[random|one|single|<fqdn>] node[s] [with <service> service]
* disconnect <name> network on [random|one|single|<fqdn>] node[s]
[with <service> service]
* connect <name> network on [random|one|single|<fqdn>] node[s]
@ -44,7 +46,8 @@ def list_actions(klazz):
hasattr(o, '__public__'))))
# Node words that mean "pick from the matching nodes" rather than naming an
# FQDN; execute() handles them by calling .pick() on the node collection.
RANDOMNESS = {'one', 'random', 'some', 'single'}
RANDOMNESS_PATTERN = '|'.join(RANDOMNESS)
# Node words that mean "do not filter by FQDN"; execute() skips the
# fqdns=[...] lookup for them.
ANYTHING = {'all'}
# Regex alternation of every node alias (random-pick words plus 'all').
NODE_ALIASES_PATTERN = '|'.join(RANDOMNESS | ANYTHING)
# Action names collected by list_actions() from the Service and
# NodeCollection classes.
# NOTE(review): list_actions appears to select members carrying a
# `__public__` attribute -- confirm against its full definition.
SERVICE_ACTIONS = list_actions(service_pkg.Service)
SERVICE_ACTIONS_PATTERN = '|'.join(SERVICE_ACTIONS)
NODE_ACTIONS = list_actions(node_collection_pkg.NodeCollection)
@ -58,10 +61,12 @@ PATTERNS = [
SERVICE_ACTIONS_PATTERN),
re.compile('(?P<action>%s)'
'(\s+(?P<network>\w+)\s+network\s+on)?'
'(\s+(?P<target>\w+)'
'(\s+for\s+(?P<duration>\d+)\s+seconds)(\s+on)?)?'
'(\s+(?P<node>%s|\S+))?'
'\s+nodes?'
'(\s+with\s+(?P<service>\S+)\s+service)?' %
(NODE_ACTIONS_PATTERN, RANDOMNESS_PATTERN)),
(NODE_ACTIONS_PATTERN, NODE_ALIASES_PATTERN)),
]
@ -82,6 +87,7 @@ def execute(destructor, command):
service_name = groups.get('service')
node_name = groups.get('node')
network_name = groups.get('network')
target = groups.get('target')
duration = groups.get('duration')
if service_name:
@ -92,7 +98,7 @@ def execute(destructor, command):
kwargs = {}
if node_name in RANDOMNESS:
kwargs['nodes'] = service.get_nodes().pick()
elif node_name:
elif node_name and node_name not in ANYTHING:
kwargs['nodes'] = destructor.get_nodes(fqdns=[node_name])
if duration:
@ -110,15 +116,24 @@ def execute(destructor, command):
kwargs = {}
if network_name:
kwargs['network_name'] = network_name
if target:
kwargs['target'] = target
kwargs['duration'] = int(duration)
fn = getattr(nodes, action)
fn(**kwargs)
else: # nodes operation
nodes = destructor.get_nodes(fqdns=[node_name])
if node_name and node_name not in ANYTHING:
nodes = destructor.get_nodes(fqdns=[node_name])
else:
nodes = destructor.get_nodes()
kwargs = {}
if network_name:
kwargs['network_name'] = network_name
if target:
kwargs['target'] = target
kwargs['duration'] = int(duration)
fn = getattr(nodes, action)
fn(**kwargs)

View File

@ -213,3 +213,16 @@ class NodeCollection(utils.ReprMixin):
:param network_name: name of network
"""
raise NotImplementedError
@public
def stress(self, target, duration=None):
    """Stress node OS and hardware

    Builds a ``stress`` task and runs it on every host of this collection
    through the cloud management driver.

    :param target: what to stress, e.g. cpu, disk, memory or kernel
    :param duration: how long to stress, in seconds; 10 is used when the
        value is omitted or otherwise falsy
    """
    if not duration:
        duration = 10  # defaults to 10 seconds
    LOG.info('Stress %s for %ss on nodes %s', target, duration, self)
    stress_args = {'target': target, 'duration': duration}
    task = {'stress': stress_args}
    self.cloud_management.execute_on_cloud(self.hosts, task)

View File

@ -124,6 +124,54 @@ class TestHumanAPI(test.TestCase):
destructor.get_nodes.assert_called_once_with(fqdns=['node-2.local'])
getattr(nodes, action).assert_called_once()
@ddt.data('cpu', 'memory', 'disk', 'kernel')
def test_stress_by_fqdn(self, target):
    # A command that names a node by FQDN must look up exactly that node
    # and call stress() on it with the parsed target and integer duration.
    duration = 20
    nodes = mock.MagicMock(node_collection.NodeCollection)
    destructor = mock.MagicMock()
    destructor.get_nodes = mock.MagicMock(return_value=nodes)

    human.execute(
        destructor,
        'stress %s for %d seconds on node-2.local node' % (
            target, duration))

    destructor.get_nodes.assert_called_once_with(fqdns=['node-2.local'])
    nodes.stress.assert_called_once_with(
        target=target, duration=duration)
@ddt.data('cpu', 'memory', 'disk', 'kernel')
def test_stress_target(self, target):
    # With no node name in the command, get_nodes() must be called without
    # an FQDN filter and stress() applied to whatever it returns.
    duration = 20
    nodes = mock.MagicMock(node_collection.NodeCollection)
    destructor = mock.MagicMock()
    destructor.get_nodes = mock.MagicMock(return_value=nodes)

    human.execute(
        destructor,
        'stress %s for %d seconds on nodes' % (target, duration))

    destructor.get_nodes.assert_called_once_with()
    nodes.stress.assert_called_once_with(
        target=target, duration=duration)
@ddt.data(('CPU', 'cpu', 10, 'keystone'),
          ('disk', 'disk', 20, 'nova-api'))
@ddt.unpack
def test_stress_by_service_on_fqdn_node(self, user_target, cmd_target,
                                        duration, service_name):
    # NOTE: despite the method name, the command addresses "all nodes" that
    # run the given service, not a node selected by FQDN.
    # The target typed by the user may be in any case; stress() is expected
    # to receive it lower-cased (cmd_target).
    nodes = mock.MagicMock(node_collection.NodeCollection)
    self.service.get_nodes.return_value = nodes

    human.execute(
        self.destructor,
        'stress %s for %d seconds on all nodes with %s service' % (
            user_target, duration, service_name))

    nodes.stress.assert_called_once_with(
        target=cmd_target, duration=duration)
@ddt.data(('Disconnect', 'disconnect'),
('Connect', 'connect'))
@ddt.unpack