Skip to main content
9 of 9
added 7 characters in body
OldCurmudgeon
  • 2.1k
  • 3
  • 19
  • 30

HTTP Client requests done right

As advised by many, I am using a client pool - specifically the Apache PoolingHttpClientConnectionManager.

For simplicity I wrap it in my own simple singleton class. Sorry about the rather OTT Stop mechanism:

public final class HttpClientPool {
  private static final Logger log = LoggerFactory.getLogger(HttpClientPool.class);

  // Single-element enum to implement Singleton.
  private static enum Singleton {
    // Just one of me so constructor will be called once.
    Client;
    // The thread-safe client.
    private final CloseableHttpClient threadSafeClient;
    // The pool monitor.
    private final IdleConnectionMonitorThread monitor;

    // The constructor creates it - thus late
    private Singleton() {
      PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
      // Increase max total connection to 200
      cm.setMaxTotal(200);
      // Increase default max connection per route to 20
      cm.setDefaultMaxPerRoute(20);
      // Build the client.
      threadSafeClient = HttpClients.custom()
              .setConnectionManager(cm)
              .build();
      // Start up an eviction thread.
      monitor = new IdleConnectionMonitorThread(cm);
      // Don't stop quitting.
      monitor.setDaemon(true);
      monitor.start();
    }

    public CloseableHttpClient get() {
      return threadSafeClient;
    }

  }

  public static CloseableHttpClient getClient() {
    // The thread safe client is held by the singleton.
    return Singleton.Client.get();
  }

  // Watches for stale connections and evicts them.
  private static class IdleConnectionMonitorThread extends Thread {
    // The manager to watch.
    private final PoolingHttpClientConnectionManager cm;
    // Use a BlockingQueue to stop everything.
    private final BlockingQueue<Stop> stopSignal = new ArrayBlockingQueue<Stop>(1);

    // Pushed up the queue.
    private static class Stop {
      // The return queue.
      private final BlockingQueue<Stop> stop = new ArrayBlockingQueue<Stop>(1);

      // Called by the process that is being told to stop.
      public void stopped() {
        // Push me back up the queue to indicate we are now stopped.
        stop.add(this);
      }

      // Called by the process requesting the stop.
      public void waitForStopped() throws InterruptedException {
        // Wait until the callee acknowledges that it has stopped.
        stop.take();
      }

    }

    IdleConnectionMonitorThread(PoolingHttpClientConnectionManager cm) {
      super();
      this.cm = cm;
    }

    @Override
    public void run() {
      try {
        // Holds the stop request that stopped the process.
        Stop stopRequest;
        // Every 5 seconds.
        while ((stopRequest = stopSignal.poll(5, TimeUnit.SECONDS)) == null) {
          // Close expired connections
          cm.closeExpiredConnections();
          // Optionally, close connections that have been idle too long.
          cm.closeIdleConnections(60, TimeUnit.SECONDS);
          // Look at pool stats.
          log.trace("Stats: {}", cm.getTotalStats());
        }
        // Acknowledge the stop request.
        stopRequest.stopped();
      } catch (InterruptedException ex) {
        // terminate
      }
    }

    public void shutdown() throws InterruptedException {
      log.trace("Shutting down client pool");
      // Signal the stop to the thread.
      Stop stop = new Stop();
      stopSignal.add(stop);
      // Wait for the stop to complete.
      stop.waitForStopped();
      // Close the pool - Added
      threadSafeClient.close();
      // Close the connection manager.
      cm.close();
      log.trace("Client pool shut down");
    }

  }

  public static void shutdown() throws InterruptedException {
    // Shutdown the monitor.
    Singleton.Client.monitor.shutdown();
  }

}

I use it exclusively with JSON requests:

  // General query of the website. Takes an object of type Q and returns one of class R.
  public static <Q extends JSONObject, R> R query(String urlBase, String op, Q q, Class<R> r) throws IOException {
    // The request.
    final HttpRequestBase request;
    //postRequest.addHeader("Accept-Encoding", "gzip,deflate");
    if (q != null) {
      // Prepare the post.
      HttpPost postRequest = new HttpPost(urlBase + op);
      // Get it all into a JSON string.
      StringEntity input = new StringEntity(asJSONString(q));
      input.setContentType("application/json");
      postRequest.setEntity(input);
      // Use that one.
      request = postRequest;
    } else {
      // Just get.
      request = new HttpGet(urlBase + op);
    }
    log.debug("> " + urlBase + op + (q == null ? "" : " " + q));
    // Post it and wait.
    return readResponse(request, HttpClientPool.getClient().execute(request), r);
  }
  public static <R> R readResponse(HttpRequestBase request, CloseableHttpResponse response, Class<R> r) throws IOException {
    // What was read.
    R red = null;
    try {
      // What happened?
      if (response.getStatusLine().getStatusCode() == 200) {
        // Roll out the results
        HttpEntity entity = response.getEntity();
        if (entity != null) {
          InputStream content = entity.getContent();
          try {
            // Roll it directly from the response stream.
            JsonParser rsp = getFactory().createJsonParser(content);
            // Bring back the response.
            red = rsp.readValueAs(r);
          } finally {
            // Always close the content.
            content.close();
          }
        }
      } else {
        // The finally below will clean up.
        throw new IOException("HTTP Response: " + response.getStatusLine().getStatusCode());
      }
    } finally {
      // Always close the response.
      response.close();
    }

    if (red == null) {
      log.debug("< {null}");
    } else {
      log.debug("< {}", red.getClass().isArray() ? Arrays.toString((Object[]) red) : red.toString());
    }
    return red;
  }

This seems to work fine under normal load - however, a recent high-load period caused everything to fall apart. We even interfered with other hosted apps. Not sure what caused the interference but I want to be sure this side of it is done right.

The exceptions I saw were:

org.apache.http.NoHttpResponseException: The target server failed to respond
java.net.BindException: Address already in use: connect
java.net.SocketException: No buffer space available (maximum connections reached?): connect -- Thousands of these per hour!!!

So - my questions:

  1. Am I using the pool correctly?

  2. Am I closing/not closing at the right time?

  3. Should I reset the request (request.reset())?

  4. Have I missed something?


Added

Spot the deliberate mistake - not closing the threadSafeClient at shutdown time. Not relevant to the issue but important. Fixed!

Also - failing to close the stream and checking the entity against null. Fixed!

OldCurmudgeon
  • 2.1k
  • 3
  • 19
  • 30