Recently, I have done some data request work from other data server using async client. Different from normally pulling data from a data api, using a client to request data works in opposite way. The data server will push data to client after getting some request message from a client.
I am going to talk about a scenario that the data server accepts and sends XML messages because that’s what I have done
recently and learned how data gets handled. In this situation, the data provider will normally has some xsd
files for
the server side and the client side, which we can treat them as the message format for the server side and client side.
And here I will mention about a python library xsdata to easily convert xsd
files
to the corresponding dataclass as the data models. And then we can apply those dataclass in our parse function in
client or server class so that although the message is XML, we can still easily use in further python operations.
Apart from a parse function in client and server class, we still need a serialize function to render dataclass back
to transferable message.
Another thing I will mention before some sample code is the async part. Async is always helpful for handling concurrent requests in the same time. For async client, the server message will not be held when there is bunch of message sent to the client at a burst. For async server, when multiple clients request at the same time, the clients doesn’t need to wait for each other getting the response, the async server will just queue the request and send the response back one by one later.
With the above being said, let’s dive into some sample code of the client implementation.
class AsyncClient:
def __init__(self, username, password):
# Under this initial class, we need writer and reader; credentials, i.e username, password and some ssl;
# serializer and parser; server hostname and port; other attributes used with in hte class
pass
async def _connect(self):
# initial reader and writer with asyncio.open_connection()
pass
async def read_message(self):
# a method to combine messages from one request together
pass
async def login(self):
# send login message to server with writer
pass
async def send_heartbeat(self):
# server always send heartbeat periodically to client conveying the server is live
# and normally client will send back heartbeat as well to demonstrate the client is live too
pass
async def request_data(self):
# send data request message server with writer
pass
async def close(self):
# close the writer of client; no need to close reader
# and set writer and reader back to None
pass
Tests in industrial codebase is very important. To test the client code, we need to implement a mock server to send some mock data to client for the functionality tests of client.
class MockServer:
def __init__(self):
# some ssl initialization
pass
async def start(self):
# we can use asyncio.start_server() to start a server
pass
With more details, asyncio.start_server()
requires a handle
function. The handle function contains a handler and run
concurrent tasks for example, sending heartbeat periodically and handling different messages task. With the mock server,
when we try to test async function using pytest, we need a plugin of the decorator @pytest.mark.asyncio
. It can help us
create a event loop and we just need to write simple await
syntax in front of async functions, e.g. server start, getting
messages etc.. When the individual test finishes, the marker we help us terminate a server automatically so that the
port of the mock server uses gets released. During running the parallel async tests, I encountered the address allocation
issue because each tests will require spawning up a mock server and the port number get collision during the parallel
running. What I did to solve this issue is using a try except block to do some port bumping.
async def start(self):
self.bump()
try:
await self.mock_server.start()
except:
self.bump()
await self.mock_server.start()
@pytest.mark.asyncio
async def test_login(self):
await self.start()
...
The above is what I would like to summary on my recent work and I think this help me groom how to do the implementation of data request in another way, which is good stuff transferable on future data play.