Renew subscription on update with TTL

Now when we update subscription and specify TTL, subscription is not
renewed. It's life is not extended, and it's expiration date is still
old.

Because of that, it's impossible to extend life of subscription without
recreating it. But during subscription recreation some messages may not
be sent to subscriber, because there will be a moment when subscription
does not exist, which is bad.

This patch makes subscriptions renewable by PATCH method, when TTL is
specified in the body. In such case PATCH method sets subscription TTL
to a new value and sets expiration date to a recalculated value: current
time + TTL.

This patch also adds two functional tests for testing TTL-related things
in subscriptions. This increases functional testing time by ~2-4
minutes.

APIimpact
Closes-Bug: #1552866
Co-Authored-By: wangxiyuan <wangxiyuan@huawei.com>
Co-Authored-By: Eva Balycheva <ubershy@gmail.com>
Change-Id: I048c37d6485ff27cf18ed27988874e470301fb57
This commit is contained in:
wangxiyuan 2016-03-08 10:29:43 +08:00 committed by Eva Balycheva
parent f94a9ee4ce
commit 3308af985b
7 changed files with 164 additions and 0 deletions

View File

@ -136,6 +136,14 @@ class SubscriptionController(base.Subscription):
key_transform=key_transform)
assert fields, ('`subscriber`, `ttl`, '
'or `options` not found in kwargs')
new_ttl = fields.get('t', None)
if new_ttl is not None:
now = timeutils.utcnow_ts()
now_dt = datetime.datetime.utcfromtimestamp(now)
expires = now_dt + datetime.timedelta(seconds=new_ttl)
fields['e'] = expires
try:
res = self._collection.update(
{'_id': utils.to_oid(subscription_id),

View File

@ -207,9 +207,17 @@ class SubscriptionController(base.Subscription):
if new_options is not None:
fields['o'] = self._packer(new_options)
new_ttl = fields.get('t', None)
if new_ttl is not None:
now = timeutils.utcnow_ts()
expires = now + new_ttl
fields['e'] = expires
# Pipeline ensures atomic inserts.
with self._client.pipeline() as pipe:
pipe.hmset(subscription_id, fields)
if new_ttl is not None:
pipe.expire(subscription_id, new_ttl)
pipe.execute()
@utils.raises_conn_error

View File

@ -24,8 +24,10 @@ import six
from zaqar.api.v1 import response as response_v1
from zaqar.api.v1_1 import response as response_v1_1
from zaqar.api.v2 import response as response_v2
from zaqar import bootstrap
from zaqar.storage import mongodb
from zaqar.storage.redis import driver as redis
from zaqar import tests as testing
from zaqar.tests.functional import config
from zaqar.tests.functional import helpers
@ -52,6 +54,9 @@ class FunctionalTestBase(testing.TestBase):
server_class = None
config_file = None
class_bootstrap = None
# NOTE(Eva-i): ttl_gc_interval is the known maximum time interval between
# automatic resource TTL expirations. Depends on message store back end.
class_ttl_gc_interval = None
wipe_dbs_projects = set([])
def setUp(self):
@ -82,6 +87,13 @@ class FunctionalTestBase(testing.TestBase):
self.__class__.class_bootstrap = bootstrap.Bootstrap(self.mconf)
self.class_bootstrap.transport
datadriver = self.class_bootstrap.storage._storage
if isinstance(datadriver, redis.DataDriver):
self.__class__.class_ttl_gc_interval = 1
if isinstance(datadriver, mongodb.DataDriver):
# NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute
self.__class__.class_ttl_gc_interval = 60
if _TEST_INTEGRATION:
# TODO(kgriffs): This code should be replaced to use
# an external wsgi server instance.
@ -396,3 +408,9 @@ class V1_1FunctionalTestBase(FunctionalTestBase):
def setUp(self):
super(V1_1FunctionalTestBase, self).setUp()
self.response = response_v1_1.ResponseSchema(self.limits)
class V2FunctionalTestBase(FunctionalTestBase):
def setUp(self):
super(V2FunctionalTestBase, self).setUp()
self.response = response_v2.ResponseSchema(self.limits)

View File

@ -129,3 +129,9 @@ def create_pool_body(**kwargs):
}
return pool_body
def create_subscription_body(subscriber='http://fake:8080', ttl=600,
options_key='funny', options_value='no'):
options = {options_key: options_value}
return {'subscriber': subscriber, 'options': options, 'ttl': ttl}

View File

@ -29,6 +29,7 @@ class TestClaims(base.V1_1FunctionalTestBase):
def setUp(self):
super(TestClaims, self).setUp()
assert False, 'hehe'
self.headers = helpers.create_zaqar_headers(self.cfg)
self.client.headers = self.headers

View File

@ -0,0 +1,123 @@
# Copyright (c) 2013 Rackspace, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import time
import uuid
import ddt
from zaqar.tests.functional import base
from zaqar.tests.functional import helpers as func_helpers
from zaqar.tests import helpers
@ddt.ddt
class TestSubscriptions(base.V2FunctionalTestBase):
"""Tests for Subscriptions."""
server_class = base.ZaqarServer
def setUp(self):
super(TestSubscriptions, self).setUp()
self.queue_name = uuid.uuid1()
self.queue_url = ("{url}/{version}/queues/{queue}".format(
url=self.cfg.zaqar.url,
version="v2",
queue=self.queue_name))
self.client.put(self.queue_url)
self.subscriptions_url = self.queue_url + '/subscriptions/'
self.client.set_base_url(self.subscriptions_url)
def tearDown(self):
# Delete test queue subscriptions after each test case.
result = self.client.get(self.subscriptions_url)
subscriptions = result.json()['subscriptions']
for sub in subscriptions:
sub_url = self.subscriptions_url + sub['id']
self.client.delete(sub_url)
# Delete test queue.
self.client.delete(self.queue_url)
super(TestSubscriptions, self).tearDown()
@helpers.is_slow(condition=lambda self: self.class_ttl_gc_interval > 1)
def test_expired_subscription(self):
# Default TTL value is 600.
doc = func_helpers.create_subscription_body()
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
longlive_id = result.json()['subscription_id']
# This is a minimum TTL allowed by server.
ttl_for_shortlive = 60
doc = func_helpers.create_subscription_body(
subscriber='http://expire.me', ttl=ttl_for_shortlive)
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
shortlive_id = result.json()['subscription_id']
shortlive_url = self.subscriptions_url + shortlive_id
# Let's wait for subscription to expire.
for i in range(self.class_ttl_gc_interval + ttl_for_shortlive):
time.sleep(1)
result = self.client.get(shortlive_url)
if result.status_code == 404:
break
else:
self.fail("Didn't remove the subscription in time.")
# Make sure the expired subscription is not returned when listing.
result = self.client.get(self.subscriptions_url)
self.assertEqual(200, result.status_code)
subscriptions = result.json()['subscriptions']
self.assertEqual(1, len(subscriptions))
self.assertEqual(longlive_id, subscriptions[0]['id'])
@helpers.is_slow(condition=lambda self: self.class_ttl_gc_interval > 1)
def test_update_ttl(self):
# Default TTL value is 600.
doc = func_helpers.create_subscription_body()
result = self.client.post(data=doc)
self.assertEqual(201, result.status_code)
subscription_id = result.json()['subscription_id']
subscription_url = self.subscriptions_url + subscription_id
# This is a minimum TTL allowed by server.
updated_ttl = 60
update_fields = {
'ttl': updated_ttl
}
result = self.client.patch(subscription_url, data=update_fields)
self.assertEqual(204, result.status_code)
# Let's wait for updated subscription to expire.
for i in range(self.class_ttl_gc_interval + updated_ttl):
time.sleep(1)
result = self.client.get(subscription_url)
if result.status_code == 404:
break
else:
self.fail("Didn't remove the subscription in time.")
# Make sure the expired subscription is not returned when listing.
result = self.client.get(self.subscriptions_url)
self.assertEqual(200, result.status_code)
subscriptions = result.json()['subscriptions']
self.assertEqual(0, len(subscriptions))