From 0c619c133d3c248d62a2c5f6441d4fae0bf7042a Mon Sep 17 00:00:00 2001 From: Monsyne Dragon Date: Sun, 7 Sep 2014 04:07:20 +0000 Subject: [PATCH] Add database admin command. Add admin command for db schema upgrade/downgrade/etc. Move alembic migrations so above can find them when installed as a package. Fix up packaging to use setup.cfg and pbr. Flesh out README. --- AUTHORS | 1 + ChangeLog | 34 +++++ MANIFEST.in | 10 ++ README.md | 87 +++++++++++ alembic.ini | 51 ------- setup.cfg | 24 +++ setup.py | 38 +---- winchester/db/__init__.py | 4 + winchester/db/alembic_command.py | 142 ++++++++++++++++++ winchester/db/db_admin.py | 35 +++++ winchester/{db.py => db/interface.py} | 0 {alembic => winchester/db/migrations}/README | 0 winchester/db/migrations/__init__.py | 0 {alembic => winchester/db/migrations}/env.py | 11 +- .../db/migrations}/script.py.mako | 0 .../db/migrations}/versions/3ab6d7bf80cd_.py | 0 .../db/migrations}/versions/44289d1492e6_.py | 0 17 files changed, 348 insertions(+), 89 deletions(-) create mode 100644 AUTHORS create mode 100644 ChangeLog delete mode 100644 alembic.ini mode change 100644 => 100755 setup.py create mode 100644 winchester/db/__init__.py create mode 100644 winchester/db/alembic_command.py create mode 100644 winchester/db/db_admin.py rename winchester/{db.py => db/interface.py} (100%) rename {alembic => winchester/db/migrations}/README (100%) create mode 100644 winchester/db/migrations/__init__.py rename {alembic => winchester/db/migrations}/env.py (85%) rename {alembic => winchester/db/migrations}/script.py.mako (100%) rename {alembic => winchester/db/migrations}/versions/3ab6d7bf80cd_.py (100%) rename {alembic => winchester/db/migrations}/versions/44289d1492e6_.py (100%) diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..9ee0aa5 --- /dev/null +++ b/AUTHORS @@ -0,0 +1 @@ +Monsyne Dragon diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 0000000..eb89b0e --- /dev/null +++ b/ChangeLog @@ -0,0 +1,34 @@ +commit a6f84d16036e143b1b605c50b90055a623e3235b +Author: Monsyne Dragon +Date: Thu Sep 4 20:43:41 2014 +0000 + + Fixed a few bugs, added more logging. + + Fixed timestamp bug, and streamstate issue missed in unittests. + Added more logging for pipeline manager. + +commit c2aa498beb14cf0a61066fe1e7df833a16db5733 +Author: Monsyne Dragon +Date: Thu Sep 4 18:05:19 2014 +0000 + + Move yagi handler into winchester codebase. + +commit a8f373e4bf14762ad09a20f8ad9ea543e11c5be7 +Author: Monsyne Dragon +Date: Thu Sep 4 01:49:19 2014 +0000 + + Added full stream processing, pipeline workers, etc. + + Full trigger logic now works. + Added pipeline workers, and test handler. + Added example configs + Lots of unittests. + +commit aa8fb55e879e782268c663f81e73384673d56847 +Author: Monsyne Dragon +Date: Thu Jun 26 01:55:26 2014 +0000 + + Initial commit of DB schema. + + Initial commit of the event schema for the database. + This includes models and alembic migration. \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in index 8afbefe..006b913 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,12 @@ include README.md include requirements.txt +include LICENSE +include winchester/db/migrations/README +include winchester/db/migrations/script.py.mako + +recursive-include etc * +recursive-include winchester/db/migrations/versions * + +exclude .gitignore +exclude .gitreview +global-exclude *.pyc diff --git a/README.md b/README.md index 7df48b2..dc595ff 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,90 @@ winchester An OpenStack notification event processing library based on persistant streams. +Winchester is designed to process event streams, such as those produced from +OpenStack notifications. Events are represented as simple python dictionaries. +They should be flat dictionaries (not nested), with a minimum of three keys: + + "message_id": A unique identifier for this event, such as a uuid. + "event_type": A string identifying the event's type. Usually a hierarchical dotted name like "foo.bar.baz" + "timestamp": Time the event occurred (a python datetime, in UTC) + +The individual keys of the event dictionary are called *traits* and can be +strings, integers, floats or datetimes. For processing of the (often large) +notifications that come out of OpenStack, winchester uses the +[StackDistiller library](https://github.com/StackTach/stackdistiller) to +extract flattened events from the notifications, that only contain the data +you actually need for processing. + +Winchester's processing is done through *triggers* and *pipelines*. + +A *trigger* is composed of a *match_criteria* which is like a +persistant query, collecting events you want to process into a +persistant *stream* (stored in a sql database), a set of distinguishing +traits, which can separate your list of events into distinct streams, +similar to a **GROUP BY** clause in an SQL query, and a *fire_criteria*, +which specifies the conditions a given *stream* has to match for the +trigger to fire. When it does, the events in the *stream* are sent to +a *pipeline* listed as the *fire_pipeline* for processing as a batch. +Also listed is an *expire_timestamp*. If a given stream does not meet +the *fire_criteria* by that time, it is expired, and can be sent to +an *expire_pipeline* for alternate processing. Both *fire_pipeline* +and *expire_pipeline* are optional, but at least one of them must +be specified. + +A *pipeline* is simply a list of simple *handlers*. Each *handler* +in the pipeline receives the list of events in a given stream, +sorted by timestamp, in turn. *Handlers* can filter events from the list, +or add new events to it. These changes will be seen by *handlers* further +down the pipeline. *Handlers* should avoid operations with side-effects, +other than modifying the list of events, as pipeline processing can be +re-tried later if there is an error. Instead, if all handlers process the +list of events without raising an exception, a *commit* call is made on +each handler, giving it the chance to perform actions, like sending data +to external systems. *Handlers* are simple to write, as pretty much any +object that implements the appropriate *handle_events*, *commit* and +*rollback* methods can be a *handler*. + +## Installing and running. + +Winchster is installable as a simple python package. +Once installed, and the appropriate database url is specified in the +*winchester.yaml* config file (example included in the *etc* directory), +you can create the appropriate database schema with: + + winchester_db -c /winchester.yaml upgrade head + +If you need to run the SQL by hand, or just want to look at the schema, the +following will print out the appropriate table creation SQL: + + winchester_db -c /winchester.yaml upgrade --sql head + +Once you have done that, and configured the appropriate *triggers.yaml*, +*pipelines.yaml*, and, if using StackDistiller, *event_definitions.yaml* configs +(again, examples are in *etc* in the winchester codebase), you can add events +into the system by calling the *add_event* method of Winchester's TriggerManager. +If you are processing OpenStack notifications, you can call *add_notification*, +which will pare down the notification into an event with StackDistiller, and +then call *add_event* with that. If you are reading OpenStack notifications off +of a RabbitMQ queue, there is a plugin for the +[Yagi](https://github.com/rackerlabs/yagi) notification processor included with +Winchester. Simply add "winchester.yagi\_handler.WinchesterHandler" to the "apps" +line in your *yagi.conf* section for the queues you want to listen to, and add a: + + [winchester] + config_file = /winchester.yaml + +section to the *yagi.conf*. + +To run the actual pipeline processing, which is run as a separate daemon, run: + + pipeline_worker -c /winchester.yaml + +You can pass the *-d* flag to the *pipeline_worker* to tell it to run as a background +daemon. + +Winchester uses an optimistic locking scheme in the database to coordinate firing, +expiring, and processing of streams, so you can run as many processes (like +Yagi's *yagi-event* daemon) feeding TriggerManagers as you need to handle the +incoming events, and as many *pipeline_worker*s as you need to handle the resulting +processing load, scaling the system horizontally. diff --git a/alembic.ini b/alembic.ini deleted file mode 100644 index 4590e00..0000000 --- a/alembic.ini +++ /dev/null @@ -1,51 +0,0 @@ -# A generic, single database configuration. - -[alembic] -# path to migration scripts -script_location = alembic - -# template used to generate migration files -# file_template = %%(rev)s_%%(slug)s - -# set to 'true' to run the environment during -# the 'revision' command, regardless of autogenerate -# revision_environment = false - -#sqlalchemy.url = driver://user:pass@localhost/dbname -sqlalchemy.url = mysql://winchester:testpasswd@localhost/winchester - - -# Logging configuration -[loggers] -keys = root,sqlalchemy,alembic - -[handlers] -keys = console - -[formatters] -keys = generic - -[logger_root] -level = WARN -handlers = console -qualname = - -[logger_sqlalchemy] -level = WARN -handlers = -qualname = sqlalchemy.engine - -[logger_alembic] -level = INFO -handlers = -qualname = alembic - -[handler_console] -class = StreamHandler -args = (sys.stderr,) -level = NOTSET -formatter = generic - -[formatter_generic] -format = %(levelname)-5.5s [%(name)s] %(message)s -datefmt = %H:%M:%S diff --git a/setup.cfg b/setup.cfg index 973f459..1636d5b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,28 @@ [metadata] description-file = README.md +name = winchester +version = 0.10 +author = Monsyne Dragon +author_email = mdragon@rackspace.com +summary = An OpenStack notification event processing library. +license = Apache-2 +keywords = + stacktach + event_processing + pipeline + events + notification + openstack + triggers + +classifiers = + Development Status :: 3 - Alpha + License :: OSI Approved :: Apache Software License + Operating System :: POSIX :: Linux + Programming Language :: Python :: 2.6 + Programming Language :: Python :: 2.7 + +home-page = https://github.com/StackTach/winchester [files] packages = @@ -8,3 +31,4 @@ packages = [entry_points] console_scripts = pipeline_worker = winchester.worker:main + winchester_db=winchester.db.db_admin:main diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index d1922ef..aa2d8a0 --- a/setup.py +++ b/setup.py @@ -1,38 +1,8 @@ -import os -from pip.req import parse_requirements -from setuptools import setup, find_packages - - -def read(fname): - return open(os.path.join(os.path.dirname(__file__), fname)).read() - - -req_file = os.path.join(os.path.dirname(__file__), "requirements.txt") -install_reqs = [str(r.req) for r in parse_requirements(req_file)] +#!/usr/bin/env python +from setuptools import setup setup( - name='winchester', - version='0.10', - author='Monsyne Dragon', - author_email='mdragon@rackspace.com', - description=("An OpenStack notification event processing library."), - license='Apache License (2.0)', - keywords='OpenStack notifications events processing triggers', - packages=find_packages(exclude=['tests']), - classifiers=[ - 'Development Status :: 3 - Alpha', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 2.6', - 'Programming Language :: Python :: 2.7', - ], - url='https://github.com/StackTach/winchester', - scripts=[], - long_description=read('README.md'), - install_requires=install_reqs, - entry_points = { - 'console_scripts': ['pipeline_worker=winchester.worker:main'], - }, - zip_safe=False + setup_requires=['pbr'], + pbr=True, ) diff --git a/winchester/db/__init__.py b/winchester/db/__init__.py new file mode 100644 index 0000000..c30f386 --- /dev/null +++ b/winchester/db/__init__.py @@ -0,0 +1,4 @@ +from winchester.db.interface import DuplicateError, LockError +from winchester.db.interface import DBInterface + + diff --git a/winchester/db/alembic_command.py b/winchester/db/alembic_command.py new file mode 100644 index 0000000..74315d9 --- /dev/null +++ b/winchester/db/alembic_command.py @@ -0,0 +1,142 @@ +from alembic import util, command, config +import argparse +import inspect + + +class AlembicCommandLine(object): + prog = None + description = None + allowed_commands = None + + def __init__(self, prog=None, description=None, allowed_commands=None): + if prog is not None: + self.prog = prog + if description is not None: + self.description = description + if allowed_commands is not None: + self.allowed_commands = allowed_commands + + + self.parser = self.generate_options() + + def add_command_options(self, parser, positional, kwargs): + if 'template' in kwargs: + parser.add_argument("-t", "--template", + default='generic', + type=str, + help="Setup template for use with 'init'") + if 'message' in kwargs: + parser.add_argument("-m", "--message", + type=str, + help="Message string to use with 'revision'") + if 'sql' in kwargs: + parser.add_argument("--sql", + action="store_true", + help="Don't emit SQL to database - dump to " + "standard output/file instead") + if 'tag' in kwargs: + parser.add_argument("--tag", + type=str, + help="Arbitrary 'tag' name - can be used by " + "custom env.py scripts.") + if 'autogenerate' in kwargs: + parser.add_argument("--autogenerate", + action="store_true", + help="Populate revision script with candidate " + "migration operations, based on comparison " + "of database to model.") + # "current" command + if 'head_only' in kwargs: + parser.add_argument("--head-only", + action="store_true", + help="Only show current version and " + "whether or not this is the head revision.") + + if 'rev_range' in kwargs: + parser.add_argument("-r", "--rev-range", + action="store", + help="Specify a revision range; " + "format is [start]:[end]") + + + positional_help = { + 'directory': "location of scripts directory", + 'revision': "revision identifier" + } + for arg in positional: + parser.add_argument(arg, help=positional_help.get(arg)) + + def add_options(self, parser): + parser.add_argument("-c", "--config", + type=str, + default="alembic.ini", + help="Alternate config file") + parser.add_argument("-n", "--name", + type=str, + default="alembic", + help="Name of section in .ini file to " + "use for Alembic config") + parser.add_argument("-x", action="append", + help="Additional arguments consumed by " + "custom env.py scripts, e.g. -x " + "setting1=somesetting -x setting2=somesetting") + + def generate_options(self): + parser = argparse.ArgumentParser(prog=self.prog) + self.add_options(parser) + subparsers = parser.add_subparsers() + + for fn, name, doc, positional, kwarg in self.get_commands(): + subparser = subparsers.add_parser(name, help=doc) + self.add_command_options(subparser, positional, kwarg) + subparser.set_defaults(cmd=(fn, positional, kwarg)) + return parser + + def get_commands(self): + cmds = [] + for fn in [getattr(command, n) for n in dir(command)]: + if (inspect.isfunction(fn) and + fn.__name__[0] != '_' and + fn.__module__ == 'alembic.command'): + + if (self.allowed_commands and + fn.__name__ not in self.allowed_commands): + continue + + spec = inspect.getargspec(fn) + if spec[3]: + positional = spec[0][1:-len(spec[3])] + kwarg = spec[0][-len(spec[3]):] + else: + positional = spec[0][1:] + kwarg = [] + cmds.append((fn, fn.__name__, fn.__doc__, positional, kwarg)) + return cmds + + def get_config(self, options): + return config.Config(file_=options.config, + ini_section=options.name, + cmd_opts=options) + + def run_cmd(self, config, options): + fn, positional, kwarg = options.cmd + + try: + fn(config, *[getattr(options, k) for k in positional], + **dict((k, getattr(options, k)) for k in kwarg)) + except util.CommandError as e: + util.err(str(e)) + + def main(self, argv=None): + options = self.parser.parse_args(argv) + if not hasattr(options, "cmd"): + # see http://bugs.python.org/issue9253, argparse + # behavior changed incompatibly in py3.3 + self.parser.error("too few arguments") + else: + self.run_cmd(self.get_config(options), options) + + +if __name__ == '__main__': + cmdline = AlembicCommandLine() + cmdline.main() diff --git a/winchester/db/db_admin.py b/winchester/db/db_admin.py new file mode 100644 index 0000000..9d7d879 --- /dev/null +++ b/winchester/db/db_admin.py @@ -0,0 +1,35 @@ +import argparse +import alembic +import logging + +from winchester.db.alembic_command import AlembicCommandLine + + +logger = logging.getLogger(__name__) + + +class DBAdminCommandLine(AlembicCommandLine): + description = "Winchester DB admin commandline tool." + + def add_options(self, parser): + parser.add_argument('--config', '-c', + default='winchester.yaml', + type=str, + help='The name of the winchester config file') + + + def get_config(self, options): + alembic_cfg = alembic.config.Config() + alembic_cfg.set_main_option("winchester_config", options.config) + alembic_cfg.set_main_option("script_location", "winchester.db:migrations") + return alembic_cfg + + +def main(): + cmd = DBAdminCommandLine(allowed_commands=['upgrade', 'downgrade', + 'current', 'history', 'stamp']) + cmd.main() + + +if __name__ == '__main__': + main() diff --git a/winchester/db.py b/winchester/db/interface.py similarity index 100% rename from winchester/db.py rename to winchester/db/interface.py diff --git a/alembic/README b/winchester/db/migrations/README similarity index 100% rename from alembic/README rename to winchester/db/migrations/README diff --git a/winchester/db/migrations/__init__.py b/winchester/db/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/alembic/env.py b/winchester/db/migrations/env.py similarity index 85% rename from alembic/env.py rename to winchester/db/migrations/env.py index e82b51f..50aec44 100644 --- a/alembic/env.py +++ b/winchester/db/migrations/env.py @@ -9,15 +9,18 @@ config = context.config # Interpret the config file for Python logging. # This line sets up loggers basically. -fileConfig(config.config_file_name) +#fileConfig(config.config_file_name) # add your model's MetaData object here # for 'autogenerate' support # from myapp import mymodel # target_metadata = mymodel.Base.metadata +from winchester.config import ConfigManager from winchester.models import Base target_metadata = Base.metadata +winchester_config = ConfigManager.load_config_file( + config.get_main_option("winchester_config")) # other values from the config, defined by the needs of env.py, # can be acquired: # my_important_option = config.get_main_option("my_important_option") @@ -35,7 +38,7 @@ def run_migrations_offline(): script output. """ - url = config.get_main_option("sqlalchemy.url") + url = winchester_config['database']['url'] context.configure(url=url) with context.begin_transaction(): @@ -49,8 +52,8 @@ def run_migrations_online(): """ engine = engine_from_config( - config.get_section(config.config_ini_section), - prefix='sqlalchemy.', + winchester_config['database'], + prefix='', poolclass=pool.NullPool) connection = engine.connect() diff --git a/alembic/script.py.mako b/winchester/db/migrations/script.py.mako similarity index 100% rename from alembic/script.py.mako rename to winchester/db/migrations/script.py.mako diff --git a/alembic/versions/3ab6d7bf80cd_.py b/winchester/db/migrations/versions/3ab6d7bf80cd_.py similarity index 100% rename from alembic/versions/3ab6d7bf80cd_.py rename to winchester/db/migrations/versions/3ab6d7bf80cd_.py diff --git a/alembic/versions/44289d1492e6_.py b/winchester/db/migrations/versions/44289d1492e6_.py similarity index 100% rename from alembic/versions/44289d1492e6_.py rename to winchester/db/migrations/versions/44289d1492e6_.py