Merge "Move delayed out of utils"
This commit is contained in:
commit
c571609b50
|
@ -18,6 +18,7 @@ import collections
|
|||
import itertools
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from concurrent import futures
|
||||
|
@ -362,6 +363,11 @@ class AgentManager(cotyledon.Service):
|
|||
def construct_group_id(self, discovery_group_id):
|
||||
return '%s-%s' % (self.group_prefix, discovery_group_id)
|
||||
|
||||
@staticmethod
|
||||
def _delayed(delay, target, *args, **kwargs):
|
||||
time.sleep(delay)
|
||||
return target(*args, **kwargs)
|
||||
|
||||
def start_polling_tasks(self):
|
||||
# set shuffle time before polling task if necessary
|
||||
delay_polling_time = random.randint(
|
||||
|
@ -384,7 +390,7 @@ class AgentManager(cotyledon.Service):
|
|||
def task(running_task):
|
||||
self.interval_task(running_task)
|
||||
|
||||
utils.spawn_thread(utils.delayed, delay_polling_time,
|
||||
utils.spawn_thread(self._delayed, delay_polling_time,
|
||||
self.polling_periodics.add, task, polling_task)
|
||||
|
||||
utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
|
||||
|
|
|
@ -357,8 +357,6 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
|||
self._batching_samples(4, 1)
|
||||
|
||||
def _batching_samples(self, expected_samples, call_count):
|
||||
self.useFixture(fixtures.MockPatchObject(manager.utils, 'delayed',
|
||||
side_effect=fakedelayed))
|
||||
poll_cfg = {
|
||||
'sources': [{
|
||||
'name': 'test_pipeline',
|
||||
|
@ -372,6 +370,7 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
|
|||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
self.mgr._delayed = fakedelayed
|
||||
polling_task = list(self.mgr.setup_polling_tasks().values())[0]
|
||||
|
||||
self.mgr.interval_task(polling_task)
|
||||
|
|
|
@ -22,7 +22,6 @@ import calendar
|
|||
import datetime
|
||||
import decimal
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
|
@ -155,11 +154,6 @@ def kill_listeners(listeners):
|
|||
listener.wait()
|
||||
|
||||
|
||||
def delayed(delay, target, *args, **kwargs):
|
||||
time.sleep(delay)
|
||||
return target(*args, **kwargs)
|
||||
|
||||
|
||||
def spawn_thread(target, *args, **kwargs):
|
||||
t = threading.Thread(target=target, args=args, kwargs=kwargs)
|
||||
t.daemon = True
|
||||
|
|
Loading…
Reference in New Issue