Chapter12- 大数据基础编程、实验和案例教程-第12章-数据采集工具的安装和使用_第1页
Chapter12- 大数据基础编程、实验和案例教程-第12章-数据采集工具的安装和使用_第2页
Chapter12- 大数据基础编程、实验和案例教程-第12章-数据采集工具的安装和使用_第3页
Chapter12- 大数据基础编程、实验和案例教程-第12章-数据采集工具的安装和使用_第4页
Chapter12- 大数据基础编程、实验和案例教程-第12章-数据采集工具的安装和使用_第5页
已阅读5页,还剩20页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第12章数据采集工具的安装和使用提纲12.1Kafka12.2实例:编写Spark程序使用Kafka数据源12.1Kafka12.1.1Kafka相关概念12.1.2安装Kafka12.1.3一个实例12.1.1Kafka相关概念为了更好地理解和使用Kafka,这里介绍一下Kafka的相关概念:Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker;Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic,即可生产或消费数据,而不必关心数据存于何处。Partition:是物理上的概念,每个Topic包含一个或多个Partition。Producer:负责发布消息到KafkaBroker。Consumer:消息消费者,向KafkaBroker读取消息的客户端。ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,可为每个Consumer指定groupname,若不指定groupname则属于默认的group。12.1.2安装Kafka访问Kafka官网下载页面(/downloads),下载Kafka的安装包kafka_2.11-.tgz,此安装包内已经附带Zookeeper,不需要额外安装Zookeeper。打开一个终端,执行如下命令:$cd~/下载$sudotar-zxvfkafka_2.11-.tgz-C/usr/local$cd/usr/local$sudomvkafka_2.11-/./kafka$sudochown-Rhadoop./kafka12.1.3一个实例新建一个Linux终端,执行如下命令启动Zookeeper:$cd/usr/local/kafka$./bin/zookeeper-server-start.shconfig/perties新建第二个终端,输入如下命令启动Kafka:$cd/usr/local/kafka$./bin/kafka-server-start.shconfig/perties新建第三个个终端,输入如下命令:$cd/usr/local/kafka$./bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicdblab12.1.3一个实例可以用list命令列出所有创建的topics,来查看刚才创建的topic是否存在,命令如下:$cd/usr/local/kafka$./bin/kafka-topics.sh--list--zookeeperlocalhost:2181可以在结果中查看到,dblab这个topic已经存在。接下来用producer生产一些数据,命令如下:$cd/usr/local/kafka$./bin/kafka-console-producer.sh--broker-listlocalhost:9092--topicdblab该命令执行后,可以在该终端中输入以下信息作为测试:hellohadoophelloxmuhadoopworld12.1.3一个实例然后,再次开启新的第四个终端,输入如下命令使用consumer来接收数据:$cd/usr/local/kafka$./bin/kafka-console-consumer.sh--zookeeperlocalhost:2181--topicdblab--from-beginning执行该命令以后,就可以看到刚才在另外一个终端的producer产生的三条信息“hellohadoop”、“helloxmu”和“helloworld”。说明Kafka安装成功。12.2实例:编写Spark程序使用Kafka数据源12.2.1Kafka准备工作12.2.2Spark准备工作12.2.3编写Spark程序使用Kafka数据源12.2.1Kafka准备工作1.启动Kafka首先需要启动Kafka。请登录Linux系统(本教程统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:$cd/usr/local/kafka$./bin/zookeeper-server-start.shconfig/perties打开第二个终端,然后输入下面命令启动Kafka服务:$cd/usr/local/kafka$./bin/kafka-server-start.shconfig/perties12.2.1Kafka准备工作2.测试Kafka是否正常工作请再另外打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的topic:$cd/usr/local/kafka$./bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1--partitions1--topicwordsendertest#这个topic叫wordsendertest,2181是Zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在Kafka集群中使用,这里单机版就不用备份了#可以用list列出所有创建的topics,来查看上面创建的topic是否存在$./bin/kafka-topics.sh--list--zookeeperlocalhost:218112.2.1Kafka准备工作下面,用producer来产生一些数据,请在当前终端内继续输入下面命令:$./bin/kafka-console-producer.sh--broker-listlocalhost:9092--topicwordsendertest上面命令执行后,就可以在当前终端内用键盘输入一些英文单词,比如可以输入:hellohadoophellospark打开第四个终端,输入下面命令:$cd/usr/local/kafka$./bin/kafka-console-consumer.sh--zookeeperlocalhost:2181--topicwordsendertest--from-beginning可以看到,屏幕上会显示出如下结果:hellohadoophellospark12.2.2Spark准备工作1.添加相关jar包打开一个新的终端,然后启动spark-shell,命令如下:$cd/usr/local/spark$./bin/spark-shell启动成功后,在spark-shell中执行下面import语句:scala>importorg.apache.spark.streaming.kafka._<console>:25:error:objectkafkaisnotamemberofpackageorg.apache.spark.streamingimportorg.apache.spark.streaming.kafka._^可以看到,马上会报错,因为找不到相关的jar包。所以,需要下载spark-streaming-kafka-0-8_2.11-2.4.0.jar。12.2.2Spark准备工作访问Spark官网(/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0),里面有提供spark-streaming-kafka-0-8_2.11-2.4.0.jar文件的下载,其中,2.11表示Scala的版本号,2.4.0表示Spark的版本号。现在,需要把这个文件复制到Spark目录的lib目录下。请新打开一个终端,输入如下命令:$cd/usr/local/spark/jars$mkdirkafka $cd~/Downloads$cp./spark-streaming-kafka-0-8_2.11-2.4.0.jar/usr/local/spark/jars/kafka12.2.2Spark准备工作下面还要继续把Kafka安装目录的lib目录下的所有jar文件复制到“/usr/local/spark/lib/kafka”目录下,请在终端中执行下面命令:$cd/usr/local/kafka/libs$ls$cp./*/usr/local/spark/jars/kafka2.启动spark-shell然后,执行如下命令启动spark-shell:$cd/usr/local/spark$./bin/spark-shell--jars/usr/local/spark/jars/*:/usr/local/spark/jars/kafka/*启动成功后,再次执行如下命令:scala>importorg.apache.spark.streaming.kafka._//会显示下面信息importorg.apache.spark.streaming.kafka._12.2.3编写Spark程序使用Kafka数据源1.编写生产者(producer)程序请新打开一个终端,然后,执行如下命令创建代码目录和代码文件:$cd/usr/local/spark/mycode$mkdirkafka$cdkafka$mkdir-psrc/main/scala$cdsrc/main/scala$vimKafkaWordProducer.scala12.2.3编写Spark程序使用Kafka数据源packageorg.apache.spark.examples.streamingimportjava.util.HashMapimportducer.{KafkaProducer,ProducerConfig,ProducerRecord}importorg.apache.spark.SparkConfimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka._objectKafkaWordProducer{defmain(args:Array[String]){if(args.length<4){

System.err.println("Usage:KafkaWordCountProducer<metadataBrokerList><topic>"+"<messagesPerSec><wordsPerMessage>")

System.exit(1)}

valArray(brokers,topic,messagesPerSec,wordsPerMessage)=args//Zookeeperconnectionproperties

valprops=newHashMap[String,Object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"mon.serialization.StringSerializer")

valproducer=newKafkaProducer[String,String](props)//Sendsomemessageswhile(true){(1tomessagesPerSec.toInt).foreach{messageNum=>

val

str=(1towordsPerMessage.toInt).map(x=>scala.util.Random.nextInt(10).toString).mkString("")print(str)

println()

valmessage=newProducerRecord[String,String](topic,null,str)

producer.send(message)}

Thread.sleep(1000)}}}12.2.3编写Spark程序使用Kafka数据源packageorg.apache.spark.examples.streamingimportorg.apache.spark._importorg.apache.spark.SparkConfimportorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka._importorg.apache.spark.streaming.StreamingContext._importorg.apache.spark.streaming.kafka.KafkaUtilsobjectKafkaWordCount{defmain(args:Array[String]){StreamingExamples.setStreamingLogLevels()valsc=newSparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val

ssc=newStreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint")//设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval

zkQuorum="localhost:2181"//Zookeeper服务器地址valgroup="1"//Topic所在的group,可以设置为自己想要的名称,比如不用1,而是valgroup="test-consumer-group"valtopics="wordsender"//topics的名称val

numThreads=1//每个topic的分区数val

topicMap=topics.split(",").map((_,numThreads.toInt)).toMapval

lineMap=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)vallines=lineMap.map(_._2)valwords=lines.flatMap(_.split(""))valpair=words.map(x=>(x,1))val

wordCounts=pair.reduceByKeyAndWindow(_+_,_-_,Minutes(2),Seconds(10),2)//这行代码的含义在下一节的窗口转换操作中会有介绍wordCounts.printssc.startssc.awaitTermination}}2.编写消费者(consumer)程序12.2.3编写Spark程序使用Kafka数据源packageorg.apache.spark.examples.streamingimporternal.Loggingimportorg.apache.log4j.{Level,Logger}/**UtilityfunctionsforSparkStreamingexamples.*/objectStreamingExamplesextendsLogging{/**Setreasonablelogginglevelsforstreamingiftheuserhasnotconfiguredlog4j.*/defsetStreamingLogLevels(){

vallog4jInitialized=Logger.getRootLogger.getAllAppenders.hasMoreElementsif(!log4jInitialized){//WefirstlogsomethingtoinitializeSpark'sdefaultlogging,thenweoverridethe//logginglevel.

logInfo("Settingloglevelto[WARN]forstreamingexample."+"Tooverrideaddacustompertiestotheclasspath.")

Logger.getRootLogger.setLevel(Level.WARN)}}}3.编写日志格式设置程序12.2.3编写Spark程序使用Kafka数据源4.编译打包程序请执行下面命令新建一个simple.sbt文件:$cd/usr/local/spark/mycode/kafka/$vimsimple.sbt在simple.sbt中输入以下代码:name:="SimpleProject"version:="1.0"scalaVersion:="2.11.12"libraryDependencies+="org.apache.spark"%%"spark-core"%"2.4.0"libraryDependencies+="org.apache.spark"%"spark-streaming_2.11"%"2.4.0"libraryDependencies+="org.apache.spark"%"spark-streaming-kafka-0-8_2.11"%"2.4.0"exclude("net.jpountz.lz4","lz4")12.2.3编写Spark程序使用Kafka数据源然后执行下面命令,进行编译打包:$cd/usr/local/spark/mycode/kafka/$/usr/local/sbt/sbtpackage5.运行程序首先,请启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:$cd/usr/local/hadoop$./sbin/start-dfs.sh启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。12.2.3编写Spark程序使用Kafka数据源请新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):$cd/usr/local/spark$/usr/local/spark/bin/spark-submit\>--driver-class-path/usr/local/spark/jars/*:/usr/local/spark/jars

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论