尚硅谷大数据项目之实时项目2:日活需求 (Atguigu Big Data Real-Time Project 2: DAU Requirement)

Chapter 1: Real-Time Processing Module

1.1 Module Setup
Add Scala framework support to the module.

1.2 Design Approach
1) Consume the data from Kafka;
2) Use Redis to filter out devices already counted toward today's DAU;
3) Save each batch's newly added DAU records to HBase or ES;
4) Query the data out of ES and publish it as a data interface for the visualization project to call.

1.3 Code Development, Part 1: Consuming Kafka

1.3.1 Configuration
1) config.properties

# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Redis configuration
redis.host=hadoop102
redis.port=6379

2) pom.xml

<dependencies>
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <dependency>
        <groupId>io.searchbox</groupId>
        <artifactId>jest</artifactId>
        <version>5.3.3</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.jna</groupId>
        <artifactId>jna</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.janino</groupId>
        <artifactId>commons-compiler</artifactId>
        <version>2.7.8</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- This plugin compiles the Scala code into class files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <!-- Bind the goals to Maven's compile phase -->
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

1.3.2 Utility Classes

1) MyKafkaUtil

package com.atguigu.utils

import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

object MyKafkaUtil {
  def getKafkaStream(ssc: StreamingContext, topics: Set[String]): InputDStream[(String, String)] = {
    val properties: Properties = PropertiesUtil.load("config.properties")
    val kafkaPara = Map(
      "bootstrap.servers" -> properties.getProperty("kafka.broker.list"),
      "group.id" -> "bigdata0408"
    )
    // Consume Kafka with the direct (receiver-less) API
    val kafkaDStream: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaPara, topics)
    // Return the stream of (key, value) pairs
    kafkaDStream
  }
}

2) PropertiesUtil

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
  def load(propertieName: String): Properties = {
    val prop = new Properties()
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName), "UTF-8"))
    prop
  }
}
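A quick way to verify the configuration wiring (a minimal sketch; it assumes the config.properties from 1.3.1 is on the classpath):

// Load config.properties from 1.3.1 and read the Kafka broker list
val props = PropertiesUtil.load("config.properties")
println(props.getProperty("kafka.broker.list")) // hadoop102:9092,hadoop103:9092,hadoop104:9092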

3) RedisUtil

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
  var jedisPool: JedisPool = null

  def getJedisClient: Jedis = {
    if (jedisPool == null) {
      println("creating a new connection pool")
      val config = PropertiesUtil.load("config.properties")
      val host = config.getProperty("redis.host")
      val port = config.getProperty("redis.port")

      val jedisPoolConfig = new JedisPoolConfig()
      jedisPoolConfig.setMaxTotal(100)            // maximum number of connections
      jedisPoolConfig.setMaxIdle(20)              // maximum idle connections
      jedisPoolConfig.setMinIdle(20)              // minimum idle connections
      jedisPoolConfig.setBlockWhenExhausted(true) // whether to wait when the pool is exhausted
      jedisPoolConfig.setMaxWaitMillis(500)       // wait timeout in milliseconds when busy
      jedisPoolConfig.setTestOnBorrow(true)       // test each connection when it is borrowed
      jedisPool = new JedisPool(jedisPoolConfig, host, port.toInt)
    }
    // println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
    // println("borrowing a connection")
    jedisPool.getResource
  }
}

1.3.3 Case Class StartupLog

case class StartupLog(mid: String, uid: String, appid: String, area: String,
                      os: String, ch: String, logType: String, vs: String,
                      var logDate: String, var logHour: String, var ts: Long)

1.3.4 Business Class: Consuming Kafka

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.phoenix.spark._

object RealtimeStartupApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("gmall2019")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val startupStream: InputDStream[(String, String)] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // startupStream.map(_._2).foreachRDD { rdd =>
    //   println(rdd.collect().mkString("\n"))
    // }

    val startupLogDstream: DStream[StartupLog] = startupStream.map(_._2).map { log =>
      // println(s"log = $log")
      val startupLog: StartupLog = JSON.parseObject(log, classOf[StartupLog])
      startupLog
    }
  }
}

1.4 Code Development, Part 2: Deduplication

1.4.1 Flow Chart

1.4.2 Redis Key-Value Design

key:   dau:2019-01-22
value: device id (mid)

One Redis set per day holds the mids of every device counted so far; the sketch below shows the resulting set semantics.

1.4.3 Business Code
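Before the full business code, here is a minimal sketch of the set semantics behind the key-value design (it uses the RedisUtil pool from 1.3.2; the mid value is made up for illustration):

// A Redis set keyed by date deduplicates device ids across the whole day
val jedis = RedisUtil.getJedisClient
jedis.sadd("dau:2019-01-22", "mid_101") // first startup of mid_101 today: added to the set
jedis.sadd("dau:2019-01-22", "mid_101") // repeated startup: the set is unchanged
println(jedis.scard("dau:2019-01-22"))  // number of distinct active devices so far
jedis.close()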

import java.util
import java.text.SimpleDateFormat
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall.constant.GmallConstants
import com.atguigu.gmall2019.realtime.bean.StartupLog
import com.atguigu.gmall2019.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import org.apache.phoenix.spark._

object DauApp {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("dau_app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 1. Consume Kafka
    val inputDstream: InputDStream[(String, String)] =
      MyKafkaUtil.getKafkaStream(ssc, Set(GmallConstants.KAFKA_TOPIC_STARTUP))

    // 2. Convert the stream into the case class and fill in the two time fields
    val startuplogDstream: DStream[StartupLog] = inputDstream.map { record =>
      val jsonStr: String = record._2
      val startupLog: StartupLog = JSON.parseObject(jsonStr, classOf[StartupLog])
      val dateTimeStr: String = new SimpleDateFormat("yyyy-MM-dd HH").format(new Date(startupLog.ts))
      val dateArr: Array[String] = dateTimeStr.split(" ")
      startupLog.logDate = dateArr(0)
      startupLog.logHour = dateArr(1)
      startupLog
    }
    startuplogDstream.cache()

    // 3. Filter against the daily visitor list in Redis: keep only records whose mid
    //    is not yet in the list
    val filteredDstream: DStream[StartupLog] = startuplogDstream.transform { rdd =>
      // Runs on the driver, once per batch interval
      val jedis: Jedis = RedisUtil.getJedisClient
      val dateStr: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
      val key = "dau:" + dateStr
      val dauMidSet: util.Set[String] = jedis.smembers(key)
      jedis.close()
      val dauMidBC: Broadcast[util.Set[String]] = ssc.sparkContext.broadcast(dauMidSet)
      println("before filtering: " + rdd.count())
      val filteredRDD: RDD[StartupLog] = rdd.filter { startuplog =>
        // Runs on the executors
        val dauMidSet: util.Set[String] = dauMidBC.value
        !dauMidSet.contains(startuplog.mid)
      }
      println("after filtering: " + filteredRDD.count())
      filteredRDD
    }

    // 4. Deduplicate within the batch: group by mid and keep the first record of each group
    val groupbyMidDstream: DStream[(String, Iterable[StartupLog])] =
      filteredDstream.map(startuplog => (startuplog.mid, startuplog)).groupByKey()
    val distictDstream: DStream[StartupLog] = groupbyMidDstream.flatMap { case (mid, startupLogItr) =>
      startupLogItr.toList.take(1)
    }

    // 5. Save today's visitor (mid) list to Redis
    //    1) key type: set  2) key: dau:2019-xx-xx  3) value: mid
    distictDstream.foreachRDD { rdd =>
      // driver
      rdd.foreachPartition { startuplogItr =>
        val jedis: Jedis = RedisUtil.getJedisClient // executor
        for (startuplog <- startuplogItr) {
          val key = "dau:" + startuplog.logDate
          jedis.sadd(key, startuplog.mid)
          println(startuplog)
        }
        jedis.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
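Step 4 above deduplicates within a single batch by grouping on mid and keeping the first record per group. A minimal local illustration of that idea, using plain Scala collections rather than Spark and made-up mids, purely to show the semantics:

// Plain-Scala illustration of the step-4 idea: keep one record per key
val batch = Seq(("mid_1", "a"), ("mid_1", "b"), ("mid_2", "c"))
val deduped = batch.groupBy(_._1).flatMap { case (_, records) => records.take(1) }
println(deduped.size) // 2, exactly one record per mid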

1.5 Code Development, Part 3: Saving to HBase

1.5.1 Phoenix: the SQL Layer over HBase
For details on the technology, see《尚硅谷大数据技术之Phoenix》.

1.5.2 Create the Table with Phoenix
The table name must match the one passed to saveToPhoenix below:

create table GMALL2019_DAU (
    mid varchar,
    uid varchar,
    appid varchar,
    area varchar,
    os varchar,
    ch varchar,
    type varchar,
    vs varchar,
    logDate varchar,
    logHour varchar,
    ts bigint
    CONSTRAINT dau_pk PRIMARY KEY (mid, logDate));

1.5.3 Add Dependencies to pom.xml

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>4.14.2-HBase-1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
</dependency>

Business code for the save:

// Write the data to HBase through Phoenix
distictDstream.foreachRDD { rdd =>
  rdd.saveToPhoenix(
    "GMALL2019_DAU",
    Seq("MID", "UID", "APPID", "AREA", "OS", "CH", "TYPE", "VS", "LOGDATE", "LOGHOUR", "TS"),
    new Configuration,
    Some("hadoop102,hadoop103,hadoop104:2181"))
}
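To spot-check that rows are landing in the table, the phoenix-spark integration can also read it back as an RDD. A minimal sketch, not part of the original project, assuming the SparkContext sc from 1.3.4 and the ZooKeeper quorum above:

import org.apache.phoenix.spark._
// Read (MID, LOGDATE) columns back from Phoenix and count the rows
val dauRdd = sc.phoenixTableAsRDD(
  "GMALL2019_DAU",
  Seq("MID", "LOGDATE"),
  zkUrl = Some("hadoop102,hadoop103,hadoop104:2181"))
println(dauRdd.count())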

Chapter 2: DAU Data Query Interface

2.1 Access Paths
Total:  http://localhost:8070/realtime-total?date=2019-09-06
Hourly: http://localhost:8070/realtime-hours?id=dau&date=2019-09-06

2.2 Required Data Format
Total:
[{"id":"dau","name":"新增日活","value":1200},
 {"id":"new_mid","name":"新增设备","value":233}]
Hourly:
{"yesterday":{"11":383,"12":123,"17":88,"19":200},
 "today":{"12":38,"13":1233,"17":123,"19":688}}

2.3 Building the Publisher Project
(Figures: the dw-publisher module is created with the IDEA Spring Initializr wizard from https://start.spring.io, with group com.atguigu.gmall2019.dw, artifact dw-publisher, Java 8, and the Spring Web, MyBatis Framework, and JDBC API starters selected.)

2.4 Configuration Files

2.4.1 pom.xml

<properties>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.atguigu.gmall2019.dw</groupId>
        <artifactId>dw-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.14.2-HBase-1.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>20.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

2.4.2 application.properties

server.port=8070
logging.level.root=error
spring.datasource.driver-class-name=org.apache.phoenix.jdbc.PhoenixDriver
spring.datasource.url=jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181
spring.datasource.data-username=
spring.datasource.data-password=

#mybatis
#mybatis.typeAliasesPackage=com.example.phoenix.entity
mybatis.mapperLocations=classpath:mapper/*.xml
mybatis.configuration.map-underscore-to-camel-case=true

2.5 Code Structure
Controller layer: PublisherController, the web publication of the interfaces
Service layer: PublisherService, the data query interface; PublisherServiceImpl, its implementation
Data layer: DauMapper, the query interface; DauMapper.xml, the query implementation (a sketch follows this list)
Main program: GmallPublisherApplication, with mapper package scanning added
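The DauMapper interface and its XML mapping are not shown in this excerpt; a minimal sketch of what they might look like against the GMALL2019_DAU table (the method names and SQL are illustrative assumptions, not the original project code):

import java.util.List;
import java.util.Map;

public interface DauMapper {
    // Illustrative SQL: select count(*) from GMALL2019_DAU where LOGDATE = #{date}
    Integer selectDauTotal(String date);

    // Illustrative SQL: select LOGHOUR, count(*) ct from GMALL2019_DAU
    //                   where LOGDATE = #{date} group by LOGHOUR
    List<Map> selectDauTotalHourMap(String date);
}

The corresponding SQL would live in mapper/DauMapper.xml (see mybatis.mapperLocations above), one <select> element per method.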

2.5.1 GmallPublisherApplication: Add Mapper Package Scanning

@SpringBootApplication
@MapperScan(basePackages = "com.atguigu.gmallXXXXXXX.publisher.mapper")
public class Gmall2019PublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(Gmall2019PublisherApplication.class, args);
    }
}

2.5.2 Controller Layer

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall2019.dw.publisher.service.PublisherService;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

@RestController
public class PublisherController {

    @Autowired
    PublisherService publisherService;

    @GetMapping("realtime-total")
    public String realtimeHourDate(@RequestParam("date") String date) {
        List<Map> list = new ArrayList<>();
        // total DAU
        int dauTotal = publisherService.getDauTotal(date);
        Map dauMap = new HashMap();
        dauMap.put("id", "dau");
        dauMap.put("name", "新增日活");
        dauMap.put("value", dauTotal);
        list.add(dauMap);
        // newly added devices
        int newMidTotal = publisherService.getNewMidTotal(date);
        Map newMidMap = new HashMap();
        newMidMap.put("id", "new_mid");
        newMidMap.put("name", "新增用户");
        newMidMap.put("value", newMidTotal);
        list.add(newMidMap);
        return JSON.toJSONString(list);
    }

    @GetMapping("realtime-hours")
    public String realtimeHourDate(@RequestParam("id") String id,
                                   @RequestParam("date") String date) {
        if ("dau".equals(id)) {
            Map dauHoursToday = publisherService.getDauHours(date);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("today", dauHoursToday);
            String yesterdayDateString = "";
            try {
                Date dateToday = new SimpleDateFormat("yyyy-MM-dd").parse(date);
                Date dateYesterday = DateUtils.addDays(dateToday, -1);
                yesterdayDateString = new SimpleDateFormat("yyyy-MM-dd").format(dateYesterday);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            Map dauHoursYesterday = publisherService.getDauHours(yesterdayDateString);
            jsonObject.put("yesterday", dauHoursYesterday);
            return jsonObject.toJSONString();
        }
        if ("new_order_totalamount".equals(id)) {
            String newOrderTotalamountJson = publisherService.getNewOrderTotalAmountHours(d
