Merge "Fix for action_logs migration"

Jenkins 2014-12-03 12:33:15 +00:00 committed by Gerrit Code Review
commit 58e89a2d01
9 changed files with 245 additions and 111 deletions


@@ -50,11 +50,13 @@
 "additional_info": {
     "type": "object",
     "properties": {
-        "parent_task_id": {"type": "number"},
+        "parent_task_id": {"type": ["number", "null"]},
         "subtasks_ids": {"type": "array"},
         "operation": {"type": "string"},
         "nodes_from_resp": {"type": "array"},
-        "ended_with_status": {"type": "string"}
+        "ended_with_status": {"type": "string"},
+        "message": {"type": ["string", "null"]},
+        "output": {"type": ["object", "null"]}
     }
 },
 "is_sent": {"type": "boolean"},


@@ -196,7 +196,7 @@ class TestActionLogs(DbTest):
             # about 1/3 is incomplete
             "end_timestamp": "2" if i % 3 else None,
             "additional_info": {
-                "parent_task_id": 0,
+                "parent_task_id": i if i % 2 else None,
                 "subtasks_ids": [],
                 "operation": "deployment"
             },
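With this change roughly half of the generated rows carry `parent_task_id: None`, so the tests now exercise the nullable branch added to the schema above.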


@@ -31,7 +31,7 @@ def configure_app(mode=None):
         'prod': 'collector.api.config.Production'
     }
     app.config.from_object(mode_map.get(mode))
-    app.config.from_envvar('COLLECTOR_SETTINGS', silent=False)
+    app.config.from_envvar('COLLECTOR_SETTINGS', silent=True)
     setattr(app_module, 'db', flask_sqlalchemy.SQLAlchemy(app))
     log.init_logger()
     return app
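`silent=True` follows Flask's documented `Config.from_envvar` behavior: instead of raising `RuntimeError` when `COLLECTOR_SETTINGS` is unset, it returns `False` and the app keeps the defaults loaded from `mode_map`. A minimal sketch of the difference:

    from flask import Flask

    app = Flask(__name__)
    # silent=False: a missing COLLECTOR_SETTINGS raises RuntimeError and
    # aborts app creation.
    # silent=True: from_envvar() just returns False and the config loaded
    # via from_object() above stays in effect.
    loaded = app.config.from_envvar('COLLECTOR_SETTINGS', silent=True)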


@@ -14,112 +14,112 @@
 import logging
-LOG_FILE = '/var/log/migration.log'
+LOG_FILE = "/var/log/migration.log"
 LOG_LEVEL = logging.INFO
 LOG_FILE_SIZE = 2048000
 LOG_FILES_COUNT = 20
-ELASTIC_HOST = 'localhost'
+ELASTIC_HOST = "localhost"
 ELASTIC_PORT = 9200
 SYNC_EVERY_SECONDS = 3600
-DB_CONNECTION_STRING = 'postgresql://collector:***@localhost/collector'
+DB_CONNECTION_STRING = "postgresql://collector:***@localhost/collector"
 # size of chunk for fetching objects for synchronization
 # into Elasticsearch
 DB_SYNC_CHUNK_SIZE = 1000
-INDEX_MIGRATION = 'migration'
-DOC_TYPE_MIGRATION_INFO = 'info'
+INDEX_MIGRATION = "migration"
+DOC_TYPE_MIGRATION_INFO = "info"
 MAPPING_MIGRATION = {
     DOC_TYPE_MIGRATION_INFO: {
-        'properties': {
+        "properties": {
             # from
-            'db_table_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_table_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'db_id_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_id_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'db_sync_field_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "db_sync_field_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
             # to
-            'index_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "index_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'doc_type_name': {
-                'type': 'string',
-                'index': 'not_analyzed'
+            "doc_type_name": {
+                "type": "string",
+                "index": "not_analyzed"
             },
             # status
-            'last_sync_value': {
-                'enabled': False
+            "last_sync_value": {
+                "enabled": False
             },
-            'last_sync_time': {
-                'type': 'date'
+            "last_sync_time": {
+                "type": "date"
             }
         }
     }
 }
-INDEX_FUEL = 'fuel'
-DOC_TYPE_STRUCTURE = 'structure'
-DOC_TYPE_ACTION_LOGS = 'action_logs'
+INDEX_FUEL = "fuel"
+DOC_TYPE_STRUCTURE = "structure"
+DOC_TYPE_ACTION_LOGS = "action_logs"
 MAPPING_FUEL = {
     DOC_TYPE_STRUCTURE: {
-        'properties': {
-            'master_node_uid': {
-                'type': 'string',
-                'index': 'not_analyzed'
+        "properties": {
+            "master_node_uid": {
+                "type": "string",
+                "index": "not_analyzed"
             },
-            'allocated_nodes_num': {'type': 'long'},
-            'unallocated_nodes_num': {'type': 'long'},
-            'creation_date': {'type': 'date'},
-            'modification_date': {'type': 'date'},
-            'clusters': {
-                'type': 'nested',
-                'properties': {
-                    'id': {'type': 'long'},
-                    'status': {'type': 'string'},
-                    'release': {
-                        'type': 'nested',
-                        'properties': {
-                            'version': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+            "allocated_nodes_num": {"type": "long"},
+            "unallocated_nodes_num": {"type": "long"},
+            "creation_date": {"type": "date"},
+            "modification_date": {"type": "date"},
+            "clusters": {
+                "type": "nested",
+                "properties": {
+                    "id": {"type": "long"},
+                    "status": {"type": "string"},
+                    "release": {
+                        "type": "nested",
+                        "properties": {
+                            "version": {
+                                "type": "string",
+                                "index": "not_analyzed"
                             },
-                            'os': {
-                                'type': 'string',
-                                'index': 'analyzed',
-                                'analyzer': 'not_analyzed_lowercase'
+                            "os": {
+                                "type": "string",
+                                "index": "analyzed",
+                                "analyzer": "not_analyzed_lowercase"
                             },
-                            'name': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+                            "name": {
+                                "type": "string",
+                                "index": "not_analyzed"
                             }
                         }
                     },
-                    'attributes': {
-                        'type': 'nested',
-                        'properties': {
-                            'libvirt_type': {
-                                'type': 'string',
-                                'index': 'not_analyzed'
+                    "attributes": {
+                        "type": "nested",
+                        "properties": {
+                            "libvirt_type": {
+                                "type": "string",
+                                "index": "not_analyzed"
                            }
                         }
                     },
-                    'nodes_num': {'type': 'long'},
-                    'nodes': {
-                        'type': 'nested',
-                        'properties': {
-                            'id': {'type': 'long'},
-                            'manufacturer': {'type': 'string'}
+                    "nodes_num": {"type": "long"},
+                    "nodes": {
+                        "type": "nested",
+                        "properties": {
+                            "id": {"type": "long"},
+                            "manufacturer": {"type": "string"}
                         }
                     },
                 }
@@ -127,45 +127,71 @@ MAPPING_FUEL = {
         }
     },
     DOC_TYPE_ACTION_LOGS: {
-        'properties': {
-            'master_node_uid': {
-                'type': 'string',
-                'index': 'not_analyzed'
-            }
+        "properties": {
+            "master_node_uid": {
+                "type": "string",
+                "index": "not_analyzed"
+            },
+            "id": {"type": "long"},
+            "actor_id": {"type": "string"},
+            "action_group": {"type": "string"},
+            "action_name": {"type": "string"},
+            "action_type": {"type": "string"},
+            "start_timestamp": {"type": "date"},
+            "end_timestamp": {"type": "date"},
+            "additional_info": {
+                "type": "object",
+                "properties": {
+                    # http request
+                    "request_data": {"type": "object"},
+                    "response_data": {"type": "object"},
+                    # task
+                    "parent_task_id": {"type": "long"},
+                    "subtasks_ids": {"type": "long"},
+                    "operation": {"type": "string"},
+                    "nodes_from_resp": {"enabled": False},
+                    "ended_with_status": {"type": "string"},
+                    "message": {"type": "string"},
+                    "output": {"enabled": False}
+                }
+            },
+            "is_sent": {"type": "boolean"},
+            "cluster_id": {"type": "long"},
+            "task_uuid": {"type": "string"}
         }
     }
 }
 ANALYSIS_INDEX_FUEL = {
-    'analyzer': {
-        'not_analyzed_lowercase': {
-            'filter': ['lowercase'],
-            'type': 'custom',
-            'tokenizer': 'keyword'
+    "analyzer": {
+        "not_analyzed_lowercase": {
+            "filter": ["lowercase"],
+            "type": "custom",
+            "tokenizer": "keyword"
         }
     }
 }
-STRUCTURES_DB_TABLE_NAME = 'installation_structures'
-ACTION_LOGS_DB_TABLE_NAME = 'action_logs'
+STRUCTURES_DB_TABLE_NAME = "installation_structures"
+ACTION_LOGS_DB_TABLE_NAME = "action_logs"
 INFO_TEMPLATES = {
     STRUCTURES_DB_TABLE_NAME: {
-        'db_table_name': STRUCTURES_DB_TABLE_NAME,
-        'db_id_name': 'id',
-        'db_sync_field_name': 'modification_date',
-        'last_sync_value': '1970-01-01T00:00:00',
-        'index_name': 'fuel',
-        'doc_type_name': 'structure',
-        'last_sync_time': None
+        "db_table_name": STRUCTURES_DB_TABLE_NAME,
+        "db_id_name": "id",
+        "db_sync_field_name": "modification_date",
+        "last_sync_value": "1970-01-01T00:00:00",
+        "index_name": "fuel",
+        "doc_type_name": "structure",
+        "last_sync_time": None
     },
     ACTION_LOGS_DB_TABLE_NAME: {
-        'db_table_name': ACTION_LOGS_DB_TABLE_NAME,
-        'db_id_name': 'id',
-        'db_sync_field_name': 'id',
-        'last_sync_value': 0,
-        'index_name': 'fuel',
-        'doc_type_name': 'action_logs',
-        'last_sync_time': None
+        "db_table_name": ACTION_LOGS_DB_TABLE_NAME,
+        "db_id_name": "id",
+        "db_sync_field_name": "id",
+        "last_sync_value": 0,
+        "index_name": "fuel",
+        "doc_type_name": "action_logs",
+        "last_sync_time": None
     }
 }
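One plausible way these constants are consumed when (re)creating the index — a sketch assuming the elasticsearch-py client of that era; the actual index-setup code is not part of this diff:

    from elasticsearch import Elasticsearch

    import config  # the module shown above

    es = Elasticsearch([{'host': config.ELASTIC_HOST,
                         'port': config.ELASTIC_PORT}])
    # Analysis settings must be supplied at index-creation time so the
    # custom not_analyzed_lowercase analyzer referenced by MAPPING_FUEL
    # exists before the mappings are applied.
    es.indices.create(index=config.INDEX_FUEL, body={
        'settings': {'analysis': config.ANALYSIS_INDEX_FUEL},
        'mappings': config.MAPPING_FUEL
    })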


@@ -44,19 +44,25 @@ NameMapping = namedtuple('NameMapping', ['source', 'dest'])
 class MappingRule(object):
     ID_FIELDS_GLUE = '_'
-    def __init__(self, db_id_name, json_fields=(), mixed_fields_mapping=()):
+    def __init__(self, db_id_names, json_fields=(), mixed_fields_mapping=()):
         """Describes how a db object is mapped into an Elasticsearch document
-        :param db_id_name: NameMapping of db id field name
+        :param db_id_names: db field names used to build the Elasticsearch
+            document _id
         :param json_fields: tuple of fields to be merged as dicts into
             the Elasticsearch document
         :param mixed_fields_mapping: tuple of NameMapping for adding into
             the Elasticsearch document
         """
-        self.db_id_name = db_id_name
+        self.db_id_names = db_id_names
         self.json_fields = json_fields
         self.mixed_fields_mapping = mixed_fields_mapping
+    def _get_es_id(self, db_object):
+        values = ('{}'.format(getattr(db_object, db_id_name)) for
+                  db_id_name in self.db_id_names)
+        return self.ID_FIELDS_GLUE.join(values)
     def make_doc(self, index_name, doc_type_name, db_object):
         """Returns dictionary for sending into Elasticsearch
         """
@@ -68,7 +74,7 @@ class MappingRule(object):
         return {
             '_index': index_name,
             '_type': doc_type_name,
-            '_id': getattr(db_object, self.db_id_name),
+            '_id': self._get_es_id(db_object),
             '_source': data
         }
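Taken together, the two hunks above mean the document `_id` is now the glue-joined string of every configured id field. A small illustration (`Row` is a hypothetical stand-in for a db object):

    class Row(object):
        master_node_uid = 'abc'
        external_id = 42

    rule = MappingRule(('master_node_uid', 'external_id'))
    # Each value is stringified with '{}'.format(); ID_FIELDS_GLUE is '_'.
    assert rule._get_es_id(Row()) == 'abc_42'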
@@ -129,7 +135,7 @@ class Migrator(object):
     def migrate_installation_structure(self):
         logger.info("Migration of installation structures is started")
         mapping_rule = MappingRule(
-            'master_node_uid',
+            ('master_node_uid',),
             json_fields=('structure',),
             mixed_fields_mapping=(
                 NameMapping(source='creation_date', dest='creation_date'),
@@ -146,8 +152,8 @@
     def migrate_action_logs(self):
         logger.info("Migration of action logs is started")
         mapping_rule = MappingRule(
-            'master_node_uid',
-            json_fields=(),
+            ('master_node_uid', 'external_id'),
+            json_fields=('body',),
             mixed_fields_mapping=(
                 NameMapping(source='master_node_uid', dest='master_node_uid'),
             ))
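Action-log documents are therefore indexed under `<master_node_uid>_<external_id>` rather than the master node uid alone, so multiple logs from the same node no longer overwrite a single document, and the raw `body` JSON is merged into the indexed source.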


@@ -35,6 +35,7 @@ class ActionLog(Base):
     id = Column(Integer, primary_key=True)
     master_node_uid = Column(String, nullable=False)
     external_id = Column(Integer, nullable=False)
+    body = Column(JSON, nullable=False)
 class InstallationStructure(Base):
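A minimal sketch of persisting a row against the extended model (assuming `JSON` is the SQLAlchemy PostgreSQL JSON column type already imported by the module, and the `db_session` used in the tests below; the values are illustrative):

    log = ActionLog(master_node_uid='abc',
                    external_id=42,
                    body={'id': 1, 'action_name': 'deployment'})
    db_session.add(log)
    db_session.commit()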


@@ -198,13 +198,78 @@ class MigrationTest(ElasticTest, DbTest):
         db_session.commit()
         return mn_uid
-    def create_dumb_action_log(self):
-        mn_uid = '{}'.format(uuid.uuid4())
-        external_id = random.randint(1, 10000)
+    def _action_name(self):
+        return random.choice([
+            'deploy',
+            'deployment',
+            'provision',
+            'stop_deployment',
+            'reset_environment',
+            'update',
+        ])
+    def _gen_id(self):
+        return random.randint(1, 10000)
+    def _task_status(self):
+        return random.choice([
+            'ready',
+            'running',
+            'error'
+        ])
+    def _nodes(self):
+        return [self._gen_id() for _ in xrange(0, 100)]
+    def _subtasks(self):
+        return [self._gen_id() for _ in xrange(0, 5)]
+    def create_dumb_action_log(self, mn_uid=None):
+        if mn_uid is None:
+            mn_uid = '{}'.format(uuid.uuid4())
+        external_id = self._gen_id()
+        body = {
+            'id': self._gen_id(),
+            'actor_id': '{}'.format(uuid.uuid4()),
+            'action_group': random.choice([
+                'cluster_changes',
+                'cluster_checking',
+                'operations'
+            ]),
+            'action_name': self._action_name(),
+            'action_type': random.choice(['http_request',
+                                          'nailgun_task']),
+            'start_timestamp': datetime.datetime.utcnow().isoformat(),
+            'end_timestamp': random.choice([
+                None,
+                (datetime.datetime.utcnow() + datetime.timedelta(
+                    seconds=random.randint(0, 60)
+                )).isoformat(),
+            ]),
+            'is_sent': random.choice([True, False]),
+            'cluster_id': self._gen_id(),
+            'task_uuid': '{}'.format(uuid.uuid4()),
+            'additional_info': random.choice([
+                {
+                    # http request
+                    'request_data': {},
+                    'response_data': {},
+                },
+                {
+                    # task
+                    'parent_task_id': self._gen_id(),
+                    'subtasks_ids': self._subtasks(),
+                    'operation': self._action_name(),
+                    'nodes_from_resp': self._nodes(),
+                    'ended_with_status': self._task_status()
+                }
+            ])
+        }
         db_session.add(ActionLog(master_node_uid=mn_uid,
-                                 external_id=external_id))
+                                 external_id=external_id,
+                                 body=body))
         db_session.commit()
-        return mn_uid
+        return '{0}_{1}'.format(mn_uid, external_id)
 AggsCheck = namedtuple('AggsCheck', ['key', 'doc_count'])
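Note that the helper's return value changes from the bare `mn_uid` to the composite `'<mn_uid>_<external_id>'` string, matching the new Elasticsearch `_id` format, so callers can pass it straight to `es.get` as the tests below do.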


@@ -29,7 +29,7 @@ class MappingRuleTest(MigrationTest):
         db_obj = db_session.query(InstallationStructure).filter(
             InstallationStructure.master_node_uid == mn_uid).one()
         rule = MappingRule(
-            'master_node_uid',
+            ('master_node_uid',),
             json_fields=('structure',),
             mixed_fields_mapping=(
                 NameMapping(source='creation_date', dest='creation_date'),


@@ -225,6 +225,40 @@ class MigratorTest(MigrationTest):
                           self.get_indexed_docs_num(sync_info))
         # checking new docs are indexed
+        check_keys = [
+            'master_node_uid',
+            'id',
+            'actor_id',
+            'action_group',
+            'action_name',
+            'action_type',
+            'start_timestamp',
+            'end_timestamp',
+            'additional_info',
+            'is_sent',
+            'cluster_id',
+            'task_uuid'
+        ]
         for mn_uid in mn_uids:
-            self.es.get(sync_info.index_name, mn_uid,
-                        doc_type=sync_info.doc_type_name)
+            resp = self.es.get(sync_info.index_name, mn_uid,
+                               doc_type=sync_info.doc_type_name)
+            doc = resp['_source']
+            for k in check_keys:
+                self.assertTrue(k in doc)
+    @patch('migration.config.DB_SYNC_CHUNK_SIZE', 2)
+    def test_action_logs_one_node_migration(self):
+        docs_num = 5
+        mn_uid = 'xx'
+        for _ in xrange(docs_num):
+            self.create_dumb_action_log(mn_uid=mn_uid)
+        migrator = Migrator()
+        sync_info = migrator.get_sync_info(config.ACTION_LOGS_DB_TABLE_NAME)
+        indexed_docs_before = self.get_indexed_docs_num(sync_info)
+        migrator.migrate_action_logs()
+        # checking all docs are migrated
+        self.es.indices.refresh(index=sync_info.index_name)
+        self.assertEquals(indexed_docs_before + docs_num,
+                          self.get_indexed_docs_num(sync_info))
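Patching `DB_SYNC_CHUNK_SIZE` down to 2 while creating 5 logs for a single master node forces the migrator to page through three chunks, so the new test covers both the chunked fetch path and the uniqueness of the composite ids within one `master_node_uid`.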