上海协策网站,湘潭网站建设方案费用,找网站建设客户,网页版传奇开服转载请注明#xff1a;http://www.cnblogs.com/demievil/p/4059141.html 我的github博客#xff1a;http://demievil.github.io/ 做项目的时候遇到一个问题#xff0c;在Mapper和Reducer方法中处理目标数据时#xff0c;先要去检索和匹配一个已存在的标签库#xff0c;再对… 转载请注明http://www.cnblogs.com/demievil/p/4059141.html 我的github博客http://demievil.github.io/ 做项目的时候遇到一个问题在Mapper和Reducer方法中处理目标数据时先要去检索和匹配一个已存在的标签库再对所处理的字段打标签。因为标签库不是很大没必要用HBase。我的实现方法是把标签库存储成HDFS上的文件用分布式缓存存储这样让每个slave都能读取到这个文件。 main方法中的配置 //分布式缓存要存储的文件路径
String cachePath[] {hdfs://10.105.32.57:8020/user/ad-data/tag/tag-set.csv,hdfs://10.105.32.57:8020/user/ad-data/tag/TagedUrl.csv};
//向分布式缓存中添加文件job.addCacheFile(new Path(cachePath[0]).toUri());job.addCacheFile(new Path(cachePath[1]).toUri()); 参考上面代码即可向分布式缓存中添加文件。 在Mapper和Reducer方法中读取分布式缓存文件 /** 重写Mapper的setup方法获取分布式缓存中的文件*/Overrideprotected void setup(MapperLongWritable, Text, Text, Text.Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubsuper.setup(context);URI[] cacheFile context.getCacheFiles();Path tagSetPath new Path(cacheFile[0]);Path tagedUrlPath new Path(cacheFile[1]);文件操作(如把内容读到set或map中);}Override
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {在map()中使用读取出的数据;} 同样如果在Reducer中也要读取分布式缓存文件示例如下 /** 重写Reducer的setup方法获取分布式缓存中的文件*/Overrideprotected void setup(Context context) throws IOException, InterruptedException {super.setup(context);mos new MultipleOutputsText, Text(context);URI[] cacheFile context.getCacheFiles();Path tagSetPath new Path(cacheFile[0]);Path tagSetPath new Path(cacheFile[1]);文件读取操作;}Overridepublic void reduce(Text key, IterableText values, Context context)throws IOException, InterruptedException {while(values.iterator().hasNext()){使用读取出的数据;}context.write(key, new Text(sb.toString()));} 以上。 转载于:https://www.cnblogs.com/kodyan/p/4059141.html