Adding RethinkDB driver

Closes-Bug: #1625206
Closes-Bug: #1527217
Partial-Bug: #1530877
Change-Id: I2e592b752a51cd3651be6c020ca052f57fa9efef
This commit is contained in:
Omer Anson 2017-08-29 16:50:30 +03:00
parent ef409d8e29
commit 25356f1405
5 changed files with 270 additions and 0 deletions

View File

@ -103,11 +103,19 @@ if is_service_enabled df-zookeeper ; then
source $DEST/dragonflow/devstack/zookeeper_driver
NB_DRIVER_CLASS="zookeeper_nb_db_driver"
fi
if is_service_enabled df-cassandra ; then
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
source $DEST/dragonflow/devstack/cassandra_driver
NB_DRIVER_CLASS="cassandra_nb_db_driver"
fi
if is_service_enabled df-rethinkdb ; then
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
source $DEST/dragonflow/devstack/rethinkdb_driver
NB_DRIVER_CLASS="rethinkdb_nb_db_driver"
fi
if is_service_enabled df-redis ; then
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
source $DEST/dragonflow/devstack/redis_driver

88
devstack/rethinkdb_driver Normal file
View File

@ -0,0 +1,88 @@
#!/bin/bash
#
#
# ``plugin.sh`` calls the following methods in the sourced driver:
#
# - nb_db_driver_install_server
# - nb_db_driver_install_client
# - nb_db_driver_start_server
# - nb_db_driver_stop_server
# - nb_db_driver_clean
RETHINKDB_IP=${RETHINKDB_IP:-"$HOST_IP"}
RETHINKDB_PORT=${RETHINKDB_PORT:-'4001'}
function nb_db_driver_install_server {
if is_service_enabled df-rethinkdb-server ; then
echo "Installing RethinkDB Server"
if is_ubuntu || is_fedora; then
if is_ubuntu; then
source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install rethinkdb
elif is_fedora; then
sudo wget https://download.rethinkdb.com/centos/7/$(uname -m)/rethinkdb.repo \
-O /etc/yum.repos.d/rethinkdb.repo
sudo dnf install -y rethinkdb
fi
echo "Configuring Rethingdb server"
sudo sh -c "cat > /etc/rethinkdb/instances.d/dragonflow.conf" << EOF
bind=all
driver-port=${RETHINKDB_PORT}
EOF
echo "starting rethinkdb"
start_service rethinkdb
until pids=$(pidof rethinkdb); do
echo "sleep 1, waiting for rethinkdb to start"
sleep 1
done
echo "sleep 5, waiting for rethinkdb to start"
echo 'Creating dragonflow database'
sleep 5
python -c "
import rethinkdb as r
r.connect('$RETHINKDB_IP', $RETHINKDB_PORT).repl()
try:
r.db_drop('dragonflow').run()
except r.errors.ReqlOpFailedError:
pass # Database probably doesn't exist
r.db_create('dragonflow').run()
"
stop_service rethinkdb
else
die $LINENO "Warning - RethinkDB is currently supported only for Ubuntu and Fedora. Any other distros are currently not supported."
fi
fi
}
function nb_db_driver_install_client {
# We can't actually install rethinkdb due to licensing issues
python -c 'import rethinkdb' > /dev/null || die "rethinkdb python client not install. Please install manually"
echo "WARNING: You have to install python's rethingdb yourself"
echo >&2 "WARNING: You have to install python's rethingdb yourself"
}
function nb_db_driver_start_server {
if is_service_enabled df-rethinkdb-server ; then
start_service rethinkdb
until pids=$(pidof rethinkdb); do
sleep 1
echo "sleep 1, waiting for rethinkdb to start"
done
fi
}
function nb_db_driver_stop_server {
if is_service_enabled df-rethinkdb-server ; then
stop_service rethinkdb
fi
}
function nb_db_driver_status_server
{
TEMP_PIDS=`ps cax | grep rethinkdb`
if [ -z "$TEMP_PIDS" ]; then
return 1
fi
return 0
}

View File

@ -0,0 +1,36 @@
# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from dragonflow._i18n import _
df_rethinkdb_opts = [
cfg.IntOpt(
'connection_pool_size',
default=10,
help=_('The maximum number of concurrent connections to the database'),
),
]
def register_opts():
cfg.CONF.register_opts(df_rethinkdb_opts, group='df_rethinkdb')
def list_opts():
return {'df_rethinkdb': df_rethinkdb_opts}

View File

@ -0,0 +1,137 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import threading
from eventlet import queue
import rethinkdb as rdb
from dragonflow.common import exceptions
from dragonflow import conf as cfg
from dragonflow.db import db_api
_DF_DATABASE = 'dragonflow'
class RethinkDbDriver(db_api.DbApi):
def __init__(self):
super(RethinkDbDriver, self).__init__()
self._pool = queue.Queue()
self._pool_size = 0
self._pool_lock = threading.Lock()
def _create_connection(self):
return rdb.connect(host=self._db_host, port=self._db_port,
db=_DF_DATABASE)
@contextlib.contextmanager
def _get_conn(self):
with self._pool_lock:
conn_pool_size = cfg.CONF.df_rethinkdb.connection_pool_size
if self._pool.empty() and self._pool_size < conn_pool_size:
conn = self._create_connection()
self._pool_size += 1
else:
conn = None
try:
if conn is None:
conn = self._pool.get()
yield conn
finally:
self._pool.put(conn)
def initialize(self, db_ip, db_port, **args):
self._db_host = db_ip
self._db_port = db_port
def create_table(self, table):
with self._get_conn() as conn:
rdb.table_create(table).run(conn)
def delete_table(self, table):
with self._get_conn() as conn:
rdb.table_drop(table).run(conn)
def _query_key(self, table, key):
return rdb.table(table).get(key)
def get_key(self, table, key, topic=None):
with self._get_conn() as conn:
try:
res = self._query_key(table, key).run(conn)
except rdb.errors.ReqlOpFailedError:
res = None
if res is None:
raise exceptions.DBKeyNotFound(key=key)
return res['value']
def set_key(self, table, key, value, topic=None):
# FIXME cannot marshall None values
with self._get_conn() as conn:
res = self._query_key(table, key).update({
'id': key,
'value': value,
}).run(conn)
if res['skipped'] == 1:
raise exceptions.DBKeyNotFound(key=key)
def create_key(self, table, key, value, topic=None):
with self._get_conn() as conn:
rdb.table(table).insert({
'id': key,
'value': value,
}).run(conn)
def delete_key(self, table, key, topic=None):
with self._get_conn() as conn:
res = self._query_key(table, key).delete().run(conn)
if res['skipped'] == 1:
raise exceptions.DBKeyNotFound(key=key)
def get_all_entries(self, table, topic=None):
with self._get_conn() as conn:
try:
cursor = rdb.table(table).pluck('value').run(conn)
except rdb.errors.ReqlOpFailedError:
return []
return [entry['value'] for entry in cursor]
def get_all_keys(self, table, topic=None):
with self._get_conn() as conn:
try:
cursor = rdb.table(table).pluck("id").run(conn)
except rdb.errors.ReqlOpFailedError:
return []
return [entry['id'] for entry in cursor]
def allocate_unique_key(self, table_name):
self._ensure_table_exists('unique_key')
with self._get_conn() as conn:
res = rdb.table('unique_key').get(table_name).replace(
lambda post: {'id': table_name,
'key': post['key'].default(0).add(1)},
return_changes=True,
).run(conn)
return res['changes'][0]['new_val']['key']
def _ensure_table_exists(self, table):
with self._get_conn() as conn:
if table not in rdb.table_list().run(conn):
rdb.table_create(table).run(conn)
def process_ha(self):
pass
def set_neutron_server(self, is_neutron_server):
pass # Not implemented

View File

@ -72,6 +72,7 @@ dragonflow.nb_db_driver =
zookeeper_nb_db_driver = dragonflow.db.drivers.zookeeper_db_driver:ZookeeperDbDriver
redis_nb_db_driver = dragonflow.db.drivers.redis_db_driver:RedisDbDriver
cassandra_nb_db_driver = dragonflow.db.drivers.cassandra_db_driver:CassandraDbDriver
rethinkdb_nb_db_driver = dragonflow.db.drivers.rethink_db_driver:RethinkDbDriver
_dummy_nb_db_driver = dragonflow.tests.database._dummy_db_driver:_DummyDbDriver
dragonflow.neutron_notifier_driver =
nb_api_neutron_notifier_driver = dragonflow.db.pubsub_drivers.nb_api_neutron_notifier:NbApiNeutronNotifier