Using the HBase Java API to Implement Batch Import

This walkthrough batch-imports a mobile-internet access log file into HBase. The steps are as follows:

1. Upload the log file (see the attached file) to HDFS with the Hadoop command line:

hadoop fs -put input /

2. Create the HBase table from Java.

Java code:

package com.jiewen.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseDemo {

    public static void main(String[] args) throws IOException {
        String tableName = "wlan_log";
        String columnFamily = "cf";
        HbaseDemo.create(tableName, columnFamily);
        // HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");
        // HbaseDemo.get(tableName, "row1");
        // HbaseDemo.scan(tableName);
        // HbaseDemo.delete(tableName);
    }

    // Shared configuration needed by every HBase operation
    private static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        // Required when running from Eclipse; without it the client cannot locate ZooKeeper
        conf.set("hbase.zookeeper.quorum", "hadoop1");
        return conf;
    }

    // Create a table
    public static void create(String tableName, String columnFamily) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(getConfiguration());
        if (admin.tableExists(tableName)) {
            System.out.println("table exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            admin.createTable(tableDesc);
            System.out.println("create table success!");
        }
    }

    // Insert one record
    public static void put(String tableName, String row, String columnFamily,
            String column, String data) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Put p1 = new Put(Bytes.toBytes(row));
        p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(p1);
        System.out.println("put '" + row + "', '" + columnFamily + ":" + column + "', '" + data + "'");
    }

    // Read one record
    public static void get(String tableName, String row) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Get get = new Get(Bytes.toBytes(row));
        Result result = table.get(get);
        System.out.println("Get: " + result);
    }

    // Display all rows
    public static void scan(String tableName) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Scan scan = new Scan();
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            System.out.println("Scan: " + result);
        }
    }

    // Delete the table
    public static void delete(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(getConfiguration());
        if (admin.tableExists(tableName)) {
            try {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("Delete " + tableName + " failed");
            }
        }
        System.out.println("Delete " + tableName + " succeeded");
    }
}
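To make step 2 concrete, here is a minimal usage sketch, an addition rather than part of the original: it simply enables the commented-out calls from main, so every name it uses is defined by HbaseDemo itself.

Java code (usage sketch):

package com.jiewen.hbase;

import java.io.IOException;

// Write one cell, read it back, then scan the whole table,
// mirroring the commented-out calls in HbaseDemo.main
public class HbaseDemoUsage {
    public static void main(String[] args) throws IOException {
        String tableName = "wlan_log";
        String columnFamily = "cf";
        HbaseDemo.create(tableName, columnFamily);  // prints "table exists!" on reruns
        HbaseDemo.put(tableName, "row1", columnFamily, "cl1", "data");
        HbaseDemo.get(tableName, "row1");           // prints the stored cell
        HbaseDemo.scan(tableName);                  // prints every row in wlan_log
    }
}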

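Before the MapReduce job itself, it may help to see what its mapper's row-key construction does to a single record: the job in step 3 below keys each row by the record's second field plus a formatted timestamp. The concrete timestamp and phone-number values in this sketch are hypothetical, chosen only for illustration.

Java code (row-key illustration, hypothetical values):

package com.jiewen.hbase;

import java.text.SimpleDateFormat;
import java.util.Date;

// Standalone illustration of the row-key scheme used in step 3:
// rowKey = <second field> + ":" + <first field formatted as yyyyMMddHHmmss>
public class RowKeyDemo {
    public static void main(String[] args) {
        long millis = 1285068449134L;      // hypothetical epoch-millisecond first field
        String phone = "13600217502";      // hypothetical second field
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmss");
        String rowKey = phone + ":" + fmt.format(new Date(millis));
        System.out.println(rowKey);        // e.g. "13600217502:20100921192729" (timezone-dependent)
    }
}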
3. Import the log file into the HBase table wlan_log.

Java code:

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HbaseBatchImport {

    public static void main(String[] args) throws Exception {
        final Configuration configuration = new Configuration();
        // Point the client at ZooKeeper
        configuration.set("hbase.zookeeper.quorum", "hadoop1");
        // Name the target HBase table
        configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
        // Raise this value so HBase does not time out and exit during the import
        configuration.set("dfs.socket.timeout", "180000");

        final Job job = new Job(configuration, "HBaseBatchImport");
        job.setMapperClass(BatchImportMapper.class);
        job.setReducerClass(BatchImportReducer.class);
        // Set the map output types; the reduce output types are not set
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        // Instead of an output path, set an output format that writes to HBase
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.setInputPaths(job, "hdfs://hadoop1:9000/input");
        job.waitForCompletion(true);
    }

    static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
        Text v2 = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            try {
                // Field 0 is an epoch-millisecond timestamp; field 1 becomes the row-key prefix
                final Date date = new Date(Long.parseLong(splited[0].trim()));
                final String dateFormat = dateformat1.format(date);
                String rowKey = splited[1] + ":" + dateFormat;
                v2.set(rowKey + "\t" + value.toString());
                context.write(key, v2);
            } catch (NumberFormatException e) {
                final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
                counter.increment(1L);
                System.out.println("error " + splited[0] + " " + e.getMessage());
            }
        }
    }

    static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable> {
        protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context)
                throws java.io.IOException, InterruptedException {

The listing breaks off here in the source, partway through BatchImportReducer; a sketch of a plausible completion follows.
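The completion below is a hedged reconstruction, not the original text. A TableReducer paired with TableOutputFormat conventionally splits each incoming value back into row key and payload, wraps the payload in a Put, and emits it. The column family "cf" comes from step 2; the qualifier "line" is a hypothetical name introduced here, and the original may well have written each log field to its own column instead.

Java code (reconstruction sketch, continuing the reduce method above):

            for (Text text : values) {
                // The mapper emitted "rowKey \t originalLine"; a split limit of 2
                // keeps the tabs inside the original line intact
                final String[] splited = text.toString().split("\t", 2);
                final Put put = new Put(Bytes.toBytes(splited[0]));
                // Hypothetical single qualifier holding the raw log record
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("line"), Bytes.toBytes(splited[1]));
                // TableOutputFormat ignores the NullWritable key and applies each Put to wlan_log
                context.write(NullWritable.get(), put);
            }
        }
    }
}

After the job finishes, the imported rows can be checked with the scan method from step 2.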
