
I am running a small Flink application that periodically polls data from my DB and writes it to a Kafka topic. When I run the application on its own with the Kafka sink below commented out, it runs correctly:

KafkaSink<String> stringSink =
        KafkaSinkFactory.createStringKafkaSink(props, kafkaProps, props.getProperty("asset.sync.kafka.sink.topic.name"));

dbStream
        .map(Object::toString)   // or map your custom object to String
        .sinkTo(stringSink)
        .name("String Kafka Sink");

But when the source and the sink run together, the job fails with this exception:

java.sql.SQLRecoverableException: IO Error: Socket read interrupted, Authentication lapse 0 ms.
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:874)
    at oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
    at oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
    at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562)
    at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:138)
    at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
    at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
    at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
    at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
    at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
    at com.zaxxer.hikari.HikariDataSource.<init>(HikariDataSource.java:81)
    at ***.***.hcmp.config.DBConfig.getDataSource(DBConfig.java:22)
    at ***.***.hcmp.source.assetMaster.DBPollingSource.open(DBPollingSource.java:62)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:101)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Socket read interrupted, Authentication lapse 0 ms.
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:870)
    ... 25 common frames omitted
Caused by: java.io.InterruptedIOException: Socket read interrupted
    at oracle.net.nt.TimeoutSocketChannel.handleInterrupt(TimeoutSocketChannel.java:258)
    at oracle.net.nt.TimeoutSocketChannel.read(TimeoutSocketChannel.java:180)
    at oracle.net.ns.NSProtocolNIO.doSocketRead(NSProtocolNIO.java:555)
    at oracle.net.ns.NIOPacket.readNIOPacket(NIOPacket.java:403)
    at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:127)
    at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340)
    at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596)
    at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588)

Here is the application code:

public class RefreshPollingApplication {

    private static final Logger log = LoggerFactory.getLogger(RefreshPollingApplication.class);

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromPropertiesFile(
                RefreshPollingApplication.class.getClassLoader().getResourceAsStream("application-local.properties")
        );
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, 
                Time.of(10, TimeUnit.SECONDS)
        ));
        try{
            Properties props = ConfigLoader.load("application-local.properties");
            Properties kafkaProps = getKafkaProperties(props);
            log.info("@@@@@@@@@@@@@ Properties loaded");

            SimpleDBConfig dbConfig = new SimpleDBConfig(params.get("flink.jdbc.url"), params.get("flink.jdbc.username"), params.get("flink.jdbc.password"));
            Connection connection = dbConfig.getConnection();
            DataStream<String> dbStream = env.addSource(
                    new JdbcPollingSource()
            ).setParallelism(1).name("DB Polling Source");
            KafkaSink<String> stringSink =
                    KafkaSinkFactory.createStringKafkaSink(props, kafkaProps, props.getProperty("****.***.kafka.sink.topic.name"));

            dbStream
                    .map(Object::toString)  
                    .sinkTo(stringSink)
                    .name("String Kafka Sink");
            env.execute("Refresh polling job.");
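        } catch (Exception e) {
            // Pass the exception as the last argument so the full stack trace is logged, not just the message
            log.error("@@@@@@@@ERROR OCCURRED ON POLLING TASK, REASON: {}", e.getMessage(), e);
            throw e;
        }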
    }
    
}

Source function class:

public class JdbcPollingSource extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;
    private transient Connection connection;
    private transient PreparedStatement stmt;

    private String url;
    private String userName;
    private String password;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("oracle.jdbc.OracleDriver");
        super.open(parameters);
        Map<String, String> globalParams = getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters()
                .toMap();
        ParameterTool params = ParameterTool.fromMap(globalParams);
        this.url = params.get("flink.jdbc.url");
        userName = params.get("flink.jdbc.username");
        password = params.get("flink.jdbc.password");
        reconnect();
    }

    private void reconnect() throws Exception {

        if (connection != null && !connection.isClosed()) {
            try { connection.close(); } catch (Exception ignored) {}
        }
        try {
            connection = DriverManager.getConnection(
                    url, userName, password
            );
            stmt = connection.prepareStatement(
                    "*******"
            );
        }catch (Exception e){
            System.out.println("Error connecting to data source, exception -> " + e.getMessage());
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // try-with-resources closes the ResultSet after each poll
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    String record = rs.getLong("***") + "," + rs.getString("***");
                    ctx.collect(record);
                }
            } catch (SQLException e) {
                reconnect();
            }
            Thread.sleep(120_000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        try { if (stmt != null) stmt.close(); } catch (Exception ignore) {}
        try { if (connection != null) connection.close(); } catch (Exception ignore) {}
    }
}

Is it possible that a background thread is interfering with the DB connection here?
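
To make the question concrete: the innermost frames of the trace (`TimeoutSocketChannel.handleInterrupt`) suggest that the thread was interrupted while it was blocked in a socket read during logon. The sketch below is not Flink or Oracle code. It is a standalone, JDK-only illustration of how interrupting a thread that is blocked in an NIO socket read surfaces as an exception on that thread. The names `InterruptedReadDemo` and `interruptBlockedRead` are made up for this example.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class InterruptedReadDemo {

    /**
     * Blocks a reader thread in SocketChannel.read(), interrupts it from the
     * calling thread, and returns the simple class name of the exception the
     * reader saw ("none" if the read did not fail).
     */
    static String interruptBlockedRead() throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free local port.
            server.bind(new InetSocketAddress("127.0.0.1", 0));

            AtomicReference<String> result = new AtomicReference<>("none");
            CountDownLatch connected = new CountDownLatch(1);

            Thread reader = new Thread(() -> {
                try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                    connected.countDown();
                    // The server never writes, so this read blocks until interrupted.
                    client.read(ByteBuffer.allocate(16));
                } catch (Exception e) {
                    result.set(e.getClass().getSimpleName());
                }
            });
            reader.setDaemon(true);
            reader.start();

            connected.await(5, TimeUnit.SECONDS);
            try (SocketChannel accepted = server.accept()) {
                Thread.sleep(200);      // give the reader time to enter read()
                reader.interrupt();     // stands in for "some other thread" in the job
                reader.join(5000);
            }
            return result.get();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Reader failed with: " + interruptBlockedRead());
    }
}
```

If something in the job interrupts the source thread while `open()` is still connecting (for example, the task being cancelled because another operator failed), I would expect a similar symptom. That is an assumption I have not confirmed.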
