From 939ba95cc490096af620e1b7abd2f829dae021aa Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Fri, 8 Dec 2017 13:51:40 +1300 Subject: [PATCH] Support to get workers by admin user Admin user can get current workers related to a function. Change-Id: Ia17e52ad1f7541e63b9e815b47751ad94d33e743 --- etc/policy.json.sample | 2 + qinling/api/controllers/v1/function.py | 17 +++++++ qinling/api/controllers/v1/resources.py | 14 ++++++ qinling/db/sqlalchemy/api.py | 8 ++-- qinling/orchestrator/kubernetes/manager.py | 3 ++ .../controllers/v1/test_function_worker.py | 47 +++++++++++++++++++ .../functions/test_python_sleep.py | 18 +++++++ .../services/qinling_client.py | 3 ++ .../tests/api/test_executions.py | 45 ++++++++++++++++-- test-requirements.txt | 1 + 10 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 qinling/tests/unit/api/controllers/v1/test_function_worker.py create mode 100644 qinling_tempest_plugin/functions/test_python_sleep.py diff --git a/etc/policy.json.sample b/etc/policy.json.sample index f8f92416..c5d4283c 100644 --- a/etc/policy.json.sample +++ b/etc/policy.json.sample @@ -9,4 +9,6 @@ "runtime:delete": "rule:context_is_admin", "function:get_all:all_projects": "rule:context_is_admin", + + "function_worker:get_all": "rule:context_is_admin", } diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 4e3ecf33..f699b29c 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -45,7 +45,24 @@ CODE_SOURCE = set(['package', 'swift', 'image']) UPDATE_ALLOWED = set(['name', 'description', 'code', 'package', 'entry']) +class FunctionWorkerController(rest.RestController): + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.FunctionWorkers, types.uuid) + def get_all(self, function_id): + acl.enforce('function_worker:get_all', context.get_ctx()) + + LOG.info("Get workers for function %s.", function_id) + db_workers = db_api.get_function_workers(function_id) + + workers = [resources.FunctionWorker.from_dict(db_model.to_dict()) + for db_model in db_workers] + + return resources.FunctionWorkers(workers=workers) + + class FunctionsController(rest.RestController): + workers = FunctionWorkerController() + _custom_actions = { 'scale_up': ['POST'], 'scale_down': ['POST'], diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 6c099a5d..bc4a1797 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -213,6 +213,20 @@ class Functions(ResourceList): return sample +class FunctionWorker(Resource): + id = wtypes.text + function_id = wsme.wsattr(types.uuid, readonly=True) + worker_name = wtypes.text + + +class FunctionWorkers(ResourceList): + workers = [FunctionWorker] + + def __init__(self, **kwargs): + self._type = 'workers' + super(FunctionWorkers, self).__init__(**kwargs) + + class Runtime(Resource): id = wtypes.text name = wtypes.text diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 159eb695..c3bdb2f6 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -418,15 +418,17 @@ def delete_function_service_mapping(id, session=None): @db_base.session_aware() def create_function_worker(values, session=None): - mapping = models.FunctionWorkers() - mapping.update(values.copy()) + worker = models.FunctionWorkers() + worker.update(values.copy()) # Ignore duplicate error for FunctionWorkers try: - mapping.save(session=session) + worker.save(session=session) except oslo_db_exc.DBDuplicateEntry: session.close() + return worker + @db_base.session_aware() def get_function_workers(function_id, session=None): diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 80041397..a1192343 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -230,6 +230,9 @@ class KubernetesManager(base.OrchestratorBase): label_selector='function_id=%s' % function_id ) if len(ret.items) > 0: + LOG.debug( + "Function %s already associates to a pod.", function_id + ) return ret.items[:count] ret = self.v1.list_namespaced_pod( diff --git a/qinling/tests/unit/api/controllers/v1/test_function_worker.py b/qinling/tests/unit/api/controllers/v1/test_function_worker.py new file mode 100644 index 00000000..79825055 --- /dev/null +++ b/qinling/tests/unit/api/controllers/v1/test_function_worker.py @@ -0,0 +1,47 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from qinling.db import api as db_api +from qinling.tests.unit.api import base + +TEST_CASE_NAME = 'TestFunctionWorkerController' + + +class TestFunctionWorkerController(base.APITest): + def setUp(self): + super(TestFunctionWorkerController, self).setUp() + + db_func = self.create_function(prefix=TEST_CASE_NAME) + self.function_id = db_func.id + + def test_get_all_workers(self): + db_worker = db_api.create_function_worker( + { + 'function_id': self.function_id, + 'worker_name': 'worker_1', + } + ) + expected = { + "id": db_worker.id, + "function_id": self.function_id, + "worker_name": "worker_1", + } + + resp = self.app.get('/v1/functions/%s/workers' % self.function_id) + + self.assertEqual(200, resp.status_int) + actual = self._assert_single_item( + resp.json['workers'], id=db_worker.id + ) + self._assertDictContainsSubset(actual, expected) diff --git a/qinling_tempest_plugin/functions/test_python_sleep.py b/qinling_tempest_plugin/functions/test_python_sleep.py new file mode 100644 index 00000000..b9058c09 --- /dev/null +++ b/qinling_tempest_plugin/functions/test_python_sleep.py @@ -0,0 +1,18 @@ +# Copyright 2017 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time + + +def main(seconds=5, **kwargs): + time.sleep(seconds) diff --git a/qinling_tempest_plugin/services/qinling_client.py b/qinling_tempest_plugin/services/qinling_client.py index ade765e6..dec18dd7 100644 --- a/qinling_tempest_plugin/services/qinling_client.py +++ b/qinling_tempest_plugin/services/qinling_client.py @@ -94,3 +94,6 @@ class QinlingClient(client_base.QinlingClientBase): def get_execution_log(self, execution_id): return self.get('/v1/executions/%s/log' % execution_id, headers={'Accept': 'text/plain'}) + + def get_function_workers(self, function_id): + return self.get_resources('functions/%s/workers' % function_id) diff --git a/qinling_tempest_plugin/tests/api/test_executions.py b/qinling_tempest_plugin/tests/api/test_executions.py index 52546033..29fe8c99 100644 --- a/qinling_tempest_plugin/tests/api/test_executions.py +++ b/qinling_tempest_plugin/tests/api/test_executions.py @@ -16,6 +16,7 @@ import pkg_resources import tempfile import zipfile +import futurist from oslo_serialization import jsonutils from tempest.lib.common.utils import data_utils from tempest.lib import decorators @@ -100,12 +101,13 @@ class ExecutionsTest(base.BaseQinlingTest): input={'name': 'Qinling'}) self.assertEqual(201, resp.status) - self.assertEqual('success', body['status']) execution_id = body['id'] self.addCleanup(self.client.delete_resource, 'executions', execution_id, ignore_notfound=True) + self.assertEqual('success', body['status']) + # Get executions resp, body = self.client.get_resources('executions') @@ -128,12 +130,13 @@ class ExecutionsTest(base.BaseQinlingTest): resp, body = self.client.create_execution(self.function_id, sync=False) self.assertEqual(201, resp.status) - self.assertEqual('running', body['status']) execution_id = body['id'] self.addCleanup(self.client.delete_resource, 'executions', execution_id, ignore_notfound=True) + self.assertEqual('running', body['status']) + self.await_execution_success(execution_id) @decorators.idempotent_id('6cb47b1d-a8c6-48f2-a92f-c4f613c33d1c') @@ -145,11 +148,11 @@ class ExecutionsTest(base.BaseQinlingTest): input={'name': 'OpenStack'}) self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) self.assertEqual('success', body['status']) execution_id = body['id'] - self.addCleanup(self.client.delete_resource, 'executions', - execution_id, ignore_notfound=True) # Get execution log resp, body = self.client.get_execution_log(execution_id) @@ -157,6 +160,35 @@ class ExecutionsTest(base.BaseQinlingTest): self.assertEqual(200, resp.status) self.assertIn('Hello, OpenStack', body) + @decorators.idempotent_id('f22097dc-37db-484d-83d3-3a97e72ec576') + def test_execution_concurrency(self): + self.await_runtime_available(self.runtime_id) + self._create_function(name='test_python_sleep.py') + + def _create_execution(): + resp, body = self.client.create_execution(self.function_id) + return resp, body + + futs = [] + with futurist.GreenThreadPoolExecutor(max_workers=4) as executor: + for _ in range(3): + fut = executor.submit(_create_execution) + futs.append(fut) + for f in futs: + # Wait until we get the response + resp, body = f.result() + + self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) + self.assertEqual('success', body['status']) + + resp, body = self.admin_client.get_function_workers(self.function_id) + + self.assertEqual(200, resp.status) + self.assertEqual(1, len(body['workers'])) + + @decorators.idempotent_id('a948382a-84af-4f0e-ad08-4297345e302c') def test_python_execution_file_limit(self): self.await_runtime_available(self.runtime_id) self._create_function(name='test_python_file_limit.py') @@ -164,6 +196,8 @@ class ExecutionsTest(base.BaseQinlingTest): resp, body = self.client.create_execution(self.function_id) self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) self.assertEqual('failed', body['status']) output = jsonutils.loads(body['output']) @@ -171,6 +205,7 @@ class ExecutionsTest(base.BaseQinlingTest): 'Too many open files', output['output'] ) + @decorators.idempotent_id('bf6f8f35-fa88-469b-8878-7aa85a8ce5ab') def test_python_execution_process_number(self): self.await_runtime_available(self.runtime_id) self._create_function(name='test_python_process_limit.py') @@ -178,6 +213,8 @@ class ExecutionsTest(base.BaseQinlingTest): resp, body = self.client.create_execution(self.function_id) self.assertEqual(201, resp.status) + self.addCleanup(self.client.delete_resource, 'executions', + body['id'], ignore_notfound=True) self.assertEqual('failed', body['status']) output = jsonutils.loads(body['output']) diff --git a/test-requirements.txt b/test-requirements.txt index 41a9edd1..c0dcdc8a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -10,6 +10,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD testtools>=1.4.0 # MIT tempest>=16.1.0 # Apache-2.0 +futurist>=1.2.0 # Apache-2.0 openstackdocstheme>=1.16.0 # Apache-2.0