A look at Flink's AsyncWaitOperator
Published: 2019-06-20

This article takes a look at Flink's AsyncWaitOperator, based on the flink-streaming-java 1.7.0 sources.

AsyncWaitOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_async_wait_operator_state_";

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    protected transient Object checkpointingLock;

    /** {@link TypeSerializer} for inputs while making snapshots. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;

    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;

    /** Thread running the emitter. */
    private transient Thread emitterThread;

    public AsyncWaitOperator(
            AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;

        Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;

        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

        this.timeout = timeout;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.checkpointingLock = getContainingTask().getCheckpointLock();

        this.inStreamElementSerializer = new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        // create the operators executor for the complete operations of the queue entries
        this.executor = Executors.newSingleThreadExecutor();

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();

        // create the emitter
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

        // start the emitter thread
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        emitterThread.setDaemon(true);
        emitterThread.start();

        // process stream elements from state, since the Emit thread will start as soon as all
        // elements from previous state are in the StreamElementQueue, we have to make sure that the
        // order to open all operators in the operator chain proceeds from the tail operator to the
        // head operator.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                } else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                } else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                } else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

        addAsyncBufferEntry(watermarkBufferEntry);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        Collection<StreamElementQueueEntry<?>> values = queue.values();

        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();

            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
    }

    @Override
    public void close() throws Exception {
        try {
            assert(Thread.holdsLock(checkpointingLock));

            while (!queue.isEmpty()) {
                // wait for the emitter thread to output the remaining elements
                // for that he needs the checkpointing lock and thus we have to free it
                checkpointingLock.wait();
            }
        } finally {
            Exception exception = null;

            try {
                super.close();
            } catch (InterruptedException interrupted) {
                exception = interrupted;

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }

            try {
                // terminate the emitter, the emitter thread and the executor
                stopResources(true);
            } catch (InterruptedException interrupted) {
                exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (exception != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
            }
        }
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;

        try {
            super.dispose();
        } catch (InterruptedException interrupted) {
            exception = interrupted;

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = e;
        }

        try {
            stopResources(false);
        } catch (InterruptedException interrupted) {
            exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            throw exception;
        }
    }

    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        emitter.stop();
        emitterThread.interrupt();

        executor.shutdown();

        if (waitForShutdown) {
            try {
                if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();

                Thread.currentThread().interrupt();
            }

            /*
             * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
             * that the emitter thread can complete/react to the interrupt signal.
             */
            if (Thread.holdsLock(checkpointingLock)) {
                while (emitterThread.isAlive()) {
                    checkpointingLock.wait(100L);
                }
            }

            emitterThread.join();
        } else {
            executor.shutdownNow();
        }
    }

    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert(Thread.holdsLock(checkpointingLock));

        pendingStreamElementQueueEntry = streamElementQueueEntry;

        while (!queue.tryPut(streamElementQueueEntry)) {
            // we wait for the emitter to notify us if the queue has space left again
            checkpointingLock.wait();
        }

        pendingStreamElementQueueEntry = null;
    }

    @Override
    public void failOperator(Throwable throwable) {
        getContainingTask().getEnvironment().failExternally(throwable);
    }
}
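  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, initializeState, snapshotState, close and dispose methods. It implements processElement and processWatermark from the OneInputStreamOperator interface (processLatencyMarker, which open also calls for recovered elements, is inherited rather than overridden in this listing) and failOperator from OperatorActions.
  • setup creates an ExecutorService with Executors.newSingleThreadExecutor() and then, depending on outputMode, either an OrderedStreamElementQueue or an UnorderedStreamElementQueue. open creates an Emitter and starts it on the daemon thread AsyncIO-Emitter-Thread, then replays recoveredStreamElements by calling processElement, processWatermark or processLatencyMarker according to the element type.
  • processElement first registers a timer if timeout > 0: the timer's ProcessingTimeCallback.onProcessingTime calls userFunction.timeout, and the timer is cancelled once the entry completes. It then adds the StreamRecordQueueEntry to the StreamElementQueue through addAsyncBufferEntry, which waits on checkpointingLock while the queue is full, and finally calls userFunction.asyncInvoke. close and dispose both call stopResources to release resources and differ only in the waitForShutdown argument: close passes true, dispose passes false.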
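
The timer handling in processElement can be modelled with plain JDK classes. The following is a minimal sketch, not Flink code: TimeoutSketch, run and the "timeout"/"reply" strings are hypothetical names, a ScheduledExecutorService stands in for the ProcessingTimeService, and a CompletableFuture stands in for the StreamRecordQueueEntry. It assumes the timeout handler completes the entry with a fallback value; what a real AsyncFunction.timeout does is up to the user function.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone model of the timeout handling in processElement. */
final class TimeoutSketch {

    /**
     * Simulates one async request that answers after replyDelayMs while a timer of
     * timeoutMs is registered, and returns whichever value completed the entry first.
     */
    static String run(long replyDelayMs, long timeoutMs) {
        ScheduledExecutorService timers = Executors.newScheduledThreadPool(2);
        try {
            // Stand-in for the StreamRecordQueueEntry (it is also the ResultFuture).
            CompletableFuture<String> entry = new CompletableFuture<>();

            // Counterpart of registerTimer(...): on expiry, the timeout handler runs.
            ScheduledFuture<?> timerFuture = timers.schedule(
                () -> { entry.complete("timeout"); },
                timeoutMs, TimeUnit.MILLISECONDS);

            // Counterpart of streamRecordBufferEntry.onComplete(...): cancel the timer.
            entry.whenComplete((value, error) -> timerFuture.cancel(true));

            // Counterpart of userFunction.asyncInvoke(...): the reply arrives later.
            timers.schedule(
                () -> { entry.complete("reply"); },
                replyDelayMs, TimeUnit.MILLISECONDS);

            // Only the first complete(...) call has an effect.
            return entry.join();
        } finally {
            timers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10, 2000)); // the reply arrives before the timer fires
        System.out.println(run(2000, 10)); // the timer fires before the reply arrives
    }
}
```

Because a future can only be completed once, a late reply after a timeout (or a late timer after a reply) is simply ignored, which is why cancelling the timer is an optimisation rather than a correctness requirement in this model.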

Emitter

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

@Internal
public class Emitter<OUT> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

    /** Lock to hold before outputting. */
    private final Object checkpointLock;

    /** Output for the watermark elements. */
    private final Output<StreamRecord<OUT>> output;

    /** Queue to consume the async results from. */
    private final StreamElementQueue streamElementQueue;

    private final OperatorActions operatorActions;

    /** Output for stream records. */
    private final TimestampedCollector<OUT> timestampedCollector;

    private volatile boolean running;

    public Emitter(
            final Object checkpointLock,
            final Output<StreamRecord<OUT>> output,
            final StreamElementQueue streamElementQueue,
            final OperatorActions operatorActions) {

        this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
        this.output = Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.running = true;
    }

    @Override
    public void run() {
        try {
            while (running) {
                LOG.debug("Wait for next completed async stream element result.");
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

                output(streamElementEntry);
            }
        } catch (InterruptedException e) {
            if (running) {
                operatorActions.failOperator(e);
            } else {
                // Thread got interrupted which means that it should shut down
                LOG.debug("Emitter thread got interrupted, shutting down.");
            }
        } catch (Throwable t) {
            operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
                "unexpected throwable.", t));
        }
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
        if (asyncResult.isWatermark()) {
            synchronized (checkpointLock) {
                AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

                LOG.debug("Output async watermark.");
                output.emitWatermark(asyncWatermarkResult.getWatermark());

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        } else {
            AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

            if (streamRecordResult.hasTimestamp()) {
                timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
            } else {
                timestampedCollector.eraseTimestamp();
            }

            synchronized (checkpointLock) {
                LOG.debug("Output async stream element collection result.");

                try {
                    Collection<OUT> resultCollection = streamRecordResult.get();

                    if (resultCollection != null) {
                        for (OUT result : resultCollection) {
                            timestampedCollector.collect(result);
                        }
                    }
                } catch (Exception e) {
                    operatorActions.failOperator(
                        new Exception("An async function call terminated with an exception. " +
                            "Failing the AsyncWaitOperator.", e));
                }

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        }
    }

    public void stop() {
        running = false;
    }
}
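  • Emitter implements Runnable. Its job is to take completed elements from the StreamElementQueue and emit them: records through a TimestampedCollector, watermarks directly through output.emitWatermark.
  • run loops while running is true: it blocks in streamElementQueue.peekBlockingly() until an AsyncResult is available and then passes it to output.
  • output branches on whether asyncResult is a watermark. For a record it emits every element of the result collection with timestampedCollector.collect and, if retrieving the result throws, reports the failure through operatorActions.failOperator. In both branches it then calls streamElementQueue.poll() to remove the peeked head element and checkpointLock.notifyAll() to wake the task thread, all while holding checkpointLock.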
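
The hand-off between the task thread (addAsyncBufferEntry) and the emitter thread can be sketched with a single monitor. This is a simplified model under stated assumptions, not the Flink implementation: EmitterSketch and run are hypothetical names, every element counts as already completed, and one lock object stands in for both checkpointLock and the queue's internal ReentrantLock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the producer/emitter hand-off around the checkpoint lock. */
final class EmitterSketch {

    /** Pushes the numbers 0..elements-1 through a queue of the given capacity. */
    static List<Integer> run(int capacity, int elements) {
        final Object checkpointLock = new Object();
        final ArrayDeque<Integer> queue = new ArrayDeque<>();
        final List<Integer> emitted = new ArrayList<>();

        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < elements; i++) {
                    synchronized (checkpointLock) {
                        while (queue.isEmpty()) {
                            checkpointLock.wait();   // nothing to emit yet
                        }
                        emitted.add(queue.peek());   // emit the head element first
                        queue.poll();                // then remove it from the buffer
                        checkpointLock.notifyAll();  // tell the producer there is space again
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        try {
            synchronized (checkpointLock) {
                for (int i = 0; i < elements; i++) {
                    // Counterpart of addAsyncBufferEntry: wait while the queue is full.
                    while (queue.size() >= capacity) {
                        checkpointLock.wait();
                    }
                    queue.add(i);
                    checkpointLock.notifyAll();      // wake the emitter if it waits for input
                }
            }
            emitter.join();                          // joined outside the lock to avoid a deadlock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5));
    }
}
```

The element stays in the queue until it has been emitted, which mirrors the peek-then-poll order in Emitter.output: a snapshot taken under the lock sees either the un-emitted element in the queue or the emitted element gone, never neither.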

StreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

@Internal
public interface StreamElementQueue {

    <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    AsyncResult peekBlockingly() throws InterruptedException;

    AsyncResult poll() throws InterruptedException;

    Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

    boolean isEmpty();

    int size();
}
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator. It declares put, tryPut, peekBlockingly, poll, values, isEmpty and size. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its elements are of type StreamElementQueueEntry.

UnorderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java

@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onComplete callbacks. */
    private final Executor executor;

    /** OperatorActions to signal the owning operator a failure. */
    private final OperatorActions operatorActions;

    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

    /** Queue of completed stream element queue entries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set<StreamElementQueueEntry<?>> firstSet;

    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set<StreamElementQueueEntry<?>> lastSet;

    private volatile int numberEntries;

    /** Locks and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition hasCompletedEntries;

    public UnorderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.uncompletedQueue = new ArrayDeque<>(capacity);
        this.completedQueue = new ArrayDeque<>(capacity);

        this.firstSet = new HashSet<>(capacity);
        this.lastSet = firstSet;

        this.numberEntries = 0;

        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.hasCompletedEntries = lock.newCondition();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (numberEntries >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (numberEntries < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into unordered stream element queue. New filling degree " +
                    "({}/{}).", numberEntries, capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into unordered stream element queue because it " +
                    "was full ({}/{}).", numberEntries, capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            numberEntries--;
            notFull.signalAll();

            LOG.debug("Polled element from unordered stream element queue. New filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];

            array = completedQueue.toArray(array);

            int counter = completedQueue.size();

            for (StreamElementQueueEntry<?> entry: firstSet) {
                array[counter] = entry;
                counter++;
            }

            for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {

                for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
                    array[counter] = streamElementQueueEntry;
                    counter++;
                }
            }

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return numberEntries == 0;
    }

    @Override
    public int size() {
        return numberEntries;
    }

    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (firstSet.remove(streamElementQueueEntry)) {
                completedQueue.offer(streamElementQueueEntry);

                while (firstSet.isEmpty() && firstSet != lastSet) {
                    firstSet = uncompletedQueue.poll();

                    Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

                    while (it.hasNext()) {
                        StreamElementQueueEntry<?> bufferEntry = it.next();

                        if (bufferEntry.isDone()) {
                            completedQueue.offer(bufferEntry);
                            it.remove();
                        }
                    }
                }

                LOG.debug("Signal unordered stream element queue has completed entries.");
                hasCompletedEntries.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        if (streamElementQueueEntry.isWatermark()) {
            lastSet = new HashSet<>(capacity);

            if (firstSet.isEmpty()) {
                firstSet.add(streamElementQueueEntry);
            } else {
                Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                watermarkSet.add(streamElementQueueEntry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
        } else {
            lastSet.add(streamElementQueueEntry);
        }

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // The accept executor thread got interrupted. This is probably cause by
                    // the shutdown of the executor.
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);

        numberEntries++;
    }
}
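  • UnorderedStreamElementQueue implements StreamElementQueue and emits results in completion order rather than input order, with the restriction that results never overtake a watermark. Internally it uses two ArrayDeques: uncompletedQueue, which holds sets of uncompleted entries segmented by watermarks, and completedQueue.
  • peekBlockingly waits on hasCompletedEntries.await() while completedQueue is empty and then returns completedQueue.peek(). put and tryPut both call addEntry, which adds a record to lastSet (a watermark starts a new lastSet that is appended to uncompletedQueue) and registers onCompleteHandler as the entry's onComplete callback.
  • onCompleteHandler removes a completed entry from firstSet and adds it to completedQueue. Once firstSet is empty it advances to the next set polled from uncompletedQueue and moves every entry in it that is already done to completedQueue as well.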
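
The watermark segmentation is easier to follow in a stripped-down, single-threaded model. The sketch below is an illustration only: UnorderedSketch, addRecord, addWatermark, complete and demo are hypothetical names, entries are plain strings, and locks, capacity and the executor are left out. The firstSet, lastSet, uncompletedQueue and completedQueue fields follow the logic of addEntry and onCompleteHandler shown above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical single-threaded model of the watermark-segmented unordered queue. */
final class UnorderedSketch {
    private final ArrayDeque<Set<String>> uncompletedQueue = new ArrayDeque<>();
    private final ArrayDeque<String> completedQueue = new ArrayDeque<>();
    private final Set<String> done = new HashSet<>();   // stands in for entry.isDone()
    private Set<String> firstSet = new HashSet<>();
    private Set<String> lastSet = firstSet;

    /** Records always join the youngest segment. */
    void addRecord(String id) {
        lastSet.add(id);
    }

    /** A watermark closes the current segment and opens a new one behind itself. */
    void addWatermark(String id) {
        lastSet = new HashSet<>();
        if (firstSet.isEmpty()) {
            firstSet.add(id);
        } else {
            Set<String> watermarkSet = new HashSet<>();
            watermarkSet.add(id);
            uncompletedQueue.offer(watermarkSet);
        }
        uncompletedQueue.offer(lastSet);
        complete(id); // a watermark entry is complete as soon as it is created
    }

    /** Counterpart of onCompleteHandler. */
    void complete(String id) {
        done.add(id);
        if (firstSet.remove(id)) {
            completedQueue.offer(id);
            // Advance through segments until one still holds an unfinished entry.
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<String> it = firstSet.iterator();
                while (it.hasNext()) {
                    String entry = it.next();
                    if (done.contains(entry)) {
                        completedQueue.offer(entry);
                        it.remove();
                    }
                }
            }
        }
    }

    /** Records a, b, then watermark W, then record c; completion order is c, b, a. */
    static List<String> demo() {
        UnorderedSketch queue = new UnorderedSketch();
        queue.addRecord("a");
        queue.addRecord("b");
        queue.addWatermark("W");
        queue.addRecord("c");
        queue.complete("c"); // behind the watermark: must wait
        queue.complete("b"); // in the first segment: emitted right away
        queue.complete("a"); // empties the first segment, releasing W and then c
        return new ArrayList<>(queue.completedQueue);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [b, a, W, c]
    }
}
```

In the demo, c finishes first but is held back until every record ahead of the watermark has been emitted, while b overtakes a because both sit in the same segment.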

OrderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java

@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onCompletion callback. */
    private final Executor executor;

    /** Operator actions to signal a failure to the operator. */
    private final OperatorActions operatorActions;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    public OrderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.lock = new ReentrantLock(false);
        this.headIsCompleted = lock.newCondition();
        this.notFull = lock.newCondition();

        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
                "({}/{}).", queue.size(), capacity);

            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            notFull.signalAll();

            LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
                "({}/{}).", queue.size() - 1, capacity);

            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];

            array = queue.toArray(array);

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into ordered stream element queue. New filling degree " +
                    "({}/{}).", queue.size(), capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it " +
                    "was full ({}/{}).", queue.size(), capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        queue.addLast(streamElementQueueEntry);

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // we got interrupted. This indicates a shutdown of the executor
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);
    }

    private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                LOG.debug("Signal ordered stream element queue has completed head element.");
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
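  • OrderedStreamElementQueue implements StreamElementQueue and emits results in input order. Internally it keeps a single ArrayDeque named queue.
  • peekBlockingly waits on headIsCompleted.await() while the queue is empty or its head entry is not yet done, and then returns queue.peek(). put and tryPut both call addEntry, which executes queue.addLast(streamElementQueueEntry) and registers onCompleteHandler as the entry's onComplete callback.
  • onCompleteHandler runs whenever any entry completes; it checks whether the head of the queue is done and, if so, calls headIsCompleted.signalAll().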
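
The head-of-line behaviour of the ordered queue can be shown without any locking. A minimal sketch, assuming entries are plain CompletableFuture objects; OrderedSketch, put, drain and demo are hypothetical names, and the non-blocking drain replaces the blocking peekBlockingly/poll pair.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical single-threaded model of ordered, head-of-line emission. */
final class OrderedSketch {
    private final ArrayDeque<CompletableFuture<String>> queue = new ArrayDeque<>();

    /** Appends a new in-flight entry at the tail and returns it for later completion. */
    CompletableFuture<String> put() {
        CompletableFuture<String> entry = new CompletableFuture<>();
        queue.addLast(entry);
        return entry;
    }

    /** Emits completed entries from the head and stops at the first unfinished one. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().isDone()) {
            out.add(queue.poll().join());
        }
        return out;
    }

    /** Three entries a, b, c that complete in the order c, b, a. */
    static List<List<String>> demo() {
        OrderedSketch queue = new OrderedSketch();
        CompletableFuture<String> a = queue.put();
        CompletableFuture<String> b = queue.put();
        CompletableFuture<String> c = queue.put();

        c.complete("c");
        b.complete("b");
        List<String> whileHeadPending = queue.drain(); // head a is not done: nothing emitted

        a.complete("a");
        List<String> afterHeadDone = queue.drain();    // head done: a, b and c leave in input order

        return Arrays.asList(whileHeadPending, afterHeadDone);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[], [a, b, c]]
    }
}
```

One slow request at the head therefore delays every result behind it, which is the latency cost of ORDERED mode compared with UNORDERED.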

AsyncResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

@Internal
public interface AsyncResult {

    boolean isWatermark();

    boolean isResultCollection();

    AsyncWatermarkResult asWatermark();

    <T> AsyncCollectionResult<T> asResultCollection();
}
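  • The AsyncResult interface declares the methods that an asynchronously completed StreamElementQueue element must provide. Such a result is either a watermark or an actual result collection.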

StreamElementQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

    private final StreamElement streamElement;

    public StreamElementQueueEntry(StreamElement streamElement) {
        this.streamElement = Preconditions.checkNotNull(streamElement);
    }

    public StreamElement getStreamElement() {
        return streamElement;
    }

    public boolean isDone() {
        return getFuture().isDone();
    }

    public void onComplete(
            final Consumer<StreamElementQueueEntry<T>> completeFunction,
            Executor executor) {
        final StreamElementQueueEntry<T> thisReference = this;

        getFuture().whenCompleteAsync(
            // call the complete function for normal completion as well as exceptional completion
            // see FLINK-6435
            (value, throwable) -> completeFunction.accept(thisReference),
            executor);
    }

    protected abstract CompletableFuture<T> getFuture();

    @Override
    public final boolean isWatermark() {
        return AsyncWatermarkResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final boolean isResultCollection() {
        return AsyncCollectionResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final AsyncWatermarkResult asWatermark() {
        return (AsyncWatermarkResult) this;
    }

    @Override
    public final <T> AsyncCollectionResult<T> asResultCollection() {
        return (AsyncCollectionResult<T>) this;
    }
}
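  • StreamElementQueueEntry implements AsyncResult. Its onComplete method registers a callback that runs when the entry completes, normally or exceptionally, and it declares the abstract method getFuture for subclasses to implement. It has two subclasses: WatermarkQueueEntry and StreamRecordQueueEntry.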
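
The onComplete contract rests on CompletableFuture.whenCompleteAsync, which invokes its callback for normal completion, exceptional completion, and futures that were already complete when the callback was registered (the watermark case). The following is a small JDK-only sketch; OnCompleteSketch and countCallbacks are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demonstration of when a whenCompleteAsync callback fires. */
final class OnCompleteSketch {

    /** Registers one callback on each of three futures and returns how many ran. */
    static int countCallbacks() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            CompletableFuture<String> record = new CompletableFuture<>();
            CompletableFuture<String> failedRecord = new CompletableFuture<>();
            // Like WatermarkQueueEntry: complete before any callback is registered.
            CompletableFuture<String> watermark = CompletableFuture.completedFuture("watermark");

            CompletableFuture<String> c1 =
                record.whenCompleteAsync((value, error) -> calls.incrementAndGet(), executor);
            CompletableFuture<String> c2 =
                failedRecord.whenCompleteAsync((value, error) -> calls.incrementAndGet(), executor);
            CompletableFuture<String> c3 =
                watermark.whenCompleteAsync((value, error) -> calls.incrementAndGet(), executor);

            record.complete("result");
            failedRecord.completeExceptionally(new RuntimeException("lookup failed"));

            // Wait until all three callbacks ran; c2 carries the failure, so swallow it here.
            CompletableFuture.allOf(c1, c2.exceptionally(error -> null), c3).join();
            return calls.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countCallbacks()); // prints 3
    }
}
```

Running the callback on failure as well matters for the queues: a failed entry must still be moved to the completed side so that the emitter reaches it and can fail the operator.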

WatermarkQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java

@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {

    private final CompletableFuture<Watermark> future;

    public WatermarkQueueEntry(Watermark watermark) {
        super(watermark);

        this.future = CompletableFuture.completedFuture(watermark);
    }

    @Override
    public Watermark getWatermark() {
        return (Watermark) getStreamElement();
    }

    @Override
    protected CompletableFuture<Watermark> getFuture() {
        return future;
    }
}
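  • WatermarkQueueEntry extends StreamElementQueueEntry with Watermark as its element type and implements the AsyncWatermarkResult interface. Its future is created with CompletableFuture.completedFuture, so a watermark entry is done from the moment it is constructed.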

StreamRecordQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java

@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
        implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Timestamp information. */
    private final boolean hasTimestamp;
    private final long timestamp;

    /** Future containing the collection result. */
    private final CompletableFuture<Collection<OUT>> resultFuture;

    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);

        hasTimestamp = streamRecord.hasTimestamp();
        timestamp = streamRecord.getTimestamp();

        resultFuture = new CompletableFuture<>();
    }

    @Override
    public boolean hasTimestamp() {
        return hasTimestamp;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public Collection<OUT> get() throws Exception {
        return resultFuture.get();
    }

    @Override
    protected CompletableFuture<Collection<OUT>> getFuture() {
        return resultFuture;
    }

    @Override
    public void complete(Collection<OUT> result) {
        resultFuture.complete(result);
    }

    @Override
    public void completeExceptionally(Throwable error) {
        resultFuture.completeExceptionally(error);
    }
}
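  • StreamRecordQueueEntry extends StreamElementQueueEntry and implements both AsyncCollectionResult and ResultFuture. It is therefore the object handed to userFunction.asyncInvoke as the ResultFuture and, at the same time, the queue entry the Emitter later reads the result from.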

Summary

  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, initializeState, snapshotState, close and dispose methods. It implements processElement and processWatermark from the OneInputStreamOperator interface and failOperator from OperatorActions. Its open method creates an Emitter and starts it on the AsyncIO-Emitter-Thread.
  • Emitter implements Runnable. It takes completed elements from the StreamElementQueue and emits them, records through a TimestampedCollector. Its run method loops on streamElementQueue.peekBlockingly(), blocking until an AsyncResult is available, and then passes the result to output.
  • The StreamElementQueue interface defines the blocking stream element queue used by AsyncWaitOperator. It declares put, tryPut, peekBlockingly, poll, values, isEmpty and size, and has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue. Its elements are of type StreamElementQueueEntry, which implements AsyncResult, provides onComplete for callbacks on completion, and declares the abstract method getFuture for its two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry.

Reposted from: http://uldhx.baihongyu.com/
