Merge "Move sqlite code in common module"
This commit is contained in:
commit
0277b24427
|
@ -0,0 +1,90 @@
|
||||||
|
# Copyright 2023 OpenStack Foundation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Common code which will be used in SQLite and centralzed_db driver until SQLite
|
||||||
|
driver is removed from glance.
|
||||||
|
"""
|
||||||
|
from contextlib import contextmanager
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
from eventlet import sleep
|
||||||
|
from eventlet import timeout
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from glance.i18n import _LE
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
DEFAULT_SQL_CALL_TIMEOUT = 2
|
||||||
|
|
||||||
|
|
||||||
|
def dict_factory(cur, row):
|
||||||
|
return {col[0]: row[idx] for idx, col in enumerate(cur.description)}
|
||||||
|
|
||||||
|
|
||||||
|
class SqliteConnection(sqlite3.Connection):
|
||||||
|
|
||||||
|
"""
|
||||||
|
SQLite DB Connection handler that plays well with eventlet,
|
||||||
|
slightly modified from Swift's similar code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.timeout_seconds = kwargs.get('timeout', DEFAULT_SQL_CALL_TIMEOUT)
|
||||||
|
kwargs['timeout'] = 0
|
||||||
|
sqlite3.Connection.__init__(self, *args, **kwargs)
|
||||||
|
|
||||||
|
def _timeout(self, call):
|
||||||
|
with timeout.Timeout(self.timeout_seconds):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return call()
|
||||||
|
except sqlite3.OperationalError as e:
|
||||||
|
if 'locked' not in str(e):
|
||||||
|
raise
|
||||||
|
sleep(0.05)
|
||||||
|
|
||||||
|
def execute(self, *args, **kwargs):
|
||||||
|
return self._timeout(lambda: sqlite3.Connection.execute(
|
||||||
|
self, *args, **kwargs))
|
||||||
|
|
||||||
|
def commit(self):
|
||||||
|
return self._timeout(lambda: sqlite3.Connection.commit(self))
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def get_db(db_path):
|
||||||
|
"""
|
||||||
|
Returns a context manager that produces a database connection that
|
||||||
|
self-closes and calls rollback if an error occurs while using the
|
||||||
|
database connection
|
||||||
|
"""
|
||||||
|
conn = sqlite3.connect(db_path, check_same_thread=False,
|
||||||
|
factory=SqliteConnection)
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
conn.text_factory = str
|
||||||
|
conn.execute('PRAGMA synchronous = NORMAL')
|
||||||
|
conn.execute('PRAGMA count_changes = OFF')
|
||||||
|
conn.execute('PRAGMA temp_store = MEMORY')
|
||||||
|
try:
|
||||||
|
yield conn
|
||||||
|
except sqlite3.DatabaseError as e:
|
||||||
|
msg = _LE("Error executing SQLite call. Got error: %s") % e
|
||||||
|
LOG.error(msg)
|
||||||
|
conn.rollback()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
|
@ -22,8 +22,6 @@ import sqlite3
|
||||||
import stat
|
import stat
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from eventlet import sleep
|
|
||||||
from eventlet import timeout
|
|
||||||
from oslo_concurrency import lockutils
|
from oslo_concurrency import lockutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
@ -31,8 +29,10 @@ from oslo_utils import excutils
|
||||||
from oslo_utils import fileutils
|
from oslo_utils import fileutils
|
||||||
|
|
||||||
from glance.common import exception
|
from glance.common import exception
|
||||||
from glance.i18n import _, _LE, _LI, _LW
|
from glance.i18n import _, _LI, _LW
|
||||||
from glance.image_cache.drivers import base
|
from glance.image_cache.drivers import base
|
||||||
|
from glance.image_cache.drivers import common
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -69,42 +69,6 @@ Related options:
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.register_opts(sqlite_opts)
|
CONF.register_opts(sqlite_opts)
|
||||||
|
|
||||||
DEFAULT_SQL_CALL_TIMEOUT = 2
|
|
||||||
|
|
||||||
|
|
||||||
class SqliteConnection(sqlite3.Connection):
|
|
||||||
|
|
||||||
"""
|
|
||||||
SQLite DB Connection handler that plays well with eventlet,
|
|
||||||
slightly modified from Swift's similar code.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.timeout_seconds = kwargs.get('timeout', DEFAULT_SQL_CALL_TIMEOUT)
|
|
||||||
kwargs['timeout'] = 0
|
|
||||||
sqlite3.Connection.__init__(self, *args, **kwargs)
|
|
||||||
|
|
||||||
def _timeout(self, call):
|
|
||||||
with timeout.Timeout(self.timeout_seconds):
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
return call()
|
|
||||||
except sqlite3.OperationalError as e:
|
|
||||||
if 'locked' not in str(e):
|
|
||||||
raise
|
|
||||||
sleep(0.05)
|
|
||||||
|
|
||||||
def execute(self, *args, **kwargs):
|
|
||||||
return self._timeout(lambda: sqlite3.Connection.execute(
|
|
||||||
self, *args, **kwargs))
|
|
||||||
|
|
||||||
def commit(self):
|
|
||||||
return self._timeout(lambda: sqlite3.Connection.commit(self))
|
|
||||||
|
|
||||||
|
|
||||||
def dict_factory(cur, row):
|
|
||||||
return {col[0]: row[idx] for idx, col in enumerate(cur.description)}
|
|
||||||
|
|
||||||
|
|
||||||
class Driver(base.Driver):
|
class Driver(base.Driver):
|
||||||
|
|
||||||
|
@ -135,7 +99,7 @@ class Driver(base.Driver):
|
||||||
def create_db():
|
def create_db():
|
||||||
try:
|
try:
|
||||||
conn = sqlite3.connect(self.db_path, check_same_thread=False,
|
conn = sqlite3.connect(self.db_path, check_same_thread=False,
|
||||||
factory=SqliteConnection)
|
factory=common.SqliteConnection)
|
||||||
conn.executescript("""
|
conn.executescript("""
|
||||||
CREATE TABLE IF NOT EXISTS cached_images (
|
CREATE TABLE IF NOT EXISTS cached_images (
|
||||||
image_id TEXT PRIMARY KEY,
|
image_id TEXT PRIMARY KEY,
|
||||||
|
@ -178,7 +142,7 @@ class Driver(base.Driver):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
hits = 0
|
hits = 0
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
cur = db.execute("""SELECT hits FROM cached_images
|
cur = db.execute("""SELECT hits FROM cached_images
|
||||||
WHERE image_id = ?""",
|
WHERE image_id = ?""",
|
||||||
(image_id,))
|
(image_id,))
|
||||||
|
@ -190,12 +154,12 @@ class Driver(base.Driver):
|
||||||
Returns a list of records about cached images.
|
Returns a list of records about cached images.
|
||||||
"""
|
"""
|
||||||
LOG.debug("Gathering cached image entries.")
|
LOG.debug("Gathering cached image entries.")
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
cur = db.execute("""SELECT
|
cur = db.execute("""SELECT
|
||||||
image_id, hits, last_accessed, last_modified, size
|
image_id, hits, last_accessed, last_modified, size
|
||||||
FROM cached_images
|
FROM cached_images
|
||||||
ORDER BY image_id""")
|
ORDER BY image_id""")
|
||||||
cur.row_factory = dict_factory
|
cur.row_factory = common.dict_factory
|
||||||
return [r for r in cur]
|
return [r for r in cur]
|
||||||
|
|
||||||
def is_cached(self, image_id):
|
def is_cached(self, image_id):
|
||||||
|
@ -242,7 +206,7 @@ class Driver(base.Driver):
|
||||||
Removes all cached image files and any attributes about the images
|
Removes all cached image files and any attributes about the images
|
||||||
"""
|
"""
|
||||||
deleted = 0
|
deleted = 0
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
for path in self.get_cache_files(self.base_dir):
|
for path in self.get_cache_files(self.base_dir):
|
||||||
delete_cached_file(path)
|
delete_cached_file(path)
|
||||||
deleted += 1
|
deleted += 1
|
||||||
|
@ -257,7 +221,7 @@ class Driver(base.Driver):
|
||||||
:param image_id: Image ID
|
:param image_id: Image ID
|
||||||
"""
|
"""
|
||||||
path = self.get_image_filepath(image_id)
|
path = self.get_image_filepath(image_id)
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
delete_cached_file(path)
|
delete_cached_file(path)
|
||||||
db.execute("""DELETE FROM cached_images WHERE image_id = ?""",
|
db.execute("""DELETE FROM cached_images WHERE image_id = ?""",
|
||||||
(image_id, ))
|
(image_id, ))
|
||||||
|
@ -301,7 +265,7 @@ class Driver(base.Driver):
|
||||||
Return a tuple containing the image_id and size of the least recently
|
Return a tuple containing the image_id and size of the least recently
|
||||||
accessed cached file, or None if no cached files.
|
accessed cached file, or None if no cached files.
|
||||||
"""
|
"""
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
cur = db.execute("""SELECT image_id FROM cached_images
|
cur = db.execute("""SELECT image_id FROM cached_images
|
||||||
ORDER BY last_accessed LIMIT 1""")
|
ORDER BY last_accessed LIMIT 1""")
|
||||||
try:
|
try:
|
||||||
|
@ -329,7 +293,7 @@ class Driver(base.Driver):
|
||||||
incomplete_path = self.get_image_filepath(image_id, 'incomplete')
|
incomplete_path = self.get_image_filepath(image_id, 'incomplete')
|
||||||
|
|
||||||
def commit():
|
def commit():
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
final_path = self.get_image_filepath(image_id)
|
final_path = self.get_image_filepath(image_id)
|
||||||
LOG.debug("Fetch finished, moving "
|
LOG.debug("Fetch finished, moving "
|
||||||
"'%(incomplete_path)s' to '%(final_path)s'",
|
"'%(incomplete_path)s' to '%(final_path)s'",
|
||||||
|
@ -352,7 +316,7 @@ class Driver(base.Driver):
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
def rollback(e):
|
def rollback(e):
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
if os.path.exists(incomplete_path):
|
if os.path.exists(incomplete_path):
|
||||||
invalid_path = self.get_image_filepath(image_id, 'invalid')
|
invalid_path = self.get_image_filepath(image_id, 'invalid')
|
||||||
|
|
||||||
|
@ -398,36 +362,13 @@ class Driver(base.Driver):
|
||||||
with open(path, 'rb') as cache_file:
|
with open(path, 'rb') as cache_file:
|
||||||
yield cache_file
|
yield cache_file
|
||||||
now = time.time()
|
now = time.time()
|
||||||
with self.get_db() as db:
|
with common.get_db(self.db_path) as db:
|
||||||
db.execute("""UPDATE cached_images
|
db.execute("""UPDATE cached_images
|
||||||
SET hits = hits + 1, last_accessed = ?
|
SET hits = hits + 1, last_accessed = ?
|
||||||
WHERE image_id = ?""",
|
WHERE image_id = ?""",
|
||||||
(now, image_id))
|
(now, image_id))
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def get_db(self):
|
|
||||||
"""
|
|
||||||
Returns a context manager that produces a database connection that
|
|
||||||
self-closes and calls rollback if an error occurs while using the
|
|
||||||
database connection
|
|
||||||
"""
|
|
||||||
conn = sqlite3.connect(self.db_path, check_same_thread=False,
|
|
||||||
factory=SqliteConnection)
|
|
||||||
conn.row_factory = sqlite3.Row
|
|
||||||
conn.text_factory = str
|
|
||||||
conn.execute('PRAGMA synchronous = NORMAL')
|
|
||||||
conn.execute('PRAGMA count_changes = OFF')
|
|
||||||
conn.execute('PRAGMA temp_store = MEMORY')
|
|
||||||
try:
|
|
||||||
yield conn
|
|
||||||
except sqlite3.DatabaseError as e:
|
|
||||||
msg = _LE("Error executing SQLite call. Got error: %s") % e
|
|
||||||
LOG.error(msg)
|
|
||||||
conn.rollback()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
def queue_image(self, image_id):
|
def queue_image(self, image_id):
|
||||||
"""
|
"""
|
||||||
This adds a image to be cache to the queue.
|
This adds a image to be cache to the queue.
|
||||||
|
|
Loading…
Reference in New Issue