Apache Pig与Lucene集成

如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。
在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点:

(一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响

(二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。

(三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。


鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢?

(1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。

(2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。

(3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。



下面,散仙就具体说下如何使用Pig和Solr集成?

(1)依旧访问这个地址下载源码压缩包。
(2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。
(3)使用ant重新打包成jar
(4)在pig里,注册相关依赖的jar包,并使用索引存储



注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:


SolrOutputFormat函数

package com.pig.support.solr;  
  
  
  
import java.io.IOException;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.TimeUnit;  
  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.OutputCommitter;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.solr.client.solrj.SolrServer;  
import org.apache.solr.client.solrj.SolrServerException;  
import org.apache.solr.client.solrj.impl.CloudSolrServer;  
import org.apache.solr.client.solrj.impl.HttpSolrServer;  
import org.apache.solr.common.SolrInputDocument;  
/** 
 * @author qindongliang 
 * 支持SOlr的SolrOutputFormat 
 * 如果你想了解,或学习更多这方面的 
 * 知识,请加入我们的群: 
 *  
 * 搜索技术交流群(2000人):324714439  
 * 大数据技术1号交流群(2000人):376932160  (已满) 
 * 大数据技术2号交流群(2000人):415886155  
 * 微信公众号:我是攻城师(woshigcs) 
 *  
 * */  
public class SolrOutputFormat extends  
        FileOutputFormat<Writable, SolrInputDocument> {  
  
    final String address;  
    final String collection;  
  
    public SolrOutputFormat(String address, String collection) {  
        this.address = address;  
        this.collection = collection;  
    }  
  
    @Override  
    public RecordWriter<Writable, SolrInputDocument> getRecordWriter(  
            TaskAttemptContext ctx) throws IOException, InterruptedException {  
        return new SolrRecordWriter(ctx, address, collection);  
    }  
  
      
    @Override  
    public synchronized OutputCommitter getOutputCommitter(  
            TaskAttemptContext arg0) throws IOException {  
        return new OutputCommitter(){  
  
            @Override  
            public void abortTask(TaskAttemptContext ctx) throws IOException {  
                  
            }  
  
            @Override  
            public void commitTask(TaskAttemptContext ctx) throws IOException {  
                  
            }  
  
            @Override  
            public boolean needsTaskCommit(TaskAttemptContext arg0)  
                    throws IOException {  
                return true;  
            }  
  
            @Override  
            public void setupJob(JobContext ctx) throws IOException {  
                  
            }  
  
            @Override  
            public void setupTask(TaskAttemptContext ctx) throws IOException {  
                  
            }  
              
              
        };  
    }  
  
  
    /** 
     * Write out the LuceneIndex to a local temporary location.<br/> 
     * On commit/close the index is copied to the hdfs output directory.<br/> 
     *  
     */  
    static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {  
        /**Solr的地址*/  
        SolrServer server;  
        /**批处理提交的数量**/  
        int batch = 5000;  
          
        TaskAttemptContext ctx;  
          
        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);  
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();  
        /** 
         * Opens and forces connect to CloudSolrServer 
         *  
         * @param address 
         */  
        public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {  
            try {  
                this.ctx = ctx;  
                server = new HttpSolrServer(address);  
                  
                exec.scheduleWithFixedDelay(new Runnable(){  
                    public void run(){  
                        ctx.progress();  
                    }  
                }, 1000, 1000, TimeUnit.MILLISECONDS);  
            } catch (Exception e) {  
                RuntimeException exc = new RuntimeException(e.toString(), e);  
                exc.setStackTrace(e.getStackTrace());  
                throw exc;  
            }  
        }  
  
          
        /** 
         * On close we commit 
         */  
        @Override  
        public void close(final TaskAttemptContext ctx) throws IOException,  
                InterruptedException {  
  
            try {  
                  
                if (docs.size() > 0) {  
                    server.add(docs);  
                    docs.clear();  
                }  
  
                server.commit();  
            } catch (SolrServerException e) {  
                RuntimeException exc = new RuntimeException(e.toString(), e);  
                exc.setStackTrace(e.getStackTrace());  
                throw exc;  
            } finally {  
                server.shutdown();  
                exec.shutdownNow();  
            }  
              
        }  
  
        /** 
         * We add the indexed documents without commit 
         */  
        @Override  
        public void write(Writable key, SolrInputDocument doc)  
                throws IOException, InterruptedException {  
            try {  
                docs.add(doc);  
                if (docs.size() >= batch) {  
                    server.add(docs);  
                    docs.clear();  
                }  
            } catch (SolrServerException e) {  
                RuntimeException exc = new RuntimeException(e.toString(), e);  
                exc.setStackTrace(e.getStackTrace());  
                throw exc;  
            }  
        }  
  
    }  
}  
package com.pig.support.solr;



import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;
/**
 * @author qindongliang
 * 支持SOlr的SolrOutputFormat
 * 如果你想了解,或学习更多这方面的
 * 知识,请加入我们的群:
 * 
 * 搜索技术交流群(2000人):324714439 
 * 大数据技术1号交流群(2000人):376932160  (已满)
 * 大数据技术2号交流群(2000人):415886155 
 * 微信公众号:我是攻城师(woshigcs)
 * 
 * */
public class SolrOutputFormat extends
		FileOutputFormat<Writable, SolrInputDocument> {

	final String address;
	final String collection;

	public SolrOutputFormat(String address, String collection) {
		this.address = address;
		this.collection = collection;
	}

	@Override
	public RecordWriter<Writable, SolrInputDocument> getRecordWriter(
			TaskAttemptContext ctx) throws IOException, InterruptedException {
		return new SolrRecordWriter(ctx, address, collection);
	}

	
	@Override
	public synchronized OutputCommitter getOutputCommitter(
			TaskAttemptContext arg0) throws IOException {
		return new OutputCommitter(){

			@Override
			public void abortTask(TaskAttemptContext ctx) throws IOException {
				
			}

			@Override
			public void commitTask(TaskAttemptContext ctx) throws IOException {
				
			}

			@Override
			public boolean needsTaskCommit(TaskAttemptContext arg0)
					throws IOException {
				return true;
			}

			@Override
			public void setupJob(JobContext ctx) throws IOException {
				
			}

			@Override
			public void setupTask(TaskAttemptContext ctx) throws IOException {
				
			}
			
			
		};
	}


	/**
	 * Write out the LuceneIndex to a local temporary location.<br/>
	 * On commit/close the index is copied to the hdfs output directory.<br/>
	 * 
	 */
	static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {
		/**Solr的地址*/
		SolrServer server;
		/**批处理提交的数量**/
		int batch = 5000;
		
		TaskAttemptContext ctx;
		
		List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);
		ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
		/**
		 * Opens and forces connect to CloudSolrServer
		 * 
		 * @param address
		 */
		public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {
			try {
				this.ctx = ctx;
				server = new HttpSolrServer(address);
				
				exec.scheduleWithFixedDelay(new Runnable(){
					public void run(){
						ctx.progress();
					}
				}, 1000, 1000, TimeUnit.MILLISECONDS);
			} catch (Exception e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

		
		/**
		 * On close we commit
		 */
		@Override
		public void close(final TaskAttemptContext ctx) throws IOException,
				InterruptedException {

			try {
				
				if (docs.size() > 0) {
					server.add(docs);
					docs.clear();
				}

				server.commit();
			} catch (SolrServerException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			} finally {
				server.shutdown();
				exec.shutdownNow();
			}
			
		}

		/**
		 * We add the indexed documents without commit
		 */
		@Override
		public void write(Writable key, SolrInputDocument doc)
				throws IOException, InterruptedException {
			try {
				docs.add(doc);
				if (docs.size() >= batch) {
					server.add(docs);
					docs.clear();
				}
			} catch (SolrServerException e) {
				RuntimeException exc = new RuntimeException(e.toString(), e);
				exc.setStackTrace(e.getStackTrace());
				throw exc;
			}
		}

	}
}





SolrStore函数

package com.pig.support.solr;  
  
  
  
import java.io.IOException;  
import java.util.Properties;  
  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Writable;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.OutputFormat;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.pig.ResourceSchema;  
import org.apache.pig.ResourceSchema.ResourceFieldSchema;  
import org.apache.pig.ResourceStatistics;  
import org.apache.pig.StoreFunc;  
import org.apache.pig.StoreMetadata;  
import org.apache.pig.data.Tuple;  
import org.apache.pig.impl.util.UDFContext;  
import org.apache.pig.impl.util.Utils;  
import org.apache.solr.common.SolrInputDocument;  
  
/** 
 *  
 * Create a lucene index 
 *  
 */  
public class SolrStore extends StoreFunc implements StoreMetadata {  
  
    private static final String SCHEMA_SIGNATURE = "solr.output.schema";  
  
    ResourceSchema schema;  
    String udfSignature;  
    RecordWriter<Writable, SolrInputDocument> writer;  
  
    String address;  
    String collection;  
      
    public SolrStore(String address, String collection) {  
        this.address = address;  
        this.collection = collection;  
    }  
  
    public void storeStatistics(ResourceStatistics stats, String location,  
            Job job) throws IOException {  
    }  
  
    public void storeSchema(ResourceSchema schema, String location, Job job)  
            throws IOException {  
    }  
  
    @Override  
    public void checkSchema(ResourceSchema s) throws IOException {  
        UDFContext udfc = UDFContext.getUDFContext();  
        Properties p = udfc.getUDFProperties(this.getClass(),  
                new String[] { udfSignature });  
        p.setProperty(SCHEMA_SIGNATURE, s.toString());  
    }  
  
    public OutputFormat<Writable, SolrInputDocument> getOutputFormat()  
            throws IOException {  
        // not be used  
        return new SolrOutputFormat(address, collection);  
    }  
  
    /** 
     * Not used 
     */  
    @Override  
    public void setStoreLocation(String location, Job job) throws IOException {  
        FileOutputFormat.setOutputPath(job, new Path(location));  
    }  
  
    @Override  
    public void setStoreFuncUDFContextSignature(String signature) {  
        this.udfSignature = signature;  
    }  
  
    @SuppressWarnings({ "unchecked", "rawtypes" })  
    @Override  
    public void prepareToWrite(RecordWriter writer) throws IOException {  
        this.writer = writer;  
        UDFContext udc = UDFContext.getUDFContext();  
        String schemaStr = udc.getUDFProperties(this.getClass(),  
                new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);  
  
        if (schemaStr == null) {  
            throw new RuntimeException("Could not find udf signature");  
        }  
  
        schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));  
  
    }  
  
    /** 
     * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch 
     * @param input 
     * @return 
     */  
    private static String stripNonCharCodepoints(String input) {  
        StringBuilder retval = new StringBuilder(input.length());  
        char ch;  
  
        for (int i = 0; i < input.length(); i++) {  
            ch = input.charAt(i);  
  
            // Strip all non-characters  
            // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]  
            // and non-printable control characters except tabulator, new line  
            // and carriage return  
            if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step  
                                            // 0x10000  
                    ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range  
                    (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef  
                    (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {  
  
                retval.append(ch);  
            }  
        }  
  
        return retval.toString();  
    }  
  
    @Override  
    public void putNext(Tuple t) throws IOException {  
  
        final SolrInputDocument doc = new SolrInputDocument();  
  
        final ResourceFieldSchema[] fields = schema.getFields();  
        int docfields = 0;  
  
        for (int i = 0; i < fields.length; i++) {  
            final Object value = t.get(i);  
  
            if (value != null) {  
                docfields++;  
                doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));  
            }  
  
        }  
  
        try {  
            if (docfields > 0)  
                writer.write(null, doc);  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
            return;  
        }  
  
    }  
  
}  
package com.pig.support.solr;



import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreMetadata;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.solr.common.SolrInputDocument;

/**
 * 
 * Create a lucene index
 * 
 */
public class SolrStore extends StoreFunc implements StoreMetadata {

	private static final String SCHEMA_SIGNATURE = "solr.output.schema";

	ResourceSchema schema;
	String udfSignature;
	RecordWriter<Writable, SolrInputDocument> writer;

	String address;
	String collection;
	
	public SolrStore(String address, String collection) {
		this.address = address;
		this.collection = collection;
	}

	public void storeStatistics(ResourceStatistics stats, String location,
			Job job) throws IOException {
	}

	public void storeSchema(ResourceSchema schema, String location, Job job)
			throws IOException {
	}

	@Override
	public void checkSchema(ResourceSchema s) throws IOException {
		UDFContext udfc = UDFContext.getUDFContext();
		Properties p = udfc.getUDFProperties(this.getClass(),
				new String[] { udfSignature });
		p.setProperty(SCHEMA_SIGNATURE, s.toString());
	}

	public OutputFormat<Writable, SolrInputDocument> getOutputFormat()
			throws IOException {
		// not be used
		return new SolrOutputFormat(address, collection);
	}

	/**
	 * Not used
	 */
	@Override
	public void setStoreLocation(String location, Job job) throws IOException {
		FileOutputFormat.setOutputPath(job, new Path(location));
	}

	@Override
	public void setStoreFuncUDFContextSignature(String signature) {
		this.udfSignature = signature;
	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public void prepareToWrite(RecordWriter writer) throws IOException {
		this.writer = writer;
		UDFContext udc = UDFContext.getUDFContext();
		String schemaStr = udc.getUDFProperties(this.getClass(),
				new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);

		if (schemaStr == null) {
			throw new RuntimeException("Could not find udf signature");
		}

		schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));

	}

	/**
	 * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
	 * @param input
	 * @return
	 */
	private static String stripNonCharCodepoints(String input) {
		StringBuilder retval = new StringBuilder(input.length());
		char ch;

		for (int i = 0; i < input.length(); i++) {
			ch = input.charAt(i);

			// Strip all non-characters
			// http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
			// and non-printable control characters except tabulator, new line
			// and carriage return
			if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step
											// 0x10000
					ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
					(ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
					(ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {

				retval.append(ch);
			}
		}

		return retval.toString();
	}

	@Override
	public void putNext(Tuple t) throws IOException {

		final SolrInputDocument doc = new SolrInputDocument();

		final ResourceFieldSchema[] fields = schema.getFields();
		int docfields = 0;

		for (int i = 0; i < fields.length; i++) {
			final Object value = t.get(i);

			if (value != null) {
				docfields++;
				doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));
			}

		}

		try {
			if (docfields > 0)
				writer.write(null, doc);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return;
		}

	}

}



Pig脚本如下:

--注册依赖文件的jar包  
REGISTER ./dependfiles/tools.jar;  
  
--注册solr相关的jar包  
REGISTER  ./solrdependfiles/pigudf.jar;   
REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;  
REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;  
REGISTER  ./solrdependfiles/httpclient-4.3.1.jar  
REGISTER  ./solrdependfiles/httpcore-4.3.jar  
REGISTER  ./solrdependfiles/httpmime-4.3.1.jar  
REGISTER  ./solrdependfiles/noggit-0.5.jar  
  
  
--加载HDFS数据,并定义scheaml  
a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);  
  
--存储到solr中,并提供solr的ip地址和端口号  
store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');  
~                                                                                                                                                              
~                                                                        
~                                 
--注册依赖文件的jar包
REGISTER ./dependfiles/tools.jar;

--注册solr相关的jar包
REGISTER  ./solrdependfiles/pigudf.jar; 
REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;
REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;
REGISTER  ./solrdependfiles/httpclient-4.3.1.jar
REGISTER  ./solrdependfiles/httpcore-4.3.jar
REGISTER  ./solrdependfiles/httpmime-4.3.1.jar
REGISTER  ./solrdependfiles/noggit-0.5.jar


--加载HDFS数据,并定义scheaml
a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);

--存储到solr中,并提供solr的ip地址和端口号
store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');
~                                                                                                                                                            
~                                                                      
~



配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下:


Apache Pig与Lucene集成



成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等!

同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待!

相关推荐