Add UT for runtime API operation

Partially implements: blueprint add-unit-tests
Change-Id: I2bf92b36e4bee273171bd626638a61e313377a44
This commit is contained in:
Lingxian Kong 2017-07-11 13:08:56 +12:00
parent 81139c1c7c
commit 00885b42ff
8 changed files with 176 additions and 24 deletions

View File

@ -214,7 +214,7 @@ class Functions(ResourceList):
class Runtime(Resource):
id = wtypes.text
name = wtypes.text
image = wsme.wsattr(wtypes.text, mandatory=True)
image = wtypes.text
description = wtypes.text
status = wsme.wsattr(wtypes.text, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)

View File

@ -26,6 +26,7 @@ from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
POST_REQUIRED = set(['image'])
UPDATE_ALLOWED = set(['name', 'description', 'image'])
@ -63,6 +64,11 @@ class RuntimesController(rest.RestController):
def post(self, runtime):
params = runtime.to_dict()
if not POST_REQUIRED.issubset(set(params.keys())):
raise exc.InputException(
'Required param is missing. Required: %s' % POST_REQUIRED
)
LOG.info("Creating runtime. [runtime=%s]", params)
params.update({'status': status.CREATING})

View File

@ -15,7 +15,7 @@
import functools
from oslo_config import cfg
from oslo_db import options
from oslo_db import options as db_options
from oslo_db.sqlalchemy import session as db_session
from qinling.db.sqlalchemy import sqlite_lock
@ -23,10 +23,8 @@ from qinling import exceptions as exc
from qinling.utils import thread_local
# Note(dzimine): sqlite only works for basic testing.
options.set_defaults(cfg.CONF, connection="sqlite:///qinling.sqlite")
db_options.set_defaults(cfg.CONF, connection="sqlite:///qinling.sqlite")
_FACADE = None
_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session"

View File

@ -89,7 +89,7 @@ def upgrade():
sa.Column('image', sa.String(length=255), nullable=False),
sa.Column('status', sa.String(length=32), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name'),
sa.UniqueConstraint('image', 'project_id'),
info={"check_ifexists": True}
)

View File

@ -53,13 +53,15 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
class Runtime(model_base.QinlingSecureModelBase):
__tablename__ = 'runtime'
__table_args__ = (
sa.UniqueConstraint('image', 'project_id'),
)
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
image = sa.Column(sa.String(255), nullable=False)
status = sa.Column(sa.String(32), nullable=False)
sa.UniqueConstraint('name')
class Execution(model_base.QinlingSecureModelBase):
__tablename__ = 'execution'

View File

@ -56,22 +56,20 @@ class APITest(base.DbTestCase):
self.mock_ctx.return_value = self.ctx
self.addCleanup(self.patch_ctx.stop)
def assertNotFound(self, url):
def _assertNotFound(self, url):
try:
self.app.get(url, headers={'Accept': 'application/json'})
except webtest_app.AppError as error:
self.assertIn('Bad response: 404 Not Found', str(error))
return
self.fail('Expected 404 Not found but got OK')
def assertUnauthorized(self, url):
def _assertUnauthorized(self, url):
try:
self.app.get(url, headers={'Accept': 'application/json'})
except webtest_app.AppError as error:
self.assertIn('Bad response: 401 Unauthorized', str(error))
return
self.fail('Expected 401 Unauthorized but got OK')

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from qinling.db import api as db_api
from qinling import status
from qinling.tests.unit.api import base
@ -22,16 +24,19 @@ class TestRuntimeController(base.APITest):
def setUp(self):
super(TestRuntimeController, self).setUp()
# Insert a runtime record in db. The data will be removed in clean up.
db_runtime = db_api.create_runtime(
# Insert a runtime record in db. The data will be removed in db clean
# up.
self.db_runtime = db_api.create_runtime(
{
'name': 'test_runtime',
'image': 'python2.7',
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.AVAILABLE
}
)
self.runtime_id = db_runtime.id
self.runtime_id = self.db_runtime.id
def test_get(self):
resp = self.app.get('/v1/runtimes/%s' % self.runtime_id)
@ -46,3 +51,110 @@ class TestRuntimeController(base.APITest):
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, expected)
def test_get_all(self):
resp = self.app.get('/v1/runtimes')
expected = {
'id': self.runtime_id,
"image": "python2.7",
"name": "test_runtime",
"project_id": test_base.DEFAULT_PROJECT_ID,
"status": status.AVAILABLE
}
self.assertEqual(200, resp.status_int)
actual = self._assert_single_item(
resp.json['runtimes'], id=self.runtime_id
)
self._assertDictContainsSubset(actual, expected)
@mock.patch('qinling.rpc.EngineClient.create_runtime')
def test_post(self, mock_create_time):
body = {
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
}
resp = self.app.post_json('/v1/runtimes', body)
self.assertEqual(201, resp.status_int)
self._assertDictContainsSubset(resp.json, body)
mock_create_time.assert_called_once_with(resp.json['id'])
@mock.patch('qinling.rpc.EngineClient.delete_runtime')
def test_delete(self, mock_delete_runtime):
db_runtime = db_api.create_runtime(
{
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.AVAILABLE
}
)
runtime_id = db_runtime.id
resp = self.app.delete('/v1/runtimes/%s' % runtime_id)
self.assertEqual(204, resp.status_int)
mock_delete_runtime.assert_called_once_with(runtime_id)
def test_delete_runtime_with_function_associated(self):
db_api.create_function(
{
'name': self.rand_name('function', prefix='APITest'),
'runtime_id': self.runtime_id,
'code': {},
'entry': 'main.main',
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
}
)
resp = self.app.delete(
'/v1/runtimes/%s' % self.runtime_id, expect_errors=True
)
self.assertEqual(403, resp.status_int)
def test_put_name(self):
resp = self.app.put_json(
'/v1/runtimes/%s' % self.runtime_id, {'name': 'new_name'}
)
self.assertEqual(200, resp.status_int)
self.assertEqual('new_name', resp.json['name'])
def test_put_image_runtime_not_available(self):
db_runtime = db_api.create_runtime(
{
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.CREATING
}
)
runtime_id = db_runtime.id
resp = self.app.put_json(
'/v1/runtimes/%s' % runtime_id, {'image': 'new_image'},
expect_errors=True
)
self.assertEqual(403, resp.status_int)
@mock.patch('qinling.rpc.EngineClient.update_runtime')
def test_put_image(self, mock_update_runtime):
resp = self.app.put_json(
'/v1/runtimes/%s' % self.runtime_id, {'image': 'new_image'}
)
self.assertEqual(200, resp.status_int)
self.assertEqual('new_image', resp.json['image'])
mock_update_runtime.assert_called_once_with(
self.runtime_id,
image='new_image',
pre_image=self.db_runtime.image
)

View File

@ -14,13 +14,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from oslo_config import cfg
from oslotest import base
from qinling import context as auth_context
from qinling.db import api as db_api
from qinling.db import base as db_base
from qinling.db.sqlalchemy import sqlite_lock
from qinling.tests.unit import config as test_config
@ -61,9 +61,51 @@ class BaseTest(base.BaseTestCase):
since Python 3.2
"""
self.assertTrue(
set(child.items()).issubset(set(parent.items()))
set(child.items()).issubset(set(parent.items())),
msg=msg
)
def _assert_single_item(self, items, **props):
return self._assert_multiple_items(items, 1, **props)[0]
def _assert_multiple_items(self, items, count, **props):
def _matches(item, **props):
for prop_name, prop_val in props.items():
v = (item[prop_name] if isinstance(item, dict)
else getattr(item, prop_name))
if v != prop_val:
return False
return True
filtered_items = list(
[item for item in items if _matches(item, **props)]
)
found = len(filtered_items)
if found != count:
self.fail("Wrong number of items found [props=%s, "
"expected=%s, found=%s]" % (props, count, found))
return filtered_items
def rand_name(self, name='', prefix=None):
"""Generate a random name that inclues a random number.
:param str name: The name that you want to include
:param str prefix: The prefix that you want to include
:return: a random name. The format is
'<prefix>-<name>-<random number>'.
(e.g. 'prefixfoo-namebar-154876201')
:rtype: string
"""
randbits = str(random.randint(1, 0x7fffffff))
rand_name = randbits
if name:
rand_name = name + '-' + rand_name
if prefix:
rand_name = prefix + '-' + rand_name
return rand_name
class DbTestCase(BaseTest):
is_heavy_init_called = False
@ -86,10 +128,7 @@ class DbTestCase(BaseTest):
This method runs long initialization once by class
and can be extended by child classes.
"""
# If using sqlite, change to memory. The default is file based.
if cfg.CONF.database.connection.startswith('sqlite'):
cfg.CONF.set_default('connection', 'sqlite://', group='database')
cfg.CONF.set_default('connection', 'sqlite://', group='database')
cfg.CONF.set_default('max_overflow', -1, group='database')
cfg.CONF.set_default('max_pool_size', 1000, group='database')
@ -112,6 +151,3 @@ class DbTestCase(BaseTest):
def _clean_db(self):
db_api.delete_all()
sqlite_lock.cleanup()
if not cfg.CONF.database.connection.startswith('sqlite'):
db_base.get_engine().dispose()