spark交流0804内训介绍下_第1页
spark交流0804内训介绍下_第2页
spark交流0804内训介绍下_第3页
spark交流0804内训介绍下_第4页
spark交流0804内训介绍下_第5页
已阅读5页,还剩43页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Spark介绍(下)陈希富2015年07月目录123初识ScalaSpark流计算Spark安装及基本操作概述曾经有人问Java的创始人高斯林这样一个问题,“除了Java语言以外,您现在还使用JVM平台上的哪种编程语言?”他毫不犹豫的说是Scala。在目前众多的JVM语言当中,Scala无疑是最引人注意的语言之一。连Java8也引入了FP。

从某种程度上来说,Java认可了Scala的做法。Scala是一门多范式编程语言,综合了多门语言的风格和思想,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。可以说,Scala是面向函数与面向对象的混合Scala能否成为Java杀手?一个Twitter的开发人员说:Scala将会成为现代Web2.0的发起语言Scala是运行在JVM之上的,Scala的代码也被编译为.class形式,性能接近Java。Scala是一个静态语言,更适合大型工程项目。Scala是针对“并发性问题”的解决方案之一,让开发人员能够更加轻松地专注于问题的实质,而不用考虑并发编程的低级细节。为什么要使用ScalaSpark是用Scala开发的,如果出现异常,在排查问题时对Spark源码比较熟悉,可以起到事半功倍的效果。Scala是一门优雅的语言技术,使用Scala开发Spark应用程序开发效率更高,代码更精简。Actor编程模式让高度并行应用程序的开发更加简单,而不必依照复杂的Java线程模型来编写程序。有人说:熟悉Scala之后再看Java代码,有种读汇编的感觉!两个简单的Scala例子1)对集合求2的整数倍:varlists=List(1,2,3,4,5)2)JavaBean:publicclassPeople{ privateStringname; privateStringsex; publicStringgetName(){returnname;} publicvoidsetName(Stringname){=name;} publicStringgetSex(){returnsex;} publicvoidsetSex(Stringsex){this.sex=sex;} }Scala:classPeople(varname:String,varsex:String){}lists.filter(n=>n%2==0)或lists.filter(_%2==0)Scala的特性Scala是面向对象的Scala是一个纯面向对象语言,在某种意义上来讲所有数值都是对象。对象的类型和行为是由class和trait来描述的。Class的抽象可由子类化和一种灵活的基于mixin的组合机制(它可作为多重继承的简单替代方案)来扩展。Scala是函数式的Scala还是一个函数式语言,在某种意义上来讲所有函数都是数值。Scala为定义匿名函数提供了一种轻量级的语法,它支持高阶(higher-order)函数、允许函数嵌套、支持局部套用(currying)。Scala的case类及其内置支持的模式匹配模型在许多函数式编程语言中都被使用。Scala是静态类型的Scala配备了一套富有表现力的类型系统,该抽象概念以一种安全的和一致的方式被使用。Scala是可扩展的Scala的设计承认了实践事实,领域特定应用开发通常需要领域特定语言扩展。Scala提供了一个独特的语言组合机制,这可以更加容易地以类库的形式增加新的语言结构:任何方式可以被用作中缀(infix)或后缀(postfix)操作符闭包按照所期望的类型(目标类型)自动地被构造两者结合使用可方便地定义新语句,无需扩展语法,也无需使用类似宏的元编程工具。Scala可与Java和.NET进行互操作这一点,在Scala设计之初就已经有所考虑。Scala和java代码上的比较1、告别return语句。

在Scala中无需return语句,代码最后执行的结果为返回值。2、类型推断,你只需要使用val或var。vararg1=“abc”valarg2=2valarg3:Int=43、Tuples(元组)让定义Map<String,Map<String,Integer>>时哭出来。vartuple=(1,”abc”,2)varmap=Map(1->”a”,2->”b”,3->”c”)4、for里还能写iffor(file<-filesiffile.isFile )println(file)5、if语句也有返回值valfilename=if(!args.isEmpty)args(0)else"default.txt"6、多重继承。Trait(类似java的接口)中可以有具体的实现。7、……Scala基本语法变量的定义通过val和var定义val类似java中的final无需;结尾类型推断valmsg="Hello,World"valmsg2:String="Helloagain"函数的定义defadd(x:Int,y:Int):Int={x+y}每个Scala函数都有返回值,只是有些返回值类型为Unit,类似Java中的void类型函数的最后一个表达式的值就可以作为函数的结果作为返回值Ifelse语句也有返回值(其实if也是一个函数)迭代for:for(arg<-args)println(arg)foreach:args.foreach(arg=>println(arg))Scala基本语法数组的操作数组的定义:valargs=newArray[String](3)数组的访问:args(0)=“hello”或args(0)集合的操作List:valnumList=List(1,2)元组(Tuples):valtp=(123,“hello")Set:varscalaSet=Set(“hello",”hi")Map:valscalaMap=Map(1->“I”,2->“II”)Scala基本语法-if和whilevarfilename="default.txt"if(!args.isEmpty)filename=args(0)valfilename=if(!args.isEmpty)args(0)else"default.txt“(使用val更为函数式编程风格)while(a!=0){……}do{...}while(a!=0)(while基本和java类似,且没有返回值,所以要尽量避免使用while)Scala基本语法-for(瑞士军刀)语法格式:valfilesHere=(newjava.io.File(".")).listFilesfor(file<-filesHere)println(file)条件过滤:valfilesHere=(newjava.io.File(".")).listFilesfor(file<-filesHereiffile.isFileiffile.getName.endsWith(".scala"))println(file)Scala基本语法-for(瑞士军刀)嵌套迭代:valfilesHere=(newjava.io.File(".")).listFilesdeffileLines(file:java.io.File)=scala.io.Source.fromFile(file).getLines().toListdefgrep(pattern:String)=for{file<-filesHereiffile.getName.endsWith(".scala")

line<-fileLines(file)ifline.trim.matches(pattern)}println(file+":"+line.trim)grep(".*gcd.*")Scala基本语法-for(瑞士军刀)生成新集合:valfilesHere=(newjava.io.File(".")).listFilesdefscalaFiles=for{file<-filesHereiffile.getName.endsWith(".scala")}yieldfileScala基本语法-matchvalfirstArg=if(args.length>0)args(0)else""valfriend=firstArgmatch{case"salt"=>"pepper"case"chips"=>"salsa"case"eggs"=>"bacon"case_=>"huh?"}println(friend)Scala基本语法-try抛出异常:thrownewRuntimeException("nmustbeeven")捕获异常:try{valf=newFileReader("input.txt")}catch{caseex:FileNotFoundException=>//handlemissingfilecaseex:IOException=>//handleotherI/Oerror}finally:try{//usethefile}finally{file.close()}try…catch…也有返回值

类和对象-定义objectChecksumAccumulator{

privatevalcache=Map[String,Int]()

defcalculate(s:String):Int=

if(cache.contains(s))

cache(s)

else{

valacc=newChecksumAccumulator

for(c<-s)

acc.add(c.toByte)

valcs=acc.checksum()

cache+=(s->cs)

cs

}

}classChecksumAccumulator{

privatevarsum=0

defadd(b:Byte):Unit=sum+=b

defchecksum():Int=~(sum&0xFF)+1

}ChecksumAccumulator.calculate("helloScala")类和对象-操作有理数为例,定义其加减乘除,了解类的操作:有理数(rational)可以表示成个分数形式:n/d,其中n和d都是整数(d不可以为0),n称为分子(numberator),d为分母(denominator)。1)定义:classRational(n:Int,d:Int)2)重写:classRational(n:Int,d:Int){

overridedeftoString=n+"/"+d}3)前提条件检查:classRational(n:Int,d:Int){ require(d!=0) overridedeftoString=n+"/"+d}类和对象-操作4)添加成员变量和重载:classRational(n:Int,d:Int){require(d!=0)valnumber=nvaldenom=doverridedeftoString=number+"/"+denom

defadd(that:Rational)=newRational(number*that.denom+that.number*denom,denom*that.denom)defadd(i:Int)=newRational(number+i*denom,denom)}那么就可以使用a+b或a+Int的形式进行运算了如何实现Int+a形式?implicitdefintToRational(x:Int)=newRational(x,1)头等公民-函数-类成员函数当程序越来越大,你需要使用函数将代码细化为小的容易管理的模块。和Java相比,Scala提供了多种Java不支持的方法来定义函数,除了类成员函数外,Scala还支持嵌套函数,函数字面量,函数变量等。importscala.io.SourceobjectLongLines{defprocessFile(filename:String,width:Int){valsource=Source.fromFile(filename)for(line<-source.getLines())processLine(filename,width,line)}privatedefprocessLine(filename:String,width:Int,line:String){if(line.length>width)println(filename+":"+line.trim)}}头等公民-函数-局部函数局部函数:importscala.io.SourceobjectLongLines{defprocessFile(filename:String,width:Int){defprocessLine(line:String){if(line.length>width)println(filename+":"+line.trim)}valsource=Source.fromFile(filename)for(line<-source.getLines())processLine(line)}}1)函数嵌套函数。2)局部函数可以直接访问上层函数的参数头等公民-函数-函数字面量函数字面量:字面量:(x:Int)=>x+1附值给变量:varincrease=(x:Int)=>x+1多行:varincrease=(x:Int)=>{println("We")println("are")println("here")x+1}函数做为参数:valsomeNumbers=List(-11,-10,-5,0,5,10)someNumbers.foreach((x:Int)=>println(x))someNumbers.filter(x=>x>0)简化写法:valf=(_:Int)+(_:Int)+(_:Int)f(5,10,13)头等公民-函数-闭包闭包:闭包是可以包含自由(未绑定到特定对象)变量的代码块;这些变量不是在这个代码块内或者任何全局上下文中定义的,而是在定义代码块的环境中定义(局部变量)。scala>varmore=1more:Int=1scala>valaddMore=(x:Int)=>x+moreaddMore:Int=>Int=<function1>scala>addMore(100)res1:Int=101(当自由变量改变时,scala也能捕获到这个变化。同样scala改变变量时,也能反映到外面。)scala>defmakeIncreaser(more:Int)=(x:Int)=>x+moremakeIncreaser:(more:Int)Int=>Intscala>valinc1=makeIncreaser(1)inc1:Int=>Int=<function1>scala>valinc9999=makeIncreaser(9999)inc9999:Int=>Int=<function1>scala>inc1(10)res5:Int=11scala>inc9999(10)res6:Int=10009头等公民-函数-参数重复参数:defecho(args:String*)=for(arg<-args)println(arg)echo("One")echo("Hello","World")命名参数:defspeed(distance:Float,time:Float):Float=distance/timespeed(time=10,distance=100)speed(distance=100,time=10)缺省参数值:defspeed(distance:Float=100,time:Float):Float=distance/timespeed(time=10)头等公民-函数-柯里化函数Scala允许程序员自己新创建一些控制结构,并且可以使得这些控制结构在语法看起来和Scala内置的控制结构一样,在Scala中需要借助于柯里化(Currying),柯里化是把接受多个参数的函数变换成接受一个单一参数(最初函数的第一个参数)的函数,并且返回接受余下的参数而且返回结果的新函数的技术。普通函数:defplainOldSum(x:Int,y:Int)=x+yplainOldSum(1,2)柯里化函数:defcurriedSum(x:Int)(y:Int)=x+ycurriedSum(1)(2)deffirst(x:Int)=(y:Int)=>x+yvalsecond=first(1)second(2)valonePlus=curriedSum(1)_onePlus(2)Scala编程风格和思想-一个简单例子一个简单的原则,如果代码中含有var类型的变量,这段代码就是传统的指令式编程,如果代码只有val变量,这段代码就很有可能是函数式代码,因此学会函数式编程关键是不使用vars来编写代码。defprintArgs(args:Array[String]):Unit={ vari=0 while(i<args.length){ println(args(i)) i+=1 }}defprintArgs(args:Array[String]):Unit={ for(arg<-args) println(arg)}defprintArgs(args:Array[String]):Unit={ args.foreach(println)}Scala编程风格和思想-另一个简单例子packagecom.cxf.testimportjava.io.FileobjectClosureTest{privatedeffiles=(newFile("c:\\deleteme")).listFiles()deffileEndFilger(query:String)={for(file<-filesif(file.getName.endsWith(query)))yieldfile}deffileContainsFilger(query:String)={for(file<-filesif(file.getName.contains(query)))yieldfile}deffileRegexFilger(query:String)={for(file<-filesif(file.getName.matches(query)))yieldfile}}

Scala编程风格和思想-另一个简单例子packagecom.cxf.testimportjava.io.FileobjectClosureTest{privatedeffiles=(newFile("c:\\deleteme")).listFiles()

deffileFilter(matcher:(String)=>Boolean)={for(file<-filesif(matcher(file.getName)))yieldfile}defendFilter(query:String)=fileFilter(_.endsWith(query))defcontainsFilter(query:String)=fileFilter(_.contains(query))defregexFilter(query:String)=fileFilter(_.matches(query))}

高级应用-抽象类abstractclassElement{defcontents:Array[String]}一个含有抽象方法的类必须定义成抽象类,也就是使用abstract关键字来定义类。一个没有定义实现的方法就是抽象方法,只要这个方法没有具体实现,就是抽象方法。无参函数:abstractclassElement{defcontents:Array[String]valheight=contents.lengthvalwidth=if(height==0)0elsecontents(0).length}高级应用-扩展类classArrayElement(conts:Array[String])extendsElement{defcontents:Array[String]=conts}其中extends具有两个功效,一是让ArrayElement继承所有Element类的非私有成员,第二使得ArrayElement成为Element的一个子类。而Element称为ArrayElement的父类。ArrayElement继承了Element的width和height方法,因此你可以使用ArrayElement.width来查询宽度,比如:valae=newArrayElement(Array("hello","world"))派生也意味着子类的值可以用在任何可以使用同名父类值的地方,比如:vale:Element=newArrayElement(Array("hello"))组合与继承-类成员关系高级应用-Trait在Scala中Trait为重用代码的一个基本单位。一个Traits封装了方法和变量,和Interface相比,它的方法可以有实现,这一点有点和抽象类定义类似。但和类继承不同的是,Scala中类继承为单一继承,也就是说子类只能有一个父类。当一个类可以和多个Trait混合,这些Trait定义的成员变量和方法也就变成了该类的成员变量和方法,由此可以看出Trait集合了Interface和抽象类的优点,同时又没有破坏单一继承的原则。traitPhilosophical{defphilosophize(){println("Iconsumememeory,thereforIam!")}}这可以使用extends或with来混合一个traitclassFrogextendsPhilosophical{overridedeftoString="gree"}classFrogextendsAnimalwithPhilosophicalwithHasLegs总结Scala可以说是:“麻雀虽小,五脏俱全”。Scala也可以简单理解为:大量语法糖的Java。重要的是学习Scala的语言风格及思想。语法更简洁,让开发人员更关注具体实现,而不是代码细节更适合于高并发大型项目应用函数式编程更适合分布式计算目录123初识ScalaSpark流计算Spark安装及基本操作伯克利大学-BDAS软件栈复杂的批量数据处理(batchdataprocessing)10'~5H。基于历史数据的交互式查询(interactivequery)10s~5'基于实时数据流的数据处理(streamingdataprocessing)500ms~5s什么是SparkStreaming

将Spark扩展为大规模流处理系统

可以扩展到100节点规模,达到秒级延迟

高效且具有良好的容错性

提供了类似批处理的API,可很容易实现复杂算法

电子商务:统计过去1分钟内访问最多的5件商品SparkStreaming设计动机

很多重要的应用要处理大量在线流式数据,并返回近似实时的结果

社交网络趋势追踪

网站指标统计

广告系统

具备分布式流式处理框架的基本特征

良好的扩展性(百级别节点)

低延迟(秒级别)有状态的流式处理

传统流式系统采用了“record-at-a-time”的处理模型

每个节点状态是变化的

对于每条记录,修改状态后发射新的记录

节点宕机后,状态丢失

在流式处理系统中,让可变状态具有容错性是很具有挑战性的工作mutablestatenode

1node

3input

recordsnode

2input

records已存在的流式系统:Storm

重发未被处理的数据记录

每条数据至少被处理一次

状态可能被修改多次

状态信息可能因为(硬件或软件)故障而丢失SparkStreaming基本思想将流式计算转化为一批很小的、确定的批处理作业。以X秒为单位将数据流切分成离散的作业将每批数据看做RDD,使用RDD操作符处理最终结果以RDD为单位返回(写入HDFS或者其他系统)SparkSpark

StreamingbatchesofXsecondslivedatastreamprocessedresultsDStream(Discretized

Streams)

表示连续的数据流,可能是输入流,或通过输入流转化而成的数据流;

内部由一系列离散的RDD组成;

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论