Code Structure checker added with tests

Partial-blueprint: package-validation

Change-Id: I070bdd123b89cf4d10b4b664dc212bdfbc0217e1
This commit is contained in:
Krzysztof Szukiełojć 2016-08-22 14:32:47 +02:00
parent 3470094b08
commit 47fc634be5
4 changed files with 401 additions and 0 deletions

View File

View File

@ -0,0 +1,179 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import six
from muranopkgcheck.checkers import yaql_checker
from muranopkgcheck import error
ASSIGMENT_KEY = re.compile('^\$.?[\w]')
def check_req(check, required=True):
return locals()
CODE_STRUCTURE = {
'Try': {
'keywords': {
'Try': check_req('codeblock'),
'Catch': check_req('empty'),
'With': check_req('string'),
'As': check_req('string'),
'Do': check_req('codeblock'),
'Else': check_req('codeblock', False),
'Finally': check_req('codeblock', False)}},
'Parallel': {
'keywords': {
'Limit': check_req('codeblock', False),
'Parallel': check_req('codeblock')},
},
'Repeat': {
'keywords': {
'Repeat': check_req('number'),
'Do': check_req('codeblock')}},
'If': {
'keywords': {
'If': check_req('predicate'),
'Then': check_req('codeblock'),
'Else': check_req('codeblock', False)}
},
'Break': {
'keywords': {
'Break': check_req('empty')}
},
'Return': {
'Return': check_req('expression'),
},
'While': {
'keywords': {
'While': check_req('predicate'),
'Do': check_req('codeblock')}
},
'For': {
'keywords': {
'For': check_req('string'),
'In': check_req('expression'),
'Do': check_req('codeblock')}
},
'Match': {
'keywords': {
'Match': check_req(('expression', 'codeblock')),
'Value': check_req('expression'),
'Default': check_req('codeblock'),
}
},
'Switch': {
'keywords': {
'Switch': check_req(('predicate', 'codeblock')),
'Default': check_req('codeblock')}
},
'Throw': {
'keywords': {
'Throw': check_req('string'),
'Message': check_req('string')}
},
'Continue': {
'keywords': {
'Continue': check_req('empty'),
}
},
}
class CheckCodeStructure(object):
def __init__(self):
self._check_mappings = {
'codeblock': self.codeblock,
'predicate': self.yaql,
'empty': self.empty,
'expression': self.yaql,
'string': self.string,
'number': self.yaql,
}
self._yaql_checker = yaql_checker.YaqlChecker()
def string(self, value):
if not isinstance(value, six.string_types):
yield error.report.E203('Value should be string type '
'"{0}"'.format(value), value)
def empty(self, value):
if value:
yield error.report.E200('There should be no value here '
'"{0}"'.format(value), value)
def yaql(self, value):
if not self._yaql_checker(value):
yield error.report.E202('Not a valid yaql expression '
'"{0}"'.format(value), value)
def codeblock(self, codeblocks):
if isinstance(codeblocks, list):
for block in codeblocks:
yield self._single_block(block)
else:
yield self._single_block(codeblocks)
def _check_assigment(self, block):
key = next(iter(block))
if not isinstance(key, six.string_types) or\
not ASSIGMENT_KEY.match(key):
yield error.report.E201('Not valid variable name '
'"{0}"'.format(key), key)
def _single_block(self, block):
if isinstance(block, dict):
yield self._check_structure(block)
elif isinstance(block, six.string_types):
yield self.yaql(block)
def _run_check(self, check, value):
yield self._check_mappings[check](value)
def _check_structure(self, block):
for key, value in six.iteritems(CODE_STRUCTURE):
if key in block:
break
else:
if len(block.keys()) == 1:
yield self._check_assigment(block)
else:
yield error.report.E200('Wrong code structure/assigment '
'probably typo', block)
return
keywords = value.get('keywords', {})
kset = set(keywords.keys())
block_keys_set = set(block.keys())
for missing in (kset - block_keys_set):
if keywords[missing]['required']:
yield error.report.E200('Missing keyword "{0}" for "{1}" '
'code structure'
.format(missing, key), block)
for unknown in (block_keys_set - kset - {key}):
yield error.report.E201('Unknown keyword "{0}" in "{1}"'
.format(unknown, key), unknown)
for ckey, cvalue in six.iteritems(keywords):
check = cvalue['check']
data = block.get(ckey)
if not data:
continue
if isinstance(check, tuple):
for left, right in six.iteritems(data):
yield self._run_check(check[0], left)
yield self._run_check(check[1], right)
else:
yield self._run_check(check, data)

View File

@ -0,0 +1,67 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaql
ITERATORS_LIMIT = 100
EXPRESSION_MEMORY_QUOTA = 512 * 1024
ENGINE_10_OPTIONS = {
'yaql.limitIterators': ITERATORS_LIMIT,
'yaql.memoryQuota': EXPRESSION_MEMORY_QUOTA,
'yaql.convertSetsToLists': True,
'yaql.convertTuplesToLists': True,
'yaql.iterableDicts': True
}
ENGINE_12_OPTIONS = {
'yaql.limitIterators': ITERATORS_LIMIT,
'yaql.memoryQuota': EXPRESSION_MEMORY_QUOTA,
'yaql.convertSetsToLists': True,
'yaql.convertTuplesToLists': True
}
def _add_operators(engine_factory):
engine_factory.insert_operator(
'>', True, 'is',
yaql.factory.OperatorType.BINARY_LEFT_ASSOCIATIVE, False)
engine_factory.insert_operator(
None, True, ':',
yaql.factory.OperatorType.BINARY_LEFT_ASSOCIATIVE, True)
engine_factory.insert_operator(
':', True, ':',
yaql.factory.OperatorType.PREFIX_UNARY, False)
engine_factory.operators.insert(0, ())
def _create_engine():
engine_factory = yaql.factory.YaqlFactory()
_add_operators(engine_factory=engine_factory)
options = ENGINE_12_OPTIONS
return engine_factory.create(options=options)
class YaqlChecker(object):
def __init__(self):
self._engine = _create_engine()
def __call__(self, data):
try:
self._engine(data)
except yaql.utils.exceptions.YaqlParsingException:
return False
except TypeError:
return False
return True

View File

@ -0,0 +1,155 @@
# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from muranopkgcheck.checkers import code_structure
from muranopkgcheck.tests import test_validator_helpers as helpers
class CodeStructureTest(helpers.BaseValidatorTestClass):
def setUp(self):
super(CodeStructureTest, self).setUp()
self._checker = code_structure.CheckCodeStructure()
def test_simple(self):
SIMPLE_BODY = '$.deploy()'
self.g = self._checker.codeblock(SIMPLE_BODY)
def test_double_assigment(self):
SIMPLE_BODY = [{
'$a': '$.deploy()',
'$b': '$.string()'}]
self.g = self._checker.codeblock(SIMPLE_BODY)
self.assertIn('Wrong code structure/assigment probably typo',
next(self.g).message)
def test_multiline(self):
MULTILINE_BODY = [
'$.deploy()',
{'$res': 'new(YaqlStuff)'},
'$.call($res)',
]
self.g = self._checker.codeblock(MULTILINE_BODY)
def test_bad_assigment(self):
MULTILINE_BODY = [
'$.deploy()',
{1: 'new(YaqlStuff)'},
'$.call($res)',
]
self.g = self._checker.codeblock(MULTILINE_BODY)
self.assertIn('Not valid variable name "1"', next(self.g).message)
def test_bad_assigment_with_double_dollar(self):
MULTILINE_BODY = [
'$.deploy()',
{'$$': 'new(YaqlStuff)'},
'$.call($res)',
]
self.g = self._checker.codeblock(MULTILINE_BODY)
self.assertIn('Not valid variable name "$$"', next(self.g).message)
def test_bad_assigment_case2(self):
MULTILINE_BODY = [
'$.deploy()',
{'res': 'new(YaqlStuff)'},
'$.call($res)',
]
self.g = self._checker.codeblock(MULTILINE_BODY)
p = next(self.g)
self.assertIn('Not valid variable name "res"', p.message)
def test_if(self):
MULTILINE_BODY = [
{'If': '$.deploy()',
'Then': [
'$.w()',
{'$abc': '$a'}]}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
def test_while_missing_do(self):
MULTILINE_BODY = [
{'While': '$.deploy()'}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
p = next(self.g)
self.assertIn('Missing keyword "Do" for "While" code structure',
p.message)
def test_while_unknown_does(self):
MULTILINE_BODY = [
{'While': '$.deploy()',
'Does': ['$.a()', '$.b()']}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
p1 = next(self.g)
p2 = next(self.g)
six.assertCountEqual(self, [
'Unknown keyword "Does" in "While"',
'Missing keyword "Do" for "While" code structure'],
[p1.message, p2.message])
def test_empty_return(self):
MULTILINE_BODY = [
{'Return': ''}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
def test_switch(self):
MULTILINE_BODY = [
{'Switch': {
'$.black()': '$.single()',
'$.blue()': [
'$.b()',
{'$w': 3}]},
'Default': '$.a()'}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
def test_error_under_while_in_if(self):
MULTILINE_BODY = [
{'If': '1',
'Then': {'While': '$.deploy()',
'Do': [
{'www': '$.a()'},
'$.b()']}}
]
self.g = self._checker.codeblock(MULTILINE_BODY)
self.assertIn('Not valid variable name "www"', next(self.g).message)
def test_not_string(self):
MULTILINE_BODY = [
{'Try': [
'$port.deploy()'],
'Catch': '',
'With': 213,
'As': 'what',
'Do': [
'$.string()']}]
self.g = self._checker.codeblock(MULTILINE_BODY)
self.assertIn('Value should be string type "213"',
next(self.g).message)
def test_not_empty(self):
MULTILINE_BODY = [
'$.deploy()',
{'$d': 'new(YaqlStuff)'},
'$.call($res)',
{'Break': 'a'},
]
self.g = self._checker.codeblock(MULTILINE_BODY)
self.assertIn('There should be no value here "a"',
next(self.g).message)