Merge "Murano-agent side implementation of message signing"

This commit is contained in:
Zuul 2018-01-18 01:05:00 +00:00 committed by Gerrit Code Review
commit 57b285349b
7 changed files with 280 additions and 180 deletions

View File

@ -20,21 +20,17 @@ import time
from oslo_log import log as logging
from oslo_service import service
from oslo_utils import strutils
import semantic_version
import six
from muranoagent import bunch
from muranoagent.common import config
from muranoagent.common import messaging
from muranoagent import exceptions as exc
from muranoagent import execution_plan_queue
from muranoagent import execution_plan_runner
from muranoagent import execution_result as ex_result
from muranoagent import validation
CONF = config.CONF
LOG = logging.getLogger(__name__)
max_format_version = semantic_version.Spec('<=2.2.0')
class MuranoAgent(service.Service):
@ -80,11 +76,29 @@ class MuranoAgent(service.Service):
if plan is not None:
LOG.debug("Got an execution plan '{0}':".format(
strutils.mask_password(str(plan))))
self._run(plan)
if self._verify_plan(plan):
self._run(plan)
return
next(msg_iterator)
def _verify_plan(self, plan):
try:
validation.validate_plan(plan)
return True
except Exception as err:
try:
execution_result = ex_result.ExecutionResult.from_error(
err, plan)
if 'ReplyTo' in plan and CONF.enable_dynamic_result_queue:
execution_result['ReplyTo'] = plan.ReplyTo
self._send_result(execution_result)
except ValueError:
LOG.warning('Execution result is not produced')
finally:
return False
def _run(self, plan):
try:
with execution_plan_runner.ExecutionPlanRunner(plan) as runner:
@ -135,12 +149,18 @@ class MuranoAgent(service.Service):
prefetch_count=1) as subscription:
while True:
msg = subscription.get_message(timeout=5)
if msg is not None and isinstance(msg.body, dict):
self._handle_message(msg)
if msg is not None:
try:
self._queue.put_execution_plan(
msg.body,
msg.signature,
msg.id,
msg.reply_to)
finally:
msg.ack()
delay = 5
if msg is not None:
msg.ack()
yield
except KeyboardInterrupt:
break
@ -148,138 +168,3 @@ class MuranoAgent(service.Service):
LOG.warning('Communication error', exc_info=True)
time.sleep(delay)
delay = min(delay * 1.2, 60)
def _handle_message(self, msg):
if 'ID' not in msg.body and msg.id:
msg.body['ID'] = msg.id
if 'ReplyTo' not in msg.body and msg.reply_to:
msg.body['ReplyTo'] = msg.reply_to
try:
self._verify_plan(msg.body)
self._queue.put_execution_plan(msg.body)
except Exception as err:
try:
execution_result = ex_result.ExecutionResult.from_error(
err, bunch.Bunch(msg.body))
if ('ReplyTo' in msg.body) and \
CONF.enable_dynamic_result_queue:
execution_result['ReplyTo'] = msg.body.get('ReplyTo')
self._send_result(execution_result)
except ValueError:
LOG.warning('Execution result is not produced')
def _verify_plan(self, plan):
plan_format_version = semantic_version.Version(
plan.get('FormatVersion', '1.0.0'))
if plan_format_version not in max_format_version:
# NOTE(kazitsev) this is Version in Spec not str in str
raise exc.IncorrectFormat(
9,
"Unsupported format version {0} "
"(I support versions {1})".format(
plan_format_version, max_format_version))
for attr in ('Scripts', 'Files'):
if attr not in plan:
raise exc.IncorrectFormat(
2, '{0} is not in the execution plan'.format(attr))
for attr in ('Scripts', 'Files', 'Options'):
if attr in plan and not isinstance(
plan[attr], dict):
raise exc.IncorrectFormat(
2, '{0} is not a dictionary'.format(attr))
for name, script in plan.get('Scripts', {}).items():
self._validate_script(name, script, plan_format_version, plan)
for key, plan_file in plan.get('Files', {}).items():
self._validate_file(plan_file, key, plan_format_version)
def _validate_script(self, name, script, plan_format_version, plan):
for attr in ('Type', 'EntryPoint'):
if attr not in script or not isinstance(script[attr],
six.string_types):
raise exc.IncorrectFormat(
2, 'Incorrect {0} entry in script {1}'.format(
attr, name))
if plan_format_version in semantic_version.Spec('>=2.0.0,<2.1.0'):
if script['Type'] != 'Application':
raise exc.IncorrectFormat(
2, 'Type {0} is not valid for format {1}'.format(
script['Type'], plan_format_version))
if script['EntryPoint'] not in plan.get('Files', {}):
raise exc.IncorrectFormat(
2, 'Script {0} misses entry point {1}'.format(
name, script['EntryPoint']))
if plan_format_version in semantic_version.Spec('>=2.1.0'):
if script['Type'] not in ('Application', 'Chef', 'Puppet'):
raise exc.IncorrectFormat(
2, 'Script has not a valid type {0}'.format(
script['Type']))
if (script['Type'] == 'Application' and script['EntryPoint']
not in plan.get('Files', {})):
raise exc.IncorrectFormat(
2, 'Script {0} misses entry point {1}'.format(
name, script['EntryPoint']))
elif (script['Type'] != 'Application' and
"::" not in script['EntryPoint']):
raise exc.IncorrectFormat(
2, 'Wrong EntryPoint {0} for Puppet/Chef '
'executors. :: needed'.format(script['EntryPoint']))
for option in script['Options']:
if option in ('useBerkshelf', 'berksfilePath'):
if plan_format_version in semantic_version.Spec('<2.2.0'):
raise exc.IncorrectFormat(
2, 'Script has an option {0} invalid '
'for version {1}'.format(option,
plan_format_version))
elif script['Type'] != 'Chef':
raise exc.IncorrectFormat(
2, 'Script has an option {0} invalid '
'for type {1}'.format(option, script['Type']))
for additional_file in script.get('Files', []):
mns_error = ('Script {0} misses file {1}'.
format(name, additional_file))
if isinstance(additional_file, dict):
if (list(additional_file.keys())[0] not in
plan.get('Files', {}).keys()):
raise exc.IncorrectFormat(2, mns_error)
elif additional_file not in plan.get('Files', {}):
raise exc.IncorrectFormat(2, mns_error)
def _validate_file(self, plan_file, key, format_version):
if format_version in semantic_version.Spec('>=2.0.0,<2.1.0'):
for plan in plan_file.keys():
if plan in ('Type', 'URL'):
raise exc.IncorrectFormat(
2, 'Download file is {0} not valid for this '
'version {1}'.format(key, format_version))
if 'Type' in plan_file:
for attr in ('Type', 'URL', 'Name'):
if attr not in plan_file:
raise exc.IncorrectFormat(
2,
'Incorrect {0} entry in file {1}'.format(attr, key))
elif 'Body' in plan_file:
for attr in ('BodyType', 'Body', 'Name'):
if attr not in plan_file:
raise exc.IncorrectFormat(
2, 'Incorrect {0} entry in file {1}'.format(
attr, key))
if plan_file['BodyType'] not in ('Text', 'Base64'):
raise exc.IncorrectFormat(
2, 'Incorrect BodyType in file {1}'.format(key))
else:
raise exc.IncorrectFormat(
2, 'Invalid file {0}: {1}'.format(
key, plan_file))

View File

@ -24,17 +24,20 @@ from muranoagent import version
CONF = cfg.CONF
storage_opt = [
opts = [
cfg.StrOpt('storage',
default='/var/murano/plans',
help='Directory to store execution plans')
help='Directory to store execution plans'),
cfg.StrOpt('engine_key',
help='Public key of murano-engine')
]
message_routing_opt = [
cfg.BoolOpt('enable_dynamic_result_queue', help='Enable taking dynamic '
'result queue from task field reply_to',
default=False)
]
message_routing_opt = cfg.BoolOpt(
'enable_dynamic_result_queue',
help='Enable taking dynamic result queue from task field reply_to',
default=False)
rabbit_opts = [
cfg.HostAddressOpt('host',
@ -79,8 +82,8 @@ rabbit_opts = [
]
CONF.register_cli_opts(storage_opt)
CONF.register_cli_opts(message_routing_opt)
CONF.register_opts(opts)
CONF.register_cli_opt(message_routing_opt)
CONF.register_opts(rabbit_opts, group='rabbitmq')
logging.register_options(CONF)

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import anyjson
from oslo_log import log as logging
LOG = logging.getLogger("murano-common.messaging")
@ -27,20 +26,17 @@ class Message(object):
if message_handle:
self.id = message_handle.properties.get('message_id')
self._reply_to = message_handle.properties.get('reply_to')
self._signature = message_handle.headers.get('signature')
else:
self.id = None
self._reply_to = None
self._signature = None
try:
if message_handle:
if isinstance(message_handle.body, bytes):
message_handle.body = message_handle.body.decode('utf-8')
self.body = anyjson.loads(message_handle.body)
else:
self.body = None
except ValueError:
if message_handle:
self.body = message_handle.body
else:
self.body = None
LOG.exception('Message is not in JSON format')
@property
def body(self):
@ -64,3 +60,7 @@ class Message(object):
def ack(self):
self._message_handle.ack()
@property
def signature(self):
return self._signature

View File

@ -13,21 +13,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import shutil
import time
from cryptography.hazmat import backends
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from oslo_log import log as logging
from muranoagent import bunch
from muranoagent.common import config
CONF = config.CONF
LOG = logging.getLogger(__name__)
class ExecutionPlanQueue(object):
plan_filename = 'plan.json'
result_filename = 'result.json'
stamp_filename = 'stamp'
def __init__(self):
self._plans_folder = os.path.join(CONF.storage, 'plans')
@ -38,16 +46,25 @@ class ExecutionPlanQueue(object):
os.chmod(self._plans_folder, 0o700)
except OSError:
pass
self._key = None if not CONF.engine_key \
else serialization.load_pem_public_key(
CONF.engine_key, backends.default_backend())
self._load_stamp()
def put_execution_plan(self, execution_plan):
def put_execution_plan(self, execution_plan, signature, msg_id, reply_to):
timestamp = str(int(time.time() * 10000))
# execution_plan['_timestamp'] = timestamp
folder_path = os.path.join(self._plans_folder, timestamp)
os.mkdir(folder_path)
file_path = os.path.join(
plan_file_path = os.path.join(
folder_path, ExecutionPlanQueue.plan_filename)
with open(file_path, 'w') as out_file:
out_file.write(json.dumps(execution_plan))
with open(plan_file_path, 'wb') as out_file:
out_file.write(json.dumps({
'Data': base64.b64encode(execution_plan),
'Signature': base64.b64encode(signature or ''),
'ID': msg_id,
'ReplyTo': reply_to
}))
def _get_first_timestamp(self, filename):
def predicate(folder):
@ -70,11 +87,46 @@ class ExecutionPlanQueue(object):
return json.loads(json_file.read()), timestamp
def get_execution_plan(self):
ep, timestamp = self._get_first_file(ExecutionPlanQueue.plan_filename)
if ep is None:
return None
ep['_timestamp'] = timestamp
return bunch.Bunch(ep)
while True:
ep_info, timestamp = self._get_first_file(
ExecutionPlanQueue.plan_filename)
if ep_info is None:
return None
try:
data = base64.b64decode(ep_info['Data'])
if self._key:
signature = base64.b64decode(ep_info['Signature'])
self._verify_signature(data, signature)
ep = json.loads(data)
if not isinstance(ep, dict):
raise ValueError('Message is not a document')
stamp = ep.get('Stamp', -1)
if stamp >= 0:
if stamp <= self._last_stamp:
raise ValueError('Dropping old/duplicate message')
self._save_stamp(stamp)
if 'ID' not in ep:
ep['ID'] = ep_info['ID']
if 'ReplyTo' not in ep:
ep['ReplyTo'] = ep_info['ReplyTo']
ep['_timestamp'] = timestamp
return bunch.Bunch(ep)
except Exception as ex:
LOG.exception(ex)
self.remove(timestamp)
def _verify_signature(self, data, signature):
if not signature:
raise ValueError("Required signature was not found")
self._key.verify(
signature,
CONF.rabbitmq.input_queue + data,
padding.PKCS1v15(), hashes.SHA256())
def put_execution_result(self, result, execution_plan):
timestamp = execution_plan['_timestamp']
@ -93,3 +145,19 @@ class ExecutionPlanQueue(object):
def get_execution_plan_result(self):
return self._get_first_file(
ExecutionPlanQueue.result_filename)
def _load_stamp(self):
plan_file_path = os.path.join(
self._plans_folder, ExecutionPlanQueue.stamp_filename)
if os.path.exists(plan_file_path):
with open(plan_file_path) as f:
self._last_stamp = int(f.read())
else:
self._last_stamp = 0
def _save_stamp(self, stamp):
plan_file_path = os.path.join(
self._plans_folder, ExecutionPlanQueue.stamp_filename)
with open(plan_file_path, 'w') as f:
f.write(str(stamp))
self._last_stamp = stamp

View File

@ -16,8 +16,6 @@ import fixtures
import mock
import ssl as ssl_module
from oslo_service import sslutils
from muranoagent import app
from muranoagent import bunch
from muranoagent.common import config as cfg
@ -25,6 +23,7 @@ from muranoagent.common.messaging import mqclient
from muranoagent import exceptions as exc
from muranoagent.tests.unit import base
from muranoagent.tests.unit import execution_plan as ep
from muranoagent import validation
CONF = cfg.CONF
@ -35,11 +34,15 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture):
@mock.patch('os.path.exists')
def setUp(self, mock_path, mock_chmod):
super(TestApp, self).setUp()
mock_path.return_value = True
mock_path.side_effect = self._exists
self.agent = app.MuranoAgent()
CONF.set_override('storage', 'cache')
self.addCleanup(CONF.clear_override, 'storage')
@staticmethod
def _exists(path):
return 'stamp' not in path
def test_verify_execution_plan_downloable(self):
template = self.useFixture(ep.ExPlanDownloable()).execution_plan
self.agent._verify_plan(template)
@ -50,13 +53,13 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture):
FormatVersion='0.0.0',
)
self.assertRaises(exc.IncorrectFormat,
self.agent._verify_plan, template)
validation.validate_plan, template)
def test_verify_over_max_execution_plan(self):
template = self.useFixture(ep.ExPlanApplication()).execution_plan
template['FormatVersion'] = '1000.0.0'
self.assertRaises(exc.IncorrectFormat,
self.agent._verify_plan, template)
validation.validate_plan, template)
def test_verify_execution_application(self):
template = self.useFixture(ep.ExPlanApplication()).execution_plan
@ -71,12 +74,12 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture):
}
template['FormatVersion'] = '2.0.0'
self.assertRaises(exc.IncorrectFormat,
self.agent._verify_plan, template)
validation.validate_plan, template)
def test_verify_execution_plan_no_files(self):
template = self.useFixture(ep.ExPlanDownloableNoFiles()).execution_plan
self.assertRaises(exc.IncorrectFormat,
self.agent._verify_plan, template)
validation.validate_plan, template)
def test_verify_execution_plan_berkshelf(self):
template = self.useFixture(ep.ExPlanBerkshelf()).execution_plan
@ -85,7 +88,7 @@ class TestApp(base.MuranoAgentTestCase, fixtures.FunctionFixture):
def test_verify_execution_plan_berkshelf_wrong_version(self):
template = self.useFixture(ep.ExPlanBerkWrongVersion()).execution_plan
self.assertRaises(exc.IncorrectFormat,
self.agent._verify_plan, template)
validation.validate_plan, template)
@mock.patch.object(mqclient, 'random', autospec=True)
@mock.patch.object(mqclient, 'kombu', autospec=True)

140
muranoagent/validation.py Normal file
View File

@ -0,0 +1,140 @@
# Copyright (c) 2017 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import semantic_version
import six
from muranoagent import exceptions as exc
max_format_version = semantic_version.Spec('<=2.2.0')
def validate_plan(plan):
plan_format_version = semantic_version.Version(
plan.get('FormatVersion', '1.0.0'))
if plan_format_version not in max_format_version:
# NOTE(kazitsev) this is Version in Spec not str in str
raise exc.IncorrectFormat(
9,
"Unsupported format version {0} "
"(I support versions {1})".format(
plan_format_version, max_format_version))
for attr in ('Scripts', 'Files'):
if attr not in plan:
raise exc.IncorrectFormat(
2, '{0} is not in the execution plan'.format(attr))
for attr in ('Scripts', 'Files', 'Options'):
if attr in plan and not isinstance(
plan[attr], dict):
raise exc.IncorrectFormat(
2, '{0} is not a dictionary'.format(attr))
for name, script in plan.get('Scripts', {}).items():
_validate_script(name, script, plan_format_version, plan)
for key, plan_file in plan.get('Files', {}).items():
_validate_file(plan_file, key, plan_format_version)
def _validate_script(name, script, plan_format_version, plan):
for attr in ('Type', 'EntryPoint'):
if attr not in script or not isinstance(script[attr],
six.string_types):
raise exc.IncorrectFormat(
2, 'Incorrect {0} entry in script {1}'.format(
attr, name))
if plan_format_version in semantic_version.Spec('>=2.0.0,<2.1.0'):
if script['Type'] != 'Application':
raise exc.IncorrectFormat(
2, 'Type {0} is not valid for format {1}'.format(
script['Type'], plan_format_version))
if script['EntryPoint'] not in plan.get('Files', {}):
raise exc.IncorrectFormat(
2, 'Script {0} misses entry point {1}'.format(
name, script['EntryPoint']))
if plan_format_version in semantic_version.Spec('>=2.1.0'):
if script['Type'] not in ('Application', 'Chef', 'Puppet'):
raise exc.IncorrectFormat(
2, 'Script has not a valid type {0}'.format(
script['Type']))
if (script['Type'] == 'Application' and script['EntryPoint']
not in plan.get('Files', {})):
raise exc.IncorrectFormat(
2, 'Script {0} misses entry point {1}'.format(
name, script['EntryPoint']))
elif (script['Type'] != 'Application' and
"::" not in script['EntryPoint']):
raise exc.IncorrectFormat(
2, 'Wrong EntryPoint {0} for Puppet/Chef '
'executors. :: needed'.format(script['EntryPoint']))
for option in script['Options']:
if option in ('useBerkshelf', 'berksfilePath'):
if plan_format_version in semantic_version.Spec('<2.2.0'):
raise exc.IncorrectFormat(
2, 'Script has an option {0} invalid '
'for version {1}'.format(option,
plan_format_version))
elif script['Type'] != 'Chef':
raise exc.IncorrectFormat(
2, 'Script has an option {0} invalid '
'for type {1}'.format(option, script['Type']))
for additional_file in script.get('Files', []):
mns_error = ('Script {0} misses file {1}'.format(
name, additional_file))
if isinstance(additional_file, dict):
if (list(additional_file.keys())[0] not in
plan.get('Files', {}).keys()):
raise exc.IncorrectFormat(2, mns_error)
elif additional_file not in plan.get('Files', {}):
raise exc.IncorrectFormat(2, mns_error)
def _validate_file(plan_file, key, format_version):
if format_version in semantic_version.Spec('>=2.0.0,<2.1.0'):
for plan in plan_file.keys():
if plan in ('Type', 'URL'):
raise exc.IncorrectFormat(
2, 'Download file is {0} not valid for this '
'version {1}'.format(key, format_version))
if 'Type' in plan_file:
for attr in ('Type', 'URL', 'Name'):
if attr not in plan_file:
raise exc.IncorrectFormat(
2,
'Incorrect {0} entry in file {1}'.format(attr, key))
elif 'Body' in plan_file:
for attr in ('BodyType', 'Body', 'Name'):
if attr not in plan_file:
raise exc.IncorrectFormat(
2, 'Incorrect {0} entry in file {1}'.format(
attr, key))
if plan_file['BodyType'] not in ('Text', 'Base64'):
raise exc.IncorrectFormat(
2, 'Incorrect BodyType in file {0}'.format(key))
else:
raise exc.IncorrectFormat(
2, 'Invalid file {0}: {1}'.format(
key, plan_file))

View File

@ -14,3 +14,4 @@ PyYAML>=3.10 # MIT
six>=1.10.0 # MIT
semantic-version>=2.3.1 # BSD
requests>=2.14.2 # Apache-2.0
cryptography>=1.9,!=2.0 # BSD/Apache-2.0