Refactored config schema storage: store data in YAML; generate compact version records

This commit is contained in:
Maxim Kulkin 2013-11-12 16:15:50 +04:00
parent a2cfa5bcbf
commit 5637f040f6
6 changed files with 349 additions and 95 deletions

View File

@ -14,3 +14,4 @@ recordtype==1.1
paramiko==1.11.0
oslo.config==1.2.1
requests==1.2.0
PyYAML==3.10

View File

@ -138,7 +138,7 @@ class OpenstackComponent(Service):
# Apply defaults
if schema:
for parameter in filter(lambda p: p.default, schema.parameters):
if parameter.section == 'DEFAULT':
if not parameter.section or parameter.section == 'DEFAULT':
config.set_default(parameter.name, parameter.default)
else:
config.set_default(

View File

@ -1,5 +1,7 @@
from contextlib import contextmanager
import re
import os.path
import yaml
from rubick.common import Issue, MarkedIssue, Mark, Version, find, index
from rubick.exceptions import RubickException
@ -9,67 +11,8 @@ class SchemaError(RubickException):
pass
class SchemaVersionRecord(object):
# checkpoint's data is version number
def __init__(self, version, checkpoint):
super(SchemaVersionRecord, self).__init__()
self.version = Version(version)
self.checkpoint = checkpoint
self.adds = []
self.removals = []
self._current_section = 'DEFAULT'
def __repr__(self):
return (
'<SchemaVersionRecord %s%s>' % (
self.version, ' (checkpoint)' if self.checkpoint else '')
)
def __cmp__(self, other):
return self.version.__cmp__(other.version)
def section(self, name):
self._current_section = name
def param(self, *args, **kwargs):
if not 'section' in kwargs and self._current_section:
kwargs['section'] = self._current_section
self.adds.append(ConfigParameterSchema(*args, **kwargs))
def remove_param(self, name):
self.removals.append(name)
class SchemaBuilder(object):
def __init__(self, data):
super(SchemaBuilder, self).__init__()
self.data = data
@contextmanager
def version(self, version, checkpoint=False):
version_record = SchemaVersionRecord(version, checkpoint)
yield version_record
self.data.append(version_record)
self.data.sort()
class ConfigSchemaRegistry:
__schemas = {}
@classmethod
def register_schema(self, project, configname=None):
if not configname:
configname = '%s.conf' % project
fullname = '%s/%s' % (project, configname)
if fullname not in self.__schemas:
self.__schemas[fullname] = []
return SchemaBuilder(self.__schemas[fullname])
db_path = os.path.join(os.path.dirname(__file__), 'schemas')
@classmethod
def get_schema(self, project, version, configname=None):
@ -78,26 +21,47 @@ class ConfigSchemaRegistry:
fullname = '%s/%s' % (project, configname)
version = Version(version)
if not fullname in self.__schemas:
path = os.path.join(self.db_path, project, configname + '.yml')
if not os.path.exists(path):
return None
records = self.__schemas[fullname]
with open(path) as f:
records = yaml.load(f.read())
i = len(records) - 1
# Find latest checkpoint prior given version
while i >= 0 and not (records[i].checkpoint
and records[i].version <= version):
while i >= 0 and not (records[i].get('checkpoint', False)
and Version(records[i]['version']) <= version):
i -= 1
if i < 0:
return None
if Version(records[0]['version']) > version:
# Reached the earliest record yet haven't found version
return None
# Haven't found a checkpoint, but the earliest version is less than the given one
# Assuming first record is checkpoint
i = 0
parameters = []
seen_parameters = set()
last_version = None
while i < len(records) and records[i].version <= version:
last_version = records[i].version
for param in records[i].adds:
while i < len(records) and Version(records[i]['version']) <= version:
last_version = records[i]['version']
for param_data in records[i].get('added', []):
name = param_data['name']
section = None
if '.' in name:
section, name = name.split('.', 1)
param = ConfigParameterSchema(
name, param_data['type'], section=section,
default=param_data.get('default', None),
description=param_data.get('help', None),
required=param_data.get('required', False),
deprecation_message=param_data.get('deprecated', None))
if param.name in seen_parameters:
old_param_index = index(
parameters,
@ -107,13 +71,13 @@ class ConfigSchemaRegistry:
else:
parameters.append(param)
seen_parameters.add(param.name)
for param_name in records[i].removals:
for param_name in records[i].get('removed', []):
param_index = index(
parameters,
lambda p: p.name == param_name)
if index != -1:
parameters.pop(param_index)
seen_parameters.remove(param_name)
seen_parameters.discard(param_name)
i += 1
return ConfigSchema(fullname, last_version, 'ini', parameters)
@ -170,14 +134,18 @@ class ConfigParameterSchema:
class TypeValidatorRegistry:
__validators = {}
__default_validator = None
@classmethod
def register_validator(self, type_name, type_validator):
def register_validator(self, type_name, type_validator, default=False):
self.__validators[type_name] = type_validator
if default:
self.__default_validator = type_name
@classmethod
def get_validator(self, name):
return self.__validators[name]
return self.__validators.get(
name, self.__validators[self.__default_validator])
class SchemaIssue(Issue):
@ -195,21 +163,28 @@ class InvalidValueError(MarkedIssue):
class TypeValidator(object):
def __init__(self, f):
def __init__(self, base_type, f):
super(TypeValidator, self).__init__()
self.base_type = base_type
self.f = f
def validate(self, value):
if value is None:
return value
return getattr(self, 'f')(value)
def type_validator(name, **kwargs):
def type_validator(name, base_type=None, default=False, **kwargs):
if not base_type:
base_type = name
def wrap(fn):
def wrapped(s):
return fn(s, **kwargs)
o = TypeValidator(wrapped)
TypeValidatorRegistry.register_validator(name, o)
o = TypeValidator(base_type, wrapped)
TypeValidatorRegistry.register_validator(name, o, default=default)
return fn
return wrap
@ -219,6 +194,9 @@ def isissue(o):
@type_validator('boolean')
def validate_boolean(s):
if isinstance(s, bool):
return s
s = s.lower()
if s == 'true':
return True
@ -325,8 +303,8 @@ def validate_host_label(s):
return s
@type_validator('host')
@type_validator('host_address')
@type_validator('host', base_type='string')
@type_validator('host_address', base_type='string')
def validate_host_address(s):
result = validate_ipv4_address(s)
if not isissue(result):
@ -348,13 +326,13 @@ def validate_host_address(s):
return '.'.join(labels)
@type_validator('network')
@type_validator('network_address')
@type_validator('network', base_type='string')
@type_validator('network_address', base_type='string')
def validate_network_address(s):
return validate_ipv4_network(s)
@type_validator('host_and_port')
@type_validator('host_and_port', base_type='string')
def validate_host_and_port(s, default_port=None):
parts = s.strip().split(':', 2)
@ -374,15 +352,18 @@ def validate_host_and_port(s, default_port=None):
return (host_address, port)
@type_validator('string')
@type_validator('list')
@type_validator('multi')
@type_validator('string', base_type='string', default=True)
@type_validator('list', base_type='list')
@type_validator('multi', base_type='multi')
def validate_string(s):
return s
@type_validator('integer')
def validate_integer(s, min=None, max=None):
if isinstance(s, int):
return s
leading_whitespace_len = 0
while leading_whitespace_len < len(s) \
and s[leading_whitespace_len].isspace():
@ -419,16 +400,22 @@ def validate_integer(s, min=None, max=None):
@type_validator('float')
def validate_float(s):
if isinstance(s, float):
return s
# TODO: Implement proper validation
return float(s)
@type_validator('port')
@type_validator('port', base_type='integer')
def validate_port(s, min=1, max=65535):
return validate_integer(s, min=min, max=max)
def validate_list(s, element_type):
if isinstance(s, list):
return s
element_type_validator = TypeValidatorRegistry.get_validator(element_type)
if not element_type_validator:
return SchemaIssue('Invalid element type "%s"' % element_type)
@ -458,13 +445,16 @@ def validate_list(s, element_type):
return result
@type_validator('string_list')
@type_validator('string_list', base_type='list')
def validate_string_list(s):
return validate_list(s, element_type='string')
@type_validator('string_dict')
@type_validator('string_dict', base_type='multi')
def validate_dict(s, element_type='string'):
if isinstance(s, dict):
return s
element_type_validator = TypeValidatorRegistry.get_validator(element_type)
if not element_type_validator:
return SchemaIssue('Invalid element type "%s"' % element_type)
@ -500,7 +490,7 @@ def validate_dict(s, element_type='string'):
return result
@type_validator('rabbitmq_bind')
@type_validator('rabbitmq_bind', base_type='string')
def validate_rabbitmq_bind(s):
m = re.match('\d+', s)
if m:
@ -527,12 +517,15 @@ def validate_rabbitmq_bind(s):
def validate_rabbitmq_list(s, element_type):
if isinstance(s, list):
return s
if not (s.startswith('[') and s.endswith(']')):
return SchemaIssue('List should be surrounded by [ and ]')
return validate_list(s[1:-1], element_type=element_type)
@type_validator('rabbitmq_bind_list')
@type_validator('rabbitmq_bind_list', base_type='list')
def validate_rabbitmq_bind_list(s):
return validate_rabbitmq_list(s, element_type='rabbitmq_bind')

View File

@ -8,6 +8,8 @@ from copy import copy
from oslo.config import cfg
from rubick.schemas.yaml_utils import yaml_string, yaml_value
def identity(x):
return x
@ -77,11 +79,11 @@ class YamlSchemaWriter(object):
if self._current_section and self._current_section != 'DEFAULT':
fullname = '%s.%s' % (self._current_section, name)
self.file.write(" - name: %s\n" % fullname)
self.file.write(" type: %s\n" % type)
self.file.write(" default: %s\n" % repr(default_value))
self.file.write(" - name: %s\n" % yaml_string(fullname, allowSimple=True))
self.file.write(" type: %s\n" % yaml_string(type, allowSimple=True))
self.file.write(" default: %s\n" % yaml_value(default_value))
if description:
self.file.write(" help: %s\n" % repr(description))
self.file.write(" help: %s\n" % yaml_string(description))
self.file.write("\n")

237
rubick/schemas/generator.py Executable file
View File

@ -0,0 +1,237 @@
import glob
import os.path
import logging
from collections import OrderedDict
import yaml
from rubick.common import index, Version, Issue
from rubick.schema import TypeValidatorRegistry as TypeRegistry
from rubick.schemas.yaml_utils import yaml_string, yaml_value
# Fraction of the currently known parameter count: when the number of
# added + removed parameters of a version exceeds
# int(len(parameters) * DIFF_THRESHOLD), generate_project_schema() emits a
# full (checkpoint) record instead of an incremental one.
DIFF_THRESHOLD=0.5
# Module logger used by the schema generator.
logger = logging.getLogger('rubick.schemas.generator')
def yaml_dump_schema_records(records):
    """Render schema version records as the text of a YAML document.

    Each record is a mapping with a ``version`` and optional ``checkpoint``,
    ``added`` (list of parameter mappings) and ``removed`` (list of parameter
    names) entries.  Records that neither add nor remove anything are
    skipped.  Every emitted record is followed by a ``# ====`` separator
    comment line.

    :param records: iterable of record mappings
    :returns: the YAML text as a single string
    """
    # Keys written in a fixed position; everything else is "extra" data.
    core_keys = ('name', 'type', 'default', 'help')
    lines = []

    for record in records:
        # Empty sections are omitted on output, so a record that was loaded
        # back from a previously generated file may lack either key.
        added = record.get('added') or []
        removed = record.get('removed') or []
        if not added and not removed:
            continue

        # Nested keys are indented relative to the "- version" item so that
        # the whole record parses as one mapping.
        lines.append('- version: %s' % yaml_string(str(record['version'])))
        if 'checkpoint' in record:
            lines.append('  checkpoint: %s' % yaml_value(record['checkpoint']))

        if added:
            lines.append('  added:')
            for param in added:
                lines.append('')
                lines.append('  - name: %s'
                             % yaml_string(param['name'], allowSimple=True))
                lines.append('    type: %s'
                             % yaml_string(param['type'], allowSimple=True))
                if 'default' in param:
                    lines.append('    default: %s'
                                 % yaml_value(param['default']))
                if 'help' in param:
                    lines.append('    help: %s' % yaml_string(param['help']))
                # Sorted so that the generated file is stable between runs.
                for attr in sorted(k for k in param if k not in core_keys):
                    lines.append('    %s: %s'
                                 % (attr, yaml_value(param[attr])))

        if removed:
            lines.append('  removed:')
            for name in removed:
                lines.append('  - %s' % yaml_string(name, allowSimple=True))

        lines.append('')
        lines.append('# ====================================================')
        lines.append('')

    return "\n".join(lines)
def generate_project_schema(project):
    """Regenerate the compact schema database file of one project.

    Reads every ``*.yml`` file in the project's directory (next to this
    module).  The file ending in ``.conf.yml`` is the database of already
    generated version records; all other files are per-version schemas with
    ``version`` and ``parameters`` entries.  For each version, in ascending
    order, the parameters are compared with the previous version and either
    an incremental record (``added``/``removed``) or a full checkpoint record
    is produced.  A record for a version that already exists in the database
    replaces it, after carrying over the old record's extra attributes, type
    and default.  The sorted records are written back to the database file.

    :param project: name of the project sub-directory
    :returns: None
    """
    logger.info('Processing project %s', project)
    project_path = os.path.join(os.path.dirname(__file__), project)

    files = glob.glob(os.path.join(project_path, '*.yml'))
    if not files:
        logger.info("Found no YAML files in project %s. Skipping it", project)
        return

    x = index(files, lambda f: f.endswith('.conf.yml'))
    if x != -1:
        database_file = files[x]
        del files[x]
    else:
        database_file = os.path.join(project_path, project+'.conf.yml')

    # NOTE(review): yaml.load() can construct arbitrary Python objects.  The
    # files read here ship with the project; if they can ever come from an
    # untrusted source, switch to yaml.safe_load().
    schema_records = []
    if os.path.exists(database_file):
        logger.debug("Processing database file %s", database_file)
        with open(database_file) as f:
            # An empty file loads as None; treat it as "no records yet".
            schema_records.extend(yaml.load(f.read()) or [])

    schema_versions = []
    for version_file in files:
        logger.debug("Processing version file %s", version_file)
        with open(version_file) as f:
            schema_versions.append(yaml.load(f.read()))

    # Order by parsed version (as the final record sort does), not by the
    # raw YAML value.
    schema_versions = sorted(schema_versions,
                             key=lambda s: Version(s['version']))

    # State of the previous version: parameter name -> parameter mapping.
    parameters = OrderedDict()
    for schema in schema_versions:
        added = []
        seen = set()

        logger.debug('Processing schema version %s', schema['version'])

        for param in schema['parameters']:
            prev_param = parameters.get(param['name'], None)

            if not prev_param:
                logger.debug(
                    'Parameter %s does not exist yet, adding it as new',
                    param['name'])
                param['comment'] = 'New param'
                added.append(param)
                continue

            seen.add(param['name'])

            if param['type'] != prev_param['type']:
                validator = TypeRegistry.get_validator(prev_param['type'])
                if param['type'] == validator.base_type:
                    # The new type is only the base of the previously
                    # recorded type: keep the recorded one.
                    param['type'] = prev_param['type']

                    if 'default' in param and param['default'] is not None:
                        value = validator.validate(param['default'])
                        if not isinstance(value, Issue):
                            param['default'] = value
                        else:
                            logger.error(
                                "In project '%s' version %s default value "
                                "for parameter '%s' is not valid value of "
                                "type %s: %s",
                                project, schema['version'], param['name'],
                                param['type'], repr(param['default']))
                else:
                    logger.debug(
                        'Parameter %s type has changed from %s to %s',
                        param['name'], prev_param['type'], param['type'])
                    param['comment'] = 'Type has changed'
                    added.append(param)
                    continue

            new_default = param.get('default', None)
            prev_default = prev_param.get('default', None)
            if new_default != prev_default:
                # Either side may lack a 'default' key, so log the values
                # obtained with .get().
                logger.debug(
                    'Parameter %s default value has changed from %s to %s',
                    param['name'], prev_default, new_default)
                param['comment'] = 'Default value has changed'
                added.append(param)
                continue

            if param.get('help', None) != prev_param.get('help', None):
                param['comment'] = 'Help string has changed'
                added.append(param)

        removed = [name for name in parameters if name not in seen]
        if removed:
            logger.debug(
                'Following parameters from previous schema version are not '
                'present in current version, marking as removed: %s',
                ','.join(removed))

        # Decide either to use full schema update or incremental
        changes_count = len(added) + len(removed)

        logger.debug('Found %d change(s) from previous version schema',
                     changes_count)

        if changes_count > int(len(parameters)*DIFF_THRESHOLD):
            logger.debug('Using full schema update')

            new_parameters = parameters.copy()
            for param in added:
                new_parameters[param['name']] = param
            for name in removed:
                del new_parameters[name]

            new_schema_record = dict(version=schema['version'],
                                     added=list(new_parameters.values()),
                                     removed=[],
                                     checkpoint=True)
        else:
            logger.debug('Using incremental schema update')

            new_schema_record = dict(version=schema['version'],
                                     added=added, removed=removed)

        # Place schema record either replacing existing one or appending as new
        old_schema_record_idx = index(
            schema_records,
            lambda r: r['version'] == new_schema_record['version'])

        if old_schema_record_idx != -1:
            old_schema_record = schema_records[old_schema_record_idx]
            # Collect information from existing records
            old_schema_parameters = {}
            for param in old_schema_record.get('added', []):
                old_schema_parameters[param['name']] = param

            for param in added:
                old_param = old_schema_parameters.get(param['name'], None)
                if not old_param:
                    continue

                # Carry over attributes that only exist in the old record.
                extra_data = [(k, v) for k, v in old_param.items()
                              if k not in ['name', 'type', 'default', 'help']]
                param.update(extra_data)

                validator = TypeRegistry.get_validator(old_param['type'])
                if param['type'] not in [old_param['type'],
                                         validator.base_type]:
                    param['comment'] = 'Type has changed'
                # Enforcing old type to prevent accidental data loss.
                # NOTE(review): the enforcement below is applied
                # unconditionally; confirm it was not meant to run only when
                # the type has changed.
                param['type'] = old_param['type']
                if 'default' in old_param:
                    param['default'] = old_param['default']

                if 'default' in param:
                    if param['default'] is not None:
                        # Validate the value actually stored on the new
                        # parameter; the old record may have no default.
                        value = validator.validate(param['default'])
                        if not isinstance(value, Issue):
                            param['default'] = value
                        else:
                            logger.error(
                                "In project '%s' version %s default value "
                                "for parameter '%s' is not valid value of "
                                "type %s: %s",
                                project, schema['version'], param['name'],
                                param['type'], repr(param['default']))

                    if param['default'] != old_param.get('default', None):
                        param['comment'] = 'Default value has changed'

            logger.debug('Replacing schema record %s',
                         repr(new_schema_record))
            schema_records[old_schema_record_idx] = new_schema_record
        else:
            logger.debug('Appending schema record %s',
                         repr(new_schema_record))
            schema_records.append(new_schema_record)

        # Update parameter info.  Use the computed change lists rather than
        # the record: a checkpoint record carries an empty 'removed' list,
        # which would leave removed parameters in the running state.
        for param in added:
            parameters[param['name']] = param

        for name in removed:
            del parameters[name]

    schema_records = sorted(schema_records,
                            key=lambda r: Version(r['version']))

    with open(database_file, 'w') as f:
        f.write(yaml_dump_schema_records(schema_records))
def main():
    """Generate schema databases for every project directory.

    Configures logging at INFO level, then runs generate_project_schema()
    for each sub-directory located next to this module.
    """
    logging.basicConfig(level=logging.INFO)

    schemas_dir = os.path.dirname(__file__)
    entries = glob.glob(os.path.join(schemas_dir, '*'))
    for entry in entries:
        # Only directories are projects; plain files are ignored.
        if os.path.isdir(entry):
            generate_project_schema(os.path.basename(entry))


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,21 @@
def yaml_string(s, allowSimple=False):
    """Format a string as a YAML scalar.

    Strings containing a single quote or a line break are written as
    double-quoted scalars with backslashes, double quotes and line breaks
    escaped; a raw line break inside a quoted scalar would otherwise be
    folded by YAML (or break the surrounding block).  Other strings are
    single-quoted, unless ``allowSimple`` is set and the string is non-empty
    and free of spaces, colons and commas, in which case it is returned
    as-is.

    :param s: the string to format
    :param allowSimple: permit an unquoted (plain) scalar when safe
    :returns: the YAML representation of ``s``
    """
    if any(c in s for c in "'\n\r"):
        escaped = s.replace('\\', '\\\\').replace('"', '\\"')
        escaped = escaped.replace('\n', '\\n').replace('\r', '\\r')
        return '"%s"' % escaped

    # An empty plain scalar would be read back as null, so it is quoted.
    if allowSimple and s and not any(c in s for c in " :,"):
        return s

    return "'%s'" % s
def yaml_value(x):
    """Format a Python value as a YAML scalar.

    ``None`` becomes ``~``, booleans become ``true``/``false``, strings are
    formatted with yaml_string() and anything else is written using its
    ``repr()``.

    :param x: the value to format
    :returns: the YAML representation of ``x``
    """
    if x is None:
        return '~'
    # Test the type, not equality: 1 == True and 0 == False in Python, so an
    # equality check would turn the numbers 0 and 1 into booleans.
    if isinstance(x, bool):
        return 'true' if x else 'false'
    if isinstance(x, str):
        return yaml_string(x)
    return repr(x)