# Nutch Study Notes: Generator
After Inject completes, control returns to crawl's main method, which then enters a loop that runs until the specified crawl depth is reached. Each iteration performs a generate and a fetch operation and produces one segment under crawl/segments, named after the time at which the generate method was invoked. This post analyzes the Generate operation, which consists of three MapReduce jobs.

The Generator class lives in org.apache.nutch.crawl. Its core method is public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime); based on link scores, it produces a fetch list containing the topN links to crawl and updates the web database. crawl enters the Generate phase by calling this method (a sketch of a typical invocation follows the parameter list). The parameters are:
- Path dbDir: path of the crawldb
- Path segments: path of the segments directory
- int numLists: number of reduce tasks
- long topN: the user-specified topN, i.e. at most topN URLs are added to the fetch list in each round
- long curTime: the time at which the generate method is invoked
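For orientation, here is a minimal sketch of what a call into this method can look like from a crawl loop. The configuration setup, the Generator(Configuration) constructor, the concrete paths, and the topN value of 50,000 are assumptions for illustration, not taken from the post:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.util.NutchConfiguration;

/** Hedged sketch of invoking the Generate phase; paths and topN are assumed. */
public class GenerateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    Generator generator = new Generator(conf);
    Path crawlDb = new Path("crawl/crawldb");
    Path segments = new Path("crawl/segments");
    // -1 lets generate() fall back to the configured number of map tasks;
    // 50000 is an illustrative topN; curTime is simply "now".
    Path segment = generator.generate(crawlDb, segments, -1, 50000L,
        System.currentTimeMillis());
    System.out.println("Generated segment: " + segment);
  }
}
```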
The code of this method is as follows:
```java
public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime)
    throws IOException {
  JobConf job = new NutchJob(getConf());
  boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
  return generate(dbDir, segments, numLists, topN, curTime, filter, false);
}
```
It reads from the configuration whether URLs should be filtered (the default is to filter), then calls generate(dbDir, segments, numLists, topN, curTime, filter, false), which is where the real generate work happens. That method creates a working directory, referred to as tempDir, under the system temporary directory.
```java
public Path generate(Path dbDir, Path segments, int numLists, long topN, long curTime,
                     boolean filter, boolean force) throws IOException {
  // ......
  JobConf job = new NutchJob(getConf());
  job.setJobName("generate: select " + segment);

  if (numLists == -1) {
    numLists = job.getNumMapTasks();
  }
  if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
    LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
    numLists = 1;
  }
  // ......
  FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(Selector.class);
  job.setPartitionerClass(Selector.class);
  job.setReducerClass(Selector.class);

  FileOutputFormat.setOutputPath(job, tempDir);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
  job.setOutputValueClass(SelectorEntry.class);
  try {
    JobClient.runJob(job);
  } catch (IOException e) {
    LockUtil.removeLockFile(fs, lock);
    throw e;
  }
```
The method first sets up a few paths and then determines the number of reducers; if Hadoop is running in local mode, numLists is forced to 1.
## The first MapReduce job

The program then configures the first MapReduce job. The input path is the current web database (crawl/crawldb/current) and the output path is the temporary directory tempDir. The Mapper, Partitioner, and Reducer are all the Selector class.
### Mapper phase

```java
public void map(Text key, CrawlDatum value,
                OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {
  Text url = key;
  if (filter) {
    try {
      if (filters.filter(url.toString()) == null)
        return;
    } catch (URLFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
      }
    }
  }
  CrawlDatum crawlDatum = value;

  if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
    LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
        + crawlDatum.getFetchTime() + ", curTime=" + curTime);
    return;
  }

  LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData()
      .get(Nutch.WRITABLE_GENERATE_TIME_KEY);
  if (oldGenTime != null) {
    if (oldGenTime.get() + genDelay > curTime)
      return;
  }

  float sort = 1.0f;
  try {
    sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
  } catch (ScoringFilterException sfe) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
    }
  }
  sortValue.set(sort);
  crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
  entry.datum = crawlDatum;
  entry.url = (Text) key;
  output.collect(sortValue, entry);
}
```
map first runs the input URL through the URL filters; if it is filtered out, the method returns, otherwise processing continues. Next, shouldFetch compares the current time against the link's fetchTime stored in the database to decide whether the link should be considered for fetching: if the required interval has not yet elapsed, the URL is not fetched, and even if it has, the URL may still be rejected by the checks that follow. The ScoringFilters' generatorSortValue method then computes the link's score for the Generate phase. This score is collected as the key, while the datum and url are wrapped into a SelectorEntry object (entry) that is collected as the value. The result of the first map is therefore <sortValue, entry>: each link's score paired with its information.
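To make the two time gates in the code above concrete, here is a simplified, self-contained sketch of the same idea. It is not Nutch's FetchSchedule or Selector code; the names dueForFetch and recentlyGenerated are invented for illustration.

```java
/** Simplified sketch of the two time gates in Selector.map (not the real Nutch classes). */
public class GenerateTimeGates {

  /** The shouldFetch idea: a URL is due only if its scheduled fetch time has arrived. */
  static boolean dueForFetch(long fetchTime, long curTime) {
    return fetchTime <= curTime;
  }

  /** The genDelay guard: skip a URL that was already generated within the last genDelay ms. */
  static boolean recentlyGenerated(Long oldGenTime, long genDelay, long curTime) {
    return oldGenTime != null && oldGenTime + genDelay > curTime;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long day = 24L * 3600 * 1000;
    // Due yesterday, never generated before: selected.
    System.out.println(dueForFetch(now - day, now) && !recentlyGenerated(null, 7 * day, now));
    // Generated one hour ago with a 7-day genDelay: skipped this round.
    System.out.println(recentlyGenerated(now - 3600 * 1000L, 7 * day, now));
  }
}
```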
### Partitioner phase

```java
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
  return hostPartitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
}
```
It partitions by the host of each URL, so that all URLs from the same host are handled by the same reducer; this supports politeness towards each site.
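The effect can be sketched as follows. This is a simplified stand-in for the host partitioner, not the actual class used by Nutch; it only shows why every URL of a given host lands in the same reduce partition.

```java
import java.net.MalformedURLException;
import java.net.URL;

/** Simplified stand-in for host-based partitioning (not the real Nutch partitioner). */
public class HostPartitionSketch {

  /** All URLs of the same host map to the same partition in [0, numReduceTasks). */
  static int getPartition(String url, int numReduceTasks) throws MalformedURLException {
    String host = new URL(url).getHost().toLowerCase();
    return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) throws MalformedURLException {
    // Both pages of example.com land in the same partition, so one reducer
    // (and later one fetch-list partition) handles that host.
    System.out.println(getPartition("http://example.com/a.html", 4));
    System.out.println(getPartition("http://example.com/b/c.html", 4));
  }
}
```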
### Reducer phase

```java
public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
                   OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {
  while (values.hasNext() && count < limit) {
    SelectorEntry entry = values.next();
    Text url = entry.url;
    String urlString = url.toString();
    URL u = null;
    try {
      u = new URL(url.toString());
    } catch (MalformedURLException e) {
      LOG.info("Bad protocol in url: " + url.toString());
      continue;
    }
    String host = u.getHost();
    host = host.toLowerCase();
    String hostname = host;
    // ......
    try {
      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      host = new URL(urlString).getHost();
    } catch (Exception e) {
      LOG.warn("Malformed URL: '" + urlString + "', skipping ("
          + StringUtils.stringifyException(e) + ")");
      continue;
    }
    if (maxPerHost > 0) {
      IntWritable hostCount = hostCounts.get(host);
      if (hostCount == null) {
        hostCount = new IntWritable();
        hostCounts.put(host, hostCount);
      }
      hostCount.set(hostCount.get() + 1);
      if (hostCount.get() > maxPerHost) {
        if (hostCount.get() == maxPerHost + 1) {
          maxedHosts.add(hostname);
          if (LOG.isInfoEnabled()) {
            LOG.info("Host " + host + " has more than " + maxPerHost + " URLs."
                + " Skipping additional.");
          }
        }
        continue;
      }
    }
    output.collect(key, entry);
    count++;
  }
}
```
limit is the maximum number of links each reducer may emit; it is computed as limit = job.getLong(CRAWL_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks(), i.e. topN divided by the number of reducers. As long as a reducer has not reached this limit, it keeps taking entries from its input values. If the user sets the generate.max.per.host property to a positive value, the number of links generated for any single host is also capped, and URLs beyond that per-host limit are dropped. The reduce method ultimately collects the <sortScore, selectorEntry> pairs that pass these checks and fit within the quota. Because the number of collected pairs is bounded by limit, and the reducer's input values arrive sorted by score from high to low, the lower-scoring URLs are simply cut off once the limit is reached.
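As a concrete illustration with assumed numbers: for topN = 1000 and 4 reducers, limit is 250, so each reducer emits at most its 250 highest-scoring entries; a generate.max.per.host of 1 would further drop every entry beyond the first seen for a host. The sketch below is not the actual Selector.reduce; it captures only this selection policy.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified sketch of the reducer's selection policy (not the actual Selector.reduce). */
public class TopNPerHostSketch {

  /**
   * Keep at most `limit` entries overall and at most `maxPerHost` per host,
   * assuming the input is already sorted by score, highest first.
   */
  static int select(List<String[]> urlsByScoreDesc, int limit, int maxPerHost) {
    Map<String, Integer> hostCounts = new HashMap<String, Integer>();
    int count = 0;
    for (String[] entry : urlsByScoreDesc) {               // entry = {url, host}
      if (count >= limit) break;                           // limit = topN / numReduceTasks
      String host = entry[1];
      int seen = hostCounts.containsKey(host) ? hostCounts.get(host) : 0;
      if (maxPerHost > 0 && seen >= maxPerHost) continue;  // host already at its cap
      hostCounts.put(host, seen + 1);
      count++;                                             // the entry would be collected here
    }
    return count;
  }

  public static void main(String[] args) {
    List<String[]> sorted = Arrays.asList(
        new String[] { "http://a.com/1", "a.com" },
        new String[] { "http://a.com/2", "a.com" },
        new String[] { "http://b.com/1", "b.com" });
    // limit = 2, maxPerHost = 1: keeps a.com/1 and b.com/1, drops a.com/2 -> prints 2
    System.out.println(select(sorted, 2, 1));
  }
}
```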
The output of the first MapReduce job is thus still stored in the form <score, link info>.
## The second MapReduce job

After the first MapReduce job finishes, control returns to the generate method, which launches the second MapReduce job:
```java
// ......
job = new NutchJob(getConf());
job.setJobName("generate: partition " + segment);
job.setInt("partition.url.by.host.seed", new Random().nextInt());

FileInputFormat.addInputPath(job, tempDir);
job.setInputFormat(SequenceFileInputFormat.class);

job.setMapperClass(SelectorInverseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SelectorEntry.class);
job.setPartitionerClass(PartitionUrlByHost.class);
job.setReducerClass(PartitionReducer.class);
job.setNumReduceTasks(numLists);

FileOutputFormat.setOutputPath(job, output);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
job.setOutputKeyComparatorClass(HashComparator.class);
```
The input of this job is the temporary output of the first MapReduce job (tempDir); the output path is crawl_generate under the segment directory.
### Mapper phase

The Mapper class is SelectorInverseMapper:
```java
public static class SelectorInverseMapper extends MapReduceBase
    implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {

  public void map(FloatWritable key, SelectorEntry value,
                  OutputCollector<Text, SelectorEntry> output, Reporter reporter)
      throws IOException {
    SelectorEntry entry = (SelectorEntry) value;
    output.collect(entry.url, entry);
  }
}
```
The map method turns the first job's output, keyed as <link score, link info>, into records keyed as <url, link info>, so that the URL becomes the key the reducer works with.
The Partitioner distributes records to reducers based on the hash of the host of each URL.
### Reducer phase

The Reducer is the PartitionReducer class:
```java
public static class PartitionReducer extends MapReduceBase
    implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {

  public void reduce(Text key, Iterator<SelectorEntry> values,
                     OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
    while (values.hasNext()) {
      SelectorEntry entry = values.next();
      output.collect(entry.url, entry.datum);
    }
  }
}
```
The reducer's job is equally simple: it converts the mapper's output further into the form in which data is stored in the crawldb, <url, crawldatum>. The conversion from <sortScore, selectorEntry> to <url, crawldatum> is split across two steps, rather than done entirely in the map phase, because the links from the same host must be routed to the same reducer during the conversion.
The links stored in crawl_generate form the fetch list, i.e. the queue of URLs to be crawled.
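To inspect the resulting fetch list, the crawl_generate output can be read back as a SequenceFile of <Text, CrawlDatum> records. The sketch below uses the old Hadoop API that this code base targets; the segment name and part-file name are assumptions and depend on the actual run.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;

/** Hedged sketch: dump the <url, CrawlDatum> pairs in one crawl_generate part file. */
public class DumpFetchlist {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Path is illustrative; substitute the segment created by this generate run.
    Path part = new Path("crawl/segments/20240101000000/crawl_generate/part-00000");
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
    Text url = new Text();
    CrawlDatum datum = new CrawlDatum();
    while (reader.next(url, datum)) {
      System.out.println(url + "\t" + datum);
    }
    reader.close();
  }
}
```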
## The third MapReduce job

Immediately after the second MapReduce job, the third one runs. Its purpose is to update the web database with the generate-time information recorded in tempDir, writing the result to tempDir2:
```java
Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".")
    + "/generate-temp-" + System.currentTimeMillis());

job = new NutchJob(getConf());
job.setJobName("generate: updatedb " + dbDir);
job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
FileInputFormat.addInputPath(job, tempDir);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDbUpdater.class);
job.setReducerClass(CrawlDbUpdater.class);
job.setOutputFormat(MapFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CrawlDatum.class);
FileOutputFormat.setOutputPath(job, tempDir2);
```
This job takes two input paths: the first MapReduce job's output tempDir (which holds records of the form <sortScore, selectorEntry>) and the current web database crawl/crawldb/current. Both the Mapper and the Reducer are the CrawlDbUpdater class.
### Mapper phase

```java
public void map(WritableComparable key, Writable value,
                OutputCollector<Text, CrawlDatum> output, Reporter reporter)
    throws IOException {
  if (key instanceof FloatWritable) {
    SelectorEntry se = (SelectorEntry) value;
    output.collect(se.url, se.datum);
  } else {
    output.collect((Text) key, (CrawlDatum) value);
  }
}
```
It unifies the two different record formats coming from the two input paths into the single form <url, crawldatum>.
### Reducer phase

```java
public void reduce(Text key, Iterator<CrawlDatum> values,
                   OutputCollector<Text, CrawlDatum> output, Reporter reporter)
    throws IOException {
  while (values.hasNext()) {
    CrawlDatum val = values.next();
    if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
      LongWritable gt = (LongWritable) val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
      genTime.set(gt.get());
      if (genTime.get() != generateTime) {
        orig.set(val);
        genTime.set(0L);
        continue;
      }
    } else {
      orig.set(val);
    }
  }
  if (genTime.get() != 0L) {
    orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
  }
  output.collect(key, orig);
}
```
The reduce task updates the generate time inside the datum of each incoming <url, datum> pair and emits the updated <url, datum>; the resulting output directory tempDir2 is the new web database.
After the third MapReduce job finishes, the generate method makes tempDir2 the current web database, deletes the old crawldb, and returns the segment path. The Generate phase is complete.
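Conceptually, promoting tempDir2 to be the current database is just a couple of FileSystem operations on the crawldb directory. A minimal sketch of the idea, with assumed path names (the real generate method performs the equivalent replacement internally):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Conceptual sketch of promoting the updatedb output to the current crawldb.
    Path names are assumed; not the actual Nutch implementation. */
public class InstallNewCrawlDbSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path crawlDb = new Path("crawl/crawldb");
    Path current = new Path(crawlDb, "current");
    Path old = new Path(crawlDb, "old");
    Path tempDir2 = new Path("/tmp/generate-temp-0000000000000"); // output of the third job (name assumed)

    if (fs.exists(old)) fs.delete(old, true);        // drop the previous backup
    if (fs.exists(current)) fs.rename(current, old); // keep the old db around
    fs.rename(tempDir2, current);                    // promote the new db
  }
}
```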
The data flow of this phase is shown in the diagram below: