Flink Kafka Sink org.apache.kafka.common.errors.UnsupportedVersionException ERROR

Versions: Flink 1.11.3, Kafka 2.1.1

My Flink data pipeline is Kafka (source) -> Flink -> Kafka (sink).

When I first submit the job, it works well, but after the JobManager or TaskManagers fail and restart, the tasks fail with the following exceptions:

    2020-12-31 10:35:23.831 [objectOperator -> Sink: objectSink (1/1)] WARN o.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. while recovering transaction KafkaTransactionState [transactionalId=objectOperator -> Sink: objectSink-bcabd9b643c47ab46ace22db2e1285b6-3, producerId=14698, epoch=7]. Presumably this transaction has been already committed before
    2020-12-31 10:35:23.919 [userOperator -> Sink: userSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - userOperator -> Sink: userSink (1/1) (2a5a171aa335f444740b4acfc7688d7c) switched from RUNNING to FAILED. org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
    2020-12-31 10:35:24.131 [objectOperator -> Sink: objectSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - objectOperator -> Sink: objectSink (1/1) (07fe747a81b31e016e88ea6331b31433) switched from RUNNING to FAILED. org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1

I don't know why these errors occur.

My Kafka producer code:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", servers);
    props.setProperty("transaction.timeout.ms", "30000");

    FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<>(
            topic,
            (record, timestamp) -> new ProducerRecord<>(
                    topic,
                    Longs.toByteArray(record.getUserInKey()),
                    JsonUtils.toJsonBytes(record)),
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue, but I can't find anyone who has run into the same error.
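For what it's worth, one setting that interacts with EXACTLY_ONCE recovery is the transaction timeout: the 30000 ms I set above is far below the 1 hour that Flink's Kafka connector uses by default, and the broker additionally caps it via its own transaction.max.timeout.ms (15 minutes by default). A minimal sketch of the producer properties with a longer timeout (the helper class name and the 1-hour value are illustrative assumptions, not a confirmed fix):

```java
import java.util.Properties;

// Hypothetical helper: builds the Kafka producer Properties with a longer
// transaction timeout than the 30 s used in the question.
public class ProducerProps {
    public static Properties build(String servers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        // 1 hour, matching the Flink Kafka connector's default for EXACTLY_ONCE.
        // Note: the broker rejects anything above its own
        // transaction.max.timeout.ms (default 15 min), so that broker setting
        // would need to be raised as well for this value to take effect.
        props.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000));
        return props;
    }

    public static void main(String[] args) {
        // Prints the configured transaction timeout in milliseconds.
        System.out.println(build("localhost:9092").getProperty("transaction.timeout.ms"));
    }
}
```

These Properties would then be passed to the FlinkKafkaProducer constructor in place of the ones above; whether this addresses the restart failures is exactly what I'm unsure about.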

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception

(org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception

2020-12-31 10:35:23.831 [objectOperator -> Sink: objectSink (1/1)] WARN o.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Encountered error org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state. while recovering transaction KafkaTransactionState [transactionalId=objectOperator -> Sink: objectSink-bcabd9b643c47ab46ace22db2e1285b6-3, producerId=14698, epoch=7]. Presumably this transaction has been already committed before 2020-12-31 10:35:23.919 [userOperator -> Sink: userSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - userOperator -> Sink: userSink (1/1) (2a5a171aa335f444740b4acfc7688d7c) switched from RUNNING to FAILED. org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id. 2020-12-31 10:35:24.131 [objectOperator -> Sink: objectSink (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - objectOperator -> Sink: objectSink (1/1) (07fe747a81b31e016e88ea6331b31433) switched from RUNNING to FAILED. org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me

Corrected formatting
Source Link
Nishu Tayal
  • 20.9k
  • 8
  • 53
  • 105

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception (org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

(org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception (org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception

(org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me

Source Link

Flink Kafka Sink org.apache.kafka.common.errors.UnsupportedVersionException ERROR

version flink(1.11.3), kafka(2.1.1)

My flink datapipeline is kafka(source) -> flink -> kafka(sink).

When I submit job first, it works well. but after jobmanager or taskmanagers fail, if they restarted, they occur exception (org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 1)

I don't know why this error occurs.

my kafka producer code

 Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("transaction.timeout.ms", "30000");
        FlinkKafkaProducer<CountModel> producer = new FlinkKafkaProducer<CountModel>(
                topic,((record, timestamp) -> new ProducerRecord<>(
                        topic
                        , Longs.toByteArray(record.getUserInKey())
                        , JsonUtils.toJsonBytes(record))), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

I don't think it's a version issue. It seems that no one has experienced the same error as me