Detach function from underlying orchestrator

This is an admin only operation. which gives admin user a safe way to
clean up the underlying resources allocated for the function.

Change-Id: If98ab5235902516f98be7d485e744cf3ea4cd262
This commit is contained in:
Lingxian Kong 2017-12-14 15:28:45 +13:00
parent e120058fbe
commit b3bedc375d
7 changed files with 104 additions and 60 deletions

View File

@ -10,6 +10,9 @@
"function:get_all:all_projects": "rule:context_is_admin",
"function_worker:get_all": "rule:context_is_admin",
"function:scale_up": "rule:context_is_admin",
"function:scale_down": "rule:context_is_admin",
"function:detach": "rule:context_is_admin",
"execution:get_all:all_projects": "rule:context_is_admin",
}

View File

@ -67,6 +67,7 @@ class FunctionsController(rest.RestController):
_custom_actions = {
'scale_up': ['POST'],
'scale_down': ['POST'],
'detach': ['POST'],
}
def __init__(self, *args, **kwargs):
@ -334,6 +335,8 @@ class FunctionsController(rest.RestController):
This is admin only operation. The load monitoring of function execution
depends on the monitoring solution of underlying orchestrator.
"""
acl.enforce('function:scale_up', context.get_ctx())
func_db = db_api.get_function(id)
params = scale.to_dict()
@ -358,6 +361,8 @@ class FunctionsController(rest.RestController):
This is admin only operation. The load monitoring of function execution
depends on the monitoring solution of underlying orchestrator.
"""
acl.enforce('function:scale_down', context.get_ctx())
func_db = db_api.get_function(id)
params = scale.to_dict()
if len(func_db.workers) <= 1:
@ -367,3 +372,19 @@ class FunctionsController(rest.RestController):
LOG.info('Starting to scale down function %s, params: %s', id, params)
self.engine_client.scaledown_function(id, count=params['count'])
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=202)
def detach(self, id):
"""Detach the function from its underlying workers.
This is admin only operation, which gives admin user a safe way to
clean up the underlying resources allocated for the function.
"""
acl.enforce('function:detach', context.get_ctx())
db_api.get_function(id)
LOG.info('Starting to detach function %s', id)
# Delete all resources created by orchestrator asynchronously.
self.engine_client.delete_function(id)

View File

@ -219,6 +219,8 @@ class DefaultEngine(object):
labels = {'function_id': function_id}
self.orchestrator.delete_function(function_id, labels=labels)
db_api.delete_function_workers(function_id)
LOG.info('Deleted.', resource=resource)
def scaleup_function(self, ctx, function_id, runtime_id, count=1):

View File

@ -85,6 +85,11 @@ class QinlingClient(client_base.QinlingClientBase):
return self.get('/v1/functions/%s?download=true' % function_id,
headers={})
def detach_function(self, function_id):
return self.post('/v1/functions/%s/detach' % function_id,
None,
headers={})
def create_execution(self, function_id, input=None, sync=True):
req_body = {'function_id': function_id, 'sync': sync, 'input': input}
resp, body = self.post_json('executions', req_body)

View File

@ -52,16 +52,20 @@ class ExecutionsTest(base.BaseQinlingTest):
super(ExecutionsTest, cls).resource_cleanup()
def setUp(self):
super(ExecutionsTest, self).setUp()
self.await_runtime_available(self.runtime_id)
def _create_function(self, name='python_test.py'):
python_file_path = pkg_resources.resource_filename(
'qinling_tempest_plugin',
"functions/%s" % name
)
base_name, extention = os.path.splitext(python_file_path)
self.base_name = os.path.basename(base_name)
module_name = os.path.basename(base_name)
self.python_zip_file = os.path.join(
tempfile.gettempdir(),
'%s.zip' % self.base_name
'%s.zip' % module_name
)
if not os.path.isfile(self.python_zip_file):
@ -70,32 +74,16 @@ class ExecutionsTest(base.BaseQinlingTest):
# Use default compression mode, may change in future.
zf.write(
python_file_path,
'%s%s' % (self.base_name, extention),
'%s%s' % (module_name, extention),
compress_type=zipfile.ZIP_STORED
)
finally:
zf.close()
# Create function
function_name = data_utils.rand_name('function',
prefix=self.name_prefix)
with open(self.python_zip_file, 'rb') as package_data:
resp, body = self.client.create_function(
{"source": "package"},
self.runtime_id,
name=function_name,
package_data=package_data,
entry='%s.main' % self.base_name
)
self.function_id = body['id']
self.addCleanup(self.client.delete_resource, 'functions',
self.function_id, ignore_notfound=True)
self.addCleanup(os.remove, self.python_zip_file)
self.function_id = self.create_function(self.python_zip_file)
@decorators.idempotent_id('2a93fab0-2dae-4748-b0d4-f06b735ff451')
def test_crud_execution(self):
self.await_runtime_available(self.runtime_id)
self._create_function()
resp, body = self.client.create_execution(self.function_id,
@ -126,7 +114,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('2199d1e6-de7d-4345-8745-a8184d6022b1')
def test_get_all_admin(self):
"""Admin user can get executions of other projects"""
self.await_runtime_available(self.runtime_id)
self._create_function()
resp, body = self.client.create_execution(self.function_id,
@ -161,7 +148,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('8096cc52-64d2-4660-a657-9ac0bdd743ae')
def test_execution_async(self):
self.await_runtime_available(self.runtime_id)
self._create_function()
resp, body = self.client.create_execution(self.function_id, sync=False)
@ -178,7 +164,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('6cb47b1d-a8c6-48f2-a92f-c4f613c33d1c')
def test_execution_log(self):
self.await_runtime_available(self.runtime_id)
self._create_function()
resp, body = self.client.create_execution(self.function_id,
@ -199,7 +184,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('f22097dc-37db-484d-83d3-3a97e72ec576')
def test_execution_concurrency(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_sleep.py')
def _create_execution():
@ -227,7 +211,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('a948382a-84af-4f0e-ad08-4297345e302c')
def test_python_execution_file_limit(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_file_limit.py')
resp, body = self.client.create_execution(self.function_id)
@ -244,7 +227,6 @@ class ExecutionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab')
def test_python_execution_process_number(self):
self.await_runtime_available(self.runtime_id)
self._create_function(name='test_python_process_limit.py')
resp, body = self.client.create_execution(self.function_id)

View File

@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pkg_resources
import tempfile
import zipfile
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import tenacity
from qinling_tempest_plugin.tests import base
@ -45,46 +47,21 @@ class FunctionsTest(base.BaseQinlingTest):
super(FunctionsTest, cls).resource_cleanup()
def _create_function(self):
function_name = data_utils.rand_name('function',
prefix=self.name_prefix)
with open(self.python_zip_file, 'rb') as package_data:
resp, body = self.client.create_function(
{"source": "package"},
self.runtime_id,
name=function_name,
package_data=package_data,
entry='%s.main' % self.base_name
)
self.assertEqual(201, resp.status_code)
function_id = body['id']
self.addCleanup(self.client.delete_resource, 'functions',
function_id, ignore_notfound=True)
return function_id
def setUp(self):
super(FunctionsTest, self).setUp()
# Wait until runtime is available
self.await_runtime_available(self.runtime_id)
python_file_path = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
os.pardir,
os.pardir,
'functions/python_test.py'
)
python_file_path = pkg_resources.resource_filename(
'qinling_tempest_plugin',
"functions/python_test.py"
)
base_name, extention = os.path.splitext(python_file_path)
self.base_name = os.path.basename(base_name)
module_name = os.path.basename(base_name)
self.python_zip_file = os.path.join(
tempfile.gettempdir(),
'%s.zip' % self.base_name
'%s.zip' % module_name
)
if not os.path.isfile(self.python_zip_file):
@ -93,7 +70,7 @@ class FunctionsTest(base.BaseQinlingTest):
# Use default compression mode, may change in future.
zf.write(
python_file_path,
'%s%s' % (self.base_name, extention),
'%s%s' % (module_name, extention),
compress_type=zipfile.ZIP_STORED
)
finally:
@ -102,7 +79,7 @@ class FunctionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('9c36ac64-9a44-4c44-9e44-241dcc6b0933')
def test_crud_function(self):
# Create function
function_id = self._create_function()
function_id = self.create_function(self.python_zip_file)
# Get functions
resp, body = self.client.get_resources('functions')
@ -124,7 +101,7 @@ class FunctionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('051f3106-df01-4fcd-a0a3-c81c99653163')
def test_get_all_admin(self):
# Create function by normal user
function_id = self._create_function()
function_id = self.create_function(self.python_zip_file)
# Get functions by admin
resp, body = self.admin_client.get_resources('functions')
@ -162,7 +139,7 @@ class FunctionsTest(base.BaseQinlingTest):
@decorators.idempotent_id('5cb44ee4-6c0c-4ede-9e6c-e1b9109eaa2c')
def test_delete_not_allowed(self):
"""Even admin user can not delete other project's function."""
function_id = self._create_function()
function_id = self.create_function(self.python_zip_file)
self.assertRaises(
exceptions.Forbidden,
@ -170,3 +147,31 @@ class FunctionsTest(base.BaseQinlingTest):
'functions',
function_id
)
@decorators.idempotent_id('45df227e-3399-4412-a8d3-d40c1290bc1c')
def test_detach(self):
"""Admin only operation."""
function_id = self.create_function(self.python_zip_file)
resp, _ = self.client.create_execution(function_id,
input={'name': 'Qinling'})
self.assertEqual(201, resp.status)
resp, body = self.admin_client.get_function_workers(function_id)
self.assertEqual(200, resp.status)
self.assertEqual(1, len(body['workers']))
# Detach function
resp, _ = self.admin_client.detach_function(function_id)
self.assertEqual(202, resp.status)
def _assert_workers():
resp, body = self.admin_client.get_function_workers(function_id)
self.assertEqual(200, resp.status)
self.assertEqual(0, len(body['workers']))
r = tenacity.Retrying(
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(5),
retry=tenacity.retry_if_exception_type(AssertionError)
)
r.call(_assert_workers)

View File

@ -11,8 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from kubernetes import client as k8s_client
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest import test
import tenacity
@ -64,3 +67,26 @@ class BaseQinlingTest(test.BaseTestCase):
self.assertEqual(200, resp.status)
self.assertEqual('success', body['status'])
def create_function(self, package_path):
function_name = data_utils.rand_name('function',
prefix=self.name_prefix)
base_name, _ = os.path.splitext(package_path)
module_name = os.path.basename(base_name)
with open(package_path, 'rb') as package_data:
resp, body = self.client.create_function(
{"source": "package"},
self.runtime_id,
name=function_name,
package_data=package_data,
entry='%s.main' % module_name
)
self.assertEqual(201, resp.status_code)
function_id = body['id']
self.addCleanup(os.remove, package_path)
self.addCleanup(self.client.delete_resource, 'functions',
function_id, ignore_notfound=True)
return function_id