尚硅谷 Big Data Technology: Flume (by the 尚硅谷 Big Data R&D Department)

Chapter 1  Flume Overview

1.1 Flume Definition
Flume is Cloudera's highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive amounts of log data. Why choose Flume? Its most important job is to read data from a server's local disk in real time and write that data to HDFS. Typical inputs include network port data, log files, and data produced by Python crawlers.

1.2 Flume Basic Architecture
Source: the component responsible for receiving data into the Flume Agent. The Source component can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, taildir, sequence generator, syslog, http, and legacy.
Sink: continuously polls the Channel for events and removes them in batches, writing those batches to a storage or indexing system or sending them to another Flume Agent.
Channel: a buffer that sits between the Source and the Sink, so the Source and the Sink can run at different rates. Flume ships with two Channels: Memory Channel and File Channel. Memory Channel is an in-memory queue, so events are lost if the program exits or the machine goes down; File Channel writes all events to disk, so no data is lost when the program exits or the machine goes down.
Event: the transmission unit, the basic unit of Flume data transfer; data travels from source to destination in the form of Events. An Event consists of a Header and a Body: the Header stores attributes of the event as K-V pairs, and the Body stores the data itself as a byte array.

Chapter 2  Flume Getting Started

2.1 Flume Installation and Deployment
(1) Download apache-flume-1.9.0-bin.tar.gz and upload it to /opt/software on hadoop102.
(2) Extract it to /opt/module:
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
(3) Rename the directory to flume:
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
(4) Delete guava-11.0.2.jar under lib so that Flume is compatible with the Hadoop version used here:
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jar

2.2 Flume Getting-Started Cases

2.2.1 Monitoring Port Data
1) Requirement: use Flume to listen on a local port (44444) and print the received data to the console.
2) Analysis: the netcat tool sends data to port 44444 of this machine; Flume listens on port 44444 through its netcat source and writes what it reads to the console through the logger sink.
3) Implementation steps:
(1) Install the netcat tool:
[atguigu@hadoop102 software]$ sudo yum install -y nc
(2) Check that port 44444 is not occupied:
[atguigu@hadoop102 flume]$ sudo netstat -nlp | grep 44444
(3) Create a job directory under the Flume directory and create the Flume Agent configuration file flume-netcat-logger.conf in it:
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf
Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Notes on the configuration:
a1.sources = r1, a1.sinks = k1, a1.channels = c1: r1, k1, and c1 are the source, sink, and channel of agent a1.
a1.sources.r1.type = netcat: the input source type of a1 is netcat, a port listener.
a1.sources.r1.bind = localhost, a1.sources.r1.port = 44444: the host and port that a1 listens on.
a1.sinks.k1.type = logger: the output destination of a1 is the console (logger).
a1.channels.c1.type = memory: the channel of a1 is an in-memory channel.
a1.channels.c1.capacity = 1000: the channel holds at most 1000 events.
a1.channels.c1.transactionCapacity = 100: the channel moves at most 100 events per transaction.
a1.sources.r1.channels = c1, a1.sinks.k1.channel = c1: bind r1 and k1 to c1.
(4) Start Flume to listen on the port. First form:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Second form:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf/-c: the configuration files live in the conf/ directory.
--name/-n: the name of the agent is a1.
--conf-file/-f: the configuration file for this run is flume-netcat-logger.conf in the job directory.
-Dflume.root.logger=INFO,console: -D changes the flume.root.logger parameter at run time and sets the console log level to INFO; levels include log, info, warn, and error.
(5) Use the netcat tool to send data to port 44444:
[atguigu@hadoop102 ~]$ nc localhost 44444
(6) Watch the data arrive on the Flume console.
Question to think about: if you connect with nc hadoop102 44444 instead, can Flume still receive the data?
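The netcat source is line oriented, so anything that can open a TCP connection can play the role of nc here. The following is a small sketch of my own (not part of the original hands-on steps) that pushes one line to the agent just started; the host and port are the ones configured above:

import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSourceClient {
    public static void main(String[] args) throws Exception {
        // Connect to the netcat source of agent a1 (localhost:44444).
        try (Socket socket = new Socket("localhost", 44444);
             OutputStream out = socket.getOutputStream()) {
            // The netcat source turns each newline-terminated line into one event body.
            out.write("hello flume\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }
}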
2.2.2 Real-Time Reading of a Single Appending File
1) Requirement: monitor the Hive log in real time and upload it to HDFS.
2) Analysis: use an exec source that tails the Hive log, a memory channel, and an HDFS sink; start Hive to generate log entries, then check the data on HDFS.
3) Implementation steps:
(1) To write data to HDFS, Flume depends on the Hadoop jars, so confirm that the Java and Hadoop environment variables are configured, for example:
JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH JAVA_HOME HADOOP_HOME
(2) Create flume-file-hdfs.conf:
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
Note: the file to be read lives on the Linux file system, so the source type is exec, short for execute, meaning that a Linux command is executed to read the file.
Add the following content:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# prefix of the files uploaded to HDFS
a2.sinks.k2.hdfs.filePrefix = logs-
# create a new directory based on time
a2.sinks.k2.hdfs.round = true
# how many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS once
a2.sinks.k2.hdfs.batchSize = 100
# file type, DataStream means plain text
a2.sinks.k2.hdfs.fileType = DataStream
# roll to a new file every 60 seconds
a2.sinks.k2.hdfs.rollInterval = 60
# roll to a new file at roughly 128 MB
a2.sinks.k2.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

Note: for all time-related escape sequences (such as %Y%m%d/%H), a key named "timestamp" must exist in the Event header, unless hdfs.useLocalTimeStamp is set to true, in which case the TimestampInterceptor generates the timestamp automatically.
(3) Start Flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
(4) Start Hadoop and Hive and operate Hive so that it produces log entries:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
(5) Check the files on HDFS.

2.2.3 Real-Time Reading of Multiple New Files in a Directory
1) Requirement: use Flume to monitor all files in a directory and upload them to HDFS.
2) Analysis: use a spooldir source, a memory channel, and an HDFS sink. Steps: create a matching Flume configuration file; execute it and start monitoring; add files to the upload directory; check the data on HDFS; check whether the files in /opt/module/flume/upload have been marked as transferred.
3) Implementation steps:
(1) Create the configuration file flume-dir-hdfs.conf:
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
# the directory to monitor
a3.sources.r3.spoolDir = /opt/module/flume/upload
# suffix appended to files that have been transferred
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
# prefix of the files uploaded to HDFS
a3.sinks.k3.hdfs.filePrefix = upload-
# create a new directory based on time
a3.sinks.k3.hdfs.round = true
# how many time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS once
a3.sinks.k3.hdfs.batchSize = 100
# file type, DataStream means plain text
a3.sinks.k3.hdfs.fileType = DataStream
# roll to a new file every 60 seconds
a3.sinks.k3.hdfs.rollInterval = 60
# roll to a new file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

(2) Start the directory-monitoring agent:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Notes on the Spooling Directory Source: do not create and keep modifying files inside the monitored directory; files that have been uploaded are renamed with the .COMPLETED suffix; the monitored directory is scanned for file changes every 500 milliseconds.
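The ignorePattern above is an ordinary Java regular expression. As a quick sanity check of my own (not part of the original steps), the following sketch tests the pattern against the three file names created in the next step:

import java.util.regex.Pattern;

public class IgnorePatternCheck {
    public static void main(String[] args) {
        // Same expression as a3.sources.r3.ignorePattern above.
        Pattern ignore = Pattern.compile("([^ ]*\\.tmp)");
        // Only the .tmp file should match the ignore pattern.
        for (String name : new String[]{"atguigu.txt", "atguigu.tmp", "atguigu.log"}) {
            System.out.println(name + " matches ignorePattern: " + ignore.matcher(name).matches());
        }
    }
}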
(3) Add files to the upload directory. Create the upload directory:
[atguigu@hadoop102 flume]$ mkdir upload
Create files in it:
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log
(4) Check the data on HDFS and confirm that the transferred files in /opt/module/flume/upload now carry the .COMPLETED suffix.

2.2.4 Real-Time Reading of Multiple Appending Files in a Directory
The Exec source is suitable for monitoring a single file that is appended to in real time, but it cannot resume from where it left off after a restart; the Spooldir source can pick up new files in a directory, but it is not suitable for files that are still being appended to; the Taildir source can monitor multiple files that are appended to in real time and can resume from the recorded position.
1) Requirement: use Flume to monitor the appending files of an entire directory in real time and upload them to HDFS.
2) Analysis: Taildir source, memory channel, HDFS sink. Steps: create a matching Flume configuration file; execute it and start monitoring; append content to the monitored files (for example echo hello >> file1.txt); check the data on HDFS.
3) Implementation steps:
(1) Create the configuration file flume-taildir-hdfs.conf:
[atguigu@hadoop102 job]$ vim flume-taildir-hdfs.conf
Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
# file that records the position read so far in each tailed file
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y%m%d/%H
# prefix of the files uploaded to HDFS
a3.sinks.k3.hdfs.filePrefix = upload-
# create a new directory based on time
a3.sinks.k3.hdfs.round = true
# how many time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS once
a3.sinks.k3.hdfs.batchSize = 100
# file type, DataStream means plain text
a3.sinks.k3.hdfs.fileType = DataStream
# roll to a new file every 60 seconds
a3.sinks.k3.hdfs.rollInterval = 60
# roll to a new file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

(2) Start the monitoring agent:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf
(3) Create the files directory and append content to the monitored files:
[atguigu@hadoop102 flume]$ mkdir files
[atguigu@hadoop102 files]$ echo hello >> file1.txt
[atguigu@hadoop102 files]$ echo atguigu >> file2.txt
(4) Check the data on HDFS.
Taildir notes: the Taildir source maintains a JSON position file (tail_dir.json above) and periodically writes into it the latest position read in each file, which is how it resumes after a restart. On Unix/Linux the area that stores a file's metadata is called the inode; every inode has a number, and the operating system identifies files by inode number rather than by file name.

Chapter 3  Flume Internals

3.1 Flume Transactions
Put transaction flow:
doPut: first write the batch of data into the temporary buffer putList.
doCommit: check whether the channel's in-memory queue has enough room and, if so, merge the putList into it.
doRollback: if the channel's in-memory queue does not have enough room, roll the data back.
The take side is symmetric: doTake moves events into a temporary takeList, doCommit clears the takeList once the data has been written out successfully, and doRollback returns the events to the channel's queue.

3.2 Flume Agent Internal Principles
ChannelSelector: selects which Channel(s) an Event will be sent to. There are two types: ReplicatingChannelSelector (the default) and MultiplexingChannelSelector. The Replicating selector sends every event coming from the source to all channels, while the Multiplexing selector can be configured to decide which channels each event goes to.
SinkProcessor: there are three types, DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor. DefaultSinkProcessor works with a single Sink; LoadBalancingSinkProcessor and FailoverSinkProcessor work with a Sink Group: LoadBalancingSinkProcessor provides load balancing across the sinks in the group, and FailoverSinkProcessor provides failover.

3.3 Flume Topology Structures
1) Simple chaining: several Flume agents are connected one after another, from the first source to the sink of the final agent, which delivers the data to the destination storage system. It is not recommended to chain too many agents: too many not only slow the transfer down, but also take the whole pipeline down when a single node fails.
2) Replicating and multiplexing: Flume supports sending the event flow to one or more destinations. In this mode the same data can be replicated to multiple channels, or different data can be distributed to different channels, and each channel's sink can deliver to a different destination.
3) Load balancing and failover: Flume supports grouping several sinks logically into one sink group; combined with the different SinkProcessors, the sink group provides load balancing or failover.
4) Aggregation: each server runs its own Flume agent to collect local logs, all of these agents send their data to a central Flume agent, and the central agent writes the aggregated data to hdfs, hive, hbase, and so on.
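Before moving on to the enterprise cases, the put/take transaction flow described in 3.1 can be exercised directly through Flume's channel API. The following standalone sketch is my own illustration (not part of the original material); it uses a MemoryChannel configured like the channels in the examples above:

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;

public class ChannelTransactionDemo {
    public static void main(String[] args) {
        // A MemoryChannel configured like c1 in the examples above.
        Channel channel = new MemoryChannel();
        Context ctx = new Context();
        ctx.put("capacity", "1000");
        ctx.put("transactionCapacity", "100");
        Configurables.configure(channel, ctx);
        channel.start();

        // Put transaction: doPut then doCommit, doRollback on failure.
        Transaction putTxn = channel.getTransaction();
        putTxn.begin();
        try {
            Event event = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);
            channel.put(event);   // doPut: the event goes into the transaction's putList
            putTxn.commit();      // doCommit: the putList is merged into the channel queue
        } catch (Exception e) {
            putTxn.rollback();    // doRollback: the putList is discarded
        } finally {
            putTxn.close();
        }

        // Take transaction: doTake then doCommit, doRollback puts events back.
        Transaction takeTxn = channel.getTransaction();
        takeTxn.begin();
        try {
            Event taken = channel.take();   // doTake: the event moves into the takeList
            System.out.println(new String(taken.getBody(), StandardCharsets.UTF_8));
            takeTxn.commit();
        } catch (Exception e) {
            takeTxn.rollback();
        } finally {
            takeTxn.close();
        }
        channel.stop();
    }
}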
Flume Enterprise Development Cases

Case 1: Replication
1) Requirement: Flume-1 monitors changes to a file and passes every change to Flume-2, which stores it on HDFS; at the same time Flume-1 passes the change to Flume-3, which writes it to the local file system.
2) Analysis: Flume-1 uses an exec source on the Hive log, a Replicating channel selector, two memory channels, and two avro sinks; Flume-2 uses an avro source and an HDFS sink; Flume-3 uses an avro source and a file_roll sink that writes to a local directory.
3) Implementation steps:
(1) Preparation: create the group1 directory under /opt/module/flume/job and the flume3 directory under /opt/module/datas:
[atguigu@hadoop102 job]$ mkdir group1
[atguigu@hadoop102 datas]$ mkdir flume3
(2) Create flume-file-flume.conf, configuring one source and two channels plus two avro sinks that send data to flume-flume-hdfs and flume-flume-dir respectively:
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf
Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(3) Create flume-flume-hdfs.conf, configuring an avro source that receives from the upstream Flume and an HDFS sink:
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# an avro source is a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
# prefix of the files uploaded to HDFS
a2.sinks.k1.hdfs.filePrefix = flume2-
# create a new directory based on time
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# how many events to accumulate before flushing to HDFS once
a2.sinks.k1.hdfs.batchSize = 100
a2.sinks.k1.hdfs.fileType = DataStream
# roll to a new file every 600 seconds
a2.sinks.k1.hdfs.rollInterval = 600
# roll to a new file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4) Create flume-flume-dir.conf, configuring an avro source that receives from the upstream Flume and a sink that writes to a local directory:
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5) Start the agents in order: flume-flume-dir, flume-flume-hdfs, flume-file-flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
(6) Start Hadoop and Hive:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
(7) Check the data on HDFS.
(8) Check the data in /opt/module/datas/flume3:
[atguigu@hadoop102 flume3]$ ll
总用量
-rw-rw-r--. 1 atguigu atguigu 5942 5月 22 00:09 1526918887550-
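The hop between the agents above uses Flume's Avro RPC, so besides another Flume agent, a program can also feed an avro source directly through the Flume client SDK. The following sketch is my own illustration (not part of the original case); it assumes an avro source listening on hadoop102:4141 as configured above:

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.StandardCharsets;

public class AvroSourceClient {
    public static void main(String[] args) throws Exception {
        // Connect to the avro source of agent a2 (hadoop102:4141).
        RpcClient client = RpcClientFactory.getDefaultInstance("hadoop102", 4141);
        try {
            Event event = EventBuilder.withBody("hello from the rpc client", StandardCharsets.UTF_8);
            client.append(event);   // send one event; appendBatch(List<Event>) sends a batch
        } finally {
            client.close();
        }
    }
}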
Case 2: Failover
1) Requirement: Flume-1 monitors a port; the sinks in its sink group connect to Flume-2 and Flume-3, and the FailoverSinkProcessor is used so that when the active downstream agent dies, the other one takes over.
2) Analysis: Flume-1 uses a netcat source, a memory channel, and a sink group with two avro sinks; Flume-2 and Flume-3 each use an avro source, a memory channel, and a logger sink that prints to the console; nc localhost 44444 is used to send test data.
3) Implementation steps:
(1) Preparation: create the group2 directory under /opt/module/flume/job:
[atguigu@hadoop102 job]$ mkdir group2
(2) Create flume-netcat-flume.conf, configuring one netcat source and one channel, plus one sink group with two sinks that send data to flume-flume-console1 and flume-flume-console2 respectively:
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf
Add the following content:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

(3) Create flume-flume-console1.conf, configuring an avro source that receives from the upstream Flume and a logger sink that prints to the local console:
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf
Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4) Create flume-flume-console2.conf, configured the same way but listening on port 4142:
[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf
Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

(5) Start the agents in order: flume-flume-console2, flume-flume-console1, flume-netcat-flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
(6) Send data to port 44444 with netcat:
$ nc localhost 44444
(7) Observe the console output of Flume-2 and Flume-3.
(8) Kill the Flume-2 process (find its pid with jps -ml) and observe that Flume-3 takes over and prints the data.
Case 3: Aggregation
1) Requirement: Flume-1 on hadoop102 monitors the file /opt/module/group.log, and Flume-2 on hadoop103 monitors the data flow on port 44444; Flume-1 and Flume-2 both send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
2) Analysis: Flume-1 uses an exec source and an avro sink; Flume-2 uses a netcat source and an avro sink; Flume-3 uses an avro source and a logger sink; nc is used to send data to port 44444.
3) Implementation steps:
(1) Preparation: distribute Flume to hadoop103 and hadoop104 and create a group3 directory under /opt/module/flume/job on all three machines:
[atguigu@hadoop102 module]$ xsync flume
[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3
(2) On hadoop102 create flume1-logger-flume.conf, configuring an exec source that monitors /opt/module/group.log and an avro sink that sends the data to the next-level Flume:
[atguigu@hadoop102 group3]$ vim flume1-logger-flume.conf
Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) On hadoop103 create flume2-netcat-flume.conf, configuring a netcat source that listens on port 44444 and an avro sink that sends the data to the next-level Flume:
[atguigu@hadoop103 group3]$ vim flume2-netcat-flume.conf
Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(4) On hadoop104 create flume3-flume-logger.conf, configuring an avro source that receives the data from flume1 and flume2 and a logger sink that prints the final data to the console:
[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf
Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

(5) Start the agents: flume3-flume-logger.conf, flume2-netcat-flume.conf, flume1-logger-flume.conf:
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
(6) On hadoop102 append content to group.log:
[atguigu@hadoop102 module]$ echo 'hello' > group.log
(7) Send data to port 44444 on hadoop103:
[atguigu@hadoop102 flume]$ nc hadoop103 44444
(8) Check the data on the hadoop104 console.

Custom Interceptor
1) Case requirement: in production, a server may produce many kinds of logs, and different kinds of logs may need to be delivered to different analysis systems. This uses the Multiplexing structure from the Flume topologies. Multiplexing works by sending different events to different Channels according to the value of a certain key in the event Header, so a custom Interceptor is needed to assign different values to that key for different kinds of events.
2) In this exercise port data is used to simulate the logs: Flume-1 on hadoop102 classifies each event by whether its body starts with a letter or with a digit and sends it to Flume-2 on hadoop103 or Flume-3 on hadoop104 accordingly; Flume-2 and Flume-3 use an avro source and a logger sink and print what they receive to the console.
3) Implementation steps:
(1) Create a Maven project and add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

(2) Define the CustomInterceptor class and implement the Interceptor interface:

package com.atguigu;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if (body[0] >= 'a' && body[0] <= 'z') {
            event.getHeaders().put("type", "letter");
        } else if (body[0] >= '0' && body[0] <= '9') {
            event.getHeaders().put("type", "number");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
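As a quick sanity check of my own (not part of the original steps), the interceptor can be run against two sample events before it is wired into an agent; the class below is assumed to sit in the same com.atguigu package as CustomInterceptor:

package com.atguigu;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class CustomInterceptorCheck {
    public static void main(String[] args) {
        Interceptor interceptor = new CustomInterceptor.Builder().build();
        interceptor.initialize();
        Event letter = interceptor.intercept(
                EventBuilder.withBody("atguigu", StandardCharsets.UTF_8));
        Event number = interceptor.intercept(
                EventBuilder.withBody("123456", StandardCharsets.UTF_8));
        // Expect {type=letter} and {type=number} in the headers.
        System.out.println(letter.getHeaders());
        System.out.println(number.getHeaders());
        interceptor.close();
    }
}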
(3) Edit the Flume configuration on hadoop102, configuring a netcat source with the interceptor attached, a multiplexing channel selector, two memory channels, and two avro sinks:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.CustomInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

(4) Edit the Flume configuration on hadoop103, configuring an avro source and a logger sink:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(5) Edit the Flume configuration on hadoop104, configuring an avro source and a logger sink:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4242

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

(6) Start the Flume processes on hadoop102, hadoop103, and hadoop104 (downstream agents first), send data beginning with letters and with digits through nc localhost 44444 on hadoop102, and check which console each event arrives at.

Custom Source
Source is the component responsible for receiving data into the Flume Agent. It can handle log data of many types and formats, and Flume ships with a rich set of source types; still, there are cases where a custom source is needed. According to the official documentation, a custom MySource needs to extend AbstractSource and implement the Configurable and PollableSource interfaces. The methods to implement are:
getBackOffSleepIncrement(): the back-off step, how much the sleep time grows after each empty poll.
getMaxBackOffSleepInterval(): the maximum back-off sleep time.
configure(Context context): initialization, reads the content of the configuration file.
process(): fetches data, packages it into events, and writes them to the channel; this method is called in a loop.
Typical use cases: reading data from MySQL or from other systems that have no ready-made source.
1) Requirement: use Flume to receive data, prepend a prefix (configured in the Flume job configuration file) to each piece of data, and print the result to the console (logger sink, memory channel).
2) Analysis: configure(Context context) reads the configured delay and field values; process() packages the generated data into Events and writes them to the Channel, using a for loop (for (int i = 0; i < 5; i++)) to simulate data generation; getBackOffSleepIncrement() and getMaxBackOffSleepInterval() are not used for now.
3) Implementation steps: write the custom Source code, create a matching Flume configuration file, execute the configuration file to start monitoring, and check the console output.
(1) Write the custom Source (the project uses the same flume-ng-core dependency as above):
package com.atguigu;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // fields read from the job configuration file
    private Long delay;
    private String field;

    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // create the event header information
            HashMap<String, String> headerMap = new HashMap<>();
            // create the event
            SimpleEvent event = new SimpleEvent();
            // loop and package the events
            for (int i = 0; i < 5; i++) {
                // set the header
                event.setHeaders(headerMap);
                // set the body
                event.setBody((field + i).getBytes());
                // write the event to the channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

(2) Package the code into a jar, put the jar into the flume lib directory (/opt/module/flume/lib), and write the job configuration file mysource.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.MySource
a1.sources.r1.delay = 1000
# a1.sources.r1.field = atguigu

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Start the task and check the console output:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

Custom Sink
The Sink continuously polls the Channel for events and removes them in batches, writing those batches to a storage or indexing system or sending them to another Flume Agent. The Sink is fully transactional: before removing a batch of data from the Channel, each Sink starts a transaction with the Channel; once the batch has been written out successfully to the storage system or to the next Flume Agent, the Sink commits the transaction, and only then does the Channel delete those events from its internal buffer. The built-in sinks cover most scenarios, but sometimes a custom Sink is needed. According to the official documentation, a custom MySink needs to extend AbstractSink and implement the Configurable interface. The methods to implement are:
configure(Context context): initialization, reads the content of the configuration file.
process(): takes events from the Channel and writes them out; this method is called in a loop.
Typical use cases: writing data from the Channel to MySQL or to other systems.
1) Requirement: use Flume to receive data and, in the Sink, add a prefix and a suffix to each event and write it to the log. The prefix and suffix are configured in the Flume job configuration file.
2) Analysis: configure() reads the prefix and suffix from the job configuration file; process() takes data from the Channel, adds the prefix and suffix, and writes it to the log; the jar is then packaged to the cluster, a job configuration file is written, and data is sent with netcat.
3) Implementation steps:
(1) Write the code:

package com.atguigu;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    // create the Logger object
    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        // the return status
        Status status;
        // get the Channel bound to this Sink
        Channel ch = getChannel();
        // get the transaction
        Transaction txn = ch.getTransaction();
        // declare the event
        Event event;
        // start the transaction
        txn.begin();
        // keep reading from the Channel until an event arrives
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            // process the event (print it)
            LOG.info(prefix + new String(event.getBody()) + suffix);
            // commit the transaction
            txn.commit();
            status = Status.READY;
        } catch (Exception e) {
            // roll the transaction back on failure
            txn.rollback();
            status = Status.BACKOFF;
        } finally {
            // close the transaction
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        // read the configured prefix, with a default value
        prefix = context.getString("prefix", "hello:");
        // read the configured suffix
        suffix = context.getString("suffix");
    }
}

(2) Package the jar into the flume lib directory and write the job configuration file mysink.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
# a1.sinks.k1.prefix = atguigu:
a1.sinks.k1.suffix = :atguigu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Start the task and send data with netcat:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
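Before deploying the jar, the custom sink can also be exercised locally against a MemoryChannel, without starting an agent. The following sketch is my own illustration (not part of the original steps); it is assumed to live in the same com.atguigu package as MySink, and it needs an slf4j binding on the classpath so that LOG.info actually prints:

package com.atguigu;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class MySinkSmokeTest {
    public static void main(String[] args) throws Exception {
        // A plain MemoryChannel with default settings.
        Channel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        channel.start();

        // Configure MySink the same way mysink.conf does and bind it to the channel.
        MySink sink = new MySink();
        Context sinkContext = new Context();
        sinkContext.put("prefix", "atguigu:");
        sinkContext.put("suffix", ":atguigu");
        Configurables.configure(sink, sinkContext);
        sink.setChannel(channel);
        sink.start();

        // Put one event into the channel inside a transaction.
        Transaction txn = channel.getTransaction();
        txn.begin();
        channel.put(EventBuilder.withBody("hello", StandardCharsets.UTF_8));
        txn.commit();
        txn.close();

        // process() should log "atguigu:hello:atguigu".
        sink.process();

        sink.stop();
        channel.stop();
    }
}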
Flume Data Flow Monitoring with Ganglia

Ganglia consists of three parts: gmond, gmetad, and gweb. gmond runs on every node that needs to be monitored and collects system metrics; gmetad aggregates and stores the data collected by the gmond daemons; gweb (Ganglia Web) is Ganglia's visualization tool, a PHP web front end that uses the browser to display the data stored by gmetad in the form of charts.

1) Install Ganglia.
(1) Install epel-release on hadoop102, hadoop103, and hadoop104:
[atguigu@hadoop102 flume]$ sudo yum -y install epel-release
(2) On hadoop102 install ganglia-gmetad, ganglia-web, and ganglia-gmond:
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmond
(3) On hadoop103 and hadoop104 install ganglia-gmond in the same way.

2) Modify the configuration files.
(1) On hadoop102 modify /etc/httpd/conf.d/ganglia.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
Change it as follows:

# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  # Require local
  # To open the ganglia page from a browser on another machine (for example the Windows host),
  # allow that machine's IP address here:
  Require ip <ip address of the machine running the browser>
  # Require ip 10.1.2.3
  # Require host example.org
</Location>

(2) On hadoop102 modify /etc/ganglia/gmetad.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
Change the data_source line to:
data_source "mycluster" hadoop102
(3) On hadoop102, hadoop103, and hadoop104 modify /etc/ganglia/gmond.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
Change the cluster block to:
cluster {
  name = "mycluster"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
