1818
1919import com .google .api .core .ApiFuture ;
2020import com .google .api .core .ApiFutures ;
21- import com .google .api .core .ListenableFutureToApiFuture ;
2221import com .google .api .core .SettableApiFuture ;
2322import com .google .api .gax .core .ExecutorProvider ;
2423import com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
2928import com .google .common .collect .ImmutableList ;
3029import com .google .common .util .concurrent .ListeningScheduledExecutorService ;
3130import com .google .common .util .concurrent .MoreExecutors ;
31+ import com .google .spanner .v1 .PartialResultSet ;
3232import com .google .spanner .v1 .ResultSetMetadata ;
3333import com .google .spanner .v1 .ResultSetStats ;
3434import java .util .Collection ;
3535import java .util .LinkedList ;
3636import java .util .List ;
3737import java .util .concurrent .BlockingDeque ;
38- import java .util .concurrent .Callable ;
3938import java .util .concurrent .CountDownLatch ;
4039import java .util .concurrent .ExecutionException ;
4140import java .util .concurrent .Executor ;
4544import java .util .logging .Logger ;
4645
4746/** Default implementation for {@link AsyncResultSet}. */
48- class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
47+ class AsyncResultSetImpl extends ForwardingStructReader
48+ implements ListenableAsyncResultSet , AsyncResultSet .StreamMessageListener {
4949 private static final Logger log = Logger .getLogger (AsyncResultSetImpl .class .getName ());
5050
5151 /** State of an {@link AsyncResultSetImpl}. */
5252 private enum State {
5353 INITIALIZED ,
54+ STREAMING_INITIALIZED ,
5455 /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5556 SYNC ,
5657 CONSUMING ,
@@ -115,12 +116,15 @@ private enum State {
115116
116117 private State state = State .INITIALIZED ;
117118
119+ /** This variable indicates that produce rows thread is initiated */
120+ private volatile boolean produceRowsInitiated ;
121+
118122 /**
119123 * This variable indicates whether all the results from the underlying result set have been read.
120124 */
121125 private volatile boolean finished ;
122126
123- private volatile ApiFuture <Void > result ;
127+ private volatile SettableApiFuture <Void > result ;
124128
125129 /**
126130 * This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -329,12 +333,12 @@ public void run() {
329333 private final CallbackRunnable callbackRunnable = new CallbackRunnable ();
330334
331335 /**
332- * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in
336+ * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in
333337 * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
334338 */
335- private class ProduceRowsCallable implements Callable < Void > {
339+ private class ProduceRowsRunnable implements Runnable {
336340 @ Override
337- public Void call () throws Exception {
341+ public void run () {
338342 boolean stop = false ;
339343 boolean hasNext = false ;
340344 try {
@@ -393,12 +397,17 @@ public Void call() throws Exception {
393397 }
394398 // Call the callback if there are still rows in the buffer that need to be processed.
395399 while (!stop ) {
396- waitIfPaused ();
397- startCallbackIfNecessary ();
398- // Make sure we wait until the callback runner has actually finished.
399- consumingLatch .await ();
400- synchronized (monitor ) {
401- stop = cursorReturnedDoneOrException ;
400+ try {
401+ waitIfPaused ();
402+ startCallbackIfNecessary ();
403+ // Make sure we wait until the callback runner has actually finished.
404+ consumingLatch .await ();
405+ synchronized (monitor ) {
406+ stop = cursorReturnedDoneOrException ;
407+ }
408+ } catch (Throwable e ) {
409+ result .setException (e );
410+ return ;
402411 }
403412 }
404413 } finally {
@@ -410,14 +419,14 @@ public Void call() throws Exception {
410419 }
411420 synchronized (monitor ) {
412421 if (executionException != null ) {
413- throw executionException ;
414- }
415- if (state == State .CANCELLED ) {
416- throw CANCELLED_EXCEPTION ;
422+ result .setException (executionException );
423+ } else if (state == State .CANCELLED ) {
424+ result .setException (CANCELLED_EXCEPTION );
425+ } else {
426+ result .set (null );
417427 }
418428 }
419429 }
420- return null ;
421430 }
422431
423432 private void waitIfPaused () throws InterruptedException {
@@ -449,6 +458,26 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
449458 }
450459 }
451460
461+ private class InitiateStreamingRunnable implements Runnable {
462+
463+ @ Override
464+ public void run () {
465+ try {
466+ // This method returns true if the underlying result set is a streaming result set (e.g. a
467+ // GrpcResultSet).
468+ // Those result sets will trigger initiateProduceRows() when the first results are received.
469+ // Non-streaming result sets do not trigger this callback, and for those result sets, we
470+ // need to eagerly start the ProduceRowsRunnable.
471+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
472+ initiateProduceRows ();
473+ }
474+ } catch (Throwable exception ) {
475+ executionException = SpannerExceptionFactory .asSpannerException (exception );
476+ initiateProduceRows ();
477+ }
478+ }
479+ }
480+
452481 /** Sets the callback for this {@link AsyncResultSet}. */
453482 @ Override
454483 public ApiFuture <Void > setCallback (Executor exec , ReadyCallback cb ) {
@@ -458,16 +487,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
458487 this .state == State .INITIALIZED , "callback may not be set multiple times" );
459488
460489 // Start to fetch data and buffer these.
461- this .result =
462- new ListenableFutureToApiFuture <>(this .service .submit (new ProduceRowsCallable ()));
490+ this .result = SettableApiFuture .create ();
491+ this .state = State .STREAMING_INITIALIZED ;
492+ this .service .execute (new InitiateStreamingRunnable ());
463493 this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
464494 this .callback = Preconditions .checkNotNull (cb );
465- this .state = State .RUNNING ;
466495 pausedLatch .countDown ();
467496 return result ;
468497 }
469498 }
470499
500+ private void initiateProduceRows () {
501+ if (this .state == State .STREAMING_INITIALIZED ) {
502+ this .state = State .RUNNING ;
503+ }
504+ produceRowsInitiated = true ;
505+ this .service .execute (new ProduceRowsRunnable ());
506+ }
507+
471508 Future <Void > getResult () {
472509 return result ;
473510 }
@@ -578,6 +615,10 @@ public ResultSetMetadata getMetadata() {
578615 return delegateResultSet .get ().getMetadata ();
579616 }
580617
618+ boolean initiateStreaming (StreamMessageListener streamMessageListener ) {
619+ return StreamingUtil .initiateStreaming (delegateResultSet .get (), streamMessageListener );
620+ }
621+
581622 @ Override
582623 protected void checkValidState () {
583624 synchronized (monitor ) {
@@ -593,4 +634,22 @@ public Struct getCurrentRowAsStruct() {
593634 checkValidState ();
594635 return currentRow ;
595636 }
637+
638+ @ Override
639+ public void onStreamMessage (PartialResultSet partialResultSet , boolean bufferIsFull ) {
640+ synchronized (monitor ) {
641+ if (produceRowsInitiated ) {
642+ return ;
643+ }
644+ // if PartialResultSet contains a resume token or buffer size is full, or
645+ // we have reached the end of the stream, we can start the thread.
646+ boolean startJobThread =
647+ !partialResultSet .getResumeToken ().isEmpty ()
648+ || bufferIsFull
649+ || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
650+ if (startJobThread || state != State .STREAMING_INITIALIZED ) {
651+ initiateProduceRows ();
652+ }
653+ }
654+ }
596655}
0 commit comments