title: scrapy爬取知名技术文章网站date: 2018-05-31 16:00:38
tags: spider
确认爬取目标
明确网站(伯乐在线)的结构
比如说最新文章的url:http://blog.jobbole.com/all-posts/page/2/
每一页的位置和最后的页码有关
方案1:通过页码的调整,存储文章列表所有url。(缺点:无法时时获得总页数)
方案2:通过判断是否有“下一页”的按钮,通过不断点击下一页来获取url。
新建爬虫项目
1.新建一个python虚拟环境(article_spider)
进入虚拟环境,进行下列2、3、4操作:
2.安装scrapy(可用豆瓣源)
$ pip install -i https://pypi.douban.com/simple/ scrapy
3.在项目文件夹中创建爬虫项目(ArticleSpider)
$ scrapy startproject ArticleSpider
4.在项目中创建爬虫模板(可自定义)
$ cd ArticleSpider
$ scrapy genspider example example.com
5.导入article_spider虚拟环境
setting->interpreter—>Add Local—>虚拟环境
注意:当点击右上角设置锯齿图标时,show all 后若有多个重名解释器时会提示错误
准备工作
在根目录下创建main.py文件用于调试
main.py
# -*- coding: utf-8 -*-
__author__ = "yaoyinnan"
from scrapy.cmdline import execute
import sys
import os
print (os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
execute(["scrapy", "crawl", "jobbole"])
需要注意的是,在settings.py文件中将ROBOTSTXT_OBEY设置为False,否则的话爬虫会默认识别网站是否遵循ROBO协议,只爬去遵循的,爬虫会很快停止。
打断点,debug看response返回的内容
xpath
xpath简介
1.xpath使用路径表达式在xml和html中进行导航
2.xpath包含标准函数库
3.xpath是一个w3c的标准
xpath节点关系
1.父节点
2.子节点
3.同胞结点
4.先辈结点
5.后代结点
xpath语法
所有下标从1开始
/ 一层
// 无论多少层
@ 属性
| 求并集
应用xpath编写逻辑代码
jobbole.py
class JobboleSpider(scrapy.Spider):
name = 'jobbole'
allowed_domains = ['blog.jobbole.com']
start_urls = ['http://blog.jobbole.com/113942/']
def parse(self, response):
re_selector1 = response.xpath("/html/body/div[1]/div[3]/div[1]/div[1]/h1/text()")
re_selector2 = response.xpath('//*[@id="post-113942"]/div[1]/h1/text()')
re_selector3 = response.xpath('//*[@class="entry-header"]/h1/text()')
pass
通过检查—>右键—>copy path—>获得网页结构xpath
方法一:“绝对path”,从头开始一级一级往下写xpath
方法二:通过ID,每个id都是固定的,用离着所需标签最短的id缩短xpath
方法三:通过不重复的Class,方式同ID
注意:这里的“第几个div”是从1开始数,和数组从0开始不一致。
通过python每次debug太慢了,可以通过shell命令进行爬取
先进入虚拟环境下的项目中,再输入:
$ scrapy shell url
在shell中成功的代码要复制回pycharm
进一步爬取:title、time、favour
class JobboleSpider(scrapy.Spider):
name = 'jobbole'
allowed_domains = ['blog.jobbole.com']
start_urls = ['http://blog.jobbole.com/113942/']
def parse(self, response):
title = response.xpath('//*[@class="entry-header"]/h1/text()').extract()[0]
create_date = response.xpath('//*[@id="post-113942"]/div[2]/p/text()').extract()[0].strip().replace("·", "").strip()
favour = response.xpath('//*[@id="113942votetotal"]/text()').extract()[0] + "赞"
print(title)
print(create_date)
print(favour)
pass
extract() : 提取,返回的是一个数组
extract()[0] : 数组的第一个元素
可以用extract_first(),取第零个上面数组下标的方式获取时需要做异常处理,而这种无需。
并且可以加参数,参数作为提取不到的返回值
replace() : 替换
strip() : 移出字符串尾指定字符,默认为空格
完整代码:
class JobboleSpider(scrapy.Spider):
name = 'jobbole'
allowed_domains = ['blog.jobbole.com']
start_urls = ['http://blog.jobbole.com/113942/']
def parse(self, response):
# 标题
title = response.xpath('//*[@class="entry-header"]/h1/text()').extract()[0]
# 时间
create_date = response.xpath('//*[@id="post-113954"]/div[2]/p/text()').extract()[0].strip().replace("·",
"").strip()
# 内容
content = response.xpath('//div[@class="entry"]').extract()
# 关键词
tag_list = response.xpath('//p[@class="entry-meta-hide-on-mobile"]/a/text()').extract()
tag_list = [element for element in tag_list if not element.strip().endswith("评论")]
tags = ",".join(tag_list)
# 赞
praise_nums = response.xpath('//*[@id="113954votetotal"]/text()').extract_first() + "赞"
# 收藏
collection_str = response.xpath('//span[contains(@class,"bookmark-btn")]/text()').extract_first()
regex_col = "(.[0-9])"
collection_str = re.match(regex_col, collection_str).group(0) + "收藏"
# 评论
comment_str = response.xpath('//div[contains(@class, "post-adds")]/a/span/text()').extract_first()
regex_col = "(.[0-9])"
comment_str = re.match(regex_col, comment_str).group(0) + "评论"
pass
去除评论:[element for element in tag_list if not element.strip().endswith(“评论”)]
用“,”连接数组元素:”,”.join(tag_list)
CSS选择器
完整代码:
class JobboleSpider(scrapy.Spider):
name = 'jobbole'
allowed_domains = ['blog.jobbole.com']
start_urls = ['http://blog.jobbole.com/113942/']
def parse(self, response):
# 标题
title = response.xpath('//*[@class="entry-header"]/h1/text()').extract()[0]
# 时间
create_date = response.xpath('//*[@id="post-113954"]/div[2]/p/text()').extract()[0].strip().replace("·",
"").strip()
# 内容
content = response.xpath('//div[@class="entry"]').extract()
# 关键词
tag_list = response.xpath('//p[@class="entry-meta-hide-on-mobile"]/a/text()').extract()
tag_list = [element for element in tag_list if not element.strip().endswith("评论")]
tags = ",".join(tag_list)
# 赞
praise_nums = response.xpath('//*[@id="113954votetotal"]/text()').extract_first() + "赞"
# 收藏
collection_str = response.css('.bookmark-btn::text').extract_first()
regex_col = "(.[0-9])"
collection_str = re.match(regex_col, collection_str).group(0) + "收藏"
# 评论
comment_str = response.css('a .btn-bluet-bigger::text').extract_first()
regex_col = "(.[0-9])"
comment_str = re.match(regex_col, comment_str).group(0) + "评论"
通过css选择器可以更加方便的爬取。
爬取完整文章列表
解析列表页所有文章url
首先,改变start_urls为列表页url。
第一步:
通过分析得到列表页所有文章url的list:
post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
其中attr(href)可以获取对应html标签的属性的值。
第二步:
for post_url in post_urls:
yield Request(url=parse.urljoin(response.url, post_url), callback=self.parse_detail)
通过遍历每一条url,并且将其交给scrapy下载。
yield:下载
Request方法:
需要调用scrapy.http
from scrapy.http import Request
它可以进入url页面,再通过之前编写的解析代码获取其中内容。
其中可以指定url和callback的函数:
callback函数需要指定为该url回调的函数,比如目前解析列表页所有文章url,则需要回调到之前编写的parse_detail解析文章页面函数。
parse.urljoin方法:因为有的网站的a标签里的href不存放完整的url,需要通过一定的方法将其与域名链接。
需要调用urlib
from urllib import parse
提取下一页
next_urls = response.css(".next.page-numbers::attr(href)").extract_first("")
if next_urls:
yield Request(url=parse.urljoin(response.url, next_urls), callback=self.parse)
与之前思路相仿,将下一页的url提取出来,回调给本函数即可。
由此过程,便提取了文章列表所有文章的内容,下一步需要将其存入数据库中。
存入数据库
借助Items格式化存储字段
首先,再items.py中定义自己的Item类:
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field()
url = scrapy.Field()
url_object_id = scrapy.Field()
front_image_url = scrapy.Field()
front_image_path = scrapy.Field()
praise_nums = scrapy.Field()
collection_nums = scrapy.Field()
comment_nums = scrapy.Field()
tags = scrapy.Field()
content = scrapy.Field()
回到jobboleList.py,由于存储在收藏数和评论数是int类型,需要做判断:
# 收藏
collection_str = response.css('.bookmark-btn::text').extract_first()
regex_col = "(.[0-9])"
match_re = re.match(regex_col, collection_str)
if match_re:
collection_nums = int(match_re.group(0))
else:
collection_nums = 0
# 评论
comment_str = response.css('a .btn-bluet-bigger::text').extract_first()
regex_col = "(.[0-9])"
match_re = re.match(regex_col, comment_str)
if match_re:
comment_nums = int(match_re.group(0))
else:
comment_nums = 0
再jobboleList.py中引入之前定义好的Items类,并对其实例化:
from ArticleSpider.items import JobBoleArticleItem
def ...
...
...
article_item = JobBoleArticleItem()
通过实例的items对象,将每篇文章页爬取得字段保存在items对象中进行后续存储操作:
# 将id格式化为md5格式
article_item["url_object_id"] = get_md5(response.url)
article_item["title"] = title
article_item["url"] = response.url
article_item["create_date"] = create_date
# 将提取的日期转化为日期对象
try:
create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
except Exception as e:
create_date = datetime.datetime.now().date()
article_item["front_image_url"] = [front_image_url] # 需要注意,这里传入的是数组!
article_item["praise_nums"] = praise_nums
article_item["collection_nums"] = collection_nums
article_item["comment_nums"] = comment_nums
article_item["tags"] = tags
article_item["content"] = content
yield article_item
md5需要引入:
from ArticleSpider.utils.common import get_md5
date对象需要引入:
import datetime
提取图片
在pipelines.py中定义ArticleImagePipeline方法作为管道(需要引入ImagesPipeline)
from scrapy.pipelines.images import ImagesPipeline
ArticleImagePipeline:
class ArticleImagePipeline(ImagesPipeline):
def item_completed(self, results, item, info):
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
注意需要return item,以备下一个item传入。
在setting.py中开启ITEM_PIPELINES
先引入os:
import os
ITEM_PIPELINES:
ITEM_PIPELINES = {
# 获取image的path
'ArticleSpider.pipelines.ArticleImagePipeline': 1,
# 下载图片
'scrapy.pipelines.images.ImagesPipeline': 3
}
IMAGES_URLS_FIELD = "front_image_url" # 确认所下载字段
project_dir = os.path.abspath(os.path.dirname(__file__)) # 获取当前目录路径
IMAGES_STORE = os.path.join(project_dir, "images") # join连接
items流经的pipelines(管道),数字代表管道流经顺序(数字越小越先执行)。
存入Json文件
在pipelines.py中引入json
import json
通过定义“管道“将items对象保存的字段存储如json文件。
自定义Json文件的导出
首先在pipelines.py中定义JsonWithEncodingPipeline:
class JsonWithEncodingPipeline(object):
# 自定义json文件的导出
def __init__(self):
self.file = codecs.open('article.json', 'w', encoding="utf-8")
def process_item(self, item, spider):
lines = json.dumps(dict(item), ensure_ascii=False) + '\n'
self.file.write(lines)
return item
def spider_closed(self, spider):
self.file.close()
存入文件article.json
自定义方法存入的内容可能会有一定的问题和错误,需要注意细节。
调用scrapy提供的json_export导出json文件
首先引入JsonItemExporter:
from scrapy.exporters import JsonItemExporter
在pipelines.py中定义JsonExporterPipleline:
class JsonExporterPipleline(object):
# 调用scrapy提供的json_export导出json文件
def __init__(self):
self.file = open('articleexport.json', 'wb')
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
self.exporter.start_exporting()
def close_spoder(self, spider):
self.exporter.finish_exporting()
self.file.close()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
最后,在setting.py中开启管道JsonWithEncodingPipeline或JsonExporterPipleline。
Exporter中还提供有存储成其他类型文件的方式,可以Ctrl+自行查看。
存入数据库
数据表设计:
字段 | 类型 | 空 | 默认 | 注释 |
---|---|---|---|---|
title | varchar(200) | 否 | ||
create_date | date | 是 | NULL | |
url | varchar(300) | 否 | ||
url_object_id | varchar(50) | 否 | ||
front_image_url | varchar(300) | 是 | NULL | |
front_image_path | varchar(200) | 是 | NULL | |
praise_nums | int(11) | 是 | NULL | |
collection_nums | int(11) | 是 | NULL | |
comment_nums | int(11) | 是 | NULL | |
tags | varchar(200) | 是 | NULL | |
content | longtext | 是 | NULL |
方法设计:
首先,引入MySQL:
import MySQLdb
同步机制
在pipelines.py中定义MysqlPipeline,自定义的方法是同步机制,同步数据入库,速度慢,易堵塞。
class MysqlPipeline(object):
# 采用同步的机制,将数据存入数据库,方法一,同步入库,效率低,插入数据库速度跟不上爬取速度
def __init__(self):
self.conn = MySQLdb.connect('127.0.0.1', 'article_spider', '7dKwHJYkXG', 'article_spider', charset="utf8",
use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
insert_sql = "insert into jobbole_article(title,create_date,url,url_object_id,front_image_url,front_image_path,praise_nums,collection_nums,comment_nums,tags,content)VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
self.cursor.execute(insert_sql, (
item['title'], item['create_date'], item['url'], item['url_object_id'], item['front_image_url'][0],
item['front_image_path'],
item['praise_nums'], item['collection_nums'], item['comment_nums'], item['tags'], item['content']))
self.conn.commit()
return item
异步机制
需要先引入cursors
import MySQLdb.cursors
在pipelines.py中定义MysqlTwistedPipeline,使用异步方式可以更加迅速地进行对数据库的插入。
先在setting.py中配置好数据库:
# DataBase config
# 局域网
# MYSQL_HOST = "xxx.xxx.xxx.xxx"
# 本地
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "article_spider"
MYSQL_PASSWORD = "xxxxxxxxxx"
MysqlTwistedPipeline:
class MysqlTwistedPipeline(object):
# 采用异步的机制,将数据存入数据库,方法二,Twiste异步入库,先在settings中配置好数据库配置
def __init__(self, dbpool):
self.dbpool = dbpool
@classmethod
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
password=settings["MYSQL_PASSWORD"],
charset="utf8",
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
def process_item(self, item, spider):
# 使用twisted将mysql插入变成异步执行
query = self.dbpool.runInteraction(self.do_insert, item)
query.addErrback(self.handle_error) # 处理异常
def handle_error(self, failure):
# 处理异步插入的异常
print(failure)
def do_insert(self, cursor, item):
# 执行具体的插入
insert_sql = "insert into jobbole_article(title,create_date,url,url_object_id,front_image_url,front_image_path,praise_nums,collection_nums,comment_nums,tags,content)VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
cursor.execute(insert_sql, (
item['title'], item['create_date'], item['url'], item['url_object_id'], item['front_image_url'][0],
item['front_image_path'],
item['praise_nums'], item['collection_nums'], item['comment_nums'], item['tags'], item['content']))
return item
其中,使用python自带的dict可以轻松获取setting中配置好的数据库信息:
@classmethod
def from_settings(cls, settings):
dbparms = dict(
host=settings["MYSQL_HOST"],
db=settings["MYSQL_DBNAME"],
user=settings["MYSQL_USER"],
password=settings["MYSQL_PASSWORD"],
charset="utf8",
cursorclass=MySQLdb.cursors.DictCursor,
use_unicode=True,
)
dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
return cls(dbpool)
注意**dbparms的传入方式
对于插入数据库的方法,只需要更改do_insert即可。
使用ItemLoader规范化解析
之前的爬取是通过response将xpath或者css选择器解析的字段存入局部变量,然后再通过ietm传入管道进行后续处理。但是由于其不规范性以及不利于后期维护,我们引入ItemLoader来进行规范化解析。
首先,在jobboleList.py中引入ItemLoader:
from scrapy.loader import ItemLoader
ItemLoader对象有几种add方法:
item_loader.add_xpath()
item_loader.add_css()
item_loader.add_value()
用这其中的方法,可以直接将解析结果交给item
# 文章封面图
front_image_url = response.meta.get("front_image_url", "")
# 通过item loader加载实例
item_loader = ArticleItemLoader(item=JobBoleArticleItem(), response=response)
# item_loader.add_xpath()
item_loader.add_css("title", ".entry-header h1::text")
item_loader.add_value("url", response.url)
item_loader.add_value("url_object_id", response.url)
item_loader.add_css("create_date", ".entry-meta-hide-on-mobile::text")
item_loader.add_value("front_image_url", [front_image_url])
item_loader.add_css("praise_nums", ".vote-post-up h10::text")
item_loader.add_css("collection_nums", ".bookmark-btn::text")
item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
item_loader.add_css("content", "div.entry")
article_item = item_loader.load_item()
不过由于这种方式缺少了对字段的处理和过滤,比如时间对象转化,nums提取数值等。
再在ietm.py中对传入的值进行处理。
首先,引入MapCompose、TakeFirst及Join:
from scrapy.loader.processors import MapCompose, TakeFirst, Join
再在之前定义好的JobBoleArticleItem中对Field进行添加传入的值。
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field(
input_processor=MapCompose(date_filter, date_convert)
)
url = scrapy.Field(
input_processor=MapCompose(get_md5)
)
url_object_id = scrapy.Field()
front_image_url = scrapy.Field(
# 由于传递的是数组,default获取的是第一个。将其覆盖可以用小技巧,传入一个函数,什么都不操作,只return value。
output_processor=MapCompose(return_value)
)
front_image_path = scrapy.Field()
praise_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
collection_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
comment_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
tags = scrapy.Field(
input_processor=MapCompose(remove_comment_tags),
output_processor=Join(',')
)
content = scrapy.Field()
input_processor:对传入字段进行处理。
output_processor:对传出字段进行处理。
由于爬取的是以list数据类型传入的,需要进行预处理,代替之前的extract_first()或extract()[0],重新定义ItemLoader:
先引入ItemLoader:
from scrapy.loader import ItemLoader
ArticleItemLoader
class ArticleItemLoader(ItemLoader):
# 自定义ItemLoader
default_output_processor = TakeFirst()
各种处理函数:
注意需要先引入对应的库:
import datetime
import re
from ArticleSpider.utils.common import get_md5
def date_filter(value):
return value.strip().replace("·", "").strip()
def date_convert(value):
try:
value = datetime.datetime.strptime(value, "%Y/%m/%d").date()
except Exception as e:
value = datetime.datetime.now().date()
return value
def get_nums(value):
match_re = re.match(".*?(\d+).*", value)
if match_re:
nums = int(match_re.group(0))
else:
nums = 0
return nums
def remove_comment_tags(value):
# 去掉tag中提取的评论
if "评论" in value:
return ""
else:
return value
def return_value(value):
return value
注意上面的front_image_url进行了处理,需要对pipelines.py的ArticleImagePipeline进行改动:
class ArticleImagePipeline(ImagesPipeline):
def item_completed(self, results, item, info):
if "front_image_url" in item:
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
判断是否存在front_image_url,如果不做判断,会报错。