大数据采集与预处理(微课版) 课件 项目5、6 动态网页访问日志数据采集、动态网页数据预处理_第1页
大数据采集与预处理(微课版) 课件 项目5、6 动态网页访问日志数据采集、动态网页数据预处理_第2页
大数据采集与预处理(微课版) 课件 项目5、6 动态网页访问日志数据采集、动态网页数据预处理_第3页
大数据采集与预处理(微课版) 课件 项目5、6 动态网页访问日志数据采集、动态网页数据预处理_第4页
大数据采集与预处理(微课版) 课件 项目5、6 动态网页访问日志数据采集、动态网页数据预处理_第5页
已阅读5页,还剩90页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

项目5

动态网页访问日志数据采集使用Flume采集某官网访问日志数据目录Content1使用Kafka消费Flume中某官网访问日志数据

2项目导言零

日志中通常包含了大量的用户访问网站的行为信息,比如页面响应时间、消耗的流量、访问时间、停留时间、是否访问成功等等一系列的信息,网站运营者可通过分析这些信息,对网站运营做出一定的决策。学习目标零知识目标了解什么是日志采集;熟悉日志数据采集的方法;掌握Flume进行数据采集的方法;精通使用Kafka进行消息订阅发布的方法技能目标具备使用Flume拦截器过滤数据的能力;具备熟悉Kafka的脚本操作的能力;具备掌握Flume采集日志文件的能力;具备掌握使用Kafka进行消息发布订阅的能力;素养目标具备精通Flume+Kafka架构实现数据采集的能力。技能目标具备事业心和责任心;具有艰苦奋斗的精神和务实作风;具有健康的体魄,良好的身体素质;具备责任心。任务5-1使用Flume采集某官网访问日志数据壹Flume简介任务技能Flume是一个由Cloudera开发的分布式、可靠和高可用的海量日志收集和传输系统,具有基于流数据的简单灵活的架构。Flume数据采集配置Flume实现数据采集的方式非常简单,只需编辑相应的配置文件即可完成特定数据的采集与存储。Flume启动启动Flume代理使用名为flume-ng的shell脚本,该脚本位于Flume发行版的bin目录中。Flume拦截器配置Flume能够在数据采集的过程中对事件进行删除或修改,可理解为对事件进行过滤,这些功能是靠拦截器完成的。任务5-1使用Flume采集某官网访问日志数据壹1Flume简介Flume是一个由Cloudera开发的分布式、可靠和高可用的海量日志收集和传输系统,具有基于流数据的简单灵活的架构。它具有可调整的可靠性机制以及故障转移和恢复机制,具有健壮性和容错性。Flume-og,随着功能的逐渐增多和完善,其存在的缺点也逐渐的暴露出来,Flume-og部分缺点如下:2核心组件设计不合理代码过于臃肿14“日志传输”十分不稳定核心配置缺乏标准3任务5-1使用Flume采集某官网访问日志数据壹(1)Flume三层架构Flume由三层架构组成,分别为agent、collector和storage。Agent层01包含Flume的Agent组件,与需要传输数据的数据源进行连接Collector02通过多个收集器收集Agent层的数据,然后将这些转发到下一层storage03接收collector层的数据并存储任务5-1使用Flume采集某官网访问日志数据壹(1)Flume三层架构Fume中包含三个核心组件分别为Source、Channel和Sink,组件说明如下所示。Sink下沉组件,负责取出Channel中的消息数据,将channel中的event数据发送到文件存储系统或服务器等。采集组件,用于与数据源进行对接,获取数据。Source支持的数据源包括Avro、Thrift、exec、JMS、spoolingdirectory、netcat、sequencegenerator、syslog、http、legacy、自定义类型。SourceChannel是一个缓存区,用于链接Source和Sink组件,缓存Source写入的Event任务5-1使用Flume采集某官网访问日志数据壹(2)Flume扇入与扇出

扇入流(数据流合并)0102Flume日志收集中常见的场景是将客户端大量生成的日志数据发送到存储子系统或一消费者代理。任务5-1使用Flume采集某官网访问日志数据壹(2)Flume扇入与扇出

扇出流(数据流复用)Flume不仅能够同时采集多个服务器的数据到同一个服务器中,还能够将数据采集并分别保存到不同服务器中。任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置

Flume实现数据采集的方式非常简单,只需编辑相应的配置文件即可完成特定数据的采集与存储,配置文件中需要对数据源(source)、通道(channel)和接收器(sink)进行配置。1.数据源配置

用于监听Avro端口并接收来自外部Avro客户端的事件流。当两个Flumeagent的AvroSink和AvroSource对应时,可组成分层集合拓扑结构。(1)AvroSource属性描述channels指定通道的名称type组件类型名称,值为avrobind监听的主机名或IP地址port监听的端口任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置1.数据源配置

ExecSource的配置是指设置一个Linux或Unix命令,通过该命令不断输出数据,当命令进程退出时ExecSource也会退出,后续不会产生下一步数据。ExecSource和其他异步源的问题在于,如果将事件放入Channel失败,无法保证数据的完整性。(2)Exec

Source属性描述channels指定通道的名称type组件类型名称,值为execcommand要执行的命令#配置一个agent,agent的名称可自定义#指定agent的sources(如r1)、channels(如c1)a1.sources=r1a1.channels=c1a1.sources.r1.type=execmand=tail-F/var/log/securea1.sources.r1.channels=c1任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置

SpoolingDirectorySource能够通过磁盘中的目录摄取数据,当监控的制定目录中出现新文件时,会将文件作为source进行处理,一旦文件被放到“自动收集”目录中后,便不能修改和重命名。(3)SpoolingDirectorySource属性描述channels指定通道的名称type组件类型名称,值为spooldirspoolDir读取文件的目录。fileHeader是否添加存储绝对路径文件名的文件头。#配置一个agent,agent的名称可自定义#指定agent的sources(如r1)、channels(如c1)a1.sources=r1a1.channels=c1a1.sources.r1.type=spooldira1.sources.r1.channels=c1a1.sources.r1.spoolDir=/var/log/apache/flumeSpoola1.sources.r1.fileHeader=true任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置

Flume能够作为Kafka的消费者,从Kafka中读取消息,当有多个kafka源在运行时,可以使用相同的ConsumerGroup

以便读取一组唯一的主题分区。(4)KafkaSource属性描述channels指定通道的名称type组件类型名称,值为org.apache.flume.source.kafka.KafkaSourcekafka.bootstrap.serversKafka集群中的broker列表kafka.topicsKafka消费者将从中读取消息的以逗号分隔的主题列表。kafka.topics.regex定义源订阅的主题集的正则表达式。此属性的优先级高于kafka.topicsbatchSize一批写入Channel的最大消息数batchDurationMillis将批次写入通道之前的最长时间(以毫秒为单位)只要达到大小和时间中的第一个,就会写入批次。kafka.consumer.group.id消费者组的唯一标识。任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置

监听指定端口并将每一行文本转换为一个事件。与命令nc

-k

-l

[host]

[port]类似。接收的数据是以换行符分隔的文本。每一行文本都会变成一个Flume事件并通过连接的通道发送。(5)NetCatTCPSource属性描述channels指定通道的名称type组件类型名称,值为spooldirbind监听的主机IP地址port监听的主机端口号#配置一个agent,agent的名称可自定义#指定agent的sources(如r1)、channels(如c1)a1.sources=r1a1.channels=c1a1.sources.r1.type=netcata1.sources.r1.bind=a1.sources.r1.port=6666a1.sources.r1.channels=c1任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(1)MemoryChannel

使用内存作为源与接收器之间的保留区称之为内存通道,内存速度比磁盘快数倍,所以数据传输的速度也会随之加快。2.通道配置属性描述type组件类型名称,需要设置为memorycapacitychannel里存放的最大event数默认100transactionCapacityChannel每次提交的Event数量默认100byteCapacityBufferPercentage定义byteCapacity和通道中所有事件的估计总大小之间的缓冲区百分比byteCapacity设置通道中所有事件在内存中的大小总和任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(2)FileChannel

使用磁盘作为源与接收器之间的保留区称之为文件通道,由于文件通道使用的是磁盘,所以会比内存通道速度慢。文件通道优点在于当遇到断电、硬件故障等能够导致Flume采集中断的问题时,重启Flume即可不会造成数据丢失2.通道配置属性描述type组件类型名称,需要设置为filecheckpointDir存储检查点文件的目录dataDirs用于存储日志文件。在不同磁盘上使用多个目录可以提高文件通道性能任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(1)HDFSSinks

此接收器数据写入Hadoop分布式文件系统(HDFS),持创建文本和序列文件,使用此接收器需要安装Hadoop,以便Flume与HDFS集群进行通信。3.接收器配置属性默认值描述channel-指定通道的名称type-组件类型名称,需要为hdfshdfs.path-写入hdfs的路径(必选)hdfs.filePrefixFLumeData文件名前缀hdfs.fileSuffix-文件名后缀hdfs.inUsePrefix-用于flume主动写入的临时文件的前缀hdfs.inUseSuffix.tmp用于flume主动写入的临时文件的后缀hdfs.rollInterval30按时间生成HDFS文件,单位:秒hdfs.rollSize1024触发滚动的文件大小,以字节为单位(0:从不根据文件大小进行滚动)hdfs.rollCount10按事件数量生成新文件hdfs.idleTimeout0关闭非活动文件的超时(0=禁用自动关闭空闲文件)hdfs.fileTypeSequenceFile文件格式,包括:SequenceFile,DataStream,CompressedStreamhdfs.writeFormatWritable写文件的格式。包含:Text,Writablehdfs.roundfalse是否对时间戳四舍五入hdfs.roundValue1将其四舍五入到最高倍数hdfs.roundUnitsecond向下取整的单位,可选值包括second,minuteorhourhdfs.minBlockReplicas-设置HDFS块的副本数,默认采用Hadoop的配置hdfs.useLocalTimeStampFalse在替换转义序列时使用本地时间(而不是事件标头中的时间戳)任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(1)HDFSSinks

HDFSSink能够根据时间、数据大小、事件数定期滚动文件(关闭当前文件并创建新文件)或按时间戳、事件起源的机器等属性对数据进行存储/分区。此时在hdfs.path设置中会包含格式转义序列,这些转义序列将被HDFS接收器替换以生成目录/文件名来存储事件。3.接收器配置转义序列描述%{host}主机名%t以毫秒为单位的Unix时间%a语言环境的简短工作日名称(周一、周二、...)%A语言环境的完整工作日名称(星期一、星期二、...)%b语言环境的短月份名称(Jan、Feb、...)%B语言环境的长月份名称(一月,二月,...)%c语言环境的日期和时间(2005年3月3日星期四23:05:25)%d一个月中的某天(01)%e没有填充的月份中的某一天(1)%D日期;与%m/%d/%y相同%H小时,24小时制(00…23)%I小时,12小时制%j一年中的一天(001..366)%k小时(0..23)%m月(01..12)%n简短月份(1..12)%M分钟(00..59)%p语言环境相当于am或pm%s自1970-01-0100:00:00UTC以来的秒数%S秒(00..59)%y年份的最后两位数字(00..99)%Y年(2022)转义序列描述%{host}主机名%t以毫秒为单位的Unix时间%a语言环境的简短工作日名称(周一、周二、...)%A语言环境的完整工作日名称(星期一、星期二、...)%b语言环境的短月份名称(Jan、Feb、...)%B语言环境的长月份名称(一月,二月,...)%c语言环境的日期和时间(2005年3月3日星期四23:05:25)%d一个月中的某天(01)%e没有填充的月份中的某一天(1)%D日期;与%m/%d/%y相同%H小时,24小时制(00…23)%I小时,12小时制%j一年中的一天(001..366)%k小时(0..23)%m月(01..12)%n简短月份(1..12)%M分钟(00..59)%p语言环境相当于am或pm%s自1970-01-0100:00:00UTC以来的秒数%S秒(00..59)%y年份的最后两位数字(00..99)%Y年(2022)任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(2)HiveSink

HiveSink接收器能够将包含特定分隔符的文件或JSON数据的事件流传输到Hive表中。由于事件是基于Hive事务编写的,所以一旦Flume将一组事件提交到Hive后,在Hive表中可立即查询,需要注意的是使用HiveSink时需要将Hive表创建为事务表,并开始Hive的事务。3.接收器配置属性默认值描述channel-指定通道的名称type-组件类型名称,需要为hivehive.metastore-HiveMetastoreURI(例如thrift://abcom:9083)hive.database-Hive数据库名称hive.table-Hive中的表名称hive.partition-使用逗号分隔的分区值列表,标识要写入的分区。例如:如果表按(continent:string,country:string,time:string)分区,则'Asia,India,2014-02-26-01-21'将表示continent=Asia,country=India,time=2014-02-26-01-21heartBeatInterval240设置以秒为单位的心跳检测时间间隔,设置为0表示禁用心跳检测autoCreatePartitionstrueFlume将自动创建必要的Hive分区batchSize15000在单个Hive事务中写入Hive的最大事件数maxOpenConnections500打开链接的最大数量,如果超过此数量,则关闭最近最少使用的连接callTimeout10000Hive和HDFSI/O操作的超时时间,单位为秒serializer

序列化器,负责从事件中解析出字段并映射到配表中的列roundUnitminute向下取整的单位,可选值包括second,minuteorhourroundValue1向下舍入到此的最高倍数(在使用hive.roundUnit配置的单位中),小于当前时间timeZoneLocalTime用于解析分区中的转义序列的时区名称,例如America/Los_AngelesuseLocalTimeStampFalse在替换转义序列时使用本地时间(而不是事件标头中的时间戳)任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(3)AvroSink

该接收器与AvroSource可构成分层架构。发送到此接收器的Flume事件将转换为Avro事件并发送到配置的主机名/端口。3.接收器配置属性默认值描述channel-指定通道的名称type-组件类型名称,需要为avrohostname-监听的主机名或IP地址port-监听的端口号。任务5-1使用Flume采集某官网访问日志数据壹2Flume数据采集配置(4)FileRollSink

该接收器能够将事件存储在本地文件系统上。3.接收器配置属性默认值描述channel-指定通道的名称type-组件类型名称,需要为file_rollsink.directory-将存储文件的目录sink.rollInterval30每30秒滚动一次文件。指定0将禁用滚动并导致将所有事件写入单个文件任务5-1使用Flume采集某官网访问日志数据壹3Flume启动启动Flume代理使用名为flume-ng的shell脚本,该脚本位于Flume发行版的bin目录中。启动时需要在命令中指定代理名称、配置目录以及配置文件,并且设置的代理名称要和配置文件中的代理名称一致,启动数据采集的命令如下。bin/flume-ngagent--nameagent_name--confconf–conf-fileconf/perties.template任务5-1使用Flume采集某官网访问日志数据壹3Flume启动参数描述--name指定Agent的名称(必选)--conf指定配置文件所在目录--conf-file指定配置文件任务5-1使用Flume采集某官网访问日志数据壹3Flume启动启动Flumeagent进行数据采集,命令如下所示。[root@masterflume-code]#cd/usr/local/flume/bin[root@masterbin]#./flume-ngagent--confconf--conf-file/usr/local/inspur/code/flume-code/example.conf--namea1任务5-1使用Flume采集某官网访问日志数据壹3Flume启动

单独启动另一个终端,使用telnet向4444向Flume发送事件,启动成功后输入“Helloworld!”,命令如下所示。[root@master~]#telnetlocalhost4444任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置

时间拦截器(TimestampInterceptor)1静态拦截器(StaticInterceptor)3正则表达式过滤拦截器(RegexFilteringInterceptor)5主机拦截器(HostInterceptor)2搜索和替换拦截器(SearchandReplaceInterceptor)4任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置(1)时间拦截器(TimestampInterceptor)时间拦截器将处理事件的时间(以毫秒为单位)插入到事件头中。并根据时间戳将数据写入到不同文件中,当不使用任何拦截器时,Flume接收到的只有message。属性默认值描述type-组件类型名称,值为timestampheaderNametimestamp生成的时间戳的标头的名称preserveExistingfalse如果时间戳已经存在,是否应该保留值为true或falsea1.sources=r1a1.channels=c1a1.sources.r1.channels=c1a1.sources.r1.type=avroerceptors=i1erceptors.i1.type=timestamp任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置(2)主机拦截器(HostInterceptor)

该拦截器向事件中添加包含当前Flume代理的主机名或IP地址,主要表现形式为在HDFS中显示以Flume主机IP地址做为前缀的文件名。属性默认值描述type-组件类型名称,值为hostpreserveExistingfalse如果主机头已经存在是否保留,值为true或falseuseIPtrue如果为true,请使用IP地址,否则使用主机名hostHeaderhost要使用的文件头a1.sources=r1a1.channels=c1erceptors=i1erceptors.i1.type=host任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置(3)静态拦截器(StaticInterceptor)静态拦截器能够将具有静态值的静态事件头附加到所有事件,同一个静态拦截器中不能够设置多个事件头,但可以设置多个静态拦截器。属性默认值描述type-组件类型名称,值为staticpreserveExistingtrue配置的标头已经存在,是否应该保留。值为true或falsekeykey事件头名称valuevalue静态值a1.sources=r1a1.channels=c1a1.sources.r1.channels=c1a1.sources.r1.type=avroerceptors=i1erceptors.i1.type=staticerceptors.i1.key=datacentererceptors.i1.value=NEW_YORK任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置(4)搜索和替换拦截器(SearchandReplaceInterceptor)该拦截器提供了基于Java正则表达式的字符串的搜索和替换功能。属性默认值描述type-组件类型名称,值为search_replacesearchPatterntrue要搜索和替换的模式replaceStringkey替换字符串charsetvalue事件主体的字符集。默认情况下假定为UTF-8erceptors=search-replaceerceptors.search-replace.type=search_replace#删除事件正文中的前导字母数字字符。erceptors.search-replace.searchPattern=^[A-Za-z0-9_]+erceptors.search-replace.replaceString=任务5-1使用Flume采集某官网访问日志数据壹4Flume拦截器配置(5)正则表达式过滤拦截器(RegexFilteringInterceptor)该拦截器通过将事件主体解释为文本,并将文本与配置的正则表达式匹配完成事件的过滤。属性默认值描述type-组件类型名称,值为regex_filterregex“*”用于匹配事件的正则表达式excludeEventsfalse如果为true,则正则表达式确定要排除的事件,否则正则表达式确定要包含的事件。erceptors=i1erceptors.i1.type=regex_filtererceptors.i1.regex=(\\d):(\\d):(\\d)安装启动httpdf服务Step1启动FlumeStep2浏览器访问httpd服务器Step3查看采集到本地文件系统的数据Step4访问日志数据使用Flume的扇出架构,采集httpd服务器日志数据到HDFS分布式文件系统和本地文件系统任务5-1使用Flume采集某官网访问日志数据壹任务5-2使用Kafka消费Flume中某官网访问日志数据贰1Kafka简介Kafka是由Apache软件基金会开发的一个开源流处理平台。是一个快速、可扩展的、分布式的、分区的、可复制的基于zookeeper协调的分布式日志系统,用于web/nginx日志、访问日志、消息服务等,Linkedin于2010年将Kafka贡献给了Apache基金会并成为顶级开源项目。持久性、可靠性高并发高吞吐量、低延迟容错性可扩展性23451任务5-2使用Kafka消费Flume中某官网访问日志数据贰1Kafka简介任务5-2使用Kafka消费Flume中某官网访问日志数据贰2Kafka配置1将zookeeper安装包上传到“/usr/local”目录下并解压重命名为zookeeper2将模板文件“zoo_sample.cfg”文件复制重命名为“zoo.cfg”3进入zookeeper安装目录的“bin”目录,启动zookeeper并查看进程5在Kafka的“/bin”目录下后台启动Kafka进程4将Kafka安装包上传到“/usr/local”目录下解压并重命名为kafka任务5-2使用Kafka消费Flume中某官网访问日志数据贰3Kafka脚本操作Kafka为开发人员提供了众多脚本对Kafka进行管理和相关的操作,其中常用的脚本包含Topic脚本、生产者脚本、消费者脚本等。任务5-2使用Kafka消费Flume中某官网访问日志数据贰3Kafka脚本操作1.Topic脚本Topic脚本文件名为“kafka-topic.sh”,通过该脚本文件能够完成创建主题、查看主题、修改主题、删除主题等操作。参数描述--alter修改主题--config<String:name=value>创建或修改主题时,用于设置主题级别的参数--create创建主题--delete删除主题--describe查看主题的详细信息--disable-rack-aware创建主题时不考虑机架信息--help打印帮助信息--if-exists修改或删除主题时使用,只有当主题存在时才会执行动作--if-not-exists创建主题时使用,只有主题不存在时才会执行动作--list列出所有可用的主题--partitons分区数创建主题或增加分区时指定的分区数--replica-assignment分配方案手工指定分区副本分配方案--replication-factor副本数创建主题时指定副本数--topic主题名称指定主题名称--topics-with-overrides使用describe查看主题信息时,只展示包含覆盖配置的主题--unavailable-partitions使用describe查看主题信息时,只展示包含没有leader副本的分区--under-replicated-partitions使用describe查看主题信息时,只展示包含失效副本的分区--bootstrap-server链接的服务器地址和端口(必需)任务5-2使用Kafka消费Flume中某官网访问日志数据贰3Kafka脚本操作2.生产者脚本

生产者脚本文件名为“kafka-console-producer.sh”,该脚本能够生产创建消息并将消息发送到消费者。参数描述--bootstrap-server要连接的服务器必需(除非指定–broker-list)--topic(必需)topic名称--batch-size单个批处理中发送的消息数(默认值:16384)--compression-codec压缩编解码器,可选值“none”、“gzip”、“snappy”、“lz4”或“zstd”。如果未指定值,则默认为“gzip”--max-block-ms在发送请求期间,生产者将阻止的最长时间(默认值:60000)--max-memory-bytes生产者用来缓冲等待发送到服务器的总内存(默认值:33554432)--max-partition-memory-bytes为分区分配的缓冲区大小(默认值:16384)--message-send-max-retries最大的重试发送次数(默认值:3)--metadata-expiry-ms强制更新元数据的时间阈值(ms)(默认值:300000)--producer-property将自定义属性传递给生成器的机制--producer.config生产者配置属性文件[--producer-property]优先于此配置配置文件完整路径--property自定义消息读取器--request-required-acks生产者请求的确认方式--request-timeout-ms生产者请求的确认超时时间--retry-backoff-ms生产者重试前,刷新元数据的等待时间阈值--socket-buffer-sizeTCP接收缓冲大小--timeout消息排队异步等待处理的时间阈值--sync同步发送消息任务5-2使用Kafka消费Flume中某官网访问日志数据贰3Kafka脚本操作3.消费者脚本

消费者脚本文件名为“kafka-console-consumer.sh”,该脚本接收并消费生产者发送的消息。参数描述--group指定消费者所属组的ID--topic被消费的topic--partition指定分区;除非指定–offset,否则从分区结束(latest)开始消费--offset执行消费的起始offset位置;默认值:latest;/latest/earliest/偏移量--whitelist正则表达式匹配topic;–topic就不用指定了;匹配到的所有topic都会消费;当然用了这个参数,--partition--offset等就不能使用了--consumer-property将用户定义的属性以key=value的形式传递给使用者--consumer.config消费者配置属性文件请注意,[consumer-property]优先于此配置--property初始化消息格式化程序的属性--from-beginning从存在的最早消息开始,而不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了--max-messages消费的最大数据量,若不指定,则持续消费下去--skip-message-on-error如果处理消息时出错,请跳过它而不是暂停--isolation-level设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted任务5-2使用Kafka消费Flume中某官网访问日志数据贰4KafkaPythonAPI生产者API顾名思义,就是指应用程序发送数据流到Kafka集群中的topic中,供消费者消费使用,生产者是线程安全的,跨线程共享单个生产者实例通常比拥有多个实例更快。1.ProducerAPI(生产者API)kafka.KafkaProducer(关键字参数)参数描述bootstrap_server要连接的服务器必需(除非指定–broker-list)batch-size单个批处理中发送的消息数(默认值:16384)compression_type生产者生成的所有数据的压缩类型。有效值为“gzip”、“snappy”、“lz4”或“无”max_block_ms在发送请求期间,生产者将阻止的最长时间(默认值:60000)buffer_memory生产者用来缓冲等待发送到服务器的总内存(默认值:33554432)metadata_max_age_ms强制更新元数据的时间阈值(ms)(默认值:300000)--request-timeout-ms生产者请求的确认超时时间,默认值:30000--retry-backoff-ms生产者重试前,刷新元数据的等待时间阈值,默认值:100任务5-2使用Kafka消费Flume中某官网访问日志数据贰4KafkaPythonAPI消费者API主要用于接收生产者集群发出的消息,并且消费者不是线程安全的,不应使用跨线程共享。2.ConsumerAPI(消费者API)kafka.KafkaConsumer(topics,

关键字参数)参数描述topics(str)要订阅的可选主题列表bootstrap_servers要连接的服务器client_id(str)此客户端的名称。此字符串在每个请求中传递给服务器,可用于标识与此客户端对应的特定服务器端日志条目group_id消费者组ID任务5-2使用Kafka消费Flume中某官网访问日志数据贰4KafkaPythonAPI通过消费者API成功接收生产者消息后,会返回一个消息集合,每条消息会包含topic、消息所在的分区、消息所在分区的位置、消息的内容、消息的key,对应获取方法如表所示。方法描述partition消息所在的分区offset消息所在分区的位置key消息的keyvalue消息的内容

在“/usr/local/inspur/code/pykafka”目录中编写python代码,使用API创建消费者接收“test-topic”中的消息。[root@masterpykafka]#vimconsumer-demo.py#代码如下consumer=KafkaConsumer('test-topic',bootstrap_servers='localhost:9092')formessageinconsumer:print("topic=%s,partition=%d,offset=%d,value=%s"%(message.topic,message.partition,message.offset,message.value))[root@masterpykafka]#pythonconsumer-demo.py创建名为“access_kafka.conf”的Flume配置文件Step1启动Flume数据采集Step2启动消费者接收名为“flume-topic”的topic中的消息Step3使用PythonAPI编写消费者程序Step4Kafka访问日志数据使用Flume+Kafka完成使用Flume监控httpd的日志数据,并作为生产者发送到topic,使用Kafka消费者进行接收。任务5-2使用Kafka消费Flume中某官网访问日志数据贰与您共同成长项目6

动态网页数据预处理使用Pandas实现新闻动态网页数据预处理目录Content1使用Pig实现浪潮云说网页数据预处理2使用ELK实现某官网日志数据预处理3项目导言零数据预处理是指将数据的缺失值、脏数据、数据格式等进行调整处理,由于数据采集过程中会因为数据的来源不统一造成数据格式的混乱,当使用这些原始数据进行数据分析时无法为决策提供有效的帮助,对数据进行预处理能够有效解决这些问题,那么如何对数据进行预处理,数据预处理使用的工具和方法有哪些呢?学习目标零知识目标了解什么是数据预处理;熟悉数据预处理的方法;掌握Pandas、Pig以及ELK进行数据预处理的方法;精通Pig以及ELK环境搭建方法;技能目标具备了解Pandas和Pig数据预处理的能力;具备熟悉Pandas和Pig数据预处理机制的能力;素养目标具备掌握Pandas和Pig数据预处理方法的能力;具备掌握ELK数据预处理方法及流程的能力;具备精通数据预处理的能力。技能目标具备团队意识;具备良好的解决问题的能力;具备较强的学习能力;任务6-1使用Pandas实现新闻动态网页数据预处理壹Pandas简介及安装Pandas数据结构Pandas基本功能汇总和描述统计处理缺失数据0102030405任务6-1使用Pandas实现新闻动态网页数据预处理壹1Pandas简介及安装Pandas是Python的核心开源数据分析支持库。Pandas是基于NumPy开发用于完成数据分析开发的数据分析工具,并且纳入了大量的库和标准数据模型,为实现高效的大型数据集操作提供支持。任务6-1使用Pandas实现新闻动态网页数据预处理壹2Pandas数据结构一维数组的对象,可保存任何类型的数据。由一组数据(各种Numpy数据类型)和与之相关的数据标签(索引)两部分构成。1.SeriesList中的元素可以是不同的数据类型,而Array和Series中则只允许存储相同的数据类型任务6-1使用Pandas实现新闻动态网页数据预处理壹2Pandas数据结构DataFrame是一个表格类型的数据结构,有一组有序的列构成,并且每列的数据类型可以不同,DataFrame中同时包含了行索引和列索引,可看做是由Series组成的字典。2.DataFrame3.Panel三维数组,可以理解为DataFrame的容器。需要注意的是,Pandas是Python的一个库,所以,Python中所有的数据类型在这里依然适用,还可以自己定义数据类型。任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法用于从CSV文件中读取数据,其必要参数为数据文件的存储路径,同时还可在读取文件时设置分隔符、编码、进行空值定义等操作。1.数据表获取(1)读取数据文件①read_csv()pd.read_csv(filepath_or_buffer,sep=',',header='infer',names=None,index_col=None,prefix=None,dtype=None,encoding=None,converters=None,skipinitialspace=False,na_values=None,na_filter=True,true_values=None,false_values=None)任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能1.数据表获取(1)读取数据文件①read_csv()参数描述filepath_or_buffer文件路径sep分隔符设置,默认值为‘,’header数据文件中用于表示列名部分的行数(数据开始的行),默认为0names用于为结果添加列名index_col指定索引列prefix自动生成的列名编号的前缀dtype指定列的数据类型encoding指定编码converters设置指定列的处理函数,可以用"序号"、“列名”进行列的指定skipinitialspace忽略分割符后面的空格na_values空值定义na_filter检测空值,值为Falsek时可以提供大文件的读取性能true_values将指定文本转换为truefalse_values将制定文本转换为false任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法主要用于从Excel文件中加载数据并以二维数据表的格式输出。1.数据表获取(1)读取数据文件②read_excel()d.read_excel(io,sheet_name=0,header=0,names=None,index_col=None,usecols=None,squeeze=False,dtype=None,true_values=None,false_values=None,skiprows=None,nrows=None)任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能1.数据表获取(1)读取数据文件②read_excel()参数描述io文件路径sheet_name指定表单名称header设置数据中为表头的行,默认为0names自定义表头的名称,值为数组类型。index_col指定作为索引的列usecols设置要获取的列的范围,值为str,则表示Excel列字母和列范围的逗号分隔列表,为int,则表示解析到第几列。为int列表,则表示解析列表中指定的列。squeeze默认为False。设置squeeze=True时表示如果解析的数据只包含一列,则返回一个Series。dtype指定列的数据类型,默认为None不改变数据类型。true_values将指定的文本转换为True,默认为Nonefalse_values将指定的文本转换为False,默认为Noneskiprows省略指定行数的数据nrows指定需要读取前多少行,通常用于较大的数据文件中。任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法主要用于从SQL数据库表获取数据,该方法需要创建与数据库的链接,通过SQL语句从表中获取数据。1.数据表获取(1)读取数据文件③read_sql()pandas.read_sql(sql,con,index_col=None,coerce_float=True,parse_dates=None,columns=None,chunksize)参数描述sql用于查询数据的sql语句,类型为strcon连接数据所需的引擎,使用对应的数据库链接库创建,如:index_col选择某一列作为indexcoerce_float将数字形字符串转为float读入parse_dates将某一列日期型字符串转换为datetime型数据columns要选取的列chunksize指定输出的行数任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法用于加载JSON文件中的数据,与read_csv()和read_excel()使用方法一致,read_json()方法参数如下所示。1.数据表获取(1)读取数据文件④read_json()pd.read_json(path_or_buf=None,orient=None,typ='frame',dtype=Truekeep_default_dates=True,numpy=False,date_unit=None,encoding=None,lines=False)参数描述path_or_buf文件路径orient指示预期的JSON字符串格式typ要恢复的对象类型dtype指定数据类型,值为json、dictkeep_default_dates显示Scrapy版本numpy直接解码为numpy数组date_unit用于检测转换日期的时间戳单位encoding指定编码lines按行读取文件作为json对象任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能2.检查数据信息Pandas中提供了若干用于检查数据信息的方法,如维度、基本信息、空值、列名等相关信息。能够帮助我们快速了解数据的基本信息,主要应用在数据量较大无法快速获取有效信息的情况。属性和方法描述DataFrame.shape()查看数据的维度DataFrame.dtypes()每列数据的格式DataFrame.values()查看数据表的值DataFrame.columns()查看数据列名称DataF()查看数据表基本信息DataFrame.isnull()查看空值DataFrame.unique()查看某一列的唯一值DataFrame.head()查看前指定行数据,默认为10DataFrame.tail()查看后指定行数据,默认为10任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法用于将Pandas程序中的数据持久化保存到csv文件中。3.数据持久化(1)to_csv()DataFrame.to_csv(path_or_buf=None,sep=',',na_rep='',float_format=None,columns=None,header=True,index=True,index_label=None,mode='w',encoding=None)参数描述filepath_or_buffer字符串类型的文件路径对象sep输出文件的字段分隔符na_rep缺失数据填充float_format小数点保留几位,参数类型为字符串columns自定义列名,参数类型为序列或数组header写出列名,若给定字符串列表,则作为列名的别名Index写入索引,默认为trueModePython写入模式,默认为“w”w:覆盖写入a:追加写入r+:可读可写,必须存在,可在任意位置读写,读与写共用同一个指针w+:可读可写,可以不存在,必会擦掉原有内容从头写a+:可读可写,可以不存在,必不能修改原有内容,只能在结尾追加写,文件指针只对读有效(写操作会将文件指针移动到文件尾)Encoding表示输出文件中使用的编码的字符串,默认为“utf-8”任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能该方法用于将DataFrame数据以Excel表格的形式保存到本地文件系统。3.数据持久化(2)to_excel()DataFrame.to_excel(excel_writer,sheet_name='Sheet1',na_rep='',float_format=None,columns=None,header=True,index=True,startrow=0,startcol=0)参数描述excel_writer保存到的文件路径sheet_name保存的sheet名na_rep缺失数据表示方式,默认为空float_format格式化浮点数的字符串,默认为Nonestartrow保存的数据在目标文件的开始行startcol保存的数据在目标文件开始的列header显示列名columns自定义列名index是否显示索引任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能在使用Pandas处理Json类型的数据时通常会将Json数据加载到程序中转换为DataFrame(可使用read_json),在处理完成后需要将处理后的数据保存回Json这时就需要用到to_json()方法。3.数据持久化(3)to_json()DataFrame.to_json(path_or_buf=None,orient=None,date_format=None,double_precision=10,force_ascii=True,date_unit='ms',default_handler=None,lines=False,compression=None,index=True)参数描述path_or_buf指定文件保存路径orient指定为将要输出的JSON格式。date_format日期转换类型double_precision对浮点值进行编码时使用的小数位数。默认为10位。force_ascii强制编码为ASCIIindex是否包含索引值任务6-1使用Pandas实现新闻动态网页数据预处理壹3Pandas基本功能to_sql()是Pandas中提供的用于将DataFrame数据保存到数据库的API。3.数据持久化(4)to_sql()DataFrame.to_sql(name,

con,

schema=None,

if_exists=’fail’,

index=True,

index_label=None,

chunksize=None)参数描述name表名称con连接sql数据库的engine,可以用pymysql之类的包建立schema相应数据库的引擎,不设置则使用数据库的默认引擎,如mysql中的innodb引擎index是否将表中索引保存到数据库index_label是否使用索引名称if_exists当数据库表存在时,设置数据的保存方式chunksize批量保存数据量大小任务6-1使用Pandas实现新闻动态网页数据预处理壹4汇总和描述统计Pandas提供了一组常用的汇总和描述统计方法,用于数据分析中完成汇总统计的功能。与对应的NumPy数组方法相比,它们都是基于没有缺失数据的假设而构建的。函数描述df.sum()求和函数df.mean()求平均值df.min()df.max()求最小值和最大值,对于字符串类型的,最小值返回按字母升序,当不忽略null值时,最小值最大值都是NaNdf.var()求样本值的方差df.std()求样本值的标准差df.count()计算非null值的数量df.median()计算中位数任务6-1使用Pandas实现新闻动态网页数据预处理壹5处理缺失数据缺失值是指数据中由于某些信息的缺失,造成现有数据中某个或某些属性不完整。Pandas中提供了若干对缺失值处理的行数,可分为四类,缺失值判断、缺失值统计、缺失值填充、缺失值删除。缺失值统计df.isna().sum(axis=None)缺失值填充df.fillna(value=None,method=None,axis=None,inplace=False,limit=None,downcast=None,**kwargs)缺失值判断df.isna()缺失值删除df.dropna(axis=None)加载数据Step1处理数据Step2读取数据Step3验证数据Step4动态网页数据预处理Pandas实现新闻动态网页数据预处理任务6-1使用Pandas实现新闻动态网页数据预处理壹任务6-2使用Pig实现浪潮云说网页数据预处理贰1Pig简介Pig是一款基于Hadoop的大规模数据分析平台,是Apache平台下的免费开源项目,是MapReduce的一个抽象。它是一个工具/平台,用于分析较大数据集,并表示为数据流。Pig通常与Hadoop一起使用。丰富的运算符集易于编程优化机会可扩展性用户定义函数处理各种数据任务6-2使用Pig实现浪潮云说网页数据预处理贰2Pig配置运行第一步:登录Pig官网第二步:找到相关镜像,进行下载第三步:进行解压下载第四步:配置环境变量[root@masterlocal]#vim~/.bashrcexportPIG_HOME=/usr/local/pigexportPATH=$PATH:$PIG_HOME/binexportPIG_CLASSPATH=$HADOOP_HOME/etc/hadoop[root@masterlocal]#source~/.bashrc[root@masterlocal]#pig-version任务6-2使用Pig实现浪潮云说网页数据预处理贰3PigLatin执行ApachePig提供了本地模式和MapReduce模式两种运行模式,其中在Local模式下,所有文件都从本地主机和文件系统中安装和运行,不需要使用Hadoop或HDFS,此模式多用于测试。MapReduce模式是使用ApachePig加载或处理Hadoop的分布式文件系统(HDFS)中存储的数据。交互模式(Gruntshell):使用Gruntshell以交互模式运行ApachePig。在此shell中,你可以输入PigLatin语句并获取输出。批处理模式(脚本):用于执行使用PigLatin语言编写的Pig程序脚本。嵌入式模式(UDF):用户可通过Java语言自定义函数,并在脚本中使用。任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符1.比较运算符比较运算符用于对符号两边的值进行比较,返回值有两种情况即True或False。运算符描述==等于,检查两个数的值是否相等;如果是,则条件为true!=不等于,检查两个数的值是否相等。如果值不相等,则条件为true>大于,检查左边数的值是否大于右边数的值。如果是,则条件变为true<小于,检查左边数的值是否小于右边数的值。如果是,则条件变为true>=大于或等于,检查左边数的值是否大于或等于右边数的值。如果是,则条件变为true<=小于或等于,检查左边数的值是否小于或等于右边数的值。如果是,则条件变为truematches模式匹配,检查左侧的字符串是否与右侧的常量匹配任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符2.类型结构运算符PigLatin的类型结构运算符主要有三个分别,元组构建运算符、包构造函数运算符和映射构造函数运算符。运算符描述示例()元组构造函数运算符-此运算符用于构建元组(Raju,30){}包构造函数运算符-此运算符用于构造包{(Raju,30),(Mohammad,45)}[]映射构造函数运算符-此运算符用于构造一个映射[name#Raja,age#30]任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(1)加载和存储lLOADLOAD运算符由两部分构成,使用等号(=)分割,等号左侧需要指定存储数据的关系的名称,右侧需要定义存储数据的方式,LOAD运算符语法如下。Relation_name=LOAD'Inputfilepath'USINGfunctionasschema;参数说明如下。relation_name:设置数据保存目标关系名称。Inputfilepath:数据文件在本地或HDFS的存储路径。

function:设置加载数据的文件类型函数Schema:数据模式,加载数据时必须制定数据模式(列名)任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(1)加载和存储l存储数据数据处理场景中,数据的体量通常会超过数十万条,仅靠程序的标准输出不能满足阅读条件,并且若要对处理后的数据进一步应用还要将其进行持久化存储。STORERelation_nameINTO'required_directory_path'[USINGfunction];参数说明如下。Relation_name:关系名。required_directory_path:关系目标存储路径。USINGfunction:加载函数任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(2)诊断运算DumpDumpstudent;用于运行PigLatin语句,并将结果打印到屏幕显示,此方法通常用于测试1explain用于显示关系的逻辑,物理和MapReduce执行计划explainstudent;3illustrate能够输出个语句逐步执行的结果illustratestudent;4Describe用于查看关系的模式describestudent;2任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(3)分组运算

分组操作在SQL中使用频率很高,PigLatin中同样提供了对数据进行分组方法,Group运算符能够对一个或多个关系中的数据进行分组。#对单个关系分组Group_data=GROUPRelation_nameBYGroup_key;#对多个关系分组Group_data=GROUPRelation_name1BYGroup_key,Relation_name2BYGroup_key;参数说明如下所示。①Relation_name:关系名。②Group_key:分组key。任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(4)连接运算连接运算操作类似SQL中的关联查询,在执行一个数据处理任务时,通常数据文件会保存在多个数据集中,这时就需要使用链接操作,或两个数据集中存在一定的联系,需要联合处理,PigLatin中的链接运算需要从每个关系中声明一个或一组元组作为key,当这些key匹配时,两个特定的元组匹配,否则记录将被丢弃。连接可以是以下类型:自连接、内部连接和外连接。任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(4)连接运算自连接Relation_name=JOINRelation1_nameBYkey,Relation2_nameBYkey;内部连接result=JOINrelation1BYcolumnname,relation2BYcolumnname;左外连接outer_right=JOINrelation1BYcolumnnameLEFT,relation1BYcolumnname;全外连接outer_full=JOINrelation1BYcolumnnameFULLOUTER,relation2BYcolumnname;右外连接outer_right=JOINrelation1BYcolumnnameRIGHT,relation2BYcolumnname;任务6-2使用Pig实现浪潮云说网页数据预处理贰4运算符3.关系运算符(5)过滤运算符FilterDistinctForeachFilter运算符能够根据过滤条件从关系中选择所需的元组Distinct运算符用于从关系中删除冗余(重复)元组Foreach运算符用于基于列数据生成指定的数据转换任务6-2使用Pig实现浪潮云说网页数据预处理贰5内置函数1.Eval函数Eval函数能够对数据进行简单的统计运算,如平均值、最大值、最小值求和等操作。函数描述AVG()计算平均值BagToString()将包的元素连接成字符串。在连接时,我们可以在这些值之间放置分隔符(可选)CONCAT()连接两个或多个相同类型的表达式COUNT()统计元素数量MAX()计算最大值MIN()计算最小值SIZE()基于任何Pig数据类型计算元素的数量SUM()要获取单列包中某列的数值总和任务6-2使用Pig实现浪潮云说网页数据预处理贰5内置函数2.字符串函数字符函数主要用于对数据中字符类型的数据进行处理,如大小写转换、截取字符、字符比较等。函数描述ENDSWITH(string,testAgainst)验证字符串是否已特定字符结尾STARTSWITH(string,substring)验证第一个字符串是否以第二个字符串开头。SUBSTRING(string,startIndex,stopIndex)返回来自给定字符串的子字符串EqualsIgnoreCase(string1,string2)比较两个字符串,忽略大小写INDEXOF(string,‘character’,startIndex)返回字符串中指定的第一个出现的字符LAST_INDEX_OF(expression)返回字符串中指定的最后一个出现的字符LCFIRST(expression)将字符串中的第一个字符转换为小写UCFIRST(expression)将字符串中的第一个字符转换为大写UPPER(expression)

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论