masakari-monitors/masakarimonitors/instancemonitor/instance.py

165 lines
6.7 KiB
Python

# Copyright(c) 2016 Nippon Telegraph and Telephone Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time
import eventlet
import libvirt
from oslo_log import log as oslo_logging
from masakarimonitors.instancemonitor.libvirt_handler import eventfilter
from masakarimonitors import manager
LOG = oslo_logging.getLogger(__name__)
class InstancemonitorManager(manager.Manager):
"""Manages the masakari-instancemonitor."""
def __init__(self, *args, **kwargs):
super(InstancemonitorManager, self).__init__(
service_name="instancemonitor", *args, **kwargs)
self.evf = eventfilter.EventFilter()
# This keeps track of what thread is running the event loop,
# (if it is run in a background thread)
self.event_loop_thread = None
def _vir_event_loop_native_run(self):
# Directly run the event loop in the current thread
while True:
libvirt.virEventRunDefaultImpl()
def _vir_event_loop_native_start(self):
libvirt.virEventRegisterDefaultImpl()
self.event_loop_thread = threading.Thread(
target=self._vir_event_loop_native_run,
name="lib_virt_eventLoop")
self.event_loop_thread.setDaemon(True)
self.event_loop_thread.start()
def _my_domain_event_callback(self, conn, dom, event, detail, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
event, detail, dom.UUIDString())
def _my_domain_event_reboot_callback(self, conn, dom, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_REBOOT,
-1, -1, dom.UUIDString())
def _my_domain_event_rtc_change_callback(self, conn, dom, utcoffset,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE,
-1, -1, dom.UUIDString())
def _my_domain_event_watchdog_callback(self, conn, dom, action, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG,
action, -1, dom.UUIDString())
def _my_domain_event_io_error_callback(self, conn, dom, srcpath,
devalias, action, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR,
action, -1, dom.UUIDString())
def _my_domain_event_graphics_callback(self, conn, dom, phase, localAddr,
remoteAddr, authScheme, subject,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS,
-1, phase, dom.UUIDString())
def _my_domain_event_disk_change_callback(self, conn, dom, oldSrcPath,
newSrcPath, devAlias, reason,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE,
-1, -1, dom.UUIDString())
def _my_domain_event_io_error_reason_callback(self, conn, dom, srcPath,
devAlias, action, reason,
opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON,
-1, -1, dom.UUIDString())
def _my_domain_event_generic_callback(self, conn, dom, opaque):
self.evf.vir_event_filter(libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR,
-1, -1, dom.UUIDString())
def _err_handler(self, ctxt, err):
LOG.warning("Error from libvirt : %s", err[2])
def _virt_event(self, uri):
# Run a background thread with the event loop
self._vir_event_loop_native_start()
event_callback_handlers = {
libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE:
self._my_domain_event_callback,
libvirt.VIR_DOMAIN_EVENT_ID_REBOOT:
self._my_domain_event_reboot_callback,
libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE:
self._my_domain_event_rtc_change_callback,
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR:
self._my_domain_event_io_error_callback,
libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG:
self._my_domain_event_watchdog_callback,
libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS:
self._my_domain_event_graphics_callback,
libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE:
self._my_domain_event_disk_change_callback,
libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR_REASON:
self._my_domain_event_io_error_reason_callback,
libvirt.VIR_DOMAIN_EVENT_ID_CONTROL_ERROR:
self._my_domain_event_generic_callback
}
# Connect to libvert - If be disconnected, reprocess.
self.running = True
while self.running:
vc = libvirt.openReadOnly(uri)
# Event callback settings
callback_ids = []
for event, callback in event_callback_handlers.items():
cid = vc.domainEventRegisterAny(None, event, callback, None)
callback_ids.append(cid)
# Connection monitoring.
vc.setKeepAlive(5, 3)
while vc.isAlive() == 1 and self.running:
eventlet.greenthread.sleep(1)
# If connection between libvirtd was lost,
# clear callback connection.
LOG.warning("Libvirt Connection Closed Unexpectedly.")
for cid in callback_ids:
try:
vc.domainEventDeregisterAny(cid)
except Exception:
pass
vc.close()
del vc
time.sleep(3)
def stop(self):
self.running = False
def main(self):
"""Main method.
Set the URI, error handler, and executes event loop processing.
"""
uri = "qemu:///system"
LOG.debug("Using uri:" + uri)
# set error handler & do event loop
libvirt.registerErrorHandler(self._err_handler, '_virt_event')
self._virt_event(uri)