asynchat – Asynchronous protocol handler¶
| Purpose: | Asynchronous network communication protocol handler |
|---|---|
| Available In: | 1.5.2 and later |
The asynchat module builds on asyncore to make it easier to implement protocols based on passing messages back and forth between server and client. The async_chat class is an asyncore.dispatcher subclass that receives data and looks for a message terminator. Your subclass only needs to specify what to do when data comes in and how to respond once the terminator is found. Outgoing data is queued for transmission via FIFO objects managed by async_chat.
Message Terminators¶
Incoming messages are broken up based on terminators, controlled for each instance via set_terminator(). There are three possible configurations:
- If a string argument is passed to set_terminator(), the message is considered complete when that string appears in the input data.
- If a numeric argument is passed, the message is considered complete when that many bytes have been read.
- If None is passed, message termination is not managed by async_chat.
The EchoServer example below uses both a simple string terminator and a message length terminator, depending on the context of the incoming data. The HTTP request handler example in the standard library documentation offers another example of how to change the terminator based on the context to differentiate between HTTP headers and the HTTP POST request body.
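The three terminator configurations can be illustrated with a small standalone model. This is only a sketch: `TerminatorBuffer`, `feed()`, `pending`, and `messages` are hypothetical names, the class does not use asynchat at all, and it buffers whole messages rather than delivering partial data the way async_chat does. It uses bytes so it runs on current Python versions.

```python
class TerminatorBuffer:
    """Toy model of terminator handling; not the asynchat implementation."""

    def __init__(self, terminator):
        self.terminator = terminator  # bytes, positive int, or None
        self.pending = b""            # data not yet part of a complete message
        self.messages = []            # complete messages, in arrival order

    def feed(self, data):
        """Add incoming data and extract every message that is now complete."""
        self.pending += data
        while self.pending:
            term = self.terminator
            if term is None:
                # No framing at all: hand over whatever has arrived.
                self.messages.append(self.pending)
                self.pending = b""
            elif isinstance(term, int):
                if term <= 0:
                    raise ValueError("length terminator must be positive")
                if len(self.pending) < term:
                    break  # wait for more bytes
                self.messages.append(self.pending[:term])
                self.pending = self.pending[term:]
            else:
                head, sep, rest = self.pending.partition(term)
                if not sep:
                    break  # terminator not seen yet
                self.messages.append(head)
                self.pending = rest


buf = TerminatorBuffer(b"\n")
buf.feed(b"ECHO 5\nhel")   # the command is complete, the payload is not
buf.terminator = 5         # switch to a length-based terminator
buf.feed(b"lo")
print(buf.messages)
```

Changing `terminator` between calls to `feed()` mirrors the way a handler calls set_terminator() when the context of the conversation changes.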
Server and Handler¶
To make it easier to understand how asynchat is different from asyncore, the examples here duplicate the functionality of the EchoServer example from the asyncore discussion. The same pieces are needed: a server object to accept connections, handler objects to deal with communication with each client, and client objects to initiate the conversation.
The EchoServer needed to work with asynchat is essentially the same as the one created for the asyncore example, with fewer logging calls because they are less interesting this time around:
import asyncore
import logging
import socket

from asynchat_echo_handler import EchoHandler


class EchoServer(asyncore.dispatcher):
    """Receives connections and establishes handlers for each client.
    """

    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.address = self.socket.getsockname()
        self.listen(1)
        return

    def handle_accept(self):
        # Called when a client connects to our socket
        client_info = self.accept()
        EchoHandler(sock=client_info[0])
        # We only want to deal with one client at a time,
        # so close as soon as we set up the handler.
        # Normally you would not do this and the server
        # would run forever or until it received instructions
        # to stop.
        self.handle_close()
        return

    def handle_close(self):
        self.close()
The EchoHandler is based on asynchat.async_chat instead of the asyncore.dispatcher this time around. It operates at a slightly higher level of abstraction, so reading and writing are handled automatically. The handler needs to know four things:
- what to do with incoming data (by overriding collect_incoming_data())
- how to recognize the end of an incoming message (via set_terminator())
- what to do when a complete message is received (in found_terminator())
- what data to send (using push())
The example application has two operating modes. It is either waiting for a command of the form ECHO length\n, or waiting for the data to be echoed. The mode is toggled back and forth by setting an instance variable process_data to the method to be invoked when the terminator is found and then changing the terminator as appropriate.
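The mode switch described above can be sketched in isolation. This is a minimal illustration, not the handler itself: `ModeSwitcher`, its `terminator` attribute, and the `echoed` list are hypothetical, and framing is assumed to happen elsewhere so that found_terminator() receives a complete payload. Unlike the handler in this article, which closes after one message, the sketch switches back to command mode.

```python
class ModeSwitcher:
    """Swap the processing method and the terminator together."""

    def __init__(self):
        # Start out waiting for a command line such as "ECHO 5".
        self.terminator = "\n"
        self.process_data = self._process_command
        self.echoed = []

    def found_terminator(self, payload):
        # Dispatch to whichever method is current for this mode.
        self.process_data(payload)

    def _process_command(self, command):
        verb, arg = command.strip().split(" ")
        if verb != "ECHO":
            raise ValueError("unknown command: %r" % verb)
        # The next message is exactly int(arg) bytes long.
        self.terminator = int(arg)
        self.process_data = self._process_message

    def _process_message(self, message):
        self.echoed.append(message)
        # Go back to waiting for the next command line.
        self.terminator = "\n"
        self.process_data = self._process_command


switcher = ModeSwitcher()
switcher.found_terminator("ECHO 5")
switcher.found_terminator("hello")
print(switcher.echoed)
```

Storing the bound method in an instance attribute keeps found_terminator() free of conditionals: the current mode is whatever `process_data` points at.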
import asynchat
import logging


class EchoHandler(asynchat.async_chat):
    """Handles echoing messages from a single client.
    """

    # Artificially reduce buffer sizes to illustrate
    # sending and receiving partial messages.
    ac_in_buffer_size = 64
    ac_out_buffer_size = 64

    def __init__(self, sock):
        self.received_data = []
        self.logger = logging.getLogger('EchoHandler')
        asynchat.async_chat.__init__(self, sock)
        # Start looking for the ECHO command
        self.process_data = self._process_command
        self.set_terminator('\n')
        return

    def collect_incoming_data(self, data):
        """Save a chunk of incoming data from the client until the terminator is found."""
        self.logger.debug('collect_incoming_data() -> (%d bytes)\n"""%s"""', len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        """The end of a command or message has been seen."""
        self.logger.debug('found_terminator()')
        self.process_data()

    def _process_command(self):
        """We have the full ECHO command"""
        command = ''.join(self.received_data)
        self.logger.debug('_process_command() "%s"', command)
        command_verb, command_arg = command.strip().split(' ')
        expected_data_len = int(command_arg)
        self.set_terminator(expected_data_len)
        self.process_data = self._process_message
        self.received_data = []

    def _process_message(self):
        """We have read the entire message to be sent back to the client"""
        to_echo = ''.join(self.received_data)
        self.logger.debug('_process_message() echoing\n"""%s"""', to_echo)
        self.push(to_echo)
        # Disconnect after sending the entire response
        # since we only want to do one thing at a time
        self.close_when_done()
Once the complete command is found, the handler switches to message-processing mode and waits for the complete set of text to be received. When all of the data is available, it is pushed onto the outgoing channel and the handler is set up to be closed once the data is sent.
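The "push, then close once the data is sent" behavior can be modeled as a FIFO with a sentinel marking the close request. The sketch below is a simplified stand-in and makes assumptions: `OutgoingQueue`, `send_next()`, `sent`, and `closed` are hypothetical names, no socket is involved, and the real bookkeeping inside async_chat may differ in detail.

```python
from collections import deque


class OutgoingQueue:
    """Simplified model of an outgoing FIFO with a close sentinel."""

    def __init__(self, out_buffer_size=64):
        self.out_buffer_size = out_buffer_size
        self.fifo = deque()
        self.sent = []       # stands in for data written to the socket
        self.closed = False

    def push(self, data):
        # Queue the data in pieces no larger than the output buffer.
        for start in range(0, len(data), self.out_buffer_size):
            self.fifo.append(data[start:start + self.out_buffer_size])

    def close_when_done(self):
        # The sentinel sits behind everything already queued.
        self.fifo.append(None)

    def send_next(self):
        """Send one queued piece; return False when there is nothing to do."""
        if self.closed or not self.fifo:
            return False
        item = self.fifo.popleft()
        if item is None:
            self.closed = True
            return False
        self.sent.append(item)
        return True


queue = OutgoingQueue(out_buffer_size=4)
queue.push(b"abcdefghij")
queue.close_when_done()
while queue.send_next():
    pass
print(queue.sent, queue.closed)
```

Because the sentinel is queued after the response, the close only takes effect once every earlier piece has been sent, which is why the handler can call close_when_done() immediately after push().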
Client¶
The client works in much the same way as the handler. As with the asyncore implementation, the message to be sent is an argument to the client’s constructor. When the socket connection is established, handle_connect() is called so the client can send the command and message data.
The command is pushed directly, but a special “producer” class is used for the message text. The producer is polled for chunks of data to send out over the network. When the producer returns an empty string, it is treated as exhausted and writing stops.
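The polling contract for a producer can be sketched without any networking. In this sketch, `ChunkProducer` and `drain()` are hypothetical names, and the sketch assumes only what the text states: a `more()` method that returns the next chunk and an empty string once nothing is left.

```python
class ChunkProducer:
    """Hand out data in fixed-size chunks through more()."""

    def __init__(self, data, buffer_size=64):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        # Return the next chunk and keep the remainder for later calls.
        chunk = self.data[:self.buffer_size]
        self.data = self.data[self.buffer_size:]
        return chunk


def drain(producer):
    """Poll the producer until it returns an empty string."""
    chunks = []
    while True:
        chunk = producer.more()
        if not chunk:
            break
        chunks.append(chunk)
    return chunks


chunks = drain(ChunkProducer("x" * 166, buffer_size=64))
print([len(chunk) for chunk in chunks])
```

A 166-character message with a 64-byte buffer yields chunks of 64, 64, and 38 characters, followed by the empty string that ends the polling.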
The client expects just the message data in response, so it sets an integer terminator and collects data in a list until the entire message has been received.
import asynchat
import logging
import socket


class EchoClient(asynchat.async_chat):
    """Sends messages to the server and receives responses.
    """

    # Artificially reduce buffer sizes to illustrate
    # sending and receiving partial messages.
    ac_in_buffer_size = 64
    ac_out_buffer_size = 64

    def __init__(self, host, port, message):
        self.message = message
        self.received_data = []
        self.logger = logging.getLogger('EchoClient')
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.logger.debug('connecting to %s', (host, port))
        self.connect((host, port))
        return

    def handle_connect(self):
        self.logger.debug('handle_connect()')
        # Send the command
        self.push('ECHO %d\n' % len(self.message))
        # Send the data
        self.push_with_producer(EchoProducer(self.message, buffer_size=self.ac_out_buffer_size))
        # We expect the data to come back as-is,
        # so set a length-based terminator
        self.set_terminator(len(self.message))

    def collect_incoming_data(self, data):
        """Save a chunk of the response sent back by the server."""
        self.logger.debug('collect_incoming_data() -> (%d)\n"""%s"""', len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        self.logger.debug('found_terminator()')
        received_message = ''.join(self.received_data)
        if received_message == self.message:
            self.logger.debug('RECEIVED COPY OF MESSAGE')
        else:
            self.logger.debug('ERROR IN TRANSMISSION')
            self.logger.debug('EXPECTED "%s"', self.message)
            self.logger.debug('RECEIVED "%s"', received_message)
        return


class EchoProducer(asynchat.simple_producer):

    logger = logging.getLogger('EchoProducer')

    def more(self):
        response = asynchat.simple_producer.more(self)
        self.logger.debug('more() -> (%s bytes)\n"""%s"""', len(response), response)
        return response
Putting It All Together¶
The main program for this example sets up the client and server in the same asyncore main loop.
import asyncore
import logging
import socket
from asynchat_echo_server import EchoServer
from asynchat_echo_client import EchoClient
logging.basicConfig(level=logging.DEBUG,
                    format='%(name)s: %(message)s',
                    )
address = ('localhost', 0) # let the kernel give us a port
server = EchoServer(address)
ip, port = server.address # find out what port we were given
message_data = open('lorem.txt', 'r').read()
client = EchoClient(ip, port, message=message_data)
asyncore.loop()
Normally you would have them in separate processes, but this makes it easier to show the combined output.
$ python asynchat_echo_main.py
EchoClient: connecting to ('127.0.0.1', 56193)
EchoClient: handle_connect()
EchoProducer: more() -> (64 bytes)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
"""
EchoHandler: collect_incoming_data() -> (8 bytes)
"""ECHO 166"""
EchoHandler: found_terminator()
EchoHandler: _process_command() "ECHO 166"
EchoHandler: collect_incoming_data() -> (55 bytes)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing eli"""
EchoProducer: more() -> (64 bytes)
"""egestas, enim et consectetuer ullamcorper, lectus ligula rutrum """
EchoHandler: collect_incoming_data() -> (64 bytes)
"""t. Donec
egestas, enim et consectetuer ullamcorper, lectus ligul"""
EchoProducer: more() -> (38 bytes)
"""leo, a
elementum elit tortor eu quam.
"""
EchoHandler: collect_incoming_data() -> (47 bytes)
"""a rutrum leo, a
elementum elit tortor eu quam.
"""
EchoHandler: found_terminator()
EchoHandler: _process_message() echoing
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
egestas, enim et consectetuer ullamcorper, lectus ligula rutrum leo, a
elementum elit tortor eu quam.
"""
EchoProducer: more() -> (0 bytes)
""""""
EchoClient: collect_incoming_data() -> (64)
"""Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
"""
EchoClient: collect_incoming_data() -> (64)
"""egestas, enim et consectetuer ullamcorper, lectus ligula rutrum """
EchoClient: collect_incoming_data() -> (38)
"""leo, a
elementum elit tortor eu quam.
"""
EchoClient: found_terminator()
EchoClient: RECEIVED COPY OF MESSAGE
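The chunk sizes the handler logs (8, 55, 64, and 47 bytes) follow from the 9-byte command, the 166-byte message, and the 64-byte input buffer. The arithmetic can be reproduced with a short simulation. This sketch makes assumptions: `handler_chunk_sizes()` is a hypothetical helper, the message is stood in for by 166 filler bytes, and every simulated read is assumed to fill the buffer completely, which a real socket does not guarantee.

```python
def handler_chunk_sizes(command, message, buffer_size=64):
    """Return the sizes of the pieces the handler would collect."""
    stream = command + message
    sizes = []
    command_parts = []
    remaining = None  # None while still waiting for the newline terminator
    for start in range(0, len(stream), buffer_size):
        chunk = stream[start:start + buffer_size]  # one simulated read
        if remaining is None:
            # Command mode: everything before the newline is command text.
            head, sep, chunk = chunk.partition(b"\n")
            if head:
                sizes.append(len(head))
                command_parts.append(head)
            if not sep:
                continue  # the command continues in the next read
            remaining = int(b"".join(command_parts).split()[1])
        if chunk and remaining:
            # Message mode: collect up to the announced length.
            piece = chunk[:remaining]
            sizes.append(len(piece))
            remaining -= len(piece)
    return sizes


print(handler_chunk_sizes(b"ECHO 166\n", b"x" * 166))
```

The first read carries the whole command plus 55 bytes of message text, so the terminator check splits it into an 8-byte command (the newline itself is not collected) and a 55-byte data chunk. The remaining 111 bytes arrive as one full 64-byte read and a final 47-byte read.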