Implement thread pool for network verification
By default network checker runs ping to every ip in a thread simultaneously, depending on HW packets lost is possible, parallel_amount allows to configure how many pings should be run in parallel. Also add constraint on amount of connectivity checker tasks. Change-Id: I3aec37a2f50470cf905bccac34a6f79d2d5065d0 Closes-bug: #1653301
This commit is contained in:
parent
4f691b399f
commit
ebb761c872
|
@ -11,6 +11,7 @@ Puppet::Type.type(:connectivity_checker).provide(:threaded) do
|
|||
def ensure=(value)
|
||||
# calculate host hash
|
||||
network_scheme = @resource[:network_scheme]
|
||||
parallel_amount = @resource[:parallel_amount]
|
||||
network_metadata = @resource[:network_metadata]
|
||||
exclude_network_roles = @resource[:exclude_network_roles]
|
||||
|
||||
|
@ -39,46 +40,63 @@ Puppet::Type.type(:connectivity_checker).provide(:threaded) do
|
|||
}
|
||||
end
|
||||
end
|
||||
#puts test_plan.to_yaml()
|
||||
|
||||
# test (ping) neighboors
|
||||
work = {}
|
||||
# fork per-host worker for parallel pinging.
|
||||
workers = {}
|
||||
# Tasks for verification.
|
||||
ping_in_q = Queue.new
|
||||
# Contains connectivity errors.
|
||||
ping_err_q = Queue.new
|
||||
|
||||
test_plan.each do |nodename, networks|
|
||||
work[nodename] = {}
|
||||
work[nodename][:thr] = Thread.new(nodename, networks) do |nodename, networks|
|
||||
#Thread.current[:nodename] = nodename
|
||||
Thread.current[:errors] = []
|
||||
networks.each do |netname, netattrs|
|
||||
Thread.current[:cmd] = "ping -n -q -c #{netattrs[:tries]} -w #{netattrs[:timeout]} #{netattrs[:host]}"
|
||||
Thread.current[:rc] = 0
|
||||
if ! system(Thread.current[:cmd], {:out => :close, :err => :close})
|
||||
Thread.current[:rc] = $?.exitstatus()
|
||||
Thread.current[:errors] << {
|
||||
:rc => Thread.current[:rc],
|
||||
:cmd => "command `#{Thread.current[:cmd]}` was failed with code #{Thread.current[:rc]}",
|
||||
:net => netname
|
||||
}
|
||||
end
|
||||
end
|
||||
networks.each do |netname, netattrs|
|
||||
ping_in_q.push({
|
||||
:netname => netname,
|
||||
:nodename => nodename,
|
||||
:cmd => "ping -n -q -c #{netattrs[:tries]} -w #{netattrs[:timeout]} #{netattrs[:host]}"})
|
||||
end
|
||||
end
|
||||
# waitall only for all pinger threads
|
||||
work.each do |nodename, rrr|
|
||||
rrr[:thr].join()
|
||||
end
|
||||
# process results
|
||||
err_report = {}
|
||||
work.each do |nodename, rrr|
|
||||
if ! rrr[:thr][:errors].empty?
|
||||
err_report[nodename] = []
|
||||
rrr[:thr][:errors].each do |err|
|
||||
err_report[nodename] << "Unaccessible through '#{err[:net]}', #{err[:cmd]}"
|
||||
|
||||
(0..parallel_amount).each do |n|
|
||||
workers[n] = Thread.new do
|
||||
begin
|
||||
while task = ping_in_q.pop(true)
|
||||
unless system(task[:cmd], {:out => :close, :err => :close})
|
||||
exit_status = $?.exitstatus()
|
||||
ping_err_q << {
|
||||
:rc => exit_status,
|
||||
:cmd => "command `#{task[:cmd]}` was failed with code #{exit_status}",
|
||||
:net => task[:netname],
|
||||
:nodename => task[:nodename]
|
||||
}
|
||||
end
|
||||
end
|
||||
rescue ThreadError
|
||||
# If Queue is empty, it reaises ThreadError, we are not interested
|
||||
# in proceeding after we checked all addresses in queue.
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if ! err_report.empty?
|
||||
# Wait all for all threads to finish.
|
||||
workers.each do |_, worker|
|
||||
worker.join()
|
||||
end
|
||||
|
||||
# Process results.
|
||||
err_report = {}
|
||||
unless ping_err_q.empty?
|
||||
begin
|
||||
while err = ping_err_q.pop(true)
|
||||
unless err_report[err[:nodename]]
|
||||
err_report[err[:nodename]] = []
|
||||
end
|
||||
err_report[err[:nodename]] << "Unaccessible through '#{err[:net]}', #{err[:cmd]}"
|
||||
end
|
||||
rescue ThreadError
|
||||
end
|
||||
end
|
||||
|
||||
unless err_report.empty?
|
||||
msg = "Connectivity check error. Nodes: #{err_report.to_yaml.sub!('---','')}"
|
||||
if @resource[:non_destructive].to_s == 'true'
|
||||
warn(msg)
|
||||
|
|
|
@ -23,6 +23,21 @@ Puppet::Type.newtype(:connectivity_checker) do
|
|||
desc 'List of network roles which should be excluded from check'
|
||||
end
|
||||
|
||||
newparam(:parallel_amount) do
|
||||
desc 'Amount of package senders run in parallel'
|
||||
defaultto 20
|
||||
|
||||
validate do |val|
|
||||
if val.to_i <= 0
|
||||
raise ArgumentError, "parallel_amount should be positive integer, not '#{val}'"
|
||||
end
|
||||
end
|
||||
|
||||
munge do |val|
|
||||
val.to_i
|
||||
end
|
||||
end
|
||||
|
||||
newparam(:non_destructive) do
|
||||
desc "Define whether we should fail on connectivity issues"
|
||||
newvalues(:true, :yes, :on, :false, :no, :off)
|
||||
|
|
|
@ -9,6 +9,6 @@ connectivity_checker { 'netconfig':
|
|||
non_destructive => pick($network_checker_settings['non_destructive'], false),
|
||||
ping_tries => pick($network_checker_settings['ping_tries'], 5),
|
||||
ping_timeout => pick($network_checker_settings['ping_timeout'], 20),
|
||||
parallel_amount => pick($network_checker_settings['parallel_amount'], 20),
|
||||
exclude_network_roles => $exclude_network_roles,
|
||||
}
|
||||
|
||||
|
|
|
@ -130,6 +130,7 @@
|
|||
parameters:
|
||||
puppet_manifest: /etc/puppet/modules/osnailyfacter/manifests/connectivity-checker.pp
|
||||
puppet_modules: /etc/puppet/modules
|
||||
timeout: 300
|
||||
strategy:
|
||||
type: parallel
|
||||
timeout: 3600
|
||||
strategy:
|
||||
type: parallel
|
||||
amount: 50
|
||||
|
|
Loading…
Reference in New Issue