Check RabbitMQ cluster status via REST API during cluster create

This replaces the TaskFlow task that checked when a RabbitMQ node begins
to respond on port 5672.  We now use RabbitMQ's management plugin and its
REST API to check when the cluster is active and able to consume jobs.
The new task is added at the end of the create_cluster flow and
(arbitrarily) queries the 0th node's RabbitMQ management port.

Change-Id: I9dffdefd9cdcd6317f5c3114c59b4b30b8218239
Daniel Allegood 2015-07-27 14:18:11 -07:00
parent 2874192814
commit fe9f3767fb
15 changed files with 311 additions and 409 deletions
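
For reference, a minimal standalone sketch (not part of this change) of the
kind of REST check the new task performs: poll the management API's
aliveness-test endpoint on port 15672 until the default vhost reports an
"ok" status. The host, credentials, and retry/delay values below are
illustrative assumptions only.

# Hypothetical sketch of the management-API readiness check (Python 2,
# urllib2); host, credentials and retry timing are assumed values.
import json
import time
import urllib2

def wait_for_rabbit(vm_ip, user, password, retries=30, delay=10):
    # The default vhost "/" is URL-encoded as %2F in the aliveness URL.
    url = 'http://%s:15672/api/aliveness-test/%%2F' % vm_ip
    password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, url, user, password)
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(password_mgr))
    for _ in range(retries):
        try:
            if json.load(opener.open(url)).get('status') == 'ok':
                return True
        except urllib2.URLError:
            # Management plugin not reachable yet; keep waiting.
            pass
        time.sleep(delay)
    return False

if __name__ == '__main__':
    print(wait_for_rabbit('192.0.2.10', 'guest', 'guest'))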

View File

@ -99,8 +99,8 @@ function configure_cue {
fi
# Set cluster node check timeouts
iniset $CUE_CONF taskflow cluster_node_check_timeout 10
iniset $CUE_CONF taskflow cluster_node_check_max_count 30
iniset $CUE_CONF taskflow cluster_node_check_timeout 30
iniset $CUE_CONF taskflow cluster_node_check_max_count 120
# Set flow create cluster node vm active retry count
iniset $CUE_CONF flow_options create_cluster_node_vm_active_retry_count $CUE_TF_CREATE_CLUSTER_NODE_VM_ACTIVE_RETRY_COUNT
@ -207,6 +207,7 @@ function init_cue {
$OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 5672:5672 $CUE_RABBIT_SECURITY_GROUP
$OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 4369:4369 $CUE_RABBIT_SECURITY_GROUP
$OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 61000:61000 $CUE_RABBIT_SECURITY_GROUP
$OPENSTACK_CMD security group rule create --src-ip 0.0.0.0/0 --proto tcp --dst-port 15672:15672 $CUE_RABBIT_SECURITY_GROUP
fi
CUE_RABBIT_SECURITY_GROUP_ID=$($OPENSTACK_CMD security group list | grep $CUE_RABBIT_SECURITY_GROUP | tr -d ' ' | cut -f 2 -d '|')

View File

@ -15,10 +15,13 @@
import oslo_config.cfg as cfg
import taskflow.patterns.graph_flow as graph_flow
import taskflow.patterns.linear_flow as linear_flow
import taskflow.retry as retry
from cue.db.sqlalchemy import models
from cue.taskflow.flow import create_cluster_node
import cue.taskflow.task as cue_tasks
import os_tasklib.common as os_common
def create_cluster(cluster_id, node_ids, user_network_id,
@ -60,6 +63,25 @@ def create_cluster(cluster_id, node_ids, user_network_id,
node_check_timeout = cfg.CONF.taskflow.cluster_node_check_timeout
node_check_max_count = cfg.CONF.taskflow.cluster_node_check_max_count
check_rabbit_online = linear_flow.Flow(
name="wait for RabbitMQ ready state",
retry=retry.Times(node_check_max_count, revert_all=True))
check_rabbit_online.add(
cue_tasks.GetRabbitClusterStatus(
name="get RabbitMQ status",
rebind={'vm_ip': "vm_management_ip_0"},
provides="clustering_status",
inject={'proto': 'http'}),
os_common.CheckFor(
name="check cluster status",
rebind={'check_var': "clustering_status"},
check_value='OK',
retry_delay_seconds=10),
)
flow.add(check_rabbit_online)
flow.link(check_rabbit_online, end_task)
#todo(dagnello): verify node_ids is a list and not a string
for i, node_id in enumerate(node_ids):
generate_userdata = cue_tasks.ClusterNodeUserData(
@ -71,7 +93,8 @@ def create_cluster(cluster_id, node_ids, user_network_id,
create_cluster_node.create_cluster_node(cluster_id, i, node_id, flow,
generate_userdata, start_task,
end_task, node_check_timeout,
check_rabbit_online,
node_check_timeout,
node_check_max_count,
user_network_id,
management_network_id)

View File

@ -36,7 +36,7 @@ CONF.register_opts(FLOW_OPTS, group='flow_options')
def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
generate_userdata, start_task, end_task,
generate_userdata, start_task, post_task,
node_check_timeout, node_check_max_count,
user_network_id, management_network_id):
"""Create Cluster Node factory function
@ -53,8 +53,8 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
:type graph_flow: taskflow.patterns.graph_flow
:param start_task: Update cluster status start task
:type start_task: cue.taskflow.task.UpdateClusterStatus
:param end_task: Update cluster status end task
:type end_task: cue.taskflow.task.UpdateClusterStatus
:param post_task: Task/Subflow to run after the flow created here
:type post_task: taskflow task or flow
:param generate_userdata: generate user data task
:type generate_userdata: cue.taskflow.task.ClusterNodeUserData
:param node_check_timeout: seconds wait between node status checks
@ -158,18 +158,6 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
graph_flow.add(check_vm_active)
graph_flow.link(get_vm_id, check_vm_active)
#todo(dagnello): make retry times configurable
check_rabbit_online = linear_flow.Flow(
name="wait for RabbitMQ ready state %s" % node_name,
retry=retry.Times(node_check_max_count, revert_all=True))
check_rabbit_online.add(
os_common.VerifyNetwork(
name="get RabbitMQ status %s" % node_name,
rebind={'vm_ip': "vm_management_ip_%d" % node_number},
retry_delay_seconds=node_check_timeout))
graph_flow.add(check_rabbit_online)
graph_flow.link(check_vm_active, check_rabbit_online)
build_node_info = os_common.Lambda(
new_node_values,
name="build new node values %s" % node_name,
@ -194,13 +182,13 @@ def create_cluster_node(cluster_id, node_number, node_id, graph_flow,
provides="endpoint_values_%d" % node_number
)
graph_flow.add(build_endpoint_info)
graph_flow.link(check_rabbit_online, build_endpoint_info)
graph_flow.link(check_vm_active, build_endpoint_info)
create_endpoint = cue_tasks.CreateEndpoint(
name="update endpoint for node %s" % node_name,
rebind={'endpoint_values': "endpoint_values_%d" % node_number})
graph_flow.add(create_endpoint)
graph_flow.link(check_rabbit_online, create_endpoint)
graph_flow.link(check_vm_active, create_endpoint)
graph_flow.link(update_node, end_task)
graph_flow.link(create_endpoint, end_task)
graph_flow.link(update_node, post_task)
graph_flow.link(create_endpoint, post_task)

View File

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from check_or_restart_rabbitmq import CheckOrRestartRabbitMq # noqa
from cluster_node_userdata import ClusterNodeUserData # noqa
from create_endpoint_task import CreateEndpoint # noqa
from get_node import GetNode # noqa
from get_rabbit_cluster_status import GetRabbitClusterStatus # noqa
from update_cluster_task import UpdateClusterStatus # noqa
from update_endpoints_task import UpdateEndpoints # noqa
from update_node_task import UpdateNode # noqa

View File

@ -1,97 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import telnetlib as telnet
import time
import six
import taskflow.task
class CheckOrRestartRabbitMq(taskflow.task.Task):
"""Check or Restart RabbitMQ VM
This task either checks that RabbitMQ is running or restarts the VM,
depending on the supplied action.
"""
def __init__(self,
os_client,
retry_delay_seconds=None,
retry_delay_ms=None,
name=None,
**kwargs):
"""Constructor
This constructor sets the retry delay for this task's revert method.
:param retry_delay_seconds: retry delay in seconds
:param retry_delay_ms: retry delay in milliseconds
:param name: unique name for atom
"""
super(CheckOrRestartRabbitMq, self).__init__(name=name, **kwargs)
self.os_client = os_client
self.sleep_time = 0
if retry_delay_seconds:
self.sleep_time = retry_delay_seconds
if retry_delay_ms:
self.sleep_time += retry_delay_ms / 1000.0
def execute(self, action, vm_info, vm_ip, port, **kwargs):
"""main execute method
:param action: The request context in dict format.
:type action: oslo_context.RequestContext
:param vm_info: Unique ID for the node.
:type vm_info: dict or string
:param vm_ip:
:type vm_ip:
:param port:
:type port:
"""
if six.PY2 and isinstance(port, unicode):
check_port = port.encode()
else:
check_port = port
if isinstance(vm_info, dict):
vm_id = vm_info['id']
else:
vm_id = vm_info
if action == 'restart':
self.os_client.servers.reboot(vm_id)
tn = telnet.Telnet()
tn.open(vm_ip, check_port, timeout=10)
def revert(self, *args, **kwargs):
"""Revert CreateVmTask
This method is executed upon failure of the GetRabbitVmStatus or the
Flow that the Task is part of.
:param args: positional arguments that the task required to execute.
:type args: list
:param kwargs: keyword arguments that the task required to execute; the
special key `result` will contain the :meth:`execute`
results (if any) and the special key `flow_failures`
will contain any failure information.
"""
if self.sleep_time != 0:
time.sleep(self.sleep_time)

View File

@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import urllib
import urllib2
import taskflow.task
class GetRabbitClusterStatus(taskflow.task.Task):
def execute(self,
vm_ip,
default_rabbit_user,
default_rabbit_pass,
proto=None,
**kwargs):
"""Main execute method to verify Rabbitmq clustering in a VM
:param vm_ip: VM ip address
:type vm_ip: string
:param default_rabbit_user: Username for RabbitMQ
:type default_rabbit_user: string
:param default_rabbit_pass: Password for RabbitMQ
:type default_rabbit_pass: string
:param proto: Protocol for url, http/https
:type proto: string
"""
if not proto:
proto = 'http'
base_url = proto + '://' + vm_ip + ':15672/api'
# RMQ management url to get a list of the existing vhosts
vhosts_url = base_url + '/vhosts'
# RMQ management url to query whether the RMQ nodes have clustered
aliveness_url = base_url + '/aliveness-test'
password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None,
vhosts_url,
default_rabbit_user,
default_rabbit_pass)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
opener = urllib2.build_opener(handler)
retval = "NOT-OK"
try:
res = opener.open(vhosts_url)
except urllib2.URLError:
pass
else:
json_res = json.load(res)
result = {}
for x in json_res:
vhost = urllib.quote(x['name'], '')
cur_aliveness_url = aliveness_url + '/' + vhost
password_mgr.add_password(None,
cur_aliveness_url,
default_rabbit_user,
default_rabbit_pass)
handler = urllib2.HTTPBasicAuthHandler(password_mgr)
opener = urllib2.build_opener(handler)
res = opener.open(cur_aliveness_url)
result[x['name']] = json.load(res)['status']
if all(val == 'ok' for val in result.values()):
retval = 'OK'
return retval

View File

@ -51,6 +51,8 @@ rabbitmq hard nofile 65536
rabbitmq soft nofile 65536
EOF
rabbitmq-plugins enable rabbitmq_management
update-rc.d rabbitmq-server enable
while [ ! -z "$(service rabbitmq-server status | grep 'nodedown')" ]; do

View File

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import StringIO
import urllib2
import cue.tests.functional.fixtures.base as base
class Urllib2ResultDetails(object):
urllib2_result_list = []
@staticmethod
def set_urllib2_result(results):
"""Helper function to setup sequence of fixed results.
:param results: list of results
"""
for result in results:
io = StringIO.StringIO(result)
Urllib2ResultDetails.urllib2_result_list.append(io)
@staticmethod
def get_urllib2_result():
"""Returns the next result in configured sequence.
If the result sequence is empty, an empty string is returned.
:return: Current urllib result in urllib2_result_list.
"""
if len(Urllib2ResultDetails.urllib2_result_list) == 0:
result = ''
else:
result = Urllib2ResultDetails.urllib2_result_list.pop()
return result
class Urllib2Fixture(base.BaseFixture):
"""A test fixture to simulate urllib2 calls
This class is used in test cases to simulate urllib2 calls.
"""
def __init__(self, *args, **kwargs):
super(Urllib2Fixture, self).__init__(*args, **kwargs)
def setUp(self):
"""Set up test fixture and apply all method overrides."""
super(Urllib2Fixture, self).setUp()
urllib2_client = self.mock('urllib2.OpenerDirector')
urllib2_client.open = self.open
def open(self, url):
result = Urllib2ResultDetails.get_urllib2_result()
if result.getvalue() == 'URLError':
raise urllib2.URLError('urlerror')
return result

View File

@ -22,7 +22,7 @@ from cue.taskflow.flow import create_cluster
from cue.tests.functional import base
from cue.tests.functional.fixtures import neutron
from cue.tests.functional.fixtures import nova
from cue.tests.functional.fixtures import telnet
from cue.tests.functional.fixtures import urllib2_fixture
from taskflow import engines
import taskflow.exceptions as taskflow_exc
@ -32,7 +32,7 @@ class CreateClusterTests(base.FunctionalTestCase):
additional_fixtures = [
nova.NovaClient,
neutron.NeutronClient,
telnet.TelnetClient
urllib2_fixture.Urllib2Fixture
]
def setUp(self):
@ -66,6 +66,19 @@ class CreateClusterTests(base.FunctionalTestCase):
name=management_network_name)
self.management_network = network_list['networks'][0]
# Todo(Dan) If testing becomes asynchronous, then there is no guarantee
# that these urllib return results will come in the proper order. Will
# have to update the urllib2 fixture to respond appropriately for the
# url passed in.
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['{"status": "ok"}',
'[{"name": "/"}]',
'{"status": "ok"}',
'[{"name": "/"}]',
'{"status": "ok"}',
'[{"name": "/"}]']
)
def test_create_cluster(self):
flow_store = {
"image": self.valid_image.id,

View File

@ -24,7 +24,7 @@ from cue.taskflow.flow import delete_cluster
from cue.tests.functional import base
from cue.tests.functional.fixtures import neutron
from cue.tests.functional.fixtures import nova
from cue.tests.functional.fixtures import telnet
from cue.tests.functional.fixtures import urllib2_fixture
from taskflow import engines
import taskflow.exceptions as taskflow_exc
@ -34,7 +34,7 @@ class DeleteClusterTests(base.FunctionalTestCase):
additional_fixtures = [
nova.NovaClient,
neutron.NeutronClient,
telnet.TelnetClient
urllib2_fixture.Urllib2Fixture
]
def setUp(self):
@ -68,6 +68,19 @@ class DeleteClusterTests(base.FunctionalTestCase):
name=management_network_name)
self.management_network = network_list['networks'][0]
# Todo(Dan) If testing becomes asynchronous, then there is no guarantee
# that these urllib return results will come in the proper order. Will
# have to update the urllib2 fixture to respond appropriately for the
# url passed in.
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['{"status": "ok"}',
'[{"name": "/"}]',
'{"status": "ok"}',
'[{"name": "/"}]',
'{"status": "ok"}',
'[{"name": "/"}]']
)
def test_delete_cluster(self):
flow_store_create = {
"image": self.valid_image.id,

View File

@ -1,110 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import cue.client as client
import cue.taskflow.task as cue_task
from cue.tests.functional import base
from cue.tests.functional.fixtures import nova
from cue.tests.functional.fixtures import telnet
from taskflow import engines
from taskflow.patterns import linear_flow
import taskflow.retry as retry
import uuid
class CheckOrRestartRabbitTest(base.FunctionalTestCase):
additional_fixtures = [
nova.NovaClient,
telnet.TelnetClient
]
task_store = {
'vm_ip': "0.0.0.0",
'vm_info': str(uuid.uuid4()),
'port': '5672'
}
def setUp(self):
super(CheckOrRestartRabbitTest, self).setUp()
self.nova_client = client.nova_client()
rabbitmq_retry_strategy = [
'check',
'check',
'check',
'check',
'restart',
]
retry_controller = retry.ForEach(rabbitmq_retry_strategy,
"retry check RabbitMQ",
provides="retry_strategy")
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state",
retry=retry_controller).add(
cue_task.CheckOrRestartRabbitMq(
os_client=self.nova_client,
name="get RabbitMQ status %s",
rebind={'action': "retry_strategy"},
retry_delay_seconds=1))
def test_get_rabbit_status(self):
"""Verifies GetRabbitVmStatus task directly."""
# start engine to run task
engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store)
def test_get_vm_status_flow(self):
"""Verifies GetRabbitVmStatus in a successful retry flow.
This test simulates creating a cluster, then directly running a flow
which will fail until telnet_status acquired from get_rabbit_vm_status
task returns 'connect'. Attempting the telnet connection will return
'wait' for the first three times, then will return 'connected' once a
telnet connection is made. The node status should be in Active state.
"""
# configure custom vm_status list
telnet.TelnetStatusDetails.set_telnet_status(['connected',
'wait',
'wait',
'wait'])
# create flow with "GetRabbitVmStatus" task
# start engine to run task
engines.run(self.flow, store=CheckOrRestartRabbitTest.task_store)
self.assertFalse(self.nova_client.servers.reboot.called)
def test_get_vm_status_flow_fail(self):
"""Verifies GetRabbitVmStatus in an unsuccessful retry flow.
This test simulates creating a cluster, then directly running a flow
which will fail until the retry count has been exhausted. Attempting
the telnet connection will return 'wait' until retry count reaches
limit and flow fails. The node status should remain in Building state.
"""
# configure custom vm_status list
telnet.TelnetStatusDetails.set_telnet_status(['wait',
'wait',
'wait',
'wait',
'wait',
'wait'])
# start engine to run task
self.assertRaises(IOError, engines.run, self.flow,
store=CheckOrRestartRabbitTest.task_store)
self.assertTrue(self.nova_client.servers.reboot.called)

View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cue.taskflow.task import get_rabbit_cluster_status
from cue.tests.functional import base
from cue.tests.functional.fixtures import urllib2_fixture as urllib2_fixture
from taskflow import engines
from taskflow.patterns import linear_flow
import urllib2
class GetRabbitClusterStatusTest(base.FunctionalTestCase):
additional_fixtures = [
urllib2_fixture.Urllib2Fixture
]
task_store = {
"vm_ip": "0.0.0.0",
"default_rabbit_user": "user",
"default_rabbit_pass": "pass"
}
def setUp(self):
super(GetRabbitClusterStatusTest, self).setUp()
def test_get_rabbit_cluster_status(self):
"""Verifies GetRabbitClusterStatus task."""
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['{"status": "ok"}',
'{"status": "clustering"}',
'[{"name": "/"}]']
)
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add(
get_rabbit_cluster_status.GetRabbitClusterStatus(
name="get RabbitMQ status"))
# start engine
engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store)
def test_get_rabbit_cluster_status_fail(self):
"""Verifies unsuccessful path.
This test simulates when the RMQ vm is not responding to the
urllib2.open calls on the RMQ management port.
"""
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['{}',
'[]']
)
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add(
get_rabbit_cluster_status.GetRabbitClusterStatus(
name="get RabbitMQ status"))
# start engine
engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store)
def test_get_rabbit_cluster_status_connection_failed(self):
"""Verifies GetRabbitClusterStatus task."""
urllib2_fixture.Urllib2ResultDetails.set_urllib2_result(
['URLError']
)
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state").add(
get_rabbit_cluster_status.GetRabbitClusterStatus(
name="get RabbitMQ status"))
try:
# start engine
engines.run(self.flow, store=GetRabbitClusterStatusTest.task_store)
except urllib2.URLError:
# Expected
pass

View File

@ -1,99 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cue.tests.functional import base
from cue.tests.functional.fixtures import telnet
import os_tasklib.common.verify_network_task as verify_network_task
from taskflow import engines
from taskflow.patterns import linear_flow
import taskflow.retry as retry
class GetRabbitVmStatusTest(base.FunctionalTestCase):
additional_fixtures = [
telnet.TelnetClient
]
task_store = {
'vm_ip': "0.0.0.0",
'port': '5672'
}
def setUp(self):
super(GetRabbitVmStatusTest, self).setUp()
def test_get_rabbit_status(self):
"""Verifies GetRabbitVmStatus task directly."""
# create flow with "GetRabbitVmStatus" task
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state",
retry=retry.Times(5)).add(
verify_network_task.VerifyNetwork(
name="get RabbitMQ status",
retry_delay_seconds=1))
# start engine to run task
engines.run(self.flow, store=GetRabbitVmStatusTest.task_store)
def test_get_vm_status_flow(self):
"""Verifies GetRabbitVmStatus in a successful retry flow.
This test simulates creating a cluster, then directly running a flow
which will fail until telnet_status acquired from get_rabbit_vm_status
task returns 'connect'. Attempting the telnet connection will return
'wait' for the first three times, then will return 'connected' once a
telnet connection is made. The node status should be in Active state.
"""
# configure custom vm_status list
telnet.TelnetStatusDetails.set_telnet_status(['connected',
'wait',
'wait',
'wait'])
# create flow with "GetRabbitVmStatus" task
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state",
retry=retry.Times(5)).add(
verify_network_task.VerifyNetwork(
name="get RabbitMQ status",
retry_delay_seconds=1))
# start engine to run task
engines.run(self.flow, store=GetRabbitVmStatusTest.task_store)
def test_get_vm_status_flow_fail(self):
"""Verifies GetRabbitVmStatus in an unsuccessful retry flow.
This test simulates creating a cluster, then directly running a flow
which will fail until the retry count has been exhausted. Attempting
the telnet connection will return 'wait' until retry count reaches
limit and flow fails. The node status should remain in Building state.
"""
# configure custom vm_status list
telnet.TelnetStatusDetails.set_telnet_status(['wait',
'wait',
'wait',
'wait'])
# create flow with "GetRabbitVmStatus" task
self.flow = linear_flow.Flow(name="wait for RabbitMQ ready state",
retry=retry.Times(3)).add(
verify_network_task.VerifyNetwork(
name="get RabbitMQ status",
retry_delay_seconds=1))
# start engine to run task
self.assertRaises(IOError, engines.run, self.flow,
store=GetRabbitVmStatusTest.task_store)

View File

@ -18,4 +18,3 @@ from check_for import CheckFor # noqa
from lambda_task import Lambda # noqa
from map_task import Map # noqa
from reduce_task import Reduce # noqa
from verify_network_task import VerifyNetwork # noqa

View File

@ -1,75 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import telnetlib as telnet
import time
import six
import taskflow.task
class VerifyNetwork(taskflow.task.Task):
def __init__(self,
retry_delay_seconds=None,
retry_delay_ms=None,
name=None,
**kwargs):
"""Constructor
This constructor sets the retry delay for this task's revert method.
:param retry_delay_seconds: retry delay in seconds
:param retry_delay_ms: retry delay in milliseconds
:param name: unique name for atom
"""
super(VerifyNetwork, self).__init__(name=name, **kwargs)
self.sleep_time = 0
if retry_delay_seconds:
self.sleep_time = retry_delay_seconds
if retry_delay_ms:
self.sleep_time += retry_delay_ms / 1000.0
def execute(self, vm_ip, port, **kwargs):
"""Main execute method to verify network connection in a VM
:param vm_ip: VM ip address
:type vm_ip: string
:param port: host service port
:type port: int
"""
if six.PY2 and isinstance(port, unicode):
check_port = port.encode()
else:
check_port = port
tn = telnet.Telnet()
tn.open(vm_ip, check_port, timeout=10)
def revert(self, *args, **kwargs):
"""Revert CreateVmTask
This method is executed upon failure of the GetRabbitVmStatus or the
Flow that the Task is part of.
:param args: positional arguments that the task required to execute.
:type args: list
:param kwargs: keyword arguments that the task required to execute; the
special key `result` will contain the :meth:`execute`
results (if any) and the special key `flow_failures`
will contain any failure information.
"""
if self.sleep_time != 0:
time.sleep(self.sleep_time)