4949import java .util .Collections ;
5050import java .util .Iterator ;
5151import java .util .List ;
52+ import java .util .concurrent .atomic .AtomicBoolean ;
5253import java .util .function .Function ;
5354import java .util .stream .Collectors ;
5455
@@ -60,7 +61,7 @@ class GrpcStruct extends Struct implements Serializable {
6061 private final List <Object > rowData ;
6162 private final DecodeMode decodeMode ;
6263 private final BitSet colDecoded ;
63- private boolean rowDecoded ;
64+ private final AtomicBoolean rowDecoded ;
6465
6566 /**
6667 * Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as a
@@ -224,7 +225,7 @@ private GrpcStruct(
224225 this .type = type ;
225226 this .rowData = rowData ;
226227 this .decodeMode = decodeMode ;
227- this .rowDecoded = rowDecoded ;
228+ this .rowDecoded = new AtomicBoolean ( rowDecoded ) ;
228229 this .colDecoded = colDecoded ;
229230 }
230231
@@ -234,29 +235,31 @@ public String toString() {
234235 }
235236
236237 boolean consumeRow (Iterator <com .google .protobuf .Value > iterator ) {
237- rowData .clear ();
238- if (decodeMode == DecodeMode .LAZY_PER_ROW ) {
239- rowDecoded = false ;
240- } else if (decodeMode == DecodeMode .LAZY_PER_COL ) {
241- colDecoded .clear ();
242- }
243- if (!iterator .hasNext ()) {
244- return false ;
245- }
246- for (Type .StructField fieldType : getType ().getStructFields ()) {
238+ synchronized (rowData ) {
239+ rowData .clear ();
240+ if (decodeMode == DecodeMode .LAZY_PER_ROW ) {
241+ rowDecoded .set (false );
242+ } else if (decodeMode == DecodeMode .LAZY_PER_COL ) {
243+ colDecoded .clear ();
244+ }
247245 if (!iterator .hasNext ()) {
248- throw newSpannerException (
249- ErrorCode .INTERNAL ,
250- "Invalid value stream: end of stream reached before row is complete" );
246+ return false ;
251247 }
252- com .google .protobuf .Value value = iterator .next ();
253- if (decodeMode == DecodeMode .DIRECT ) {
254- rowData .add (decodeValue (fieldType .getType (), value ));
255- } else {
256- rowData .add (value );
248+ for (Type .StructField fieldType : getType ().getStructFields ()) {
249+ if (!iterator .hasNext ()) {
250+ throw newSpannerException (
251+ ErrorCode .INTERNAL ,
252+ "Invalid value stream: end of stream reached before row is complete" );
253+ }
254+ com .google .protobuf .Value value = iterator .next ();
255+ if (decodeMode == DecodeMode .DIRECT ) {
256+ rowData .add (decodeValue (fieldType .getType (), value ));
257+ } else {
258+ rowData .add (value );
259+ }
257260 }
261+ return true ;
258262 }
259- return true ;
260263 }
261264
262265 private static Object decodeValue (Type fieldType , com .google .protobuf .Value proto ) {
@@ -367,12 +370,16 @@ private static void checkType(
367370 }
368371
369372 Struct immutableCopy () {
370- return new GrpcStruct (
371- type ,
372- new ArrayList <>(rowData ),
373- this .decodeMode ,
374- this .rowDecoded ,
375- this .colDecoded == null ? null : (BitSet ) this .colDecoded .clone ());
373+ synchronized (rowData ) {
374+ return new GrpcStruct (
375+ type ,
376+ this .decodeMode == DecodeMode .DIRECT
377+ ? new ArrayList <>(rowData )
378+ : Collections .synchronizedList (new ArrayList <>(rowData )),
379+ this .decodeMode ,
380+ this .rowDecoded .get (),
381+ this .colDecoded == null ? null : (BitSet ) this .colDecoded .clone ());
382+ }
376383 }
377384
378385 @ Override
@@ -382,9 +389,14 @@ public Type getType() {
382389
383390 @ Override
384391 public boolean isNull (int columnIndex ) {
385- if ((decodeMode == DecodeMode .LAZY_PER_ROW && !rowDecoded )
386- || (decodeMode == DecodeMode .LAZY_PER_COL && !colDecoded .get (columnIndex ))) {
387- return ((com .google .protobuf .Value ) rowData .get (columnIndex )).hasNullValue ();
392+ if (decodeMode == DecodeMode .LAZY_PER_ROW || decodeMode == DecodeMode .LAZY_PER_COL ) {
393+ synchronized (rowData ) {
394+ if ((decodeMode == DecodeMode .LAZY_PER_ROW && !rowDecoded .get ())
395+ || (decodeMode == DecodeMode .LAZY_PER_COL && !colDecoded .get (columnIndex ))) {
396+ return ((com .google .protobuf .Value ) rowData .get (columnIndex )).hasNullValue ();
397+ }
398+ return rowData .get (columnIndex ) == null ;
399+ }
388400 }
389401 return rowData .get (columnIndex ) == null ;
390402 }
@@ -496,14 +508,18 @@ private boolean isUnrecognizedType(int columnIndex) {
496508 }
497509
498510 boolean canGetProtoValue (int columnIndex ) {
499- return isUnrecognizedType (columnIndex )
500- || (decodeMode == DecodeMode .LAZY_PER_ROW && !rowDecoded )
501- || (decodeMode == DecodeMode .LAZY_PER_COL && !colDecoded .get (columnIndex ));
511+ synchronized (rowData ) {
512+ return isUnrecognizedType (columnIndex )
513+ || (decodeMode == DecodeMode .LAZY_PER_ROW && !rowDecoded .get ())
514+ || (decodeMode == DecodeMode .LAZY_PER_COL && !colDecoded .get (columnIndex ));
515+ }
502516 }
503517
504518 protected com .google .protobuf .Value getProtoValueInternal (int columnIndex ) {
505- checkProtoValueSupported (columnIndex );
506- return (com .google .protobuf .Value ) rowData .get (columnIndex );
519+ synchronized (rowData ) {
520+ checkProtoValueSupported (columnIndex );
521+ return (com .google .protobuf .Value ) rowData .get (columnIndex );
522+ }
507523 }
508524
509525 private void checkProtoValueSupported (int columnIndex ) {
@@ -515,30 +531,56 @@ private void checkProtoValueSupported(int columnIndex) {
515531 decodeMode != DecodeMode .DIRECT ,
516532 "Getting proto value is not supported when DecodeMode#DIRECT is used." );
517533 Preconditions .checkState (
518- !(decodeMode == DecodeMode .LAZY_PER_ROW && rowDecoded ),
534+ !(decodeMode == DecodeMode .LAZY_PER_ROW && rowDecoded . get () ),
519535 "Getting proto value after the row has been decoded is not supported." );
520536 Preconditions .checkState (
521537 !(decodeMode == DecodeMode .LAZY_PER_COL && colDecoded .get (columnIndex )),
522538 "Getting proto value after the column has been decoded is not supported." );
523539 }
524540
525541 private void ensureDecoded (int columnIndex ) {
526- if (decodeMode == DecodeMode .LAZY_PER_ROW && !rowDecoded ) {
527- for (int i = 0 ; i < rowData .size (); i ++) {
528- rowData .set (
529- i ,
530- decodeValue (
531- type .getStructFields ().get (i ).getType (),
532- (com .google .protobuf .Value ) rowData .get (i )));
542+ if (decodeMode == DecodeMode .LAZY_PER_ROW ) {
543+ synchronized (rowData ) {
544+ if (!rowDecoded .get ()) {
545+ for (int i = 0 ; i < rowData .size (); i ++) {
546+ rowData .set (
547+ i ,
548+ decodeValue (
549+ type .getStructFields ().get (i ).getType (),
550+ (com .google .protobuf .Value ) rowData .get (i )));
551+ }
552+ }
553+ rowDecoded .set (true );
554+ }
555+ } else if (decodeMode == DecodeMode .LAZY_PER_COL ) {
556+ boolean decoded ;
557+ Object value ;
558+ synchronized (rowData ) {
559+ decoded = colDecoded .get (columnIndex );
560+ value = rowData .get (columnIndex );
561+ }
562+ if (!decoded ) {
563+ // Use the column as a lock during decoding to ensure that we decode once (mostly), but also
564+ // that multiple different columns can be decoded in parallel if requested.
565+ synchronized (type .getStructFields ().get (columnIndex )) {
566+ // Note: It can be that we decode the value twice if two threads request this at the same
567+ // time, but the synchronization on rowData above and below makes sure that we always get
568+ // and set a consistent value (and only set it once).
569+ if (!colDecoded .get (columnIndex )) {
570+ value =
571+ decodeValue (
572+ type .getStructFields ().get (columnIndex ).getType (),
573+ (com .google .protobuf .Value ) value );
574+ decoded = true ;
575+ }
576+ }
577+ if (decoded ) {
578+ synchronized (rowData ) {
579+ rowData .set (columnIndex , value );
580+ colDecoded .set (columnIndex );
581+ }
582+ }
533583 }
534- rowDecoded = true ;
535- } else if (decodeMode == DecodeMode .LAZY_PER_COL && !colDecoded .get (columnIndex )) {
536- rowData .set (
537- columnIndex ,
538- decodeValue (
539- type .getStructFields ().get (columnIndex ).getType (),
540- (com .google .protobuf .Value ) rowData .get (columnIndex )));
541- colDecoded .set (columnIndex );
542584 }
543585 }
544586
0 commit comments