How to Build Big Data Pipelines for Hadoop
Dr. Mark Pollack

Big Data
- "Big data" refers to datasets whose size is beyond the ability of typical database software tools to capture, store, manage, and analyze
- A subjective and moving target
- Big data in many sectors today ranges from tens of TB to multiple PB

Enterprise Data Trends

The Data Access Landscape - The Value of Data
- Value from data exceeds hardware & software costs
- Value in connecting data sets
- Grouping e-commerce users by user agent: Orbitz shows more expensive hotels to Mac users (see /UhSlNi)
- Example user agent: Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en) AppleWebKit/418.9 (KHTML, like Gecko) Safari/419.3

Spring and Data Access
- Spring has always provided excellent data access support: transaction management, a portable data access exception hierarchy, JDBC (JdbcTemplate), ORM (Hibernate, JPA, JDO, iBatis), and cache support (Spring 3.1)
- The Spring Data project started in 2010; its goal is to "refresh" Spring's data access support in light of the new data access landscape

Spring Data Mission Statement
"Provide a familiar and consistent Spring-based programming model for Big Data, NoSQL, and relational stores while retaining store-specific features and capabilities."

Spring Data Supported Technologies
- Relational: JPA, JDBC Extensions
- NoSQL: Redis, HBase, Mongo, Neo4j, Lucene, GemFire
- Big Data: Hadoop HDFS and M/R, Hive, Pig, Cascading, Splunk
- Access: Repositories, QueryDSL, REST

A View of a Big Data System
(diagram) Data streams (log files, sensors, mobile), ingestion engine, unstructured data store, batch analysis, interactive processing (structured DB), stream processing, real-time analytics, distribution engine, integration apps, analytical apps, SaaS, social, monitoring / deployment
- Where Spring projects can be used to provide a solution

Big Data Problems are Integration Problems
- Real-world big data solutions require workflow across systems
- They share core components of a classic integration workflow
- Big data solutions need to integrate with existing data and apps
- Event-driven processing and batch workflows

Spring projects offer substantial integration functionality
- Spring Integration: for building and configuring message-based integration flows using input & output adapters, channels, and processors
- Spring Batch: for building and operating batch workflows and manipulating data in files and ETL; the basis for JSR 352 in EE7
- Spring Data: for manipulating data in relational DBs as well as a variety of NoSQL databases and data grids (inside GemFire 7.0)
- Spring for Apache Hadoop: for orchestrating Hadoop and non-Hadoop workflows in conjunction with Batch and Integration processing (inside GPHD 1.2)

Integration is an essential part of Big Data

Some Existing Big Data Integration Tools

Hadoop as a Big Data Platform

Spring for Hadoop - Goals
- Hadoop has a poor out-of-the-box programming model; applications are generally a collection of scripts calling command-line apps
- Spring simplifies developing Hadoop applications by providing a familiar and consistent programming and configuration model
- Across a wide range of use cases: HDFS usage, data analysis (MR/Pig/Hive/Cascading), workflow, event streams, integration
- Allows you to start small and grow

Relationship with other Spring projects

Spring Hadoop Core Functionality

Capabilities: Spring + Hadoop
- Declarative configuration: create, configure, and parameterize Hadoop connectivity and all job types; environment profiles make it easy to move from dev to QA to prod
- Developer productivity: create well-formed applications, not spaghetti-script applications
- Simplified HDFS access through an FsShell API, with support for JVM scripting
- Runner classes for MR/Pig/Hive/Cascading for small workflows
- Helper "Template" classes for Pig/Hive/HBase

Core Map Reduce Idea

Counting Words - Configuring M/R (Standard Hadoop APIs)

    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);

Counting Words - M/R Code (Standard Hadoop API - Mapper)

    public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

Counting Words - M/R Code (Standard Hadoop API - Reducer)

    public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

Running Hadoop Example Jars
- Standard Hadoop:

    bin/hadoop jar hadoop-examples.jar wordcount /wc/input /wc/output

- SHDP (Spring Hadoop): the jar is declared and run from the application context instead (a sketch follows below)

Running Hadoop Tools
- Standard Hadoop:

    bin/hadoop jar -conf myhadoop-site.xml -D ignoreCase=true wordcount.jar org.myorg.WordCount /wc/input /wc/output

- SHDP: the tool class, its jar, and properties such as ignoreCase=true are declared in the application context (a sketch follows below)
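The SHDP halves of the two slides above were XML snippets that did not survive extraction (only the ignoreCase=true property remains). As a hedged sketch of what a declarative equivalent could look like, assuming Spring for Apache Hadoop's tool-runner element (element and attribute names follow the SHDP reference documentation as best recalled, so treat them as assumptions rather than the deck's original listing):

    <!-- runs org.myorg.WordCount from wordcount.jar when the context starts,
         replacing the "bin/hadoop jar ..." command line; the property in the
         element body plays the role of -D ignoreCase=true -->
    <hdp:tool-runner id="wordcountTool" jar="wordcount.jar"
                     tool-class="org.myorg.WordCount" run-at-startup="true">
        <hdp:arg value="/wc/input"/>
        <hdp:arg value="/wc/output"/>
        ignoreCase=true
    </hdp:tool-runner>

The element assumes the hadoop XML namespace is mapped to the hdp prefix; the full namespace declaration is shown in the Configuring Hadoop sketch below.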

Configuring Hadoop
- applicationContext.xml declares the Hadoop configuration, with the file system URI parameterized as ${hd.fs}
- The externalized properties file:

    input.path=/wc/input/
    output.path=/wc/word/
    hd.fs=hdfs://localhost:9000
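The XML itself was lost in extraction; only the ${hd.fs} placeholder and the property values remain. A minimal sketch of what such an applicationContext.xml could look like, assuming the Spring for Apache Hadoop namespace, a properties file named hadoop.properties (the real file name is truncated in the extract), and the Hadoop 1.x property name fs.default.name, all of which are assumptions:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:hdp="http://www.springframework.org/schema/hadoop"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
               http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

        <!-- pulls input.path, output.path and hd.fs from the external file -->
        <context:property-placeholder location="hadoop.properties"/>

        <!-- Hadoop Configuration; the element body is parsed as key=value properties -->
        <hdp:configuration>
            fs.default.name=${hd.fs}
        </hdp:configuration>

    </beans>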

HDFS and Hadoop Shell as APIs
- Access all "bin/hadoop fs" commands through FsShell: mkdir, chmod, test, ...

    class MyScript {

        @Autowired FsShell fsh;

        @PostConstruct
        void init() {
            String outputDir = "/data/output";
            if (fsh.test(outputDir)) {
                fsh.rmr(outputDir);
            }
        }
    }

HDFS and FsShell as APIs
- FsShell is designed to support JVM scripting languages

    // copy-files.groovy
    // use the shell (made available under variable fsh)
    if (!fsh.test(inputDir)) {
        fsh.mkdir(inputDir)
        fsh.copyFromLocal(sourceFile, inputDir)
        fsh.chmod(700, inputDir)
    }
    if (fsh.test(outputDir)) {
        fsh.rmr(outputDir)
    }

HDFS and FsShell as APIs - Externalize Script
- The same script is externalized as copy-files.groovy and referenced from appCtx.xml (a sketch of the wiring follows below)

Demo
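The appCtx.xml wiring on the slide did not survive extraction. A hedged sketch, assuming SHDP's script element as described in the SHDP reference (the element and attribute names are assumptions), placed inside the <beans> root shown earlier:

    <!-- runs copy-files.groovy against the configured HDFS when the context starts;
         the FsShell is exposed to the script as the implicit variable fsh -->
    <hdp:script id="copyFilesScript" location="copy-files.groovy" run-at-startup="true"/>

The script's inputDir, outputDir, and sourceFile variables would still have to be supplied; that wiring is omitted here.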

Streaming Jobs and Environment Configuration
- Standard Hadoop:

    bin/hadoop jar hadoop-streaming.jar -input /wc/input -output /wc/output -mapper /bin/cat -reducer /bin/wc -files stopwords.txt

- SHDP: the same application context is launched with environment-specific properties

    env=dev java -jar SpringLauncher.jar applicationContext.xml

    dev properties:
    input.path=/wc/input/
    output.path=/wc/word/
    hd.fs=hdfs://localhost:9000

    env=qa java -jar SpringLauncher.jar applicationContext.xml

    qa properties:
    input.path=/gutenberg/input/
    output.path=/gutenberg/word/
    hd.fs=hdfs://darwin:9000
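The streaming job declaration behind these slides was also stripped. A hedged sketch, assuming SHDP's streaming element and a per-environment properties file whose name is derived from the env variable (the file-naming scheme and placeholder resolution are assumptions, not the deck's original configuration):

    <!-- picks up dev.properties or qa.properties depending on how the launcher was started -->
    <context:property-placeholder location="${env}.properties"/>

    <!-- streaming job; paths and cluster come from the per-environment properties
         (the -files stopwords.txt option from the CLI version is not shown here) -->
    <hdp:streaming id="streamingWordCount"
                   input-path="${input.path}" output-path="${output.path}"
                   mapper="/bin/cat" reducer="/bin/wc"/>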

Word Count - Injecting Jobs
- Use dependency injection to obtain a reference to the Hadoop Job
- Perform additional runtime configuration and submit

    public class WordService {

        @Inject
        private Job mapReduceJob;

        public void processWords() {
            mapReduceJob.submit();
        }
    }
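The job definition that gets injected into WordService was shown as XML and is missing from the extract. A hedged sketch, assuming SHDP's job element and the mapper/reducer classes from the earlier word-count slides (the element is documented in the SHDP reference; the package name is hypothetical):

    <!-- declares the word-count Job as a Spring bean so it can be @Inject-ed into WordService;
         no runner is declared because WordService submits the job itself -->
    <hdp:job id="mapReduceJob"
             input-path="${input.path}" output-path="${output.path}"
             mapper="org.myorg.TokenizerMapper"
             reducer="org.myorg.IntSumReducer"/>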

Pig

What is Pig?
- An alternative to writing MapReduce applications; improves productivity
- Pig applications are written in the Pig Latin language
- Pig Latin is a high-level data processing language, in the spirit of sed and awk, not SQL
- Pig Latin describes a sequence of steps; each step performs a transformation on an item of data in a collection
- Extensible with user-defined functions (UDFs)
- A PigServer is responsible for translating Pig Latin to MR

Counting Words - Pig Latin Script

    input_lines = LOAD '/tmp/books' AS (line:chararray);
    -- Extract words from each line and put them into a pig bag
    words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
    -- filter out any words that are just white space
    filtered_words = FILTER words BY word MATCHES '\w+';
    -- create a group for each word
    word_groups = GROUP filtered_words BY word;
    -- count the entries in each group
    word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word;
    ordered_word_count = ORDER word_count BY count DESC;
    STORE ordered_word_count INTO '/tmp/number-of-words';

Using Pig
- Standard Pig:

    pig -x mapreduce wordcount.pig
    pig wordcount.pig -P <properties file> -p pig.exec.nocombiner=true

- Spring Hadoop: creates a PigServer, with optional execution of scripts on application startup; properties such as pig.exec.nocombiner=true and script arguments such as ignoreCase=TRUE move into the configuration (a sketch follows below)
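The Spring Hadoop half of the slide above was XML; only the pig.exec.nocombiner=true and ignoreCase=TRUE values survive in this extract. A hedged sketch using SHDP's pig-factory element with a nested script (the structure follows the SHDP reference as best recalled; treat it as an assumption):

    <!-- creates the PigServer factory; properties go in the element body and
         the nested script runs when the application starts -->
    <hdp:pig-factory>
        pig.exec.nocombiner=true
        <hdp:script location="wordcount.pig">
            <hdp:arguments>ignoreCase=TRUE</hdp:arguments>
        </hdp:script>
    </hdp:pig-factory>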

Spring's PigRunner
- Execute a small Pig workflow: HDFS, Pig Latin, HDFS
- The runner is configured in XML with script arguments such as inputDir=${inputDir} and outputDir=${outputDir} (a sketch follows below)

Schedule a Pig Job
- PigRunner implements Callable
- Use Spring's scheduling support

    @Scheduled(cron = "0 0 12 * * ?")
    public void process() {
        pigRunner.call();
    }
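The PigRunner definition referenced by the scheduled method was XML that is missing from the extract; only its inputDir/outputDir arguments remain. A hedged sketch using SHDP's pig-runner element (the pre-action attribute and the script name are assumptions based on the SHDP reference, not the deck's original):

    <!-- HDFS -> Pig Latin -> HDFS workflow: an HDFS script runs before the Pig script -->
    <hdp:pig-runner id="pigRunner" pre-action="copyFilesScript" run-at-startup="false">
        <hdp:script location="wordcount.pig">
            <hdp:arguments>
                inputDir=${inputDir}
                outputDir=${outputDir}
            </hdp:arguments>
        </hdp:script>
    </hdp:pig-runner>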

PigTemplate
- Simplifies the programmatic use of Pig; common tasks are one-liners
- Configured with the cluster coordinates, e.g. ${hd.fs} and mapred.job.tracker=${mapred.job.tracker} (a sketch follows the code below)

PigTemplate - Programmatic Use

    public class PigPasswordRepository implements PasswordRepository {

        private PigTemplate pigTemplate;

        private String pigScript = "classpath:password-analysis.pig";

        public void processPasswordFile(String inputFile) {
            String outputDir = baseOutputDir + File.separator + counter.incrementAndGet();
            Properties scriptParameters = new Properties();
            scriptParameters.put("inputDir", inputFile);
            scriptParameters.put("outputDir", outputDir);
            pigTemplate.executeScript(pigScript, scriptParameters);
        }
        // ...
    }
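The XML that backs the PigTemplate was stripped; only the ${hd.fs} and mapred.job.tracker placeholders survive. A hedged sketch, assuming SHDP's pig-factory and pig-template elements and the Hadoop 1.x property names (all assumptions):

    <!-- PigServer factory carrying the cluster coordinates -->
    <hdp:pig-factory>
        fs.default.name=${hd.fs}
        mapred.job.tracker=${mapred.job.tracker}
    </hdp:pig-factory>

    <!-- thread-safe template injected into PigPasswordRepository above -->
    <hdp:pig-template/>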

Hive

What is Hive?
- An alternative to writing MapReduce applications; improves productivity
- Hive applications are written using HiveQL
- HiveQL is in the spirit of SQL
- A HiveServer is responsible for translating HiveQL to MR
- Access via JDBC, ODBC, or Thrift RPC

Counting Words - HiveQL

    -- import the file as lines
    CREATE EXTERNAL TABLE lines(line string);
    LOAD DATA INPATH 'books' OVERWRITE INTO TABLE lines;
    -- create a virtual view that splits the lines
    SELECT word, count(*) FROM lines
    LATERAL VIEW explode(split(line, ' ')) lTable AS word
    GROUP BY word;

Using Hive
- Command-line:

    $HIVE_HOME/bin/hive -f wordcount.sql -d ignoreCase=TRUE -h hive-server.host

- JDBC based:

    Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
    Connection con = DriverManager.getConnection("jdbc:hive://server:port/default", "", "");
    try {
        Statement stmt = con.createStatement();
        ResultSet res = stmt.executeQuery("...");
        while (res.next()) {
            // ...
        }
    } catch (SQLException ex) {
        // ...
    } finally {
        try {
            con.close();
        } catch (Exception ex) {
        }
    }

Using Hive with Spring Hadoop
- Access Hive using the JDBC client and use JdbcTemplate
- Reuse existing knowledge of Spring's rich ResultSet-to-POJO mapping features

    public long count() {
        return jdbcTemplate.queryForLong("select count(*) from " + tableName);
    }

    List<String> result = jdbcTemplate.query("select * from passwords",
        new ResultSetExtractor<List<String>>() {
            public List<String> extractData(ResultSet rs) throws SQLException {
                // extract data from result set
            }
        });
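The bean wiring behind these two slides is missing from the extract. Because the point is plain JDBC access through the Hive driver named earlier, a hedged sketch of the configuration might look like the following (standard Spring JDBC classes; the bean names and host/port placeholders are hypothetical):

    <!-- Hive's JDBC driver exposed as a regular DataSource -->
    <bean id="hiveDataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
        <constructor-arg>
            <bean class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
        </constructor-arg>
        <constructor-arg value="jdbc:hive://${hive.host}:${hive.port}/default"/>
    </bean>

    <!-- the JdbcTemplate used in the count() and query() snippets above -->
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <constructor-arg ref="hiveDataSource"/>
    </bean>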

Standard Hive Thrift API
- HiveClient is not thread-safe and throws checked exceptions

    public long count() {
        HiveClient hiveClient = createHiveClient();
        try {
            hiveClient.execute("select count(*) from " + tableName);
            return Long.parseLong(hiveClient.fetchOne());
        // checked exceptions
        } catch (HiveServerException ex) {
            throw translateException(ex);
        } catch (org.apache.thrift.TException tex) {
            throw translateException(tex);
        } finally {
            try {
                hiveClient.shutdown();
            } catch (org.apache.thrift.TException tex) {
                logger.debug("Unexpected exception on shutting down HiveClient", tex);
            }
        }
    }

    protected HiveClient createHiveClient() {
        TSocket transport = new TSocket(host, port, timeout);
        HiveClient hive = new HiveClient(new TBinaryProtocol(transport));
        try {
            transport.open();
        } catch (TTransportException e) {
            throw translateException(e);
        }
        return hive;
    }

Spring Hadoop Batch & Integration

Hadoop Workflows Managed by Spring Batch
- Reuse the same Batch infrastructure and knowledge to manage Hadoop workflows
- A Step can be any Hadoop job type or HDFS script

Capabilities: Spring + Hadoop + Batch
- Spring Batch for file/DB/NoSQL-driven applications
- Collect: process local files
- Transform: scripting or Java code to transform and enrich
- RT Analysis: N/A
- Ingest: (batch/aggregate) write to HDFS or split/filter
- Batch Analysis: orchestrate Hadoop steps in a workflow
- Distribute: copy data out of HDFS to structured storage
- JMX enabled, along with a REST interface for job control

Spring Batch Configuration for Hadoop
- Reuse previous Hadoop job definitions as steps in a batch workflow (a sketch follows below)
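Both Spring Batch configuration slides carried XML that did not survive extraction. A hedged sketch of what such a configuration could look like, assuming SHDP's batch tasklet elements and the Spring Batch namespace (element names follow the SHDP and Spring Batch references; the particular step layout is an assumption, not the deck's original):

    <!-- wrap Hadoop work as Spring Batch tasklets, reusing the job defined earlier -->
    <hdp:script-tasklet id="prepareHdfsTasklet">
        <hdp:script location="copy-files.groovy"/>
    </hdp:script-tasklet>
    <hdp:job-tasklet id="wordcountTasklet" job-ref="mapReduceJob"/>

    <!-- a batch job whose steps are Hadoop steps -->
    <batch:job id="hadoopWorkflow">
        <batch:step id="prepareHdfs" next="wordcount">
            <batch:tasklet ref="prepareHdfsTasklet"/>
        </batch:step>
        <batch:step id="wordcount">
            <batch:tasklet ref="wordcountTasklet"/>
        </batch:step>
    </batch:job>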

Capabilities: Spring + Hadoop + SI
- Spring Integration for event-driven applications
- Collect: single-node or distributed data collection (TCP/JMS/Rabbit)
- Transform: scripting or Java code to transform and enrich
- RT Analysis: connectivity to multiple analysis techniques
- Ingest: write to HDFS, split/filter the data stream to other stores
- JMX enabled, plus a control bus for starting/stopping individual components
