序
本文主要研究一下flink的AsyncWaitOperator
AsyncWaitOperator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@Internalpublic class AsyncWaitOperatorextends AbstractUdfStreamOperator > implements OneInputStreamOperator , OperatorActions { private static final long serialVersionUID = 1L; private static final String STATE_NAME = "_async_wait_operator_state_"; /** Capacity of the stream element queue. */ private final int capacity; /** Output mode for this operator. */ private final AsyncDataStream.OutputMode outputMode; /** Timeout for the async collectors. */ private final long timeout; protected transient Object checkpointingLock; /** {@link TypeSerializer} for inputs while making snapshots. */ private transient StreamElementSerializer inStreamElementSerializer; /** Recovered input stream elements. */ private transient ListState recoveredStreamElements; /** Queue to store the currently in-flight stream elements into. */ private transient StreamElementQueue queue; /** Pending stream element which could not yet added to the queue. */ private transient StreamElementQueueEntry pendingStreamElementQueueEntry; private transient ExecutorService executor; /** Emitter for the completed stream element queue entries. */ private transient Emitter emitter; /** Thread running the emitter. */ private transient Thread emitterThread; public AsyncWaitOperator( AsyncFunction asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) { super(asyncFunction); chainingStrategy = ChainingStrategy.ALWAYS; Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0."); this.capacity = capacity; this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode"); this.timeout = timeout; } @Override public void setup(StreamTask containingTask, StreamConfig config, Output > output) { super.setup(containingTask, config, output); this.checkpointingLock = getContainingTask().getCheckpointLock(); this.inStreamElementSerializer = new StreamElementSerializer<>( getOperatorConfig(). 
getTypeSerializerIn1(getUserCodeClassloader())); // create the operators executor for the complete operations of the queue entries this.executor = Executors.newSingleThreadExecutor(); switch (outputMode) { case ORDERED: queue = new OrderedStreamElementQueue( capacity, executor, this); break; case UNORDERED: queue = new UnorderedStreamElementQueue( capacity, executor, this); break; default: throw new IllegalStateException("Unknown async mode: " + outputMode + '.'); } } @Override public void open() throws Exception { super.open(); // create the emitter this.emitter = new Emitter<>(checkpointingLock, output, queue, this); // start the emitter thread this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')'); emitterThread.setDaemon(true); emitterThread.start(); // process stream elements from state, since the Emit thread will start as soon as all // elements from previous state are in the StreamElementQueue, we have to make sure that the // order to open all operators in the operator chain proceeds from the tail operator to the // head operator. if (recoveredStreamElements != null) { for (StreamElement element : recoveredStreamElements.get()) { if (element.isRecord()) { processElement(element. 
asRecord()); } else if (element.isWatermark()) { processWatermark(element.asWatermark()); } else if (element.isLatencyMarker()) { processLatencyMarker(element.asLatencyMarker()); } else { throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator."); } } recoveredStreamElements = null; } } @Override public void processElement(StreamRecord element) throws Exception { final StreamRecordQueueEntry streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); if (timeout > 0L) { // register a timeout for this AsyncStreamRecordBufferEntry long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture timerFuture = getProcessingTimeService().registerTimer( timeoutTimestamp, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { userFunction.timeout(element.getValue(), streamRecordBufferEntry); } }); // Cancel the timer once we've completed the stream record buffer entry. 
This will remove // the register trigger task streamRecordBufferEntry.onComplete( (StreamElementQueueEntry > value) -> { timerFuture.cancel(true); }, executor); } addAsyncBufferEntry(streamRecordBufferEntry); userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); } @Override public void processWatermark(Watermark mark) throws Exception { WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark); addAsyncBufferEntry(watermarkBufferEntry); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); ListState partitionableState = getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); partitionableState.clear(); Collection > values = queue.values(); try { for (StreamElementQueueEntry value : values) { partitionableState.add(value.getStreamElement()); } // add the pending stream element queue entry if the stream element queue is currently full if (pendingStreamElementQueueEntry != null) { partitionableState.add(pendingStreamElementQueueEntry.getStreamElement()); } } catch (Exception e) { partitionableState.clear(); throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e); } } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); recoveredStreamElements = context .getOperatorStateStore() .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); } @Override public void close() throws Exception { try { assert(Thread.holdsLock(checkpointingLock)); while (!queue.isEmpty()) { // wait for the emitter thread to output the remaining elements // for that he needs the checkpointing lock and thus we have to free it checkpointingLock.wait(); } } finally { Exception exception = null; try { super.close(); } catch (InterruptedException interrupted) { exception = interrupted; 
Thread.currentThread().interrupt(); } catch (Exception e) { exception = e; } try { // terminate the emitter, the emitter thread and the executor stopResources(true); } catch (InterruptedException interrupted) { exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); Thread.currentThread().interrupt(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } if (exception != null) { LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception); } } } @Override public void dispose() throws Exception { Exception exception = null; try { super.dispose(); } catch (InterruptedException interrupted) { exception = interrupted; Thread.currentThread().interrupt(); } catch (Exception e) { exception = e; } try { stopResources(false); } catch (InterruptedException interrupted) { exception = ExceptionUtils.firstOrSuppressed(interrupted, exception); Thread.currentThread().interrupt(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } if (exception != null) { throw exception; } } private void stopResources(boolean waitForShutdown) throws InterruptedException { emitter.stop(); emitterThread.interrupt(); executor.shutdown(); if (waitForShutdown) { try { if (!executor.awaitTermination(365L, TimeUnit.DAYS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } /* * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so * that the emitter thread can complete/react to the interrupt signal. 
*/ if (Thread.holdsLock(checkpointingLock)) { while (emitterThread.isAlive()) { checkpointingLock.wait(100L); } } emitterThread.join(); } else { executor.shutdownNow(); } } private void addAsyncBufferEntry(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { assert(Thread.holdsLock(checkpointingLock)); pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { // we wait for the emitter to notify us if the queue has space left again checkpointingLock.wait(); } pendingStreamElementQueueEntry = null; } @Override public void failOperator(Throwable throwable) { getContainingTask().getEnvironment().failExternally(throwable); }}复制代码
- AsyncWaitOperator继承了AbstractUdfStreamOperator,覆盖了AbstractUdfStreamOperator的setup、open、initializeState、snapshotState、close、dispose方法;实现了OneInputStreamOperator接口定义的processElement、processWatermark方法(processLatencyMarker在本类中没有覆盖,open方法里调用的是继承而来的实现);实现了OperatorActions定义的failOperator方法
- setup方法使用Executors.newSingleThreadExecutor()创建了ExecutorService,之后根据不同的outputMode创建不同的StreamElementQueue(OrderedStreamElementQueue或者UnorderedStreamElementQueue);open方法使用Emitter创建并启动AsyncIO-Emitter-Thread,另外就是处理recoveredStreamElements,根据不同的类型分别调用processElement、processWatermark、processLatencyMarker方法
- processElement方法首先在timeout大于0时根据timeout注册一个timer,在ProcessingTimeCallback的onProcessingTime方法里头执行userFunction.timeout,之后将StreamRecordQueueEntry添加到StreamElementQueue中,最后触发userFunction.asyncInvoke;close和dispose方法会调用stopResources方法来关闭资源,不同的是waitForShutdown参数传值不同,close方法传true,而dispose方法传false
Emitter
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java
@Internalpublic class Emitterimplements Runnable { private static final Logger LOG = LoggerFactory.getLogger(Emitter.class); /** Lock to hold before outputting. */ private final Object checkpointLock; /** Output for the watermark elements. */ private final Output > output; /** Queue to consume the async results from. */ private final StreamElementQueue streamElementQueue; private final OperatorActions operatorActions; /** Output for stream records. */ private final TimestampedCollector timestampedCollector; private volatile boolean running; public Emitter( final Object checkpointLock, final Output > output, final StreamElementQueue streamElementQueue, final OperatorActions operatorActions) { this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock"); this.output = Preconditions.checkNotNull(output, "output"); this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue"); this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); this.timestampedCollector = new TimestampedCollector<>(this.output); this.running = true; } @Override public void run() { try { while (running) { LOG.debug("Wait for next completed async stream element result."); AsyncResult streamElementEntry = streamElementQueue.peekBlockingly(); output(streamElementEntry); } } catch (InterruptedException e) { if (running) { operatorActions.failOperator(e); } else { // Thread got interrupted which means that it should shut down LOG.debug("Emitter thread got interrupted, shutting down."); } } catch (Throwable t) { operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " + "unexpected throwable.", t)); } } private void output(AsyncResult asyncResult) throws InterruptedException { if (asyncResult.isWatermark()) { synchronized (checkpointLock) { AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark(); LOG.debug("Output async watermark."); 
output.emitWatermark(asyncWatermarkResult.getWatermark()); // remove the peeked element from the async collector buffer so that it is no longer // checkpointed streamElementQueue.poll(); // notify the main thread that there is again space left in the async collector // buffer checkpointLock.notifyAll(); } } else { AsyncCollectionResult streamRecordResult = asyncResult.asResultCollection(); if (streamRecordResult.hasTimestamp()) { timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp()); } else { timestampedCollector.eraseTimestamp(); } synchronized (checkpointLock) { LOG.debug("Output async stream element collection result."); try { Collection resultCollection = streamRecordResult.get(); if (resultCollection != null) { for (OUT result : resultCollection) { timestampedCollector.collect(result); } } } catch (Exception e) { operatorActions.failOperator( new Exception("An async function call terminated with an exception. " + "Failing the AsyncWaitOperator.", e)); } // remove the peeked element from the async collector buffer so that it is no longer // checkpointed streamElementQueue.poll(); // notify the main thread that there is again space left in the async collector // buffer checkpointLock.notifyAll(); } } } public void stop() { running = false; }}复制代码
- Emitter实现了Runnable接口,它主要负责从StreamElementQueue取出element,然后输出到TimestampedCollector
- Emitter的run方法就是不断循环调用streamElementQueue.peekBlockingly()阻塞获取AsyncResult,获取到之后就调用output方法将result输出出去
- Emitter的output方法根据asyncResult是否是watermark做不同处理,不是watermark的话,就会将result通过timestampedCollector.collect输出,如果出现异常则调用operatorActions.failOperator传递异常,最后调用streamElementQueue.poll()来移除队首的元素
StreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java
@Internalpublic interface StreamElementQueue {void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException; AsyncResult peekBlockingly() throws InterruptedException; AsyncResult poll() throws InterruptedException; Collection > values() throws InterruptedException; boolean isEmpty(); int size();}复制代码
- StreamElementQueue接口主要定义了AsyncWaitOperator所要用的blocking stream element queue的接口;它定义了put、tryPut、peekBlockingly、poll、values、isEmpty、size方法;StreamElementQueue接口有两个实现类,分别是UnorderedStreamElementQueue及OrderedStreamElementQueue;队列元素类型为StreamElementQueueEntry
UnorderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@Internalpublic class UnorderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Executor to run the onComplete callbacks. */ private final Executor executor; /** OperatorActions to signal the owning operator a failure. */ private final OperatorActions operatorActions; /** Queue of uncompleted stream element queue entries segmented by watermarks. */ private final ArrayDeque>> uncompletedQueue; /** Queue of completed stream element queue entries. */ private final ArrayDeque > completedQueue; /** First (chronologically oldest) uncompleted set of stream element queue entries. */ private Set > firstSet; // Last (chronologically youngest) uncompleted set of stream element queue entries. New // stream element queue entries are inserted into this set. private Set > lastSet; private volatile int numberEntries; /** Locks and conditions for the blocking queue. 
*/ private final ReentrantLock lock; private final Condition notFull; private final Condition hasCompletedEntries; public UnorderedStreamElementQueue( int capacity, Executor executor, OperatorActions operatorActions) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; this.executor = Preconditions.checkNotNull(executor, "executor"); this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); this.uncompletedQueue = new ArrayDeque<>(capacity); this.completedQueue = new ArrayDeque<>(capacity); this.firstSet = new HashSet<>(capacity); this.lastSet = firstSet; this.numberEntries = 0; this.lock = new ReentrantLock(); this.notFull = lock.newCondition(); this.hasCompletedEntries = lock.newCondition(); } @Override public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { while (numberEntries >= capacity) { notFull.await(); } addEntry(streamElementQueueEntry); } finally { lock.unlock(); } } @Override public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (numberEntries < capacity) { addEntry(streamElementQueueEntry); LOG.debug("Put element into unordered stream element queue. 
New filling degree " + "({}/{}).", numberEntries, capacity); return true; } else { LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity); return false; } } finally { lock.unlock(); } } @Override public AsyncResult peekBlockingly() throws InterruptedException { lock.lockInterruptibly(); try { while (completedQueue.isEmpty()) { hasCompletedEntries.await(); } LOG.debug("Peeked head element from unordered stream element queue with filling degree " + "({}/{}).", numberEntries, capacity); return completedQueue.peek(); } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { while (completedQueue.isEmpty()) { hasCompletedEntries.await(); } numberEntries--; notFull.signalAll(); LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity); return completedQueue.poll(); } finally { lock.unlock(); } } @Override public Collection > values() throws InterruptedException { lock.lockInterruptibly(); try { StreamElementQueueEntry [] array = new StreamElementQueueEntry[numberEntries]; array = completedQueue.toArray(array); int counter = completedQueue.size(); for (StreamElementQueueEntry entry: firstSet) { array[counter] = entry; counter++; } for (Set > asyncBufferEntries : uncompletedQueue) { for (StreamElementQueueEntry streamElementQueueEntry : asyncBufferEntries) { array[counter] = streamElementQueueEntry; counter++; } } return Arrays.asList(array); } finally { lock.unlock(); } } @Override public boolean isEmpty() { return numberEntries == 0; } @Override public int size() { return numberEntries; } public void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (firstSet.remove(streamElementQueueEntry)) { completedQueue.offer(streamElementQueueEntry); while (firstSet.isEmpty() && firstSet != 
lastSet) { firstSet = uncompletedQueue.poll(); Iterator > it = firstSet.iterator(); while (it.hasNext()) { StreamElementQueueEntry bufferEntry = it.next(); if (bufferEntry.isDone()) { completedQueue.offer(bufferEntry); it.remove(); } } } LOG.debug("Signal unordered stream element queue has completed entries."); hasCompletedEntries.signalAll(); } } finally { lock.unlock(); } } private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { assert(lock.isHeldByCurrentThread()); if (streamElementQueueEntry.isWatermark()) { lastSet = new HashSet<>(capacity); if (firstSet.isEmpty()) { firstSet.add(streamElementQueueEntry); } else { Set > watermarkSet = new HashSet<>(1); watermarkSet.add(streamElementQueueEntry); uncompletedQueue.offer(watermarkSet); } uncompletedQueue.offer(lastSet); } else { lastSet.add(streamElementQueueEntry); } streamElementQueueEntry.onComplete( (StreamElementQueueEntry value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { // The accept executor thread got interrupted. This is probably cause by // the shutdown of the executor. LOG.debug("AsyncBufferEntry could not be properly completed because the " + "executor thread has been interrupted.", e); } catch (Throwable t) { operatorActions.failOperator(new Exception("Could not complete the " + "stream element queue entry: " + value + '.', t)); } }, executor); numberEntries++; }}复制代码
- UnorderedStreamElementQueue实现了StreamElementQueue接口,它emit结果的顺序是无序的,其内部使用了两个ArrayDeque,一个是uncompletedQueue,一个是completedQueue
- peekBlockingly方法首先判断completedQueue是否有元素,没有的话则执行hasCompletedEntries.await(),有则执行completedQueue.peek();put及tryPut都会调用addEntry方法,该方法对普通元素会将其加入lastSet,遇到watermark时则新建一个lastSet并将其加入uncompletedQueue(如果firstSet非空,还会先把watermark单独放进一个集合加入uncompletedQueue),然后同时给每个streamElementQueueEntry的onComplete方法注册一个onCompleteHandler
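- onCompleteHandler方法会将执行完成的streamElementQueueEntry从firstSet移除,然后添加到completedQueue;当firstSet为空且不是lastSet时,会从uncompletedQueue取出下一个集合作为firstSet,并把其中已经完成的元素也移到completedQueue,最后执行hasCompletedEntries.signalAll()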
OrderedStreamElementQueue
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@Internalpublic class OrderedStreamElementQueue implements StreamElementQueue { private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class); /** Capacity of this queue. */ private final int capacity; /** Executor to run the onCompletion callback. */ private final Executor executor; /** Operator actions to signal a failure to the operator. */ private final OperatorActions operatorActions; /** Lock and conditions for the blocking queue. */ private final ReentrantLock lock; private final Condition notFull; private final Condition headIsCompleted; /** Queue for the inserted StreamElementQueueEntries. */ private final ArrayDeque> queue; public OrderedStreamElementQueue( int capacity, Executor executor, OperatorActions operatorActions) { Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0."); this.capacity = capacity; this.executor = Preconditions.checkNotNull(executor, "executor"); this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions"); this.lock = new ReentrantLock(false); this.headIsCompleted = lock.newCondition(); this.notFull = lock.newCondition(); this.queue = new ArrayDeque<>(capacity); } @Override public AsyncResult peekBlockingly() throws InterruptedException { lock.lockInterruptibly(); try { while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity); return queue.peek(); } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } notFull.signalAll(); LOG.debug("Polled head element from ordered stream element queue. 
New filling degree " + "({}/{}).", queue.size() - 1, capacity); return queue.poll(); } finally { lock.unlock(); } } @Override public Collection > values() throws InterruptedException { lock.lockInterruptibly(); try { StreamElementQueueEntry [] array = new StreamElementQueueEntry[queue.size()]; array = queue.toArray(array); return Arrays.asList(array); } finally { lock.unlock(); } } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public int size() { return queue.size(); } @Override public void put(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { while (queue.size() >= capacity) { notFull.await(); } addEntry(streamElementQueueEntry); } finally { lock.unlock(); } } @Override public boolean tryPut(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (queue.size() < capacity) { addEntry(streamElementQueueEntry); LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity); return true; } else { LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity); return false; } } finally { lock.unlock(); } } private void addEntry(StreamElementQueueEntry streamElementQueueEntry) { assert(lock.isHeldByCurrentThread()); queue.addLast(streamElementQueueEntry); streamElementQueueEntry.onComplete( (StreamElementQueueEntry value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { // we got interrupted. 
This indicates a shutdown of the executor LOG.debug("AsyncBufferEntry could not be properly completed because the " + "executor thread has been interrupted.", e); } catch (Throwable t) { operatorActions.failOperator(new Exception("Could not complete the " + "stream element queue entry: " + value + '.', t)); } }, executor); } private void onCompleteHandler(StreamElementQueueEntry streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (!queue.isEmpty() && queue.peek().isDone()) { LOG.debug("Signal ordered stream element queue has completed head element."); headIsCompleted.signalAll(); } } finally { lock.unlock(); } }}复制代码
- OrderedStreamElementQueue实现了StreamElementQueue接口,它有序地emit结果,它内部有一个ArrayDeque类型的queue
- peekBlockingly方法首先判断queue是否有元素而且是执行完成的,没有就执行headIsCompleted.await(),有则执行queue.peek();put及tryPut都会调用addEntry方法,该方法会执行queue.addLast(streamElementQueueEntry),然后同时给每个streamElementQueueEntry的onComplete方法注册一个onCompleteHandler
- onCompleteHandler方法会检测队首元素是否已经执行完成(而不是判断刚完成的元素是否为队首),如果队列非空且队首元素已完成则执行headIsCompleted.signalAll()
AsyncResult
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java
@Internalpublic interface AsyncResult { boolean isWatermark(); boolean isResultCollection(); AsyncWatermarkResult asWatermark();AsyncCollectionResult asResultCollection();}复制代码
- AsyncResult接口定义了StreamElementQueue的元素异步返回的结果要实现的方法,该async result可能是watermark,可能是真正的结果
StreamElementQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java
@Internalpublic abstract class StreamElementQueueEntryimplements AsyncResult { private final StreamElement streamElement; public StreamElementQueueEntry(StreamElement streamElement) { this.streamElement = Preconditions.checkNotNull(streamElement); } public StreamElement getStreamElement() { return streamElement; } public boolean isDone() { return getFuture().isDone(); } public void onComplete( final Consumer > completeFunction, Executor executor) { final StreamElementQueueEntry thisReference = this; getFuture().whenCompleteAsync( // call the complete function for normal completion as well as exceptional completion // see FLINK-6435 (value, throwable) -> completeFunction.accept(thisReference), executor); } protected abstract CompletableFuture getFuture(); @Override public final boolean isWatermark() { return AsyncWatermarkResult.class.isAssignableFrom(getClass()); } @Override public final boolean isResultCollection() { return AsyncCollectionResult.class.isAssignableFrom(getClass()); } @Override public final AsyncWatermarkResult asWatermark() { return (AsyncWatermarkResult) this; } @Override public final AsyncCollectionResult asResultCollection() { return (AsyncCollectionResult ) this; }}复制代码
- StreamElementQueueEntry实现了AsyncResult接口,它定义了onComplete方法用于结果完成时的回调处理,同时它还定义了抽象方法getFuture供子类实现;它有两个子类,分别是WatermarkQueueEntry及StreamRecordQueueEntry
WatermarkQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java
@Internalpublic class WatermarkQueueEntry extends StreamElementQueueEntryimplements AsyncWatermarkResult { private final CompletableFuture future; public WatermarkQueueEntry(Watermark watermark) { super(watermark); this.future = CompletableFuture.completedFuture(watermark); } @Override public Watermark getWatermark() { return (Watermark) getStreamElement(); } @Override protected CompletableFuture getFuture() { return future; }}复制代码
- WatermarkQueueEntry继承了StreamElementQueueEntry,其元素类型为Watermark,同时实现了AsyncWatermarkResult接口
StreamRecordQueueEntry
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java
@Internalpublic class StreamRecordQueueEntryextends StreamElementQueueEntry > implements AsyncCollectionResult , ResultFuture { /** Timestamp information. */ private final boolean hasTimestamp; private final long timestamp; /** Future containing the collection result. */ private final CompletableFuture > resultFuture; public StreamRecordQueueEntry(StreamRecord streamRecord) { super(streamRecord); hasTimestamp = streamRecord.hasTimestamp(); timestamp = streamRecord.getTimestamp(); resultFuture = new CompletableFuture<>(); } @Override public boolean hasTimestamp() { return hasTimestamp; } @Override public long getTimestamp() { return timestamp; } @Override public Collection get() throws Exception { return resultFuture.get(); } @Override protected CompletableFuture > getFuture() { return resultFuture; } @Override public void complete(Collection result) { resultFuture.complete(result); } @Override public void completeExceptionally(Throwable error) { resultFuture.completeExceptionally(error); }}复制代码
- StreamRecordQueueEntry继承了StreamElementQueueEntry,同时实现了AsyncCollectionResult、ResultFuture接口
小结
- AsyncWaitOperator继承了AbstractUdfStreamOperator,覆盖了AbstractUdfStreamOperator的setup、open、initializeState、snapshotState、close、dispose方法;实现了OneInputStreamOperator接口定义的processElement、processWatermark方法(processLatencyMarker在本类中没有覆盖,使用的是继承而来的实现);实现了OperatorActions定义的failOperator方法;open方法使用Emitter创建并启动AsyncIO-Emitter-Thread
- Emitter实现了Runnable接口,它主要负责从StreamElementQueue取出element,然后输出到TimestampedCollector;其run方法就是不断循环调用streamElementQueue.peekBlockingly()阻塞获取AsyncResult,获取到之后就调用output方法将result输出出去
- StreamElementQueue接口主要定义了AsyncWaitOperator所要用的blocking stream element queue的接口;它定义了put、tryPut、peekBlockingly、poll、values、isEmpty、size方法;StreamElementQueue接口有两个实现类,分别是UnorderedStreamElementQueue及OrderedStreamElementQueue;队列元素类型为StreamElementQueueEntry,StreamElementQueueEntry实现了AsyncResult接口,它定义了onComplete方法用于结果完成时的回调处理,同时它还定义了抽象方法getFuture供子类实现;它有两个子类,分别是WatermarkQueueEntry及StreamRecordQueueEntry