The Wayback Machine - https://web.archive.org/web/20220125174245/https://github.com/apache/kafka/pull/10814
Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12888; Add transaction tool from KIP-664 #10814

Merged
merged 6 commits into from Jun 22, 2021
Merged

Conversation

@hachikuji
Copy link
Contributor

@hachikuji hachikuji commented Jun 4, 2021

This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the --find-hanging implementation, which will be left for a subsequent patch.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
Copy link
Contributor

@abbccdda abbccdda left a comment

Build fails with

[2021-06-04T01:24:08.975Z] > Task :shell:compileJava FAILED

[2021-06-04T01:24:08.975Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10814/shell/src/main/java/org/apache/kafka/shell/Commands.java:26: error: cannot find symbol

[2021-06-04T01:24:08.975Z] import net.sourceforge.argparse4j.internal.HelpScreenException;

[2021-06-04T01:24:08.975Z]                                           ^

[2021-06-04T01:24:08.975Z]   symbol:   class HelpScreenException

[2021-06-04T01:24:08.975Z]   location: package net.sourceforge.argparse4j.internal

[2021-06-04T01:24:08.975Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10814/shell/src/main/java/org/apache/kafka/shell/Commands.java:98: warning: [deprecation] newArgumentParser(java.lang.String,boolean) in net.sourceforge.argparse4j.ArgumentParsers has been deprecated

[2021-06-04T01:24:08.975Z]         this.parser = ArgumentParsers.newArgumentParser("", false);

[2021-06-04T01:24:08.975Z]                                      ^

[2021-06-04T01:24:08.975Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10814/shell/src/main/java/org/apache/kafka/shell/Commands.java:137: error: cannot find symbol

[2021-06-04T01:24:08.975Z]         } catch (HelpScreenException e) {

[2021-06-04T01:24:08.975Z]                  ^

[2021-06-04T01:24:08.975Z]   symbol:   class HelpScreenException

[2021-06-04T01:24:08.975Z]   location: class org.apache.kafka.shell.Commands
@hachikuji
Copy link
Contributor Author

@hachikuji hachikuji commented Jun 4, 2021

@abbccdda Thanks. Fallout from the upgrade to argparse4j. I will push another commit today to fix it.

Copy link
Contributor

@abbccdda abbccdda left a comment

Thanks for the PR, overall LGTM, left a couple of comments

gradle/dependencies.gradle Outdated Show resolved Hide resolved
}
}

static class DescribeProducersCommand extends TransactionsCommand {
Copy link
Contributor

@abbccdda abbccdda Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we define subclasses in their corresponding files instead of squeezing all of them into one file? Even better, we could get a sub-dir called transaction to contain all of them

Copy link
Contributor Author

@hachikuji hachikuji Jun 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I understand where you're coming from, but I prefer to leave them here since they have no usage outside of TransactionsCommand. It's not really a similar situation to KafkaApis for example because there won't be an ever-growing set of sub-commands. The current size of the file around 600 lines does not seem crazy.

.type(Integer.class)
.required(false);

subparser.addArgument("--topic")
Copy link
Contributor

@abbccdda abbccdda Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: --topic and --partition could be extracted as helper static functions

Copy link
Contributor Author

@hachikuji hachikuji Jun 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered doing this, but it felt like the benefit was small. Different tools have different requirements even if they use the same name. For example, --partition is not always a required argument in many commands.

TopicPartition topicPartition,
long startOffset
) throws Exception {
final DescribeProducersResult.PartitionProducerState result;
Copy link
Contributor

@abbccdda abbccdda Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if we don't do the verification? Is this extra round trip necessary?

Copy link
Contributor Author

@hachikuji hachikuji Jun 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose here is to collect additional information about the transaction that needs to be aborted. We need this in order to build the WriteTxnMarker request.

// as seen in the `DescribeProducers` output. In this case, we conservatively
// use a coordinator epoch of 0, which is less than or equal to any possible
// leader epoch.
if (coordinatorEpoch < 0) {
Copy link
Contributor

@abbccdda abbccdda Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does -1 apply to only new hanging producers? It doesn't sound like we will not meet any other epoch loss scenario in the reply which causing setting epoch to 0 always mismatch to the state on broker. Should we consider using -1 to skip the epoch check on broker, and proceed as a force delete?

Copy link
Contributor Author

@hachikuji hachikuji Jun 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the advantage of doing so? One nice thing about this implementation is that it is compatible with older brokers which require a non-negative coordinator epoch.

List<List<String>> table = readOutputAsTable();
assertEquals(4, table.size());

// Assert expected headers
Copy link
Contributor

@abbccdda abbccdda Jun 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could just build the expected result as a whole set and compare

Copy link
Contributor Author

@hachikuji hachikuji Jun 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote it this way so that we could validate that the headers came first.

@hachikuji
Copy link
Contributor Author

@hachikuji hachikuji commented Jun 15, 2021

@abbccdda Thanks for reviewing. I pushed an update and responded to a few comments. I think I remember why I wanted to upgrade argparse4j. Using 0.7.1, the --help argument displays the following:

This tool is used to analyze the transactional state  of  producers in the cluster. It can be used to
detect and recover from hanging transactions.

optional arguments:
  -h, --help             show this help message and exit
  -v, --version          show the version of this Kafka distribution and exit
  --command-config FILE  property file containing configs to be passed to admin client
  --bootstrap-server host:port
                         hostname and port for the  broker  to  connect  to,  in the form `host:port`
                         (multiple comma-separated entries can be given)

commands:
  COMMAND
    list                 list transactions
    describe             describe the state of an active transactional-id
    describe-producers   describe the states of active producers for a topic partition
    abort                abort a hanging transaction (requires administrative privileges)

Note that --bootstrap-servers is listed as an optional argument event though it is actually required. In 0.8.1, the help output calls these "named arguments" instead which is less likely to confuse. Anyway, we can still do this upgrade separately since it is a minor issue.

Copy link
Contributor

@dajac dajac left a comment

@hachikuji I made a first pass. Overall, it looks pretty good. I left a few comments/questions.

bin/kafka-transactions.sh Show resolved Hide resolved
if (startOffset == null && producerId == null) {
printErrorAndExit("The transaction to abort must be identified either with " +
"--start-offset (for newer brokers) or with " +
"--producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)");
return;
}
Copy link
Contributor

@dajac dajac Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could leverage the argument lib to handle such cases. It seems that the lib supports MutuallyExclusiveGroup. Have you already considered this?

Copy link
Contributor Author

@hachikuji hachikuji Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I recall trying this, but I do not think mutually exclusive groups allow the use of argument groups. If you know how to do it, please let me know.

.dest("command")
.title("commands")
.metavar("COMMAND");
commands.forEach(command -> command.addSubparser(subparsers));
Copy link
Contributor

@dajac dajac Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using commands is a really good idea. It makes the implementation so much clearer. One point to clarify is wether we want to prefix commands with -- in order to remain consistent with the other command line tools. What is your take on this? It seems that we defined aliases for commands so we might be able to have both.

Copy link
Contributor Author

@hachikuji hachikuji Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion. I guess I can't object to being more consistent with other tools.

Copy link
Contributor Author

@hachikuji hachikuji Jun 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having some trouble getting this to work. The parser seems to get the commands confused with arguments if we precede them by --. I'll see if I can find a workaround.

Copy link
Contributor Author

@hachikuji hachikuji Jun 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't come up with a great way to do this. The best idea I thought of is to preprocess the arguments to strip off the -- from the commands before passing them into argparse4j. That works, but then the help output is inconsistent. An alternative is to discard the use of subparsers so that the arguments are all in one flattened list. That would be kind of a shame since then we'd lose (or have to rewrite) the contextualized help. I guess I'm inclined to leave it as is, but let me know what you think.

Copy link
Contributor

@dajac dajac Jun 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a look at this as well and I could not find a clean way to do this. Overall, I believe that using commands is a great move for the future so I am also fine with leaving it as is. I think that we should aim for reworking the other tools to use a similar pattern in the future. We could perhaps file a JIRA to not forget about it.

Copy link
Contributor

@abbccdda abbccdda left a comment

LGTM, thanks for the patch!

dajac
dajac approved these changes Jun 22, 2021
Copy link
Contributor

@dajac dajac left a comment

LGTM

@hachikuji hachikuji merged commit fce7715 into apache:trunk Jun 22, 2021
4 of 12 checks passed
xdgrulez pushed a commit to xdgrulez/kafka that referenced this issue Dec 22, 2021
This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the `--find-hanging` implementation, which will be left for a subsequent patch.

Reviewers: Boyang Chen <boyang@apache.org>, David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
3 participants