Skip to content

Stop RecordingApmServer message processing before returning from tests #130007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,6 @@ tests:
- class: org.elasticsearch.search.query.VectorIT
method: testFilteredQueryStrategy
issue: https://github.com/elastic/elasticsearch/issues/129517
- class: org.elasticsearch.test.apmintegration.TracesApmIT
method: testApmIntegration
issue: https://github.com/elastic/elasticsearch/issues/129651
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
method: testSnapshotShutdownProgressTracker
issue: https://github.com/elastic/elasticsearch/issues/129752
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.xcontent.spi.XContentProvider;
import org.junit.rules.ExternalResource;

import java.io.BufferedReader;
Expand All @@ -25,7 +24,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -35,14 +33,12 @@
public class RecordingApmServer extends ExternalResource {
private static final Logger logger = LogManager.getLogger(RecordingApmServer.class);

private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent();

final ArrayBlockingQueue<String> received = new ArrayBlockingQueue<>(1000);

private static HttpServer server;
private final Thread messageConsumerThread = consumerThread();
private volatile Consumer<String> consumer;
private volatile boolean consumerRunning = true;
private volatile boolean running = true;

@Override
protected void before() throws Throwable {
Expand All @@ -56,7 +52,7 @@ protected void before() throws Throwable {

private Thread consumerThread() {
return new Thread(() -> {
while (consumerRunning) {
while (running) {
if (consumer != null) {
try {
String msg = received.poll(1L, TimeUnit.SECONDS);
Expand All @@ -74,28 +70,38 @@ private Thread consumerThread() {

@Override
protected void after() {
running = false;
server.stop(1);
consumerRunning = false;
consumer = null;
}

private void handle(HttpExchange exchange) throws IOException {
try (exchange) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
if (running) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
}
}
}

} catch (RuntimeException e) {
logger.warn("failed to parse request", e);
} catch (Throwable t) {
// The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
// messages before the test starts running. We should also stop handling them before the test ends (and the test
// cluster is torn down), or we may run into IOException as the communication channel is interrupted.
// Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
// we just handle Throwable and don't care (log, but don't care): if we have an error in communicating to/from
// the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
// if we have an error outside the test scope (before or after) that is OK.
logger.warn("failed to parse request", t);
}
}
exchange.sendResponseHeaders(201, 0);
}
}

private List<String> readJsonMessages(InputStream input) throws IOException {
private List<String> readJsonMessages(InputStream input) {
// parse NDJSON
return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList();
}
Expand All @@ -104,14 +110,7 @@ public int getPort() {
return server.getAddress().getPort();
}

public List<String> getMessages() {
List<String> list = new ArrayList<>(received.size());
received.drainTo(list);
return list;
}

public void addMessageConsumer(Consumer<String> messageConsumer) {
this.consumer = messageConsumer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void testApmIntegration() throws Exception {

client().performRequest(nodeStatsRequest);

finished.await(30, TimeUnit.SECONDS);
var completed = finished.await(30, TimeUnit.SECONDS);
assertTrue("Timeout when waiting for assertions to complete", completed);
assertThat(assertions, equalTo(Collections.emptySet()));
}

Expand Down Expand Up @@ -143,5 +144,4 @@ private Map<String, Object> parseMap(String message) {
return Collections.emptyMap();
}
}

}