Task-based deployment

* async task: puppet, shell, cobbler, sync, noop and reboot;
* not async: upload file, upload files, copy files;
* new report format based on task progress instead of log;
* support union graph deployment: pre/post tasks included in main graph;
* plugins tasks also included in main graph;
* support previous fault tolerance strategy (percent of failed nodes);
* add tests.

Implements: blueprint task-based-deployment-astute
Closes-Bug: #1506962
Closes-Bug: #1430470

Change-Id: Ibe24d047ab502b88ea0ae9f8c77f94440000d289
This commit is contained in:
Vladimir Sharshov (warpc) 2015-11-26 18:17:29 +03:00
parent e8c753d6ce
commit 583d68b1cb
25 changed files with 1701 additions and 8 deletions

View File

@ -45,14 +45,18 @@ require 'astute/task_manager'
require 'astute/pre_delete'
require 'astute/version'
require 'astute/server/async_logger'
require 'astute/task'
require 'astute/task_deployment'
require 'astute/task_node'
require 'fuel_deployment'
['/astute/pre_deployment_actions/*.rb',
'/astute/pre_deploy_actions/*.rb',
'/astute/pre_node_actions/*.rb',
'/astute/post_deploy_actions/*.rb',
'/astute/post_deployment_actions/*.rb',
'/astute/common_actions/*.rb'
'/astute/common_actions/*.rb',
'/astute/tasks/*.rb'
].each do |path|
Dir[File.dirname(__FILE__) + path].each{ |f| require f }
end

View File

@ -59,6 +59,7 @@ module Astute
conf[:puppet_deploy_interval] = 2 # sleep for ## sec, then check puppet status again
conf[:puppet_fade_timeout] = 120 # how long it can take for puppet to exit after dumping to last_run_summary
conf[:puppet_retries] = 2 # how many times astute will try to run puppet
conf[:puppet_module_path] = '/etc/puppet/modules' # where we should find basic modules for puppet
conf[:mc_retries] = 10 # MClient tries to call mcagent before failure
conf[:mc_retry_interval] = 1 # MClient sleeps for ## sec between retries
conf[:puppet_fade_interval] = 30 # retry every ## seconds to check puppet state if it was running
@ -66,7 +67,9 @@ module Astute
conf[:reboot_timeout] = 240 # how long it can take for node to reboot
conf[:dump_timeout] = 3600 # maximum time it waits for the dump (meaningles to be larger
# than the specified in timeout of execute_shell_command mcagent
conf[:shell_timeout] = 300 # default timeout for shell task
conf[:shell_cwd] = '/' # default cwd for shell task
conf[:rsync_options] = '-c -r --delete' # default rsync options
conf[:keys_src_dir] = '/var/lib/fuel/keys' # path where ssh and openssl keys will be created
conf[:puppet_ssh_keys] = [
'neutron',
@ -96,6 +99,8 @@ module Astute
# Please increase if nodes could not provisioning
conf[:agent_nodiscover_file] = '/etc/nailgun-agent/nodiscover' # if this file in place, nailgun-agent will do nothing
conf[:bootstrap_profile] = 'bootstrap' # use the CentOS based bootstrap by default
conf[:graph_dot_dir] = "/tmp/" # default dir patch for debug graph file
conf[:enable_graph_file] = false # enable debug graph records to file
# Server settings
conf[:broker_host] = 'localhost'

View File

@ -35,5 +35,9 @@ module Astute
reporter.report(data)
end
# Forwards a report message to the reporter stored in @reporter.
#
# @param msg the report payload, passed through unchanged
def report(msg)
@reporter.report msg
end
end
end

View File

@ -131,7 +131,21 @@ module Astute
super
@actions = [
EnablePuppetDeploy.new,
UploadFacts.new
UploadFacts.new,
InitialConnectFacts.new
]
end
end
class TaskPreDeploymentActions < DeployActions
def initialize(deployment_info, context)
super
@actions = [
EnablePuppetDeploy.new,
UploadFacts.new,
InitialConnectFacts.new
]
end

View File

@ -28,5 +28,7 @@ module Astute
class MClientError < AstuteError; end
# MClient timeout error
class MClientTimeout < Timeout::Error; end
# Task validation error
class TaskValidationError < AstuteError; end
end

View File

@ -91,7 +91,9 @@ module Astute
if @on_respond_timeout
@on_respond_timeout.call not_responded
else
err_msg += "MCollective agents '#{not_responded.join(',')}' didn't respond within the allotted time.\n"
err_msg += "MCollective agents '#{@agent}' " \
"'#{not_responded.join(',')}' didn't respond within the " \
"allotted time.\n"
timeout_nodes_count += not_responded.size
end
end

View File

@ -30,6 +30,9 @@ module Astute
report_result({}, up_reporter)
end
# Deprecated deploy method. Use monolitic site.pp. Do not use from 7.1.
# Report progress based on puppet logs
def deploy(up_reporter, task_id, deployment_info, pre_deployment=[], post_deployment=[])
deploy_cluster(
up_reporter,
@ -41,6 +44,9 @@ module Astute
)
end
# Deploy method which use small tasks, but run block of tasks for role
# instead of run it using full graph. Use from 7.1 to 8.0. Report progress
# based on puppet logs
def granular_deploy(up_reporter, task_id, deployment_info, pre_deployment=[], post_deployment=[])
deploy_cluster(
up_reporter,
@ -52,6 +58,16 @@ module Astute
)
end
# Deploys using small tasks arranged in one full graph.
# Available from 8.0 (experimental). Progress is reported per task.
#
# @param up_reporter reporter handed to the deployment context
# @param task_id identifier of the deployment task
# @param deployment_info deployment data passed to the engine
# @param deployment_tasks tasks passed to the engine
# @return whatever TaskDeployment#deploy returns
def task_deploy(up_reporter, task_id, deployment_info, deployment_tasks)
  ctx = Context.new(task_id, up_reporter)
  Astute.logger.info("Task based deployment will be used")
  TaskDeployment.new(ctx).deploy(deployment_info, deployment_tasks)
end
def provision(up_reporter, task_id, provisioning_info, provision_method)
proxy_reporter = ProxyReporter::ProvisiningProxyReporter.new(
up_reporter,

View File

@ -61,8 +61,8 @@ module Astute
# ready, error or deploying
result.fetch('status', 'deploying')
rescue MClientTimeout
Astute.logger.warn "MCollective agents #{@node['uid']} " \
rescue MClientTimeout, Timeout::Error
Astute.logger.warn "Puppet agent #{@node['uid']} " \
"didn't respond within the allotted time"
'error'
end

View File

@ -104,6 +104,25 @@ module Astute
end
end
# Handles the 'task_deploy' message: logs the payload, builds a reporter
# and starts the task based deployment through the orchestrator.
# A deployment timeout is logged and reported as an error status.
#
# @param data message payload; its 'args' entry carries 'task_uuid',
#   'deployment_info' and 'deployment_tasks'
def task_deploy(data)
  Astute.logger.info(
    "'task_deploy' method called with data:\n#{data.pretty_inspect}"
  )
  reporter = create_reporter(data)

  begin
    args = data['args']
    @orchestrator.task_deploy(
      reporter,
      args['task_uuid'],
      args['deployment_info'],
      args['deployment_tasks']
    )
  rescue Timeout::Error
    timeout_msg = "Timeout of deployment is exceeded."
    Astute.logger.error(timeout_msg)
    reporter.report('status' => 'error', 'error' => timeout_msg)
  end
end
def verify_networks(data)
data.fetch('subtasks', []).each do |subtask|
if self.respond_to?(subtask['method'])

205
lib/astute/task.rb Normal file
View File

@ -0,0 +1,205 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Base class for one deployment task run against a node.
#
# Subclasses implement #process (start the work) and #calculate_status
# (poll and update the status). They may override #validation and
# #setup_default. The status moves between the values listed in
# ALLOWED_STATUSES and starts as :pending.
class Task
  ALLOWED_STATUSES = [:successful, :failed, :running, :pending].freeze

  # @param task [Hash] task description ('id', 'type', 'node_id', 'parameters', ...)
  # @param context deployment context (provides task_id and reporting)
  def initialize(task, context)
    # WARNING: this code expect that only one node will be send
    # on one hook.
    @task = task
    @status = :pending
    @ctx = context
  end

  # Run current task on node, specified in task.
  # Validates the task, applies defaults, marks it running and starts it.
  # Any StandardError is logged and turns the task into :failed.
  def run
    validation
    setup_default
    running!
    process
  rescue => e
    Astute.logger.warn("Fail to run task #{@task['type']} #{task_name}" \
      " with error #{e.message} trace: #{e.format_backtrace}")
    failed!
  end

  # Polls the status of the task.
  # Finished tasks are not polled again. Any StandardError raised while
  # polling is logged and turns the task into :failed.
  #
  # @return [Symbol] current status
  def status
    calculate_status unless finished?
    @status
  rescue => e
    Astute.logger.warn("Fail to detect status of the task #{@task['type']}" \
      " #{task_name} with error #{e.message} trace: #{e.format_backtrace}")
    failed!
  end

  # Sets the status after checking it against ALLOWED_STATUSES.
  #
  # @param value [Symbol, String] new status
  def status=(value)
    value = value.to_sym
    unless ALLOWED_STATUSES.include?(value)
      # NOTE(review): AstuteError::InvalidArgument is not defined in the
      # visible sources -- confirm that this constant exists.
      raise AstuteError::InvalidArgument,
        "#{self}: Invalid task status: #{value}"
    end
    @status = value
  end

  private

  # Run current task on node, specified in task
  # should be fast and async and do not raise exceptions
  # @abstract Should be implemented in a subclass
  def process
    raise NotImplementedError
  end

  # Polls the status of the task
  # should update the task status and do not raise exceptions
  # @abstract Should be implemented in a subclass
  def calculate_status
    raise NotImplementedError
  end

  # Checks that +key+ has a present value in +data+.
  # A missing (nil) +data+ container is reported the same way as a
  # missing key instead of failing with NoMethodError.
  #
  # @raise [TaskValidationError] if the value is absent
  def validate_presence(data, key)
    raise TaskValidationError,
      "Missing a required parameter #{key}" unless data && data[key].present?
  end

  # Pre validation of the task
  # should check task and raise error if something went wrong
  # @raise [TaskValidationError] if the object is not a task or has missing fields
  def validation
  end

  # Setup default value for hook
  # should not raise any exception
  def setup_default
  end

  # Run short shell commands
  # should use only in case of short run command
  # In other case please use shell task
  # Synchronous (blocking) call
  #
  # @param node_uid node uid (or array of uids) to run the command on
  # @param cmd [String] shell command
  # @param timeout [Integer] MClient timeout
  # @return [Hash] :stdout, :stderr and :exit_code of the first response,
  #   or an empty hash when no response was received
  def run_shell_without_check(node_uid, cmd, timeout=2)
    shell = MClient.new(
      @ctx,
      'execute_shell_command',
      Array(node_uid),
      false, # check_result
      timeout
    )
    results = shell.execute(:cmd => cmd)
    Astute.logger.debug("Mcollective shell result: #{results}")

    # An empty array is truthy in Ruby, so look at the first response
    # itself: with no responses `results.first` is nil and reading
    # `.results` on it used to raise NoMethodError.
    result = results && results.first
    unless result
      Astute.logger.warn("#{@ctx.task_id}: Failed to run shell #{cmd} on " \
        "node #{node_uid}. Error will not raise because shell was run " \
        "without check")
      return {}
    end

    Astute.logger.debug(
      "#{@ctx.task_id}: cmd: #{cmd}\n" \
      "stdout: #{result.results[:data][:stdout]}\n" \
      "stderr: #{result.results[:data][:stderr]}\n" \
      "exit code: #{result.results[:data][:exit_code]}")
    {
      :stdout => result.results[:data][:stdout].chomp,
      :stderr => result.results[:data][:stderr].chomp,
      :exit_code => result.results[:data][:exit_code]
    }
  end

  # Create file with content on selected node
  # should use only for small file
  # In other case please use separate thread or
  # use upload file task.
  # Synchronous (blocking) call
  #
  # Missing options in +mco_params+ are filled with defaults in place.
  #
  # @return [Boolean] true when uploaded, false on MClient timeout/error
  def upload_file(node_uid, mco_params={})
    upload_mclient = Astute::MClient.new(
      @ctx,
      "uploadfile",
      Array(node_uid)
    )

    mco_params['overwrite'] = true if mco_params['overwrite'].nil?
    mco_params['parents'] = true if mco_params['parents'].nil?
    mco_params['permissions'] ||= '0644'
    mco_params['user_owner'] ||= 'root'
    mco_params['group_owner'] ||= 'root'
    mco_params['dir_permissions'] ||= '0755'

    upload_mclient.upload(
      :path => mco_params['path'],
      :content => mco_params['content'],
      :overwrite => mco_params['overwrite'],
      :parents => mco_params['parents'],
      :permissions => mco_params['permissions'],
      :user_owner => mco_params['user_owner'],
      :group_owner => mco_params['group_owner'],
      :dir_permissions => mco_params['dir_permissions']
    )

    Astute.logger.debug("#{@ctx.task_id}: file was uploaded " \
      "#{mco_params['path']} on node #{node_uid} successfully")
    true
  rescue MClientTimeout, MClientError => e
    Astute.logger.error("#{@ctx.task_id}: file was not uploaded "\
      "#{mco_params['path']} on node #{node_uid}: #{e.message}")
    false
  end

  def finished?
    [:successful, :failed].include? @status
  end

  def failed!
    self.status = :failed
  end

  def failed?
    @status == :failed
  end

  def running!
    self.status = :running
  end

  def running?
    @status == :running
  end

  def succeed!
    self.status = :successful
  end

  def successful?
    @status == :successful
  end

  def pending?
    @status == :pending
  end

  # Name used in log messages: task 'id' or, if absent, 'diagnostic_name'.
  def task_name
    @task['id'] || @task['diagnostic_name']
  end
end
end

View File

@ -0,0 +1,246 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'fuel_deployment'
module Astute
# Task based deployment engine: builds one graph from the per-node task
# lists, runs it through Deployment::Process and reports the result.
class TaskDeployment
  # @param context deployment context used for reporting and MClient calls
  def initialize(context)
    @ctx = context
  end

  # Runs the deployment.
  #
  # @param deployment_info [Array<Hash>] per-node deployment data
  # @param deployment_tasks [Hash] node id => list of task hashes; tasks of
  #   the 'null' node are moved to the virtual sync node
  # @raise [DeploymentEngineError] if either argument is blank
  def deploy(deployment_info, deployment_tasks)
    raise DeploymentEngineError, "Deployment info are not provided!" if
      deployment_info.blank? || deployment_tasks.blank?

    deployment_info, offline_uids = remove_failed_nodes(deployment_info)
    Astute::TaskPreDeploymentActions.new(deployment_info, @ctx).process

    deployment_tasks = support_virtual_node(deployment_tasks)

    nodes = {}
    deployment_tasks.keys.each do |node_id|
      node = TaskNode.new(node_id)
      node.context = @ctx
      node.set_critical if critical_node_uids(deployment_info).include?(node_id)
      node.set_status_failed if offline_uids.include? node_id
      nodes[node_id] = node
    end

    deployment_tasks.each do |node_id, tasks|
      tasks.each do |task|
        nodes[node_id].graph.create_task(
          task['id'],
          task.merge({'node_id' => node_id})
        )
      end
    end

    # Wire dependencies only after every task exists on every node.
    # A task without 'requires'/'required_for' simply has no such links.
    deployment_tasks.each do |node_id, tasks|
      tasks.each do |task|
        Array(task['requires']).each do |d_t|
          nodes[node_id][task['id']].depends nodes[d_t['node_id']][d_t['name']]
        end

        Array(task['required_for']).each do |d_t|
          nodes[node_id][task['id']].depended_on nodes[d_t['node_id']][d_t['name']]
        end
      end
    end

    deployment = Deployment::Process.new(nodes.values)
    write_graph_to_file(deployment)
    result = deployment.run
    report_deploy_result(result)
  end

  private

  # Reports the overall result; on failure first reports every failed node
  # together with its failed task (when one is found).
  def report_deploy_result(result)
    if result[:success]
      @ctx.report('status' => 'ready', 'progress' => 100)
    else
      result[:failed_nodes].each do |node|
        report_status = {
          'uid' => node.id,
          'status' => 'error',
          'error_type' => 'deploy',
          'error_msg' => result[:status]
        }
        task = result[:failed_tasks].find{ |t| t.node == node }
        if task
          report_status.merge!({
            'task' => task.name,
            'task_status' => task.status.to_s
          })
        end
        @ctx.report(report_status)
      end
      @ctx.report(
        'status' => 'error',
        'progress' => 100,
        'error' => result[:status]
      )
    end
  end

  # Dumps the graph in dot format when Astute.config.enable_graph_file is set.
  def write_graph_to_file(deployment)
    return unless Astute.config.enable_graph_file
    graph_file = File.join(
      Astute.config.graph_dot_dir,
      "graph-#{@ctx.task_id}.dot"
    )
    File.open(graph_file, 'w') { |f| f.write(deployment.to_dot) }
    Astute.logger.info("Check graph into file #{graph_file}")
  end

  # Astute use special virtual node for deployment tasks, because
  # any task must be connected to node. For task, which play
  # synchronization role, we create virtual_sync_node
  #
  # Tasks listed under 'null' are moved to 'virtual_sync_node' and every
  # dependency without a node id is pointed at it. The hash is changed in
  # place. When there is no 'null' entry the virtual node gets an empty task
  # list, so the later per-node iteration does not hit nil.
  #
  # @return [Hash] the same +deployment_tasks+ object
  def support_virtual_node(deployment_tasks)
    if deployment_tasks.key?('null')
      deployment_tasks['virtual_sync_node'] = deployment_tasks.delete('null') || []
    else
      deployment_tasks['virtual_sync_node'] ||= []
    end

    deployment_tasks.each do |_node_id, tasks|
      tasks.each do |task|
        Array(task['requires']).each do |d_t|
          d_t['node_id'] = 'virtual_sync_node' if d_t['node_id'].nil?
        end

        Array(task['required_for']).each do |d_t|
          d_t['node_id'] = 'virtual_sync_node' if d_t['node_id'].nil?
        end
      end
    end

    deployment_tasks
  end

  # Uids of nodes marked with 'fail_if_error'. The value is computed on the
  # first call and cached; later calls ignore their argument.
  def critical_node_uids(deployment_info)
    @critical_nodes ||= deployment_info.select{ |n| n['fail_if_error'] }
                                       .map{ |n| n['uid'] }.uniq
  end

  # Removes nodes which failed to provision
  #
  # @return [Array] [deployment_info without offline nodes, offline uids]
  # @raise [Astute::DeploymentEngineError] if a critical node is offline
  def remove_failed_nodes(deployment_info)
    uids = get_uids_from_deployment_info deployment_info
    required_uids = critical_node_uids(deployment_info)

    available_uids = detect_available_nodes(uids)
    offline_uids = uids - available_uids
    if offline_uids.present?
      # set status for all failed nodes to error
      nodes = offline_uids.map do |uid|
        {'uid' => uid,
         'status' => 'error',
         'error_type' => 'provision',
         'error_msg' => 'Node is not ready for deployment: '\
                        'mcollective has not answered'
        }
      end

      @ctx.report_and_update_status(
        'nodes' => nodes,
        'error' => 'Node is not ready for deployment'
      )

      # check if all required nodes are online
      # if not, raise error
      missing_required = required_uids - available_uids
      if missing_required.present?
        error_message = "Critical nodes are not available for deployment: " \
                        "#{missing_required}"
        raise Astute::DeploymentEngineError, error_message
      end
    end

    remove_offline_nodes(
      uids,
      available_uids,
      deployment_info,
      offline_uids)
  end

  # Drops offline nodes both from the top level list and from every
  # 'nodes' block.
  def remove_offline_nodes(uids, available_uids, deployment_info, offline_uids)
    if offline_uids.blank?
      return [deployment_info, offline_uids]
    end

    Astute.logger.info "Removing nodes which failed to provision: " \
                       "#{offline_uids}"
    deployment_info = cleanup_nodes_block(deployment_info, offline_uids)
    deployment_info = deployment_info.select do |node|
      available_uids.include? node['uid']
    end

    [deployment_info, offline_uids]
  end

  # Removes offline nodes from the 'nodes' block shared by all entries.
  # The block of the first entry is used as the source; when it has no
  # 'nodes' block the info is returned untouched (the block is optional in
  # #get_uids_from_deployment_info as well).
  def cleanup_nodes_block(deployment_info, offline_uids)
    return deployment_info if offline_uids.blank?

    first_node = deployment_info.first
    nodes = first_node && first_node['nodes']
    return deployment_info if nodes.nil?

    # In case of deploy in already existing cluster in nodes block
    # we will have all cluster nodes. We should remove only missing
    # nodes instead of stay only avaliable.
    # Example: deploy 3 nodes, after it deploy 2 nodes.
    # In 1 of 2 seconds nodes missing, in nodes block we should
    # contain only 4 nodes.
    nodes_without_missing = nodes.select do |node|
      !offline_uids.include?(node['uid'])
    end
    deployment_info.each { |node| node['nodes'] = nodes_without_missing }
    deployment_info
  end

  # Asks the 'systemtype' agent which nodes report the "target" type.
  #
  # @return [Array] uids of nodes that answered as "target"
  def detect_available_nodes(uids)
    all_uids = uids.clone
    available_uids = []

    # In case of big amount of nodes we should do several calls to be sure
    # about node status
    Astute.config[:mc_retries].times do
      systemtype = Astute::MClient.new(
        @ctx,
        "systemtype",
        all_uids,
        false, # check_result
        10
      )
      available_nodes = systemtype.get_type.select do |node|
        node.results[:data][:node_type].chomp == "target"
      end

      available_uids += available_nodes.map { |node| node.results[:sender] }
      all_uids -= available_uids
      break if all_uids.empty?

      sleep Astute.config[:mc_retry_interval]
    end

    available_uids
  end

  # Collects uids from the top level entries and from their optional
  # 'nodes' blocks, without duplicates.
  def get_uids_from_deployment_info(deployment_info)
    top_level_uids = deployment_info.map{ |node| node["uid"] }

    inside_uids = deployment_info.flat_map do |node|
      node.fetch('nodes', []).map{ |n| n['uid'] }
    end
    top_level_uids | inside_uids
  end
end
end

95
lib/astute/task_node.rb Normal file
View File

@ -0,0 +1,95 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'fuel_deployment'
module Astute
# Deployment node that runs Astute task engines and reports their
# progress through the deployment context.
class TaskNode < Deployment::Node
  # Stores the context used for reporting and handed to task engines.
  def context=(context)
    @ctx = context
  end

  # Starts +inbox_task+ on this node with the engine matching its type and
  # marks both the task and the node as busy.
  def run(inbox_task)
    self.task = inbox_task
    @task_engine = select_task_engine(task.data)
    @task_engine.run
    task.set_status_running
    set_status_busy
  end

  # Syncs the graph task status with the engine status and reports it.
  # Does nothing unless the node is busy.
  def poll
    return unless busy?

    debug("Node #{id}: task #{task.name}, task status #{task.status}")

    # Please be informed that this code define special method
    # of Deployment::Node class. We use special method `task`
    # to manage task status, graph of tasks and nodes.
    task.status = @task_engine.status
    return report_running_task if @task.running?

    set_status_online
    report_completed_task
  end

  private

  # Reports that the current task is still in progress.
  def report_running_task
    @ctx.report({
      'uid' => id,
      'status' => 'deploying',
      'task' => task.name,
      'progress' => current_progress_bar
    })
  end

  # Reports the outcome of the task that has just stopped running.
  def report_completed_task
    deploy_status = node_deploy_status
    report_status = {
      'uid' => id,
      'status' => deploy_status,
      'task' => task.name,
      'task_status' => task.status.to_s,
      'progress' => current_progress_bar
    }
    report_status['error_type'] = 'deploy' if deploy_status == 'error'
    @ctx.report(report_status)
  end

  # Node status for the report: 'deploying' until the node is finished,
  # then 'ready' or 'error'.
  def node_deploy_status
    return 'deploying' unless finished?
    successful? ? 'ready' : 'error'
  end

  # Integer percentage of finished tasks on this node.
  def current_progress_bar
    100 * tasks_finished_count / tasks_total_count
  end

  # Builds the task engine for the task's 'type'.
  #
  # @raise [TaskValidationError] for an unknown type
  def select_task_engine(data)
    # TODO: replace by Object.const_get(type.split('_').collect(&:capitalize).join)
    engine_class =
      case data['type']
      when 'shell' then Shell
      when 'puppet' then Puppet
      when 'upload_file' then UploadFile
      when 'upload_files' then UploadFiles
      when 'reboot' then Reboot
      when 'sync' then Sync
      when 'cobbler_sync' then CobblerSync
      when 'copy_files' then CopyFiles
      when 'noop', 'stage', 'skipped' then Noop
      else raise TaskValidationError, "Unknown task type '#{data['type']}'"
      end
    engine_class.new(data, @ctx)
  end
end
end

View File

@ -0,0 +1,42 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Task that runs a Cobbler sync in a background thread.
class CobblerSync < Task
  def initialize(task, context)
    super
    @work_thread = nil
  end

  private

  # Starts CobblerManager#sync in a separate thread.
  def process
    engine_settings = @task['parameters']['provisioning_info']['engine']
    cobbler = CobblerManager.new(engine_settings, @ctx.reporter)
    @work_thread = Thread.new { cobbler.sync }
  end

  # Once the thread has stopped, joins it and marks the task successful.
  def calculate_status
    return if @work_thread.alive?
    succeed! if @work_thread.join
  end

  def validation
    validate_presence(@task['parameters'], 'provisioning_info')
  end
end
end

View File

@ -0,0 +1,59 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Task that copies local files to a node through the upload agent.
# Every file is tracked by the concatenation of its 'src' and 'dst'.
class CopyFiles < Task
  def initialize(task, context)
    super
    @work_thread = nil
    # The constructor runs before #validation, so a task without
    # 'parameters' or 'files' must not raise here; #run reports such a
    # task as failed through the normal validation path instead.
    files = Array((@task['parameters'] || {})['files'])
    @files_status = files.each_with_object({}) do |file, statuses|
      statuses[status_key(file)] = :pending
    end
  end

  private

  # Uploads every readable source file; unreadable or missing files are
  # recorded as failed (false).
  def process
    @task['parameters']['files'].each do |file|
      if File.file?(file['src']) && File.readable?(file['src'])
        parameters = {
          'content' => File.binread(file['src']),
          'path' => file['dst'],
          'permissions' => file['permissions'] || @task['parameters']['permissions'],
          'dir_permissions' => file['dir_permissions'] || @task['parameters']['dir_permissions'],
        }
        @files_status[status_key(file)] =
          upload_file(@task['node_id'], parameters)
      else
        @files_status[status_key(file)] = false
      end
    end # files
  end

  # Fails if any file failed, succeeds when all files were uploaded;
  # leaves the status untouched while something is still pending.
  def calculate_status
    statuses = @files_status.values
    return if statuses.include?(:pending)

    failed! if statuses.include?(false)
    succeed! if statuses.all? { |s| s == true }
  end

  def validation
    validate_presence(@task, 'node_id')
    validate_presence(@task['parameters'], 'files')
  end

  # Key of a file entry in @files_status.
  def status_key(file)
    "#{file['src']}#{file['dst']}"
  end
end
end

29
lib/astute/tasks/noop.rb Normal file
View File

@ -0,0 +1,29 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Task that does no work; it is marked successful on the first status poll.
class Noop < Task
private
# Nothing to start for a noop task.
def process
end
# Marks the task as successful.
def calculate_status
succeed!
end
end
end

View File

@ -0,0 +1,72 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'timeout'
module Astute
# Task that applies a puppet manifest on a node through PuppetTask.
class Puppet < Task
  private

  # Creates the underlying PuppetTask and starts it.
  def process
    @puppet_task = create_puppet_task
    @puppet_task.run
  end

  # Maps the PuppetTask status to the task status; any status other than
  # 'ready' or 'error' leaves the task as it is.
  def calculate_status
    puppet_status = @puppet_task.status
    if puppet_status == 'ready'
      succeed!
    elsif puppet_status == 'error'
      failed!
    end
  end

  def validation
    validate_presence(@task, 'node_id')
    validate_presence(@task['parameters'], 'puppet_manifest')
  end

  # Fills in missing puppet parameters from the Astute configuration.
  def setup_default
    params = @task['parameters']
    params['cwd'] ||= '/'
    params['timeout'] ||= Astute.config.puppet_timeout
    params['retries'] ||= Astute.config.puppet_retries
    params['debug'] = false unless params['debug'].present?
    params['puppet_modules'] ||= Astute.config.puppet_module_path
  end

  # Builds a PuppetTask with its own context that logs reports instead of
  # sending them and does not parse logs.
  def create_puppet_task
    params = @task['parameters']
    puppet_context = Context.new(
      @ctx.task_id,
      PuppetLoggerReporter.new,
      LogParser::NoParsing.new
    )
    node = { 'uid' => @task['node_id'].to_s, 'role' => task_name }

    PuppetTask.new(
      puppet_context,
      node,
      params['retries'],
      params['puppet_manifest'],
      params['puppet_modules'],
      params['cwd'],
      params['timeout'],
      params['debug']
    )
  end
end # class
# Reporter that writes every report to the Astute debug log.
class PuppetLoggerReporter
# Logs +msg+ at debug level.
def report(msg)
Astute.logger.debug msg
end
end
end

View File

@ -0,0 +1,84 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Task that reboots a node and waits until its boot time changes.
class Reboot < Task
  def initialize(task, context)
    super
    @control_time = nil
    @time_start = nil
    @already_rebooted = false
  end

  private

  # Remembers the boot time seen before the reboot and sends the reboot
  # command. Fails when the boot time cannot be read.
  def process
    @control_time = boot_time
    @time_start = Time.now.to_i
    # boot_time yields 0 (not nil) when the shell call returns nothing,
    # and 0 is truthy in Ruby, so check it explicitly.
    unless valid_boot_time?(@control_time)
      failed!
      Astute.logger.warn("#{@ctx.task_id}: #{task_name} failed because " \
        "task could not get valid info about boot time")
      return
    end
    reboot
  end

  # Fails after the timeout; succeeds once the node reports a valid boot
  # time different from the one recorded before the reboot.
  def calculate_status
    if Time.now.to_i - @time_start > @task['parameters']['timeout']
      failed!
      Astute.logger.warn("#{@ctx.task_id}: #{task_name} failed because " \
        "reboot timeout #{@task['parameters']['timeout']} expired")
      return
    end

    current_bt = boot_time
    # An unanswered poll gives 0 while the node is down; treating it as a
    # changed boot time would report success before the node is back.
    succeed! if valid_boot_time?(current_bt) && current_bt != @control_time
  end

  def validation
    validate_presence(@task, 'node_id')
  end

  def setup_default
    @task['parameters']['timeout'] ||= Astute.config.reboot_timeout
  end

  # Sends the reboot command; an MClient error fails the task.
  def reboot
    run_shell_without_check(
      Array(@task['node_id']),
      'reboot',
      2
    )
  rescue Astute::MClientTimeout, Astute::MClientError => e
    Astute.logger.error("#{@ctx.task_id}: #{task_name} mcollective " \
      "reboot command failed with error #{e.message}")
    failed!
  end

  # Reads the modification time of /proc/1 on the node.
  #
  # @return [Integer, nil] the reported time, 0 when the node gave no
  #   usable output, nil on an MClient error
  def boot_time
    run_shell_without_check(
      Array(@task['node_id']),
      "stat --printf='%Y' /proc/1",
      2
    )[:stdout].to_i
  rescue Astute::MClientTimeout, Astute::MClientError => e
    Astute.logger.debug("#{@ctx.task_id}: #{task_name} mcollective " \
      "boot time command failed with error #{e.message}")
    nil
  end

  # A boot time is usable only when it was received and is not zero.
  def valid_boot_time?(value)
    !value.nil? && value != 0
  end
end
end

137
lib/astute/tasks/shell.rb Normal file
View File

@ -0,0 +1,137 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'erb'
module Astute
# Task that runs a shell command on a node.
class Shell < Task
# Accept to run shell tasks using existing puppet asynchronous
# mechanism. It create and upload 2 files: shell script and
# puppet manifest. Then run puppet manifest
def initialize(task, context)
super
@puppet_task = nil
end
private
# Directory on the node where the generated script and manifest are stored.
SHELL_MANIFEST_DIR = '/etc/puppet/shell_manifests'
# Creates the manifest directory on the node, uploads the generated files
# and starts a Puppet task that applies the generated manifest.
def process
run_shell_without_check(
@task['node_id'],
"mkdir -p #{SHELL_MANIFEST_DIR}",
timeout=2
)
upload_shell_manifest
@puppet_task = Puppet.new(
generate_puppet_hook,
@ctx
)
@puppet_task.run
end
# Copies the status of the wrapped Puppet task.
def calculate_status
self.status = @puppet_task.status
end
def validation
validate_presence(@task, 'node_id')
validate_presence(@task['parameters'], 'cmd')
end
# Fills in missing parameters from the Astute configuration.
def setup_default
@task['parameters']['timeout'] ||= Astute.config.shell_timeout
@task['parameters']['cwd'] ||= Astute.config.shell_cwd
@task['parameters']['retries'] ||= Astute.config.mc_retries
@task['parameters']['interval'] ||= Astute.config.mc_retry_interval
end
# Renders the puppet manifest whose exec resource runs the uploaded
# shell script with bash.
def puppet_exec_template
template = <<-eos
# Puppet manifest wrapper for task: <%= task_name %>
notice('MODULAR: <%= task_name %>')
exec { '<%= task_name %>_shell' :
path => '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
command => '/bin/bash "<%= shell_exec_file_path %>"',
logoutput => true,
}
eos
ERB.new(template, nil, '-').result(binding)
end
# Renders the shell script: changes to the task 'cwd' and runs 'cmd'.
def shell_exec_template
command = "cd #{@task['parameters']['cwd']} &&" \
" #{@task['parameters']['cmd']}"
template = <<-eos
#!/bin/bash
# Puppet shell wrapper for task: <%= task_name %>
# Manifest: <%= puppet_exec_file_path %>
<%= command %>
eos
ERB.new(template, nil, '-').result(binding)
end
# Path of the generated shell script on the node.
def shell_exec_file_path
File.join(SHELL_MANIFEST_DIR, "#{task_name}_command.sh")
end
# Path of the generated puppet manifest on the node.
def puppet_exec_file_path
File.join(SHELL_MANIFEST_DIR, manifest_name)
end
# Uploads the rendered puppet manifest to the node.
def upload_puppet_manifest
upload_file(@task['node_id'], {
'path' => puppet_exec_file_path,
'content' => puppet_exec_template,
'permissions' => '0755'
})
end
# Uploads the rendered shell script to the node.
def upload_shell_file
upload_file(@task['node_id'], {
'path' => shell_exec_file_path,
'content' => shell_exec_template,
'permissions' => '0755'
})
end
# Uploads both generated files; the upload results are not checked here.
def upload_shell_manifest
upload_puppet_manifest
upload_shell_file
end
def manifest_name
"#{task_name}_manifest.pp"
end
# Builds the task description for the wrapped Puppet task, which runs
# the generated manifest from SHELL_MANIFEST_DIR.
def generate_puppet_hook
{
'node_id' => @task['node_id'],
'id' => @task['id'],
'parameters' => {
"puppet_manifest" => manifest_name,
"cwd" => SHELL_MANIFEST_DIR,
"timeout" => @task['parameters']['timeout'],
"retries" => @task['parameters']['retries']
}
}
end
end
end

60
lib/astute/tasks/sync.rb Normal file
View File

@ -0,0 +1,60 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
# Task that syncs a source to a directory on the node with rsync,
# executed through a Shell task.
class Sync < Task
  private

  # Starts a Shell task running the generated rsync command.
  def process
    @shell_task = Shell.new(generate_shell_hook, @ctx)
    @shell_task.run
  end

  # Copies the status of the wrapped Shell task.
  def calculate_status
    self.status = @shell_task.status
  end

  def validation
    validate_presence(@task, 'node_id')
    %w(dst src).each { |key| validate_presence(@task['parameters'], key) }
  end

  def setup_default
    params = @task['parameters']
    params['timeout'] ||= 300
    params['retries'] ||= 10
  end

  # Builds the Shell task description: create the destination directory,
  # then rsync the source into it with the configured rsync options.
  def generate_shell_hook
    params = @task['parameters']
    destination = params['dst']
    rsync_cmd = "mkdir -p #{destination} && " \
                "rsync #{Astute.config.rsync_options} " \
                "#{params['src']} #{destination}"

    {
      "node_id" => @task['node_id'],
      "id" => @task['id'],
      "parameters" => {
        "cmd" => rsync_cmd,
        "cwd" => "/",
        "timeout" => params['timeout'],
        "retries" => params['retries']
      }
    }
  end
end
end

View File

@ -0,0 +1,47 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
  # Task that uploads a single file to one node.
  #
  # The upload result is kept in @upload_status: :pending until process has
  # run, afterwards whatever upload_file returned.
  class UploadFile < Task

    def initialize(task, context)
      super
      @upload_status = :pending
    end

    private

    # Performs the upload and remembers its result.
    def process
      @upload_status = upload_file(@task['node_id'], @task['parameters'])
    end

    # Marks the task succeeded or failed once the upload result is a boolean;
    # any other value (e.g. :pending) leaves the status untouched.
    # Always returns nil.
    def calculate_status
      case @upload_status
      when true then succeed!
      when false then failed!
      end
      nil
    end

    # Required fields: 'node_id' on the task, 'path' and 'data' in parameters.
    def validation
      validate_presence(@task, 'node_id')
      parameters = @task['parameters']
      validate_presence(parameters, 'path')
      validate_presence(parameters, 'data')
    end

    # The file body arrives as 'data'; it is copied to the 'content' key.
    def setup_default
      parameters = @task['parameters']
      parameters['content'] = parameters['data']
    end

  end
end

View File

@ -0,0 +1,56 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
  # Task that uploads a set of files to several nodes.
  #
  # Expects @task['parameters']['nodes'] to be a list of hashes with a 'uid'
  # and a 'files' list; every file has 'data', 'dst' and optional
  # 'permissions' / 'dir_permissions'.
  #
  # @nodes_status maps node uid => :pending | result of the last upload_file
  # call for that node.
  class UploadFiles < Task

    def initialize(task, context)
      super
      @nodes_status = @task['parameters']['nodes'].each_with_object({}) do |node, statuses|
        statuses[node['uid']] = :pending
      end
    end

    private

    # Uploads every file to every node listed in the task parameters.
    #
    # The node list is read from @task (the original code referenced an
    # undefined `hook` here). Once an upload for a node returns a falsy
    # result, the remaining files of that node are skipped.
    def process
      @task['parameters']['nodes'].each do |node|
        uid = node['uid']
        node['files'].each do |file|
          parameters = {
            'content' => file['data'],
            'path' => file['dst'],
            'permissions' => file['permissions'] || '0644',
            'dir_permissions' => file['dir_permissions'] || '0755',
          }
          @nodes_status[uid] = upload_file(uid, parameters) if @nodes_status[uid]
        end
      end
    end

    # Does nothing while any node is still :pending. Afterwards the task is
    # failed if any node reported false and succeeded if all nodes reported
    # exactly true. Always returns nil.
    def calculate_status
      statuses = @nodes_status.values
      return if statuses.include?(:pending)

      failed! if statuses.include?(false)
      # strict comparison on purpose: only a literal true counts as success
      succeed! if statuses.all? { |status| status == true }
      nil
    end

    # Required field: 'nodes' in parameters.
    def validation
      validate_presence(@task['parameters'], 'nodes')
    end

  end
end

View File

@ -68,7 +68,7 @@ describe MClient do
mclient = MClient.new(@ctx, "faketest", nodes.map {|x| x['uid']}, check_result=true, timeout=nil, retries=1)
expect { mclient.echo(:msg => 'hello world') }.to \
raise_error(Astute::MClientTimeout, /MCollective agents '3' didn't respond./)
raise_error(Astute::MClientTimeout, /MCollective agents 'faketest' '3' didn't respond./)
end
it "should raise error if agent returns statuscode != 0" do

View File

@ -24,6 +24,25 @@ describe Astute::Orchestrator do
@reporter.stub_everything
end
# Orchestrator#task_deploy is expected to hand deployment info and
# deployment tasks over to TaskDeployment#deploy unchanged.
describe '#task_deploy' do
  it 'should run task deployment' do
    deployment_info = []
    deployment_tasks = {'1' => []}

    Astute::TaskDeployment.any_instance.expects(:deploy).with(
      deployment_info,
      deployment_tasks
    )

    @orchestrator.task_deploy(
      @reporter,
      'task_id',
      deployment_info,
      deployment_tasks
    )
  end
end
describe '#verify_networks' do
it 'should validate nodes availability before check' do
nodes = [{'uid' => '1'}, {'uid' => '2'}]

View File

@ -0,0 +1,181 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require File.join(File.dirname(__FILE__), '../spec_helper')
# Unit tests for Astute::TaskDeployment#deploy. All collaborators
# (pre-deployment actions, failed nodes removal, graph file writer and
# Deployment::Process) are stubbed or mocked in every example.
describe Astute::TaskDeployment do
  include SpecHelpers

  # Mocked context; task_id is stubbed without a return value.
  let(:ctx) do
    ctx = mock('context')
    ctx.stubs(:task_id)
    ctx
  end

  let(:deployment_info) do
    [
      {
        'uid' => '1',
        'fail_if_error' => false
      }
    ]
  end

  # One real node ("1") plus the "null" key, which support_virtual_node is
  # expected to replace with 'virtual_sync_node'.
  let(:deployment_tasks) do
    {
      "1" => [{
        "type" => "noop",
        "fail_on_error" => true,
        "required_for" => [],
        "requires" => [],
        "id" => "ironic_post_swift_key"
      }],
      "null" => [{
        "skipped" => true,
        "type" => "skipped",
        "fail_on_error" => false,
        "required_for" => [],
        "requires" => [],
        "id" => "post_deployment_start"
      }]
    }
  end

  let(:task_deployment) { Astute::TaskDeployment.new(ctx) }

  describe '#deploy' do
    it 'should run deploy' do
      task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
      Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
      task_deployment.stubs(:write_graph_to_file)
      ctx.stubs(:report)

      Deployment::Process.any_instance.expects(:run).returns({:success => true})
      task_deployment.deploy(deployment_info, deployment_tasks)
    end

    it 'should raise error if deployment info not provided' do
      expect{task_deployment.deploy([],{})}.to raise_error(
        Astute::DeploymentEngineError,
        "Deployment info are not provided!"
      )
    end

    it 'should run pre deployment task' do
      task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
      task_deployment.stubs(:write_graph_to_file)
      ctx.stubs(:report)
      Deployment::Process.any_instance.stubs(:run).returns({:success => true})

      # build the instance before .new is mocked so the mock can return it
      pre_deployment = Astute::TaskPreDeploymentActions.new(deployment_info, ctx)
      Astute::TaskPreDeploymentActions.expects(:new)
                                      .with(deployment_info, ctx)
                                      .returns(pre_deployment)
      Astute::TaskPreDeploymentActions.any_instance.expects(:process)
      task_deployment.deploy(deployment_info, deployment_tasks)
    end

    it 'should support virtual node' do
      d_t = task_deployment.send(:support_virtual_node, deployment_tasks)
      expect(d_t.keys.include?('virtual_sync_node')).to be_true
      expect(d_t.keys.include?('null')).to be_false
    end

    it 'should remove failed nodes' do
      #TODO(vsharshov): improve remove failed nodes check. Check mcollective
      Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
      task_deployment.stubs(:write_graph_to_file)
      ctx.stubs(:report)

      task_deployment.expects(:remove_failed_nodes).returns([deployment_info, []])
      Deployment::Process.any_instance.stubs(:run).returns({:success => true})
      task_deployment.deploy(deployment_info, deployment_tasks)
    end

    context 'should report final status' do
      it 'succeed status' do
        Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
        Deployment::Process.any_instance.stubs(:run).returns({:success => true})
        task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
        task_deployment.stubs(:write_graph_to_file)

        ctx.expects(:report).with({'status' => 'ready', 'progress' => 100})
        task_deployment.deploy(deployment_info, deployment_tasks)
      end

      it 'failed status' do
        Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
        Deployment::Process.any_instance.stubs(:run).returns({
          :success => false,
          :failed_nodes => [],
          :failed_tasks => [],
          :status => 'Failed because of'})
        task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
        task_deployment.stubs(:write_graph_to_file)

        ctx.expects(:report).with({
          'status' => 'error',
          'progress' => 100,
          'error' => 'Failed because of'})
        task_deployment.deploy(deployment_info, deployment_tasks)
      end
    end

    context 'graph file' do
      # The examples change a global config flag; always put the previous
      # value back, even if running the example raises.
      around(:each) do |example|
        old_value = Astute.config.enable_graph_file
        begin
          example.run
        ensure
          Astute.config.enable_graph_file = old_value
        end
      end

      it 'should not write graph if disable' do
        Astute.config.enable_graph_file = false

        task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
        Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
        ctx.stubs(:report)
        Deployment::Process.any_instance.stubs(:run).returns({:success => true})

        # neither the file nor its content may be touched
        file_handle = mock
        file_handle.expects(:write).with(regexp_matches(/digraph/)).never
        File.expects(:open).with("/tmp/graph-#{ctx.task_id}.dot", 'w')
            .yields(file_handle).never

        task_deployment.deploy(deployment_info, deployment_tasks)
      end

      it 'should write graph if enable' do
        Astute.config.enable_graph_file = true

        task_deployment.stubs(:remove_failed_nodes).returns([deployment_info, []])
        Astute::TaskPreDeploymentActions.any_instance.stubs(:process)
        ctx.stubs(:report)
        Deployment::Process.any_instance.stubs(:run).returns({:success => true})

        file_handle = mock
        file_handle.expects(:write).with(regexp_matches(/digraph/)).once
        File.expects(:open).with("/tmp/graph-#{ctx.task_id}.dot", 'w')
            .yields(file_handle).once

        task_deployment.deploy(deployment_info, deployment_tasks)
      end
    end # 'graph file'
  end
end

295
spec/unit/task_node_spec.rb Normal file
View File

@ -0,0 +1,295 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require File.join(File.dirname(__FILE__), '../spec_helper')
# Unit tests for Astute::TaskNode: dispatching a graph task to the matching
# Astute task class (#run) and reporting progress while polling (#poll).
describe Astute::TaskNode do
  include SpecHelpers

  # Mocked context; task_id is stubbed without a return value.
  let(:ctx) do
    ctx = mock('context')
    ctx.stubs(:task_id)
    ctx
  end

  let(:task_node) do
    node = Astute::TaskNode.new('node_id')
    node.context = ctx
    node
  end

  # Graph task built from the task_data of the enclosing context.
  let(:task) do
    task_node.graph.create_task(
      task_data['id'],
      task_data.merge({'node_id' => 'node_id'})
    )
  end

  context '#run' do
    let(:task_data) do
      {
        "parameters" => {
          "puppet_modules" => "/etc/puppet/modules",
          "puppet_manifest" => "/etc/puppet/modules/osnailyfacter/modular" \
            "/openstack-haproxy/openstack-haproxy-mysqld.pp",
          "timeout" => 300,
          "cwd" => "/"
        },
        "type" => "puppet",
        "fail_on_error" => true,
        "required_for" => [],
        "requires" => [],
        "id" => "openstack-haproxy-mysqld"
      }
    end

    it 'should run task' do
      Astute::Puppet.any_instance.expects(:run)
      task_node.run(task)
    end

    it 'should mark node as busy' do
      Astute::Puppet.any_instance.stubs(:run)
      task_node.run(task)
      expect(task_node.status).to eql(:busy)
    end

    it 'should mark task as running' do
      Astute::Puppet.any_instance.stubs(:run)
      task_node.run(task)
      expect(task.status).to eql(:running)
    end

    # Each example overrides the task type and expects the matching task
    # class to be run.
    context 'support different task type' do
      let(:task_data) do
        {
          "parameters" => {},
          "type" => "noop",
          "fail_on_error" => false,
          "required_for" => [],
          "requires" => [],
          "id" => "test-task"
        }
      end

      it 'shell' do
        task_data['type'] = "shell"
        Astute::Shell.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'puppet' do
        task_data['type'] = "puppet"
        Astute::Puppet.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'sync' do
        task_data['type'] = "sync"
        Astute::Sync.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'cobbler_sync' do
        task_data['type'] = "cobbler_sync"
        Astute::CobblerSync.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'noop' do
        task_data['type'] = "noop"
        Astute::Noop.any_instance.expects(:run)
        task_node.run(task)
      end

      # 'skipped' and 'stage' are expected to be handled by Noop as well
      it 'skipped' do
        task_data['type'] = "skipped"
        Astute::Noop.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'stage' do
        task_data['type'] = "stage"
        Astute::Noop.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'reboot' do
        task_data['type'] = "reboot"
        Astute::Reboot.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'upload_file' do
        task_data['type'] = "upload_file"
        Astute::UploadFile.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'upload_files' do
        task_data['type'] = "upload_files"
        task_data['parameters']['nodes'] = []
        Astute::UploadFiles.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'copy_files' do
        task_data['type'] = "copy_files"
        task_data['parameters']['files'] = []
        Astute::CopyFiles.any_instance.expects(:run)
        task_node.run(task)
      end

      it 'unknown type' do
        task_data['type'] = "unknown"
        expect{task_node.run(task)}.to raise_error(
          Astute::TaskValidationError,
          "Unknown task type 'unknown'")
      end
    end # support task type
  end

  context '#poll' do
    context 'not busy' do
      it 'should not raise any error' do
        expect{task_node.poll}.not_to raise_error
      end

      it 'should not change node status' do
        old_status = task_node.status
        task_node.poll
        expect(task_node.status).to eql(old_status)
      end
    end

    context 'busy' do
      let(:task_data) do
        {
          "parameters" => {},
          "type" => "puppet",
          "fail_on_error" => false,
          "required_for" => [],
          "requires" => [],
          "id" => "test-task"
        }
      end

      # run is stubbed, so the node is put into the busy state without
      # starting a real puppet task
      before(:each) do
        Astute::Puppet.any_instance.stubs(:run)
      end

      context 'mark online' do
        it 'if task successful' do
          Astute::Puppet.any_instance.stubs(:status).returns(:successful)
          ctx.stubs(:report)
          task_node.run(task)
          task_node.poll
          expect(task_node.status).to eql(:online)
        end

        it 'if task failed' do
          Astute::Puppet.any_instance.stubs(:status).returns(:failed)
          ctx.stubs(:report)
          task_node.run(task)
          task_node.poll
          expect(task_node.status).to eql(:online)
        end
      end

      it 'should report progress if task running' do
        Astute::Puppet.any_instance.expects(:status).returns(:running)
        task_node.run(task)
        ctx.expects(:report).with({
          'uid' => 'node_id',
          'status' => 'deploying',
          'task' => task.name,
          'progress' => 0
        })
        task_node.poll
      end

      it 'should report ready if task successful and no more task' do
        Astute::Puppet.any_instance.expects(:status).returns(:successful)
        task_node.run(task)
        ctx.expects(:report).with({
          'uid' => 'node_id',
          'status' => 'ready',
          'task' => task.name,
          'task_status' => 'successful',
          'progress' => 100
        })
        task_node.poll
      end

      it 'should report error if task failed and no more task' do
        Astute::Puppet.any_instance.expects(:status).returns(:failed)
        task_node.run(task)
        ctx.expects(:report).with({
          'uid' => 'node_id',
          'status' => 'error',
          'task' => task.name,
          'task_status' => 'failed',
          'error_type' => 'deploy',
          'progress' => 100,
        })
        task_node.poll
      end

      # With a second task in the graph, one finished task out of two is
      # expected to be reported as 50% progress.
      it 'should report deploy progress if task successful and another tasks exists' do
        Astute::Puppet.any_instance.expects(:status).returns(:successful)
        task_node.graph.create_task(
          'second_task',
          task_data.merge({'node_id' => 'node_id'})
        )

        task_node.run(task)
        ctx.expects(:report).with({
          'uid' => 'node_id',
          'status' => 'deploying',
          'task' => task.name,
          'task_status' => 'successful',
          'progress' => 50
        })
        task_node.poll
      end

      it 'should report deploy progress if task failed and another tasks exists' do
        Astute::Puppet.any_instance.expects(:status).returns(:failed)
        task_node.graph.create_task(
          'second_task',
          task_data.merge({'node_id' => 'node_id'})
        )

        task_node.run(task)
        ctx.expects(:report).with({
          'uid' => 'node_id',
          'status' => 'deploying',
          'task' => task.name,
          'task_status' => 'failed',
          'progress' => 50
        })
        task_node.poll
      end
    end
  end
end