image.png

    1. /*
    2. * Licensed to the Apache Software Foundation (ASF) under one
    3. * or more contributor license agreements. See the NOTICE file
    4. * distributed with this work for additional information
    5. * regarding copyright ownership. The ASF licenses this file
    6. * to you under the Apache License, Version 2.0 (the
    7. * "License"); you may not use this file except in compliance
    8. * with the License. You may obtain a copy of the License at
    9. *
    10. * http://www.apache.org/licenses/LICENSE-2.0
    11. *
    12. * Unless required by applicable law or agreed to in writing, software
    13. * distributed under the License is distributed on an "AS IS" BASIS,
    14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15. * See the License for the specific language governing permissions and
    16. * limitations under the License.
    17. */
    18. package org.apache.flink.api.common.typeutils;
    19. import org.apache.flink.annotation.PublicEvolving;
    20. import org.apache.flink.core.memory.DataInputView;
    21. import org.apache.flink.core.memory.DataOutputView;
    22. import java.io.IOException;
    23. import java.io.Serializable;
    24. /**
    25. * This interface describes the methods that are required for a data type to be handled by the Flink
    26. * runtime. Specifically, this interface contains the serialization and copying methods.
    27. *
    28. * <p>The methods in this class are not necessarily thread safe. To avoid unpredictable side effects,
    29. * it is recommended to call {@code duplicate()} method and use one serializer instance per thread.
    30. *
    31. * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
    32. *
    33. * <p>This section is relevant if you implemented a TypeSerializer in Flink versions up to 1.6 and want
    34. * to adapt that implementation to the new interfaces that support proper state schema evolution, while maintaining
    35. * backwards compatibility. Please follow these steps:
    36. *
    37. * <ul>
    38. * <li>Change the type serializer's config snapshot to implement {@link TypeSerializerSnapshot}, rather
    39. * than extending {@code TypeSerializerConfigSnapshot} (as previously).
    40. * <li>If the above step was completed, then the upgrade is done. Otherwise, if changing to implement
    41. * {@link TypeSerializerSnapshot} directly in-place as the same class isn't possible (perhaps because the new snapshot
    42. * is intended to have completely different written contents or intended to have a different class name),
    43. * retain the old serializer snapshot class (extending {@code TypeSerializerConfigSnapshot}) under
    44. * the same name and give the updated serializer snapshot class (the one extending {@code TypeSerializerSnapshot})
    45. * a new name.
    46. * <li>Override the {@code TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)}
    47. * method to perform the compatibility check based on configuration written by the old serializer snapshot class.
    48. * </ul>
    49. *
    50. * @param <T> The data type that the serializer serializes.
    51. */
    52. @PublicEvolving
    53. public abstract class TypeSerializer<T> implements Serializable {
    54. private static final long serialVersionUID = 1L;
    55. // --------------------------------------------------------------------------------------------
    56. // General information about the type and the serializer
    57. // --------------------------------------------------------------------------------------------
    58. /**
    59. * Gets whether the type is an immutable type.
    60. *
    61. * @return True, if the type is immutable.
    62. */
    63. public abstract boolean isImmutableType();
    64. /**
    65. * Creates a deep copy of this serializer if it is necessary, i.e. if it is stateful. This
    66. * can return itself if the serializer is not stateful.
    67. *
    68. * We need this because Serializers might be used in several threads. Stateless serializers
    69. * are inherently thread-safe while stateful serializers might not be thread-safe.
    70. */
    71. public abstract TypeSerializer<T> duplicate();
    72. // --------------------------------------------------------------------------------------------
    73. // Instantiation & Cloning
    74. // --------------------------------------------------------------------------------------------
    75. /**
    76. * Creates a new instance of the data type.
    77. *
    78. * @return A new instance of the data type.
    79. */
    80. public abstract T createInstance();
    81. /**
    82. * Creates a deep copy of the given element in a new element.
    83. *
    84. * @param from The element reuse be copied.
    85. * @return A deep copy of the element.
    86. */
    87. public abstract T copy(T from);
    88. /**
    89. * Creates a copy from the given element.
    90. * The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
    91. * This is, however, not guaranteed.
    92. *
    93. * @param from The element to be copied.
    94. * @param reuse The element to be reused. May or may not be used.
    95. * @return A deep copy of the element.
    96. */
    97. public abstract T copy(T from, T reuse);
    98. // --------------------------------------------------------------------------------------------
    99. /**
    100. * Gets the length of the data type, if it is a fix length data type.
    101. *
    102. * @return The length of the data type, or <code>-1</code> for variable length data types.
    103. */
    104. public abstract int getLength();
    105. // --------------------------------------------------------------------------------------------
    106. /**
    107. * Serializes the given record to the given target output view.
    108. *
    109. * @param record The record to serialize.
    110. * @param target The output view to write the serialized data to.
    111. *
    112. * @throws IOException Thrown, if the serialization encountered an I/O related error. Typically raised by the
    113. * output view, which may have an underlying I/O channel to which it delegates.
    114. */
    115. public abstract void serialize(T record, DataOutputView target) throws IOException;
    116. /**
    117. * De-serializes a record from the given source input view.
    118. *
    119. * @param source The input view from which to read the data.
    120. * @return The deserialized element.
    121. *
    122. * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
    123. * input view, which may have an underlying I/O channel from which it reads.
    124. */
    125. public abstract T deserialize(DataInputView source) throws IOException;
    126. /**
    127. * De-serializes a record from the given source input view into the given reuse record instance if mutable.
    128. *
    129. * @param reuse The record instance into which to de-serialize the data.
    130. * @param source The input view from which to read the data.
    131. * @return The deserialized element.
    132. *
    133. * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
    134. * input view, which may have an underlying I/O channel from which it reads.
    135. */
    136. public abstract T deserialize(T reuse, DataInputView source) throws IOException;
    137. /**
    138. * Copies exactly one record from the source input view to the target output view. Whether this operation
    139. * works on binary data or partially de-serializes the record to determine its length (such as for records
    140. * of variable length) is up to the implementer. Binary copies are typically faster. A copy of a record containing
    141. * two integer numbers (8 bytes total) is most efficiently implemented as
    142. * {@code target.write(source, 8);}.
    143. *
    144. * @param source The input view from which to read the record.
    145. * @param target The target output view to which to write the record.
    146. *
    147. * @throws IOException Thrown if any of the two views raises an exception.
    148. */
    149. public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
    150. public abstract boolean equals(Object obj);
    151. public abstract int hashCode();
    152. // --------------------------------------------------------------------------------------------
    153. // Serializer configuration snapshot for checkpoints/savepoints
    154. // --------------------------------------------------------------------------------------------
    155. /**
    156. * Snapshots the configuration of this TypeSerializer. This method is only relevant if the serializer is
    157. * used to state stored in checkpoints/savepoints.
    158. *
    159. * <p>The snapshot of the TypeSerializer is supposed to contain all information that affects the serialization
    160. * format of the serializer. The snapshot serves two purposes: First, to reproduce the serializer when the
    161. * checkpoint/savepoint is restored, and second, to check whether the serialization format is compatible
    162. * with the serializer used in the restored program.
    163. *
    164. * <p><b>IMPORTANT:</b> TypeSerializerSnapshots changed after Flink 1.6. Serializers implemented against
    165. * Flink versions up to 1.6 should still work, but adjust to new model to enable state evolution and be
    166. * future-proof.
    167. * See the class-level comments, section "Upgrading TypeSerializers to the new TypeSerializerSnapshot model"
    168. * for details.
    169. *
    170. * @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
    171. *
    172. * @return snapshot of the serializer's current configuration (cannot be {@code null}).
    173. */
    174. public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
    175. }