尚大数据之flink版实时项目dwd层准备_第1页
尚大数据之flink版实时项目dwd层准备_第2页
尚大数据之flink版实时项目dwd层准备_第3页
尚大数据之flink版实时项目dwd层准备_第4页
尚大数据之flink版实时项目dwd层准备_第5页
已阅读5页,还剩27页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第1章需求分析及实现思路我们这里从kafka的ods层用户行为日志以及业务数据,并进行简单处理,写回到kafka作为dwd层。分分 数据描 媒页面等等合,形成宽表1:2:计算用户行为日志DWD3:DWD第2章功能1在工程中新建模块gmall2021-产生各层数据的flink <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,我们这里使用log4j作为具log4j.appender.stdout.layout.ConversionPattern=%d%p[%c]-在log4j.appender.stdout.layout.ConversionPattern=%d%p[%c]-第3章功能2:准备用户行为日志-DWD我们前面的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层的日志数据分为3类,页面日志、启动日志和日志。这三类数据虽然的日志写回Kafka不同中,作为日志DWD层。识别新老用户利用侧输出流实现数据拆分根据日志数据内容,将日志数据分为3类,页面日志、启动日志和日志。页面日将不同流的数据推送下游的kafka的不同Topic接收Kafka数据,并进行转换1)封装操作Kafka的工具类,并提供获取kafka消费者的方法(读package importimportimportorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.kafka. importAuthor:publicclassMyKafkaUtilprivatestaticStringkafkaServer=publicstaticFlinkKafkaConsumer<String>getKafkaSource(Stringtopic,StringgroupId){Propertiesprop=newProperties();returnnewFlinkKafkaConsumer<String>(topic,new}}22)Flink调用工具类数据的主程序packageimportcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importcom.atguigu.gmall.realtime.utils.MyKafkaUtil;importorg.ap importorg.apache.flink.runtime.state.filesystem.FsStateBackend;importorg.apache.flink.streaming.api.CheckpointingMode;importimportorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;Author: publicclassBaseLogApp privatestaticfinalStringTOPIC_START="dwd_start_log";privatestaticfinalStringTOPIC_PAGE="dwd_page_log";privatestaticfinalString Y publicstaticvoidmain(String[]args)throwsException//TODO0.StreamExecutionEnvironmentenv=//设置并行度这里和kafka 保证(默认)每5000ms开始一次checkpointenv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);//Checkpoint必须在一分钟内完成,否则就会被抛弃StringgroupId="ods_dwd_base_log_app";Stringtopic="ods_base_log";//TODO1.从kafka FlinkKafkaConsumer<String>kafkaSource=MyKafkaUtil.getKafkaSource(topic,groupId);DataStreamSource<String>kafkaDS=env.addSource(kafkaSource);SingleOutputStreamOperator<JSONObject>jsonObjectDS=kafkaDS.map(newMapFunction<String,JSONObject>(){publicJSONObjectmap(Stringvalue)throwsException{JSONObjectjsonObject=JSON.parseObject(value);returnjsonObject;}}env.execute("dwd_base_log}}识别新老访客//TODO2.KeyedStream<JSONObject,String>midKeyedDS=//TODO2.KeyedStream<JSONObject,String>midKeyedDS=jsonObjectDS.keyBy(data->data.getJSONObject("common").getString("mid")); SingleOutputStreamOperator<JSONObject>midWithNewFlagDS=midKeyedDS.map(newRi apFunction<JSONObject,JSONObject>(){ privateValueState<String> privateSimpleDateFormatpublicvoidopen(Configurationparameters)throwsExceptionfirstVisitDataState=newValueStateDescriptor<String>("newMidDateState",simpleDateFormat=new}publicJSONObjectmap(JSONObjectjsonObj)throwsException 标 0表示老访客1表示新访StringisNew=Longts=if("1".equals(isNew)){StringStringnewMidDate= 日StringtsDate=simpleDateFormat.format(new 过则将 if(newMidDate!=null&&newMidDate.length()!=0){isNew="0";} }}return}}利用侧输出流实现数据拆分//TODO3. OutputTag<String>startTag=newOutputTag<String>("start"){};OutputTag<String>dis yTag=newOutputTag<String>("dis SingleOutputStreamOperator<String>pageDStream=midWithNewFlagDS.process(newProcessFunction<JSONObject,String>(){publicvoidprocessElement(JSONObjectjsonObj,Contextctx,Collector<String>out)throwsException{JSONObjectstartJsonObj=StringStringdataStr=if(startJsonObj!=null&&startJsonObj.size()>0){ctx.output(startTag,dataStr);}else System.out.println("PageString:"+ JSONArraydis ys=jsonObj.getJSONArray("dis if(dis ys!=null&&dis ys.size()>0){for(inti=0;i< ys.size();i++)JSONObject yJsonObj= StringpageId= yJsonObj.put("page_id", yTag, }}}}}DataStream<String>startDStream=pageDStream.getSideOutput(startTag);DataStream<String>dis yDStream=pageDStream.getSideOutput(dis 将不同流的数据推送到下游kafka的不同Topic(分流在MyKafkaUtil工具类中封装获取生产者的方法(publicstaticFlinkKafkaProducer<String>getKafkaSink(Stringtopic)returnnewFlinkKafkaProducer<>(kafkaServer,topic,new}程序中调用kafka工具类获取 FlinkKafkaProducer<String>startSink=MyKafkaUtil.getKafkaSink(TOPIC_START);FlinkKafkaProducer<String>pageSink=MyKafkaUtil.getKafkaSink(TOPIC_PAGE);FlinkKafkaProducer<String>dis ySink=MyKafkaUtil.getKafkaSink(TOPIC_DIS 测试Idea中运行DwdBaseLogrt_applog下模拟生成数据的jar第4章功能3:准备业务数据-DWD业务数据的变化,我们可以通过Maxwell到,但是MaxWell是把全部数据统一Topic中,这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层数据,经过处理后,将维度数据保存到Hbase,将事实数据写回Kafka作为业务数据的DWD层。接收Kafka数据,过滤空值数据实现动态分流功能由于MaxWell是把全部数据统一写入一个Topic中,这样显然不利于日后的数据处HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又另一种是用mysql数据库,周期性的同步,使用FlinkCDC这里选择第二种方案,主要是mysql对于配置数据初始化和管理,用sql都比较把分好的流保存到对应表、业务数据保存到Kafka的中维度数据保存到Hbase的表中packageimportcom.alibaba.fastjson.JSON;importpackageimportcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importimportorg.ap importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importAuthor:Date:publicclassBaseDBApppublicstaticvoidmain(String[]args)throwsExceptionStreamExecutionEnvironmentenv=env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);//TODO2.从 //2.1定 Stringtopic="ods_base_db_m";StringgroupId="basedbapp_group";//2.2获取FlinkKafkaConsumer<String>kafkaSource=MyKafkaUtil.getKafkaSource(topic,groupId);DataStreamSource<String>kafkaDS=//2.3对流的数据进行结构转换StringSingleOutputStreamOperator<JSONObject>jsonObjDS=kafkaDS.map(newMapFunction<String,JSONObject>(){publicJSONObjectmap(StringjsonStr)throwsException{returnJSON.parseObject(jsonStr);}}SingleOutputStreamOperator<JSONObject>jsonObjDS=->SingleOutputStreamOperator<JSONObject>jsonObjDS=//2.4对数据进行简单的SingleOutputStreamOperator<JSONObject>filteredDS=jsonObjDS.filter(newFilterFunction<JSONObject>(){publicpublicbooleanfilter(JSONObjectjsonObj)throwsException{booleanflag=jsonObj.getString("table")!=null&&jsonObj.getString("table").length()>0&&jsonObj.getJSONObject("data")!=null&&jsonObj.getString("data").length()>3;return}}}}根据MySQL的配置表,动态进行分流我们通过FlinkCDC动态配置表的变化,以流的形式将配置表的变化读到程序中,1)准备工作pom.xml依赖MysqlCREATECREATETABLE`table_process``source_table`varchar(200)NOTNULLCOMMENT来源表`operate_type`varchar(200)NOTNULLCOMMENT'操作类型`sink_type`varchar(200)DEFAULTNULLCOMMENT输出类型hbase`sink_table`varchar(200)DEFAULTNULLCOMMENT'输出表 `sink_columns`varchar(2000DEFAULTNULLCOMMENT输出字段`sink_pk`varchar(200DEFAULTNULLCOMMENT主键字段`sink_extend`varchar(200DEFAULTNULLCOMMENT建表扩展',PRIMARYKEY(`source_table`,`operate_type`))ENGINE=InnoDBDEFAULTpackagepackageimportpublicclassTableProcess//动态分流Sink常 一publicstaticfinalStringSINK_TYPE_HBASE="hbase";publicstaticfinalStringSINK_TYPE_KAFKA="kafka";publicstaticfinalStringSINK_TYPE_CK="clickhouse";String//insert,update,deleteStringoperateType;//hbasekafkaStringsinkType; StringsinkTable;StringStringsinkPk;String}在MySQLBinlog添加对配置数据库的,并重启sudosudo 参考CDC文22)通过配置表形成广播流,主流和广播流进行连接//TODO3.使用 DebeziumSourceFunction<String>sourceFunction=.deserializer(newDataStreamSource<String>mysqlDS=MapStateDescriptor<String,TableProcess>mapStateDescriptor=newMapStateDescriptor<>("table-process",String.class,TableProcess.class);BroadcastStream<String>broadcastStream=mysqlDS.broadcast(mapStateDescriptor);//TODO4.BroadcastConnectedStream<JSONObject,String>connectedStream=自定义CDC的反序列化器CDC格式数据转换为jsonpackagepackageimportimportcom.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;importio.debezium.data.Envelope;importorg.ap importorg.apache.flink.util.Collector;importorg.apache.kafka.connect.data.Field;importorg.apache.kafka.connect.data.Schema;importimportAuthor:publicclassMyDeserializationSchemaFunctionimplementsDebeziumDeserializationSchema<String>{publicvoiddeserialize(SourceRecordsourceRecord,Collector<String>collector)throwsException{//定义JSONJSONObjectresult=newStringtopic=sourceRecord.topic();String[]split=topic.split("\\.");Stringdatabase=split[1];Stringtable=Envelope.Operationoperation=Structstruct=(Struct)Structafter=struct.getStruct("after");JSONObjectvalue=newJSONObject();System.out.println("Value:"+if(after!=null)Schemaschema=for(Fieldfield:schema.fields()){Stringname=();Objecto=after.get(name);System.out.println(name+":"+o);value.put(name,o);}}//将数据放入JSONresult.put("database",database);result.put("table",table);Stringtype=operation.toString().toLowerCase();if("create".equals(type)){type=}result.put("type",type);result.put("data",value);}publicTypeInformation<String>getProducedType(){returnTypeInformation.of(String.class);}}主程序中对连接后的数据进行分流////TODO5.对数据进行分流操作维度数据放到侧输出流事实数据放到主流OutputTag<JSONObject>dimTag=newOutputTag<JSONObject>("dimTag"){};SingleOutputStreamOperator<JSONObjectrealDSconnectedScess(newDataStream<JSONObject>dimDS=自定义函数packagepackageimportimportimportimport import import import importimportimportimportimportimportimportimportimportAuthor:Date:Desc:publicclassTableProcessFunctionextendsBroadcastProcessFunction<JSONObject,String,JSONObject>privateConnectionconn=privateOutputTag<JSONObject>privateMapStateDescriptor<String,TableProcess>this.outputTag=this.mapStateDescriptor=}publicvoidopen(Configurationparameters)throwsExceptionconn=}publicvoidprocessElement(JSONObjectjsonObj,ReadOnlyContextctx,Collector<JSONObject>out)ExceptionReadOnlyBroadcastState<String,TableProcess>broadcastStateStringtable=Stringtype=JSONObjectdataJsonObj=if(type.equals("bootstrap-insert"))type=jsonObj.put("type",}Stringkey=table+":"+TableProcesstableProcess=if(tableProcess!=null)jsonObj.put("sink_table",if(tableProcess.getSinkColumns()!=null&&tableProcess.getSinkColumns().length()>0)}ctx.output(outputTag,}elseif(tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA))}}elseSystem.out.println("NOthisKeyinTableProce"+}}publicvoid ement(Stringvalue,Contextctx,Collector<JSONObject>out)ExceptionJSONObjectjsonObject=Stringdata=TableProcesstableProcess=JSON.parseObject(data,StringsourceTable=StringoperateType=//输出类 StringsinkType= StringsinkTable=//StringsinkColumns=//StringsinkPk=StringsinkExtend=Stringkey=sourceTable+":"+if(TableProcess.SINK_TYPE_HBASE.equals(sinkType)&&"insert".equals(operateType))checkTable(sinkTable,sinkColumns,sinkPk,}//BroadcastState<String,TableProcess>broadcastState=broadcastState.put(key,}privatevoidcheckTable(StringtableName,Stringfields,Stringpk,Stringext)if(pk==pk=}if(ext==ext=}String[]fieldsArr=StringBuildercreateSql=newStringBuilder("createtableifnotexists"+for(inti=0;i<fieldsArr.length;i++)Stringfield=createSql.append(field).append("varcharprimarykeycreateSql.append("info.").append(field).append("varchar}//如果不是最后一个字段拼接逗号if(i<fieldsArr.length-createSql.append(}}System.out.println("Phoenix的建表语句:PreparedStatementps=tryps=}catch(SQLExceptione)thrownewRuntimeException("Phoenix中创建维度表失败}finallytry}catch(SQLExceptione)}}}}//sinkColumns:privatevoidfilterColumn(JSONObjectdataJsonObj,StringsinkColumns)String[]fieldArr=List<String>fieldList=Set<Map.Entry<String,Object>>entrySet=// tor<Map.Entry<String,Object>>it= forit.hasNext()Map.Entry<String,Object>entry=//如果sink_columns中不包含遍历出的属 将其删}}}}定义一个项目中常用的配置常量类packagepackageAuthor:publicclassGmallConfigpublicstaticfinalStringHBASE_SCHEMA="GMALL2021_REALTIME";publicstaticfinalString}分流Sink之保存维度到HBase(Phoenix)程序流程分析DimSink继承了RichSinkFunction,functionopen(图中紫线,我们可以把连接的初始化工作放另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。引入apache工具包以及phoenix依赖<<mons-beanutils是Apache开源组织提供的用于操作JAVABEAN使用commons-beanutils,我们可以很方便的对bean对象的属性进行操作因为要用单独的schema,所以在Idea程序中加入hbase-<?xml<?xml<?xml-stylesheettype="text/xsl" hbasenamespace和phoenixschemalinux服务上,也需要在hbasephoenixhbase-site.xml配置文件中,加上以上两个配置,并使用xsync进行同步。createschemacreateschemapackagepackageimportimportcom.atguigu. importimportimportjava.sql.*;importjava.util.Set;Author:Desc:PhoenixHbasepublicclassDimSinkextendsRichSinkFunction<JSONObject>Connectionconnection=publicvoidopen(Configurationparameters)throwsException{connection=}@param@param@throwspublicvoidinvoke(JSONObjectjsonObject,Contextcontext)throwsExceptionString

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论