本文承接文档提交之flush(三),继续依次介绍每一个流程点。
先给出文档提交之flush的整体流程图:
图1:
图2是文档提交之flush中的执行DWPT的doFlush()
的流程图,在前面的文章中,我们介绍到了此流程图中将DWPT中收集的索引信息生成一个段newSegment
的流程点,在本篇文章中会将执行DWPT的doFlush()
中剩余的流程点介绍完毕:
图2:
在文档提交之flush(二)中,我们简单的介绍了FlushTicket类,类中包含的主要变量如下:
static final class FlushTicket {
private final FrozenBufferedUpdates frozenUpdates;
private FlushedSegment segment;
... ....
}
frozenUpdates是一个包含删除信息且作用于其他段中的文档的全局FrozenBufferedUpdate对象(见文档提交之flush(二)),而segment则是图2中的流程点将DWPT中收集的索引信息生成一个段newSegment
执行结束后生成的FlushedSegment对象,它至少包含了一个DWPT处理的文档对应的索引信息(SegmentCommitInfo)、段中被删除的文档信息(FixedBitSet对象)、未处理的删除信息FrozenBufferedUpdates(见文档提交之flush(三))、Sorter.DocMap对象,以上内容在文档提交之flush(三)的文章中已介绍。
为什么FlushTicket中生成FrozenBufferedUpdates跟FlushedSegment是两个有先后关系的流程
如何处理DWPT未能正确的生成一个FlushedSegment对象的情况:
将DWPT中收集的索引信息生成一个段newSegment
流程点,那么需要删除该DWPT中的对应的所有索引数据(如果已经生成的索引文件的话)该队列用来存放FlushTicket对象,每一个DWPT执行doFlush后,都会生成一个FlushTicket对象,并同步的添加到Queue<FlushTicket> queue中。
eventQueue队列用来存放事件(Event),Event类是一个添加了@FunctionalInterface注解的类,每一个Event对象用来描述一个函数调用,通过函数调用实现一个事件的执行。
在多线程下,每个线程同步的从eventQueue队列中取出一个事件,即执行该事件对应的函数调用。
在eventQueue队列中,事件间(Between Event)执行结束的先后顺序是无法保证的,不过可以根据事件内(Inner Event)的同步机制实现某些事件间的同步。
什么时候会添加事件到eventQueue:
为什么要将事件添加到eventQueue中处理:
删除非复合索引文件
跟从Queue<FlushTicket> queue中取出每一个FlushTicket去执行某个操作
就属于可以并行的两个事件(其实两个方法中的部分代码块还是需要同步的)什么时候会执行eventQueue中的事件:
发布生成的段的过程描述的是依次从Queue<FlushTicket> queue中取出FlushTicket,将其包含全局删除信息的FrozenBufferedUpdates对象作用到当前索引目录中已有的段的过程,同时还是对FlushedSegment对象进行最终处理的过程,比如找出未处理的删除信息(在文档提交之flush(三)中我们只找出了部分删除的文档)等一些操作,这里先简单的提一下,因为发布生成的段的逻辑篇幅较长,会在下一篇的文章中展开介绍。
发布生成的段
还可以划分成两种类型,即强制发布生成的段
和尝试发布生成的段
,图1跟图2中均有该流程点:
发布生成的段
的逻辑,如果线程调用的是尝试发布生成的段
,那么当发现有其他线程正在执行发布生成的段
的操作,当前线程就不等待,继续执行后面的流程,否则等待其他线程执行结束,即等待Queue<FlushTicket> queue中的所有FlushTicket都被处理结束源码中是如何实现的:
尝试发布生成的段
跟强制发布生成的段
为什么要划分强制发布生成的段
和尝试发布生成的段
:
继续介绍执行DWPT的doFlush()中的剩余流程点。
图3:
该流程即将删除非复合索引文件的操作作为一个事件添到eventQueue中,如果此时有其他线程正在处理eventQueue中的事件,那么删除非复合索引文件的操作可能会被马上执行。
图4:
发生堆积情况即flush(主动或者自动flush)的速度慢于添加/更新文档的操作,判断是否堆积的条件如下:
xxxxxxxxxx
ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()
其中ticketQueue.getTicketCount( )描述的是Queue<FlushTicket> queue中FlushTicket的个数,该个数即当前等待flush为一个段的段的个数(可能包含出错的FlushTicket,见下文介绍),而perThreadPool.getActiveThreadStateCount( )描述的是线程池DWPTP中ThreadState的个数(见文档的增删改(中))
为什么通过上面的方式能判断是否发生堆积:
自动flush
的流程点,开始生成一个段,最后释放DWPT对象,而ThreadState对象则是回到DWPTP中,故在单线程下,DWPT的个数总是小于等于DWPTP中ThreadState的个数。在多线程下,ThreadState对象回到DWPTP之后,又有新的线程执行添加/更新的操作,那么ThreadState会再次持有新的DWPT对象去执行任务,如果再次出发自动flush,当flush的速度(即DWPT生成一个段)较慢时,就会满足ticketQueue.getTicketCount() >= perThreadPool.getActiveThreadStateCount()的条件,即发生了堆积DWPT满足自动flush后进入生成一个段
与ThreadState回到DWPTP
体现了flush跟添加/更新 文件是并行操作,另外主动flush的情况也是一样的,见文档提交之flush(一)上文中为什么说Queue<FlushTicket> queue中可能包含出错的FlushTicket:
强制发布生成的段
的事件到eventQueue中发生堆积后,通过该流程点使得所有执行flush的线程必须等待Queue<FlushTicket> queue中所有的FlushTicket处理结束后才能去执行新的添加/更新文档的任务来处理堆积问题。
图5:
到此流程点,我们需要更新几个全局的信息,以下的内容在前面的文章中已经介绍,不详细展开:
图6:
尝试发布生成的段
的概念在上文关于发布(publish)生成的段
的内容中已作介绍,不赘述,其详细的发布过程在下一篇文章会展开。
尝试发布生成的段
图7:
当前内存中的删除信息如果超过阈值的一半,那么需要处理删除信息,阈值即通过IndexWriterConfig setRAMBufferSizeMB设置允许缓存在内存的索引量(包括删除信息)的最大值,当超过该阈值,会触发自动flush(见文档提交之flush(一))
为什么内存中的删除信息如果超过阈值的一半,需要处理删除信息:
xxxxxxxxxx
activeBytes + deleteBytesUsed >= ramBufferSizeMB
如何处理删除信息:
添加处理删除信息事件到eventQueue
即可,处理删除信息事件实际是将强制发布生成的段
作为一个事件添加到eventQueue中。 为什么通过强制发布生成的段
能用来处理删除信息
至此我们介绍完了图1中逻辑相对最复杂的执行DWPT的doFlush()
的流程。
点击下载附件