逻辑类型和物理类型检查
// Class: org.apache.flink.table.utils.TypeMappingUtils
/**
 * Checks whether the given physical field type and logical field type are compatible
 * at the edges of the table ecosystem. Types are still compatible if the physical type
 * is a legacy decimal type (converted from Types#BIG_DEC) and the logical type is
 * DECIMAL(38, 18). This is to support legacy TypeInformation for {@link TableSource} and
 * {@link org.apache.flink.table.sinks.TableSink}.
 *
 * <p>The argument order handed to {@code checkIfCompatible} depends on {@code isSource}:
 * for a source it is called with (physical, logical), for a sink with (logical, physical).
 *
 * @param physicalFieldType physical field type
 * @param logicalFieldType logical field type
 * @param physicalFieldName physical field name
 * @param logicalFieldName logical field name
 * @param isSource whether it is a source or sink; selects the direction of the
 *                 compatibility check and the wording of the error message.
 * @throws ValidationException if the types are reported as incompatible (no cause attached),
 *                             or if the compatibility check itself throws (the original
 *                             exception is attached as the cause).
 */
public static void checkPhysicalLogicalTypeCompatible(
        LogicalType physicalFieldType,
        LogicalType logicalFieldType,
        String physicalFieldName,
        String logicalFieldName,
        boolean isSource) {
    // Builds the user-facing error; the cause is null for a plain mismatch.
    Function<Throwable, ValidationException> exceptionSupplier = (cause) ->
        new ValidationException(
            String.format(
                "Type %s of table field '%s' does not match with " +
                    "the physical type %s of the '%s' field of the %s type.",
                logicalFieldType,
                logicalFieldName,
                physicalFieldType,
                physicalFieldName,
                isSource ? "TableSource return" : "TableSink consumed"),
            cause);

    final boolean typesCompatible;
    try {
        // Only the check itself is guarded, so that failures raised while comparing
        // the types are reported with the field context attached.
        typesCompatible = isSource
            ? checkIfCompatible(physicalFieldType, logicalFieldType)
            : checkIfCompatible(logicalFieldType, physicalFieldType);
    } catch (Exception e) {
        throw exceptionSupplier.apply(e);
    }

    // Thrown outside the try block on purpose: inside it, the catch clause above would
    // intercept this exception and wrap it in a second, identical ValidationException.
    if (!typesCompatible) {
        throw exceptionSupplier.apply(null);
    }
}