大数据可视化 课件 项目4 PyEcharts实战_第1页
大数据可视化 课件 项目4 PyEcharts实战_第2页
大数据可视化 课件 项目4 PyEcharts实战_第3页
大数据可视化 课件 项目4 PyEcharts实战_第4页
大数据可视化 课件 项目4 PyEcharts实战_第5页
已阅读5页,还剩81页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

项目四:—PyEcharts实战(用户行为数据可视化)目录CONTENTS项目概述学习目标任务4.1 ***用户行为数据分析任务4.2 可视化大屏应用思考与练习1项目概述项目概述本项目将致力于使用用户行为数据绘制可视化大屏。用户行为数据是企业运营过程中的重要参考依据,通常会在数据仓库系统中进行分析,生成用户行为数据指标,并通过报表和可视化大屏等工具展示出来。在大数据时代,不管销售什么商品,都会涉及到对用户行为数据的分析。分析用户行为数据的结果可以帮助企业了解用户的消费习惯和偏好,并根据分析结果改进营销策略和完善产品。本项目将解析用户行为数据指标,重点是使用PyEcharts将用户行为数据分析结果绘制成大数据可视化大屏,虽然不涉及用户行为数据分析,但读者仍能对此有所了解。由于仅使用PyEcharts进行可视化大屏绘制,并未使用前端组件,所以与常见的可视化产品的效果略有不同。本项目具体工作如下:用户行为数据分析指标介绍;用户行为数据集简介;针对不同指标数据绘制图表构成可视化大屏。2学习目标学习目标通过学习本项目的知识,了解用户行为数据分析项目,并对PyEcharts技术有扎实的了解,学会灵活使用PyEcharts配置项,并掌握使用PyEcharts绘制地图、直方图、散点图、折线图等图表,以及可视化大屏的绘制技术。具备PyEcharts大数据可视化的基本技能,拓展大数据专业学习视野。3任务4.1 ***用户行为数据分析任务描述经过前面几个项目的学习,对如何使用PyEcharts进行大数据可视化有了一定的了解,但是对于实际的业务应用场景还有所欠缺。本任务是基于对PyEcharts的理解,使用PyEcharts对用户行为数据进行可视化分析实践。通过这个任务,可以理解用户行为数据分析指标,将所学知识应用到实际业务场景中,通过实践激发学习兴趣。完成本任务需要掌握用户行为数据指标体系,了解为何要做用户行为数据分析,如何做用户行为数据分析,并使用PyEcharts根据用户行为数据分析指标结果绘制可视化大屏,通过学习用户行为数据可视化大屏案例,加深对PyEcharts的应用理解。知识与技能一、什么是用户行为数据

用户行为数据在绝大多数情况下,就是在用户使用APP、浏览网页过程中的日志数据。以电商为例,日志数据又分为页面数据、事件数据、曝光数据、启动数据和错误数据。

页面数据:主要记录用户在一个页面中的访问情况,如访问时间、停留时间、访问路径等信息。

事件数据:主要记录应用中具体的操作行为,包括操作类型、操作对象、操作对象描述等信息,如电商点击商品、添加收藏等。

曝光数据:使用APP时在首页区域滚动的界面,主要记录页面曝光的内容,包括曝光对象、曝光类型等。

启动数据:启动APP时显示的信息,一般是一些广告信息,或者活动信息。

错误数据:主要记录应用在使用过程中产生的错误信息。知识与技能二、用户行为数据从哪里来

用户行为数据一般都来源于用户的使用日志数据,而日志数据是怎么产生的呢?

一般而言,日志数据是采用页面埋点的方式获取的。

不同的日志结构可能会有所区别,但是涉及用户行为数据的大同小异。一般含有地区编码、手机品牌、渠道、手机型号、操作系统、动作id、事件时间、用户标识等。知识与技能三、用户行为数据指标体系统计周期指标说明最近1日PVpageview,页面点击量最近1日UVuniqueview,独立访客统计周期统计粒度指标最近1/7/30日渠道访客数最近1/7/30日渠道会话平均停留时长最近1/7/30日渠道会话平均浏览页面数最近1/7/30日渠道会话总数最近1/7/30日渠道跳出率统计周期指标说明最近1天流失用户数之前活跃过的用户,最近一段时间未活跃,就称为流失用户。(统计每天的前7天)知识与技能统计周期指标说明最近1日回流用户数之前的活跃用户,一段时间未活跃,今日又活跃了,就称为回流用户。统计周期指标说明最近1日留存率统计每天的1-7日留存率或者每天的1-30日留存率,统计的是新增留存率统计周期指标最近1、7、30日新增用户数最近1、7、30日活跃用户数统计周期指标最近1、7、30日选择人数最近1、7、30日扫码人数最近1、7、30日领取人数统计周期统计粒度指标最近1、7、30日年龄段下单人数环境安装一、Ubuntu20.4安装Step1卸载自带python在Ubuntu中,通常会预装Python2.x和Python3.x版本。如果您想要完全卸载所有现有的Python版本,可以使用以下命令:```sudoapt-getremove--purgepython2.7-minimalpython2.7sudoapt-getremove--purgepython3-minimalpython3sudoapt-getautoremove```

在UbuntuServer20.04中安装Python3.8.10环境,可以按照以下步骤进行:Step2更新系统软件包sudoapt-getupdate环境安装Step3安装必要的依赖项sudoapt-getinstallbuild-essentiallibssl-devzlib1g-devlibncurses5-devlibncursesw5-dev\libreadline-devlibsqlite3-devlibgdbm-devlibdb5.3-devlibbz2-devlibexpat1-devliblzma-dev\tk-devlibffi-devwgetcurl环境安装Step4下载Python3.8源码包cd~wget/mirrors/python/3.8.10/Python-3.8.10.tgzStep5解压源码包并进入目录tar-xzvfPython-3.8.10.tgzcdPython-3.8.10Step6编译和安装Python3.8./configure--enable-optimizationsmake-j8sudomakealtinstallStep7验证Python安装是否成功python3.8--version如果出现类似以下输出结果,则说明Python3.8安装成功:Python3.8.10环境安装环境安装Step8更新软链接#先看一看python的链接sudofind/-typef-namepython3.8#如果输出结果是/usr/local/bin/python3.8#再继续执行下面的命令sudoln-s/usr/local/bin/python3.8/usr/local/bin/python3刷新Bash中`python`命令的哈希表hash-rpython3#之后尝试使用python命令,是否可以进入环境环境安装Step9安装pipsudoaptinstallpython3-pipStep10安装相关模块python3-mpipinstalluser_agents-i/simplepython3-mpipinstallpandas-i/simple环境安装二、Hadoop安装Step1:安装Java8Step2:安装SSH服务sudoaptupdatesudoaptinstallsshsudoaptupdatesudoaptinstallopenjdk-8-jdkStep3:下载和解压缩Hadoop3.3.3wget/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz环境安装当然,教学资源中提供了hadoop的安装包,可下载并解压缩Hadoop3.3.3:tar-xvfhadoop-3.3.3.tar.gz#只是为了改名,变得简单些sudomvhadoop-3.3.3hadoopStep4:配置环境变量

sudovi/etc/profile在文件最后添加以下内容:exportHADOOP_HOME=/home/arthas/hadoopexportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin环境安装然后保存并关闭文件。接下来,重新加载当前的shell环境:source/etc/profileStep5:修改Hadoop配置文件

打开Hadoop配置文件:sudovi/home/arthas/hadoop/etc/hadoop/hadoop-env.sh修改JAVA_HOME变量的值为Java8的安装路径:exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64然后保存并关闭文件。环境安装接下来,打开core-site.xml配置文件:sudovi/home/arthas/hadoop/etc/hadoop/core-site.xml在<configuration>标签内添加以下内容:<property>

<name>fs.defaultFS</name>

<value>hdfs://localhost:9000</value>

</property>

<property>

<name>hadoop.tmp.dir</name>

<value>/home/arthas/hadoop/tmp</value>

<description>Abaseforothertemporarydirectories.</description>

</property>然后保存并关闭文件。环境安装现在,打开hdfs-site.xml配置文件:sudovi/home/arthas/hadoop/etc/hadoop/hdfs-site.xml在<configuration>标签内添加以下内容:<property>

<name>dfs.replication</name>

<value>3</value>

</property>

<property>

<name>.dir</name>

<value>file:/home/arthas/hadoop/hadoop_data/hdfs/namenode</value>

</property>

<property>

<name>dfs.datanode.data.dir</name>

<value>file:/home/arthas/hadoop/hadoop_data/hdfs/datanode</value>

</property>

<property>

<name>dfs.http.address</name>

<value>:50070</value>

</property>环境安装然后保存并关闭文件。最后,打开mapred-site.xml:sudovi/home/arthas/hadoop/etc/hadoop/mapred-site.xml在<configuration>标签内添加以下内容:<property>

<name></name>

<value>yarn</value></property>Step6:格式化HDFS

在启动Hadoop之前,我们需要格式化HDFS:hdfsnamenode-format环境安装Step7:生成

SSH密钥生成SSH密钥:

ssh-keygen-trsa-P''-f~/.ssh/id_rsa然后将该密钥添加到授权密钥列表中:cat~/.ssh/id_rsa.pub>>~/.ssh/authorized_keyschmod0600~/.ssh/authorized_keysStep8:开启密钥认证修改/etc/ssh/sshd_config:sudovi/etc/ssh/sshd_config环境安装在其内容中找到PubkeyAuthentication改为yesPubkeyAuthenticationyes然后保存并关闭文件。重新启动SSH服务以使更改生效:sudosystemctlrestartsshdStep9:启动Hadoop现在,进入hadoop安装根目录,您可以通过以下命令启动Hadoop:~/hadoop/sbin/start-all.sh启动过程(正常):arthas@arthas:~$start-all.shWARNING:AttemptingtostartallApacheHadoopdaemonsasarthasin10seconds.WARNING:Thisisnotarecommendedproductiondeploymentconfiguration.WARNING:UseCTRL-Ctoabort.Startingnamenodeson[localhost]StartingdatanodesStartingsecondarynamenodes[arthas]StartingresourcemanagerStartingnodemanagers环境安装一、Spark安装Step1:安装Scala教学资源中提供了spark的安装包,可拷贝并解压,解压缩并更名:cd~tar-xvfscala-2.11.12.tgzsudomvscala-2.11.12scalaStep2:安装Sparkcd~tar-xvfspark-3.3.0-bin-hadoop3.tgzsudomvspark-3.3.0-bin-hadoop3sparkStep3:配置Scala和Spark环境变量sudovi/etc/profile环境安装进入配置文件后,输入“i”进入编辑模式,将以下内容复制(重复内容可忽略):exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64exportPATH=$PATH:$JAVA_HOME/bin

exportHADOOP_HOME=/home/arthas/hadoopexportPATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

exportSCALA_HOME=/home/arthas/scalaexportPATH=$PATH:$SCALA_HOME/bin

exportSPARK_HOME=/home/arthas/sparkexportPATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin输入以下命令将环境重新生效:source/etc/profile环境安装Step4:修改Spark文件进入spark/conf/文件夹,复制配置文件spark-env.sh.template并更名为spark-env.sh之后,编辑该文件:cp~/spark/conf/spark-env.sh.templatespark-env.shsudovispark/conf/spark-env.sh进入配置文件后,输入“i”进入编辑模式,将以下内容复制:exportJAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64exportSCALA_HOME=/home/arthas/scalaexportHADOOP_HOME=/home/arthas/hadoopexportHADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoopexportSPARK_MASTER_IP=MasterexportSPARK_LOCAL_DIRS=/home/arthas/scalaexportSPARK_DRIVER_MEMORY=2g任务实施4.1.1行为数据需求分析第一步:分析原始数据内容hdfsdfs-put/opt/software/data/behavior_202112-part/user/ysedu/hdfsdfs-put/opt/software/data/access_202112-part/user/ysedu/由于数据量较大,本任务使用Hadoop的hdfs存储数据,spark处理数据,可使用资料中已安装好全部环境的docker镜像tar包。也可使用资料包中安装包或自行下载后安装,本环境使用的版本为Hadoop3.3.3和spark3.3.0。本案例我们将基于某小程序2021年12月脱敏后真实数据进行分析,包括用户行为数据behavior_202112-part和访问日志access_202112-part。首先使用hdfs将准备好的原始数据导入任务实施4.1.1行为数据需求分析第一步:分析原始数据内容通过hdfs的cat命令可以查看我们原始数据hdfsdfs-cat/user/ysedu/behavior_202112-part/part-00000hdfsdfs-cat/user/ysedu/access_202112-part/di_log.2021122007.209.log

access_202112-part记录了用户nginx的请求数据,但其中ip需要解析为地理位置,浏览器信息可以解析为手机型号,token需要转换为用户id,以便进行用户行为数据的分析和展示。任务实施4.1.1行为数据需求分析第二步:根据需求制定分析指标根据以上数据我们可以利用整体的pv/uv(访问量/用户浏览量),展示应用整体的访问量情况;也可以根据不同页面page得出各页面的pv/uv,用于了解哪些页面展示得最多,从而将主要功能放在该页面。哪些想要展示的页面因为隐藏得比较深导致浏览量不高,需要做操作流程上的优化处理。根据用户通过不同渠道channel_code访问进入程序,可以展示各渠道的pv/uv,了解哪些优质渠道可以带来更多的用户访问,哪些渠道效果不佳,从而筛选出更好的渠道进行合作。根据访问时长spot_val,可以求得平均每次访问的停留时长。也可以得出平均每个用户的访问时长,结合页面和日期时间等,可以分析出在不同维度下的用户黏性,以此为依据了解什么时间段,什么样的功能页面更吸引用户。任务实施4.1.1行为数据需求分析第二步:根据需求制定分析指标根据访问日志,筛选url为登录接口/di/reg/get_login_token,可以统计出每天各时段的启动次数人数。也可以通过统计用户登录时段,了解用户在什么时候最活跃,从而可以优先考虑在该时段做活动或发文章。根据省份/城市的pv/uv查找出当前最活跃的地区,以及找到有潜力成为下一个活跃地区的城市。结合渠道信息,针对当地的情况找到合适的渠道,更快获取新客户、留住老客户。还可以分析事件转化,如扫码领取事件,可以分别分析扫码事件、选商品事件和领取事件的数据量,通过漏斗图分析用户流失环节,从而有针对性地做出优化。任务实施4.1.1行为数据需求分析第二步:根据需求制定分析指标最后还可以根据用户首次访问时间和之后再次访问时间,计算得出用户访问留存率,以此为数据支撑,对现有的业务做出合理评价,便于制定和调整计划。留存率是用于反映运营情况的统计指标,其具体含义为在统计周期(周/月)内,每日活跃用户数在第N日仍启动该App的用户数占比的平均值。其中N通常取2、4、8、15、31,分别对应次日留存率、三日留存率、七日留存率、半月留存率和月留存率。4.1.2原始数据简单处理任务实施上面提到access_202112-part数据还需要处理,需要将ip解析为地理位置,浏览器信息解析为手机型号,token解析为用户id。第一步:创建viuser_behavior_analysis_1.py创建user_behavior_analysis_1.py文件编写数据处理内容importsys

frompyspark.sqlimportSparkSession

fromdata_cleaningimportDataCleaning

fromdata_cleaningimportSplitType

fromip_areaimportIpArea

fromuser_agentimportUserAgentParser

importtime

#获取sparkSession

defget_spark_session():

returnSparkSession.builder.appName("BehaviorLog").getOrCreate()

defmain(argv):

iflen(argv)>1:

input_path=argv[1]

else:

sys.stderr.write("Usage:"+argv[0]+"<inputpath>[<outputpath>]\n")

exit(2)

任务实施

iflen(argv)>2:

output_path=argv[2]

else:

output_path=None

spark=get_spark_session()

rdd=spark.sparkContext.textFile(input_path)

#设定正则格式

reg_str='^(\\S+)\\S+\\S+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\"(\\S+)(\\S+)\\s*\\S+\\s*\"(\\d{3})(\\S+)(\\S+)\"(\\S*)\"\"([^\\\"]+)\"\"\\S*\"\"(\\S*)\"'

colums=["ip","day_time","method","url","rt_code","size","cost","refer_url","ua","token"]

defurl_parse(r):

url=r["url"]

url_path=""

uuid=""

pos=url.find("?")

ifpos>=0:

#切割url参数

url_path=url[:pos]

url_params=url[pos+1:].split('&')

forone_paraminurl_params:

param_pos=one_param.find("=")

ifparam_pos>=0:

key=one_param[:param_pos]

val=one_param[param_pos+1:]

ifkey=="uuid":

uuid=val

return(url_path,uuid)

ip_area=IpArea()

ip_area.set_dict(ip_area.load_ip_area_dict('./data/ip_region.data'),

ip_area.load_ip_area_dict('./data/ip_country.data'))任务实施before_converts={

("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',

time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),

("url_path","uuid"):url_parse,

("brower","os","platform","is_mobile"):lambdar:UserAgentParser().parse(r["ua"])

}

#只筛选/di开头的url

filters=[lambdar:r["url_path"].startswith("/di")]

after_converts={}

dc=DataCleaning(SplitType.regex,reg_str,colums,before_converts,filters,after_converts)

dc.remove_columns(["url","refer_url","ua"])

rdd=rdd.map(dc.format)

rdd=rdd.map(dc.convert).filter(lambdar:r!=None)

rdd=rdd.map(dc.tab_text)

#输出数据

ifoutput_path:

rdd.saveAsTextFile(output_path)

else:

print('\n'.join([str(item)foriteminrdd.take(10)]))

if__name__=="__main__":

main(sys.argv)任务实施这里用到了工具类DataCleaning数据清洗类,制定数据解析类型为正则,reg_str='^(\\S+)\\S+\\S+\\[([\\w:/]+\\s[+\\-]\\d{4})\\]\"(\\S+)(\\S+)\\s*\\S+\\s*\"(\\d{3})(\\S+)(\\S+)\"(\\S*)\"\"([^\\\"]+)\"\"\\S*\"\"(\\S*)\“’解析出来的字段对应为colums=["ip","day_time","method","url","rt_code","size","cost","refer_url","ua","token"]字典before_converts定义了各类数据和对应的处理方法before_converts={

("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),

("url_path","uuid"):url_parse,

("brower","os","platform","is_mobile"):lambdar:UserAgentParser().parse(r["ua"])

}任务实施筛选项目指定url为/di开头的数据filters=[lambdar:r["url_path"].startswith("/di")]

最后调用处理数据dc=DataCleaning(SplitType.regex,reg_str,colums,before_converts,filters,after_converts)

#删除多余列

dc.remove_columns(["url","refer_url","ua"])

rdd=rdd.map(dc.format)

rdd=rdd.map(dc.convert).filter(lambdar:r!=None)

#保存数据

rdd=rdd.map(dc.tab_text)任务实施第二步:ip解析地域("country","province","city"):lambdar:ip_area.get_area_by_ip(r["ip"]),

这里用到的是fromip_areaimportIpArea会用到两个文件,ip_region.dataip_country.data,用于解析ip所属国家和所属地区。创建ip_area.py文件,写入如下代码#-*-coding:utf-8-*-

#ip_area.py

classIpArea():

#指定ip地区和ip城市字典文件

def__init__(self,ip_region_file=None,ip_country_file=None):

ifip_region_file:

self.ip_region_dict=self.load_ip_area_dict(ip_region_file)

ifip_country_file:

self.ip_country_dict=self.load_ip_area_dict(ip_country_file)

defset_dict(self,ip_region_dict,ip_country_dict):

self.ip_region_dict=ip_region_dict

self.ip_country_dict=ip_country_dict任务实施

defload_ip_area_dict(self,ip_region_file):

res=[]

fb=open(ip_region_file,'r')

try:

whileTrue:

line=fb.readline()

ifnotline:

break

parts=line.split("\t")

#地域多个,则拼接成一个字符串

res.append([ip_to_long(parts[0]),ip_to_long(parts[1]),','.join(parts[2:]).strip()])

exceptIOErroraserr:

print("IOError:",err)

finally:

fb.close()

#排序

returnsorted(res,key=lambdak:k[:2])

defget_area_by_ip(self,ip_str):

ip=ip_str[7:]ifip_str.startswith("::ffff:")elseip_str

ip_long=ip_to_long(ip_str)

#ip查找城市

ip_region_dic_list=self.ip_region_dict

ip_index=-1

iflen(ip_region_dic_list)>0:

ip_index=ip_search_section(ip_region_dic_list,0,len(ip_region_dic_list)-1,ip_long)任务实施country=None

province="未知"

city="未知"

if-1!=ip_index:

region_cell=ip_region_dic_list[ip_index]

region_str=region_cell[2]

ifregion_str!="其他":

country="中国"

region_parts=region_str.split(",")

province=region_parts[0]

city=region_parts[1]iflen(region_parts)>1else"其他"

ifNone==country:

#ip查找国家或区域

ip_country_dic_list=self.ip_country_dict

country_index=-1

iflen(ip_country_dic_list)>0:

country_index=ip_search_section(ip_country_dic_list,0,len(ip_country_dic_list)-1,ip_long)

if-1!=country_index:

country_cell=ip_country_dic_list[country_index]

country_str=country_cell[2]

if"China"==country_str:

country="中国"

elif"Reserved"==country_str:

country="其他"

elif"Taiwan;RepublicofChina(ROC)"==country_str:

country="中国"

province="台湾"

elif"HongKong"==country_str:

country="中国"

province="香港"

return(country,province,city)任务实施#二分法查找IP

defip_search_section(ip_dict_list,low,high,ip_long):

iflow<=high:

mid=int((low+high)/2)

dic=ip_dict_list[mid]

s_ip=dic[0]

e_ip=dic[1]

ifip_long>=s_ipandip_long<=e_ip:

returnmid

elifip_long>e_ip:

returnip_search_section(ip_dict_list,mid+1,high,ip_long)

elifip_long<s_ip:

returnip_search_section(ip_dict_list,low,mid-1,ip_long)

return-1

#ip转长整型

defip_to_long(ip_str):

ip_long=[0,0,0,0]

p1=ip_str.find(".")

ifp1==-1:

returnint(ip_str)

p2=ip_str.find(".",p1+1)

p3=ip_str.find(".",p2+1)

ip_long[0]=int(ip_str[:p1])

ip_long[1]=int(ip_str[p1+1:p2])

ip_long[2]=int(ip_str[p2+1:p3])

ip_long[3]=int(ip_str[p3+1:])

return(ip_long[0]<<24)+(ip_long[1]<<16)+(ip_long[2]<<8)+ip_long[3]

if__name__=="__main__":

ip_area=IpArea("./data/ip_region.data","./data/ip_country.data")

print(ip_area.get_area_by_ip("15"))任务实施定义类IpArea,__init__指定ip_region和ip_country文件路径load_ip_area_dict用于读取文件并按ip排序get_area_by_ip为通过ip解析地址的方法,其中ip_to_long为将ip转为long类型数字,ip_search_section根据ip通过二分法递归查找对应所在位置下标第三步:解析时间为%Y-%m-%d%H:%M:%S"day_time":lambdar:time.strftime('%Y-%m-%d%H:%M:%S',time.strptime(r["day_time"][:20],'%d/%b/%Y:%H:%M:%S')),例如将日志中数据“31/Dec/2021:23:45:12+0800”解析为“2021-12-3123:45:12”返回。任务实施第四步:解析url中的uuid("url_path","uuid"):url_parse,

url_parse定义在user_behavior_analysis_1.py文件中,例如url为/diclient/user/perfect_info?scene=a45bba09140ffedd96d8719f294b7cd1&uuid=1640965488711&version=8将会被解析返回url_path:"/diclient/user/perfect_info",uuid:"1640965488711"第五步:ua解析手机型号("brower","os","platform","is_mobile"):lambdar:uap.parse(r["ua"]),

任务实施这里用到了fromuser_agentimportUserAgentParser需要拷贝手机型号数据文件user_agent_dict.data,用于解析ua中手机型号。创建user_agent.py文件,写入如下代码#-*-coding:utf-8-*-

#user_agent.py

importuser_agents

classUserAgentParser():

def__init__(self):

phone_map={}

#打开字典文件

withopen('./data/user_agent_dict.data','r')asfb:

whileTrue:

#逐行读取文件

line=fb.readline()

ifnotline:

break

parts=line.split("\t")

iflen(parts)!=2:

print("Unknowlinesplit:"+str(parts))

phone_map[parts[0].strip()]=parts[1].strip()

#记录map

self.phone_map=phone_map

#根据map解析

defparse(self,ua_str):

user_agent=user_agents.parse(ua_str)

dev=user_agent.device.family

platform='Other'ifdev=='Other'elseself.phone_map.get(user_agent.device.family,user_agent.device.family)

broswer,os,platform,is_mobile=(user_agent.browser.family,user_agent.os.family,platform,1ifuser_agent.is_mobileelse0)

return(broswer,os,platform,is_mobile)任务实施第六步:token解析用户信息("shopid","userid"):lambdar:tui.parse_userinfo(r['token'])

解析token用到了fromtoken_userinfoimportTokenUserinfo需要用到数据文件user_token_map.csv,用于将token转化为userid和shopid。将文件拷贝到data目录下。cp/opt/software/data/user_agent_dict.data/home/ysedu/workspace/data/

任务实施在/home/ysedu/workspace下创建token_userinfo.py文件,写入如下代码:#-*-coding:utf-8-*-

classTokenUserinfo():

def__init__(self,token_userinfo_map_path='data/user_token_map.csv'):

token_userinfo_map={}

#打开用户token对应字典文件

withopen(token_userinfo_map_path,'r')asfb:

whileTrue:

line=fb.readline()

ifnotline:

break

parts=line.strip().split('","')

token=parts[0][1:]

shopid=parts[1]

shopid=shopidifshopid!='0'else''

userid=parts[2][:-1]

userid=useridifuserid!='0'else''

token_userinfo_map[token]=(shopid,userid)

self.token_userinfo_map=token_userinfo_map

defparse_userinfo(self,token):

#根据map解析

userinfo=self.token_userinfo_map.get(token)

ifuserinfo:

returnuserinfo

else:

return('','')任务实施第七步:执行数据处理编写启动脚本start_uba_spark_1.sh,内容如下#!/bin/bash

source/etc/profile

data_path=/user/ysedu/access_202112-part

res_path=/user/ysedu/uba/access_202112-part

hadoopfs-rm-r$res_path

SPARK_MASTER=local

spark-submit--master$SPARK_MASTER--py-files=data_cleaning.py,ip_area.py,user_agent.py,token_userinfo.pyuser_behavior_analysis_1.py$data_path$res_path执行后完成数据处理并写入/user/ysedu/uba/access_202112-part4.1.3用户行为数据统计任务实施第一步:创建viuser_behavior_analysis_2.py#-*-coding:utf-8-*-

importsys

frompyspark.sqlimportSparkSession

frompyspark.sqlimportSQLContext

frompyspark.sql.typesimport*

importpandasaspd

fromdata_cleaningimportDataCleaning

fromdata_cleaningimportSplitType

fromdata_statimport*

defget_spark_session():

returnSparkSession.builder.appName("BehaviorStat").getOrCreate()

output_path=None

sc=None

defmain(argv):

iflen(argv)>2:

input_request_path=argv[1]

input_accesslog_path=argv[2]

else:

sys.stderr.write("Usage:"+argv[0]+"<input_request_path><input_accesslog_path>[<outputpath>]\n")

exit(2)

globaloutput_path

globalsc任务实施

iflen(argv)>3:

output_path=argv[2]

spark=get_spark_session()

sc=spark.sparkContext

#清洗请求日志数据

rqt=DataCleaning(SplitType.field,"\t",["day_time","url","channel_code","token","spot_code","spot_val","uuid","item_id","action","page","shopid","userid"])

accesslog=DataCleaning(SplitType.field,"\t",["ip","day_time","method","rt_code","size","cost","token","country","province","city","url_path","uuid","brower","os","platform","is_mobile","shopid","userid"])

#读取rdd数据

rqt_rdd=sc.textFile(input_request_path).map(rqt.format)

access_rdd=sc.textFile(input_accesslog_path).map(accesslog.format)

#分别计算各所需数据

page_stat(rqt_rdd)

total_stat(rqt_rdd)

channel_stat(rqt_rdd)

total_avg_access_duration_stat(rqt_rdd)

user_avg_access_duration_stat(rqt_rdd)

terminal_stat(access_rdd)

province_city_stat(access_rdd)

daily_stat(rqt_rdd)

hour_stat(access_rdd)

get_gift_convert_stat(access_rdd)

retention_stat(sc.textFile(input_accesslog_path).map(accesslog.format))

spark.stop()右图代码中分别引入hdfs文件/user/ysedu/data_cleaning/behavior_202112-part和/user/ysedu/uba/access_202112-part作为数据源,按以下列解析任务实施接下来我们分别完成统计的各个函数,并最终调用输出数据。提前编写好执行文件,每步写完一个函数,即可执行查看结果。vistart_uba_spark_2.sh#!/bin/bash

source/etc/profile

request_data_path=/user/ysedu/data_cleaning/behavior_202112-part

access_data_path=/user/ysedu/uba/access_202112-part

res_path=/user/ysedu/uba/behavior_202112-part

hadoopfs-rm-r$res_path

SPARK_MASTER=local

spark-submit--master$SPARK_MASTER--py-files=data_cleaning.py,data_stat.pyuser_behavior_analysis_2.py$request_data_path$access_data_path$res_path任务实施第二步:计算整体pvuv#整体pvuv

deftotal_stat(rdd):

dim_columns=[]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用数据统计类DataStat,筛选"spot_code"为"spot_interval"访问数据,然后分别计算总数和userid去重后数量,得到pv和uv。将得到的数据保存到data/stat_result/目录下,文件名为当前函数名,通过sys._getframe().f_code.co_name获得。任务实施第三步:页面pv

uv#页面pvuv

defpage_stat(rdd):

dim_columns=["page"]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#根据pv排序

rdd=rdd.sortBy(lambdar:r[-2],False,1)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用数据统计类DataStat,筛选"spot_code"为"spot_interval"访问数据,这里按page进行分组,分别计算总数和userid去重后数量,得到pv和uv,并使用倒数第二列,也就是pv进行倒序。将得到的数据保存到data/stat_result/目录下,文件名为当前函数名。任务实施第四步:渠道访问pvuv#渠道访问pvuv

defchannel_stat(rdd):

dim_columns=["channel_code"]

indices=[Count(),CountUniq(["userid"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#根据pv排序

rdd=rdd.sortBy(lambdar:r[-2],False,1)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))第五步:平均访问时长#平均访问时长

deftotal_avg_access_duration_stat(rdd):

dim_columns=[]

indices=[Avg(["spot_val"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(ds.converts).reduceByKey(ds.calculates).map(ds.map_results)

#保存

fb=open("data/stat_result/"+sys._getframe().f_code.co_name,"w")

fb.write('\n'.join(rdd.map(ds.tab_text).collect()))使用数据统计类DataStat,筛选"spot_code"为"spot_interval"访问数据,计算spot_val也就是停留时长的平均值。将得到的数据保存到data/stat_result/目录下,文件名为当前函数名。使用数据统计类DataStat,筛选"spot_code"为"spot_interval"访问数据,按channel_code进行分组,分别计算总数和userid去重后数量,得到pv和uv,并按pv倒序。任务实施第六步:用户访问时长#用户访问时长

defuser_avg_access_duration_stat(rdd):

dim_columns=["uuid"]

indices=[Avg(["spot_val"])]

ds=DataStat(dim_columns,indices)

rdd=rdd.filter(lambdar:r["spot_code"]=="spot_interval").map(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

最新文档

评论

0/150

提交评论