charm-keystone/hooks/uds_comms.py

357 lines
12 KiB
Python

import base64
import os
import socket
import six
# for matching in the Codec class
if six.PY3:
START_CHAR = 37 # 37 == %
END_CHAR = 36 # 36 == $
else:
START_CHAR = '%'
END_CHAR = '$'
class Codec():
"""A very simple codec that bounds messages with a start char of '%' and an
end char of '$'. The message itself mustn't contain either of these
characters, and this is ensured by encoding the message using base64 (which
doesn't contain either of those characters).
This is for sending over a unix domain socket which has interesting
buffering -- this makes sure we can reconstruct entire messages between two
processes.
"""
def __init__(self):
self.found_start = -1
self.message = None
self.buffer = b''
def _add(self, bites):
"""Add some bytes to the buffer: called from receive()
It looks for the beginning and end of a message, and if found returns
the encoded buffer without the '%' and '$' markers.
:param bites: the bytes to add to the buffer and search for a message
:type bites: bytes
:returns: Either a b64encoded message, or None
:rtype: Option[bytes, None]
"""
# current = len(self.buffer)
self.buffer += bites
if self.found_start < 0:
# skip till we found a '%'
for i, b in enumerate(self.buffer):
if b == START_CHAR:
self.found_start = i
break
if self.found_start > -1:
# see if the end of the message is available
for i, b in enumerate(self.buffer):
if i > self.found_start + 1 and b == END_CHAR:
# found the end
start = self.found_start + 1
self.message = (base64
.b64decode(self.buffer[start:i])
.decode('UTF-8'))
self.buffer = self.buffer[i + 1:]
self.found_start = -1
return self.message
return None
def receive(self, _callable):
"""Continuously calls the param _callable() until it returns None or a
full message is received.
If the message is already in the buffer, then it grabs it and doesn't
call the _callable().
_callable() should return bytes until it wants receive() to terminate,
when it should return None. receive() also returns when a message is
complete.
receive() will return a decoded UTF-8 string when a complete message is
received.
Any left over bytes are retained in the Codec object, and further calls
to receive() will consume these first.
:param _callable: A function that returns None or bytes
:type _callable: Callable()
:returns: None or a UTF-8 decoded string
:rtype: Option[None, str]
"""
# first see if the message is already in the buffer?
message = self._add(b'')
if message:
return message
while True:
# receive the data in chunks
data = _callable()
if data:
message = self._add(data)
if message:
return message
else:
break
return None
def encode(self, message):
"""Encode a message for sending on a channel with inconsistent
buffering (e.g. like a unix domain socket.
Encodes the message by UTF-8, then base64 and finally adds '%' and '$'
to the start and end of the message. This is so the message can be
recovered by searching through a receiving buffer.
:param message: The string that needs encoding.
:type message: str
:returns: the encoded message
:rtype: bytes
"""
buffer = base64.b64encode(message.encode('UTF-8'))
return b"%" + buffer + b"$"
# client and socket classes for the channel
#
# The Client connects to the server, and performs a READY handshake as part of
# the connect(). The server has to respond 'OK'. Once this is done the client
# and server are synchronised. Note that it is a one-to-one, synchronised
# connection with client and server exchanging messages. The theory is that
# the server initiates the Server, to bind to the socket, launches the script
# and then waits for the connection. There is no race as the client will wait
# until the servers calls wait_for_connection() which can be after the client
# has connected to the socket.
#
# The server then sends a "QUIT" to the client to get it to clean up and exit
# (but this is outside of the protocol in the Client() and Server() classes
class UDSException(Exception):
"""Used to gather up all exceptions and return a single one so that the
client/server can error out on comms failures.
"""
class UDSClient():
"""Unix Domain Socket Client class.
Provides a synchronised message/receive client for connecting to the
equivalent UDSServer() running in a different process.
The client/server is backwards, as the UDSClient() is expecting to receive
a message, which its user will then reply with a result. i.e. the Client
is implemented in a process that expects to get commands from the server.
This is so that the server can launch a child script, communicate with it,
and then terminate it when finished.
Example use:
client = Client(server_address)
client.connect()
message = client.receive()
if message == "DONE":
client.close()
return
client.send("OK")
# etc.
"""
BUFFER_SIZE = 256
def __init__(self, socket_path):
"""Initialise the Client.
:param socket_path: the file to use as a Unix Domain Socket
:type socket_path: str
:raises: UDSException on Error
"""
self.socket_path = socket_path
try:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
except Exception as e:
raise UDSException(str(e))
self.codec = Codec()
def connect(self):
"""Attempt to connect to the other side.
When the connection is made, automatically calls _ready() to indicate
that the client is ready as part of the handshake. When connect()
completes the user should call receive() to receive the first message
from the server.
:raises: UDSException on Error
"""
try:
self.sock.connect(self.socket_path)
self._ready()
except Exception as e:
raise UDSException(str(e))
def _ready(self):
"""Internal method to provide a handshake to the server"""
self.sock.sendall(self.codec.encode("READY"))
message = self.receive()
if message != "OK":
raise RuntimeError("Handshake failed")
def receive(self):
"""Receives a message from the Server() in the other process on the
other end of the UDS. Uses the Codec() class to ensure that the
messages are properly received and sent.
:returns: the string send by the Server.send() methdod.
:rtype: str
:raises: UDSException on Error
"""
try:
return self.codec.receive(
lambda: self.sock.recv(self.BUFFER_SIZE))
except Exception as e:
raise UDSException(str(e))
def send(self, buffer):
"""Send a message to the Server() in the other process.
:param buffer: the string to send
:type buffer: str
:raises: UDSException on Error
"""
try:
self.sock.sendall(self.codec.encode(buffer))
except Exception as e:
raise UDSException(str(e))
def close(self):
"""Close the socket -- good housekeeping, so should do it at the end of
the process.
:raises: UDSException on Error
"""
try:
self.sock.close()
except Exception as e:
raise UDSException(str(e))
class UDSServer():
"""The Server (or listening) end of the Unix Domain Socket chat protocol.
Uses Codec() to encode and decode messages on the channel.
The Server listens for a connection, performs a handshake, and then is in
control of the conversation. The user of Server() should then send a
message and wait for a reponse. It's up to the client to disconnect, so an
protocol level message should be used (e.g. QUIT) that the user of Client()
will use to close the connection.
Example use:
server = Server(server_address)
input("Press enter to continue ....")
server.wait_for_connection()
try:
# send some data
server.send(data)
# and await the reply
message = server.receive()
finally:
# clean up
server.send("DONE")
message = server.receive()
server.close()
"""
BUFFER_SIZE = 256
def __init__(self, socket_path):
"""Initialise the listener on the UDS. This binds to the socket and
ensures that a client can connect. The conversation doesn't get
started until the wait_for_connection() method is called.
The server can initialse the Server, then ask the client to connect,
and then at any point later call wait_for_connection() to get the
conversation going.
:param socket_path: the filename for the UDS.
:type socket_path: str
:raises: UDSException on Error
"""
self.socket_path = socket_path
self.sock = None
# Make sure the socket does not already exist
try:
os.unlink(socket_path)
except OSError:
if os.path.exists(socket_path):
raise
try:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# ensure the socket is created with 600 permissions
_mask = os.umask(0o177)
self.sock.bind(socket_path)
os.umask(_mask)
self.sock.listen(1)
except Exception as e:
raise UDSException(str(e))
self.codec = Codec()
def wait_for_connection(self):
"""Blocking method to wait for a connection from the client.
Performs the handshake to ensure that both ends are in sync.
:raises: UDSException on Error
"""
try:
self.connection, self.client_address = self.sock.accept()
self._handshake()
except Exception as e:
raise UDSException(str(e))
def _handshake(self):
"""Internal method to sync up the client and server"""
while True:
message = self.receive()
if message == 'READY':
self.send('OK')
break
def receive(self):
"""Receives a message from the Client() in the other process on the
other end of the UDS. Uses the Codec() class to ensure that the
messages are properly received and sent.
:returns: the string send by the Client.send() methdod.
:rtype: str
:raises: UDSException on Error
"""
try:
return self.codec.receive(
lambda: self.connection.recv(self.BUFFER_SIZE))
except Exception as e:
raise UDSException(str(e))
def send(self, buffer):
"""Send a message to the Client() in the other process.
:param buffer: the string to send
:type buffer: str
:raises: UDSException on Error
"""
try:
self.connection.sendall(self.codec.encode(buffer))
except Exception as e:
raise UDSException(str(e))
def close(self):
"""Close the socket -- good housekeeping, so should do it at the end of
the process.
:raises: UDSException on Error
"""
try:
self.connection.close()
except Exception as e:
raise UDSException(str(e))