Fix python3 compatibility
Change-Id: I1b3e4ffa2317e824be8454c5549669bbed8952e3
This commit is contained in:
parent
7c3cce78f4
commit
62d1412223
|
@ -63,7 +63,7 @@ class HostAlive(services_checks.ServicesCheck):
|
|||
error_message = 'Unable to open socket to host {0}'.format(host)
|
||||
self.log.warn(error_message)
|
||||
return False, error_message
|
||||
if banner.startswith('SSH'):
|
||||
if banner.startswith(b'SSH'):
|
||||
return True, None
|
||||
else:
|
||||
error_message = 'Unexpected response "{0}" from host {1}'.format(banner, host)
|
||||
|
|
|
@ -23,8 +23,8 @@ import sys
|
|||
import time
|
||||
|
||||
from httplib2 import Http
|
||||
from httplib2 import httplib
|
||||
from httplib2 import HttpLib2Error
|
||||
from six.moves import http_client
|
||||
|
||||
import monasca_agent.collector.checks.services_checks as services_checks
|
||||
import monasca_agent.common.config as cfg
|
||||
|
@ -130,7 +130,7 @@ class HTTPCheck(services_checks.ServicesCheck):
|
|||
self.log.warn(warn_string)
|
||||
return False, error_msg
|
||||
|
||||
except httplib.ResponseNotReady as e:
|
||||
except http_client.ResponseNotReady as e:
|
||||
length = int((time.time() - start) * 1000)
|
||||
error_msg = 'error: {0}. Network is not routable after {1} ' \
|
||||
'ms'.format(repr(e), length)
|
||||
|
|
|
@ -69,8 +69,7 @@ class KafkaCheck(checks.AgentCheck):
|
|||
assert isinstance(group, six.string_types)
|
||||
if isinstance(topics, dict):
|
||||
self.log.info("Found old config format, discarding partition list")
|
||||
topics = topics.keys()
|
||||
assert isinstance(topics, list)
|
||||
topics = list(topics.keys())
|
||||
assert isinstance(topics[0], six.string_types)
|
||||
consumer_groups[group] = topics
|
||||
return consumer_groups
|
||||
|
@ -122,14 +121,12 @@ class KafkaCheck(checks.AgentCheck):
|
|||
for p in partitions:
|
||||
try:
|
||||
response = kafka_conn.send_offset_request(
|
||||
[common.OffsetRequest(topic, p, -1, 1)])
|
||||
[common.OffsetRequest(topic.encode('utf-8'), p, -1, 1)])
|
||||
offset_responses.append(response[0])
|
||||
except common.KafkaError as e:
|
||||
self.log.error("Error fetching broker offset: {0}".format(e))
|
||||
|
||||
for resp in offset_responses:
|
||||
broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
|
||||
|
||||
return consumer_offsets, broker_offsets
|
||||
|
||||
def check(self, instance):
|
||||
|
@ -146,13 +143,12 @@ class KafkaCheck(checks.AgentCheck):
|
|||
# Connect to Kafka and pull information
|
||||
with KafkaConnection(kafka_host_ports) as kafka_conn:
|
||||
consumer_offsets, broker_offsets = self._get_kafka_offsets(kafka_conn, consumer_groups)
|
||||
|
||||
# Report the broker data if full output
|
||||
if full_output:
|
||||
broker_dimensions = dimensions.copy()
|
||||
for (topic, partition), broker_offset in broker_offsets.items():
|
||||
broker_dimensions.update({'topic': topic, 'partition': str(partition)})
|
||||
broker_offset = broker_offsets.get((topic, partition))
|
||||
broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
|
||||
self.gauge('kafka.broker_offset', broker_offset,
|
||||
dimensions=self._set_dimensions(broker_dimensions, instance))
|
||||
|
||||
|
@ -162,7 +158,7 @@ class KafkaCheck(checks.AgentCheck):
|
|||
if per_partition:
|
||||
for partition, consumer_offset in offsets.items():
|
||||
# Get the broker offset
|
||||
broker_offset = broker_offsets.get((topic, partition))
|
||||
broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
|
||||
# Report the consumer offset and lag
|
||||
consumer_dimensions.update({'topic': topic, 'partition': str(partition),
|
||||
'consumer_group': consumer_group})
|
||||
|
@ -175,7 +171,7 @@ class KafkaCheck(checks.AgentCheck):
|
|||
consumer_dimensions.update({'topic': topic, 'consumer_group': consumer_group})
|
||||
total_lag = 0
|
||||
for partition, consumer_offset in offsets.items():
|
||||
broker_offset = broker_offsets.get((topic, partition))
|
||||
broker_offset = broker_offsets.get((topic.encode('utf-8'), partition))
|
||||
total_lag += broker_offset - consumer_offset
|
||||
|
||||
self.gauge('kafka.consumer_lag', total_lag,
|
||||
|
|
|
@ -39,6 +39,7 @@ import re
|
|||
import socket
|
||||
import struct
|
||||
|
||||
from oslo_utils import encodeutils
|
||||
from six import StringIO
|
||||
|
||||
from monasca_agent.collector.checks import AgentCheck
|
||||
|
@ -63,10 +64,10 @@ class Zookeeper(AgentCheck):
|
|||
try:
|
||||
# Connect to the zk client port and send the stat command
|
||||
sock.connect((host, port))
|
||||
sock.sendall('stat')
|
||||
sock.sendall(b'stat')
|
||||
|
||||
# Read the response into a StringIO buffer
|
||||
chunk = sock.recv(chunk_size)
|
||||
chunk = encodeutils.safe_decode(sock.recv(chunk_size), 'utf-8')
|
||||
buf.write(chunk)
|
||||
num_reads = 1
|
||||
max_reads = 10000
|
||||
|
@ -76,7 +77,7 @@ class Zookeeper(AgentCheck):
|
|||
raise Exception(
|
||||
"Read %s bytes before exceeding max reads of %s. " %
|
||||
(buf.tell(), max_reads))
|
||||
chunk = sock.recv(chunk_size)
|
||||
chunk = encodeutils.safe_decode(sock.recv(chunk_size), 'utf-8')
|
||||
buf.write(chunk)
|
||||
num_reads += 1
|
||||
except socket.timeout:
|
||||
|
@ -115,7 +116,7 @@ class Zookeeper(AgentCheck):
|
|||
raise Exception("Could not parse version from stat command output: %s" % start_line)
|
||||
else:
|
||||
version_tuple = match.groups()
|
||||
has_connections_val = map(int, version_tuple) >= [3, 4, 4]
|
||||
has_connections_val = list(map(int, version_tuple)) >= [3, 4, 4]
|
||||
|
||||
# Clients:
|
||||
buf.readline() # skip the Clients: header
|
||||
|
|
Loading…
Reference in New Issue