Nutch学习记录:Parse

#Nutch学习记录:Parse

Parse阶段和Fetch同属于一个循环层次中,在Fetch后由Parse阶段对抓取的内容进行解析。该阶段的入口是:

1
public void parse(Path segment) throws IOException

其中就是对即将启动的Parse阶段的MapReduce任务进行配置,主要的配置如下:

1
2
3
4
5
6
7
8
9
FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(ParseSegment.class);
job.setReducerClass(ParseSegment.class);
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(ParseOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ParseImpl.class);

其中输入路径就是Fetch阶段保存下来的segment/content目录,其中键值对的类型是<url, content>。Mapper和Reducer都在ParseSegment类中,输出的键值对类型是<url, ParseImpl>。输出格式是由ParseOutputFormat类实现的,它也是进行解析的重要部分。

Mapper阶段

在进行配置之后就启动任务,首先分析map方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void map(WritableComparable key, Content content,
OutputCollector&lt;Text, ParseImpl&gt; output, Reporter reporter)
throws IOException {
// convert on the fly from old UTF8 keys
if (key instanceof UTF8) {
newKey.set(key.toString());
key = newKey;
}
int status = Integer.parseInt(content.getMetadata().get(Nutch.FETCH_STATUS_KEY)); // 获取fetch阶段的状态
if (status != CrawlDatum.STATUS_FETCH_SUCCESS) {
// content not fetched successfully, skip document
LOG.debug("Skipping " + key + " as content is not fetched successfully");
return;
}
ParseResult parseResult = null;
try {
parseResult = new ParseUtil(getConf()).parse(content);
} catch (Exception e) {
LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
return;
}

map首先获取Fetch阶段的状态,如果没有抓取成功,则返回,跳过这个网页,如果成功则调用ParseUtil的parse方法,对页面进行解析,该方法实现如下:

1
parsers = this.parserFactory.getParsers(content.getContentType(), content.getUrl() != null ? content.getUrl():"");

首先根据网页的类型找出可以解析这个网页的全部parser,再依次调用每个parser的getParse方法对网页进行解析,只要有一个parser成功解析了网页就返回给map方法的parseResult。对于HTML来说,使用HtmlParser作为其解析器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public ParseResult getParse(Content content) {   // 解析content
HTMLMetaTags metaTags = new HTMLMetaTags();
......
DocumentFragment root;
try {
byte[] contentInOctets = content.getContent();
InputSource input = new InputSource(new ByteArrayInputStream(contentInOctets));
......
input.setEncoding(encoding);
if (LOG.isTraceEnabled()) { LOG.trace("Parsing..."); }
root = parse(input);
}
......
HTMLMetaProcessor.getMetaTags(metaTags, root, base); // 获取网页的meta标签中的属性
if (LOG.isTraceEnabled()) {
LOG.trace("Meta tags for " + base + ": " + metaTags.toString());
}

首先对输入的内容进行处理,行程InputSource类型的对象,传入parse方法,该方法调用第三方库Neko(还有另外一个TagSoup)将输入的数据形成DOM树,并将头节点返回给root。接着,调用HTMLMetaProcessor的getMetaTags获取网页中meta标签的属性。meta标签中包含了许多对搜索引擎爬虫有用的信息,指明了哪些网页可以检索,哪些链接不能跟踪等,在这个方法中用到的一些标签对应的变量如下:

  • noIndex 允许搜索引擎访问这个URL,但不允许搜索引擎索引它,且不允许在搜索结果页面显示
  • noFollow 允许搜索引擎访问这个URL,允许搜索引擎索引它,在搜索结果页面显示,但不传递PR值
  • noCache 是否缓存页面
  • refresh 是否重定向
  • baseHref
  • refreshTime 重定向跳转的时间
  • refreshHref 重定向到的链接

接下来就根据meta标签中规定的内容进行进一步的解析:

1
2
3
4
5
6
7
8
9
10
if (!metaTags.getNoIndex()) {            // okay to index 
StringBuffer sb = new StringBuffer();
if (LOG.isTraceEnabled()) { LOG.trace("Getting text..."); }
utils.getText(sb, root); // extract text 提取网页内容
text = sb.toString();
sb.setLength(0);
if (LOG.isTraceEnabled()) { LOG.trace("Getting title..."); }
utils.getTitle(sb, root); // extract title 提取标题
title = sb.toString().trim();
}

如果该网页允许索引,则从DOM树的title标签下取出网页的标题,并提取去掉标签后的网页内容。

1
2
3
4
5
6
7
8
9
10
11
12
if (!metaTags.getNoFollow()) {              // okay to follow links 
ArrayList&lt;Outlink&gt; l = new ArrayList&lt;Outlink&gt;(); // extract outlinks
URL baseTag = utils.getBase(root);
if (LOG.isTraceEnabled()) {
LOG.trace("Getting links...");
}
utils.getOutlinks(baseTag!=null?baseTag:base, l, root);
outlinks = l.toArray(new Outlink[l.size()]);
if (LOG.isTraceEnabled()) {
LOG.trace("found "+outlinks.length+" outlinks in "+content.getUrl());
}
}

如果这个页面可以跟踪出链,则从DOM中提取出所有的出链并赋给outlinks数组。

1
2
3
4
5
if (metaTags.getRefresh()) {  // 要重定向
status.setMinorCode(ParseStatus.SUCCESS_REDIRECT);
status.setArgs(new String[] {metaTags.getRefreshHref().toString(),
Integer.toString(metaTags.getRefreshTime())});
}

如果在meta标签中发现需要重定向,则设置解析状态的minorCode为SUCCESS_REDIRECT,并设置好重定向需要的重定向链接和刷新时间参数。

1
2
ParseData parseData = new ParseData(status, title, outlinks, content.getMetadata(), metadata);              
ParseResult parseResult = ParseResult.createParseResult(content.getUrl(), new ParseImpl(text, parseData));

最后,将解析结果构造成ParseResult类型,再进行html过滤之后,就返回构造的ParseResult。

这里需要说明,解析结果是层层封装的,最高层是ParseResult,它包含了所有的解析结果。几个包含解析结果的类的关系如下:

如上图所示,每个类中都包含了不同的解析出的内容,ParseResult类中的parseMap映射里包含的就是<url,parseImpl>键值对,其中parseImpl是对parseData和parseText的封装。creatParseResult方法就是将键值对中需要的内容写入到parseMap中。

在封装好解析结果后,程序返回到ParseSegment类的map方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
for (Entry&lt;Text, Parse&gt; entry : parseResult) {   
Text url = entry.getKey();
Parse parse = entry.getValue();
......
byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
try {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (ScoringFilterException e) {
......
}
}
output.collect(url, new ParseImpl(new ParseText(parse.getText()), parse.getData(), parse.isCanonical()));

使用迭代器依次从parseResult的parseMap中取出<url, parseImpl>(对于HtmlParser来说,parseMap中只有一组键值对),对content计算MD5签名并存入到contentMeta中。随后又调用scroing filter的passScoreAfterParsing方法,对于OPIC算法的filter来说,该方法就是将content的metadata中SCORE.KEY对应的得分拷贝到parseData中。

最后,收集map的结果<url, parseImpl>。

Reducer阶段

reduce方法非常简单,就是收集第一个<url, parseImpl>对,并输出:

1
2
3
public void reduce(Text key, Iterator&lt;Writable&gt; values, OutputCollector&lt;Text, Writable&gt; output, Reporter reporter) throws IOException {
output.collect(key, (Writable)values.next()); // collect first value
}

ParseOutputFormat

虽然看上去只是一个输出格式,但实际上它不仅担负了控制输出的功能还担负了解析页面的作用。该类最主要的方法就是getRecordWriter方法,它为输出返回一个writer。下面对其进行分析:

1
2
3
4
5
6
7
8
9
public RecordWriter&lt;Text, Parse&gt; getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException {
......
Path out = FileOutputFormat.getOutputPath(job);
Path text = new Path(new Path(out, ParseText.DIR_NAME), name);
Path data = new Path(new Path(out, ParseData.DIR_NAME), name);
Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);
final MapFile.Writer textOut = new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class, CompressionType.RECORD, progress);
final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class, compType, progress); final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress);
}

该方法初始化了三个输出路径,分别用于三种不同格式的输出,这三个输出将用于三种不同的目的。这三个输出分别对应的输出格式如下:

输出 格式
textOut (Text, ParseText)
dataOut (Text, ParseData)
crawlOut (Text, CrawlDatum)

该类返回一个RecordWriter类的对象,真正向输出写数据的就是这个类的write方法。接下来对write方法进行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void write(Text key, Parse parse) throws IOException {
String fromUrl = key.toString();
String fromHost = null;
String toHost = null;
textOut.append(key, new ParseText(parse.getText(), parse.getThemeText()));
ParseData parseData = parse.getData();
String sig = parseData.getContentMeta().get(Nutch.SIGNATURE_KEY);
if (sig != null) {
byte[] signature = StringUtil.fromHexString(sig);
if (signature != null) {
// append a CrawlDatum with a signature
CrawlDatum d = new CrawlDatum(CrawlDatum.STATUS_SIGNATURE, 0);
d.setSignature(signature);
crawlOut.append(key, d);
}
}
}

write方法的输入就是reduce方法收集的<url, parseImpl>对,相同的url被分发到同一个reducer上处理。fromUrl就是输入进来的key。这个fromUrl是相对后面解析出来的出链而言的。

首先向textOut中添加输出<key, ParseText>。接着再从contentMeta中取出网页的签名,如果不为空则将它写成字节流,并添加到key对应的crawlDatum中,将<key, crawlDatum>对添加到crawlOut输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ParseStatus pstatus = parseData.getStatus();   
if (pstatus != null && pstatus.isSuccess() && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { // 重定向 参数在HtmlParser中设置
String newUrl = pstatus.getMessage();
int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
newUrl = normalizers.normalize(newUrl,URLNormalizers.SCOPE_FETCHER);
newUrl = filters.filter(newUrl);
String url = key.toString();
if (newUrl != null && !newUrl.equals(url)) {
String reprUrl = URLUtil.chooseRepr(url, newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME); //根据方法中的算法在原始url和重定向到的url中选出一个代表url(representation url)
CrawlDatum newDatum = new CrawlDatum();
newDatum.setStatus(CrawlDatum.STATUS_LINKED);
if (reprUrl != null && !reprUrl.equals(newUrl)) {
newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,new Text(reprUrl));
}
crawlOut.append(new Text(newUrl), newDatum); //添加新解析出的&lt;url, crawlDatum&gt;
}
}

接下来,程序获取parseData中的parseStatus,如果状态中表明需要重定向(上文介绍过在哪里设置需要重定向的标志)则继续获取重定向到的链接newUrl和刷新时间。如果newUrl和key中的url不同,则调用Util.chooseRepr方法根据该方法的算法(参见 http://help.yahoo.com/l/nz/yahooxtra/search/webcrawler/slurp-11.html)算则出key和newUrl中的一个代表链接。如果代表连接不是newUrl,则将newUrl的crawlDatum的metaData中WRITABLE_REPR_URL_KEY对应的值写成代表连接reprUrl。并将<newUrl, newDatum>写入到crawlOut的输出中。

接下来的部分收集出链为后面的crawlDb的更新作准备:

1
2
3
4
5
Outlink[] links = parseData.getOutlinks();
int outlinksToStore = Math.min(maxOutlinks, links.length);
......
List&lt;Entry&lt;Text, CrawlDatum&gt;&gt; targets = new ArrayList&lt;Entry&lt;Text, CrawlDatum&gt;&gt;(outlinksToStore);
List&lt;Outlink&gt; outlinkList = new ArrayList&lt;Outlink&gt;(outlinksToStore);

首先取出在前面解析出来的outlink,放入links数组中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
for (int i = 0; i &lt; links.length && validCount < outlinksToStore; i++) {
String toUrl = links[i].getToUrl();
// ignore links to self (or anchors within the page)
if (fromUrl.equals(toUrl)) {
continue;
}
if (ignoreExternalLinks) { // 忽略链向其它主机的链接
try {
toHost = new URL(toUrl).getHost().toLowerCase();
} catch (MalformedURLException e) {
toHost = null;
}
if (toHost == null || !toHost.equals(fromHost)) { // external
// links
continue; // skip it
}
}
try {
toUrl = normalizers.normalize(toUrl, URLNormalizers.SCOPE_OUTLINK); // normalize the url
toUrl = filters.filter(toUrl); // filter the url
if (toUrl == null) {
continue;
}
} catch (Exception e) {
continue;
}
CrawlDatum target = new CrawlDatum(CrawlDatum.STATUS_LINKED, interval); // interval 爬行间隔
Text targetUrl = new Text(toUrl);
try {
scfilters.initialScore(targetUrl, target);
} catch (ScoringFilterException e) {
LOG.warn("Cannot filter init score for url " + key
+ ", using default: " + e.getMessage());
target.setScore(0.0f);
}
targets.add(new SimpleEntry(targetUrl, target)); // 将出链的<url, crawldatum>加入到targets List中
outlinkList.add(links[i]); // 将出链加入到outlinkList的List中
validCount++;
}

接下来进入到循环中,直到所有的出链遍历完或者达到存储限制。每次循环从outlinks中取出一个出链,如果限制了链向其它主机的链接,则忽略这样的链接。对toUrl进行正规化和过滤,如果不能通过这些检查则跳到下一次循环。对与能够通过检查的链接,则对其赋予初始得分后将链接及其对应的crawlDatum以<url, crawlDatum>的形式加入到targets列表中。将url加入到linkList中。

接着,调用scoringfilter的distributeScoreToOutlinks为出链分发得分,在OPIC算法中,分发的方法是父链接的得分/出链的个数。

1
2
3
for (Entry&lt;Text, CrawlDatum&gt; target : targets) {
crawlOut.append(target.getKey(), target.getValue());
}

将带得分的所有通过规范性和过滤检查的出链<url, crawldatum>写入到crawlOut中。

1
2
3
4
Outlink[] filteredLinks = outlinkList.toArray(new Outlink[outlinkList.size()]);
parseData = new ParseData(parseData.getStatus(), parseData.getTitle(), filteredLinks, parseData.getContentMeta(),
parseData.getParseMeta());
dataOut.append(key, parseData); // 向dataOut中添加&lt;url, parseData&gt;

将通过过滤的出链存入数组filteredLinks中,并将parseData重新生成。向dataOut中写入这些<url, parseData>

1
2
3
4
5
6
7
8
9
10
11
12
if (!parse.isCanonical()) {
CrawlDatum datum = new CrawlDatum();
datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS);
String timeString = parse.getData().getContentMeta().get(Nutch.FETCH_TIME_KEY);
try {
datum.setFetchTime(Long.parseLong(timeString));
} catch (Exception e) {
LOG.warn("Can't read fetch time for: " + key);
datum.setFetchTime(System.currentTimeMillis());
}
crawlOut.append(key, datum);
}

最后,如果是子链接,向crawlOut中添加<url, crawldatum>。write方法结束。

总结

由于ParseOutputFormat设计三个输出路径,每个输出路径又不止一次输出,所以在这有必要对所有的输出进行一下小节:

  • textOut:输出的内容:根据reduce输出的<url, parse>中的parse得到ParseText,以此形成<url, parseText>输出。
  • dataOut:write方法最后的带出链得分的<url, parseData>。
  • crawlOut:
    1. reduce输出的key 和带有它对应的网页签名的<url, crawlDatum>。
    2. 如果key的网页要重定向,则输出重定向的<url, crawlDatum>。
    3. 通过正规化和过滤的带得分的出链的<url, crawlDatum>