From 25356f14053149267311bb1bd824d98923449cc3 Mon Sep 17 00:00:00 2001 From: Omer Anson Date: Tue, 29 Aug 2017 16:50:30 +0300 Subject: [PATCH] Adding RethinkDB driver Closes-Bug: #1625206 Closes-Bug: #1527217 Partial-Bug: #1530877 Change-Id: I2e592b752a51cd3651be6c020ca052f57fa9efef --- devstack/plugin.sh | 8 ++ devstack/rethinkdb_driver | 88 +++++++++++++ dragonflow/conf/df_rethinkdb.py | 36 ++++++ dragonflow/db/drivers/rethink_db_driver.py | 137 +++++++++++++++++++++ setup.cfg | 1 + 5 files changed, 270 insertions(+) create mode 100644 devstack/rethinkdb_driver create mode 100644 dragonflow/conf/df_rethinkdb.py create mode 100644 dragonflow/db/drivers/rethink_db_driver.py diff --git a/devstack/plugin.sh b/devstack/plugin.sh index ae7de567e..39db41dd8 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -103,11 +103,19 @@ if is_service_enabled df-zookeeper ; then source $DEST/dragonflow/devstack/zookeeper_driver NB_DRIVER_CLASS="zookeeper_nb_db_driver" fi + if is_service_enabled df-cassandra ; then is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow." source $DEST/dragonflow/devstack/cassandra_driver NB_DRIVER_CLASS="cassandra_nb_db_driver" fi + +if is_service_enabled df-rethinkdb ; then + is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow." + source $DEST/dragonflow/devstack/rethinkdb_driver + NB_DRIVER_CLASS="rethinkdb_nb_db_driver" +fi + if is_service_enabled df-redis ; then is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow." source $DEST/dragonflow/devstack/redis_driver diff --git a/devstack/rethinkdb_driver b/devstack/rethinkdb_driver new file mode 100644 index 000000000..1c7bec2ac --- /dev/null +++ b/devstack/rethinkdb_driver @@ -0,0 +1,88 @@ +#!/bin/bash +# +# +# ``plugin.sh`` calls the following methods in the sourced driver: +# +# - nb_db_driver_install_server +# - nb_db_driver_install_client +# - nb_db_driver_start_server +# - nb_db_driver_stop_server +# - nb_db_driver_clean +RETHINKDB_IP=${RETHINKDB_IP:-"$HOST_IP"} +RETHINKDB_PORT=${RETHINKDB_PORT:-'4001'} + +function nb_db_driver_install_server { + if is_service_enabled df-rethinkdb-server ; then + echo "Installing RethinkDB Server" + if is_ubuntu || is_fedora; then + if is_ubuntu; then + source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list + wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add - + sudo apt-get update + sudo apt-get install rethinkdb + elif is_fedora; then + sudo wget https://download.rethinkdb.com/centos/7/$(uname -m)/rethinkdb.repo \ + -O /etc/yum.repos.d/rethinkdb.repo + sudo dnf install -y rethinkdb + fi + echo "Configuring Rethingdb server" + sudo sh -c "cat > /etc/rethinkdb/instances.d/dragonflow.conf" << EOF +bind=all +driver-port=${RETHINKDB_PORT} +EOF + echo "starting rethinkdb" + start_service rethinkdb + until pids=$(pidof rethinkdb); do + echo "sleep 1, waiting for rethinkdb to start" + sleep 1 + done + echo "sleep 5, waiting for rethinkdb to start" + echo 'Creating dragonflow database' + sleep 5 + python -c " +import rethinkdb as r +r.connect('$RETHINKDB_IP', $RETHINKDB_PORT).repl() +try: + r.db_drop('dragonflow').run() +except r.errors.ReqlOpFailedError: + pass # Database probably doesn't exist +r.db_create('dragonflow').run() + " + stop_service rethinkdb + else + die $LINENO "Warning - RethinkDB is currently supported only for Ubuntu and Fedora. Any other distros are currently not supported." + fi + fi +} + +function nb_db_driver_install_client { + # We can't actually install rethinkdb due to licensing issues + python -c 'import rethinkdb' > /dev/null || die "rethinkdb python client not install. Please install manually" + echo "WARNING: You have to install python's rethingdb yourself" + echo >&2 "WARNING: You have to install python's rethingdb yourself" +} + +function nb_db_driver_start_server { + if is_service_enabled df-rethinkdb-server ; then + start_service rethinkdb + until pids=$(pidof rethinkdb); do + sleep 1 + echo "sleep 1, waiting for rethinkdb to start" + done + fi +} + +function nb_db_driver_stop_server { + if is_service_enabled df-rethinkdb-server ; then + stop_service rethinkdb + fi +} + +function nb_db_driver_status_server +{ + TEMP_PIDS=`ps cax | grep rethinkdb` + if [ -z "$TEMP_PIDS" ]; then + return 1 + fi + return 0 +} diff --git a/dragonflow/conf/df_rethinkdb.py b/dragonflow/conf/df_rethinkdb.py new file mode 100644 index 000000000..e39e713c1 --- /dev/null +++ b/dragonflow/conf/df_rethinkdb.py @@ -0,0 +1,36 @@ +# Copyright (c) 2016 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from oslo_config import cfg + +from dragonflow._i18n import _ + + +df_rethinkdb_opts = [ + cfg.IntOpt( + 'connection_pool_size', + default=10, + help=_('The maximum number of concurrent connections to the database'), + ), +] + + +def register_opts(): + cfg.CONF.register_opts(df_rethinkdb_opts, group='df_rethinkdb') + + +def list_opts(): + return {'df_rethinkdb': df_rethinkdb_opts} diff --git a/dragonflow/db/drivers/rethink_db_driver.py b/dragonflow/db/drivers/rethink_db_driver.py new file mode 100644 index 000000000..c81b49301 --- /dev/null +++ b/dragonflow/db/drivers/rethink_db_driver.py @@ -0,0 +1,137 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import contextlib +import threading + +from eventlet import queue +import rethinkdb as rdb + +from dragonflow.common import exceptions +from dragonflow import conf as cfg +from dragonflow.db import db_api + +_DF_DATABASE = 'dragonflow' + + +class RethinkDbDriver(db_api.DbApi): + def __init__(self): + super(RethinkDbDriver, self).__init__() + self._pool = queue.Queue() + self._pool_size = 0 + self._pool_lock = threading.Lock() + + def _create_connection(self): + return rdb.connect(host=self._db_host, port=self._db_port, + db=_DF_DATABASE) + + @contextlib.contextmanager + def _get_conn(self): + with self._pool_lock: + conn_pool_size = cfg.CONF.df_rethinkdb.connection_pool_size + if self._pool.empty() and self._pool_size < conn_pool_size: + conn = self._create_connection() + self._pool_size += 1 + else: + conn = None + try: + if conn is None: + conn = self._pool.get() + yield conn + finally: + self._pool.put(conn) + + def initialize(self, db_ip, db_port, **args): + self._db_host = db_ip + self._db_port = db_port + + def create_table(self, table): + with self._get_conn() as conn: + rdb.table_create(table).run(conn) + + def delete_table(self, table): + with self._get_conn() as conn: + rdb.table_drop(table).run(conn) + + def _query_key(self, table, key): + return rdb.table(table).get(key) + + def get_key(self, table, key, topic=None): + with self._get_conn() as conn: + try: + res = self._query_key(table, key).run(conn) + except rdb.errors.ReqlOpFailedError: + res = None + if res is None: + raise exceptions.DBKeyNotFound(key=key) + return res['value'] + + def set_key(self, table, key, value, topic=None): + # FIXME cannot marshall None values + with self._get_conn() as conn: + res = self._query_key(table, key).update({ + 'id': key, + 'value': value, + }).run(conn) + + if res['skipped'] == 1: + raise exceptions.DBKeyNotFound(key=key) + + def create_key(self, table, key, value, topic=None): + with self._get_conn() as conn: + rdb.table(table).insert({ + 'id': key, + 'value': value, + }).run(conn) + + def delete_key(self, table, key, topic=None): + with self._get_conn() as conn: + res = self._query_key(table, key).delete().run(conn) + + if res['skipped'] == 1: + raise exceptions.DBKeyNotFound(key=key) + + def get_all_entries(self, table, topic=None): + with self._get_conn() as conn: + try: + cursor = rdb.table(table).pluck('value').run(conn) + except rdb.errors.ReqlOpFailedError: + return [] + return [entry['value'] for entry in cursor] + + def get_all_keys(self, table, topic=None): + with self._get_conn() as conn: + try: + cursor = rdb.table(table).pluck("id").run(conn) + except rdb.errors.ReqlOpFailedError: + return [] + return [entry['id'] for entry in cursor] + + def allocate_unique_key(self, table_name): + self._ensure_table_exists('unique_key') + with self._get_conn() as conn: + res = rdb.table('unique_key').get(table_name).replace( + lambda post: {'id': table_name, + 'key': post['key'].default(0).add(1)}, + return_changes=True, + ).run(conn) + return res['changes'][0]['new_val']['key'] + + def _ensure_table_exists(self, table): + with self._get_conn() as conn: + if table not in rdb.table_list().run(conn): + rdb.table_create(table).run(conn) + + def process_ha(self): + pass + + def set_neutron_server(self, is_neutron_server): + pass # Not implemented diff --git a/setup.cfg b/setup.cfg index 8a430f470..64621505c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,7 @@ dragonflow.nb_db_driver = zookeeper_nb_db_driver = dragonflow.db.drivers.zookeeper_db_driver:ZookeeperDbDriver redis_nb_db_driver = dragonflow.db.drivers.redis_db_driver:RedisDbDriver cassandra_nb_db_driver = dragonflow.db.drivers.cassandra_db_driver:CassandraDbDriver + rethinkdb_nb_db_driver = dragonflow.db.drivers.rethink_db_driver:RethinkDbDriver _dummy_nb_db_driver = dragonflow.tests.database._dummy_db_driver:_DummyDbDriver dragonflow.neutron_notifier_driver = nb_api_neutron_notifier_driver = dragonflow.db.pubsub_drivers.nb_api_neutron_notifier:NbApiNeutronNotifier