部分效果 #
部分网站的爬取效果。其中图1是本博客的爬取效果,表明该方案是适用一般网站的;图2和图3是两个开源的论坛程序搭建起来的论坛的爬取效果,表明对于开源程序能够正常爬取;图4是对著名的天涯论坛的爬取效果,表明哪怕是公司内部开发的论坛,也具有不错的效果。
改进空间 #
总的来说,这是一种较高效率的、无监督的通用网站爬取方案,能够自适应不同的网站(不局限于论坛),适合企业大规模部署。
当然,这种方案也有一些有待改进的地方:
1、保证效率和稳定性,牺牲了普适,比如对内容取舍时,直接按照中文占比来判断,没有用到更精确的语言模型方案;
2、进一步的提升空间较小,因为基于标准模板比较的方案,直接混合式地提出了有效文本,丢失了位置和层次信息,虽然通过聚类可以进一步解决这个情况,但还是存在失效的网站,在原来方法基础上的提升空间不大;
3、没有利用到视觉信息,比如没法准确定位用户名,是因为我们不知道那里才算是用户名,但是如果肉眼去看的话,那是比较容易判断的,也就是说,视觉信息是一个比较重要的指标,如果精准度要求更高的话,我们需要利用视觉信息。
期待看到更好的解决方案。
代码 #
Python2.7代码~
通用爬取框架:
#! -*- coding:utf-8 -*-
import requests as rq
import numpy as np
import re
from lxml.html import html5parser
class crawler:
def __init__(self, standard_urls):
self.title = ''
self.emphase_mark = set(['p', 'br', 'em', 'strong', 'b', 'font', 'u', 'a', 'img', 'h1', 'h2', 'h3', 'h4', 'h5', 'sup', 'sub', 'blockquote', 'cite', 'code', 'pre'])
self.standard_urls = standard_urls
self.sess = rq.Session()
self.standard_contents = []
self.find_title = re.compile('<title>([\s\S]*?)</title>').findall
if isinstance(standard_urls, list):
self.standard_dom = self.create_dom(self.standard_urls[0])
self.standard_contents = set([c[1:] for c in self.traverse_dom(self.standard_dom)[1]])
for url in self.standard_urls[1:]:
self.standard_dom = self.create_dom(url)
self.standard_contents = self.standard_contents & set([c[1:] for c in self.traverse_dom(self.standard_dom)[1]])
else:
self.standard_dom = self.create_dom(self.standard_urls)
self.standard_contents = set([c[1:] for c in self.traverse_dom(self.standard_dom)[1]])
self.find_zh = re.compile(u'[\u4e00-\u9fa5]').findall
self.find_zh_en = re.compile(u'[a-zA-Z\d_\u4e00-\u9fa5]').findall
self.find_date = re.compile(u'.*?年.*?月.*?日|发布时间|\d{4}\-\d{1,2}\-\d{1,2}|昨天|前天|今天|小时前|\d{1,2}:\d{2}').findall
def create_dom(self, url):
r = self.sess.get(url)
return html5parser.fromstring(r.content)
def traverse_dom(self, dom, idx=0, tag=''):
content = []
if len(dom) > 0:
tag += (str(dom.tag)+'_')
if dom.tag not in self.emphase_mark:
idx += 1
idx_ = idx
if dom.text and dom.text.strip():
content.append((idx_, tag, dom.text.strip()))
for d in dom:
idx, content_ = self.traverse_dom(d, idx, tag)
content.extend(content_)
if dom.tail and dom.tail.strip():
content.append((idx_, tag, dom.tail.strip()))
elif dom.tag != 'head':
if dom.text and dom.text.strip():
content.append((idx, tag, dom.text.strip()))
if dom.tail and dom.tail.strip():
content.append((idx, tag, dom.tail.strip()))
if (isinstance(dom.tag, str) or isinstance(dom.tag, unicode)) and 'title' in dom.tag and not self.title:
self.title = dom.text.strip() + dom.tail.strip()
return idx, content
def peak(self, d):
r = []
if d[0] > d[1]:
r.append(0)
for i in range(1, len(d)-1):
if d[i] > max(d[i-1], d[i+1]):
r.append(i)
if len(d) >= 3 and d[-1] > d[-2]:
r.append(len(d)-1)
return r
def keep_proba(self, s):
if len(self.find_zh_en(s)) == len(s):
return 1
else:
c0 = len(''.join(self.find_date(s)))
return 1.*(len(self.find_zh(s))+c0)/(len(s))
def crawl_url(self, url, cluster_times=2):
self.title = ''
dom = self.create_dom(url)
content = self.traverse_dom(dom)[1]
content_ = []
for c in content:
if c[1:] not in self.standard_contents:
content_.append(list(c))
content = [content_[0]]
for c in content_:
if c[0] == content[-1][0]:
content[-1] = [c[0], c[1], content[-1][2]+'\n'+c[2]]
else:
content.append(c)
content = [c for c in content if self.keep_proba(c[2]) >= 0.25]
for _ in range(cluster_times):
content_ = content[:]
if len(content_) >= 3:
idxs = [c[0] for c in content_]
idxs = set([idxs[i+1] for i in self.peak(np.diff(idxs))])
content = [content_[0]]
cc = 1
for i in range(1, len(content_)):
if content_[i][0] in idxs:
content[-1][0] = int(content[-1][0]/cc)
content.append(content_[i])
cc = 1
else:
content[-1] = [content[-1][0]+content_[i][0], content[-1][1], content[-1][2]+'\n'+content_[i][2]]
cc += 1
return [c[2] for c in content]
落实到论坛:
def find_datetime(s):
r = []
for t in s.split('\n'):
l = len(t)*1.
d = re.findall('\d+\-\d+\-\d+ +\d+:\d+', t)
if d and len(d[0])/l > 0.5:
r.append(d[0])
else:
d = re.findall(u'\d+年 *\d+月 *\d+日 +\d+:\d+', t)
if d and len(d[0])/l > 0.5:
d = re.findall(u'(\d+)年 *(\d+)月 *(\d+)日 +(\d+):(\d+)', t)
r.append('%s-%s-%s %s:%s'%d[0])
else:
r.append(None)
return r
def extract_info(b,c):
r = []
for t in b:
dts = find_datetime(t)
t = t.split('\n')
dt = max(dts)
if dt:
idx = dts.index(dt)
r.append((dt, '\n'.join(t[:idx]), '\n'.join(t[idx+1:])))
else:
r.append((None, '\n'.join(t), '\n'.join(t)))
idx = 1 + (sum([c.keep_proba(i[1]) for i in r]) < sum([c.keep_proba(i[2]) for i in r]))
r = [(i[0], i[idx]) for i in r if i[idx]]
if not r[0][0]:
r = r[1:]
rr = [r[0]]
for a,b in r[1:]:
if not a:
rr[-1] = rr[-1][0], rr[-1][1]+'\n'+b
else:
rr.append((a,b))
return rr
if __name__ == '__main__':
c = crawler(['http://bbs.emath.ac.cn/thread-9531-1-1.html', 'http://bbs.emath.ac.cn/thread-2749-1-1.html'])
b = c.crawl_url('http://bbs.emath.ac.cn/thread-9538-1-1.html')
title = c.title
r = extract_info(b,c)
keys = ('publish_date', 'content', 'title', 'author')
final = {}
final['post'] = dict(zip(keys, (r[0][0], r[0][1], title, '')))
final['replys'] = [dict(zip(keys, (i[0], i[1], title, ''))) for i in r[1:]]
import pandas as pd
pd.DataFrame(r)
转载到请包括本文地址:https://kexue.fm/archives/4430
更详细的转载事宜请参考:《科学空间FAQ》



