版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
例如,系统中的订单表,常常使用用户ID的Hash值来实现分表分库,这样是为了减例如,当我们需要查询出过滤条件下的所有订单,并按照订单的某个条件进行排序,单个数据源查询出来的数据是可以按照某个条件进行排序的,但多个数据源查询出来已经排序好的数据,并不代表合并后是正确的排序,所以我们需要在应用层对合并数据集合重新进行排序。在Java8之前,我们通常是通过for循环或者Itor迭代来重新排序合并数据,又或者通过重新定义Collections.sorts的Comparator方法来实现,这两种方式对于大数据量系Java8中添加了一个新的接口类Stream,他和我们之前接触的字节流概念不太一样,Java8集合中的Stream相当于高级版的I tor,他可以通过Lambda表达式对集合进行各种非常便利、高效的聚合操作(AggregateOperation),或者大批量数据操作(BulkDataOperation)。Stream的聚合操作与数据库SQL的聚合操作sorted、filter、map等类似。我们在应用层就可以高效地实现类似数据库SQL的聚合操作了,而在数据操作方面,Stream不仅可接下来我们就用一个简单的例子来体验下Stream的简洁与强大这个Demo的需求是过滤分组一所中学里身高在160cm以上的男女同学,我们先用传统代Map<String,List<Student>>stuMap=newHashMap<String,for(Studentstu:studentsList)if(stu.getHeight()>160){//如果身高大于if ())==null){//该还没分List<Student>list=newArrayList<Student>();//新建该学生的list.add(stu);//将学789
stuMap.put(stu.get(),list);//将列表放到map}else{//该分类已存stuMap.get(stu.get()).add(stu);//该分类已存在,则直接放进去即}我们再使用Java8中的StreamAPI进行实现串行实代1Map<String,List<Student>>stuMap=stuList.stream().filter((Students)->并行实代1Map<String,List<Student>>stuMap=stuList.parallelStream().filter((Students)->通过上面两个简单的例子,我们可以发现,Stream合Lambda达式实现遍历筛选功上面我们初步了解了Java8的StreamAPI,那Stream如何做到优化迭代的呢?并行又是如何实现的?下面我们就透过Stream源码剖析Stream的实现原理。Stream操作在了解Stream实现原理之前,我们先来了解下Stream操作分类,因为他的操作分类将Stream中的操作分为两大类:中间操作(Intermediateoperations)和终结操(Terminaloperations)。中间操作只对操作进行了记录,即只会返回一个流,不会进行 不受之前元素的影响,后者是指该操作只有拿到所有元后才能继续下去。终结操作又可以分为短路(Shor-circiing)与非短路(Ushor-irciing)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。操作分类详情如下图所示:理管道(Pipeline),实现了Stream的高效。Stream源码在了解Stream如何工作之前,我们先来了解下Stream包是由哪些主要结构类组合的,各个类的职责是什么。参照下BaseStream和Stream为最顶端的接口类。BaseStream主要定义了流的基本接口方法, tor、isParallel等;Stream则定义了一些流的常用操作方法,例如,map、filter等。 是一个结构类,他通过定义内部类组装了流。他定义了 essOp、StatefulOp三个内部类,实现了BaseStream与Stream的接口方Sink接口是定义每个Stream操作之间关系的协议,他包含begin()、end()、 t()四个方法。ReferencePipeline最终会将整个Stream流操作组装成一个调用链,而这条调用链上的各个Stream操作的上下关系就是通过SinkStream操作我们知道,一个Stream的各个操作是由处理管道组装,并统一完成数据处理的。在中每次的中断操作会以使用阶段(Stage)命名管道结构通常是由ReferencePipeline类实现的,前面讲解Stream包结构时,我提 essOp、StatefulOp三种内部类Head类主要用来定义数据源操作,在我们初次调用names.stream()方法时,会初次加Head对象,此时为加载数据源操作;接着加载的是中间操作,分别为无状态中间操 essOp象和有状态StatefulOp象,此时的Stage没有执行,而是Pipeline生成了一个中间操作Stage链表;当我们调用终结操作时,会生成一个最终的Stage,通过这个Stage触发之前的中间操作,从最后一个Stage开始,递归产生一个Sink链。如下图所示:下面我们再通过一个例子来感受下Stream操作分类是如何实现高效迭代大数据集合代1List<String>names=Arrays.asList("张三","李四","王老五","李三","刘老四2StringmaxLenStartWithZ=.filter(name->name.startsWith("张这个例子的需求是查找出一个长度最长,并且以张为姓氏的名字。从代码角度来看,你可能会认为是这样的操作流程:首先遍历一次集合,得到以“张”开头的所有名字;然后遍历一次filer得到的集合,将名字转换成数字长度;最后再从长度集合中找到最长的那个名字并且返回。首先因为namesArrayList合names.stream法将会调用集合类基础接口Collection的Stream方法:代defaultStream<E>stream()returnStreamSupport.stream(splitor(), 然后,Stream方法就会调用StreamSupport类的Stream方法,方法中初始化了一ReferencePipelineHead部类对象代publicstatic<T>Stream<T> tor<T> tor,booleanparallel)returnnewReferencePipeline.Head<>(spli 再调用filter和map方法,这两个方法都是无状态的中间操作,所以执行filter和操作时,并没有进行任何的操作,而是分别创建了一个Stage来标识用户的每一次操而通常情况下Stream的操作又需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。如下图所示,分别是ReferencePipeline的filter方法和map方法:代12publicfinalStream<P_OUT>filter(Predicate<?superP_OUT>predicate)34returnnewStaessOp<P_OUT,P_OUT>(this,5StreamOpFlag.NOT_SIZED)67Sink<P_OUT>opWrapSink(intflags,Sink<P_OUT>sink)8returnnewSink.ChainedReference<P_OUT,P_OUT>(sink)9publicvoidbegin(longsize)} publicvoidaccept(P_OUTu) if
代publicfinal<R>Stream<R>map(Function<?superP_OUT,?extendsR>mapper)returnnewStaessOp<P_OUT,R>(this,StreamOpFlag.NOT_SORTED|Sink<P_OUT>opWrapSink(intflags,Sink<R>sink)returnnewSink.ChainedReference<P_OUT,R>(sink)publicvoidaccept(P_OUTu)}}}new essOp将会调用父 Pipeline的构造函数,这个构造函数将前后Stage系起来,生成一个Stage表代 Pipeline<?,E_IN,?>previousStage,intopFlags)ifthrownewpreviousStage.linkedOrConsumed=previousStage.nextStage=this;//将当前的stage的next指针指向之前的6this.previousStage=previousStage;//赋值当前stage当全局变量this.sourceOrOpFlags=opFlags&binedFlags this.sourceStage=ifsourceStage.sourceAnyStateful=
this.depth=previousStage.depth+因为在创建每一个Stage时,都会包含一个opWrapSink()方法,该方把一个操作的具体实现封装在Sink类中,Sink采用(处理->转发)的模式来叠加操作。当执行max法时,会ReferencePipelinemax法,此时由于max法是终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。代publicfinalOptional<P_OUT>max(Comparator<?superP_OUT>comparator)return 最后,调 Pipeline的wrapSink方法,该方调用opWrapSink生成一Sink表,Sink表中的每一个Sink装了一个操作的具体实现代final<P_IN>Sink<P_IN>wrapSink(Sink<E_OUT>sink)5for( Pipeline Pipeline.this;sink= binedFlags, return(Sink<P_IN>) Sink表生成完成后,Stream始执行,通过splitor代集合,执行Sink链表代final<P_IN>voidcopyInto(Sink<P_IN>wrappedSink, tor<P_IN>splitor)4if(!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) elsecopyIntoWithCancel(wrappedSink,spli Java8中的Spli tor的forEachRemaining会迭代集合,每迭代一次,都会执行一次filter操作,如果filter操作通过,就会触发map操作,然后将结果放入到临时数组object中,再进行下一次的迭代。完成中间操作后,就会触发终结操作max。这就是串行处理方式了,那么Stream的另一种处理数据的方式又是怎么操作的Stream并行Stream理数据的方式有两种,串行处理和并行处理。要实现并行处理,我们只需要在例子的代码中新增一个Parallel()方法,代码如下所示:代1List<String>names=Arrays.asList("","","","","刘老四2StringmaxLenStartWithZ=.filter(name->name.startsWith("张Stream的并行处理在执行终结操作之前,跟串行处理的实现是一样的。而在调用终结方法之后,实现的方式就有点不太一样,会调用TerminalOp的evaluateParallel方法进行行处理代final<R>Revaluate(TerminalOp<E_OUT,R>terminalOp)assertgetOutputShape()==ifthrownewlinkedOrConsumed=6return?terminalOp.evaluateParallel(this,sourceSpli:terminalOp.evaluateSequential(this,sourceSpli 这里的并行处理指的是,Stream结合了ForkJoin框架,对Stream处理进行了分片 tor中的estimateSize方估算出分片的数据量ForkJoin框架和估算算法,在这里我就不具体讲解了,如果感,你可以深入源码分析通过预估的数据量获取最小处理单元的阀值,如果当前分片大小大于最小处理单元的阀值,就继续切分集合。每个分片将会生成一个Sink链表,当所有的分片操作完成后,ForkJoin框架将会合并分片任何结果集。合理使用看到这里,你应该对StreamAPI是如何优化集合遍历有个清晰的认知了。StreamAPI用起来简洁,还能并行处理,那是不是使用StreamAPI,系统性能就更好呢?通过一组测对常规的迭代、Stream串行迭代以及Stream并行迭代进行性能测试对比,迭代循多核CPU服务器配置环境下,对比长度100的int数组的性能多核CPU服务器配置环境下,对比长度1.00E+8的int数组的性能多核CPU服务器配置环境下,对比长度1.00E+8对象数组过滤分组的性能;单核CPU服务器配置环境下,对比长度1.00E+8对象数组过滤分组的性能。 常规的迭代<Stream并行迭代<Stream串行迭代Stream并行迭代<常规的迭代<Stream串行迭代Stream并行迭代<常规的迭代<Stream串行迭代常规的迭代<Stream串行迭代<Stream并行迭代而更好;在单核CPU服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核CPU的情况下,Stream的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核CPU环境下,并且使用Stream的用事实说话,我们看到其实使用Stream必可以使系统性能更佳,还是要结合应用场景进行选择,也就是合理地使用Stream。纵观Stream设计实现,非常值得我们学习。从大的设计方向上来说,Stream从小的分类方向上来说,Stream将遍历元素的操作和对元素的计算分为中间操作和终结操作,而中间操作又根据元间状态有无干扰分为有状态和无状态操作,实现了链结构中的在串行处理操作中,Stream在执行每一步中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来,最终由终结操作触发,生成一个数据处理链表,通过Java8
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 单位人事管理制度展示大合集
- 2024年度出租车公司客运业务入股及品牌拓展合同3篇
- 2024年度公共设施墙体广告租赁合同含后期维护条款3篇
- 2024年度高档公寓物业管理委托合同3篇
- 2024版代付工程款三方资金支付与结算管理合同2篇
- 陕西省蓝田县焦岱中学高中语文 6 鸿门宴教学实录1 新人教版必修1
- 2025版高考语文一轮总复习第1部分现代文阅读Ⅰ信息性文本阅读任务3考点突破第5讲千淘和万漉吹沙始到金-观点探究运用教师用书
- 1感受生活中的法律 第1课时 教学实录-2023-2024学年道德与法治六年级上册统编版
- 上海市金山2024-2025高三生物上学期期末一模质量监控试题
- 北京市海淀区2024-2025学年高一数学上学期期中试题含解析
- 个人养老金制度
- 回族做礼拜的念词集合6篇
- 英语:初升高八种时态复习全解课件
- 粮油厂安全现状评价报告
- 有机肥供货及售后服务方案(投标专用)
- 走近湖湘红色人物知到章节答案智慧树2023年湖南工商大学
- 普通化学习题库
- 穿孔机操作规程
- 危机公关处理技巧
- 10、特种作业人员管理台账
- GB/T 70.1-2008内六角圆柱头螺钉
评论
0/150
提交评论