专6-4-基于kafka-spark streaming的数据处理系统及测试-甄丽霞_第1页
专6-4-基于kafka-spark streaming的数据处理系统及测试-甄丽霞_第2页
专6-4-基于kafka-spark streaming的数据处理系统及测试-甄丽霞_第3页
专6-4-基于kafka-spark streaming的数据处理系统及测试-甄丽霞_第4页
专6-4-基于kafka-spark streaming的数据处理系统及测试-甄丽霞_第5页
已阅读5页,还剩35页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

parkStreaming数据处理系统及测试待测系统涉及到的相关技术知234AnswerAnswerColume非频繁更新项频繁更新项5fastfastfastpartition01fastpartition02fastpartition01fastpartition02fastpartition03freqpartition01freqpartition02freqpartition03fastpartition05fastpartition08freqpartition05freqpartition06freqpartition07nfast-fast-partition09fastpartition10freqpartition08freqpartition10n6jsonFasthbaseFastjsonFasthbaseFast…map1map2map3kkkk7SSMSMS8zhihu-spooldir-kafka-agent.sources=zhihu-spooldir-source-freqzhihu-spooldir-source-freq3zhihu-spooldir-source-fastzhihu-spooldir-source-instantzhihu-spooldir-kafka-agent.channels=zhihu-kafka-channel-freqzhihu-kafka-channel-freq3zhihu-kafka-channel-fastzhihu-kafka-channel-instant1定义zhihu-freq、zhihu-freq3、zhihu-fast、zhihu-instant各组件对应的的配置项#zhihu-freqzhihu-spooldir-kafka-agent.sources.zhihu-spooldir-source-freq.inputCharset=UTF-8//注:编码方式,默认是"UTF-8"zhihu-spooldir-kafka-agent.sources.zhihu-spooldir-source-freq.decodeErrorPolicy=IGNORE//注:传输的过程中有不可解码的流出现会导致flume停止服务,加上这个配置之后增加flume鲁棒性zhihu-spooldir-kafka-agent.sources.zhihu-spooldir-source-freq.customSourceCounterType=TimedSourceCerceptors=timestamp-interceptorstatic-interceptorcirclenumber-interceptordocid-interceptor//注:在eventsheader中加入key和value的拦截器,包括时间戳、环号、erceptors.timestamp-interceptor.type=timestamp注:在header中加入时间戳erceptors.docid-interceptor.type=erceptor.zhihu.ZhihuDocIdInterceptor$Builder//注:调用docid算法,在header中加docidzhihu-spooldir-kafka-agent.sources.zhihu-spooldir-source-freq.channels=zhihu-kafka-channel-freq//注:确认本source对应的channel组件名称zhihu-spooldir-kafka-agent.channels.zhihu-kafka-channel-freq.type=org.apache.flume.channel.kafka.KafkaChannel//注:定义channel的类型为Kafkachannel。zhihu-spooldir-kafka-agent.channels.zhihu-kafka-channel-freq.brokerList=10.1xx.1xx.29:xxxx//注:设定链接的kafka的ip和端口zhihu-spooldir-kafka-agent.channels.zhihu-kafka-channel-freq.topic=zhihu-freq//注:设定本source对应的kafka的topic类型是zhihu-freqzhihu-spooldir-kafka-agent.channels.zhihu-kafka-channel-freq.zookeeperConnect=10.1xx.1xx.29:xxxx/kafka//注:设定需要注册和分配资源的zookeeper的ip和端口9zhihu-kafka-hdfszhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.hdfs.rollInterval=120//注:hdfssink间隔多长将临时文件滚动成最终目标文件,单位:秒;zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.hdfs.rollSize=0//注:当临时文件达到该大小(单位:bytes)时,滚动成目标文件;如果设置成0,则表示不根据临时文件大小来滚动文件;zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.hdfs.callTimeout=120000//注:执行HDFS操作的超时时间(单位:毫秒);zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.hdfs.batchSize=10000//注:每个批次刷新到HDFS上的events数量zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.channel=zhihu-kafka-channel-freq//注:该sink对应的channel的名称-hdfs-sink-instantzhihu-hdfs-sink-blacklistChannels.brokerList=ip:portelfreq定义channel、sink组件,每个channel都对应一个sink配置:zhihu-kafka-hdfs-agent.channels=zhihu-kafka-channel-freq-localqueryzhihu-kafka-channel-freqzhihu-kafka-channel-freq3zhihu-kafka-channel-fastzhihu-kafka-channel-instantzhihu-kafka-channel-blacklistzhihu-kafka-hdfs-agent.channels.zhihu-kafka-channel-freq.topic=zhihu-freq//注:kafka里对应的topic类型zhihu-kafka-hdfs-agent.channels.zhihu-kafka-channel-freq.zookeeperConnect=10.1xx.1xx.29:xxxx/kafka//注:zookeeper的端口zhihu-kafka-hdfs-agent.channels.zhihu-kafka-channel-freq.groupId=zhihu-freq//注:consumer需要设置其所归属的groupidzhihu-kafka-hdfs-agent.channels.zhihu-kafka-channel-freq.kafka.fetch.message.max.bytes=2000000000//注:表示消息的最大大小,单位是字节zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.type=hdfs//注:sink组件取出channel队列中的数据,存入相应类型的存储文件系统。这里定义的是存储系统的类型zhihu-kafka-hdfs-agent.sinks.zhihu-hdfs-sink-freq.hdfs.path=hdfs://sss/xxx/xxx/data/zhihu-test/%{pushtype}/sjs_100_29/%Y%m/%Y%m%d//注:写入hdfs的路径,包含文件系统标识“sss/xxx/xxx/data/”是hdfs的系统路径;“zhihu-test”是为本次测试创建的hdfs的数据存储目录;“%{pushtype}”是在spooldir-kafka的配置文件(erceptors.static-interceptor.value)中设置的pushtype的值;“sjs_100_29”是本测试机的标识;“%Y%m/%Y%m%d”年月的目录地址;//注:存储到hdfs里的文件名称:/sss/xxx/xxx/data/zhihu-test/freq/sjs_100_2/i26s0.1t/0p16e0=12/dfrefqs-2016012423-7.1453808288400.lzo(T3)(T3)(Producer)(push)(push)(push)(T3)(T3)(Producer)(push)(push)(push)(Broker)Real-time(T2)Hadoop(T1)Other(T1)ata(T3)(pull)(pull)(pull)(pull)(push)((8 (注:即是哪个原始文件的数据)、circlenumber(注:根据数据类型,伪造各类型数据的url,计算环数,数据总共分8环)、pushtype(注:该数据类型是freqinstantfast写入kafkaT1-P1-R0-LT1-P2-R0T1-P1-R2T1-P2-R1-LT2-PT1-P1-R0-LT1-P2-R0T1-P1-R2T1-P2-R1-LT2-P1-R2T2-P2-R1T1-P1-R1T1-P2-R2T2-P1-R1-LT2-P2-R0-LT2-P1-R1T2-P1-R0-LT2-P2-R2T1-P3-RT1-P3-R0-L(new)加载PA解析的jar包;加载配置文件确认下游模块;加载配置文件确认topic类型;instantSparkstreaming加载PA解析的jar包;加载配置文件确认下游模块;加载配置文件确认topic类型;instantSparkstreamingfreq加载配置文件确认topic类型;加载hadoop数据;Sparkstreaming根根据kafka的key,获取对应的hbase数据;解析数据拼xpage,发送给index模块;将instant数据写入hbase;写入写入habasememory处理的进程数memory处理的进程数d1--masteryarn-client\//注:以client方式连接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式1--driver-memory1G\//注:drivermemory并不是master分配了多少内存,而是管理多少内存。换言之就是为当前应用分配了多少内存运行的集群队列1--num-executors5\注:在yarn集群上启动5个进程进行数据处理,其中一个进程读取数据,剩余进程进行数据处理一定程度就会爆栈。一般先设置DM(drivermemory),随后根据集群情况、任务大小等实际情况来设置EM(executor一定程度就会爆栈。一般先设置DM(drivermemory),随后根据集群情况、任务大小等实际情况来设置EM(executorinstant序列化之后发送给剩余的进程进行数据处理,但flume对于序列化的操even以通过yarn执行的时候涉及到数据分发到多个进程了,此时flumeeven函数进行修改;但本机运行的时候不涉及数据分发所以能确性测试:instantPAxpagexslt解析正确性;不同业务流程涉及不fre

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论