Merge "Implement scrubber util"
This commit is contained in:
commit
e8e182e687
|
@ -0,0 +1,73 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2017 - Nokia Networks
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Glare Scrub Service
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
# If ../glare/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                                os.pardir,
                                                os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'glare', '__init__.py')):
    sys.path.insert(0, possible_topdir)

# NOTE: these imports deliberately happen *after* the sys.path manipulation
# above so that an in-tree checkout of glare wins over an installed copy.
import eventlet

import glance_store
from oslo_config import cfg
from oslo_log import log as logging

from glare.common import config
from glare import scrubber

# Green the blocking primitives the scrubber relies on (sockets, sleeps,
# select, threads, os) so eventlet green threads can cooperate.
eventlet.patcher.monkey_patch(all=False, socket=True, time=True, select=True,
                              thread=True, os=True)

CONF = cfg.CONF
logging.register_options(CONF)
# Scrubber is a command-line tool: log to stderr by default.
CONF.set_default(name='use_stderr', default=True)
|
||||
|
||||
|
||||
def main():
    """Entry point for the scrubber command.

    Registers the scrubber CLI/config options, initializes logging and the
    glance_store backends, then runs the scrubber either once or as a
    daemon, depending on the ``[scrubber] daemon`` option.
    """
    CONF.register_cli_opts(scrubber.scrubber_cmd_cli_opts, group='scrubber')
    CONF.register_opts(scrubber.scrubber_cmd_opts, group='scrubber')

    try:
        config.parse_args()
        logging.setup(CONF, 'glare')

        # Blob data lives in glance_store backends; they must be set up
        # before any artifact can be scrubbed.
        glance_store.register_opts(config.CONF)
        glance_store.create_stores(config.CONF)
        glance_store.verify_default_store()

        scrub_app = scrubber.Scrubber()

        if not CONF.scrubber.daemon:
            # One-shot mode (e.g. driven by cron): scrub and exit.
            scrub_app.run()
        else:
            # Daemon mode: wake up periodically and re-run the scrubber.
            daemon_server = scrubber.Daemon(CONF.scrubber.wakeup_time)
            daemon_server.start(scrub_app)
            daemon_server.wait()
    except RuntimeError as err:
        sys.exit("ERROR: %s" % err)
|
||||
|
||||
|
||||
# Allow running this module directly as a script (``python -m`` or via the
# installed console entry point).
if __name__ == '__main__':
    main()
|
|
@ -30,6 +30,7 @@ import glare.common.wsgi
|
|||
import glare.notification
|
||||
import glare.objects.base
|
||||
import glare.objects.meta.registry
|
||||
import glare.scrubber
|
||||
|
||||
_artifacts_opts = [
|
||||
(None, list(itertools.chain(
|
||||
|
@ -45,7 +46,11 @@ _artifacts_opts = [
|
|||
glare.objects.meta.registry.registry_options))),
|
||||
profiler.list_opts()[0],
|
||||
('paste_deploy', glare.common.config.paste_deploy_opts),
|
||||
('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts)
|
||||
('keycloak_oidc', glare.api.middleware.keycloak_auth.keycloak_oidc_opts),
|
||||
('scrubber',
|
||||
glare.scrubber.scrubber_opts +
|
||||
glare.scrubber.scrubber_cmd_opts +
|
||||
glare.scrubber.scrubber_cmd_cli_opts)
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
# Copyright 2017 - Nokia Networks
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from glare.api.middleware import context
|
||||
from glare.common import exception
|
||||
from glare.common import store_api
|
||||
from glare.db.sqlalchemy import api as db_api
|
||||
from glare.i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)

# Options that affect scrubbing behaviour regardless of how the scrubber
# is launched; registered at import time under the [scrubber] group below.
scrubber_opts = [
    cfg.IntOpt('scrub_time', default=0, min=0,
               help=_("""
The amount of time, in seconds, to delay artifact scrubbing.
When delayed delete is turned on, an artifact is put into
``deleted`` state upon deletion until the scrubber deletes its data.
Typically, soon
after the artifact is put into ``deleted`` state, it is available
for scrubbing. However, scrubbing can be delayed until a later point
using this configuration option. This option denotes the time period
an artifact spends in ``deleted`` state before it is available for
scrubbing.
It is important to realize that this has storage implications. The
larger the ``scrub_time``, the longer the time to reclaim backend
storage from deleted artifacts.
Possible values:
    * Any non-negative integer
Related options:
    * ``delayed_delete``
""")),
    cfg.IntOpt('scrub_pool_size', default=1, min=1,
               help=_("""
The size of thread pool to be used for scrubbing artifacts.
When there are a large number of artifacts to scrub, it is
beneficial to scrub artifacts in parallel so that the scrub queue
stays in control and the backend storage is reclaimed in a timely
fashion. This configuration option denotes the maximum number of
artifacts to be scrubbed in parallel. The default value is one,
which signifies serial scrubbing. Any value above one indicates
parallel scrubbing.
Possible values:
    * Any non-zero positive integer
Related options:
    * ``delayed_delete``
""")),
]

# Options only meaningful for the scrubber command itself; registered by
# glare.cmd.scrubber:main(), not at import time.
scrubber_cmd_opts = [
    cfg.IntOpt('wakeup_time', default=300, min=0,
               help=_("""
Time interval, in seconds, between scrubber runs in daemon mode.
Scrubber can be run either as a cron job or daemon. When run as a
daemon, this configuration time specifies the time period between
two runs. When the scrubber wakes up, it fetches and scrubs all
``deleted`` artifacts that are available for scrubbing after taking
``scrub_time`` into consideration.
If the ``wakeup_time`` is set to a large number, there may be a large
number of artifacts to be scrubbed for each run. Also, this impacts
how quickly the backend storage is reclaimed.
Possible values:
    * Any non-negative integer
Related options:
    * ``daemon``
    * ``delayed_delete``
"""))
]

# Command-line-only options for the scrubber command (registered via
# register_cli_opts so they can be passed as e.g. ``-D`` on the CLI).
scrubber_cmd_cli_opts = [
    cfg.BoolOpt('daemon',
                short='D',
                default=False,
                help=_("""
Run scrubber as a daemon.
This boolean configuration option indicates whether scrubber should
run as a long-running process that wakes up at regular intervals to
scrub artifacts. The wake up interval can be specified using the
configuration option ``wakeup_time``.
If this configuration option is set to ``False``, which is the
default value, scrubber runs once to scrub artifacts and exits.
In this case, if the operator wishes to implement continuous
scrubbing of artifacts, scrubber needs to be scheduled as a cron job.
Possible values:
    * True
    * False
Related options:
    * ``wakeup_time``
"""))
]

CONF = cfg.CONF
CONF.register_opts(scrubber_opts, group='scrubber')
|
||||
|
||||
|
||||
class Daemon(object):
    """Periodically re-runs a scrubber application in a green thread.

    ``start()`` schedules the first run; each run re-schedules itself after
    ``wakeup_time`` seconds via ``eventlet.spawn_after``, so the daemon keeps
    going until the process is interrupted.
    """

    def __init__(self, wakeup_time=300, threads=100):
        self.wakeup_time = wakeup_time
        # Completion event the caller blocks on in wait().
        self.event = eventlet.event.Event()
        # This pool is used for periodic instantiation of scrubber
        self.daemon_pool = eventlet.greenpool.GreenPool(threads)
        LOG.info("Starting Daemon: wakeup_time=%(wakeup_time)s "
                 "threads=%(threads)s",
                 {'wakeup_time': wakeup_time, 'threads': threads})

    def start(self, application):
        """Kick off the first scrubber run (and its periodic rescheduling)."""
        self._run(application)

    def wait(self):
        """Block until the event fires or the user hits Ctrl-C."""
        try:
            self.event.wait()
        except KeyboardInterrupt:
            LOG.info("Daemon Shutdown on KeyboardInterrupt")

    def _run(self, application):
        """Spawn one scrubber run and schedule the next one."""
        LOG.debug("Running scrubber application")
        self.daemon_pool.spawn_n(application.run, self.event)
        eventlet.spawn_after(self.wakeup_time, self._run, application)
        LOG.debug("Next run scheduled in %s seconds", self.wakeup_time)
|
||||
|
||||
|
||||
class Scrubber(object):
    """Deletes blob data and database records of 'deleted' artifacts.

    Artifacts sit in ``deleted`` state when delayed delete is enabled; this
    class fetches them in batches and scrubs them with a green thread pool
    of ``[scrubber] scrub_pool_size`` workers.
    """

    def __init__(self):
        # Scrubbing must see every user's artifacts, so use an admin context.
        self.context = context.RequestContext()
        self.context.is_admin = True
        self.pool = eventlet.greenpool.GreenPool(
            CONF.scrubber.scrub_pool_size)

    def run(self, event=None):
        """Scrub all available 'deleted' artifacts, batch by batch.

        :param event: optional eventlet Event handed over by Daemon._run;
            not used here — presumably reserved for shutdown signalling.
        """
        while True:
            artifacts = db_api._get_all(
                context=self.context,
                session=db_api.get_session(),
                limit=CONF.scrubber.scrub_pool_size,
                sort=[],
                filters=[('status', None, 'eq', None, 'deleted')])
            if not artifacts:
                break
            # GreenPool.imap is lazy — the workers only execute while the
            # result iterator is consumed. Drain it so the whole batch is
            # scrubbed before the next query; otherwise the same 'deleted'
            # rows would be fetched again and the loop would never finish.
            for _result in self.pool.imap(self._scrub_artifact, artifacts):
                pass

    def _scrub_artifact(self, af):
        """Delete all internal blob data of *af*, then its db record.

        Instance method (was a staticmethod) so the admin request context
        created in __init__ is used: the previous code passed the imported
        ``context`` *module* to the store and db APIs instead of a
        RequestContext instance.
        """
        LOG.info("Begin scrubbing of artifact %s", af.id)
        for blob in af.blobs:
            # External blobs are only references — their data is not ours
            # to delete.
            if not blob.external:
                try:
                    store_api.delete_blob(blob.url, context=self.context)
                except exception.NotFound:
                    # data has already been removed
                    pass
        LOG.info("Blobs successfully deleted for artifact %s", af.id)

        # delete artifact itself
        db_api.delete(self.context, af.id, db_api.get_session())

        LOG.info("Artifact %s was scrubbed", af.id)
|
|
@ -336,6 +336,56 @@ paste.filter_factory =
|
|||
"""
|
||||
|
||||
|
||||
class ScrubberDaemon(Server):
    """
    Server object that starts/stops/manages the Scrubber server
    """

    def __init__(self, test_dir, policy_file, daemon=False, **kwargs):
        # NOTE(jkoelker): Set the port to 0 since we actually don't listen
        super(ScrubberDaemon, self).__init__(test_dir, 0)
        self.server_name = 'scrubber'
        self.server_module = 'glare.cmd.%s' % self.server_name
        # Whether to run the scrubber in daemon mode (maps to the
        # ``[scrubber] daemon`` option in conf_base below).
        self.daemon = daemon

        self.blob_dir = os.path.join(self.test_dir, "artifacts")
        # Short scrub delay so functional tests don't have to wait long.
        self.scrub_time = 5
        self.pid_file = os.path.join(self.test_dir, "scrubber.pid")
        self.log_file = os.path.join(self.test_dir, "scrubber.log")
        self.lock_path = self.test_dir

        # Per-test sqlite db unless the environment overrides it.
        default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
        self.sql_connection = os.environ.get('GLARE_TEST_SQL_CONNECTION',
                                             default_sql_connection)
        self.policy_file = policy_file
        self.policy_default_rule = 'default'

        # Config-file template; %(...)s placeholders are filled in from the
        # attributes above by the base Server machinery when it writes the
        # actual config file.
        self.conf_base = """[DEFAULT]
debug = %(debug)s
log_file = %(log_file)s
[scrubber]
daemon = %(daemon)s
wakeup_time = 2
scrub_time = %(scrub_time)s
[glance_store]
filesystem_store_datadir=%(blob_dir)s
[oslo_policy]
policy_file = %(policy_file)s
policy_default_rule = %(policy_default_rule)s
[database]
connection = %(sql_connection)s
idle_timeout = 3600
"""

    def start(self, expect_exit=True, expected_exitcode=0, **kwargs):
        # In daemon mode the scrubber keeps running, so the launcher must
        # not expect the process to exit.
        if 'daemon' in kwargs:
            expect_exit = False
        return super(ScrubberDaemon, self).start(
            expect_exit=expect_exit,
            expected_exitcode=expected_exitcode,
            **kwargs)
|
||||
|
||||
|
||||
class FunctionalTest(test_utils.BaseTestCase):
|
||||
|
||||
"""Base test class for any test that wants to test the actual
|
||||
|
@ -353,6 +403,8 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
self.api_protocol = 'http'
|
||||
self.glare_port, glare_sock = test_utils.get_unused_port_and_socket()
|
||||
|
||||
self.include_scrubber = False
|
||||
|
||||
self.tracecmd = tracecmd_osmap.get(platform.system())
|
||||
|
||||
conf_dir = os.path.join(self.test_dir, 'etc')
|
||||
|
@ -365,7 +417,10 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
self.policy_file,
|
||||
sock=glare_sock)
|
||||
|
||||
self.pid_files = [self.glare_server.pid_file]
|
||||
self.scrubber_daemon = ScrubberDaemon(self.test_dir, self.policy_file)
|
||||
|
||||
self.pid_files = [self.glare_server.pid_file,
|
||||
self.scrubber_daemon.pid_file]
|
||||
self.files_to_destroy = []
|
||||
self.launched_servers = []
|
||||
|
||||
|
@ -379,6 +434,7 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
super(FunctionalTest, self).tearDown()
|
||||
|
||||
self.glare_server.dump_log('glare_server')
|
||||
self.scrubber_daemon.dump_log('scrubber_daemon')
|
||||
|
||||
def set_policy_rules(self, rules):
|
||||
with open(self.policy_file, 'w') as fap:
|
||||
|
@ -423,7 +479,8 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
# server is dead. This eliminates the possibility of a race
|
||||
# between a child process listening on a port actually dying
|
||||
# and a new process being started
|
||||
servers = [self.glare_server]
|
||||
servers = [self.glare_server,
|
||||
self.scrubber_daemon]
|
||||
for s in servers:
|
||||
try:
|
||||
s.stop()
|
||||
|
@ -510,6 +567,12 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
|
||||
self.start_with_retry(self.glare_server, 'glare_port', 3, **kwargs)
|
||||
|
||||
if self.include_scrubber:
|
||||
exitcode, out, err = self.scrubber_daemon.start(**kwargs)
|
||||
self.assertEqual(0, exitcode,
|
||||
"Failed to spin up the Scrubber daemon. "
|
||||
"Got: %s" % err)
|
||||
|
||||
def ping_server(self, port):
|
||||
"""Simple ping on the port. If responsive, return True, else
|
||||
return False.
|
||||
|
@ -580,7 +643,7 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
|
||||
return msg if expect_launch else None
|
||||
|
||||
def stop_server(self, server, name):
|
||||
def stop_server(self, server):
|
||||
"""Called to stop a single server in a normal fashion.
|
||||
|
||||
:param server: the server to stop
|
||||
|
@ -589,7 +652,10 @@ class FunctionalTest(test_utils.BaseTestCase):
|
|||
server.stop()
|
||||
|
||||
def stop_servers(self):
|
||||
self.stop_server(self.glare_server, 'Glare server')
|
||||
self.stop_server(self.glare_server)
|
||||
|
||||
if self.include_scrubber:
|
||||
self.stop_server(self.scrubber_daemon)
|
||||
|
||||
self._reset_database(self.glare_server.sql_connection)
|
||||
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
# Copyright 2017 - Nokia Networks
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
from six.moves import range
|
||||
|
||||
from glare.tests import functional
|
||||
from glare.tests.functional import base
|
||||
from glare.tests.utils import execute
|
||||
|
||||
|
||||
class TestScrubber(base.TestArtifact):

    """Test that delayed_delete works and the scrubber deletes"""

    def setUp(self):
        # NOTE(review): calls FunctionalTest.setUp directly, bypassing
        # base.TestArtifact.setUp — presumably to control server start-up
        # manually in each test; confirm against base.TestArtifact.
        functional.FunctionalTest.setUp(self)

        # Make start_servers() also spin up the scrubber daemon.
        self.include_scrubber = True
        self.set_user('user1')
        self.glare_server.deployment_flavor = 'noauth'

        self.glare_server.enabled_artifact_types = ','.join(
            self.enabled_types)
        self.glare_server.custom_artifact_types_modules = (
            'glare.tests.sample_artifact')

    def _create_sample_artifact(self):
        """Create a sample artifact with internal and external blobs.

        Returns the artifact's URL so tests can delete/poll it.
        """
        art = self.create_artifact({'name': 'test_art',
                                    'version': '1.0'})

        url = '/sample_artifact/%s' % art['id']
        headers = {'Content-Type': 'application/octet-stream'}

        # upload data to blob
        self.put(url=url + '/small_blob', data='aaaaaa',
                 headers=headers)

        # upload a couple of blobs to dict_of_blobs
        self.put(url + '/dict_of_blobs/blob1', data='bbbb',
                 headers=headers)
        self.put(url + '/dict_of_blobs/blob2', data='cccc',
                 headers=headers)

        # add external location (the scrubber must leave its data alone)
        body = jsonutils.dumps(
            {'url': 'https://www.apache.org/licenses/LICENSE-2.0.txt',
             'md5': "fake", 'sha1': "fake_sha", "sha256": "fake_sha256"})
        headers = {'Content-Type':
                   'application/vnd+openstack.glare-custom-location+json'}
        self.put(url=url + '/blob', data=body, status=200,
                 headers=headers)

        return url

    def test_scrubber_delayed_delete(self):
        """
        Test that artifacts don't get deleted immediately and that the scrubber
        scrubs them.
        """
        # daemon=True makes the scrubber run as a long-lived process.
        self.start_servers(delayed_delete=True, daemon=True,
                           **self.__dict__.copy())

        url = self._create_sample_artifact()

        # create another artifact
        art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'})

        # delete sample artifact; with delayed delete it must linger in
        # 'deleted' state rather than vanish immediately
        self.delete(url=url)
        art = self.get(url)
        self.assertEqual('deleted', art['status'])

        self.wait_for_scrub(url)

        # check that the second artifact wasn't removed
        art = self.get('/sample_artifact/%s' % art2['id'])
        self.assertEqual('drafted', art['status'])

    def test_scrubber_app(self):
        """
        Test that the scrubber script runs successfully when not in
        daemon mode.
        """
        self.start_servers(delayed_delete=True,
                           **self.__dict__.copy())

        url = self._create_sample_artifact()

        # wait for the scrub time on the artifacts to pass
        time.sleep(self.scrubber_daemon.scrub_time)

        # create another artifact
        art2 = self.create_artifact({'name': 'test_art', 'version': '2.0'})

        # delete sample artifact
        self.delete(url=url)
        art = self.get(url)
        self.assertEqual('deleted', art['status'])

        # scrub artifacts and make sure they are deleted: run the scrubber
        # command once (one-shot mode) against the daemon's config file
        exe_cmd = "%s -m glare.cmd.scrubber" % sys.executable
        cmd = ("%s --config-file %s" %
               (exe_cmd, self.scrubber_daemon.conf_file_name))
        exitcode, out, err = execute(cmd, raise_error=False)
        self.assertEqual(0, exitcode)

        self.wait_for_scrub(url)

        # check that the second artifact wasn't removed
        art = self.get('/sample_artifact/%s' % art2['id'])
        self.assertEqual('drafted', art['status'])

    def wait_for_scrub(self, url):
        """
        The build servers sometimes take longer than 15 seconds
        to scrub. Give it up to 5 min, checking every 5 seconds.
        When/if it flips to deleted, bail immediately.
        """
        wait_for = 300  # seconds
        check_every = 5  # seconds
        for _ in range(wait_for // check_every):
            time.sleep(check_every)
            try:
                # 404 means the artifact is gone, i.e. it was scrubbed.
                self.get(url, status=404)
                return
            except Exception:
                # not scrubbed yet (or transient error) — keep polling
                pass
        else:
            # loop exhausted without a 404 within the 5-minute budget
            self.fail("Artifact wasn't scrubbed")
|
Loading…
Reference in New Issue