Logical and physical type compatibility check

    // Class: org.apache.flink.table.utils.TypeMappingUtils
    /**
     * Checks whether the given physical field type and logical field type are compatible
     * at the edges of the table ecosystem. Types are still compatible if the physical type
     * is a legacy decimal type (converted from Types#BIG_DEC) and the logical type is
     * DECIMAL(38, 18). This is to support legacy TypeInformation for {@link TableSource} and
     * {@link org.apache.flink.table.sinks.TableSink}.
     *
     * @param physicalFieldType physical field type
     * @param logicalFieldType logical field type
     * @param physicalFieldName physical field name
     * @param logicalFieldName logical field name
     * @param isSource whether it is a source or sink, used for logging.
     */
    public static void checkPhysicalLogicalTypeCompatible(
            LogicalType physicalFieldType,
            LogicalType logicalFieldType,
            String physicalFieldName,
            String logicalFieldName,
            boolean isSource) {
        Function<Throwable, ValidationException> exceptionSupplier = (cause) ->
            new ValidationException(
                String.format(
                    "Type %s of table field '%s' does not match with " +
                        "the physical type %s of the '%s' field of the %s type.",
                    logicalFieldType,
                    logicalFieldName,
                    physicalFieldType,
                    physicalFieldName,
                    isSource ? "TableSource return" : "TableSink consumed"),
                cause);
        try {
            final boolean typesCompatible;
            if (isSource) {
                // For a source, check whether the physical type produced by the source
                // can be read as the declared logical type.
                typesCompatible = checkIfCompatible(
                    physicalFieldType,
                    logicalFieldType);
            } else {
                // For a sink, check the opposite direction: whether the logical type
                // can be written into the physical type consumed by the sink.
                typesCompatible = checkIfCompatible(
                    logicalFieldType,
                    physicalFieldType);
            }
            if (!typesCompatible) {
                throw exceptionSupplier.apply(null);
            }
        } catch (Exception e) {
            throw exceptionSupplier.apply(e);
        }
    }
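
To make the behavior concrete, below is a minimal sketch of calling this check directly. The class and field names (TypeCheckExample, phys_col, logical_col) are made up for illustration; the sketch assumes that the internal checkIfCompatible accepts identical types and rejects a physical BIGINT being read as a logical INT, so the second call is expected to raise the formatted ValidationException.

    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.utils.TypeMappingUtils;

    public class TypeCheckExample {

        public static void main(String[] args) {
            // Identical types: the check passes silently for a source field.
            TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                    new IntType(),      // physical type produced by the source
                    new IntType(),      // logical type declared in the table schema
                    "phys_col",
                    "logical_col",
                    true);              // isSource

            // Mismatched types: assuming physical BIGINT cannot be read as logical INT,
            // a ValidationException with the formatted message is expected here.
            try {
                TypeMappingUtils.checkPhysicalLogicalTypeCompatible(
                        new BigIntType(),
                        new IntType(),
                        "phys_col",
                        "logical_col",
                        true);
            } catch (ValidationException e) {
                System.out.println(e.getMessage());
            }
        }
    }

Note the asymmetry in the method above: for a source the physical type is passed to checkIfCompatible first, while for a sink the logical type is, so the compatibility check follows the direction in which data crosses the table boundary.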