Implement thread pool for network verification

By default network checker runs ping to every ip
in a thread simultaneously, depending on HW packets
lost is possible, parallel_amount allows to configure
how many pings should be run in parallel.

Also add constraint on amount of connectivity checker
tasks.

Change-Id: I3aec37a2f50470cf905bccac34a6f79d2d5065d0
Closes-bug: #1653301
This commit is contained in:
Evgeny L 2016-11-23 22:48:28 +00:00 committed by Sergii Golovatiuk
parent 4f691b399f
commit ebb761c872
4 changed files with 70 additions and 36 deletions

View File

@ -11,6 +11,7 @@ Puppet::Type.type(:connectivity_checker).provide(:threaded) do
def ensure=(value)
# calculate host hash
network_scheme = @resource[:network_scheme]
parallel_amount = @resource[:parallel_amount]
network_metadata = @resource[:network_metadata]
exclude_network_roles = @resource[:exclude_network_roles]
@ -39,46 +40,63 @@ Puppet::Type.type(:connectivity_checker).provide(:threaded) do
}
end
end
#puts test_plan.to_yaml()
# test (ping) neighboors
work = {}
# fork per-host worker for parallel pinging.
workers = {}
# Tasks for verification.
ping_in_q = Queue.new
# Contains connectivity errors.
ping_err_q = Queue.new
test_plan.each do |nodename, networks|
work[nodename] = {}
work[nodename][:thr] = Thread.new(nodename, networks) do |nodename, networks|
#Thread.current[:nodename] = nodename
Thread.current[:errors] = []
networks.each do |netname, netattrs|
Thread.current[:cmd] = "ping -n -q -c #{netattrs[:tries]} -w #{netattrs[:timeout]} #{netattrs[:host]}"
Thread.current[:rc] = 0
if ! system(Thread.current[:cmd], {:out => :close, :err => :close})
Thread.current[:rc] = $?.exitstatus()
Thread.current[:errors] << {
:rc => Thread.current[:rc],
:cmd => "command `#{Thread.current[:cmd]}` was failed with code #{Thread.current[:rc]}",
:net => netname
}
end
end
networks.each do |netname, netattrs|
ping_in_q.push({
:netname => netname,
:nodename => nodename,
:cmd => "ping -n -q -c #{netattrs[:tries]} -w #{netattrs[:timeout]} #{netattrs[:host]}"})
end
end
# waitall only for all pinger threads
work.each do |nodename, rrr|
rrr[:thr].join()
end
# process results
err_report = {}
work.each do |nodename, rrr|
if ! rrr[:thr][:errors].empty?
err_report[nodename] = []
rrr[:thr][:errors].each do |err|
err_report[nodename] << "Unaccessible through '#{err[:net]}', #{err[:cmd]}"
(0..parallel_amount).each do |n|
workers[n] = Thread.new do
begin
while task = ping_in_q.pop(true)
unless system(task[:cmd], {:out => :close, :err => :close})
exit_status = $?.exitstatus()
ping_err_q << {
:rc => exit_status,
:cmd => "command `#{task[:cmd]}` was failed with code #{exit_status}",
:net => task[:netname],
:nodename => task[:nodename]
}
end
end
rescue ThreadError
# If Queue is empty, it reaises ThreadError, we are not interested
# in proceeding after we checked all addresses in queue.
end
end
end
if ! err_report.empty?
# Wait all for all threads to finish.
workers.each do |_, worker|
worker.join()
end
# Process results.
err_report = {}
unless ping_err_q.empty?
begin
while err = ping_err_q.pop(true)
unless err_report[err[:nodename]]
err_report[err[:nodename]] = []
end
err_report[err[:nodename]] << "Unaccessible through '#{err[:net]}', #{err[:cmd]}"
end
rescue ThreadError
end
end
unless err_report.empty?
msg = "Connectivity check error. Nodes: #{err_report.to_yaml.sub!('---','')}"
if @resource[:non_destructive].to_s == 'true'
warn(msg)

View File

@ -23,6 +23,21 @@ Puppet::Type.newtype(:connectivity_checker) do
desc 'List of network roles which should be excluded from check'
end
newparam(:parallel_amount) do
desc 'Amount of package senders run in parallel'
defaultto 20
validate do |val|
if val.to_i <= 0
raise ArgumentError, "parallel_amount should be positive integer, not '#{val}'"
end
end
munge do |val|
val.to_i
end
end
newparam(:non_destructive) do
desc "Define whether we should fail on connectivity issues"
newvalues(:true, :yes, :on, :false, :no, :off)

View File

@ -9,6 +9,6 @@ connectivity_checker { 'netconfig':
non_destructive => pick($network_checker_settings['non_destructive'], false),
ping_tries => pick($network_checker_settings['ping_tries'], 5),
ping_timeout => pick($network_checker_settings['ping_timeout'], 20),
parallel_amount => pick($network_checker_settings['parallel_amount'], 20),
exclude_network_roles => $exclude_network_roles,
}

View File

@ -130,6 +130,7 @@
parameters:
puppet_manifest: /etc/puppet/modules/osnailyfacter/manifests/connectivity-checker.pp
puppet_modules: /etc/puppet/modules
timeout: 300
strategy:
type: parallel
timeout: 3600
strategy:
type: parallel
amount: 50