7

What I'm trying to do

I'm trying to emulate the behavior of the following simple socat(1) command:

socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'

The above command creates a forking TCP server which forks and executes SOME_PROGRAM for each connection, redirecting both stdin and stdout of said command to the TCP socket.

Here's what I'd like to achieve:

  1. Create a simple TCP server with asyncio to handle multiple concurrent connections.
  2. Whenever a connection is received, start SOME_PROGRAM as a sub-process.
  3. Pass any data received from the socket to SOME_PROGRAM's standard input.
  4. Pass any data received from SOME_PROGRAM's standard output to the socket.
  5. When SOME_PROGRAM exits, write a goodbye message along with the exit code to the socket and close the connection.

I would like to do this in pure Python, without using external libraries using the asyncio module.

What I have so far

Here's the code I wrote so far:

import asyncio

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.client_addr   = transport.get_extra_info('peername')
        self.transport     = transport
        self.child_process = None

        print('Connection with {} enstablished'.format(self.client_addr))

        asyncio.ensure_future(self._create_subprocess())

    def connection_lost(self, exception):
        print('Connection with {} closed.'.format(self.client_addr))

        if self.child_process.returncode is not None:
            self.child_process.terminate()

    def data_received(self, data):
        print('Data received: {!r}'.format(data))

        # Make sure the process has been spawned
        # Does this even make sense? Looks so awkward to me...
        while self.child_process is None:
            continue

        # Write any received data to child_process' stdin
        self.child_process.stdin.write(data)

    async def _create_subprocess(self):
        self.child_process = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )

        # Start reading child stdout
        asyncio.ensure_future(self._pipe_child_stdout())

        # Ideally I would register some callback here so that when
        # child_process exits I can write to the socket a goodbye
        # message and close the connection, but I don't know how
        # I could do that...

    async def _pipe_child_stdout(self):
        # This does not seem to work, this function returns b'', that is an
        # empty buffer, AFTER the process exits...
        data = await self.child_process.stdout.read(100) # Arbitrary buffer size

        print('Child process data: {!r}'.format(data))

        if data:
            # Send to socket
            self.transport.write(data)
            # Reschedule to read more data
            asyncio.ensure_future(self._pipe_child_stdout())


SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

And also the ./test program I'm trying to run as subprocess:

#!/usr/bin/env python3

import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
else:
    sys.exit(1)

sys.exit(0)

Unfortunately the above code doesn't really work, and I'm kind of lost about what to try next.

What works as intended:

  • The child process is correctly spawned, and also seems to correctly receive the input from the socket, because I can see it from htop and I can also see that as soon as I send b\n it terminates.

What doesn't work as intended:

Basically anything else...

  • The child process' output is never sent to the socket, and actually never read at all. The call await self.child_process.stdout.read(100) never seems to terminate: instead, it only terminates after the child process dies and the result is just b'' (an empty bytes object).
  • I'm not able to understand when the child process terminates: as I mentioned above, I'd like to send a "Goodbye" message to the socket along with self.child_process.returncode when this happens, but I don't know how to do this in a way which makes sense.

What I tried:

Questions

So, could someone please help me figure out what am I doing wrong? There has to be a way to make this work smoothly. When I first started, I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.

4
  • Have you seen this question: stackoverflow.com/questions/48506460/… ? Commented May 3, 2019 at 23:12
  • try running withe ayncio in debug mode, it may be useful Commented May 3, 2019 at 23:17
  • @Sanyash I have now, but I don't really see how that could help. I know how to write a TCP server with asyncio. What I don't know is how to get the clients to talk to a subprocess. Commented May 3, 2019 at 23:22
  • You might want to try a higher level approach similar to this simple tcp proxy, replacing asyncio.open_connection in the client handler with asyncio.create_subprocess_exec Commented May 4, 2019 at 8:46

1 Answer 1

8

Your code has two immediate implementation issues:

  • The server strips the whitespace off received data before transmitting it to the subprocess. This removes the trailing newline, so if the TCP client sends "a\n", the subprocess will receive just "a". That way the subprocess never encounters the expected "a\n" string, and it always terminates after reading two bytes. This explains the empty string (EOF) coming from the subprocess. (Stripping has been removed in a subsequent edit to the question.)
  • The subprocess doesn't flush its output, so the server doesn't receive any of its writes. The writes are seen only when the subprocess exits, or when it fills up its output buffer, which measures in kilobytes and takes a while to fill when displaying short debugging messages.

The other issue is on the design level. As mentioned in the comments, unless your explicit intention is to implement a new asyncio protocol, it is recommended to stick to the higher-level stream-based API, in this case the start_server function. The even lower-level functionality like SubprocessProtocol, connect_write_pipe, and connect_read_pipe is also not something you would want to use in application code. The rest of this answer assumes a stream-based implementation.

start_server accepts a coroutine which will be spawned as a new task whenever a client connects. It is called with two asyncio stream arguments, one for reading and one for writing. The coroutine contains the logic of communicating to a client; in your case it would spawn the subprocess and transfer data between it and the client.

Note that the bidirectional data transfer between the socket and the subprocess cannot be achieved with a simple loop with reads followed by writes. For example, consider this loop:

# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
    proc_data = await child.stdout.read(1024)  # (1)
    sock_writer.write(proc_data)
    sock_data = await sock_reader.read(1024)
    child.stdin.write(sock_data)               # (2)

This kind of loop is prone to deadlocks. If the subprocess is responding to data it receives from the TCP client, it will sometimes only provide output after it receives some input. That will block the loop at (1) indefinitely, because it can get the data from child's stdout only after sending the child the sock_data, which happens later, at (2). Effectively, (1) waits for (2) and vice versa, constituting a deadlock. Note that reversing the order of transfers won't help because then the loop would deadlock if the TCP client is processing the output of the server's subprocess.

With asyncio at our disposal, such deadlock is easy to avoid: simply spawn two coroutines in parallel, one that transfers data from the socket to the subprocess's stdin, and another that transfers data from the subrocess's stdout to the socket. For example:

# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
    while True:
        data = await src.read(1024)
        if data == b'':
            break
        dest.write(data)

child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()

The difference between this setup and the first while loop is that the two transfers independent of each other. The deadlock cannot occur because the read from the socket never waits for the read from the subprocess and vice versa.

Applied to the question, the whole server would look like this:

import asyncio

class ProcServer:
    async def _transfer(self, src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)

    async def _handle_client(self, r, w):
        loop = asyncio.get_event_loop()
        print(f'Connection from {w.get_extra_info("peername")}')
        child = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        sock_to_child = loop.create_task(self._transfer(r, child.stdin))
        child_to_sock = loop.create_task(self._transfer(child.stdout, w))
        await child.wait()
        sock_to_child.cancel()
        child_to_sock.cancel()
        w.write(b'Process exited with status %d\n' % child.returncode)
        w.close()

    async def start_serving(self):
        await asyncio.start_server(self._handle_client,
                                   '0.0.0.0', SERVER_PORT)

SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server = ProcServer()
    loop.run_until_complete(server.start_serving())
    loop.run_forever()

The accompanying test program must also be modified to call sys.stdout.flush() after every sys.stdout.write(), otherwise the messages linger in its stdio buffers instead of being sent to the parent.

When I first started I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.

On Unix-like systems it is certainly possible to redirect a socket to a spawned subprocess, so that the subprocess is directly talking to the client. (The old inetd Unix server works like this.) But that mode of operation is not supported by asyncio, for two reasons:

  • it does not work on all systems supported by Python and asyncio, in particular on Windows.
  • it is incompatible with core asyncio features, such as transports/protocols and streams, which assume ownership and exclusive access to the underlying sockets.

Even if you don't care about portability, consider the second point: you might need to process or log the data exchanged between the TCP client and the subprocess, and you can't do that if they're welded together on the kernel level. Also, timeouts and cancelation are much easier to implement in asyncio coroutines than when dealing with just an opaque subprocess.

If non-portability and inability to control the communication is fine for your use case, then you probably don't need asyncio in the first place - nothing prevents you from spawning a thread that runs a classic blocking server which handles each client with the same sequence of os.fork, os.dup2, and os.execlp that you'd write in C.

EDIT

As the OP points out in a comment, the original code handles the TCP client disconnecting by killing the child process. At the stream layer, a connection loss is reflected by the stream either signaling end-of-file or raising an exception. In the above code, one could easily react to connection loss by replacing the generic self._transfer() with a more specific coroutine that handles that case. For example, instead of:

sock_to_child = loop.create_task(self._transfer(r, child.stdin))

...one could write:

sock_to_child = loop.create_task(self._sock_to_child(r, child))

and define _sock_to_child like this (untested):

async def _sock_to_child(self, reader, child):
    try:
        await self._transfer(reader, child.stdin)
    except IOError as e:
        # IO errors are an expected part of the workflow,
        # we don't want to propagate them
        print('exception:', e)
    child.kill()

If the child outlives the TCP client, the child.kill() line will likely never execute because the coroutine will be canceled by _handle_client while suspended in src.read() inside _transfer().

Sign up to request clarification or add additional context in comments.

9 Comments

First of all, thank you so much for the time you spent to answer my question! The .strip() on the data was an editing mistake, sorry about that, but thanks for pointing it out. You're also right that I should flush() stdout after writing on it. Your code has one problem though: unlike in mine, if the connection is closed by the client, the child process is never terminated. This is critical to say the least, and I was subclassing asyncio.Protocol exactly for this reason (the connection_lost callback). Could this issue be solved even with your simpler approach?
@MarcoBonelli Sure, connection loss translates to EOF/exception on the stream that communicates with the client. In this case you'd invoke a more specialized coroutine that invokes the generic transfer and then kills the child. See the edited answer.
It's really a shame that asyncio took the whole transport/protocol thing from Twisted and made it public. It only confuses learners, and its callback-driven approach undoes many of the advantages of using a system like asyncio in the first place. In git terminology, it should have remained a plumbing layer.
Okay, I see, it looks a bit clumsy to be honest, but it gets the job done. One note though: the try-except block is not necessary since IOError will never be raised since src.read() returns b'' when EOF is reached (i.e. the client disconnects), even if we try to read() from the socket after the client disconnects. Also you're right, the whole asyncio module and logic was pretty confusing when I started playing around with it. Thank you again :)
@MarcoBonelli That's true if the client disconnects cleanly, but you can easily get IOError in case of network failures and timeouts. If you've programmed with BSD sockets, you'll have seen "connection reset by peer" and similar errors - that's the kind of exception StreamReader.read will raise.
|

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.