Fix hudi connector gets stuck #19506#20027
Conversation
|
Hi @electrum |
Hi @willzgw, thank you very much for your contribution! This helps a lot. Just a suggestion for your question: In the hive connector there is this property called "hive.ignore-absent-partitions" with default "false" to switch the behavior. Maybe you could implement the same for the hudi connector? |
|
#20151 fixes the underlying problem of Trino failing on empty Hudi partitions. So when this PR gets merged, there should be no reason to handle it here. --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
@@ -14,6 +14,7 @@
package io.trino.plugin.hudi.split;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hudi.HudiTableHandle;
import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
@@ -28,8 +29,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
import static java.util.Objects.requireNonNull;
@@ -66,7 +67,7 @@ public class HudiBackgroundSplitLoader
{
Deque<String> partitionQueue = new ConcurrentLinkedDeque<>(partitions);
List<HudiPartitionInfoLoader> splitGeneratorList = new ArrayList<>();
- List<Future> splitGeneratorFutures = new ArrayList<>();
+ List<ListenableFuture<Void>> splitGeneratorFutures = new ArrayList<>();
// Start a number of partition split generators to generate the splits in parallel
for (int i = 0; i < splitGeneratorNumThreads; i++) {
@@ -79,16 +80,18 @@ public class HudiBackgroundSplitLoader
// Let the split generator stop once the partition queue is empty
generator.stopRunning();
}
-
- // Wait for all split generators to finish
- for (Future future : splitGeneratorFutures) {
- try {
- future.get();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
- }
+ try {
+ // Wait for all split generators to finish
+ Futures.whenAllComplete(splitGeneratorFutures) // also succeeds when some tasks fail
+ .run(asyncQueue::finish, directExecutor())
+ .get(); // will throw an ExecutionException when one of the tasks failed
+ }
+ catch (ExecutionException e) {
+ throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
}
- asyncQueue.finish();
}
} |
668ba94 to
63a8e38
Compare
|
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
|
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
63a8e38 to
aeb9d4a
Compare
@willzgw Sorry for my late reply. We could create the table on Spark and add the contents under |
Hi @ebyhr |
|
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
|
Should we merge this PR given that you approved @electrum ? |
|
Do I see it right, that one then has to explicitly set |
aeb9d4a to
edad883
Compare
chenjian2664
left a comment
There was a problem hiding this comment.
Please rearrange the commits and squash to two, one is fixing the stuck issue, another is adding the session property.
43b214a to
ec9a0e2
Compare
ec9a0e2 to
c9c67ed
Compare
|
@electrum can we merge this? |
c9c67ed to
c58ab7e
Compare
a4bf784 to
7f7d329
Compare
7f7d329 to
3543d37
Compare
3543d37 to
61fb7ab
Compare
chenjian2664
left a comment
There was a problem hiding this comment.
LGTM % commit message
Revisit the first commit changes and think the commit message could be improved.
The changes is actual Use Futures.whenAllComplete to handle splitGeneratorFutures (in HudiBackgroundSplitLoader)
Put the commit body something like:
This commit improved async handling using `Futures.whenAllComplete`
to wait/handle all the splitGeneratorFutures instead of using
for-loop to handle the future one by one, the change guarantee the
`asyncQueue.finish()` always be called to avoid the split leak
61fb7ab to
dd40c1b
Compare
83cd249 to
f634ff2
Compare
This commit solves the problem of HUDI connector stucks by using ExceptionCallback to ensure the asyncQueue is closed when an exception occurs, and optimizes the handling of multiple Futures with Futures.whenAllComplete instead of for_loop.
|
Thank you @electrum @yihua @codope @chenjian2664 and @willzgw |
Description
ignore_absent_partitionsAdditional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: