尚大数据项目实战之教育02数仓实现_第1页
尚大数据项目实战之教育02数仓实现_第2页
尚大数据项目实战之教育02数仓实现_第3页
尚大数据项目实战之教育02数仓实现_第4页
尚大数据项目实战之教育02数仓实现_第5页
已阅读5页,还剩90页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

尚硅谷大数据项目实战之教育(02数(作者:尚硅谷大数据研发部1ODS(OperationDataODS层:原始数据层,存放原始数据DWD(datawarehouse(DWS(datawarehouse以DWD为基础,进行轻度汇总ADS(ApplicationDataADS层,1)将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单、并且方便定位问题2)规范数据分层,通过的中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性 不论是数据的异常还是数据的敏感性,使真实数据与统计数据解耦详见:尚硅谷大数据技术之Hive&MySQL[atguigu@hadoop102[atguigu@hadoop102conf]$[atguigu@hadoop102conf]$vimhive-jdk1.8、hadoop-2.7.2、hive、3Spark准备三台Linux服务器,安装好Spark安装包上传解压安装包上传spark-2.1.1-bin-hadoop2.7.tgz安装包到Linux上解压安装包到指定位置tar-xfspark-2.1.1-bin-hadoop2.7.tgz-C修改Hadoopyarn-<!--SitespecificYARNconfigurationproperties--默认是true-->默认是true-->修改Spark-env.sh让Spark能够发现Hadoopsparkhistory4第5章用户模块需baseadlog基础表原始json数{"adid":"0", //基础表id"adname":"弹窗0", "dn":"webA" } og基础表原始json数{"creator":"delete":"dn": "siteid":"2", //id"sitename":"114", "siteurl": }memberRegtype用户跳转地址{"appkey":"-"appregurl": "bdp_uuid":"- "dn": ":"-"isranreg":"-"regsource":"4",//所属平台1.PC "uid":"0",//用户id"websiteid":"0"//对应 og下的siteid}pcentermempaymoneylog{"dn": "paymoney":"162.54","siteid":"1", //id对应对应basewebsi og下的siteid "uid": "vip_id":"0pcentermemviplevellog}pcentermemviplevellogvip{"discountval":"-"dn": "end_time":"2019-01-01", //vip结束时间"last_modify_time":"2019-01-01","max_":"-","min_":"-"next_level":"-","operator":"update",":" //vip"vip_id": //vip"vip_level":"银卡 //vip级别名}memberlog{"ad_id": //"birthday"1981-08- "dn": "": "fullname":"王69239", "iconurl":"-","lastlogin":"-"mailaddr":"-"memberlevel "password":"123456",//"paymoney":"-","phone": "register":"2016-08- "regupdatetime":"-"uid":"69239", //id"unitname":"-","userip": //ip"zipcode":"-}其余字段为非统计项直接使用默认值“-”即hadoophadoopdfs-mkdir在hive里分别建立三个库,dwd、dws、ads 分别用于etl后的数据、宽表和拉链createdatabasedwd;createdatabasedws;createdatabaseads;各层级 以dwd 用 模块dwd层6dws模拟上传数模拟将日志文件数据直接上传到hadoop集群上ETL数据需求1:必须使用Spark进行数据对用户名号进行脱敏处理并使用dwdhive规则用户名:王 直换成dwddws需求2:对dwd层的6进行合并,生成一张宽表,先使用SparkSql实现。有时间的同学需要使用DataFrameapi实现功能,join进行优化。3dws层宽表的支付金额(paymoney)vip等级(vip_level)这两个会变动的字需求4:使用SparkDataFrameApi统计通过各跳转地址(appregurl)进行的用户SparkSql需求5:使用SparkDataFrameApi统计各所属(sitename)的用户数,有时间的再Spark6:SparkDataFrameApi(regsourcename)用户数,有时间SparkSql需求7:使用SparkDataFrameApi统计通过各跳转(adname)的用户数,有时间的再SparkSql8:SparkDataFrameApi(memberlevel)的用户数,有时间的SparkSql需求9:使用SparkDataFrameApi统计各分区、用户级别下(dn、memberlevel)top3Spark第6章用户做题模块需求QzWebsite.log做题日志数{"createtime":"2019-07-2211:47:18", "creator":"admin", "dn": "":"-"dt" "multicastport":"-"sequence":"-","siteid":0, //id"sitename": "status":"-"temteserver":"-} {"boardid":64, "coursechapter":"-","courseid":66, //课程id"createtime"2019-07-22 "creator":"admin", "dn":"webA", "dt" "sequence":"-"servertype":"-"showstatus":"-"sitecourseid": "siteid":77, //id"status":"-"}QzQuestionType.log{"createtime":"2019-07-2210:42:47", "creator":"admin", "dn": "dt" "papertypename":"-","questypeid":0, "quesviewtype":0,"remark":"-"sequence":"-"status":"-"viewtypename":}QzQuestion.log{"ysis":"-"answer":"-"attanswer":"-"content":"-"createtime":"2019-07-2211:33:46", "creator":"admin", "dn": "dt" "lecture":"-"modifystatus":"-","optnum":8,"parentid":57,"quesskill":"-","questag":"-","questionid":0, "questypeid":57,id"quesviewtype":44,"score24.124501582742543,"splitscore":0.0,"status":"-", ysisaddr":"-"vdeoaddr":"-}QzPointQuestion.log{"createtime":"2019-07-2209:16:46", "creator":"admin", "dn": "dt":" ",//日期分区"pointid":0, //知识点id"questionid":0,//题id"questype":0}QzPoint.log{"chapter":"-", "chapterid":0, //章节id"courseid":0, //课程id"createtime":"2019-07-2209:08:52",//创建时间"creator":"admin", "dn": "dt":" "excisenum":73,"modifystatus":"-","pointdescribe":"-","pointid":0, //id"pointlevel":"9", "pointlist":"-","pointlistid":82, //知识点列表id"pointname":"pointname0", "pointnamelist":"-","pointyear"2019",知识点所属年份"remid":"-","score":83.86880766562163, "sequece":"-","status":"-"thought":"-"typelist":"-}{"contesttime":"2019-07-2219:02:19","contesttimelimit":"-","createtime"2019-07-22 "creator":"admin", "dayiid":94,"dn":"webA",//分"downurl":"-"dt" "exinurl":"-"iscontest":"-"modifystatus":"-"openstatus":"-"paperdifficult":"-","paperid":83, "paperparam":"-","papertype":"-"paperuse":"-"paperuseshow":"-","paperviewcatid":1,"paperviewid":0, "paperviewname "testreport":"-}QzPaper.log{"chapter "chapterid":33, //id"chapterlistid":69,id"courseid":72,//id"createtime":"2019-07-2219:14:27",//创建时间"creator":"admin", "dn": "dt":" "papercatid":92,"paperid": "papername":"papername0", "paperyear":"2019", "status":"-","suitnum":"-"totalscore }QzMemberPaperQuestion.log{"chapterid":33,//id"dn":"webA",//分区"dt" "istrue":"-"lasttime":"2019-07-22"majorid":77,id"opertype":"-","paperid":91,//id"paperviewid":37,"question_answer1,做题结果(01正确"questionid":94,"score":76.6941793631127, "sitecourseid":1,//课程id"spendtime4823,所用时间单位(秒)"useranswer":"-","userid":0}QzMajor.log{ ":41,//主业id"columm_sitetype":"-","createtime":"2019-07-2211:10:20",//创建时间"creator":"admin", "dn": "dt" "majorid": "majorname":"majorname1", "sequence":"-","shortname":"-","siteid":24,//id"status":"-"}QzCourseEduSubject.log{"courseeduid":0,//课程辅导id"courseid":0, //课程id"createtime":"2019-07-2211:14:43",//创建时间"creator":"admin", "dn": "dt":" ", "edusubjectid":44,//辅导科目id"majorid":38 //主修id}QzCourse.log{"chapterlistid":45,//章节列表id"courseid":0, //课程id"coursename":"coursename0", "createtime":"2019-07-2211:08:15",//创建时间"creator":"admin", "dn": "dt" "isadvc":"-"majorid":39, //主修id"pointlistid":92, //知识点列表id"sequence":"8128f2c6-2430-42c7-9cb4-787e52da2d98","status":"-"}QzChapterList.log{ llnum":0, "chapterlistid":0, //章节列表id"chapterlistname":"chapterlistname0", "courseid":71, //课程id"createtime"2019-07-2216:22:19",创建时间"creator":"admin",//创建者"dn": "dt" "status":"-}QzChapter.log{"chapterid":0, //章节id"chapterlistid":0, "chaptername":"chaptername0", "chapternum":10, "courseid": "createtime":"2019-07-2216:37:24", "creator":"admin", "dn": "dt":" "outchapterid":0,"sequence":"-"showstatus":"-"status":"-}QzCenterPaper.log试卷关联数{"centerid": //"createtime":"2019-07-2210:48:30",//创建时间"creator":"admin", "dn": "dt" "openstatus":"-","paperviewid":2, "sequence":"-"}QzCenter.log数{"centerid": //"centername":"centername0",//名"centerparam":"-","centertype":"3", "centerviewtype":"-","centeryear":"2019", "createtime"2019-07-2219:13:09",创建时间"creator":"-","dn":"dt" "openstatus":"provideuser":"-"sequence":"-"stage":"-}Centerid: centername:名称centertype:类型centeryear:年createtime:创建时间dn:分区dt:日期分QzBusiness.log{ ":0, //行业id"businessname":"bsname0", "createtime":"2019-07-2210:40:54", "creator":"admin", "dn":"webA",//分"dt" "sequence":"-"siteid":1, //所属id"status":"-"}模拟上传数 做题建表语句1sparkodshive表中,score分1位小数并且四舍五入。需求2:基于dwd层基础表数据,需要对表进行维度进行表聚合,聚合成sqldataframeapi操作dws.dws_qz_chapte:4joindwd.dwd_qz_chapterinnerjoindwd.qz_chapter_listjoin条件:chapterlistiddn,innerjoindwd.dwd_qz_pointjoin条件:chapteriddn,innerjoindwd.dwd_qz_point_questionjoin条件:pointiddndws.dws_qz_course:3joindwd.dwd_qz_site_courseinnerjoindwd.qz_coursejoin条件courseiddninnerjoindwd.qz_course_edusubjectjoin条件:courseiddws.dws_qz_major:3 inner 条件:siteid和dn,innerjoin join条件:siteid和dws.dws_qz_paper:4 qz_paperviewleftjoinqz_centerjoin条件:paperviewidleftjoin join条件:centeriddn,innerjoinqz_paperjoin条件:paperiddws.dws_qz_question:2表join qz_quesitoninnerjoinqz_questiontype join条件:questypeid和dn需求3dws.dws_qz_chapterdws.dws_qz_coursedws.dws_qz_majordws.dws_qz_paper、dws.dws_qz_questiondwd.dwd_qz_member_paper_questiondw.user_paper_detail,使sparksqldataframeapi操作dws.user_paper_detail:dwd_qz_member_paper_questioninnerjoindws_qz_chapterjoin条件:chapterid和dn,innerjoindws_qz_coursejoin条件:sitecourseid和dn,innerjoindws_qz_majorjoinmajoriddn,innerjoindws_qz_paperpaperviewid和dn,innerjoindws_qz_questionquestioniddn4SparkSqlSparkDataFrameApi5SparkSqlSparkDataFrame6SparkSql完成指标统计,再使SparkDataFrameApi。SparkDataFrameApi。11adsdataxadsmysql7maven配置 <?xml<?xmlversion="1.0"encoding="UTF-<projectxmlns="""<module>comatguigu<!--LoggingEnd--引入Spark相关的Jar<!--

周期的具体实现】--<plugin 所有的编译都依照JDK1.8<!-- 该插件用于将Scala代码编译成class 绑定到maven的compile <?xml<?xmlversion="1.0"encoding="UTF-<projectxmlns="""Spark引入Scala scala 下存放各表sql类 第8章用户模块代码实packagepackagecaseclassuid:varpaymoney:String,vip_level:String,start_time:String,varend_time:String,dn:String)caseclassMemberZipperResult(list:caseclassuid:Int,ad_id:Int,memberlevel:register:appregurl:String,// 来源urlregsource:String,regsourcename:String,adname:String,siteid:String,sitename:String,vip_level:String,paymoney:BigDecimal,dt:String,dn:)caseclassuid:Int,adid:Int,fullname:String,iconurl:String,lastlogin:String,mailaddr:String,memberlevel:String,password:String,paymoney:BigDecimal,phone:String,register:String,regupdatetime:String,unitname:String,userip:String,zipcode:String,appkey:String,appregurl:String,bdp_uuid:String,reg_createtime:String,isranreg:String,regsource:String,regsourcename:String,adname:String,siteid:String,sitename:String,siteurl:String,site_delete:String,site_createtime:String,site_creator:String,vip_id:String,vip_level:String,vip_start_time:String,vip_end_time:String,vip_last_modify_time:String, :String, :String,vip_next_level:String,vip_operator:String,dt:String,dn:String)caseclassuid:Int,ad_id:Int,fullname:String,icounurl:lastlogin:String,mailaddr:String,memberlevel:lastlogin:String,mailaddr:String,memberlevel:String,password:String,paymoney:String,phone:String,register:String,regupdatetime:String,unitname:String,userip:String,zipcode:String,appkey:String,appregurl:String,bdp_uuid:String,regcreatetime:String,isranreg:String,regsource:String,regsourcename:String,adname:String,siteid:String,sitename:String,siteurl:String,site_delete:String,site_createtime:String,site_creator:String,vip_id:String,vip_level:String,vip_start_time:String,vip_end_time:String,vip_last_modify_time:String, :String, :String,vip_next_level:String,vip_operator:String,dt:String,dn:String)jsonfastjson,utilParseJosnDatapackagepackageimportpublicclassParseJsonDatapublicstaticJSONObjectgetJsonData(Stringdata){try{return}catch(Exceptione){returnnull;}}}utilHiveHiveUtilpackageimportobjectHiveUtil@paramdefsetMaxpartitions(spark:SparkSession)={spark.sql("sethive.exec.dynamic.partition=true")spark.sql("sethive.exec.dynamic.partition.mode=nonstrict")spark.sql("sethive.exec.max.dynamic.partitions=100000")spark.sql("sethive.exec.max.dynamic.partitions.pernode=100000")spark.sql("sethive.exec.max.created.files=100000")}@param pression(spark:SparkSession)={ }@paramdefopenDynamicPartition(spark:SparkSession)={spark.sql("sethive.exec.dynamic.partition=true")spark.sql("sethive.exec.dynamic.partition.mode=nonstrict")}使用lzo@param pression(spark:SparkSession)= }使用snappy@param spark.sql("setmapreduce.output.fi }}}对日志进行数据导收集日志原始数据后我们需要对日志原始数据进行将dwd层表packageimportcom.alibaba.fastjson.JSONObjectimportcom.atguigu.util.ParseJsonDataimportorg.apache.spark.SparkContextimportorg.apache.spark.sql.{SaveMode,objectEtlDataService *@param@paramdefetlMemberRegtypeLog(ssc:SparkContext,packageimportcom.alibaba.fastjson.JSONObjectimportcom.atguigu.util.ParseJsonDataimportorg.apache.spark.SparkContextimportorg.apache.spark.sql.{SaveMode,objectEtlDataService *@param@paramdefetlMemberRegtypeLog(ssc:SparkContext,sparkSession:SparkSession)importsparkSession.implicits._//隐式转换.filter(item=>valobj=ParseJsonData.getJsonData(item)}).mapPartitions(partitoin=>{partitoin.map(item=>{valjsonObject=ParseJsonData.getJsonData(item)valappkey=jsonObject.getString("appkey")valappregurl=jsonObject.getString("appregurl")valbdp_uuid=jsonObject.getString("bdp_uuid")valcreatetime=jsonObject.getString("createtime")valisranreg=jsonObject.getString("isranreg")valregsource=jsonObject.getString("regsource")valregsourceName=regsourcematch{case"1"=>case"2"=>" case"3"=>"App"case"4"=>"WeChat"case_=>"other"}valuid=valwebsiteid=jsonObject.getIntValue("websiteid")valdt=jsonObject.getString("dt")valdn=(uid,appkey,appregurl,bdpuuid,createtime,isranreg,regsource,regsourceName,websiteid,dt,dn)}*@param@paramdefetlMemberLog(ssc:SparkContext,sparkSession:SparkSession)importsparkSession.implicits._//隐式转换ssc.textFile("/user/atguigu/ods/member.log").filter(item=>{valobj=ParseJsonData.getJsonData(item)}).mapPartitions(partition=>{partition.map(item=>{valjsonObject=ParseJsonData.getJsonData(item)valad_id=jsonObject.getIntValue("ad_id")valbirthday=jsonObject.getString("birthday") =jsonObject.getString(" valfullname=jsonObject.getString("fullname").substring(0,1)+"xx"valiconurl=jsonObject.getString("iconurl")vallastlogin=jsonObject.getString("lastlogin")valmailaddr=jsonObject.getString("mailaddr")valmemberlevel=jsonObject.getString("memberlevel")valpassword="******"valpaymoney=jsonObject.getString("paymoney")valphone=jsonObject.getString("phone")valnewphone=phone.substring(0,3)+"*****"+phone.substring(7,11)valqq=jsonObject.getString("qq")valregister=valregupdatetime=jsonObject.getString("regupdatetime")valuid=jsonObject.getIntValue("uid")valunitname=jsonObject.getString("unitname")valuserip=jsonObject.getString("userip")valzipcode=jsonObject.getString("zipcode")valdt=jsonObject.getString("dt")valdn=(uid,ad_id,birthday, ,fullname,iconurl,lastlogin,mailaddr,memberlevel,password,paymoney,newphone,qq,register,regupdatetime,unitname,userip,zipcode,dt,} @param@paramdefetlBaseAdLog(ssc:SparkContext,sparkSession:SparkSession)importsparkSession.implicits._//隐式转换valresult=ssc.textFile("/user/atguigu/ods/baseadlog.log").filter(item=>{valobj=ParseJsonData.getJsonData(item)}).mapPartitions(partition=>{partition.map(item=>{valjsonObject=ParseJsonData.getJsonData(item)valadid=jsonObject.getIntValue("adid")valadname=jsonObject.getString("adname")valdn=jsonObject.getString("dn")(adid,adname,} @param@paramdefetlBaseWebSi og(ssc:SparkContext,sparkSession:SparkSession)importsparkSession.implicits._//隐式转换ssc.textFile("/user/atguigu/ods/baswewebsite.log").filter(item=>{valobj=ParseJsonData.getJsonData(item)}).mapPartitions(partition=>{partition.map(item=>{valjsonObject=ParseJsonData.getJsonData(item)valsiteid=jsonObject.getIntValue("siteid")valsitename=jsonObject.getString("sitename")valsiteurl=jsonObject.getString("siteurl")valdelete=jsonObject.getIntValue("delete")valcreatetime=jsonObject.getString("createtime")valcreator=jsonObject.getString("creator")valdn=(siteid,sitename,siteurl,delete,createtime,creator,}@param@paramdefetlMemPayMoneyLog(ssc:SparkContext,sparkSession:SparkSession)={importsparkSession.implicits._//隐式转换ssc.textFile("/user/atguigu/ods/pcentermempaymoney.log").filter(itemvalobj=ParseJsonData.getJsonData(item)}).mapPartitions(partition=>{partition.map(item=>{valjSONObject=ParseJsonData.getJsonData(item)valpaymoney=jSONObject.getString("paymoney")valuid=jSONObject.getIntValue("uid")valvip_id=jSONObject.getIntValue("vip_id")valsite_id=jSONObject.getIntValue("siteid")valdt=jSONObject.getString("dt")valdn=jSONObject.getString("dn")(uid,paymoney,site_id,vip_id,dt,dn)}}导入用户vip*@param@paramdefetlMemVipLevelLog(ssc:SparkContext,sparkSession:SparkSession)={importsparkSession.implicits._//隐式转换ssc.textFile("/user/atguigu/ods/pcenterMemViplevel.log").filter(itemvalobj=ParseJsonData.getJsonData(item)}).mapPartitions(partition=>{partition.map(item=>{valjSONObject=valdiscountval=jSONObject.getString("discountval")valend_time=jSONObject.getString("end_time")vallast_modify_time=jSONObject.getString("last_modify_time")valmax_ =jSONObject.getString("max_ val = valnext_level=jSONObject.getString("next_level")valoperator=jSONObject.getString("operator")valstart_time=jSONObject.getString("start_time")valvip_id=jSONObject.getIntValue("vip_id")valvip_level=jSONObject.getString("vip_level")valdn=jSONObject.getString("dn")(vip_id,vip_level,start_time,end_time,last_modify_time,max_next_level,operator,dn),,}}))packageimportcom.atguigu.member.service.EtlDataServiceimportcom.atguigu.util.HiveUtilimportimportobjectDwdMemberControllerdefmain(args:Array[String]):Unit={System.setProperty("HADOOP_USER_NAME","atguigu")valsparkConf=newSparkConf().setAppName("dwd_member_import").setMaster("local[*]" valssc=sparkSession.sparkContextHiveUtil.openDynamicPartition(sparkSession)//开启动态分区// // 存入bdl层表中EtlDataService.etlBaseAdLog(ssc,sparkSession)//导入基础 og(ssc,sparkSession)//导入基础 EtlDataService.etlMemberLog(ssc,sparkSession)// EtlDataService.etlMemberRegtypeLog(ssc,sparkSession)// EtlDataService.etlMemberRegtypeLog(ssc,sparkSession)// EtlDataService.etlMemPayMoneyLog(ssc,sparkSession)//导入用户支付情况记录EtlDataService.etlMemVipLevelLog(ssc,sparkSession)//导入vip基础数据}}packagepackageimportorg.apache.spark.sql.SparkSessionobjectDwdMemberDao{defgetDwdMember(sparkSession:SparkSession)={ ,fullname,iconurl,lastlogin,mailaddr,memberlevel,"+dwd.dwd}defgetDwdMemberRegType(sparkSession:SparkSession)= ,isranreg,""regsource,regsourcename,websiteidassiteid,dnfromdwd.dwdmemberregtype}defgetDwdBaseAd(sparkSession:SparkSession)={sparkSession.sql("selectadidasad_id,adname,dnfromdwd.dwd_base_ad")}defgetDwdBaseWebSite(sparkSession:SparkSession)={sparkSession.sql("selectsiteid,sitename,siteurl,deleteassite_delete,"+ }defgetDwdVipLevel(sparkSession:SparkSession)=sparkSession.sql("selectvip_id,vip_level,start_timeasvip_start_time,end_timeasvip_end_time,"+"last_modify_timeasvip_last_modify_time,max_ asvip_max_ vipmin ,"+"nextlevelasvipnextlevel,operatorasvipoperator,dnfromdwd.dwdvip}defgetDwdPcentermemPayMoney(sparkSession:SparkSession)={sparkSession.sql("selectuid,cast(paymoneyasdecimal(10,4))aspaymoney,vipid,dnfromdwd.dwd}}dwddwsdataframejoingroupbykey算sparksqlpackage importimportorg.apache.spark.sql.{SaveMode,objectDwsMemberServicedefimportMemberUseApi(sparkSession:SparkSessiondt:String)importsparkSession.implicits._//隐式转换valdwdMember=DwdMemberDao.getDwdMember(sparkSession).where(s"dt='${dt}'")主表valdwdMemberRegtype=DwdMemberDao.getDwdMemberRegType(sparkSession)valdwdBaseAd=DwdMemberDao.getDwdBaseAd(sparkSession)valdwdBaseWebsite=valdwdPcentermemPaymoney=DwdMemberDao.getDwdPcentermemPayMoney(sparkSession)valdwdVipLevel=DwdMemberDao.getDwdVipLevel(sparkSession)importvalresult=dwdMember.join(dwdMemberRegtype,Seq("uid","dn"),.join(broadcast(dwdBaseAd),Seq("ad_id","dn"),.join(broadcast(dwdBaseWebsite),Seq("siteid","dn"),.join(broadcast(dwdPcentermemPaymoney),Seq("uid","dn"),.join(broadcast(dwdVipLevel),Seq("vip_id","dn"), "memberlevel","password","paymoney","phone","qq","register","regupdatetime","unitname","userip","zipcode","appkey","appregurl","bdp_uuid","reg_createtime"," ","isranreg","regsource","regsourcename","adname" "site_creator","vip_id","vip_level", ","vip_next_level","vip_operator","dt","dn").as[DwsMember]result.groupByKey(item=>item.uid+"_"+item.dn).mapGroups{case(key,iters)=>valkeys=key.split("_")valuid=Integer.parseInt(keys(0))valdn=keys(1)valdwsMembers= null).map(_.paymoney).reduceOption(_+_).getOrElse(BigDecimal.apply(0.00)).toStringvalad_id=valfullname=dwsMembers.map(_.fullname).headvalicounurl=dwsMembers.map(_.iconurl).headvallastlogin=dwsMembers.map(_.lastlogin).headvalmailaddr=dwsMembers.map(_.mailaddr).headvalmemberlevel=dwsMembers.map(_.memberlevel).headvalpassword=dwsMembers.map(_.password).headvalphone=dwsMembers.map(_.phone).headvalqq=dwsMembers.map(_.qq).headvalregister=valregupdatetime=dwsMembers.map(_.regupdatetime).headvalunitname=dwsMembers.map(_.unitname).headvaluserip=dwsMembers.map(_.userip).headvalzipcode=dwsMembers.map(_.zipcode).headvalappkey=dwsMembers.map(_.appkey).headvalappregurl=dwsMembers.map(_.appregurl).headvalbdp_uuid=dwsMembers.map(_.bdp_uuid).headvalreg_createtime=dwsMembers.map(_.reg_createtime).head =dwsMembers.map(_. valisranreg=dwsMembers.map(_.isranreg).headvalregsource=dwsMembers.map(_.regsource).headvalregsourcename=dwsMembers.map(_.regsourcename).headvaladname=dwsMembers.map(.adname).headvalsiteid=dwsMembers.map(.siteid).headvalsitename=dwsMembers.map(_.sitename).headvalsiteurl=dwsMembers.map(_.siteurl).headvalsite_delete=valsite_createtime=dwsMembers.map(_.site_createtime).headvalsitecreator=dwsMembers.map(.sitecreator).headvalvip_id=valvip_level=valvip_start_time=dwsMembers.map(_.vip_start_time).minvalvip_end_time=dwsMembers.map(_.vip_end_time).maxvalvip_last_modify_time=dwsMembers.map(_.vip_last_modify_time).maxvalvipmax =dwsMembers.map(.vipmax val = valvip_next_level=dwsMembers.map(_.vip_next_level).headvalvip_operator=dwsMembers.map(_.vip_operator).headDwsMember_Result(uid,ad_id,fullname,icounurl,lastlogin,mailaddr,memberlevel,password,paymoney,bdp_uuid,reg_createtime, ,isranreg,regsource,regsourcename,adname, vip_start_time,vip_end_time,vip_last_modify_time,vip_max_ ,vip_min_ vip_next_level,vip_operator,dt,dn)}defimportMember(sparkSession:SparkSession,timeString)importsparkSession.implicits._//隐式转换//uid,first(ad_id),first(fullname),first(iconurl),first(lastlogin),"+ decimal(10,4))),first(phone),first(qq),"+ +st(sitename),"+p_id),max(vip_level),"+ ),max(vip_next_level),"+"first(vip_operator),dt,dnfrom"+"a.password,e.paymoney,a.phone,a.qq,a.register,a.regupdatetime,a.unitname,a.userip,"+ ,b.isranreg,b.regsource,"+ site_delete,d.createtimeassite_createtime," vip_start_time,f.end_timeasvip_end_time,"+"f.last_modify_timeasvip_last_modify_time,f.max_ asvip_max_ asvip_min_ ,"+"f.next_levelasvip_next_level,f.operatorasvip_operator,a.dn"s"fromdwd.dwdmemberaleftjoindwd.dwdmemberregtypebona.uid=b.uid"+"anda.dn=b.dnleftjoindwd.dwdbaseadcona.adid=c.adidanda.dn=c.dnleftjoin""dwd.dwd_base_websitedonb.websiteid=d.siteidandb.dn=d.dnleftjoindwd.dwd_pcentermempaymoneye"+s"ona.uid=e.uidanda.dn=e.dnleftjoindwd.dwd_vip_levelfone.vip_id=f.vip_idande.dn=f.dnwherea.dt='${time}')r"+ decimal(10,4)))aspaymoney,max(b.viplevel)asviplevel,"+ start_time,'9999-12-31'asend_time,first(a.dn)asdn"+"fromdwd.dwd_pcentermempaymoneyajoin"s"dwd.dwd_vip_levelbona.vip_id=b.vip_idanda.dn=b.dnwherea.dt='$time'groupbyuid").as[MemberZipper] //两份数据根据用户id进行聚合对end_timevalreuslt=dayResult.union(historyResult).groupByKey(item=>item.uid+"_"+.mapGroups{case(key,iters)=>valkeys=key.split("_")valuid=keys(0)valdn=keys(1)vallistiters.toList.sortBy(itemitem.start_time)if(list.size>1&&"9999-12-31".equals(list(list.size-2).end_time))//end_timevaloldLastModel=list(list.size-vallastModel=list(list.size-1)oldLastModel.end_time=lastModel.start_time }member_zipper")//重组对象打散刷新拉链表}}packagepackageimportcom.atguigu.member.service.DwsMemberServiceimportcom.atguigu.util.HiveUtilimportimportobjectobjectDwsMemberControllerdefmain(args:Array[String]):Unit={System.setProperty("HADOOPUSERNAME","atguigu")valsparkConf=newSparkConf().setAppName("dwsmember =valssc=sparkSession.sparkContextHiveUtil.openDynamicPartition(sparkSession)//开启动态分区// " }}packagepackageimportorg.apache.spark.sql.SparkSessionobjectDwsMemberDao{*@paramdefqueryIdlMemberData(sparkSession:SparkSession)={uid,ad_id,memberlevel,register,appregurl,regsource,regsourcename,adname,"+"siteid,sitename,vip_level,cast(paymoneyasdecimal(10,4))aspaymoney,dt,dnfromdws.dws_member} 来源url人@paramdefqueryAppregurlCount(sparkSession:SparkSession,dt:String)={sparkSession.sql(s"selectappregurl,count(uid),dn,dtfromdws.dws_memberwheredt='${dt}'groupby} defquerySiteNameCount(sparkSession:SparkSession,dt:String)={ dt='${dt}'groupby}defqueryRegsourceNameCount(sparkSession:SparkSession,dt:String)={sparkSession.sql(s"selectregsourcename,count(uid),dn,dtfromdws.dws_memberwheredt='${dt}'groupbyregsourcename,dn,dt}defqueryAdNameCount(sparkSession:SparkSession,dt:String)={defqueryAdNameCount(sparkSession:SparkSession,dt:String)={ dt='${dt}'groupby}defqueryMemberLevelCount(sparkSession:SparkSession,dt:String)={sparkSession.sql(s"selectmemberlevel,count(uid),dn,dtfromdws.dws_memberwheredt='${dt}'groupby}//统计各用户vipdefqueryVipLevelCount(sparkSession:SparkSession,dt:String)={sparkSession.sql(s"selectvip_level,count(uid),dn,dtfromdws.dws_membergroupwheredt='${dt}'by}defgetTop3MemberLevelPayMoneyUser(sparkSession:SparkSession,dt:String)=uid,ad_id,memberlevel,register,appregurl,regsource"+decimal(10,4)),row_number()over"s"(partitionbydn,memberlevelorderbycast(paymoneyasdecimal(10,4))desc)asrownum,dnfromdws.dws_memberwheredt='${dt}')"+"whererownum<4orderby}}packagepackageimportcom.atguigu.member.bean.QueryResultimportcom.atguigu.member.dao.DwsMemberDaoimportorg.apache.spark.sql.expressions.Windowimportorg.apache.spark.sql.{SaveMode,objectAdsMemberService统计各项指标使用*@paramdefqueryDetailApi(sparkSession:SparkSession,dtString)importsparkSession.implicits._//隐式转换 = 来源url人vala=result.mapPartitions(partition=>partition.map(item=>(item.appregurl+"_"+item.dn+"_"+item.dt,.mapValues(item=>item._2).reduceGroups(_+.map(item=>valkeys=item._1.split("_")valappregurl=keys(0)valdn=keys(1)valdt=(appregurl,item._2,dt, result.mapPartitions(partiton=>partiton.map(item=>(item.sitename+""+item.dn+""+item.dt,}).groupByKey(.1).mapValues((item=>item.2)).reduceGroups(+.map(item=>valkeys=item._1.split("_")valsitename=keys(0)valdn=keys(1)valdt=(sitename,item._2,dt, //统计所属来源人数pc wechatappresult.mapPartitions(partition=>{partition.map(item=>(item.regsourcename+""+item.dn+""+item.dt,}).groupByKey(_._1).mapValues(item=>item._2).reduceGroups(_+.map(item=>valkeys=item._1.split("_")valregsourcename=keys(0)valdn=keys(1)valdt=(regsourcename,item._2,dt, 进来的人result.mapPartitions(partition=>partition.map(item=>(item.adname+"_"+item.dn+"_"+item.dt,.map(item=>valkeys=item._1.split("_")valadname=keys(0)valdn=keys(1)valdt=(adname,item._2,dt, result.mapPartitions(partitionpartition.map(item=>(item.memberlevel+"_"+item.dn+"_"+item.dt,.map(item=>valkeys=item._1.split("_")valmemberlevel=keys(0)valdn=keys(1)valdt=(memberlevel,item._2,dt,//统计各用户vipresult.mapPartitions(partition=>partition.map(item=>(item.vip_level+"_"+item.dn+"_"+item.dt,.map(item=>valkeys=item._1.split("_")valvip_level=keys(0)valdn=keys(1)valdt=(vip_level,item._2,dt,import rownumber().over(Window.partitionBy("dn",.select("uid","memberlevel","regis

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论