hadoop写入hbase数据

贴下代码,留作备用,

@Override
    public int run(String[] args) throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
       /* String whh = hbaseConf.get("hbase.zookeeper.quorum");
        System.out.print(whh);*/
        Config config = new Config();
        config.initJarFile("mr_hbase.properties");
        String numReduceTasksStr = config.getValue("numReduceTasks");
        Integer numReduceTasks = 3;
        if (NumberUtils.isDigits(numReduceTasksStr)) {
            numReduceTasks = Integer.valueOf(numReduceTasksStr);

        }
        String hbaseZookeeperQuorum = config.getValue("hbase.zookeeper.quorum");
        hbaseConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);

        String hbaseZookeeperPropertyClientPort = config.getValue("hbase.zookeeper.property.clientPort");
        hbaseConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperPropertyClientPort);
        if (args.length > 2) {
            hbaseConf.set("hbase.zookeeper.quorum", args[2]);
        }

        Job job = Job.getInstance(hbaseConf);
        job.setJarByClass(BookKpUnitToHbaseMr.class);
        job.setMapperClass(BookKpUnitToHbaseMr.BookKpUnitToHbaseMapper.class);

        //将第一个路径参数作为输入参数
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //将第二个HBase表参数作为输出参数
        TableMapReduceUtil.initTableReducerJob(args[1], BookKpUnitToHbaseMr.BookKpUnitToHbaseReducer.class, job);  -----> 设置reducer的时候,使用org.apache.hadoop.hbase.mapreduce类

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(StudentKpInfo.class);

        //设置任务个数
        job.setNumReduceTasks(numReduceTasks);
        return job.waitForCompletion(true) ? 0 : 1;
    }
public static class toHbaseReducer extends TableReducer<Text, StudentScoreInfo, ImmutableBytesWritable> {   ----> hbase的TableReducer设计只接受rowkey,其余的列簇,列名,列值按写入代码时灵活设置,因此这个类只有ImmutableBytesWritable
        @Override
        protected void reduce(Text key, Iterable<StudentScoreInfo> values, Context context) throws IOException, InterruptedException {
            try {
                String rowkey = key.toString();
                Put put = new Put(rowkey.getBytes());
                for (StudentScoreInfo studentScoreInfo : values) {
                    put.addColumn(Bytes.toBytes("es"), Bytes.toBytes(studentScoreInfo.getStudentId()), Bytes.toBytes(studentScoreInfo.getStudentScoreValue()));  // 写入列,参数1分别为 es表示列簇    参数2表示列名  参数3表示列值 
                }
                context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);  // 将rowkey和这一列写入hbase
            } catch (Exception e) {
                logger.error("reduce error: ", e);
            }
        }
    }

相关推荐