Skip to content

Commit 46264d1

Browse files
authored
feat: Add experimental DirectPath support (#396)
* feat: Add experimental DirectPath support
1 parent 45d8419 commit 46264d1

File tree

3 files changed

+142
-29
lines changed

3 files changed

+142
-29
lines changed

google-cloud-spanner/pom.xml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
</parent>
1616
<properties>
1717
<site.installationModule>google-cloud-spanner</site.installationModule>
18+
<skipUTs>false</skipUTs>
1819
</properties>
1920

21+
2022
<build>
2123
<plugins>
2224
<plugin>
@@ -49,6 +51,7 @@
4951
<id>default-test</id>
5052
<configuration>
5153
<excludedGroups>com.google.cloud.spanner.TracerTest,com.google.cloud.spanner.IntegrationTest</excludedGroups>
54+
<skipTests>${skipUTs}</skipTests>
5255
</configuration>
5356
</execution>
5457
<execution>
@@ -360,5 +363,26 @@
360363
</plugins>
361364
</build>
362365
</profile>
366+
<profile>
367+
<id>spanner-directpath-it</id>
368+
<build>
369+
<plugins>
370+
<plugin>
371+
<groupId>org.apache.maven.plugins</groupId>
372+
<artifactId>maven-failsafe-plugin</artifactId>
373+
<configuration>
374+
<systemPropertyVariables>
375+
<spanner.testenv.config.class>com.google.cloud.spanner.GceTestEnvConfig</spanner.testenv.config.class>
376+
<spanner.testenv.instance>projects/directpath-prod-manual-testing/instances/spanner-testing</spanner.testenv.instance>
377+
<spanner.gce.config.project_id>directpath-prod-manual-testing</spanner.gce.config.project_id>
378+
<spanner.attempt_directpath>true</spanner.attempt_directpath>
379+
<spanner.directpath_test_scenario>ipv4</spanner.directpath_test_scenario>
380+
</systemPropertyVariables>
381+
<forkedProcessTimeoutInSeconds>3000</forkedProcessTimeoutInSeconds>
382+
</configuration>
383+
</plugin>
384+
</plugins>
385+
</build>
386+
</profile>
363387
</profiles>
364388
</project>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ private void awaitTermination() throws InterruptedException {
230230
private static final int DEFAULT_PERIOD_SECONDS = 10;
231231
private static final int GRPC_KEEPALIVE_SECONDS = 2 * 60;
232232

233+
// TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta.
234+
private static final String DIRECT_PATH_ENDPOINT = "aa423245250f2bbf.sandbox.googleapis.com:443";
235+
233236
private final ManagedInstantiatingExecutorProvider executorProvider;
234237
private boolean rpcIsClosed;
235238
private final SpannerStub spannerStub;
@@ -307,31 +310,37 @@ public GapicSpannerRpc(final SpannerOptions options) {
307310
.build());
308311
// First check if SpannerOptions provides a TransportChannerProvider. Create one
309312
// with information gathered from SpannerOptions if none is provided
313+
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
314+
InstantiatingGrpcChannelProvider.newBuilder()
315+
.setChannelConfigurator(options.getChannelConfigurator())
316+
.setEndpoint(options.getEndpoint())
317+
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
318+
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
319+
.setPoolSize(options.getNumChannels())
320+
.setExecutor(executorProvider.getExecutor())
321+
322+
// Set a keepalive time of 120 seconds to help long running
323+
// commit GRPC calls succeed
324+
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
325+
326+
// Then check if SpannerOptions provides an InterceptorProvider. Create a default
327+
// SpannerInterceptorProvider if none is provided
328+
.setInterceptorProvider(
329+
SpannerInterceptorProvider.create(
330+
MoreObjects.firstNonNull(
331+
options.getInterceptorProvider(),
332+
SpannerInterceptorProvider.createDefault()))
333+
.withEncoding(compressorName))
334+
.setHeaderProvider(mergedHeaderProvider);
335+
336+
// TODO(weiranf): Set to true by default once DirectPath goes to public beta.
337+
if (shouldAttemptDirectPath()) {
338+
defaultChannelProviderBuilder.setEndpoint(DIRECT_PATH_ENDPOINT).setAttemptDirectPath(true);
339+
}
340+
310341
TransportChannelProvider channelProvider =
311342
MoreObjects.firstNonNull(
312-
options.getChannelProvider(),
313-
InstantiatingGrpcChannelProvider.newBuilder()
314-
.setChannelConfigurator(options.getChannelConfigurator())
315-
.setEndpoint(options.getEndpoint())
316-
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
317-
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
318-
.setPoolSize(options.getNumChannels())
319-
.setExecutor(executorProvider.getExecutor())
320-
321-
// Set a keepalive time of 120 seconds to help long running
322-
// commit GRPC calls succeed
323-
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
324-
325-
// Then check if SpannerOptions provides an InterceptorProvider. Create a default
326-
// SpannerInterceptorProvider if none is provided
327-
.setInterceptorProvider(
328-
SpannerInterceptorProvider.create(
329-
MoreObjects.firstNonNull(
330-
options.getInterceptorProvider(),
331-
SpannerInterceptorProvider.createDefault()))
332-
.withEncoding(compressorName))
333-
.setHeaderProvider(mergedHeaderProvider)
334-
.build());
343+
options.getChannelProvider(), defaultChannelProviderBuilder.build());
335344

336345
CredentialsProvider credentialsProvider =
337346
GrpcTransportOptions.setUpCredentialsProvider(options);
@@ -422,6 +431,11 @@ public GapicSpannerRpc(final SpannerOptions options) {
422431
}
423432
}
424433

434+
// TODO(weiranf): Remove this once DirectPath goes to public beta.
435+
private static boolean shouldAttemptDirectPath() {
436+
return Boolean.getBoolean("spanner.attempt_directpath");
437+
}
438+
425439
private static void checkEmulatorConnection(
426440
SpannerOptions options,
427441
TransportChannelProvider channelProvider,

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java

Lines changed: 81 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@
2626
import io.grpc.ClientInterceptor;
2727
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
2828
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
29+
import io.grpc.Grpc;
2930
import io.grpc.Metadata;
3031
import io.grpc.MethodDescriptor;
3132
import io.grpc.Status;
3233
import java.io.FileInputStream;
3334
import java.io.IOException;
35+
import java.net.InetAddress;
36+
import java.net.InetSocketAddress;
37+
import java.net.SocketAddress;
3438
import java.util.Random;
3539
import java.util.concurrent.atomic.AtomicBoolean;
3640

@@ -41,6 +45,12 @@ public class GceTestEnvConfig implements TestEnvConfig {
4145
public static final String GCE_CREDENTIALS_FILE = "spanner.gce.config.credentials_file";
4246
public static final String GCE_STREAM_BROKEN_PROBABILITY =
4347
"spanner.gce.config.stream_broken_probability";
48+
public static final String ATTEMPT_DIRECT_PATH = "spanner.attempt_directpath";
49+
public static final String DIRECT_PATH_TEST_SCENARIO = "spanner.directpath_test_scenario";
50+
51+
// IP address prefixes allocated for DirectPath backends.
52+
public static final String DP_IPV6_PREFIX = "2001:4860:8040";
53+
public static final String DP_IPV4_PREFIX = "34.126";
4454

4555
private final SpannerOptions options;
4656

@@ -51,6 +61,8 @@ public GceTestEnvConfig() {
5161
double errorProbability =
5262
Double.parseDouble(System.getProperty(GCE_STREAM_BROKEN_PROBABILITY, "0.0"));
5363
checkState(errorProbability <= 1.0);
64+
boolean attemptDirectPath = Boolean.getBoolean(ATTEMPT_DIRECT_PATH);
65+
String directPathTestScenario = System.getProperty(DIRECT_PATH_TEST_SCENARIO, "");
5466
SpannerOptions.Builder builder =
5567
SpannerOptions.newBuilder().setAutoThrottleAdministrativeRequests();
5668
if (!projectId.isEmpty()) {
@@ -66,12 +78,14 @@ public GceTestEnvConfig() {
6678
throw new RuntimeException(e);
6779
}
6880
}
69-
options =
70-
builder
71-
.setInterceptorProvider(
72-
SpannerInterceptorProvider.createDefault()
73-
.with(new GrpcErrorInjector(errorProbability)))
74-
.build();
81+
SpannerInterceptorProvider interceptorProvider =
82+
SpannerInterceptorProvider.createDefault().with(new GrpcErrorInjector(errorProbability));
83+
if (attemptDirectPath) {
84+
interceptorProvider =
85+
interceptorProvider.with(new DirectPathAddressCheckInterceptor(directPathTestScenario));
86+
}
87+
builder.setInterceptorProvider(interceptorProvider);
88+
options = builder.build();
7589
}
7690

7791
@Override
@@ -87,6 +101,7 @@ public void tearDown() {}
87101

88102
/** Injects errors in streaming calls to simulate call restarts */
89103
private static class GrpcErrorInjector implements ClientInterceptor {
104+
90105
private final double errorProbability;
91106
private final Random random = new Random();
92107

@@ -140,4 +155,64 @@ private boolean mayInjectError() {
140155
return random.nextDouble() < errorProbability;
141156
}
142157
}
158+
159+
/**
160+
* Captures the request attributes "Grpc.TRANSPORT_ATTR_REMOTE_ADDR" when connection is
161+
* established and verifies if the remote address is a DirectPath address. This is only used for
162+
* DirectPath testing. {@link ClientCall#getAttributes()}
163+
*/
164+
private static class DirectPathAddressCheckInterceptor implements ClientInterceptor {
165+
private final String directPathTestScenario;
166+
167+
DirectPathAddressCheckInterceptor(String directPathTestScenario) {
168+
this.directPathTestScenario = directPathTestScenario;
169+
}
170+
171+
@Override
172+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
173+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
174+
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
175+
return new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
176+
@Override
177+
public void start(Listener<RespT> responseListener, Metadata headers) {
178+
super.start(
179+
new SimpleForwardingClientCallListener<RespT>(responseListener) {
180+
@Override
181+
public void onHeaders(Metadata headers) {
182+
// Check peer IP after connection is established.
183+
SocketAddress remoteAddr =
184+
clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
185+
if (!verifyRemoteAddress(remoteAddr)) {
186+
throw new RuntimeException(
187+
String.format(
188+
"Synthetically aborting the current request because it did not adhere"
189+
+ " to the test environment's requirement for DirectPath."
190+
+ " Expected test for DirectPath %s scenario,"
191+
+ " but RPC was destined for %s",
192+
directPathTestScenario, remoteAddr.toString()));
193+
}
194+
super.onHeaders(headers);
195+
}
196+
},
197+
headers);
198+
}
199+
};
200+
}
201+
202+
private boolean verifyRemoteAddress(SocketAddress remoteAddr) {
203+
if (remoteAddr instanceof InetSocketAddress) {
204+
InetAddress inetAddress = ((InetSocketAddress) remoteAddr).getAddress();
205+
String addr = inetAddress.getHostAddress();
206+
if (directPathTestScenario.equals("ipv4")) {
207+
// For ipv4-only VM, client should connect to ipv4 DirectPath addresses.
208+
return addr.startsWith(DP_IPV4_PREFIX);
209+
} else if (directPathTestScenario.equals("ipv6")) {
210+
// For ipv6-enabled VM, client could connect to either ipv4 or ipv6 DirectPath addresses.
211+
return addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
212+
}
213+
}
214+
// For all other scenarios(e.g. fallback), we should allow both DirectPath and CFE addresses.
215+
return true;
216+
}
217+
}
143218
}

0 commit comments

Comments
 (0)