Async Client for Data Requests

Recently, I have been requesting data from another party's data server using an async client. This works the opposite way from the usual pattern of pulling data from a data API: instead of the client fetching each piece of data, the client sends a request message and the data server then pushes data to the client.
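To make the push model concrete, here is a minimal, self-contained sketch using only the standard library's asyncio streams. The line-based text protocol, the `push_server` handler, and the `"prices"` topic are hypothetical stand-ins for a real data server: the client sends a single request and then simply keeps reading whatever the server pushes.

```python
import asyncio


async def push_server(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Hypothetical toy server: wait for one request line, then push several messages.
    request = await reader.readline()
    topic = request.decode().strip()
    for i in range(3):
        writer.write(f"{topic} update {i}\n".encode())
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def run_client() -> list[str]:
    # Port 0 lets the OS choose a free port for this local demo.
    server = await asyncio.start_server(push_server, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        # The client sends one request...
        writer.write(b"prices\n")
        await writer.drain()
        # ...then reads everything the server pushes until the stream ends.
        received = []
        while line := await reader.readline():
            received.append(line.decode().strip())
        writer.close()
        await writer.wait_closed()
    return received
```

Running `asyncio.run(run_client())` returns the three pushed messages, `['prices update 0', 'prices update 1', 'prices update 2']`, even though the client only sent one request.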

I will focus on a scenario in which the data server accepts and sends XML messages, because that is what I have worked with recently and how I learned the way the data gets handled. In this situation, the data provider will normally supply XSD files for the server side and the client side, which we can treat as the message formats for each side. Here I want to mention a Python library, xsdata, which converts XSD files into corresponding dataclasses that serve as data models. We can then use those dataclasses in the parse functions of the client or server class, so that although the messages are XML, we can still work with them easily in further Python operations. Besides a parse function, the client and server classes also need a serialize function to render dataclasses back into transferable messages.
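xsdata's code generator turns the XSD files into dataclasses, and its `XmlParser` and `XmlSerializer` classes convert between XML and those dataclasses. To keep the sketch below runnable without extra dependencies, it hand-writes a hypothetical `DataRequest` dataclass (standing in for a generated one) and swaps in the standard library's `xml.etree.ElementTree` for xsdata's parser and serializer. It only illustrates the parse, use-as-dataclass, serialize round trip; the element and attribute names are assumptions.

```python
import xml.etree.ElementTree as ET
from dataclasses import dataclass


@dataclass
class DataRequest:
    # Hypothetical message model, standing in for a dataclass generated from an XSD file.
    request_id: int
    symbol: str


def parse_request(payload: bytes) -> DataRequest:
    # Parse an incoming XML message into the dataclass model.
    root = ET.fromstring(payload)
    return DataRequest(
        request_id=int(root.attrib["id"]),
        symbol=root.findtext("Symbol", default=""),
    )


def serialize_request(message: DataRequest) -> bytes:
    # Render the dataclass back into XML bytes that can be written to the socket.
    root = ET.Element("DataRequest", id=str(message.request_id))
    ET.SubElement(root, "Symbol").text = message.symbol
    return ET.tostring(root, encoding="utf-8")
```

For example, `serialize_request(DataRequest(7, "ABC"))` produces `b'<DataRequest id="7"><Symbol>ABC</Symbol></DataRequest>'`, and parsing that back yields an equal `DataRequest`. With xsdata, the two hand-written functions would be replaced by its parser and serializer.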

One more thing to mention before the sample code is the async part. Async is helpful for handling many concurrent requests at the same time. For an async client, incoming server messages are not held up when a burst of messages arrives at once. For an async server, when multiple clients send requests at the same time, the clients do not need to wait for each other to get a response: each connection is handled in its own coroutine, and the server switches between them whenever one is waiting on I/O, rather than processing the requests strictly one after another.
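A small sketch of the server side of this claim, using only asyncio: three clients connect at once, and each handler pauses with `asyncio.sleep()` (a hypothetical stand-in for slow work such as fetching data). The recorded events show all three requests starting before any of them finishes, so no client waits for the others. The handler, client, and message names are made up for the demo.

```python
import asyncio

events: list[str] = []


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Each connection runs in its own coroutine; while one awaits, the others proceed.
    name = (await reader.readline()).decode().strip()
    events.append(f"start {name}")
    await asyncio.sleep(0.2)  # stands in for slow work, e.g. fetching data
    events.append(f"end {name}")
    writer.write(f"done {name}\n".encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def client(port: int, name: str) -> str:
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(f"{name}\n".encode())
    await writer.drain()
    reply = (await reader.readline()).decode().strip()
    writer.close()
    await writer.wait_closed()
    return reply


async def main() -> list[str]:
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Three clients send their requests at the same time.
        return await asyncio.gather(*(client(port, n) for n in ("a", "b", "c")))
```

After `asyncio.run(main())`, the first three entries of `events` are the three `start` events, and the whole run takes roughly one sleep interval rather than three.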

With that said, let's dive into some sample code for the client implementation.

class AsyncClient:

    def __init__(self, username, password):
        # The constructor sets up: the stream writer and reader; the credentials,
        # i.e. username, password, and an SSL context; the serializer and parser;
        # the server hostname and port; and any other attributes used within the class.
        pass

    async def _connect(self):
        # Initialize the reader and writer with asyncio.open_connection().
        pass

    async def read_message(self):
        # Combine the pieces of data belonging to one response into a complete message.
        pass

    async def login(self):
        # Send the login message to the server with the writer.
        pass

    async def send_heartbeat(self):
        # The server periodically sends heartbeats to the client to show that it is alive,
        # and the client normally sends heartbeats back to show that it is alive too.
        pass

    async def request_data(self):
        # Send the data request message to the server with the writer.
        pass

    async def close(self):
        # Close the client's writer (the reader has no close method; it is released
        # together with the underlying transport), then set writer and reader back to None.
        pass
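The skeleton leaves the wire format open. As one possibility, assuming (hypothetically) that each XML message is framed with a 4-byte big-endian length prefix, `_connect`, a message reader and writer, and `close` could be filled in as below. The class name `FramedClient`, the framing, `send_message`, and the echo server used for the demo are all assumptions for illustration, not the data provider's actual protocol.

```python
import asyncio
import ssl
import struct
from typing import Optional

HEADER = struct.Struct(">I")  # hypothetical framing: 4-byte big-endian length prefix


class FramedClient:
    def __init__(self, host: str, port: int, ssl_context: Optional[ssl.SSLContext] = None):
        self.host = host
        self.port = port
        self.ssl_context = ssl_context
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None

    async def _connect(self) -> None:
        # open_connection returns a (reader, writer) pair; ssl=None means plain TCP.
        self.reader, self.writer = await asyncio.open_connection(
            self.host, self.port, ssl=self.ssl_context
        )

    async def send_message(self, payload: bytes) -> None:
        # Prefix the payload with its length so the receiver knows where it ends.
        self.writer.write(HEADER.pack(len(payload)) + payload)
        await self.writer.drain()

    async def read_message(self) -> bytes:
        # readexactly() waits until the whole frame has arrived, so a message split
        # across several TCP chunks is combined into one bytes object.
        (length,) = HEADER.unpack(await self.reader.readexactly(HEADER.size))
        return await self.reader.readexactly(length)

    async def close(self) -> None:
        # Closing the writer closes the transport; StreamReader has no close().
        if self.writer is not None:
            self.writer.close()
            await self.writer.wait_closed()
        self.reader = self.writer = None


async def echo_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Demo server: echo one framed message back, deliberately in two separate writes.
    header = await reader.readexactly(HEADER.size)
    body = await reader.readexactly(HEADER.unpack(header)[0])
    writer.write(header + body[:5])
    await writer.drain()
    writer.write(body[5:])
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo() -> bytes:
    server = await asyncio.start_server(echo_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        client = FramedClient("127.0.0.1", port)
        await client._connect()
        await client.send_message(b'<DataRequest id="7"/>')
        reply = await client.read_message()
        await client.close()
    return reply
```

`asyncio.run(demo())` returns the original `b'<DataRequest id="7"/>'` even though the echo server sent it in two pieces. A length prefix is only one framing choice; a delimiter or a closing root tag would need a different `read_message`.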

Tests are very important in an industrial codebase. To test the client code, we need to implement a mock server that sends mock data to the client, so that we can test the client's functionality.

class MockServer:

    def __init__(self):
        # some ssl initialization
        pass

    async def start(self):
        # we can use asyncio.start_server() to start a server
        pass
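asyncio.start_server() calls a connection callback with a (reader, writer) pair for every client that connects. A sketch of a fleshed-out mock server whose callback runs a periodic heartbeat task concurrently with a task that answers requests might look like this. SSL setup is omitted, and the line-based protocol, the `HEARTBEAT`/`DATA` message strings, the 0.05-second interval, and the `stop()` method are hypothetical choices to keep the example small.

```python
import asyncio
import contextlib
from typing import Optional

HEARTBEAT_INTERVAL = 0.05  # hypothetical value, kept short for the demo


class MockServer:
    def __init__(self, host: str = "127.0.0.1", port: int = 0) -> None:
        # SSL setup omitted; port 0 lets the OS pick a free port.
        self.host = host
        self.port = port
        self.server: Optional[asyncio.AbstractServer] = None

    async def _heartbeat(self, writer: asyncio.StreamWriter) -> None:
        # Periodically tell the client that the server is alive.
        while True:
            writer.write(b"HEARTBEAT\n")
            await writer.drain()
            await asyncio.sleep(HEARTBEAT_INTERVAL)

    async def _serve_requests(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # Answer each request line with a mock data message until the client disconnects.
        while line := await reader.readline():
            writer.write(f"DATA {line.decode().strip()}\n".encode())
            await writer.drain()

    async def handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        # Connection callback: heartbeat and request handling run concurrently.
        heartbeat = asyncio.create_task(self._heartbeat(writer))
        try:
            await self._serve_requests(reader, writer)
        finally:
            heartbeat.cancel()
            with contextlib.suppress(asyncio.CancelledError, ConnectionError):
                await heartbeat
            writer.close()
            with contextlib.suppress(ConnectionError):
                await writer.wait_closed()

    async def start(self) -> int:
        self.server = await asyncio.start_server(self.handle, self.host, self.port)
        self.port = self.server.sockets[0].getsockname()[1]
        return self.port

    async def stop(self) -> None:
        self.server.close()
        await self.server.wait_closed()


async def demo() -> list[str]:
    mock = MockServer()
    port = await mock.start()
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"prices\n")
    await writer.drain()
    # Read until the data reply and at least two heartbeats have arrived.
    lines: list[str] = []
    while "DATA prices" not in lines or lines.count("HEARTBEAT") < 2:
        line = await reader.readline()
        if not line:  # the server closed the connection
            break
        lines.append(line.decode().strip())
    writer.close()
    await writer.wait_closed()
    await mock.stop()
    return lines
```

Cancelling the heartbeat task in `finally` keeps it from writing to a connection whose client has already gone away.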

In more detail, asyncio.start_server() requires a handler function (a connection callback). The handler runs concurrent tasks for each connection, for example sending heartbeats periodically and handling the different kinds of messages.

With the mock server in place, testing async functions with pytest needs the pytest-asyncio plugin, which provides the @pytest.mark.asyncio marker. The plugin creates an event loop for the test, so we only need to put await in front of async calls, e.g. starting the server, getting messages, etc. When an individual test finished, the plugin tore down its event loop, which in my tests also stopped the mock server so that the port it used was released.

When running the async tests in parallel, I ran into an address allocation issue: each test spins up its own mock server, and the port numbers collided between tests running at the same time. To solve this, I used a try/except block to bump the port number and retry.

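import pytest


async def start(self):
    # Move to the next candidate port before starting the mock server.
    self.bump()
    try:
        await self.mock_server.start()
    except OSError:
        # The port is already in use (e.g. by a test running in parallel):
        # bump to another port and try once more.
        self.bump()
        await self.mock_server.start()

@pytest.mark.asyncio
async def test_login(self):
    await self.start()
    ...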
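A different technique from the port bumping above, which avoids collisions altogether: bind the mock server to port 0 so the operating system assigns a free port, then read the assigned port back from the listening socket and hand it to the client. The helper names `noop_handler` and `start_on_free_port` are hypothetical.

```python
import asyncio


async def noop_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Placeholder handler; a real mock server would serve messages here.
    writer.close()
    await writer.wait_closed()


async def start_on_free_port() -> tuple[asyncio.AbstractServer, int]:
    # Port 0 asks the OS for any free port; getsockname() reveals which one it chose.
    server = await asyncio.start_server(noop_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    return server, port


async def main() -> list[int]:
    # Simulate several tests each starting a mock server at the same time.
    servers = await asyncio.gather(*(start_on_free_port() for _ in range(3)))
    ports = [port for _, port in servers]
    for server, _ in servers:
        # Close explicitly so each port is released as soon as the test is done.
        server.close()
        await server.wait_closed()
    return ports
```

Each call gets a distinct port, so parallel tests never compete for the same number; the trade-off is that the client under test must be told the port at runtime instead of using a fixed one.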

That sums up my recent work. It taught me a different way of implementing data requests, and I expect the approach to carry over to future data work.
