StreamOperator.png

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.flink.streaming.api.operators;

    import org.apache.flink.annotation.PublicEvolving;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    import org.apache.flink.runtime.jobgraph.OperatorID;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.CheckpointStreamFactory;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.util.Disposable;

    import java.io.Serializable;

    27. /**
    28. * Basic interface for stream operators. Implementers would implement one of
    29. * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
    30. * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
    31. * that process elements.
    32. *
    33. * <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
    34. * offers default implementation for the lifecycle and properties methods.
    35. *
    36. * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
    37. * the timer service, timer callbacks are also guaranteed not to be called concurrently with
    38. * methods on {@code StreamOperator}.
    39. *
    40. * @param <OUT> The output type of the operator
    41. */
    42. @PublicEvolving
    43. public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
    44. // ------------------------------------------------------------------------
    45. // life cycle
    46. // ------------------------------------------------------------------------
    47. /**
    48. * This method is called immediately before any elements are processed, it should contain the
    49. * operator's initialization logic.
    50. *
    51. * @implSpec In case of recovery, this method needs to ensure that all recovered data is processed before passing
    52. * back control, so that the order of elements is ensured during the recovery of an operator chain (operators
    53. * are opened from the tail operator to the head operator).
    54. *
    55. * @throws java.lang.Exception An exception in this method causes the operator to fail.
    56. */
    57. void open() throws Exception;
    58. /**
    59. * This method is called after all records have been added to the operators via the methods
    60. * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator#processElement(StreamRecord)}, or
    61. * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement1(StreamRecord)} and
    62. * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator#processElement2(StreamRecord)}.
    63. *
    64. * <p>The method is expected to flush all remaining buffered data. Exceptions during this
    65. * flushing of buffered should be propagated, in order to cause the operation to be recognized
    66. * as failed, because the last data items are not processed properly.
    67. *
    68. * @throws java.lang.Exception An exception in this method causes the operator to fail.
    69. */
    70. void close() throws Exception;
    71. /**
    72. * This method is called at the very end of the operator's life, both in the case of a successful
    73. * completion of the operation, and in the case of a failure and canceling.
    74. *
    75. * <p>This method is expected to make a thorough effort to release all resources
    76. * that the operator has acquired.
    77. */
    78. @Override
    79. void dispose() throws Exception;
    80. // ------------------------------------------------------------------------
    81. // state snapshots
    82. // ------------------------------------------------------------------------
    83. /**
    84. * This method is called when the operator should do a snapshot, before it emits its
    85. * own checkpoint barrier.
    86. *
    87. * <p>This method is intended not for any actual state persistence, but only for emitting some
    88. * data before emitting the checkpoint barrier. Operators that maintain some small transient state
    89. * that is inefficient to checkpoint (especially when it would need to be checkpointed in a
    90. * re-scalable way) but can simply be sent downstream before the checkpoint. An example are
    91. * opportunistic pre-aggregation operators, which have small the pre-aggregation state that is
    92. * frequently flushed downstream.
    93. *
    94. * <p><b>Important:</b> This method should not be used for any actual state snapshot logic, because
    95. * it will inherently be within the synchronous part of the operator's checkpoint. If heavy work is done
    96. * within this method, it will affect latency and downstream checkpoint alignments.
    97. *
    98. * @param checkpointId The ID of the checkpoint.
    99. * @throws Exception Throwing an exception here causes the operator to fail and go into recovery.
    100. */
    101. void prepareSnapshotPreBarrier(long checkpointId) throws Exception;
    102. /**
    103. * Called to draw a state snapshot from the operator.
    104. *
    105. * @return a runnable future to the state handle that points to the snapshotted state. For synchronous implementations,
    106. * the runnable might already be finished.
    107. *
    108. * @throws Exception exception that happened during snapshotting.
    109. */
    110. OperatorSnapshotFutures snapshotState(
    111. long checkpointId,
    112. long timestamp,
    113. CheckpointOptions checkpointOptions,
    114. CheckpointStreamFactory storageLocation) throws Exception;
    115. /**
    116. * Provides a context to initialize all state in the operator.
    117. */
    118. void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;
    119. // ------------------------------------------------------------------------
    120. // miscellaneous
    121. // ------------------------------------------------------------------------
    122. void setKeyContextElement1(StreamRecord<?> record) throws Exception;
    123. void setKeyContextElement2(StreamRecord<?> record) throws Exception;
    124. MetricGroup getMetricGroup();
    125. OperatorID getOperatorID();
    126. }