To address the type conversion exceptions that occur when SeaTunnel force-casts numeric Excel cell values to strings, this article explains in detail how to modify the source code so that numeric values can be written compatibly into string-type database fields, and how to package and deploy the fixed code using Maven.
Requirement
Numeric-type cells in Excel must be written into string-type fields in the database.
Root Cause
SeaTunnel uses a direct type cast when reading field values. If the runtime type of a value does not match the declared field type (for example, a Double cell value in a STRING column), the cast throws a ClassCastException.
Modification Location
org/apache/seatunnel/api/table/type/SeaTunnelRow.java
org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
Modified Code
/**
 * Binds every physical field of {@code row} to the matching positional parameter of
 * {@code statement}, choosing the JDBC setter from the field's declared SeaTunnel SQL type.
 *
 * <p>Null field values are bound with {@code setObject(index, null)}. For {@code STRING}
 * columns the value is bound through its {@code toString()} form, so a non-String runtime
 * value (for example a {@code Double} read from a numeric Excel cell) is written as text
 * instead of failing with a {@link ClassCastException}.
 *
 * @param tableSchema schema whose physical row type drives the per-field conversion
 * @param row the row whose fields are written; indexed in the same order as the row type
 * @param statement the prepared statement to populate (JDBC parameter indexes are 1-based)
 * @return the same {@code statement} instance, with all parameters bound
 * @throws SQLException if a JDBC setter fails
 * @throws JdbcConnectorException if a field has type {@code MAP}, {@code ROW}, or any other
 *     type not handled here
 */
@Override
public PreparedStatement toExternal(
        TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
        throws SQLException {
    SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
    for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
        SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
        // JDBC parameter positions start at 1, row field positions at 0.
        int statementIndex = fieldIndex + 1;
        Object fieldValue = row.getField(fieldIndex);
        if (fieldValue == null) {
            statement.setObject(statementIndex, null);
            continue;
        }
        // From here on fieldValue is guaranteed non-null.
        switch (seaTunnelDataType.getSqlType()) {
            case STRING:
                // A direct (String) cast fails for non-String values such as Double.
                // String.toString() returns the string itself, so toString() covers both
                // cases without relying on a caught exception, and a genuine SQLException
                // from setString is no longer swallowed and retried.
                statement.setString(statementIndex, fieldValue.toString());
                break;
            case BOOLEAN:
                statement.setBoolean(statementIndex, (Boolean) fieldValue);
                break;
            case TINYINT:
                statement.setByte(statementIndex, (Byte) fieldValue);
                break;
            case SMALLINT:
                statement.setShort(statementIndex, (Short) fieldValue);
                break;
            case INT:
                statement.setInt(statementIndex, (Integer) fieldValue);
                break;
            case BIGINT:
                statement.setLong(statementIndex, (Long) fieldValue);
                break;
            case FLOAT:
                statement.setFloat(statementIndex, (Float) fieldValue);
                break;
            case DOUBLE:
                statement.setDouble(statementIndex, (Double) fieldValue);
                break;
            case DECIMAL:
                statement.setBigDecimal(statementIndex, (BigDecimal) fieldValue);
                break;
            case DATE:
                LocalDate localDate = (LocalDate) fieldValue;
                statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
                break;
            case TIME:
                writeTime(statement, statementIndex, (LocalTime) fieldValue);
                break;
            case TIMESTAMP:
                LocalDateTime localDateTime = (LocalDateTime) fieldValue;
                statement.setTimestamp(
                        statementIndex, java.sql.Timestamp.valueOf(localDateTime));
                break;
            case BYTES:
                statement.setBytes(statementIndex, (byte[]) fieldValue);
                break;
            case NULL:
                statement.setNull(statementIndex, java.sql.Types.NULL);
                break;
            case ARRAY:
                // No null check needed: null values were already bound above.
                Object[] array = (Object[]) fieldValue;
                statement.setObject(statementIndex, array);
                break;
            case MAP:
            case ROW:
            default:
                throw new JdbcConnectorException(
                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                        "Unexpected value: " + seaTunnelDataType);
        }
    }
    return statement;
}
/**
 * Returns the size attributed to a single value of the given SeaTunnel data type.
 *
 * <p>Null values count as 0. Fixed-width types return a constant; {@code STRING} returns the
 * character count of the value's text form; {@code BYTES} returns the array length;
 * {@code ARRAY} delegates to {@code getBytesForArray}; {@code MAP} and {@code ROW} sum the
 * sizes of their entries or fields recursively.
 *
 * @param v the value to measure; may be null
 * @param dataType the declared SeaTunnel type of {@code v}
 * @return the computed size for {@code v}
 * @throws UnsupportedOperationException if the SQL type is not handled here
 */
private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
    if (v == null) {
        return 0;
    }
    SqlType sqlType = dataType.getSqlType();
    switch (sqlType) {
        case STRING:
            // The runtime value of a STRING column is not always a String (e.g. a Double
            // read from a numeric Excel cell). String.toString() returns the string itself,
            // so measuring the text form handles both cases without a cast or a caught
            // ClassCastException.
            return v.toString().length();
        case BOOLEAN:
        case TINYINT:
            return 1;
        case SMALLINT:
            return 2;
        case INT:
        case FLOAT:
            return 4;
        case BIGINT:
        case DOUBLE:
            return 8;
        case DECIMAL:
            return 36;
        case NULL:
            return 0;
        case BYTES:
            return ((byte[]) v).length;
        case DATE:
            return 24;
        case TIME:
            return 12;
        case TIMESTAMP:
            return 48;
        case ARRAY:
            return getBytesForArray(v, ((ArrayType) dataType).getElementType());
        case MAP:
            // Sum of key size plus value size over all entries.
            int size = 0;
            MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
                size +=
                        getBytesForValue(entry.getKey(), mapType.getKeyType())
                                + getBytesForValue(entry.getValue(), mapType.getValueType());
            }
            return size;
        case ROW:
            // Sum of the sizes of the nested row's fields, each measured by its own type.
            int rowSize = 0;
            SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
            SeaTunnelDataType<?>[] types = rowType.getFieldTypes();
            SeaTunnelRow row = (SeaTunnelRow) v;
            for (int i = 0; i < types.length; i++) {
                rowSize += getBytesForValue(row.fields[i], types[i]);
            }
            return rowSize;
        default:
            throw new UnsupportedOperationException("Unsupported type: " + sqlType);
    }
}
How to Package the Source Code
You can package it using the Maven plugin. After packaging, you will find the final output under the dist directory, as shown in the screenshot below:
Top comments (0)