Granular deployment

Add async Puppet deployment task support.
Use Puppet Nailgun hooks as the main deployment
mechanism for new (6.1) and old (<6.1) clusters.

TODO:
- more tests for puppet tasks;
- refactoring.

Implements blueprint granular-deployment-based-on-tasks

Change-Id: I28928e86ea4017288478703c6075b315b120349a
Vladimir Sharshov (warpc) 2014-12-12 13:36:44 +03:00
parent 16b252d93b
commit c5b11b1552
9 changed files with 760 additions and 431 deletions

View File

@@ -32,12 +32,15 @@ require 'astute/network'
require 'astute/puppetd'
require 'astute/deployment_engine/nailyfact'
require 'astute/deployment_engine/tasklib'
require 'astute/deployment_engine/granular_deployment'
require 'astute/cobbler'
require 'astute/cobbler_manager'
require 'astute/image_provision'
require 'astute/dump'
require 'astute/deploy_actions'
require 'astute/nailgun_hooks'
require 'astute/puppet_task'
require 'astute/task_manager'
['/astute/pre_deployment_actions/*.rb',
'/astute/pre_deploy_actions/*.rb',

View File

@@ -0,0 +1,182 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class Astute::DeploymentEngine::GranularDeployment < Astute::DeploymentEngine
NAILGUN_STATUS = ['ready', 'error', 'deploying']
def deploy_piece(nodes, retries=1)
return false unless validate_nodes(nodes)
@ctx.reporter.report(nodes_status(nodes, 'deploying', {'progress' => 0}))
log_preparation(nodes)
Astute.logger.info "#{@ctx.task_id}: Starting deployment"
@running_tasks = {}
@nodes_roles = nodes.inject({}) { |h, n| h.merge({n['uid'] => n['role']}) }
@nodes_by_uid = nodes.inject({}) { |h, n| h.merge({ n['uid'] => n }) }
begin
@task_manager = Astute::TaskManager.new(nodes)
@hook_context = Astute::Context.new(
@ctx.task_id,
HookReporter.new,
Astute::LogParser::NoParsing.new
)
deploy_nodes(nodes)
rescue => e
# We should fail all nodes if the deployment
# process raises an error. Otherwise they will not
# be sent back for redeploy
report_nodes = nodes.uniq{ |n| n['uid'] }.map do |node|
{ 'uid' => node['uid'],
'status' => 'error',
'role' => 'hook',
'error_type' => 'deploy'
}
end
@ctx.report_and_update_status('nodes' => report_nodes)
raise e
end
Astute.logger.info "#{@ctx.task_id}: Finished deployment of nodes" \
" => roles: #{@nodes_roles.inspect}"
end
def puppet_task(node_id, task)
# Use a fake reporter: reports from hooks are handled here instead of being sent upstream
Astute::PuppetTask.new(
@hook_context,
@nodes_by_uid[node_id], # Use single node uid instead of task['uids']
retries=2,
task['parameters']['puppet_manifest'],
task['parameters']['puppet_modules'],
task['parameters']['cwd'],
task['parameters']['timeout']
)
end
def run_task(node_id, task)
Astute.logger.info "#{@ctx.task_id}: run task '#{task.to_yaml}' on node #{node_id}"
@running_tasks[node_id] = puppet_task(node_id, task)
@running_tasks[node_id].run
end
def check_status(node_id)
status = @running_tasks[node_id].status
if NAILGUN_STATUS.include? status
status
else
raise "Internal error. Unknown status '#{status}'"
end
end
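# Scheduling loop: start the first task on every node, then poll the running
# tasks, starting the next task on a node as soon as the previous one
# succeeds. The loop ends when no node has tasks left in its queue.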
def deploy_nodes(nodes)
@task_manager.node_uids.each { |n| task = @task_manager.next_task(n) and run_task(n, task) }
while @task_manager.task_in_queue?
nodes_to_report = []
@task_manager.node_uids.each do |node_id|
if task = @task_manager.current_task(node_id)
case status = check_status(node_id)
when 'ready'
Astute.logger.info "Task '#{task}' on node uid=#{node_id} ended successfully"
new_task = @task_manager.next_task(node_id)
if new_task
run_task(node_id, new_task)
else
nodes_to_report << process_success_node(node_id, task)
end
when 'deploying'
progress_report = process_running_node(node_id, task, nodes)
nodes_to_report << progress_report if progress_report
when 'error'
Astute.logger.error "Task '#{task}' on node #{node_id} valid, but failed"
nodes_to_report << process_fail_node(node_id, task)
else
raise "Internal error. Known status '#{status}', but handler not provided"
end
else
Astute.logger.debug "No more tasks provided for node #{node_id}"
end
end
@ctx.report_and_update_status('nodes' => nodes_to_report) if nodes_to_report.present?
break unless @task_manager.task_in_queue?
sleep Astute.config.PUPPET_DEPLOY_INTERVAL
end
end
def process_success_node(node_id, task)
Astute.logger.info "No more tasks provided for node #{node_id}. All node " \
"tasks completed successfully"
{
'uid' => node_id,
'status' => 'ready',
'role' => @nodes_roles[node_id],
'progress' => 100,
'task' => task
}
end
def process_fail_node(node_id, task)
Astute.logger.error "No more tasks will be executed on the node #{node_id}"
@task_manager.delete_node(node_id)
{
'uid' => node_id,
'status' => 'error',
'error_type' => 'deploy',
'role' => @nodes_roles[node_id],
'task' => task
}
end
def process_running_node(node_id, task, nodes)
begin
# Pass nodes because logs calculation needs IP address of node, not just uid
nodes_progress = @ctx.deploy_log_parser.progress_calculate(Array(node_id), nodes)
if nodes_progress.present?
nodes_progress.map! { |x| x.merge!(
'status' => 'deploying',
'role' => @nodes_roles[x['uid']],
'task' => task
) }
nodes_progress.first
else
nil
end
rescue => e
Astute.logger.warn "Some error occurred while parsing logs for nodes progress: #{e.message}, "\
"trace: #{e.format_backtrace}"
nil
end
end
def log_preparation(nodes)
@ctx.deploy_log_parser.prepare(nodes)
rescue => e
Astute.logger.warn "Some error occurred while preparing LogParser: " \
"#{e.message}, trace: #{e.format_backtrace}"
end
class HookReporter
def report(msg)
Astute.logger.debug msg
end
end
end
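# Usage sketch (illustrative only; mirrors the engine spec below): the engine
# is built around a deployment context and driven through #deploy_piece, with
# each node carrying its own priority-ordered 'tasks' list.
#
#   engine = Astute::DeploymentEngine::GranularDeployment.new(ctx)
#   engine.deploy_piece(nodes)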

View File

@@ -53,6 +53,17 @@ module Astute
)
end
def granular_deploy(up_reporter, task_id, deployment_info, pre_deployment=[], post_deployment=[])
deploy_cluster(
up_reporter,
task_id,
deployment_info,
Astute::DeploymentEngine::GranularDeployment,
pre_deployment,
post_deployment
)
end
def provision(reporter, task_id, engine_attrs, nodes)
raise "Nodes to provision are not provided!" if nodes.empty?
provision_method = engine_attrs['provision_method'] || 'cobbler'

lib/astute/puppet_task.rb Normal file (233 lines)
View File

@@ -0,0 +1,233 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require 'timeout'
module Astute
class PuppetTask
def initialize(ctx, node, retries=1, puppet_manifest=nil, puppet_modules=nil, cwd=nil, timeout=nil)
@ctx = ctx
@node = node
@retries = retries
@puppet_manifest = puppet_manifest || '/etc/puppet/manifests/site.pp'
@puppet_modules = puppet_modules || '/etc/puppet/modules'
@cwd = cwd || '/'
@time_observer = TimeObserver.new(timeout || Astute.config.PUPPET_TIMEOUT)
@prev_summary = nil
@is_hung = false
end
def run
Astute.logger.debug "Waiting for puppet to finish deployment on " \
"node #{@node['uid']} (timeout = #{@time_obsorver.time_limit} sec)..."
@time_obsorver.start
@prev_summary ||= puppet_status
puppetd_runonce
end
# This method is expected to be called with respect to Astute.config.PUPPET_FADE_INTERVAL
def status
# TODO(vsharshov): Should we raise error?
raise Timeout::Error unless @time_observer.enough_time?
last_run = puppet_status
status = node_status(last_run)
Astute.logger.debug "Node #{@node['uid']}(#{@node['role']}) status: #{status}"
result = case status
when 'succeed'
processing_succeed_node
when 'running'
processing_running_node
when 'error'
processing_error_node(last_run)
end
#TODO(vsharshov): Should we move it to control module?
@ctx.report_and_update_status('nodes' => [result]) if result
# ready, error or deploying
result.fetch('status', 'deploying')
end
private
def puppetd
puppetd = MClient.new(@ctx, "puppetd", [@node['uid']])
puppetd.on_respond_timeout do |uids|
nodes = uids.map do |uid|
{
'uid' => uid,
'status' => 'error',
'error_type' => 'deploy',
'role' => @node['role']
}
end
@ctx.report_and_update_status('nodes' => nodes)
end
puppetd
end
def puppet_status
puppetd.last_run_summary.first[:data]
end
def puppet_run
puppetd.runonce(
:puppet_debug => true,
:manifest => @puppet_manifest,
:modules => @puppet_modules,
:cwd => @cwd
)
end
def running?(status)
status[:status] == 'running'
end
def idling?(status)
status[:status] == 'idling'
end
def stopped?(status)
['stopped', 'disabled'].include? status[:status]
end
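# A run counts as succeeded only when puppet has stopped, nothing failed,
# and last_run differs from the pre-run summary: last_run_summary is only
# updated after a catalog is applied, so an unchanged timestamp means our
# run never actually happened (see the state notes in puppetd.rb below).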
def succeed?(status)
status[:status] == 'stopped' &&
status[:resources]['failed'].to_i == 0 &&
status[:resources]['failed_to_restart'].to_i == 0 &&
status[:time]['last_run'] != (@prev_summary && @prev_summary[:time]['last_run'])
end
# Runs puppetd.runonce only if puppet is stopped on the host at the time.
# If it isn't stopped, we wait a bit and try again.
# Sets @is_hung if puppet on the node still appears to be running or idling.
def puppetd_runonce
started = Time.now.to_i
while Time.now.to_i - started < Astute.config.PUPPET_FADE_TIMEOUT
status = puppet_status
is_stopped = stopped?(status)
is_idling = idling?(status)
is_running = running?(status)
#Try to kill 'idling' process and run again by 'runonce' call
puppet_run if is_stopped || is_idling
break if !is_running && !is_idling
sleep Astute.config.PUPPET_FADE_INTERVAL
end
if is_running || is_idling
Astute.logger.warn "Puppet on node '#{@node['uid']}' seems to be hung " \
"(#{is_running ? 'running' : 'idling'})"
@is_hung = true
else
@is_hung = false
end
end
def node_status(last_run)
case
when @is_hung
'error'
when succeed?(last_run) && !@is_hung
'succeed'
when (running?(last_run) || idling?(last_run)) && !@is_hung
'running'
when stopped?(last_run) && !succeed?(last_run) && !@is_hung
'error'
else
msg = "Unknown status: " \
"is_hung #{@is_hung}, succeed? #{succeed?(last_run)}, " \
"running? #{running?(last_run)}, stopped? #{stopped?(last_run)}, " \
"idling? #{idling?(last_run)}"
raise msg
end
end
def processing_succeed_node
Astute.logger.debug "Puppet completed within #{@time_obsorver.stop} seconds"
{ 'uid' => @node['uid'], 'status' => 'ready', 'role' => @node['role'] }
end
def processing_error_node(last_run)
if @retries > 0
@retries -= 1
Astute.logger.debug "Puppet on node #{@node['uid']} will be "\
"restarted. #{@retries} retries remained."
Astute.logger.info "Retrying to run puppet for following error " \
"nodes: #{@node['uid']}"
puppetd_runonce
# We need this magic with prev_summary to reflect new puppetd run statuses..
@prev_summary = last_run
node_report_format('status' => 'deploying')
else
Astute.logger.debug "Node #{@node['uid']} has failed to deploy. " \
"There is no more retries for puppet run."
node_report_format('status' => 'error', 'error_type' => 'deploy')
end
end
def processing_running_node
nodes_to_report = []
begin
# Pass nodes because logs calculation needs IP address of node, not just uid
nodes_progress = @ctx.deploy_log_parser.progress_calculate([@node['uid']], [@node])
if nodes_progress.present?
Astute.logger.debug "Got progress for nodes: #{nodes_progress.inspect}"
# Nodes with progress are running, so they are not included in nodes_to_report yet
nodes_progress.map! { |x| x.merge!('status' => 'deploying', 'role' => @node['role']) }
nodes_to_report = nodes_progress
end
rescue => e
Astute.logger.warn "Some error occurred when parse logs for " \
"nodes progress: #{e.message}, trace: #{e.format_backtrace}"
end
nodes_to_report.first || node_report_format('status' => 'deploying')
end
def node_report_format(add_info={})
add_info.merge('uid' => @node['uid'], 'role' => @node['role'])
end
end #PuppetTask
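# Usage sketch (illustrative only): a PuppetTask is started once with #run and
# then polled with #status until it leaves the 'deploying' state, which is
# exactly how PuppetdDeployer#deploy_nodes drives it below:
#
#   task = Astute::PuppetTask.new(ctx, node)
#   task.run
#   sleep Astute.config.PUPPET_DEPLOY_INTERVAL while task.status == 'deploying'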
class TimeObserver
def initialize(timeout)
@timeout = timeout
end
def start
@time_before = Time.now
end
def stop
(Time.now - @time_before).to_i
end
def enough_time?
Time.now - @time_before < time_limit
end
def time_limit
@timeout
end
end #TimeObserver
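# Minimal usage sketch (illustrative only; do_work is a hypothetical helper):
#
#   watch = TimeObserver.new(30) # 30-second budget
#   watch.start
#   do_work while watch.enough_time?
#   Astute.logger.debug "Spent #{watch.stop} seconds"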
end

View File

@@ -1,4 +1,4 @@
# Copyright 2013 Mirantis, Inc.
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -21,8 +21,7 @@ module Astute
def self.deploy(ctx, nodes, retries=2, puppet_manifest=nil, puppet_modules=nil, cwd=nil)
@ctx = ctx
@nodes_roles = nodes.inject({}) { |h, n| h.merge({n['uid'].to_s => n['role']}) }
@node_retries = nodes.inject({}) { |h, n| h.merge({n['uid'].to_s => retries}) }
@retries = retries
@nodes = nodes
@puppet_manifest = puppet_manifest || '/etc/puppet/manifests/site.pp'
@puppet_modules = puppet_modules || '/etc/puppet/modules'
@@ -32,7 +31,7 @@ module Astute
nodes (timeout = #{Astute.config.PUPPET_TIMEOUT} sec)..."
time_before = Time.now
deploy_nodes(@nodes.map { |n| n['uid'] })
deploy_nodes
time_spent = Time.now - time_before
Astute.logger.info "#{@ctx.task_id}: Spent #{time_spent} seconds on puppet run "\
@@ -40,180 +39,18 @@ module Astute
end
private
# Runs puppetd.runonce only if puppet is stopped on the host at the time
# If it isn't stopped, we wait a bit and try again.
# Returns a list of node uids on which puppet appears to be hung.
def self.puppetd_runonce(uids)
started = Time.now.to_i
while Time.now.to_i - started < Astute.config.PUPPET_FADE_TIMEOUT
running_uids = puppetd(uids).last_run_summary.select { |x|
['running', 'idling'].include?(x.results[:data][:status])
}.map { |n| n.results[:sender] }
stopped_uids = uids - running_uids
@nodes.select { |n| stopped_uids.include? n['uid'] }
.group_by { |n| n['debug'] }
.each do |debug, stop_nodes|
puppetd(stop_nodes.map { |n| n['uid'] }).runonce(
:puppet_debug => true,
:manifest => @puppet_manifest,
:modules => @puppet_modules,
:cwd => @cwd
)
end
break if running_uids.empty?
def self.deploy_nodes
puppet_tasks = @nodes.map { |n| puppet_task(n) }
puppet_tasks.each(&:run)
uids = running_uids
sleep Astute.config.PUPPET_FADE_INTERVAL
end
Astute.logger.debug "puppetd_runonce completed within #{Time.now.to_i - started} seconds."
Astute.logger.warn "Following nodes have puppet hung: '#{running_uids.join(',')}'" if running_uids.present?
running_uids
end
def self.calc_nodes_status(last_run, prev_run, hung_nodes=[])
# Finished are those which are not in running state,
# and changed their last_run time, which is changed after application of catalog,
# at the time of updating last_run_summary file. At that particular time puppet is
# still running, and will finish in a couple of seconds.
# If Puppet had crashed before it got a catalog (e.g. certificate problems),
# it didn't update last_run_summary file and switched to 'stopped' state.
# Consider only hung nodes which were in last_run
hung_nodes = hung_nodes & last_run.map { |n| n.results[:sender] }
stopped = last_run.select { |x| ['stopped', 'disabled'].include? x.results[:data][:status] }
# Select all finished nodes which did not fail and changed last_run time.
succeed_nodes = stopped.select { |n|
prev_n = prev_run.find{|ps| ps.results[:sender] == n.results[:sender] }
n.results[:data][:status] == 'stopped' &&
n.results[:data][:resources]['failed'].to_i == 0 &&
n.results[:data][:resources]['failed_to_restart'].to_i == 0 &&
n.results[:data][:time]['last_run'] != (prev_n && prev_n.results[:data][:time]['last_run'])
}.map{|x| x.results[:sender] }
stopped_nodes = stopped.map { |x| x.results[:sender] }
error_nodes = stopped_nodes - succeed_nodes
running_nodes = last_run.map {|n| n.results[:sender]} - stopped_nodes
# Hung nodes can change state at this moment (success, error or still running),
# but we should count them only in error_nodes
succeed_nodes -= hung_nodes
error_nodes = (error_nodes + hung_nodes).uniq
running_nodes -= hung_nodes
nodes_to_check = running_nodes + succeed_nodes + error_nodes
all_nodes = last_run.map { |n| n.results[:sender] }
if nodes_to_check.size != all_nodes.size
raise "Internal error. Check: #{nodes_to_check.inspect}, passed #{all_nodes.inspect}"
end
{'succeed' => succeed_nodes, 'error' => error_nodes, 'running' => running_nodes}
end
def self.puppetd(uids)
puppetd = MClient.new(@ctx, "puppetd", Array(uids))
puppetd.on_respond_timeout do |uids|
nodes = uids.map do |uid|
{ 'uid' => uid, 'status' => 'error', 'error_type' => 'deploy', 'role' => @nodes_roles[uid] }
end
@ctx.report_and_update_status('nodes' => nodes)
end
puppetd
end
def self.processing_error_nodes(error_nodes)
nodes_to_report = []
nodes_to_retry = []
error_nodes.each do |uid|
if @node_retries[uid] > 0
@node_retries[uid] -= 1
Astute.logger.debug "Puppet on node #{uid.inspect} will be restarted. "\
"#{@node_retries[uid]} retries remained."
nodes_to_retry << uid
else
Astute.logger.debug "Node #{uid.inspect} has failed to deploy. There is no more retries for puppet run."
nodes_to_report << {'uid' => uid, 'status' => 'error', 'error_type' => 'deploy', 'role' => @nodes_roles[uid] }
end
end
return nodes_to_report, nodes_to_retry
end
def self.processing_running_nodes(running_nodes)
nodes_to_report = []
if running_nodes.present?
begin
# Pass nodes because logs calculation needs IP address of node, not just uid
nodes_progress = @ctx.deploy_log_parser.progress_calculate(running_nodes, @nodes)
if nodes_progress.present?
Astute.logger.debug "Got progress for nodes: #{nodes_progress.inspect}"
# Nodes with progress are running, so they are not included in nodes_to_report yet
nodes_progress.map! { |x| x.merge!('status' => 'deploying', 'role' => @nodes_roles[x['uid']]) }
nodes_to_report = nodes_progress
end
rescue => e
Astute.logger.warn "Some error occurred when parse logs for nodes progress: #{e.message}, "\
"trace: #{e.format_backtrace}"
end
end
nodes_to_report
end
def self.processing_succeed_nodes(succeed_nodes)
succeed_nodes.map do |uid|
{ 'uid' => uid, 'status' => 'ready', 'role' => @nodes_roles[uid] }
while puppet_tasks.any? { |t| t.status == 'deploying' }
sleep Astute.config.PUPPET_DEPLOY_INTERVAL
end
end
# As I (Andrey Danin) understand it, the Puppet agent goes through these steps:
# * Puppetd is in the 'stopped' state.
# * We run it via run_once, and puppetd goes to the 'idling' state - it is trying
# to retrieve the catalog.
# * If it can't retrieve the catalog, it goes back to the 'stopped' state without
# updating the last_run_summary file.
# * If puppetd retrieves the catalog, it goes to the 'running' state, which means
# it is applying the catalog to the system.
# * When puppetd finishes the catalog run, it updates the last_run_summary file
# but stays in the 'running' state for a while.
# * After puppetd finishes all internal jobs connected with the finished catalog,
# it goes to the 'idling' state.
# * After a short time it goes to the 'stopped' state because we ran it as run_once.
def self.deploy_nodes(nodes_to_check)
Timeout::timeout(Astute.config.PUPPET_TIMEOUT) do
prev_summary = puppetd(nodes_to_check).last_run_summary
hung_nodes = puppetd_runonce(nodes_to_check)
while nodes_to_check.present?
last_run = puppetd(nodes_to_check).last_run_summary
calc_nodes = calc_nodes_status(last_run, prev_summary, hung_nodes)
Astute.logger.debug "Nodes statuses: #{calc_nodes.inspect}"
report_succeed = processing_succeed_nodes calc_nodes['succeed']
report_error, nodes_to_retry = processing_error_nodes(calc_nodes['error'])
report_running = processing_running_nodes(calc_nodes['running'])
nodes_to_report = report_succeed + report_error + report_running
@ctx.report_and_update_status('nodes' => nodes_to_report) if nodes_to_report.present?
if nodes_to_retry.present?
Astute.logger.info "Retrying to run puppet for following error nodes: #{nodes_to_retry.join(',')}"
hung_nodes = puppetd_runonce(nodes_to_retry)
# We need this magic with prev_summary to reflect new puppetd run statuses.
prev_summary.delete_if { |x| nodes_to_retry.include?(x.results[:sender]) }
prev_summary += last_run.select { |x| nodes_to_retry.include?(x.results[:sender]) }
end
# we will iterate only over running nodes and those that we restart deployment for
nodes_to_check = calc_nodes['running'] + nodes_to_retry
break if nodes_to_check.empty?
sleep Astute.config.PUPPET_DEPLOY_INTERVAL
end
end
def self.puppet_task(n)
PuppetTask.new(@ctx, n, @retries, @puppet_manifest, @puppet_modules, @cwd)
end
end

View File

@@ -91,6 +91,26 @@ module Astute
end
end
def granular_deploy(data)
Astute.logger.info("'granular_deploy' method called with data: #{data.inspect}")
reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
begin
@orchestrator.granular_deploy(
reporter,
data['args']['task_uuid'],
data['args']['deployment_info'],
data['args']['pre_deployment'] || [],
data['args']['post_deployment'] || []
)
reporter.report('status' => 'ready', 'progress' => 100)
rescue Timeout::Error
msg = "Timeout of deployment is exceeded."
Astute.logger.error msg
reporter.report('status' => 'error', 'error' => msg)
end
end
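# For reference, the message shape this handler expects (values are
# illustrative; the keys are the ones read above):
#
#   {
#     'respond_to' => 'granular_deploy_resp', # illustrative queue name
#     'args' => {
#       'task_uuid' => '<uuid>',
#       'deployment_info' => [<nodes, each with a 'tasks' list>],
#       'pre_deployment' => [],
#       'post_deployment' => []
#     }
#   }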
def verify_networks(data)
data.fetch('subtasks', []).each do |subtask|
if self.respond_to?(subtask['method'])
@@ -114,7 +134,7 @@ module Astute
reporter = Astute::Server::Reporter.new(@producer, data['respond_to'], data['args']['task_uuid'])
result = @orchestrator.multicast_verification(reporter, data['args']['task_uuid'], data['args']['nodes'])
report_result(result, reporter)
end
end
def dump_environment(data)
task_id = data['args']['task_uuid']
@@ -186,7 +206,8 @@ module Astute
Astute.logger.info "Try to kill running task #{target_task_uuid}"
service_data[:main_work_thread].kill
result = if ['deploy', 'task_deployment'].include? service_data[:tasks_queue].current_task_method
result = if ['deploy', 'task_deployment', 'granular_deploy'].include?
service_data[:tasks_queue].current_task_method
@orchestrator.stop_puppet_deploy(reporter, task_uuid, nodes)
@orchestrator.remove_nodes(reporter, task_uuid, data['args']['engine'], nodes)
else

View File

@@ -0,0 +1,51 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
module Astute
class TaskManager
def initialize(nodes)
@tasks = nodes.inject({}) do |h, n|
h.merge({n['uid'] => n['tasks'].sort_by{ |f| f['priority'] }.each})
end
@current_task = {}
Astute.logger.info "The following tasks will be performed on nodes: " \
"#{@tasks.map {|k, v| {k => v.to_a}}.to_yaml}"
end
def current_task(node_id)
@current_task[node_id]
end
def next_task(node_id)
@current_task[node_id] = @tasks[node_id].next
rescue StopIteration
@current_task[node_id] = nil
delete_node(node_id)
end
def delete_node(node_id)
@tasks[node_id] = nil
end
def task_in_queue?
@tasks.select{ |_k,v| v }.present?
end
def node_uids
@tasks.select{ |_k,v| v }.keys
end
end
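# Usage sketch (illustrative; the node layout matches the specs below):
#
#   nodes = [{'uid' => '1',
#             'tasks' => [{'priority' => 200, 'type' => 'puppet', 'uids' => ['1']},
#                         {'priority' => 100, 'type' => 'puppet', 'uids' => ['1']}]}]
#   manager = Astute::TaskManager.new(nodes)
#   manager.next_task('1') # returns the priority-100 task first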
end

spec/unit/nailgun_spec.rb Normal file (186 lines)
View File

@@ -0,0 +1,186 @@
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
require File.join(File.dirname(__FILE__), '../spec_helper')
include Astute
describe "Granular deployment engine" do
include SpecHelpers
let(:ctx) {
ctx = mock
ctx.stubs(:task_id)
ctx.stubs(:deploy_log_parser).returns(Astute::LogParser::NoParsing.new)
ctx.stubs(:status).returns({})
reporter = mock
reporter.stubs(:report)
up_reporter = Astute::ProxyReporter::DeploymentProxyReporter.new(reporter, nodes)
ctx.stubs(:reporter).returns(up_reporter)
ctx
}
let(:deploy_engine) do
Astute::DeploymentEngine::GranularDeployment.new(ctx)
end
let(:upload_file_hook) do
{
"priority" => 100,
"type" => "upload_file",
"fail_on_error" => false,
"diagnostic_name" => "upload-example-1.0",
"uids" => ['2', '3'],
"parameters" => {
"path" => "/etc/yum.repos.d/fuel_awesome_plugin-0.1.0.repo",
"data" => "[fuel_awesome_plugin-0.1.0]\\nname=Plugin fuel_awesome_plugin-0.1.0 repository\\nbaseurl=http => //10.20.0.2 => 8080/plugins/fuel_awesome_plugin-0.1.0/repositories/centos\\ngpgcheck=0"
}
}
end
let(:sync_hook) do
{
"priority" => 200,
"type" => "sync",
"fail_on_error" => false,
"diagnostic_name" => "sync-example-1.0",
"uids" => ['1', '2'],
"parameters" => {
"src" => "rsync://10.20.0.2/plugins/fuel_awesome_plugin-0.1.0/deployment_scripts/",
"dst" => "/etc/fuel/plugins/fuel_awesome_plugin-0.1.0/"
}
}
end
let(:shell_hook) do
{
"priority" => 100,
"type" => "shell",
"fail_on_error" => false,
"diagnostic_name" => "shell-example-1.0",
"uids" => ['1','2','3'],
"parameters" => {
"cmd" => "./deploy.sh",
"cwd" => "/etc/fuel/plugins/fuel_awesome_plugin-0.1.0/",
"timeout" => 60
}
}
end
let(:puppet_hook) do
{
"priority" => 300,
"type" => "puppet",
"fail_on_error" => false,
"diagnostic_name" => "puppet-example-1.0",
"uids" => ['1', '3'],
"parameters" => {
"puppet_manifest" => "cinder_glusterfs.pp",
"puppet_modules" => "modules",
"cwd" => "/etc/fuel/plugins/plugin_name-1.0",
"timeout" => 42
}
}
end
let(:nodes) do
[
{
'uid' => '45',
'priority' => 200,
'role' => 'ceph',
'tasks' => [
{
'priority' => 100,
'type' => 'puppet',
'uids' => ['45']
},
{
'priority' => 300,
'type' => 'puppet',
'uids' => ['45']
}
]
},
{
'uid' => '46',
'priority' => 200,
'role' => 'compute',
'tasks' => [
{
'priority' => 100,
'type' => 'puppet',
'uids' => ['46']
},
{
'priority' => 200,
'type' => 'puppet',
'uids' => ['46']
},
{
'priority' => 300,
'type' => 'puppet',
'uids' => ['46']
}
]
}
]
end
describe '#deploy_piece' do
it 'should run tasks using puppet task' do
ctx.stubs(:report_and_update_status)
deploy_engine.expects(:deploy_nodes).with(nodes)
deploy_engine.deploy_piece(nodes)
end
it 'should report error status if an error is raised' do
error_report = {'nodes' =>
[
{
'uid' => '45',
'status' => 'error',
'role' => 'hook',
'error_type' => 'deploy'
},
{
'uid' => '46',
'status' => 'error',
'role' => 'hook',
'error_type' => 'deploy'
}
]
}
ctx.expects(:report_and_update_status).with(error_report)
deploy_engine.expects(:deploy_nodes).with(nodes).raises("Error simulation")
deploy_engine.deploy_piece(nodes) rescue nil
end
it 'should not raise error if no nodes were sent' do
expect{ deploy_engine.deploy_piece([])}.to_not raise_error
end
it 'should prepare log for parsing' do
deploy_engine.stubs(:deploy_nodes).with(nodes)
ctx.deploy_log_parser.expects(:prepare).with(nodes).once
deploy_engine.deploy_piece(nodes)
end
end # 'deploy_piece'
end # 'describe'

View File

@@ -1,4 +1,4 @@
# Copyright 2013 Mirantis, Inc.
# Copyright 2014 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -17,269 +17,74 @@ require File.join(File.dirname(__FILE__), '../spec_helper')
include Astute
describe "Puppetd" do
describe "PuppetdDeployer" do
include SpecHelpers
context "PuppetdDeployer" do
let(:reporter) { mock('reporter') }
let(:ctx) do
Context.new("task id", ProxyReporter::DeploymentProxyReporter.new(reporter), Astute::LogParser::NoParsing.new)
end
let(:nodes) { [{'uid' => '1', 'role' => 'compute'}] }
let(:rpcclient) { mock_rpcclient(nodes) }
let(:last_run_result) do
let(:nodes) do
[
{
:statuscode =>0,
:data => {
:changes => {"total" => 1},
:time => {"last_run" => 1358425701},
:resources => {"failed" => 0},
:status => "stopped",
:enabled => 1,
:stopped => 1,
:idling => 0,
:running => 0,
:runtime => 1358425701
},
:sender=>"1"
'uid' => '1',
'role' => 'primary-controller',
'tasks' => [
{'name' => 'pr_controller_1', 'description' => 'test1'},
{'name' => 'pr_controller_2', 'description' => 'test2'},
{'name' => 'controller_3', 'description' => 'test3'}
],
'fail_if_error' => true
},
{
'uid' => '2',
'role' => 'controller',
'tasks' => [
{'name' => 'controller_1', 'description' => 'test1'},
{'name' => 'controller_3', 'description' => 'test3'}
],
'fail_if_error' => false
}
]
end
let(:ctx) do
ctx = mock
ctx.stubs(:task_id)
ctx.stubs(:deploy_log_parser).returns(Astute::LogParser::NoParsing.new)
ctx.stubs(:status).returns({})
reporter = mock
reporter.stubs(:report)
up_reporter = Astute::ProxyReporter::DeploymentProxyReporter.new(reporter, nodes)
ctx.stubs(:reporter).returns(up_reporter)
ctx
end
describe '.deploy' do
it 'should deploy nodes' do
PuppetdDeployer.expects(:deploy_nodes).once
PuppetdDeployer.deploy(ctx, nodes)
end
let(:last_run_result_running) do
res = deep_copy(last_run_result)
res[:data].merge!(:status => 'running', :running => 1, :stopped => 0)
res
it 'should use puppet task for deploy' do
puppet_task = mock('puppet_task')
PuppetdDeployer.expects(:puppet_task).with(nodes[0]).returns(puppet_task)
PuppetdDeployer.expects(:puppet_task).with(nodes[1]).returns(puppet_task)
puppet_task.expects(:run).times(nodes.size)
puppet_task.stubs(:status).returns('ready')
PuppetdDeployer.deploy(ctx, nodes)
end
let(:last_run_result_fail) do
res = deep_copy(last_run_result_running)
res[:data].merge!(:runtime => 1358426000,
:time => {"last_run" => 1358426000},
:resources => {"failed" => 1}
)
res
end
it 'should sleep between status checks' do
puppet_task = mock('puppet_task')
PuppetdDeployer.expects(:puppet_task).with(nodes[0]).returns(puppet_task)
PuppetdDeployer.expects(:puppet_task).with(nodes[1]).returns(puppet_task)
puppet_task.stubs(:run).times(nodes.size)
puppet_task.stubs(:status).returns('deploying')
.then.returns('ready')
.then.returns('ready')
let(:last_run_failed) do
res = deep_copy(last_run_result_fail)
res[:data].merge!(:status => 'stopped', :stopped => 1, :running => 0)
res
end
let(:last_run_result_finished) do
res = deep_copy last_run_result
res[:data][:time]['last_run'] = 1358428000
res[:data][:status] = 'stopped'
res
end
context 'reporter behavior' do
let(:prepare_mcollective_env) do
last_run_result_new = deep_copy last_run_result
last_run_result_new[:data][:time]['last_run'] = 1358426000
rpcclient_new_res = mock_mc_result(last_run_result_new)
rpcclient_finished_res = mock_mc_result(last_run_result_finished)
rpcclient_valid_result = mock_mc_result(last_run_result)
rpcclient.stubs(:last_run_summary).returns([rpcclient_valid_result]).then.
returns([rpcclient_valid_result]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([rpcclient_new_res]).then.
returns([rpcclient_finished_res])
rpcclient
end
it "reports ready status for node if puppet deploy finished successfully" do
prepare_mcollective_env
reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}])
rpcclient.expects(:runonce).at_least_once.returns([mock_mc_result(last_run_result)])
Astute::PuppetdDeployer.deploy(ctx, nodes, retries=0)
end
context 'multiroles behavior' do
let(:nodes) { [{'uid' => '1', 'role' => 'compute'}] }
let(:nodes_multiroles) { [{'uid' => '1', 'role' => 'controller'}] }
before(:each) do
@ctx = Context.new("task id",
ProxyReporter::DeploymentProxyReporter.new(reporter, nodes + nodes_multiroles),
Astute::LogParser::NoParsing.new
)
end
it "it should not send final status before all roles of node will deploy" do
prepare_mcollective_env
reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'deploying', 'progress' => 50, 'role' => 'compute'}])
rpcclient.expects(:runonce).at_least_once.returns([mock_mc_result(last_run_result)])
Astute::PuppetdDeployer.deploy(@ctx, nodes, retries=0)
end
end
end
context "puppet state transitions" do
let(:last_run_result_idle_pre) do
res = deep_copy(last_run_result)
res[:data].merge!(:status => 'idling', :idling => 1, :stopped => 0)
res
end
let(:last_run_result_idle_post) do
res = deep_copy(last_run_result_fail)
res[:data].merge!(:status => 'idling', :idling => 1, :running => 0)
mock_mc_result res
end
it "publishes error status for node if puppet failed (a full cycle)" do
rpcclient.stubs(:last_run_summary).times(9).
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result_idle_pre) ]).then.
returns([ mock_mc_result(last_run_result_idle_pre) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_fail) ]).then.
returns([ mock_mc_result(last_run_result_fail) ]).then.
returns([ mock_mc_result(last_run_failed) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).once.
returns([ mock_mc_result(last_run_result) ])
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
it "publishes error status for node if puppet failed (a cycle w/o idle states)" do
rpcclient.stubs(:last_run_summary).times(6).
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_fail) ]).then.
returns([ mock_mc_result(last_run_failed) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).once.
returns([ mock_mc_result(last_run_result) ])
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
it "publishes error status for node if puppet failed (a cycle w/o idle and finishing states)" do
rpcclient.stubs(:last_run_summary).times(4).
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_failed) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).once.
returns([ mock_mc_result(last_run_result) ])
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
it "publishes error status for node if puppet failed (a cycle with one running state only)" do
rpcclient.stubs(:last_run_summary).times(5).
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_fail) ]).then.
returns([ mock_mc_result(last_run_failed) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).once.
returns([ mock_mc_result(last_run_result) ])
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
it "publishes error status for node if puppet failed (a cycle w/o any transitional states)" do
rpcclient.stubs(:last_run_summary).times(3).
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_result) ]).then.
returns([ mock_mc_result(last_run_failed) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).once.
returns([ mock_mc_result(last_run_result) ])
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
context '' do
around(:each) do |example|
old_value = Astute.config.PUPPET_FADE_INTERVAL
example.run
Astute.config.PUPPET_FADE_INTERVAL = old_value
end
before(:each) do
Astute.config.PUPPET_FADE_INTERVAL = 1
end
it "publishes error status for node if puppet running alien task (attempts been exhausted)" do
rpcclient.stubs(:last_run_summary).at_least(3).
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ])
reporter.expects(:report).with('nodes' => [{'status' => 'error', 'error_type' => 'deploy', 'uid' => '1', 'role' => 'compute'}])
rpcclient.expects(:runonce).never
Astute::PuppetdDeployer.deploy(ctx, nodes, 0)
end
it "ignore exit code of puppet running of alien task (waited for alien task stop and launched own)" do
rpcclient.stubs(:last_run_summary).at_least(3).
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_failed) ]).then.
returns([ mock_mc_result(last_run_failed) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_running) ]).then.
returns([ mock_mc_result(last_run_result_finished) ])
rpcclient.expects(:runonce).at_least(1).returns([ mock_mc_result(last_run_result) ])
reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}])
Astute::PuppetdDeployer.deploy(ctx, nodes, 1)
end
end
end
it "retries to run puppet if it fails" do
rpcclient_valid_result = mock_mc_result(last_run_result)
rpcclient_failed = mock_mc_result(last_run_failed)
rpcclient_fail = mock_mc_result(last_run_result_fail)
rpcclient_succeed = mock_mc_result(last_run_result_finished)
rpcclient.stubs(:last_run_summary).returns([rpcclient_valid_result]).then.
returns([rpcclient_valid_result]).then.
returns([rpcclient_failed]).then.
returns([rpcclient_failed]).then.
returns([rpcclient_fail]).then.
returns([rpcclient_succeed])
reporter.expects(:report).with('nodes' => [{'uid' => '1', 'status' => 'ready', 'progress' => 100, 'role' => 'compute'}])
rpcclient.expects(:runonce).at_least_once.returns([rpcclient_valid_result])
MClient.any_instance.stubs(:rpcclient).returns(rpcclient)
Astute::PuppetdDeployer.deploy(ctx, nodes, retries=1)
PuppetdDeployer.expects(:sleep).with(Astute.config.PUPPET_DEPLOY_INTERVAL)
PuppetdDeployer.deploy(ctx, nodes)
end
end
end
end