Skip to content

Commit ab25087

Browse files
olavloiteskuruppu
authored andcommitted
perf: close sessions async (#24)
* perf: close sessions async Sessions should be closed using an async gRPC call in order to speed up the closing of a large session pool. Instead of using its own executor, which is limited to 8 worker threads, to execute asynchronous delete session calls, the session pool should use the asynchronous call option in gRPC. This allows a larger number of asynchronous delete session calls to be executed in parallel and speeds up closing a session pool with a large number of sessions. Testing against a real Cloud Spanner database with a session pool containing 1,000 sessions shows the following performance for closing the session pool: Before (3 runs): 6603ms 8169ms 8367ms After (3 runs): 1297ms 1710ms 1851ms Fixes #19. * fix: wait for test servers to terminate * remove tracing for async call * do not use directExecutor which could be a gRPC thread * fix: return failed future instead of throwing exception * fix: remove commented code
1 parent 11e4a90 commit ab25087

17 files changed

+111
-49
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import com.google.api.core.ApiFuture;
20+
import com.google.protobuf.Empty;
21+
1922
/**
2023
* A {@code Session} can be used to perform transactions that read and/or modify data in a Cloud
2124
* Spanner database.
@@ -54,4 +57,10 @@ public interface Session extends DatabaseClient, AutoCloseable {
5457

5558
@Override
5659
void close();
60+
61+
/**
62+
* Closes the session asynchronously and returns the {@link ApiFuture} that can be used to monitor
63+
* the operation progress.
64+
*/
65+
ApiFuture<Empty> asyncClose();
5766
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2020
import static com.google.common.base.Preconditions.checkNotNull;
2121

22+
import com.google.api.core.ApiFuture;
2223
import com.google.cloud.Timestamp;
2324
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2425
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
@@ -27,6 +28,7 @@
2728
import com.google.cloud.spanner.spi.v1.SpannerRpc;
2829
import com.google.common.collect.Lists;
2930
import com.google.protobuf.ByteString;
31+
import com.google.protobuf.Empty;
3032
import com.google.spanner.v1.BeginTransactionRequest;
3133
import com.google.spanner.v1.CommitRequest;
3234
import com.google.spanner.v1.CommitResponse;
@@ -196,6 +198,11 @@ public void prepareReadWriteTransaction() {
196198
readyTransactionId = beginTransaction();
197199
}
198200

201+
@Override
202+
public ApiFuture<Empty> asyncClose() {
203+
return spanner.getRpc().asyncDeleteSession(name, options);
204+
}
205+
199206
@Override
200207
public void close() {
201208
Span span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION).startSpan();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2020

21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
2123
import com.google.cloud.Timestamp;
2224
import com.google.cloud.grpc.GrpcTransportOptions;
2325
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
@@ -35,6 +37,7 @@
3537
import com.google.common.util.concurrent.MoreExecutors;
3638
import com.google.common.util.concurrent.SettableFuture;
3739
import com.google.common.util.concurrent.Uninterruptibles;
40+
import com.google.protobuf.Empty;
3841
import io.opencensus.common.Scope;
3942
import io.opencensus.trace.Annotation;
4043
import io.opencensus.trace.AttributeValue;
@@ -767,6 +770,12 @@ public TransactionRunner readWriteTransaction() {
767770
return new SessionPoolTransactionRunner(SessionPool.this, this);
768771
}
769772

773+
@Override
774+
public ApiFuture<Empty> asyncClose() {
775+
close();
776+
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
777+
}
778+
770779
@Override
771780
public void close() {
772781
synchronized (lock) {
@@ -999,7 +1008,7 @@ private void closeIdleSessions(Instant currTime) {
9991008
}
10001009
for (PooledSession sess : sessionsToClose) {
10011010
logger.log(Level.FINE, "Closing session {0}", sess.getName());
1002-
closeSession(sess);
1011+
closeSessionAsync(sess);
10031012
}
10041013
}
10051014

@@ -1612,37 +1621,27 @@ int totalSessions() {
16121621
}
16131622
}
16141623

1615-
private void closeSessionAsync(final PooledSession sess) {
1616-
executor.submit(
1624+
private ApiFuture<Empty> closeSessionAsync(final PooledSession sess) {
1625+
ApiFuture<Empty> res = sess.delegate.asyncClose();
1626+
res.addListener(
16171627
new Runnable() {
16181628
@Override
16191629
public void run() {
1620-
closeSession(sess);
1630+
synchronized (lock) {
1631+
allSessions.remove(sess);
1632+
if (isClosed()) {
1633+
decrementPendingClosures(1);
1634+
return;
1635+
}
1636+
// Create a new session if needed to unblock some waiter.
1637+
if (numWaiters() > numSessionsBeingCreated) {
1638+
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
1639+
}
1640+
}
16211641
}
1622-
});
1623-
}
1624-
1625-
private void closeSession(PooledSession sess) {
1626-
try {
1627-
sess.delegate.close();
1628-
} catch (SpannerException e) {
1629-
// Backend will delete these sessions after a while even if we fail to close them.
1630-
if (logger.isLoggable(Level.FINE)) {
1631-
logger.log(Level.FINE, "Failed to close session: " + sess.getName(), e);
1632-
}
1633-
} finally {
1634-
synchronized (lock) {
1635-
allSessions.remove(sess);
1636-
if (isClosed()) {
1637-
decrementPendingClosures(1);
1638-
return;
1639-
}
1640-
// Create a new session if needed to unblock some waiter.
1641-
if (numWaiters() > numSessionsBeingCreated) {
1642-
createSessions(getAllowedCreateSessions(numWaiters() - numSessionsBeingCreated));
1643-
}
1644-
}
1645-
}
1642+
},
1643+
executor);
1644+
return res;
16461645
}
16471646

16481647
private void prepareSession(final PooledSession sess) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
2020

21+
import com.google.api.core.ApiFuture;
2122
import com.google.api.core.NanoClock;
2223
import com.google.api.gax.core.CredentialsProvider;
2324
import com.google.api.gax.core.ExecutorProvider;
@@ -523,9 +524,14 @@ public Session createSession(
523524
@Override
524525
public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
525526
throws SpannerException {
527+
get(asyncDeleteSession(sessionName, options));
528+
}
529+
530+
@Override
531+
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
526532
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
527533
GrpcCallContext context = newCallContext(options, sessionName);
528-
get(spannerStub.deleteSessionCallable().futureCall(request, context));
534+
return spannerStub.deleteSessionCallable().futureCall(request, context);
529535
}
530536

531537
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.spanner.spi.v1;
1818

19+
import com.google.api.core.ApiFuture;
1920
import com.google.api.gax.longrunning.OperationFuture;
2021
import com.google.cloud.ServiceRpc;
2122
import com.google.cloud.spanner.SpannerException;
@@ -219,6 +220,9 @@ Session createSession(
219220

220221
void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;
221222

223+
ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
224+
throws SpannerException;
225+
222226
StreamingCall read(
223227
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options);
224228

google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import static org.mockito.Mockito.spy;
2424
import static org.mockito.Mockito.when;
2525

26+
import com.google.api.core.ApiFutures;
2627
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
2728
import com.google.cloud.spanner.SessionPool.Clock;
29+
import com.google.protobuf.Empty;
2830
import java.util.concurrent.ScheduledExecutorService;
2931
import java.util.concurrent.ScheduledFuture;
3032
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -61,6 +63,7 @@ SessionImpl mockSession() {
6163
when(session.getName())
6264
.thenReturn(
6365
"projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex);
66+
when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
6467
sessionIndex++;
6568
return session;
6669
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminGaxTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ public static void startStaticServer() throws IOException {
216216
}
217217

218218
@AfterClass
219-
public static void stopServer() {
219+
public static void stopServer() throws InterruptedException {
220220
server.shutdown();
221+
server.awaitTermination();
221222
}
222223

223224
@Before

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminGaxTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,9 @@ public static void startStaticServer() throws IOException {
219219
}
220220

221221
@AfterClass
222-
public static void stopServer() {
222+
public static void stopServer() throws InterruptedException {
223223
server.shutdown();
224+
server.awaitTermination();
224225
}
225226

226227
@Before

google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,10 @@ public static void startStaticServer() throws IOException {
172172
}
173173

174174
@AfterClass
175-
public static void stopServer() {
175+
public static void stopServer() throws InterruptedException {
176176
spannerClient.close();
177177
server.shutdown();
178+
server.awaitTermination();
178179
}
179180

180181
@Before

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.grpc.StatusRuntimeException;
3030
import io.grpc.inprocess.InProcessServerBuilder;
3131
import java.io.IOException;
32+
import java.util.concurrent.ScheduledThreadPoolExecutor;
3233
import org.junit.After;
3334
import org.junit.AfterClass;
3435
import org.junit.Before;
@@ -55,13 +56,19 @@ public static void startStaticServer() throws IOException {
5556
mockSpanner = new MockSpannerServiceImpl();
5657
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
5758
String uniqueName = InProcessServerBuilder.generateName();
58-
server = InProcessServerBuilder.forName(uniqueName).addService(mockSpanner).build().start();
59+
server =
60+
InProcessServerBuilder.forName(uniqueName)
61+
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
62+
.addService(mockSpanner)
63+
.build()
64+
.start();
5965
channelProvider = LocalChannelProvider.create(uniqueName);
6066
}
6167

6268
@AfterClass
63-
public static void stopServer() {
69+
public static void stopServer() throws InterruptedException {
6470
server.shutdown();
71+
server.awaitTermination();
6572
}
6673

6774
@Before

0 commit comments

Comments
 (0)