17
17
package com .google .cloud .spanner .spi .v1 ;
18
18
19
19
import static org .junit .Assert .assertEquals ;
20
+ import static org .junit .Assert .assertNotNull ;
20
21
21
22
import com .google .auth .oauth2 .AccessToken ;
22
23
import com .google .auth .oauth2 .OAuth2Credentials ;
27
28
import com .google .cloud .spanner .Spanner ;
28
29
import com .google .cloud .spanner .SpannerOptions ;
29
30
import com .google .cloud .spanner .Statement ;
31
+ import com .google .common .base .MoreObjects ;
32
+ import com .google .common .base .Preconditions ;
30
33
import com .google .protobuf .ListValue ;
31
34
import com .google .spanner .v1 .ResultSetMetadata ;
32
35
import com .google .spanner .v1 .StructType ;
47
50
import io .opencensus .tags .TagValue ;
48
51
import java .io .IOException ;
49
52
import java .net .InetSocketAddress ;
50
- import java .util .HashMap ;
51
53
import java .util .List ;
52
54
import java .util .Map ;
53
55
import java .util .Random ;
@@ -80,26 +82,22 @@ public class GfeLatencyTest {
80
82
81
83
private static MockSpannerServiceImpl mockSpanner ;
82
84
private static Server server ;
83
- private static InetSocketAddress address ;
84
85
private static Spanner spanner ;
85
86
private static DatabaseClient databaseClient ;
86
87
87
- private static final Map <SpannerRpc .Option , Object > optionsMap = new HashMap <>();
88
-
89
88
private static MockSpannerServiceImpl mockSpannerNoHeader ;
90
89
private static Server serverNoHeader ;
91
- private static InetSocketAddress addressNoHeader ;
92
90
private static Spanner spannerNoHeader ;
93
91
private static DatabaseClient databaseClientNoHeader ;
94
92
95
- private static String instanceId = "fake-instance" ;
96
- private static String databaseId = "fake-database" ;
97
- private static String projectId = "fake-project" ;
93
+ private static final String INSTANCE_ID = "fake-instance" ;
94
+ private static final String DATABASE_ID = "fake-database" ;
95
+ private static final String PROJECT_ID = "fake-project" ;
98
96
99
- private static final long WAIT_FOR_METRICS_TIME_MS = 1_000 ;
100
- private static final int MAXIMUM_RETRIES = 5 ;
97
+ private static final int MAXIMUM_RETRIES = 50000 ;
101
98
102
- private static AtomicInteger fakeServerTiming = new AtomicInteger (new Random ().nextInt (1000 ) + 1 );
99
+ private static final AtomicInteger FAKE_SERVER_TIMING =
100
+ new AtomicInteger (new Random ().nextInt (1000 ) + 1 );
103
101
104
102
private static final Statement SELECT1AND2 =
105
103
Statement .of ("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1" );
@@ -135,6 +133,7 @@ public class GfeLatencyTest {
135
133
136
134
@ BeforeClass
137
135
public static void startServer () throws IOException {
136
+ //noinspection deprecation
138
137
SpannerRpcViews .registerGfeLatencyAndHeaderMissingCountViews ();
139
138
140
139
mockSpanner = new MockSpannerServiceImpl ();
@@ -143,7 +142,7 @@ public static void startServer() throws IOException {
143
142
MockSpannerServiceImpl .StatementResult .query (SELECT1AND2 , SELECT1_RESULTSET ));
144
143
mockSpanner .putStatementResult (
145
144
MockSpannerServiceImpl .StatementResult .update (UPDATE_FOO_STATEMENT , 1L ));
146
- address = new InetSocketAddress ("localhost" , 0 );
145
+ InetSocketAddress address = new InetSocketAddress ("localhost" , 0 );
147
146
server =
148
147
NettyServerBuilder .forAddress (address )
149
148
.addService (mockSpanner )
@@ -161,7 +160,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
161
160
public void sendHeaders (Metadata headers ) {
162
161
headers .put (
163
162
Metadata .Key .of ("server-timing" , Metadata .ASCII_STRING_MARSHALLER ),
164
- String .format ("gfet4t7; dur=%d" , fakeServerTiming .get ()));
163
+ String .format ("gfet4t7; dur=%d" , FAKE_SERVER_TIMING .get ()));
165
164
super .sendHeaders (headers );
166
165
}
167
166
},
@@ -170,25 +169,24 @@ public void sendHeaders(Metadata headers) {
170
169
})
171
170
.build ()
172
171
.start ();
173
- optionsMap .put (SpannerRpc .Option .CHANNEL_HINT , 1L );
174
172
spanner = createSpannerOptions (address , server ).getService ();
175
- databaseClient = spanner .getDatabaseClient (DatabaseId .of (projectId , instanceId , databaseId ));
173
+ databaseClient = spanner .getDatabaseClient (DatabaseId .of (PROJECT_ID , INSTANCE_ID , DATABASE_ID ));
176
174
177
175
mockSpannerNoHeader = new MockSpannerServiceImpl ();
178
176
mockSpannerNoHeader .setAbortProbability (0.0D );
179
177
mockSpannerNoHeader .putStatementResult (
180
178
MockSpannerServiceImpl .StatementResult .query (SELECT1AND2 , SELECT1_RESULTSET ));
181
179
mockSpannerNoHeader .putStatementResult (
182
180
MockSpannerServiceImpl .StatementResult .update (UPDATE_FOO_STATEMENT , 1L ));
183
- addressNoHeader = new InetSocketAddress ("localhost" , 0 );
181
+ InetSocketAddress addressNoHeader = new InetSocketAddress ("localhost" , 0 );
184
182
serverNoHeader =
185
183
NettyServerBuilder .forAddress (addressNoHeader )
186
184
.addService (mockSpannerNoHeader )
187
185
.build ()
188
186
.start ();
189
187
spannerNoHeader = createSpannerOptions (addressNoHeader , serverNoHeader ).getService ();
190
188
databaseClientNoHeader =
191
- spannerNoHeader .getDatabaseClient (DatabaseId .of (projectId , instanceId , databaseId ));
189
+ spannerNoHeader .getDatabaseClient (DatabaseId .of (PROJECT_ID , INSTANCE_ID , DATABASE_ID ));
192
190
}
193
191
194
192
@ AfterClass
@@ -221,12 +219,9 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException {
221
219
long latency =
222
220
getMetric (
223
221
SpannerRpcViews .SPANNER_GFE_LATENCY_VIEW ,
224
- projectId ,
225
- instanceId ,
226
- databaseId ,
227
222
"google.spanner.v1.Spanner/ExecuteStreamingSql" ,
228
223
false );
229
- assertEquals (fakeServerTiming .get (), latency );
224
+ assertEquals (FAKE_SERVER_TIMING .get (), latency );
230
225
}
231
226
232
227
@ Test
@@ -238,12 +233,9 @@ public void testGfeLatencyExecuteSql() throws InterruptedException {
238
233
long latency =
239
234
getMetric (
240
235
SpannerRpcViews .SPANNER_GFE_LATENCY_VIEW ,
241
- projectId ,
242
- instanceId ,
243
- databaseId ,
244
236
"google.spanner.v1.Spanner/ExecuteSql" ,
245
237
false );
246
- assertEquals (fakeServerTiming .get (), latency );
238
+ assertEquals (FAKE_SERVER_TIMING .get (), latency );
247
239
}
248
240
249
241
@ Test
@@ -254,9 +246,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
254
246
long count =
255
247
getMetric (
256
248
SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
257
- projectId ,
258
- instanceId ,
259
- databaseId ,
260
249
"google.spanner.v1.Spanner/ExecuteStreamingSql" ,
261
250
false );
262
251
assertEquals (0 , count );
@@ -267,9 +256,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
267
256
long count1 =
268
257
getMetric (
269
258
SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
270
- projectId ,
271
- instanceId ,
272
- databaseId ,
273
259
"google.spanner.v1.Spanner/ExecuteStreamingSql" ,
274
260
true );
275
261
assertEquals (1 , count1 );
@@ -283,9 +269,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
283
269
long count =
284
270
getMetric (
285
271
SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
286
- projectId ,
287
- instanceId ,
288
- databaseId ,
289
272
"google.spanner.v1.Spanner/ExecuteSql" ,
290
273
false );
291
274
assertEquals (0 , count );
@@ -296,9 +279,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
296
279
long count1 =
297
280
getMetric (
298
281
SpannerRpcViews .SPANNER_GFE_HEADER_MISSING_COUNT_VIEW ,
299
- projectId ,
300
- instanceId ,
301
- databaseId ,
302
282
"google.spanner.v1.Spanner/ExecuteSql" ,
303
283
true );
304
284
assertEquals (1 , count1 );
@@ -321,78 +301,75 @@ private static SpannerOptions createSpannerOptions(InetSocketAddress address, Se
321
301
}
322
302
323
303
private long getAggregationValueAsLong (AggregationData aggregationData ) {
324
- return aggregationData .match (
325
- new io .opencensus .common .Function <AggregationData .SumDataDouble , Long >() {
326
- @ Override
327
- public Long apply (AggregationData .SumDataDouble arg ) {
328
- return (long ) arg .getSum ();
329
- }
330
- },
331
- new io .opencensus .common .Function <AggregationData .SumDataLong , Long >() {
332
- @ Override
333
- public Long apply (AggregationData .SumDataLong arg ) {
334
- return arg .getSum ();
335
- }
336
- },
337
- new io .opencensus .common .Function <AggregationData .CountData , Long >() {
338
- @ Override
339
- public Long apply (AggregationData .CountData arg ) {
340
- return arg .getCount ();
341
- }
342
- },
343
- new io .opencensus .common .Function <AggregationData .DistributionData , Long >() {
344
- @ Override
345
- public Long apply (AggregationData .DistributionData arg ) {
346
- return (long ) arg .getMean ();
347
- }
348
- },
349
- new io .opencensus .common .Function <AggregationData .LastValueDataDouble , Long >() {
350
- @ Override
351
- public Long apply (AggregationData .LastValueDataDouble arg ) {
352
- return (long ) arg .getLastValue ();
353
- }
354
- },
355
- new io .opencensus .common .Function <AggregationData .LastValueDataLong , Long >() {
356
- @ Override
357
- public Long apply (AggregationData .LastValueDataLong arg ) {
358
- return arg .getLastValue ();
359
- }
360
- },
361
- new io .opencensus .common .Function <AggregationData , Long >() {
362
- @ Override
363
- public Long apply (AggregationData arg ) {
364
- throw new UnsupportedOperationException ();
365
- }
366
- });
304
+ return MoreObjects .firstNonNull (
305
+ aggregationData .match (
306
+ new io .opencensus .common .Function <AggregationData .SumDataDouble , Long >() {
307
+ @ Override
308
+ public Long apply (AggregationData .SumDataDouble arg ) {
309
+ return (long ) Preconditions .checkNotNull (arg ).getSum ();
310
+ }
311
+ },
312
+ new io .opencensus .common .Function <AggregationData .SumDataLong , Long >() {
313
+ @ Override
314
+ public Long apply (AggregationData .SumDataLong arg ) {
315
+ return Preconditions .checkNotNull (arg ).getSum ();
316
+ }
317
+ },
318
+ new io .opencensus .common .Function <AggregationData .CountData , Long >() {
319
+ @ Override
320
+ public Long apply (AggregationData .CountData arg ) {
321
+ return Preconditions .checkNotNull (arg ).getCount ();
322
+ }
323
+ },
324
+ new io .opencensus .common .Function <AggregationData .DistributionData , Long >() {
325
+ @ Override
326
+ public Long apply (AggregationData .DistributionData arg ) {
327
+ return (long ) Preconditions .checkNotNull (arg ).getMean ();
328
+ }
329
+ },
330
+ new io .opencensus .common .Function <AggregationData .LastValueDataDouble , Long >() {
331
+ @ Override
332
+ public Long apply (AggregationData .LastValueDataDouble arg ) {
333
+ return (long ) Preconditions .checkNotNull (arg ).getLastValue ();
334
+ }
335
+ },
336
+ new io .opencensus .common .Function <AggregationData .LastValueDataLong , Long >() {
337
+ @ Override
338
+ public Long apply (AggregationData .LastValueDataLong arg ) {
339
+ return Preconditions .checkNotNull (arg ).getLastValue ();
340
+ }
341
+ },
342
+ new io .opencensus .common .Function <AggregationData , Long >() {
343
+ @ Override
344
+ public Long apply (AggregationData arg ) {
345
+ throw new UnsupportedOperationException ();
346
+ }
347
+ }),
348
+ -1L );
367
349
}
368
350
369
- private long getMetric (
370
- View view ,
371
- String projectId ,
372
- String instanceId ,
373
- String databaseId ,
374
- String method ,
375
- boolean withOverride )
376
- throws InterruptedException {
351
+ private long getMetric (View view , String method , boolean withOverride ) {
377
352
List <TagValue > tagValues = new java .util .ArrayList <>();
378
353
for (TagKey column : view .getColumns ()) {
379
354
if (column == SpannerRpcViews .INSTANCE_ID ) {
380
- tagValues .add (TagValue .create (instanceId ));
355
+ tagValues .add (TagValue .create (INSTANCE_ID ));
381
356
} else if (column == SpannerRpcViews .DATABASE_ID ) {
382
- tagValues .add (TagValue .create (databaseId ));
357
+ tagValues .add (TagValue .create (DATABASE_ID ));
383
358
} else if (column == SpannerRpcViews .METHOD ) {
384
359
tagValues .add (TagValue .create (method ));
385
360
} else if (column == SpannerRpcViews .PROJECT_ID ) {
386
- tagValues .add (TagValue .create (projectId ));
361
+ tagValues .add (TagValue .create (PROJECT_ID ));
387
362
}
388
363
}
389
364
for (int i = 0 ; i < MAXIMUM_RETRIES ; i ++) {
390
- Thread .sleep ( WAIT_FOR_METRICS_TIME_MS );
365
+ Thread .yield ( );
391
366
ViewData viewData = SpannerRpcViews .viewManager .getView (view .getName ());
367
+ assertNotNull (viewData );
392
368
if (viewData .getAggregationMap () != null ) {
393
369
Map <List <TagValue >, AggregationData > aggregationMap = viewData .getAggregationMap ();
394
370
AggregationData aggregationData = aggregationMap .get (tagValues );
395
- if (withOverride && getAggregationValueAsLong (aggregationData ) == 0 ) {
371
+ if (aggregationData == null
372
+ || withOverride && getAggregationValueAsLong (aggregationData ) == 0 ) {
396
373
continue ;
397
374
}
398
375
return getAggregationValueAsLong (aggregationData );
0 commit comments