From b58b966f50b08a4e5b4d222879b16ea44069d2ba Mon Sep 17 00:00:00 2001 From: kong Date: Sun, 3 May 2015 00:00:13 +0800 Subject: [PATCH] Refactor local manager Change-Id: I1d17780079b2c08f3436c32b9e3bad44ddbc0eb9 --- terracotta/locals/manager.py | 233 ++++++++++++++--------------------- 1 file changed, 95 insertions(+), 138 deletions(-) diff --git a/terracotta/locals/manager.py b/terracotta/locals/manager.py index 6357144..d3dd301 100644 --- a/terracotta/locals/manager.py +++ b/terracotta/locals/manager.py @@ -1,11 +1,11 @@ - # Copyright 2012 Anton Beloglazov +# Copyright 2015 Huawei Technologies Co. Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -101,36 +101,36 @@ local manager performs the following steps: """ from contracts import contract -from neat.contracts_primitive import * -from neat.contracts_extra import * - -import requests from hashlib import sha1 +import libvirt +import os +import requests import time -import neat.common as common -from neat.config import * -from neat.db_utils import * +from oslo_config import cfg +from oslo_log import log as logging -import logging - -from terracotta.openstack.common import service +from terracotta import common +from terracotta.contracts_primitive import * +from terracotta.contracts_extra import * +from terracotta.openstack.common import periodic_task +from terracotta.openstack.common import threadgroup +from terracotta.utils import db_utils log = logging.getLogger(__name__) -class LocalManager(service.Service): - +class LocalManager(periodic_task.PeriodicTasks): def __init__(self): super(Service, self).__init__() self.state = self.init_state() - + self.tg = threadgroup.ThreadGroup() self.tg.add_dynamic_timer( - self.execute, - initial_delay=initial_delay, - periodic_interval_max=self.periodic_interval_max, - self.state + self.run_periodic_tasks, + initial_delay=None, + periodic_interval_max=1, + context=None ) def init_state(self): @@ -145,98 +145,90 @@ class LocalManager(service.Service): vir_connection = libvirt.openReadOnly(None) if vir_connection is None: message = 'Failed to open a connection to the hypervisor' - log.critical(message) + LOG.critical(message) raise OSError(message) physical_cpu_mhz_total = int( common.physical_cpu_mhz_total(vir_connection) * - float(config['host_cpu_usable_by_vms'])) + float(CONF.host_cpu_usable_by_vms)) return {'previous_time': 0., 'vir_connection': vir_connection, - 'db': init_db(config['sql_connection']), + 'db': db_utils.init_db(), 'physical_cpu_mhz_total': physical_cpu_mhz_total, 'hostname': vir_connection.getHostname(), - 'hashed_username': sha1(config['os_admin_user']).hexdigest(), - 'hashed_password': sha1(config['os_admin_password']).hexdigest()} + 'hashed_username': sha1(CONF.os_admin_user).hexdigest(), + 'hashed_password': sha1(CONF.os_admin_password).hexdigest()} - def execute(self, state): + @periodic_task.periodic_task + def execute(self): """ Execute an iteration of the local manager. - 1. Read the data on resource usage by the VMs running on the host from - the /vm directory. + 1. Read the data on resource usage by the VMs running on the host from + the /vm directory. - 2. Call the function specified in the algorithm_underload_detection - configuration option and pass the data on the resource usage by the - VMs, as well as the frequency of the CPU as arguments. + 2. Call the function specified in the algorithm_underload_detection + configuration option and pass the data on the resource usage by the + VMs, as well as the frequency of the CPU as arguments. - 3. If the host is underloaded, send a request to the REST API of the - global manager and pass a list of the UUIDs of all the VMs - currently running on the host in the vm_uuids parameter, as well as - the reason for migration as being 0. + 3. If the host is underloaded, send a request to the REST API of the + global manager and pass a list of the UUIDs of all the VMs + currently running on the host in the vm_uuids parameter, as well as + the reason for migration as being 0. - 4. If the host is not underloaded, call the function specified in the - algorithm_overload_detection configuration option and pass the data - on the resource usage by the VMs, as well as the frequency of the - host's CPU as arguments. + 4. If the host is not underloaded, call the function specified in the + algorithm_overload_detection configuration option and pass the data + on the resource usage by the VMs, as well as the frequency of the + host's CPU as arguments. - 5. If the host is overloaded, call the function specified in the - algorithm_vm_selection configuration option and pass the data on - the resource usage by the VMs, as well as the frequency of the - host's CPU as arguments + 5. If the host is overloaded, call the function specified in the + algorithm_vm_selection configuration option and pass the data on + the resource usage by the VMs, as well as the frequency of the + host's CPU as arguments - 6. If the host is overloaded, send a request to the REST API of the - global manager and pass a list of the UUIDs of the VMs selected by - the VM selection algorithm in the vm_uuids parameter, as well as - the reason for migration as being 1. + 6. If the host is overloaded, send a request to the REST API of the + global manager and pass a list of the UUIDs of the VMs selected by + the VM selection algorithm in the vm_uuids parameter, as well as + the reason for migration as being 1. - :param config: A config dictionary. - :type config: dict(str: *) - - :param state: A state dictionary. - :type state: dict(str: *) - - :return: The updated state dictionary. - :rtype: dict(str: *) """ - log.info('Started an iteration') - vm_path = common.build_local_vm_path(config['local_data_directory']) - vm_cpu_mhz = get_local_vm_data(vm_path) - vm_ram = get_ram(state['vir_connection'], vm_cpu_mhz.keys()) - vm_cpu_mhz = cleanup_vm_data(vm_cpu_mhz, vm_ram.keys()) + LOG.info('Started an iteration') + state = self.state + vm_path = common.build_local_vm_path(CONF.local_data_directory) + vm_cpu_mhz = self.get_local_vm_data(vm_path) + vm_ram = self.get_ram(state['vir_connection'], vm_cpu_mhz.keys()) + vm_cpu_mhz = self.cleanup_vm_data(vm_cpu_mhz, vm_ram.keys()) if not vm_cpu_mhz: - if log.isEnabledFor(logging.INFO): - log.info('The host is idle') - log.info('Skipped an iteration') - return state + LOG.info('Skipped an iteration') + return - host_path = common.build_local_host_path(config['local_data_directory']) - host_cpu_mhz = get_local_host_data(host_path) + host_path = common.build_local_host_path(CONF.local_data_directory) + host_cpu_mhz = self.get_local_host_data(host_path) - host_cpu_utilization = vm_mhz_to_percentage( + host_cpu_utilization = self.vm_mhz_to_percentage( vm_cpu_mhz.values(), host_cpu_mhz, state['physical_cpu_mhz_total']) - if log.isEnabledFor(logging.DEBUG): - log.debug('The total physical CPU Mhz: %s', str(state['physical_cpu_mhz_total'])) - log.debug('VM CPU MHz: %s', str(vm_cpu_mhz)) - log.debug('Host CPU MHz: %s', str(host_cpu_mhz)) - log.debug('CPU utilization: %s', str(host_cpu_utilization)) + LOG.debug('The total physical CPU Mhz: %s', + str(state['physical_cpu_mhz_total'])) + LOG.debug('VM CPU MHz: %s', str(vm_cpu_mhz)) + LOG.debug('Host CPU MHz: %s', str(host_cpu_mhz)) + LOG.debug('CPU utilization: %s', str(host_cpu_utilization)) if not host_cpu_utilization: - log.info('Not enough data yet - skipping to the next iteration') - log.info('Skipped an iteration') - return state + LOG.info('Not enough data yet - skipping to the next iteration') + LOG.info('Skipped an iteration') + return - time_step = int(config['data_collector_interval']) + time_step = int(CONF.data_collector_interval) migration_time = common.calculate_migration_time( - vm_ram, float(config['network_migration_bandwidth'])) + vm_ram, float(CONF.network_migration_bandwidth)) if 'underload_detection' not in state: underload_detection_params = common.parse_parameters( - config['algorithm_underload_detection_parameters']) + CONF.algorithm_underload_detection_parameters) underload_detection = common.call_function_by_name( - config['algorithm_underload_detection_factory'], + CONF.algorithm_underload_detection_factory, [time_step, migration_time, underload_detection_params]) @@ -244,9 +236,9 @@ class LocalManager(service.Service): state['underload_detection_state'] = {} overload_detection_params = common.parse_parameters( - config['algorithm_overload_detection_parameters']) + CONF.algorithm_overload_detection_parameters) overload_detection = common.call_function_by_name( - config['algorithm_overload_detection_factory'], + CONF.algorithm_overload_detection_factory, [time_step, migration_time, overload_detection_params]) @@ -254,9 +246,9 @@ class LocalManager(service.Service): state['overload_detection_state'] = {} vm_selection_params = common.parse_parameters( - config['algorithm_vm_selection_parameters']) + CONF.algorithm_vm_selection_parameters) vm_selection = common.call_function_by_name( - config['algorithm_vm_selection_factory'], + CONF.algorithm_vm_selection_factory, [time_step, migration_time, vm_selection_params]) @@ -267,71 +259,35 @@ class LocalManager(service.Service): overload_detection = state['overload_detection'] vm_selection = state['vm_selection'] - if log.isEnabledFor(logging.INFO): - log.info('Started underload detection') + LOG.info('Started underload detection') underload, state['underload_detection_state'] = underload_detection( host_cpu_utilization, state['underload_detection_state']) - if log.isEnabledFor(logging.INFO): - log.info('Completed underload detection') + LOG.info('Completed underload detection') - if log.isEnabledFor(logging.INFO): - log.info('Started overload detection') + LOG.info('Started overload detection') overload, state['overload_detection_state'] = overload_detection( host_cpu_utilization, state['overload_detection_state']) - if log.isEnabledFor(logging.INFO): - log.info('Completed overload detection') + LOG.info('Completed overload detection') if underload: - if log.isEnabledFor(logging.INFO): - log.info('Underload detected') - try: - r = requests.put('http://' + config['global_manager_host'] + - ':' + config['global_manager_port'], - {'username': state['hashed_username'], - 'password': state['hashed_password'], - 'time': time.time(), - 'host': state['hostname'], - 'reason': 0}) - if log.isEnabledFor(logging.INFO): - log.info('Received response: [%s] %s', - r.status_code, r.content) - except requests.exceptions.ConnectionError: - log.exception('Exception at underload request:') - + LOG.info('Underload detected') + # TODO(xylan): send rpc message to global manager else: if overload: - if log.isEnabledFor(logging.INFO): - log.info('Overload detected') + LOG.info('Overload detected') - log.info('Started VM selection') + LOG.info('Started VM selection') vm_uuids, state['vm_selection_state'] = vm_selection( vm_cpu_mhz, vm_ram, state['vm_selection_state']) - log.info('Completed VM selection') + LOG.info('Completed VM selection') - if log.isEnabledFor(logging.INFO): - log.info('Selected VMs to migrate: %s', str(vm_uuids)) - try: - r = requests.put('http://' + config['global_manager_host'] + - ':' + config['global_manager_port'], - {'username': state['hashed_username'], - 'password': state['hashed_password'], - 'time': time.time(), - 'host': state['hostname'], - 'reason': 1, - 'vm_uuids': ','.join(vm_uuids)}) - if log.isEnabledFor(logging.INFO): - log.info('Received response: [%s] %s', - r.status_code, r.content) - except requests.exceptions.ConnectionError: - log.exception('Exception at overload request:') + LOG.info('Selected VMs to migrate: %s', str(vm_uuids)) + # TODO(xylan): send rpc message to global manager else: - if log.isEnabledFor(logging.INFO): - log.info('No underload or overload detected') + LOG.info('No underload or overload detected') - if log.isEnabledFor(logging.INFO): - log.info('Completed an iteration') - - return state + LOG.info('Completed an iteration') + self.state = state @contract @@ -388,21 +344,21 @@ class LocalManager(service.Service): @contract - def get_ram(self, vir_connection, vms): + def get_ram(self, vir_connection, vm_ids): """ Get the maximum RAM for a set of VM UUIDs. :param vir_connection: A libvirt connection object. :type vir_connection: virConnect - :param vms: A list of VM UUIDs. - :type vms: list(str) + :param vm_ids: A list of VM UUIDs. + :type vm_ids: list(str) :return: The maximum RAM for the VM UUIDs. :rtype: dict(str : long) """ vms_ram = {} - for uuid in vms: - ram = get_max_ram(vir_connection, uuid) + for uuid in vm_ids: + ram = self.get_max_ram(vir_connection, uuid) if ram: vms_ram[uuid] = ram @@ -430,7 +386,8 @@ class LocalManager(service.Service): @contract - def vm_mhz_to_percentage(self, vm_mhz_history, host_mhz_history, physical_cpu_mhz): + def vm_mhz_to_percentage(self, vm_mhz_history, host_mhz_history, + physical_cpu_mhz): """ Convert VM CPU utilization to the host's CPU utilization. :param vm_mhz_history: A list of CPU utilization histories of VMs in MHz.