Multiple changes to Recorder

- Add start_periodic - start and return a periodic task
- Make it possible to run multiple engines pr Recorder.
- Remove self.config and use cfg.CONF[self.name] instead
- Pass record service down to the engine

Change-Id: I62cc54cca51bf11eda0acc55f418e56af23c3f04
This commit is contained in:
Endre Karlson 2012-11-23 09:39:48 +01:00
parent 9c5e39402b
commit 50c8d61801
5 changed files with 81 additions and 16 deletions

View File

@ -21,7 +21,7 @@ import eventlet
from bufunfa.openstack.common import cfg
from bufunfa.openstack.common import log
from bufunfa.openstack.common import service
from bufunfa.recorder.service import Service
from bufunfa.recorder.service import RecordService
eventlet.monkey_patch()
@ -36,5 +36,5 @@ cfg.CONF(sys.argv[1:], project="bufunfa", prog="bufunfa-recorder-sync",
log.setup('bufunfa')
launcher = service.launch(Service())
launcher = service.launch(RecordService())
launcher.wait()

View File

@ -18,6 +18,7 @@ from stevedore import driver
from bufunfa.openstack.common import cfg
from bufunfa.openstack.common import log as logging
from bufunfa.openstack.common.loopingcall import LoopingCall
from bufunfa import exceptions
@ -32,8 +33,8 @@ class Plugin(object):
def __init__(self):
self.name = self.get_canonical_name()
self.config = cfg.CONF[self.name]
LOG.debug("Loaded plugin %s", self.name)
self.tasks = []
def is_enabled(self):
"""
@ -129,3 +130,13 @@ class Plugin(object):
"""
Stop this plugin from doing anything
"""
for task in self.tasks:
task.stop()
def start_periodic(self, func, interval, initial_delay=None,
raise_on_error=False):
initial_delay = initial_delay or interval
task = LoopingCall(func)
task.start(interval=interval, initial_delay=initial_delay)
return task

View File

@ -25,16 +25,17 @@ class RecordEngine(Plugin):
Base Record engine for getting Records from external systems
"""
__plugin_type__ = 'recorder'
__plugin_ns__ = 'bufunfa.recorder'
def __init__(self):
def __init__(self, record_service):
super(RecordEngine, self).__init__()
self.admin_context = get_admin_context()
self.record_service = record_service
@classmethod
def get_opts(cls):
return [
cfg.IntOpt('poll_age', default=86400,
help='How far back to pull data from'),
cfg.IntOpt('poll_interval', default=60),
cfg.BoolOpt('record_audit_logging', default=False,
help='Logs individual records pr get_records()')]

View File

@ -32,6 +32,11 @@ LOG = log.getLogger(__name__)
class RecordEngine(OpenstackEngine):
__plugin_name__ = 'ceilometer'
def start(self):
self.periodic = self.start_periodic(
self.process_records,
cfg.CONF[self.name].poll_interval)
def get_client(self):
"""
Get a ceilometerclient
@ -66,6 +71,9 @@ class RecordEngine(OpenstackEngine):
central_api.set_polled_at(self.admin_context, project_id, started)
self.record_service.publish_records(self.admin_context,
project_records)
def get_poll_start(self, project_id):
"""
Get poll start time
@ -155,7 +163,7 @@ class RecordEngine(OpenstackEngine):
end_timestamp=duration_info.get('end_timestamp'),
duration=duration_info.get('duration')
)
if self.config.record_audit_logging:
if cfg.CONF[self.name].record_audit_logging:
LOG.debug("Record: %s", record)
return record

View File

@ -14,12 +14,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from stevedore.named import NamedExtensionManager
from bufunfa.openstack.common import cfg
from bufunfa.openstack.common import log
from bufunfa.service import PeriodicService
from bufunfa.recorder import get_plugin
from bufunfa.openstack.common.context import get_admin_context
from bufunfa.openstack.common.rpc.service import Service
from bufunfa.central import api as central_api
from bufunfa.recorder.base import RecordEngine
LOG = log.getLogger(__name__)
@ -46,16 +48,59 @@ CLI_OPTIONS = [
cfg.CONF.register_cli_opts(CLI_OPTIONS)
class Service(PeriodicService):
cfg.CONF.register_opts([
cfg.ListOpt('record-engines', default=[], help="What engines to enable")
])
class RecordService(Service):
def __init__(self, *args, **kw):
kw.update(
host=cfg.CONF.host,
topic=cfg.CONF.worker_topic)
super(Service, self).__init__(*args, **kw)
self.plugin = get_plugin(cfg.CONF)
super(RecordService, self).__init__(*args, **kw)
def periodic_tasks(self, context, raise_on_error=False):
records = self.plugin.process_records()
if records and len(records) >= 1:
central_api.process_records(context, records)
self.admin_context = get_admin_context()
self.engines = self._init_extensions()
def _init_extensions(self):
""" Loads and prepares all enabled extensions """
self.extensions_manager = NamedExtensionManager(
RecordEngine.__plugin_ns__, names=cfg.CONF.record_engines)
def _load_extension(ext):
handler_cls = ext.plugin
handler_cls.register_opts(cfg.CONF)
return handler_cls(record_service=self)
try:
return self.extensions_manager.map(_load_extension)
except RuntimeError:
# No handlers enabled. No problem.
return []
def start(self):
"""
Start underlying engines
"""
super(RecordService, self).start()
for engine in self.engines:
engine.start()
def stop(self):
"""
Stop underlying engines
"""
super(RecordService, self).stop()
for engine in self.engines:
engine.stop()
def publish_records(self, context, records):
"""
Publish a record to the central service
:param record: The record
"""
return central_api.process_records(context, records)