I'm struggling to (integration) test a component that makes use of an infinite generator.
Background: At a really high level, this component is essentially a "middleman" in a kafka workflow. It subscribes to a kafka topic, consumes messages, does some operations on the messages, and re-publishes them on a different topic.
Our integration tests run in (very close to) a production-like environment: We bring up a docker compose stack, and then exec into various containers to execute tests (with pytest). We just avoid actually executing the startup scripts. So the tests in question here are executing against a real kafka cluster, (and real database, but the tests use a magic session that runs everything in a transaction that is rolled back at the end of a test). Similarly, we use the real kafka cluster, but we don't turn on our producers/consumers (we just let specific tests use the kafka cluster for the test). I think the best way to describe the philosophy is that we want production-like connections available to be tested, but we still try to preserve test-independence. The tests don't depend on pre-loaded data, and (attempt to) clean up after themselves so as to not leave behind data that could interfere with another test. Application code is tested using pytest and calling application functions/methods directly.
Note: We do have a higher-level integration test setup that tests "fully-externally" only using available API endpoints, etc.
High-level implementation (Python):

```python
from typing import Callable, Iterable

def startup():
    config = get_config()
    kafka_consumer = Consumer(config.kafka, INPUT_TOPIC)
    kafka_producer = Producer(config.kafka, OUTPUT_TOPIC)
    consume(kafka_consumer.get_messages(), kafka_producer.publish)

def consume(messages: Iterable, publish: Callable):
    for message in messages:
        payload = transform(message)
        publish(payload)
```
Now, testing consume is easy: I can pass it any iterable of messages (as python objects) and any callable. All the business logic-y stuff is easy to test. (It's even unit testable instead of integration testable).
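To make the "easy to test" point concrete, here is a minimal sketch of such a unit test. The `transform` body here is a hypothetical stand-in (the real one is not shown in the question); the point is that any finite iterable and any recording callable will do:

```python
from typing import Callable, Iterable

def transform(message: str) -> str:
    # Hypothetical stand-in for the real transform().
    return message.upper()

def consume(messages: Iterable, publish: Callable) -> None:
    for message in messages:
        publish(transform(message))

# Unit test: a plain list as the message source, a list.append as publish.
published = []
consume(["a", "b"], published.append)
assert published == ["A", "B"]
```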
However, we would still like some tests that verify the integration itself, i.e. that the component can connect to the Kafka cluster, uses the correct topics, etc.
Our current solution for this is:

- `get_messages` accepts parameters for `num_messages` and `timeout` (both default to sentinel values indicating "infinity / no timeout")
- the `get_messages` generator exits after `num_messages` messages or after `timeout`
- `startup` also accepts these parameters (with the same defaults) and passes them on
- running production code uses the defaults
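A minimal sketch of what such a limit-aware `get_messages` might look like (the `poll` callable is an assumption standing in for the consumer's blocking poll; `None` plays the role of the sentinel defaults):

```python
import time
from typing import Callable, Iterator, Optional

def get_messages(
    poll: Callable[[], Optional[str]],
    num_messages: Optional[int] = None,
    timeout: Optional[float] = None,
) -> Iterator[str]:
    # None acts as the "infinity / no timeout" sentinel described above.
    deadline = None if timeout is None else time.monotonic() + timeout
    delivered = 0
    while True:
        if num_messages is not None and delivered >= num_messages:
            return
        if deadline is not None and time.monotonic() >= deadline:
            return
        message = poll()  # hypothetical blocking poll with its own short timeout
        if message is not None:
            delivered += 1
            yield message

# With limits set, the generator terminates, so it is easy to test:
source = iter(["a", "b", "c"])
assert list(get_messages(lambda: next(source), num_messages=2)) == ["a", "b"]
```

Note how the two early-`return` branches exist only for tests: with the default `None` values, neither is ever taken in production.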
Then, integration tests take the form:

```python
def test_startup(kafka_producer):  # pytest fixture
    test_message = "..."
    kafka_producer.publish(test_message)

    startup(num_messages=1, timeout=5)
    # startup exits after one message is processed;
    # if an error causes the message not to be processed, startup exits after 5 seconds.
    # This prevents tests from running forever if there are problems.

    # assert things
```
As you can see, all of the complicated "message limit" checking that goes on in the production code is just to make sure that tests will actually fail instead of hanging forever.
This feels like a lot of complexity added just to enable testing. Also, this still doesn't really test the infinite case: the code takes different branches in the `num_messages`/`timeout` checking than it does when those parameters are left at their defaults.
I don't think this problem is necessarily specific to kafka, but more to infinite generators in general. How can I test the code that configures an infinite generator?
Things I've tried:

- Leaving `get_messages` as an (always) infinite generator. I can manually step (`next(generator)`) or manually close (`generator.close()`) the generator, which is great for testing `get_messages` directly, or for testing anything that `get_messages` is passed to as a parameter. But I can't see how to extend this to testing `startup`, because that's where the generator is created and the test has no access to it. This feels close to what I want, but I can't quite connect the dots.
- Dependency injection: pass the generator into `startup`. This makes `startup` testable, but it just moves the issue elsewhere. Something, somewhere has to create and start the generator, and I would like to test that.
- Minimizing the code that lacks testing: originally, the `consume` function was part of `startup`. Extracting it and accepting the generator as a parameter made it easier to test. It's an improvement, but as with dependency injection, it still sidesteps the issue.
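The first bullet (manual stepping and closing) can be sketched like this; the generator body is a placeholder for the real Kafka-backed one:

```python
def get_messages():
    # An always-infinite generator (sketch), yielding placeholder messages.
    n = 0
    while True:
        yield f"message-{n}"
        n += 1

gen = get_messages()
assert next(gen) == "message-0"  # step manually, one message at a time
assert next(gen) == "message-1"
gen.close()  # raises GeneratorExit inside the generator, ending it cleanly
```

This works because the test holds a reference to the generator; the problem described above is precisely that inside `startup` no such reference escapes.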
Update: the question comes down to whether I'm testing "the component" or "the `startup` method". Either way, @GregBurghardt has the right answer. If testing "the component", then we should let it run and talk to it as a client (Greg's answer). If testing "the code", then I think letting the test run `startup` in a separate process (multiprocessing), so it can be terminated afterwards, is probably the way to do this (inspired by Greg's answer).
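A minimal sketch of that second option, assuming `startup` here is a stand-in for the real function (in a real test you would publish a message before the sleep and assert on the output topic afterwards):

```python
import multiprocessing
import time

def startup():
    # Stand-in for the real startup(): in production this would build the
    # consumer/producer and loop over the infinite generator forever.
    while True:
        time.sleep(0.1)

def test_startup_smoke():
    # Run startup in a child process so the test can terminate it,
    # keeping production code free of test-only exit parameters.
    proc = multiprocessing.Process(target=startup)
    proc.start()
    time.sleep(0.5)  # real test: publish a message here, then poll for output
    alive_during = proc.is_alive()
    proc.terminate()
    proc.join(timeout=5)
    assert alive_during          # startup did not crash while "running forever"
    assert not proc.is_alive()   # and the test cleaned up after itself

if __name__ == "__main__":
    test_startup_smoke()
```

The key trade-off: the infinite loop stays genuinely infinite (the production branch is what gets exercised), and the test boundary moves to process lifetime instead of loop-exit conditions.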