Adding RethinkDB driver
Closes-Bug: #1625206 Closes-Bug: #1527217 Partial-Bug: #1530877 Change-Id: I2e592b752a51cd3651be6c020ca052f57fa9efef
This commit is contained in:
parent
ef409d8e29
commit
25356f1405
|
@ -103,11 +103,19 @@ if is_service_enabled df-zookeeper ; then
|
|||
source $DEST/dragonflow/devstack/zookeeper_driver
|
||||
NB_DRIVER_CLASS="zookeeper_nb_db_driver"
|
||||
fi
|
||||
|
||||
if is_service_enabled df-cassandra ; then
|
||||
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
|
||||
source $DEST/dragonflow/devstack/cassandra_driver
|
||||
NB_DRIVER_CLASS="cassandra_nb_db_driver"
|
||||
fi
|
||||
|
||||
if is_service_enabled df-rethinkdb ; then
|
||||
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
|
||||
source $DEST/dragonflow/devstack/rethinkdb_driver
|
||||
NB_DRIVER_CLASS="rethinkdb_nb_db_driver"
|
||||
fi
|
||||
|
||||
if is_service_enabled df-redis ; then
|
||||
is_df_db_driver_selected && die $LINENO "More than one database service is set for Dragonflow."
|
||||
source $DEST/dragonflow/devstack/redis_driver
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
# ``plugin.sh`` calls the following methods in the sourced driver:
|
||||
#
|
||||
# - nb_db_driver_install_server
|
||||
# - nb_db_driver_install_client
|
||||
# - nb_db_driver_start_server
|
||||
# - nb_db_driver_stop_server
|
||||
# - nb_db_driver_clean
|
||||
RETHINKDB_IP=${RETHINKDB_IP:-"$HOST_IP"}
|
||||
RETHINKDB_PORT=${RETHINKDB_PORT:-'4001'}
|
||||
|
||||
function nb_db_driver_install_server {
|
||||
if is_service_enabled df-rethinkdb-server ; then
|
||||
echo "Installing RethinkDB Server"
|
||||
if is_ubuntu || is_fedora; then
|
||||
if is_ubuntu; then
|
||||
source /etc/lsb-release && echo "deb http://download.rethinkdb.com/apt $DISTRIB_CODENAME main" | sudo tee /etc/apt/sources.list.d/rethinkdb.list
|
||||
wget -qO- http://download.rethinkdb.com/apt/pubkey.gpg | sudo apt-key add -
|
||||
sudo apt-get update
|
||||
sudo apt-get install rethinkdb
|
||||
elif is_fedora; then
|
||||
sudo wget https://download.rethinkdb.com/centos/7/$(uname -m)/rethinkdb.repo \
|
||||
-O /etc/yum.repos.d/rethinkdb.repo
|
||||
sudo dnf install -y rethinkdb
|
||||
fi
|
||||
echo "Configuring Rethingdb server"
|
||||
sudo sh -c "cat > /etc/rethinkdb/instances.d/dragonflow.conf" << EOF
|
||||
bind=all
|
||||
driver-port=${RETHINKDB_PORT}
|
||||
EOF
|
||||
echo "starting rethinkdb"
|
||||
start_service rethinkdb
|
||||
until pids=$(pidof rethinkdb); do
|
||||
echo "sleep 1, waiting for rethinkdb to start"
|
||||
sleep 1
|
||||
done
|
||||
echo "sleep 5, waiting for rethinkdb to start"
|
||||
echo 'Creating dragonflow database'
|
||||
sleep 5
|
||||
python -c "
|
||||
import rethinkdb as r
|
||||
r.connect('$RETHINKDB_IP', $RETHINKDB_PORT).repl()
|
||||
try:
|
||||
r.db_drop('dragonflow').run()
|
||||
except r.errors.ReqlOpFailedError:
|
||||
pass # Database probably doesn't exist
|
||||
r.db_create('dragonflow').run()
|
||||
"
|
||||
stop_service rethinkdb
|
||||
else
|
||||
die $LINENO "Warning - RethinkDB is currently supported only for Ubuntu and Fedora. Any other distros are currently not supported."
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_install_client {
|
||||
# We can't actually install rethinkdb due to licensing issues
|
||||
python -c 'import rethinkdb' > /dev/null || die "rethinkdb python client not install. Please install manually"
|
||||
echo "WARNING: You have to install python's rethingdb yourself"
|
||||
echo >&2 "WARNING: You have to install python's rethingdb yourself"
|
||||
}
|
||||
|
||||
function nb_db_driver_start_server {
|
||||
if is_service_enabled df-rethinkdb-server ; then
|
||||
start_service rethinkdb
|
||||
until pids=$(pidof rethinkdb); do
|
||||
sleep 1
|
||||
echo "sleep 1, waiting for rethinkdb to start"
|
||||
done
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_stop_server {
|
||||
if is_service_enabled df-rethinkdb-server ; then
|
||||
stop_service rethinkdb
|
||||
fi
|
||||
}
|
||||
|
||||
function nb_db_driver_status_server
|
||||
{
|
||||
TEMP_PIDS=`ps cax | grep rethinkdb`
|
||||
if [ -z "$TEMP_PIDS" ]; then
|
||||
return 1
|
||||
fi
|
||||
return 0
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from dragonflow._i18n import _
|
||||
|
||||
|
||||
df_rethinkdb_opts = [
|
||||
cfg.IntOpt(
|
||||
'connection_pool_size',
|
||||
default=10,
|
||||
help=_('The maximum number of concurrent connections to the database'),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def register_opts():
|
||||
cfg.CONF.register_opts(df_rethinkdb_opts, group='df_rethinkdb')
|
||||
|
||||
|
||||
def list_opts():
|
||||
return {'df_rethinkdb': df_rethinkdb_opts}
|
|
@ -0,0 +1,137 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
from eventlet import queue
|
||||
import rethinkdb as rdb
|
||||
|
||||
from dragonflow.common import exceptions
|
||||
from dragonflow import conf as cfg
|
||||
from dragonflow.db import db_api
|
||||
|
||||
_DF_DATABASE = 'dragonflow'
|
||||
|
||||
|
||||
class RethinkDbDriver(db_api.DbApi):
|
||||
def __init__(self):
|
||||
super(RethinkDbDriver, self).__init__()
|
||||
self._pool = queue.Queue()
|
||||
self._pool_size = 0
|
||||
self._pool_lock = threading.Lock()
|
||||
|
||||
def _create_connection(self):
|
||||
return rdb.connect(host=self._db_host, port=self._db_port,
|
||||
db=_DF_DATABASE)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _get_conn(self):
|
||||
with self._pool_lock:
|
||||
conn_pool_size = cfg.CONF.df_rethinkdb.connection_pool_size
|
||||
if self._pool.empty() and self._pool_size < conn_pool_size:
|
||||
conn = self._create_connection()
|
||||
self._pool_size += 1
|
||||
else:
|
||||
conn = None
|
||||
try:
|
||||
if conn is None:
|
||||
conn = self._pool.get()
|
||||
yield conn
|
||||
finally:
|
||||
self._pool.put(conn)
|
||||
|
||||
def initialize(self, db_ip, db_port, **args):
|
||||
self._db_host = db_ip
|
||||
self._db_port = db_port
|
||||
|
||||
def create_table(self, table):
|
||||
with self._get_conn() as conn:
|
||||
rdb.table_create(table).run(conn)
|
||||
|
||||
def delete_table(self, table):
|
||||
with self._get_conn() as conn:
|
||||
rdb.table_drop(table).run(conn)
|
||||
|
||||
def _query_key(self, table, key):
|
||||
return rdb.table(table).get(key)
|
||||
|
||||
def get_key(self, table, key, topic=None):
|
||||
with self._get_conn() as conn:
|
||||
try:
|
||||
res = self._query_key(table, key).run(conn)
|
||||
except rdb.errors.ReqlOpFailedError:
|
||||
res = None
|
||||
if res is None:
|
||||
raise exceptions.DBKeyNotFound(key=key)
|
||||
return res['value']
|
||||
|
||||
def set_key(self, table, key, value, topic=None):
|
||||
# FIXME cannot marshall None values
|
||||
with self._get_conn() as conn:
|
||||
res = self._query_key(table, key).update({
|
||||
'id': key,
|
||||
'value': value,
|
||||
}).run(conn)
|
||||
|
||||
if res['skipped'] == 1:
|
||||
raise exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
def create_key(self, table, key, value, topic=None):
|
||||
with self._get_conn() as conn:
|
||||
rdb.table(table).insert({
|
||||
'id': key,
|
||||
'value': value,
|
||||
}).run(conn)
|
||||
|
||||
def delete_key(self, table, key, topic=None):
|
||||
with self._get_conn() as conn:
|
||||
res = self._query_key(table, key).delete().run(conn)
|
||||
|
||||
if res['skipped'] == 1:
|
||||
raise exceptions.DBKeyNotFound(key=key)
|
||||
|
||||
def get_all_entries(self, table, topic=None):
|
||||
with self._get_conn() as conn:
|
||||
try:
|
||||
cursor = rdb.table(table).pluck('value').run(conn)
|
||||
except rdb.errors.ReqlOpFailedError:
|
||||
return []
|
||||
return [entry['value'] for entry in cursor]
|
||||
|
||||
def get_all_keys(self, table, topic=None):
|
||||
with self._get_conn() as conn:
|
||||
try:
|
||||
cursor = rdb.table(table).pluck("id").run(conn)
|
||||
except rdb.errors.ReqlOpFailedError:
|
||||
return []
|
||||
return [entry['id'] for entry in cursor]
|
||||
|
||||
def allocate_unique_key(self, table_name):
|
||||
self._ensure_table_exists('unique_key')
|
||||
with self._get_conn() as conn:
|
||||
res = rdb.table('unique_key').get(table_name).replace(
|
||||
lambda post: {'id': table_name,
|
||||
'key': post['key'].default(0).add(1)},
|
||||
return_changes=True,
|
||||
).run(conn)
|
||||
return res['changes'][0]['new_val']['key']
|
||||
|
||||
def _ensure_table_exists(self, table):
|
||||
with self._get_conn() as conn:
|
||||
if table not in rdb.table_list().run(conn):
|
||||
rdb.table_create(table).run(conn)
|
||||
|
||||
def process_ha(self):
|
||||
pass
|
||||
|
||||
def set_neutron_server(self, is_neutron_server):
|
||||
pass # Not implemented
|
|
@ -72,6 +72,7 @@ dragonflow.nb_db_driver =
|
|||
zookeeper_nb_db_driver = dragonflow.db.drivers.zookeeper_db_driver:ZookeeperDbDriver
|
||||
redis_nb_db_driver = dragonflow.db.drivers.redis_db_driver:RedisDbDriver
|
||||
cassandra_nb_db_driver = dragonflow.db.drivers.cassandra_db_driver:CassandraDbDriver
|
||||
rethinkdb_nb_db_driver = dragonflow.db.drivers.rethink_db_driver:RethinkDbDriver
|
||||
_dummy_nb_db_driver = dragonflow.tests.database._dummy_db_driver:_DummyDbDriver
|
||||
dragonflow.neutron_notifier_driver =
|
||||
nb_api_neutron_notifier_driver = dragonflow.db.pubsub_drivers.nb_api_neutron_notifier:NbApiNeutronNotifier
|
||||
|
|
Loading…
Reference in New Issue