Initialize profiler for scheduler threads

* It turns out that osprofiler wasn't initialized properly for
  threads in which scheduler runs its jobs. So profiling simply
  didn't work for this threads and lots of important info didn't
  get to the profiler log. This patch fixes it.
* When evaluating a YAQL expression, sometimes we get a huge result
  value that we always put into the debug log. In practice, it doesn't
  make much sense and, moreover, it utilizes lots of CPU and disk
  space. It's better shrink it to some reasonable size that would
  allow to make necessary analysis, if needed.
* Other minor style fixes according to the Mistral coding guidelines.

Change-Id: I3df3ab96342c456429e20a905615b90bcb94818d
This commit is contained in:
Renat Akhmerov 2020-01-30 15:57:47 +07:00
parent 680a9e2e07
commit 511be4f98f
5 changed files with 79 additions and 38 deletions

View File

@ -165,7 +165,7 @@ class InlineYAQLEvaluator(YAQLEvaluator):
LOG.debug(
"Finished evaluation. [expression='%s', result: %s]",
expression,
result
utils.cut(result, length=200)
)
return result

View File

@ -22,6 +22,7 @@ import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from osprofiler import profiler
from mistral import context
from mistral.db import utils as db_utils
@ -312,6 +313,12 @@ class DefaultScheduler(base.Scheduler):
@staticmethod
def _invoke_job(auth_ctx, func, args):
# Scheduler runs jobs in an separate thread that's neither related
# to an RPC nor a REST request processing thread. So we need to
# initialize a profiler specifically for this thread.
if cfg.CONF.profiler.enabled:
profiler.init(cfg.CONF.profiler.hmac_keys)
ctx_serializer = context.RpcContextSerializer()
try:

View File

@ -21,8 +21,10 @@ import random
import sys
import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from osprofiler import profiler
from mistral import context
from mistral.db import utils as db_utils
@ -129,6 +131,12 @@ class LegacyScheduler(sched_base.Scheduler):
self._thread.join()
def _loop(self):
# Scheduler runs jobs in an separate thread that's neither related
# to an RPC nor a REST request processing thread. So we need to
# initialize a profiler specifically for this thread.
if cfg.CONF.profiler.enabled:
profiler.init(cfg.CONF.profiler.hmac_keys)
while not self._stopped:
LOG.debug("Starting Scheduler loop [scheduler=%s]...", self)

View File

@ -54,22 +54,28 @@ class YaqlEvaluatorTest(base.BaseTest):
self._evaluator = expr.YAQLEvaluator()
def test_expression_result(self):
res = self._evaluator.evaluate('$.server', DATA)
self.assertEqual({
'id': "03ea824a-aa24-4105-9131-66c48ae54acf",
'name': 'cloud-fedora',
'status': 'ACTIVE'
}, res)
self.assertEqual(
{
'id': "03ea824a-aa24-4105-9131-66c48ae54acf",
'name': 'cloud-fedora',
'status': 'ACTIVE'
},
self._evaluator.evaluate('$.server', DATA)
)
res = self._evaluator.evaluate('$.server.id', DATA)
self.assertEqual('03ea824a-aa24-4105-9131-66c48ae54acf', res)
self.assertEqual(
'03ea824a-aa24-4105-9131-66c48ae54acf',
self._evaluator.evaluate('$.server.id', DATA)
)
res = self._evaluator.evaluate("$.server.status = 'ACTIVE'", DATA)
self.assertTrue(res)
self.assertTrue(
self._evaluator.evaluate("$.server.status = 'ACTIVE'", DATA)
)
def test_wrong_expression(self):
res = self._evaluator.evaluate("$.status = 'Invalid value'", DATA)
self.assertFalse(res)
self.assertFalse(
self._evaluator.evaluate("$.status = 'Invalid value'", DATA)
)
self.assertRaises(
exc.YaqlEvaluationException,
@ -79,8 +85,11 @@ class YaqlEvaluatorTest(base.BaseTest):
)
expression_str = 'invalid_expression_string'
res = self._evaluator.evaluate(expression_str, DATA)
self.assertEqual(expression_str, res)
self.assertEqual(
expression_str,
self._evaluator.evaluate(expression_str, DATA)
)
def test_select_result(self):
res = self._evaluator.evaluate(
@ -88,6 +97,7 @@ class YaqlEvaluatorTest(base.BaseTest):
SERVERS
)
item = list(res)[0]
self.assertEqual({'name': 'ubuntu'}, item)
def test_function_string(self):
@ -96,6 +106,7 @@ class YaqlEvaluatorTest(base.BaseTest):
def test_function_len(self):
self.assertEqual(3, self._evaluator.evaluate('len($)', 'hey'))
data = [{'some': 'thing'}]
self.assertEqual(
@ -111,17 +122,23 @@ class YaqlEvaluatorTest(base.BaseTest):
self._evaluator.validate('$.a1 * $.a2')
def test_validate_failed(self):
self.assertRaises(exc.YaqlGrammarException,
self._evaluator.validate,
'*')
self.assertRaises(
exc.YaqlGrammarException,
self._evaluator.validate,
'*'
)
self.assertRaises(exc.YaqlGrammarException,
self._evaluator.validate,
[1, 2, 3])
self.assertRaises(
exc.YaqlGrammarException,
self._evaluator.validate,
[1, 2, 3]
)
self.assertRaises(exc.YaqlGrammarException,
self._evaluator.validate,
{'a': 1})
self.assertRaises(
exc.YaqlGrammarException,
self._evaluator.validate,
{'a': 1}
)
def test_function_json_pp(self):
self.assertEqual('"3"', self._evaluator.evaluate('json_pp($)', '3'))
@ -143,12 +160,15 @@ class YaqlEvaluatorTest(base.BaseTest):
def test_function_json_pp_deprecation(self):
with warnings.catch_warnings(record=True) as w:
# ensure warnings aren't suppressed from other tests
# Ensure warnings aren't suppressed from other tests.
for name, mod in list(sys.modules.items()):
getattr(mod, '__warningregistry__', dict()).clear()
warnings.simplefilter('always')
result = self._evaluator.evaluate('json_pp($)', '3')
self.assertEqual('"3"', result)
self.assertEqual(len(w), 1)
self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
self.assertTrue(str(w[-1].message).startswith(
@ -184,7 +204,9 @@ class YaqlEvaluatorTest(base.BaseTest):
task_executions):
task_execution_result.return_value = 'task_execution_result'
time_now = utils.utc_now_sec()
task = type("obj", (object,), {
'id': 'id',
'name': 'name',
@ -211,19 +233,22 @@ class YaqlEvaluatorTest(base.BaseTest):
result = self._evaluator.evaluate('tasks(some)', ctx)
self.assertEqual(1, len(result))
self.assertDictEqual({
'id': task.id,
'name': task.name,
'published': task.published,
'result': task.result,
'spec': task.spec,
'state': task.state,
'state_info': task.state_info,
'type': task.type,
'workflow_execution_id': task.workflow_execution_id,
'created_at': task.created_at.isoformat(' '),
'updated_at': task.updated_at.isoformat(' ')
}, result[0])
self.assertDictEqual(
{
'id': task.id,
'name': task.name,
'published': task.published,
'result': task.result,
'spec': task.spec,
'state': task.state,
'state_info': task.state_info,
'type': task.type,
'workflow_execution_id': task.workflow_execution_id,
'created_at': task.created_at.isoformat(' '),
'updated_at': task.updated_at.isoformat(' ')
},
result[0]
)
def test_function_env(self):
ctx = {'__env': 'some'}

View File

@ -193,6 +193,7 @@ def json_pp_(context, data=None):
"The json_dump expression function can be used for outputting JSON",
DeprecationWarning
)
return jsonutils.dumps(
data or context,
indent=4