
Versions: Flink 1.11.3, Kafka 2.1.1

My Flink data pipeline is Kafka (source) -> Flink -> Kafka (sink).

When I first submit the job, it works well. But after the JobManager or TaskManagers fail and restart, the job throws the following exceptions:

2020-12-31 10:35:23.831 [objectOperator -> Sink: objectSink (1/1)] WARN o.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. while recovering transaction KafkaTransactionState [transactionalId=objectOperator -> Sink: objectSink-bcabd9b643c47ab46ace22db2e1285b6-3, producerId=14698, epoch=7]. Presumably this transaction has been already committed before

2020-12-31 10:35:23.919 [userOperator -> Sink: userSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - userOperator -> Sink: userSink (1/1) (2a5a171aa335f444740b4acfc7688d7c) switched from RUNNING to FAILED. org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.

2020-12-31 10:35:24.131 [objectOperator -> Sink: objectSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - objectOperator -> Sink: objectSink (1/1) (07fe747a81b31e016e88ea6331b31433) switched from RUNNING to FAILED. org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1

I don't know why this error occurs.

My Kafka producer code:

Properties props = new Properties();
props.setProperty("bootstrap.servers", servers);
props.setProperty("transaction.timeout.ms", "30000");

FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<>(
        topic,
        (record, timestamp) -> new ProducerRecord<>(
                topic,
                Longs.toByteArray(record.getUserInKey()),
                JsonUtils.toJsonBytes(record)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue, and I can't find anyone who has hit the same error.

1 Answer

Each producer is assigned a unique PID when it is initialized. The PID is transparent to the application and is not exposed to the user at all. For a given PID, the sequence number starts at 0 and increases monotonically, with an independent sequence per topic-partition. When the producer sends data, it attaches a sequence number to each message, and the broker uses it to check whether the data is a duplicate. The PID is globally unique, and a producer restarted after a failure is assigned a new PID; this is one reason idempotence cannot be achieved across sessions.
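The duplicate check described above can be sketched as a small model. This is illustrative Java only, not Kafka's actual broker code; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how a broker tracks the last accepted sequence number
// per (producerId, partition) and rejects duplicate batches.
public class SequenceCheck {
    // last accepted sequence, keyed by "producerId:partition" (hypothetical layout)
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is appended, false if it is a duplicate retry. */
    public boolean tryAppend(long producerId, int partition, int sequence) {
        String key = producerId + ":" + partition;
        Integer last = lastSeq.get(key);
        int lastAccepted = (last == null) ? -1 : last;
        if (sequence <= lastAccepted) {
            return false; // duplicate: this batch was already appended
        }
        if (sequence != lastAccepted + 1) {
            // a gap means a batch was lost in between
            throw new IllegalStateException(
                    "out-of-order sequence: expected " + (lastAccepted + 1)
                            + " but got " + sequence);
        }
        lastSeq.put(key, sequence);
        return true;
    }

    public static void main(String[] args) {
        SequenceCheck broker = new SequenceCheck();
        System.out.println(broker.tryAppend(14698L, 0, 0)); // true
        System.out.println(broker.tryAppend(14698L, 0, 1)); // true
        System.out.println(broker.tryAppend(14698L, 0, 1)); // false (retry of same batch)
    }
}
```

Because the state is keyed by PID, a producer that comes back with a new PID starts over at sequence 0, which is why this dedup does not survive across sessions.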

If you resume from a savepoint, the previous producerId is reused, whereas a fresh session would generate new producerIds (these ids last for the entire session, equivalent to the default value). The recovered producerId is therefore non-default, which is what the UnsupportedVersionException complains about.
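A related thing worth checking (my assumption based on the timeouts involved, not something the answer states) is that the question sets `transaction.timeout.ms` to 30000 (30 s). If recovery takes longer than that, the broker may have already aborted or forgotten the transaction, producing exactly the `InvalidTxnStateException` / `InvalidPidMappingException` seen in the logs. The relevant settings are real Kafka configs; the values below are illustrative:

```
# Producer side (set in the Flink job's Properties): keep the transaction
# timeout longer than the worst-case gap between failure and recovery.
transaction.timeout.ms=900000

# Broker side (server.properties): the producer value above must not exceed
# this cap, or the producer is rejected at initialization.
transaction.max.timeout.ms=900000

# Broker side: how long the broker remembers a transactional.id -> PID
# mapping; recovering after it expires yields InvalidPidMappingException.
# Default is 7 days.
transactional.id.expiration.ms=604800000
```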
