diff --git a/freezer_api/tests/unit/sqlalchemy/v1/__init__.py b/freezer_api/tests/unit/sqlalchemy/v1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/freezer_api/tests/unit/sqlalchemy/v1/test_action.py b/freezer_api/tests/unit/sqlalchemy/v1/test_action.py new file mode 100644 index 00000000..82f4ec8d --- /dev/null +++ b/freezer_api/tests/unit/sqlalchemy/v1/test_action.py @@ -0,0 +1,416 @@ +# (c) Copyright 2018 ZTE Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tests for manipulating Action via the DB API""" + +import copy +from oslo_config import cfg + +from freezer_api.tests.unit import common +from freezer_api.tests.unit.sqlalchemy import base + +CONF = cfg.CONF + + +class DbActionTestCase(base.DbTestCase): + + def setUp(self): + super(DbActionTestCase, self).setUp() + self.fake_action_0 = common.get_fake_action_0() + self.fake_action_2 = common.get_fake_action_2() + self.fake_action_3 = common.get_fake_action_3() + self.freezer_action_0 = self.fake_action_0.get('freezer_action') + self.freezer_action_2 = self.fake_action_2.get('freezer_action') + self.fake_action_id = common.get_fake_action_id() + CONF.enable_v1_api = True + + def tearDown(self): + super(DbActionTestCase, self).tearDown() + CONF.enable_v1_api = False + + def test_add_and_get_action(self): + action_doc = copy.deepcopy(self.fake_action_0) + action_id = self.dbapi.add_action(user_id=self.fake_action_0. + get('user_id'), + doc=action_doc) + self.assertIsNotNone(action_id) + + result = self.dbapi.get_action(user_id=self.fake_action_0. + get('user_id'), + action_id=action_id) + + self.assertIsNotNone(result) + + self.assertEqual(result.get('max_retries'), + self.fake_action_0.get('max_retries')) + + self.assertEqual(result.get('max_retries_interval'), + self.fake_action_0.get('max_retries_interval')) + + freezer_action = result.get('freezer_action') + + self.assertEqual(freezer_action.get('action'), + self.freezer_action_0.get('action')) + + self.assertEqual(freezer_action.get('backup_name'), + self.freezer_action_0.get('backup_name')) + + self.assertEqual(freezer_action.get('container'), + self.freezer_action_0.get('container')) + + self.assertEqual(freezer_action.get('src_file'), + self.freezer_action_0.get('src_file')) + + self.assertEqual(freezer_action.get('mode'), + self.freezer_action_0.get('mode')) + + def test_add_and_delete_action(self): + action_doc = copy.deepcopy(self.fake_action_0) + action_id = self.dbapi.add_action(user_id=self.fake_action_0. + get('user_id'), + doc=action_doc) + self.assertIsNotNone(action_id) + + result = self.dbapi.delete_action(user_id=self.fake_action_0. + get('user_id'), + action_id=action_id) + + self.assertIsNotNone(result) + + self.assertEqual(result, action_id) + + result = self.dbapi.get_action(user_id=self.fake_action_0. + get('user_id'), + action_id=action_id) + + self.assertEqual(len(result), 0) + + def test_add_and_update_action(self): + action_doc = copy.deepcopy(self.fake_action_0) + action_id = self.dbapi.add_action(user_id=self.fake_action_0. + get('user_id'), + doc=action_doc) + self.assertIsNotNone(action_id) + + patch_doc = copy.deepcopy(self.fake_action_2) + + result = self.dbapi.update_action(user_id=self.fake_action_2. + get('user_id'), + patch_doc=patch_doc, + action_id=action_id) + + self.assertIsNotNone(result) + + self.assertEqual(result, action_id) + + result = self.dbapi.get_action(user_id=self.fake_action_2. + get('user_id'), + action_id=action_id) + + self.assertEqual(result.get('max_retries'), + self.fake_action_2.get('max_retries')) + + self.assertEqual(result.get('max_retries_interval'), + self.fake_action_2.get('max_retries_interval')) + + freezer_action = result.get('freezer_action') + + self.assertEqual(freezer_action.get('action'), + self.freezer_action_2.get('action')) + + def test_add_and_replace_action(self): + action_doc = copy.deepcopy(self.fake_action_0) + action_id = self.dbapi.add_action(user_id=self.fake_action_0. + get('user_id'), + doc=action_doc) + self.assertIsNotNone(action_id) + + patch_doc = copy.deepcopy(self.fake_action_2) + + result = self.dbapi.replace_action(user_id=self.fake_action_2. + get('user_id'), + doc=patch_doc, + action_id=action_id) + + self.assertIsNotNone(result) + + self.assertEqual(result, action_id) + result = self.dbapi.get_action(user_id=self.fake_action_2. + get('user_id'), + action_id=action_id) + + self.assertEqual(result.get('max_retries'), + self.fake_action_2.get('max_retries')) + + self.assertEqual(result.get('max_retries_interval'), + self.fake_action_2.get('max_retries_interval')) + + freezer_action = result.get('freezer_action') + + self.assertEqual(freezer_action.get('action'), + self.freezer_action_2.get('action')) + patch_doc1 = copy.deepcopy(self.fake_action_0) + result = self.dbapi.replace_action(user_id=self.fake_action_2. + get('user_id'), + doc=patch_doc1, + action_id=self.fake_action_id) + self.assertIsNotNone(result) + result = self.dbapi.get_action(user_id=self.fake_action_2. + get('user_id'), + action_id=self.fake_action_id) + self.assertEqual(result.get('action_id'), self.fake_action_id) + + def test_add_and_search_action(self): + count = 0 + actionids = [] + while(count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=10, + offset=0) + + self.assertIsNotNone(result) + + self.assertEqual(len(result), 10) + + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(actionids[index], actionmap['action_id']) + + def test_action_list_with_search_match_and_match_not(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + if count in [4, 12]: + doc['freezer_action']['mode'] = 'nova' + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match_not': [{'mode': 'nova'}], + 'match': [{'max_retries': 10}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(10, actionmap['max_retries']) + self.assertEqual('fs', + actionmap['freezer_action']['mode']) + + def test_action_list_with_search_match_list(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + if count in [4, 12]: + doc['freezer_action']['mode'] = 'nova' + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match': [{'max_retries': 10}, + {'mode': 'nova'}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(10, actionmap['max_retries']) + self.assertEqual('nova', + actionmap['freezer_action']['mode']) + + def test_action_list_with_search_match_not_list(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + if count in [4, 12]: + doc['freezer_action']['mode'] = 'nova' + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match_not': + [{'mode': 'nova'}, + {'max_retries': 5}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(10, actionmap['max_retries']) + self.assertEqual('fs', + actionmap['freezer_action']['mode']) + + def test_action_list_with_search_with_all_opt_one_match(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match': [{'_all': '[{"max_retries": 10}]'}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 5) + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(10, actionmap['max_retries']) + + def test_action_list_with_search_with_all_opt_two_matchs(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + if count in [4, 12]: + doc['freezer_action']['mode'] = 'nova' + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match': + [{'_all': + '[{"max_retries": 10}, ' + '{"mode": "nova"}]'}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + actionmap = result[index] + self.assertEqual(10, actionmap['max_retries']) + self.assertEqual('nova', + actionmap['freezer_action']['mode']) + + def test_action_list_with_search_with_error_all_opt_return_alltuples(self): + count = 0 + actionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_action_3) + action_id = common.get_fake_action_id() + doc['action_id'] = action_id + if count in [0, 4, 8, 12, 16]: + doc['max_retries'] = 10 + result = self.dbapi.add_action(user_id=self.fake_action_3. + get('user_id'), + doc=doc) + + self.assertIsNotNone(result) + self.assertEqual(result, action_id) + actionids.append(action_id) + count += 1 + + search_opt = {'match': [{'_all': '{"max_retries": 10}'}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + search_opt = {'match': [{'_all': 'max_retries=10'}]} + result = self.dbapi.search_action(user_id=self.fake_action_3. + get('user_id'), + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) diff --git a/freezer_api/tests/unit/sqlalchemy/v1/test_backup.py b/freezer_api/tests/unit/sqlalchemy/v1/test_backup.py new file mode 100644 index 00000000..e4f7bc47 --- /dev/null +++ b/freezer_api/tests/unit/sqlalchemy/v1/test_backup.py @@ -0,0 +1,108 @@ +# (c) Copyright 2018 ZTE Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tests for manipulating Backup via the DB API""" + + +import copy +from oslo_config import cfg + +from freezer_api.tests.unit import common +from freezer_api.tests.unit.sqlalchemy import base + +CONF = cfg.CONF + + +class DbBackupTestCase(base.DbTestCase): + + def setUp(self): + super(DbBackupTestCase, self).setUp() + self.fake_backup_metadata = common.get_fake_backup_metadata() + self.fake_user_id = common.fake_data_0_user_id + self.fake_user_name = common.fake_data_0_user_name + CONF.enable_v1_api = True + + def tearDown(self): + super(DbBackupTestCase, self).tearDown() + CONF.enable_v1_api = False + + def test_add_and_get_backup(self): + backup_doc = copy.deepcopy(self.fake_backup_metadata) + backup_id = self.dbapi.add_backup(user_id=self.fake_user_id, + user_name=self.fake_user_name, + doc=backup_doc) + self.assertIsNotNone(backup_id) + + result = self.dbapi.get_backup(user_id=self.fake_user_id, + backup_id=backup_id) + self.assertIsNotNone(result) + + self.assertEqual(result.get('user_name'), + self.fake_user_name) + + self.assertEqual(result.get('client_id'), + self.fake_backup_metadata.get('client_id')) + + self.assertEqual(result.get('user_id'), + self.fake_user_id) + + backup_metadata = result.get('backup_metadata') + + self.assertEqual(backup_metadata, + self.fake_backup_metadata) + + def test_add_and_delete_backup(self): + backup_doc = copy.deepcopy(self.fake_backup_metadata) + backup_id = self.dbapi.add_backup(user_id=self.fake_user_id, + user_name=self.fake_user_name, + doc=backup_doc) + + self.assertIsNotNone(backup_id) + + result = self.dbapi.delete_backup(user_id=self.fake_user_id, + backup_id=backup_id) + + self.assertIsNotNone(result) + + self.assertEqual(result, backup_id) + + result = self.dbapi.get_backup(user_id=self.fake_user_id, + backup_id=backup_id) + self.assertEqual(len(result), 0) + + def test_add_and_search_backup(self): + count = 0 + backupids = [] + while (count < 20): + backup_doc = copy.deepcopy(self.fake_backup_metadata) + backup_id = self.dbapi.add_backup(user_id=self.fake_user_id, + user_name=self.fake_user_name, + doc=backup_doc) + + self.assertIsNotNone(backup_id) + backupids.append(backup_id) + count += 1 + + result = self.dbapi.search_backup(user_id=self.fake_user_id, + limit=10, + offset=0) + + self.assertIsNotNone(result) + + self.assertEqual(len(result), 10) + + for index in range(len(result)): + backupmap = result[index] + self.assertEqual(backupids[index], backupmap['backup_id']) diff --git a/freezer_api/tests/unit/sqlalchemy/v1/test_client.py b/freezer_api/tests/unit/sqlalchemy/v1/test_client.py new file mode 100644 index 00000000..df62790b --- /dev/null +++ b/freezer_api/tests/unit/sqlalchemy/v1/test_client.py @@ -0,0 +1,330 @@ +# (c) Copyright 2018 ZTE Corporation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Tests for manipulating Client via the DB API""" + + +import copy +from oslo_config import cfg + +from freezer_api.tests.unit import common +from freezer_api.tests.unit.sqlalchemy import base + +CONF = cfg.CONF + + +class DbClientTestCase(base.DbTestCase): + + def setUp(self): + super(DbClientTestCase, self).setUp() + self.fake_client_0 = common.get_fake_client_0() + self.fake_client_doc = self.fake_client_0.get('client') + self.fake_user_id = self.fake_client_0.get('user_id') + CONF.enable_v1_api = True + + def tearDown(self): + super(DbClientTestCase, self).tearDown() + CONF.enable_v1_api = False + + def test_add_and_get_client(self): + client_doc = copy.deepcopy(self.fake_client_doc) + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + + result = self.dbapi.get_client(user_id=self.fake_user_id, + client_id=client_id) + + self.assertIsNotNone(result) + + self.assertEqual(len(result), 1) + + self.assertEqual(result[0].get('user_id'), + self.fake_user_id) + + client = result[0].get('client') + + self.assertEqual(client.get('client_id'), + self.fake_client_doc.get('client_id')) + + self.assertEqual(client.get('description'), + self.fake_client_doc.get('description')) + + def test_add_and_delete_client(self): + client_doc = copy.deepcopy(self.fake_client_doc) + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + + result = self.dbapi.delete_client(user_id=self.fake_user_id, + client_id=client_id) + + self.assertIsNotNone(result) + + self.assertEqual(result, client_id) + + result = self.dbapi.get_client(user_id=self.fake_user_id, + client_id=client_id) + + self.assertEqual(len(result), 0) + + def test_add_and_search_client(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=10, + offset=0) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 10) + + for index in range(len(result)): + clientmap = result[index] + clientid = clientmap['client'].get('client_id') + self.assertEqual(clientids[index], clientid) + + def test_add_and_search_client_with_search_match_and_match_not(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['description'] = "tecs" + if count in [4, 12]: + client_doc['hostname'] = 'node2' + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match_not': [{'hostname': 'node2'}], + 'match': [{'description': 'tecs'}]} + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + + for index in range(len(result)): + clientmap = result[index] + hostname = clientmap['client'].get('hostname') + description = clientmap['client'].get('description') + self.assertEqual('node1', hostname) + self.assertEqual('tecs', description) + + def test_add_and_search_client_with_search_match_list(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['description'] = "tecs" + if count in [4, 12]: + client_doc['hostname'] = 'node2' + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match': [{'hostname': 'node2'}, + {'description': 'tecs'}]} + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + + for index in range(len(result)): + clientmap = result[index] + hostname = clientmap['client'].get('hostname') + description = clientmap['client'].get('description') + self.assertEqual('node2', hostname) + self.assertEqual('tecs', description) + + def test_add_and_search_client_with_search_match_not_list(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['description'] = "tecs" + if count in [4, 12]: + client_doc['hostname'] = 'node2' + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match_not': [{'hostname': 'node2'}, + {'description': 'some usefule text here'}]} + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + + for index in range(len(result)): + clientmap = result[index] + hostname = clientmap['client'].get('hostname') + description = clientmap['client'].get('description') + self.assertEqual('node1', hostname) + self.assertEqual('tecs', description) + + def test_add_and_search_client_with_all_opt_one_match(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['description'] = "tecs" + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match': [{'_all': '[{"description": "tecs"}]'}]} + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 5) + + for index in range(len(result)): + clientmap = result[index] + description = clientmap['client'].get('description') + self.assertEqual('tecs', description) + + def test_add_and_search_client_with_all_opt_two_match(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['hostname'] = "node2" + if count in [4, 12]: + client_doc['description'] = "tecs" + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match': + [{'_all': + '[{"description": "tecs"}, ' + '{"hostname": "node2"}]'}]} + + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + + for index in range(len(result)): + clientmap = result[index] + description = clientmap['client'].get('description') + hostname = clientmap['client'].get('hostname') + self.assertEqual('tecs', description) + self.assertEqual('node2', hostname) + + def test_add_and_search_client_with_error_all_opt_return_alltuples(self): + count = 0 + clientids = [] + while (count < 20): + client_doc = copy.deepcopy(self.fake_client_doc) + clientid = common.get_fake_client_id() + client_doc['client_id'] = clientid + client_doc['hostname'] = "node1" + if count in [0, 4, 8, 12, 16]: + client_doc['hostname'] = "node2" + + client_id = self.dbapi.add_client(user_id=self.fake_user_id, + doc=client_doc) + self.assertIsNotNone(client_id) + self.assertEqual(clientid, client_id) + clientids.append(client_id) + count += 1 + + search_opt = {'match': [{'_all': '{"hostname": "node2"}'}]} + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + + search_opt = {'match': [{'_all': 'hostname=node2'}]} + result = self.dbapi.get_client(user_id=self.fake_user_id, + limit=20, + offset=0, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) diff --git a/freezer_api/tests/unit/sqlalchemy/v1/test_job.py b/freezer_api/tests/unit/sqlalchemy/v1/test_job.py new file mode 100644 index 00000000..bc1c9290 --- /dev/null +++ b/freezer_api/tests/unit/sqlalchemy/v1/test_job.py @@ -0,0 +1,405 @@ +# -*- encoding: utf-8 -*- +# (c) Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Tests for manipulating job via the DB API""" + +import copy +from oslo_config import cfg + +from freezer_api.tests.unit import common +from freezer_api.tests.unit.sqlalchemy import base + +CONF = cfg.CONF + + +class DbJobTestCase(base.DbTestCase): + + def setUp(self): + super(DbJobTestCase, self).setUp() + self.fake_job_0 = common.get_fake_job_0() + self.fake_job_0.pop('job_id') + self.fake_job_2 = common.get_fake_job_2() + self.fake_job_2.pop('job_id') + self.fake_job_3 = common.get_fake_job_3() + self.fake_job_3.pop('job_id') + self.fake_job_id = common.get_fake_job_id() + CONF.enable_v1_api = True + + def tearDown(self): + super(DbJobTestCase, self).tearDown() + CONF.enable_v1_api = False + + def test_add_and_get_job(self): + job_doc = copy.deepcopy(self.fake_job_0) + job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'), + doc=job_doc) + self.assertIsNotNone(job_id) + result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'), + job_id=job_id) + self.assertIsNotNone(result) + self.assertEqual(result.get('client_id'), + self.fake_job_0.get('client_id')) + self.assertEqual(result.get('description'), + self.fake_job_0.get('description')) + self.assertEqual(result.get('job_actions'), + self.fake_job_0.get('job_actions')) + self.assertEqual(result.get('user_id'), + self.fake_job_0.get('user_id')) + self.assertEqual(result.get('job_schedule').get('status'), + self.fake_job_0.get('job_schedule').get('status')) + self.assertEqual(result.get('job_schedule').get('result'), + self.fake_job_0.get('job_schedule').get('result')) + self.assertEqual(result.get('job_schedule').get('schedule_date'), + self.fake_job_0.get('job_schedule'). + get('schedule_date')) + self.assertEqual(result.get('job_schedule'). + get('schedule_interval'), + self.fake_job_0.get('job_schedule'). + get('schedule_interval')) + + def test_add_and_delete_job(self): + job_doc = copy.deepcopy(self.fake_job_0) + job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'), + doc=job_doc) + self.assertIsNotNone(job_id) + + result = self.dbapi.delete_job(user_id=self.fake_job_0.get('user_id'), + job_id=job_id) + + self.assertIsNotNone(result) + self.assertEqual(result, job_id) + result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'), + job_id=job_id) + self.assertEqual(len(result), 0) + + def test_add_and_update_job(self): + job_doc = copy.deepcopy(self.fake_job_0) + job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'), + doc=job_doc) + self.assertIsNotNone(job_id) + + patch_doc = copy.deepcopy(self.fake_job_2) + + result = self.dbapi.update_job(user_id=self.fake_job_2.get('user_id'), + job_id=job_id, + patch_doc=patch_doc) + self.assertIsNotNone(result) + self.assertEqual(result, 0) + + result = self.dbapi.get_job(user_id=self.fake_job_0.get('user_id'), + job_id=job_id) + self.assertIsNotNone(result) + self.assertEqual(result.get('client_id'), + self.fake_job_2.get('client_id')) + self.assertEqual(result.get('description'), + self.fake_job_2.get('description')) + self.assertEqual(result.get('job_schedule'). + get('schedule_interval'), + self.fake_job_2.get('job_schedule'). + get('schedule_interval')) + self.assertEqual(result.get('job_actions'), + self.fake_job_2.get('job_actions')) + + def test_add_and_replace_job(self): + job_doc = copy.deepcopy(self.fake_job_0) + job_id = self.dbapi.add_job(user_id=self.fake_job_0.get('user_id'), + doc=job_doc) + self.assertIsNotNone(job_id) + + patch_doc = copy.deepcopy(self.fake_job_2) + result = self.dbapi.replace_job(user_id=self. + fake_job_2.get('user_id'), + job_id=job_id, + doc=patch_doc) + + self.assertIsNotNone(result) + self.assertEqual(result, job_id) + + result = self.dbapi.get_job(user_id=self.fake_job_2.get('user_id'), + job_id=job_id) + + self.assertIsNotNone(result) + self.assertEqual(result.get('client_id'), + self.fake_job_2.get('client_id')) + self.assertEqual(result.get('description'), + self.fake_job_2.get('description')) + self.assertEqual(result.get('job_schedule'). + get('schedule_interval'), + self.fake_job_2.get('job_schedule'). + get('schedule_interval')) + self.assertEqual(result.get('job_actions'), + self.fake_job_2.get('job_actions')) + + patch_doc1 = copy.deepcopy(self.fake_job_0) + result = self.dbapi.replace_job(user_id=self. + fake_job_2.get('user_id'), + job_id=self.fake_job_id, + doc=patch_doc1) + self.assertIsNotNone(result) + result = self.dbapi.get_job(user_id=self.fake_job_2.get('user_id'), + job_id=self.fake_job_id) + + self.assertEqual(result.get('job_id'), self.fake_job_id) + + def test_job_list_without_search(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=10) + + self.assertIsNotNone(result) + + self.assertEqual(len(result), 10) + + for index in range(len(result)): + jobmap = result[index] + self.assertEqual(jobids[index], jobmap['job_id']) + + def test_job_list_with_search_match_and_match_not(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + if count in [4, 12]: + doc['job_schedule']['schedule_interval'] = '10 days' + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match_not': [{'schedule_interval': '10 days'}], + 'match': [{'client_id': 'node1'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + self.assertEqual('14 days', + jobmap['job_schedule']['schedule_interval']) + + def test_job_list_with_search_match_list(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + if count in [4, 12]: + doc['job_schedule']['schedule_interval'] = '10 days' + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match': [{'client_id': 'node1'}, + {'schedule_interval': '10 days'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + self.assertEqual('10 days', + jobmap['job_schedule']['schedule_interval']) + + def test_job_list_with_search_match_not_list(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + if count in [4, 12]: + doc['job_schedule']['schedule_interval'] = '10 days' + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match_not': + [{'schedule_interval': '10 days'}, + {'client_id': 'mytenantid_myhostname2'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + self.assertEqual('14 days', + jobmap['job_schedule']['schedule_interval']) + + def test_job_list_with_search_with_all_opt_one_match(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match': [{'_all': '[{"client_id": "node1"}]'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 5) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + + def test_job_list_with_search_with_all_opt_two_matches(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + if count in [4, 12]: + doc['job_schedule']['schedule_interval'] = '10 days' + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match': + [{'_all': + '[{"client_id": "node1"}, ' + '{"schedule_interval": "10 days"}]'}]} + + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + self.assertEqual('10 days', + jobmap['job_schedule']['schedule_interval']) + + def test_job_list_with_search_with_error_all_opt_return_alltuples(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + search_opt = {'match': [{'_all': '{"client_id": "node1"}'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + search_opt = {'match': [{'_all': 'client_id=node1'}]} + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + + def test_job_list_with_search_and_offset_and_limit(self): + count = 0 + jobids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_job_3) + if count in [0, 4, 8, 12, 16]: + doc['client_id'] = "node1" + job_id = self.dbapi.add_job(user_id=self.fake_job_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(job_id) + jobids.append(job_id) + count += 1 + # There are 5 records. + search_opt = {'match': [{'_all': '[{"client_id": "node1"}]'}]} + # First, we can get 3 tuples + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=0, + limit=3, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) + # Second, we can get 2 tuples + result = self.dbapi.search_job(user_id=self.fake_job_3. + get('user_id'), + offset=3, + limit=3, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + jobmap = result[index] + self.assertEqual('node1', jobmap['client_id']) diff --git a/freezer_api/tests/unit/sqlalchemy/v1/test_session.py b/freezer_api/tests/unit/sqlalchemy/v1/test_session.py new file mode 100644 index 00000000..8b57c909 --- /dev/null +++ b/freezer_api/tests/unit/sqlalchemy/v1/test_session.py @@ -0,0 +1,407 @@ +# -*- encoding: utf-8 -*- +# (c) Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Tests for manipulating job via the DB API""" + +import copy +from oslo_config import cfg + +from freezer_api.tests.unit import common +from freezer_api.tests.unit.sqlalchemy import base + +CONF = cfg.CONF + + +class DbSessionTestCase(base.DbTestCase): + + def setUp(self): + super(DbSessionTestCase, self).setUp() + self.fake_session_0 = common.get_fake_session_0() + self.fake_session_0.pop('session_id') + self.fake_session_2 = common.get_fake_session_2() + self.fake_session_2.pop('session_id') + self.fake_session_3 = common.get_fake_session_3() + self.fake_session_3.pop('session_id') + self.fake_session_id = common.get_fake_session_id() + CONF.enable_v1_api = True + + def tearDown(self): + super(DbSessionTestCase, self).tearDown() + CONF.enable_v1_api = False + + def test_add_and_get_session(self): + session_doc = copy.deepcopy(self.fake_session_0) + session_id = self.dbapi.add_session(user_id=self.fake_session_0. + get('user_id'), + doc=session_doc) + self.assertIsNotNone(session_id) + result = self.dbapi.get_session(user_id=self.fake_session_0. + get('user_id'), + session_id=session_id) + self.assertIsNotNone(result) + self.assertEqual(result.get('session_tag'), + self.fake_session_0.get('session_tag')) + self.assertEqual(result.get('description'), + self.fake_session_0.get('description')) + self.assertEqual(result.get('hold_off'), + self.fake_session_0.get('hold_off')) + self.assertEqual(result.get('time_start'), + self.fake_session_0.get('time_start')) + self.assertEqual(result.get('time_end'), + self.fake_session_0.get('time_end')) + self.assertEqual(result.get('user_id'), + self.fake_session_0.get('user_id')) + self.assertEqual(result.get('schedule'), + self.fake_session_0.get('schedule')) + + def test_add_and_delete_session(self): + session_doc = copy.deepcopy(self.fake_session_0) + session_id = self.dbapi.add_session(user_id=self.fake_session_0. + get('user_id'), + doc=session_doc) + self.assertIsNotNone(session_id) + + result = self.dbapi.delete_session(user_id=self.fake_session_0. + get('user_id'), + session_id=session_id) + + self.assertIsNotNone(result) + self.assertEqual(result, session_id) + result = self.dbapi.get_session(user_id=self.fake_session_0. + get('user_id'), + session_id=session_id) + self.assertEqual(len(result), 0) + + def test_add_and_update_session(self): + session_doc = copy.deepcopy(self.fake_session_0) + session_id = self.dbapi.add_session(user_id=self.fake_session_0. + get('user_id'), + doc=session_doc) + self.assertIsNotNone(session_id) + + patch_doc = copy.deepcopy(self.fake_session_2) + result = self.dbapi.update_session(user_id=self.fake_session_0. + get('user_id'), + session_id=session_id, + patch_doc=patch_doc) + self.assertIsNotNone(result) + self.assertEqual(result, 0) + + result = self.dbapi.get_session(user_id=self.fake_session_0. + get('user_id'), + session_id=session_id) + self.assertIsNotNone(result) + self.assertEqual(result.get('description'), + self.fake_session_2.get('description')) + self.assertEqual(result.get('hold_off'), + self.fake_session_2.get('hold_off')) + self.assertEqual(result.get('schedule').get('schedule_date'), + self.fake_session_2.get('schedule'). + get('schedule_date')) + + def test_add_and_replace_session(self): + session_doc = copy.deepcopy(self.fake_session_0) + session_id = self.dbapi.add_session(user_id=self.fake_session_0. + get('user_id'), + doc=session_doc) + self.assertIsNotNone(session_id) + + patch_doc = copy.deepcopy(self.fake_session_2) + result = self.dbapi.replace_session(user_id=self. + fake_session_2.get('user_id'), + session_id=session_id, + doc=patch_doc) + + self.assertIsNotNone(result) + self.assertEqual(result, session_id) + + result = self.dbapi.get_session(user_id=self.fake_session_2. + get('user_id'), + session_id=session_id) + + self.assertIsNotNone(result) + self.assertEqual(result.get('description'), + self.fake_session_2.get('description')) + self.assertEqual(result.get('hold_off'), + self.fake_session_2.get('hold_off')) + self.assertEqual(result.get('schedule').get('schedule_date'), + self.fake_session_2.get('schedule'). + get('schedule_date')) + + patch_doc1 = copy.deepcopy(self.fake_session_0) + result = self.dbapi.replace_session(user_id=self. + fake_session_0.get('user_id'), + session_id=self.fake_session_id, + doc=patch_doc1) + self.assertIsNotNone(result) + result = self.dbapi.get_session(user_id=self.fake_session_2. + get('user_id'), + session_id=self.fake_session_id) + self.assertEqual(result.get('session_id'), self.fake_session_id) + + def test_session_list_without_search(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=10) + self.assertIsNotNone(result) + self.assertEqual(len(result), 10) + + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(sessionids[index], sessionmap['session_id']) + + def test_session_list_with_search_match_and_match_not(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + if count in [4, 12]: + doc['schedule']['schedule_date'] = \ + '2018-12-12T00:00:00' + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + search_opt = {'match': [{'hold_off': 100}], + 'match_not': + [{'schedule_date': '2018-12-12T00:00:00'}] + } + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + self.assertEqual('2018-11-14T16:20:00', + sessionmap['schedule']['schedule_date']) + + def test_session_list_with_search_match_list(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + if count in [4, 12]: + doc['schedule']['schedule_date'] = '2018-12-12T00:00:00' + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + search_opt = {'match': [{'hold_off': 100}, + {'schedule_date': '2018-12-12T00:00:00'}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + self.assertEqual('2018-12-12T00:00:00', + sessionmap['schedule']['schedule_date']) + + def test_session_list_with_search_match_not_list(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + if count in [4, 12]: + doc['schedule']['schedule_date'] = '2018-12-12T00:00:00' + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + search_opt = {'match_not': [{'schedule_date': '2018-11-14T16:20:00'}, + {'hold_off': 60}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + self.assertEqual('2018-12-12T00:00:00', + sessionmap['schedule']['schedule_date']) + + def test_session_list_with_search_with_all_opt_one_match(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + + search_opt = {'match': [{'_all': '[{"hold_off": 100}]'}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 5) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + + def test_session_list_with_search_with_all_opt_two_matches(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + if count in [4, 12]: + doc['schedule']['schedule_date'] = '2018-12-12T00:00:00' + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + + search_opt = {'match': [{'_all': '[{"hold_off": 100},' + '{"schedule_date": ' + '"2018-12-12T00:00:00"}]'}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + self.assertEqual('2018-12-12T00:00:00', + sessionmap['schedule']['schedule_date']) + + def test_session_list_with_search_error_all_opt_return_alltuples(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + + search_opt = {'match': [{'_all': '{"hold_off": 100}'}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + + search_opt = {'match': [{'_all': 'hold_off=100'}]} + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=20, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 20) + + def test_session_list_with_search_and_offset_and_limit(self): + count = 0 + sessionids = [] + while (count < 20): + doc = copy.deepcopy(self.fake_session_3) + if count in [0, 4, 8, 12, 16]: + doc['hold_off'] = 100 + session_id = self.dbapi.add_session(user_id=self.fake_session_3. + get('user_id'), + doc=doc) + self.assertIsNotNone(session_id) + sessionids.append(session_id) + count += 1 + # There are 5 records. + search_opt = {'match': [{'_all': '[{"hold_off": 100}]'}]} + # First, we can get 3 tuples + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=0, + limit=3, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 3) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) + # Second, we can get 2 tuples + result = self.dbapi.search_session(user_id=self.fake_session_3. + get('user_id'), + offset=3, + limit=3, + search=search_opt) + + self.assertIsNotNone(result) + self.assertEqual(len(result), 2) + for index in range(len(result)): + sessionmap = result[index] + self.assertEqual(100, sessionmap['hold_off']) diff --git a/freezer_api/tests/unit/sqlalchemy/v2/__init__.py b/freezer_api/tests/unit/sqlalchemy/v2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/freezer_api/tests/unit/sqlalchemy/test_action.py b/freezer_api/tests/unit/sqlalchemy/v2/test_action.py similarity index 98% rename from freezer_api/tests/unit/sqlalchemy/test_action.py rename to freezer_api/tests/unit/sqlalchemy/v2/test_action.py index 9572d013..35dbc653 100644 --- a/freezer_api/tests/unit/sqlalchemy/test_action.py +++ b/freezer_api/tests/unit/sqlalchemy/v2/test_action.py @@ -175,8 +175,11 @@ class DbActionTestCase(base.DbTestCase): doc=patch_doc1, action_id=self.fake_action_id) self.assertIsNotNone(result) - - self.assertEqual(result, self.fake_action_id) + result = self.dbapi.get_action(project_id=self.fake_project_id, + user_id=self.fake_action_2. + get('user_id'), + action_id=self.fake_action_id) + self.assertEqual(result.get('action_id'), self.fake_action_id) def test_add_and_search_action(self): count = 0 diff --git a/freezer_api/tests/unit/sqlalchemy/test_backup.py b/freezer_api/tests/unit/sqlalchemy/v2/test_backup.py similarity index 100% rename from freezer_api/tests/unit/sqlalchemy/test_backup.py rename to freezer_api/tests/unit/sqlalchemy/v2/test_backup.py diff --git a/freezer_api/tests/unit/sqlalchemy/test_client.py b/freezer_api/tests/unit/sqlalchemy/v2/test_client.py similarity index 100% rename from freezer_api/tests/unit/sqlalchemy/test_client.py rename to freezer_api/tests/unit/sqlalchemy/v2/test_client.py diff --git a/freezer_api/tests/unit/sqlalchemy/test_job.py b/freezer_api/tests/unit/sqlalchemy/v2/test_job.py similarity index 98% rename from freezer_api/tests/unit/sqlalchemy/test_job.py rename to freezer_api/tests/unit/sqlalchemy/v2/test_job.py index e0c8d8c6..6b854e29 100644 --- a/freezer_api/tests/unit/sqlalchemy/test_job.py +++ b/freezer_api/tests/unit/sqlalchemy/v2/test_job.py @@ -156,8 +156,10 @@ class DbJobTestCase(base.DbTestCase): doc=patch_doc1, project_id=self.fake_project_id) self.assertIsNotNone(result) - - self.assertEqual(result, self.fake_job_id) + result = self.dbapi.get_job(project_id=self.fake_project_id, + user_id=self.fake_job_2.get('user_id'), + job_id=self.fake_job_id) + self.assertEqual(result.get('job_id'), self.fake_job_id) def test_job_list_without_search(self): count = 0 diff --git a/freezer_api/tests/unit/sqlalchemy/test_session.py b/freezer_api/tests/unit/sqlalchemy/v2/test_session.py similarity index 98% rename from freezer_api/tests/unit/sqlalchemy/test_session.py rename to freezer_api/tests/unit/sqlalchemy/v2/test_session.py index ba07bf6d..ef3e6feb 100644 --- a/freezer_api/tests/unit/sqlalchemy/test_session.py +++ b/freezer_api/tests/unit/sqlalchemy/v2/test_session.py @@ -166,8 +166,12 @@ class DbSessionTestCase(base.DbTestCase): project_id=self.fake_session_2. get('project_id')) self.assertIsNotNone(result) - - self.assertEqual(result, self.fake_session_id) + result = self.dbapi.get_session(project_id=self.fake_session_2. + get('project_id'), + user_id=self.fake_session_2. + get('user_id'), + session_id=self.fake_session_id) + self.assertEqual(result.get('session_id'), self.fake_session_id) def test_session_list_without_search(self): count = 0