Bandit: Remove bandit B311, B303 from skip list

Remove B303 (md5, sha1 for python<3.9) and
remove B311 (Standard pseudo-random generators are
not suitable for security/cryptographic purpose) from
the skip list of bandit execution.

Change-Id: I6e9e61e7f94dc9ca339942529af8997adef45e38
This commit is contained in:
elajkat 2024-03-28 11:35:53 +01:00
parent 076d9ad47f
commit 3c557b29f8
16 changed files with 38 additions and 35 deletions

View File

@ -16,8 +16,8 @@
import collections
import functools
import itertools
import random
import re
import secrets
import time
import uuid
@ -1435,7 +1435,7 @@ def _build_flow_expr_str(flow_dict, cmd, strict):
def generate_random_cookie():
# The OpenFlow spec forbids use of -1
return random.randrange(UINT64_BITMASK)
return secrets.SystemRandom().randrange(UINT64_BITMASK)
def check_cookie_mask(cookie):

View File

@ -15,8 +15,8 @@
import abc
import collections
import functools
from random import randint
import re
import secrets
import threading
import uuid
@ -361,7 +361,7 @@ class SbGlobalUpdateEvent(_OVNExtensionEvent, row_event.RowEvent):
# need to spread out the load by introducing a random delay.
# clamp the max delay between 3 and 10 seconds.
max_delay = max(min(cfg.CONF.agent_down_time // 3, 10), 3)
delay = randint(0, max_delay)
delay = secrets.SystemRandom().randint(0, max_delay)
LOG.debug("Delaying updating chassis table for %s seconds", delay)
timer = threading.Timer(delay, _update_chassis, [self, row])

View File

@ -14,7 +14,7 @@
# limitations under the License.
import copy
import random
import secrets
from neutron_lib.agent import topics
from neutron_lib.api import extensions
@ -233,7 +233,7 @@ class DhcpAgentNotifyAPI(object):
if method == 'port_create_end' and enabled_agents:
high_agent = enabled_agents.pop(
random.randint(0, len(enabled_agents) - 1))
secrets.SystemRandom().randint(0, len(enabled_agents) - 1))
self._notify_high_priority_agent(
context, copy.deepcopy(payload), high_agent)
for agent in enabled_agents:

View File

@ -24,8 +24,8 @@ import hmac
import importlib
import os
import os.path
import random
import re
import secrets
import signal
import socket
import sys
@ -667,7 +667,7 @@ def create_object_with_dependency(creator, dep_getter, dep_creator,
# sleep for a random time between 0 and 1 second to
# make sure a concurrent worker doesn't retry again
# at exactly the same time
time.sleep(random.uniform(0, 1))
time.sleep(secrets.SystemRandom().uniform(0, 1))
ctx.reraise = False
continue
try:

View File

@ -14,7 +14,7 @@
# under the License.
import datetime
import random
import secrets
import time
from neutron_lib.callbacks import events
@ -99,7 +99,7 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
interval = max(cfg.CONF.agent_down_time // 2, 1)
# add random initial delay to allow agents to check in after the
# neutron server first starts. random to offset multiple servers
initial_delay = random.randint(interval, interval * 2)
initial_delay = secrets.SystemRandom().randint(interval, interval * 2)
check_worker = neutron_worker.PeriodicWorker(function, interval,
initial_delay)

View File

@ -14,7 +14,7 @@
import functools
import itertools
import random
import secrets
import netaddr
from neutron_lib.api.definitions import l3 as l3_apidef
@ -165,7 +165,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
associated floating IP and delete them 5 minutes after detection.
"""
interval = 60 * 5 # only every 5 minutes. cleanups should be rare
initial_delay = random.randint(0, interval) # splay multiple servers
initial_delay = secrets.SystemRandom().randint(
0, interval) # splay multiple servers
janitor = neutron_worker.PeriodicWorker(self._clean_garbage, interval,
initial_delay)
self.add_worker(janitor)

View File

@ -14,7 +14,7 @@
#
import functools
import random
import secrets
import netaddr
from neutron_lib.api.definitions import l3 as l3_apidef
@ -122,7 +122,7 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
if not available_vr_ids:
return None
return random.choice(list(available_vr_ids))
return secrets.SystemRandom().choice(list(available_vr_ids))
@db_api.retry_if_session_inactive()
def _ensure_vr_id(self, context, router_db, ha_network):

View File

@ -14,7 +14,7 @@
# under the License.
import collections
import random
import secrets
from neutron_lib import constants
from neutron_lib.services.qos import constants as qos_consts
@ -55,7 +55,7 @@ class MeterIDGenerator(object):
cid = None
times = 0
while not cid or cid in used_meter_ids:
cid = random.randint(1, self.max_meter)
cid = secrets.SystemRandom().randint(1, self.max_meter)
times += 1
if times >= MAX_RETIES:
return

View File

@ -15,7 +15,7 @@
# under the License.
import functools
import random
import secrets
import debtcollector
import eventlet
@ -289,7 +289,8 @@ class BundledOpenFlowBridge(object):
if self.active_bundle is not None:
raise ActiveBundleRunning(bundle_id=self.active_bundle)
while True:
self.active_bundle = random.randrange(BUNDLE_ID_WIDTH)
self.active_bundle = secrets.SystemRandom().randrange(
BUNDLE_ID_WIDTH)
if self.active_bundle not in self.br.active_bundles:
self.br.active_bundles.add(self.active_bundle)
break

View File

@ -17,7 +17,7 @@ import abc
import collections
import functools
import itertools
import random
import secrets
from neutron_lib import constants as lib_const
from neutron_lib.db import api as lib_db_api
@ -333,11 +333,11 @@ class ChanceScheduler(L3Scheduler):
"""Randomly allocate an L3 agent for a router."""
def _choose_router_agent(self, plugin, context, candidates):
return random.choice(candidates)
return secrets.SystemRandom().choice(candidates)
def _choose_router_agents_for_ha(self, plugin, context, candidates):
num_agents = self._get_num_of_agents_for_ha(len(candidates))
return random.sample(candidates, num_agents)
return secrets.SystemRandom().sample(candidates, num_agents)
class LeastRoutersScheduler(L3Scheduler):

View File

@ -14,7 +14,7 @@
import abc
import copy
import random
import secrets
from oslo_log import log
@ -148,7 +148,7 @@ class OVNGatewayChanceScheduler(OVNGatewayScheduler):
def _select_gateway_chassis(self, nb_idl, sb_idl, candidates,
priority_min, priority_max, target_lrouter):
candidates = copy.deepcopy(candidates)
random.shuffle(candidates)
secrets.SystemRandom().shuffle(candidates)
return self._reorder_by_az(nb_idl, sb_idl, candidates)
@ -215,7 +215,7 @@ class OVNGatewayLeastLoadedScheduler(OVNGatewayScheduler):
if len(chassis_load) == 0:
break
leastload = min(chassis_load.values())
chassis = random.choice(
chassis = secrets.SystemRandom().choice(
[chassis for chassis, load in chassis_load.items()
if load == leastload])
selected_chassis.append(chassis)

View File

@ -15,7 +15,7 @@
import inspect
import os
import random
import secrets
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
@ -373,7 +373,8 @@ class Service(n_rpc.Service):
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
initial_delay = secrets.SystemRandom().randint(
0, self.periodic_fuzzy_delay)
else:
initial_delay = None

View File

@ -11,7 +11,7 @@
# under the License.
from collections import namedtuple
import random
import secrets
from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import resources
@ -183,7 +183,8 @@ class OVNDriver(base.DriverBase):
# once minimum version for OVN is >= 22.03
if hasattr(acl, "label"):
# Label needs to be an unsigned 32 bit number and not 0.
columns["label"] = random.randrange(1, MAX_INT_LABEL)
columns["label"] = secrets.SystemRandom().randrange(
1, MAX_INT_LABEL)
columns["options"] = {'log-related': "true"}
ovn_txn.add(self.ovn_nb.db_set(
"ACL", acl_uuid, *columns.items()))

View File

@ -11,7 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import secrets
import time
from neutron_lib.db import api as db_api
@ -33,13 +33,13 @@ class LokiPlugin(service_base.ServicePluginBase):
self.random_delay)
def random_deadlock(self, session, flush_context, instances):
if random.randrange(0, 51) > 49: # 1/50 probability
if secrets.SystemRandom().randrange(0, 51) > 49: # 1/50 probability
LOG.info("Loki has raised a DBDeadlock exception, instances %s",
instances)
raise db_exc.DBDeadlock()
def random_delay(self, session, instance):
if random.randrange(0, 201) > 199: # 1/200 probability
if secrets.SystemRandom().randrange(0, 201) > 199: # 1/200 probability
LOG.info("Loki has delayed loading of instance %s", instance)
time.sleep(1)

View File

@ -16,6 +16,7 @@
import collections
import contextlib
import datetime
import secrets
from unittest import mock
from neutron_lib.api import attributes
@ -661,7 +662,7 @@ class L3AgentChanceSchedulerTestCase(L3SchedulerTestCaseMixin,
self.patch_notifier.start()
def test_random_scheduling(self):
random_patch = mock.patch('random.choice')
random_patch = mock.patch.object(secrets.SystemRandom, 'choice')
random_mock = random_patch.start()
def side_effect(seq):

View File

@ -222,11 +222,9 @@ import_exceptions = neutron._i18n
[testenv:bandit]
deps = {[testenv:pep8]deps}
# B104: Possible binding to all interfaces
# B303: prohibit list calls: md5, sha1 for python<3.9
# B311: Standard pseudo-random generators are not suitable for security/cryptographic purpose
# B324: prohibit list calls: md5, sha1 for python>=3.9
# B604: any_other_function_with_shell_equals_true
commands = bandit -r neutron -x tests -n5 -s B104,B303,B311,B324,B604
commands = bandit -r neutron -x tests -n5 -s B104,B324,B604
[testenv:bashate]
deps = {[testenv:pep8]deps}