@@ -36,24 +36,208 @@ Python 3.2 brought the `concurrent.futures` module, which includes thread pools
1. Python 2.2: the concept of generators first appeared (PEP 255, which called them "simple generators").
2. Python 2.5: introduced the ability to send an object back into a paused generator, i.e. the generator's `send()` method (PEP 342).
3. Python 3.3: added the `yield from` syntax, which lets a generator delegate to any other iterator (note that a generator is itself an iterator), so generators can be chained together and refactored into better generators.
- 4. Python 3.4: introduced the `asyncio.coroutine` decorator for marking functions as coroutines; coroutine functions are used together with `asyncio` and its event loop to perform asynchronous I/O.
+ 4. Python 3.4: introduced the `asyncio.coroutine` decorator for marking functions as coroutines; coroutine functions are used together with `asyncio` and its event loop to perform asynchronous I/O.
5. Python 3.5: introduced `async` and `await`; a coroutine function is defined with `async def`, may not contain any form of `yield` statement, and can return a value from the coroutine with `return` or `await`.

+ #### Example code
+
+ 1. Generators - producers of data.
+
+ ```Python
+
+ from time import sleep
+
+
+ # Countdown generator
+ def countdown(n):
+     while n > 0:
+         yield n
+         n -= 1
+
+
+ def main():
+     for num in countdown(5):
+         print(f'Countdown: {num}')
+         sleep(1)
+     print('Countdown Over!')
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ Generators can also be stacked to form a generator pipeline, as shown in the code below.
+
+ ```Python
+
+ # Fibonacci number generator
+ def fib():
+     a, b = 0, 1
+     while True:
+         a, b = b, a + b
+         yield a
+
+
+ # Even number generator
+ def even(gen):
+     for val in gen:
+         if val % 2 == 0:
+             yield val
+
+
+ def main():
+     gen = even(fib())
+     for _ in range(10):
+         print(next(gen))
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
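
As item 3 in the list above mentions, `yield from` (Python 3.3+) lets one generator delegate to another iterator, which is a natural fit for chaining pipeline stages like these. Below is a minimal sketch that assumes the `fib` and `even` generators defined above are in scope; the `take` helper is a made-up name used only for illustration.

```Python
# Hypothetical helper: yield at most n values from any iterator.
def take(n, gen):
    for _ in range(n):
        yield next(gen)


# Delegate to the chained pipeline with yield from instead of looping manually.
def even_fibs(n):
    yield from take(n, even(fib()))


if __name__ == '__main__':
    for num in even_fibs(10):
        print(num)
```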
+
+ 2. Coroutines - consumers of data.
+
+ ```Python
+
+ from time import sleep
+
+
+ # Generator - data producer
+ def countdown_gen(n, consumer):
+     consumer.send(None)
+     while n > 0:
+         consumer.send(n)
+         n -= 1
+     consumer.send(None)
+
+
+ # Coroutine - data consumer
+ def countdown_con():
+     while True:
+         n = yield
+         if n:
+             print(f'Countdown {n}')
+             sleep(1)
+         else:
+             print('Countdown Over!')
+
+
+ def main():
+     countdown_gen(5, countdown_con())
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ > Note: in the code above, the first line of the `countdown_gen` function, `consumer.send(None)`, is there to activate (prime) the generator; put simply, it runs the generator forward until it suspends at a `yield` keyword. The same effect can be achieved with `next(consumer)`. If you would rather not write this kind of "priming" code every time, you can write a wrapper to take care of it, as shown below.
+
+ ```Python
+
+ from functools import wraps
+
+
+ def coroutine(fn):
+
+     @wraps(fn)
+     def wrapper(*args, **kwargs):
+         gen = fn(*args, **kwargs)
+         next(gen)
+         return gen
+
+     return wrapper
+ ```
+
+ With the wrapper in place, the `@coroutine` decorator can prime a coroutine for you, so there is no need to repeat the activation code; a usage sketch follows.
+
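For example, the `countdown_con` consumer from the earlier example could be declared with this decorator so that callers no longer need the initial `consumer.send(None)`. This is only a sketch and assumes the `coroutine` decorator above is defined in (or imported into) the same module.

```Python
from time import sleep


@coroutine
def countdown_con():
    while True:
        n = yield
        if n:
            print(f'Countdown {n}')
            sleep(1)
        else:
            print('Countdown Over!')


def countdown_gen(n, consumer):
    # No consumer.send(None) priming needed here - the decorator already did it.
    while n > 0:
        consumer.send(n)
        n -= 1
    consumer.send(None)
```
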
+ 3. Asynchronous I/O - non-blocking I/O operations.
+
+ ```Python
+
+ import asyncio
+
+
+ @asyncio.coroutine
+ def countdown(name, n):
+     while n > 0:
+         print(f'Countdown[{name}]: {n}')
+         yield from asyncio.sleep(1)
+         n -= 1
+
+
+ def main():
+     loop = asyncio.get_event_loop()
+     tasks = [
+         countdown("A", 10), countdown("B", 5),
+     ]
+     loop.run_until_complete(asyncio.wait(tasks))
+     loop.close()
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ 4. `async` and `await`.
+
+ ```Python
+
+ import asyncio
+ import aiohttp
+
+
+ async def download(url):
+     print('Fetch:', url)
+     async with aiohttp.ClientSession() as session:
+         async with session.get(url) as resp:
+             print(url, '--->', resp.status)
+             print(url, '--->', resp.cookies)
+             print('\n\n', await resp.text())
+
+
+ def main():
+     loop = asyncio.get_event_loop()
+     urls = [
+         'https://www.baidu.com',
+         'http://www.sohu.com/',
+         'http://www.sina.com.cn/',
+         'https://www.taobao.com/',
+         'https://www.jd.com/'
+     ]
+     tasks = [download(url) for url in urls]
+     loop.run_until_complete(asyncio.wait(tasks))
+     loop.close()
+
+
+ if __name__ == '__main__':
+     main()
+
+ ```
+
+ The code above uses [AIOHTTP](https://github.com/aio-libs/aiohttp), a very well-known third-party library that implements both an HTTP client and an HTTP server and has excellent support for asynchronous operations; if you are interested, read its [official documentation](https://aiohttp.readthedocs.io/en/stable/).
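
Since AIOHTTP also provides an HTTP server, here is a minimal sketch of the server side; the route, handler name, and port are illustrative assumptions rather than part of the example above.

```Python
from aiohttp import web


# Hypothetical handler: respond to GET / with a plain-text greeting.
async def index(request):
    return web.Response(text='Hello, aiohttp!')


def main():
    app = web.Application()
    app.add_routes([web.get('/', index)])
    # Blocks and serves requests until interrupted.
    web.run_app(app, host='127.0.0.1', port=8080)


if __name__ == '__main__':
    main()
```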

### Example - crawling all pages of "Mobile Sohu" (m.sohu.com) with multiple threads

```Python

+ import pickle
+ import zlib
from enum import Enum, unique
- from queue import Queue
+ from hashlib import sha1
from random import random
- from threading import Thread, current_thread
+ from threading import Thread, current_thread, local
from time import sleep
from urllib.parse import urlparse

+ import pymongo
+ import redis
import requests
from bs4 import BeautifulSoup
+ from bson import Binary


@unique
@@ -113,7 +297,6 @@ class Spider(object):

    def parse(self, html_page, *, domain='m.sohu.com'):
        soup = BeautifulSoup(html_page, 'lxml')
-         url_links = []
        for a_tag in soup.body.select('a[href]'):
            parser = urlparse(a_tag.attrs['href'])
            scheme = parser.scheme or 'http'
@@ -122,34 +305,51 @@ class Spider(object):
                path = parser.path
                query = '?' + parser.query if parser.query else ''
                full_url = f'{scheme}://{netloc}{path}{query}'
-                 if full_url not in visited_urls:
-                     url_links.append(full_url)
-         return url_links
+                 redis_client = thread_local.redis_client
+                 if not redis_client.sismember('visited_urls', full_url):
+                     redis_client.rpush('m_sohu_task', full_url)

    def extract(self, html_page):
        pass

    def store(self, data_dict):
+         # redis_client = thread_local.redis_client
+         # mongo_db = thread_local.mongo_db
        pass


class SpiderThread(Thread):

-     def __init__(self, name, spider, tasks_queue):
+     def __init__(self, name, spider):
        super().__init__(name=name, daemon=True)
        self.spider = spider
-         self.tasks_queue = tasks_queue

    def run(self):
+         redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+         mongo_client = pymongo.MongoClient(host='1.2.3.4', port=27017)
+         thread_local.redis_client = redis_client
+         thread_local.mongo_db = mongo_client.msohu
        while True:
-             current_url = self.tasks_queue.get()
-             visited_urls.add(current_url)
+             current_url = redis_client.lpop('m_sohu_task')
+             while not current_url:
+                 current_url = redis_client.lpop('m_sohu_task')
            self.spider.status = SpiderStatus.WORKING
-             html_page = self.spider.fetch(current_url)
-             if html_page not in [None, '']:
-                 url_links = self.spider.parse(html_page)
-                 for url_link in url_links:
-                     self.tasks_queue.put(url_link)
+             current_url = current_url.decode('utf-8')
+             if not redis_client.sismember('visited_urls', current_url):
+                 redis_client.sadd('visited_urls', current_url)
+                 html_page = self.spider.fetch(current_url)
+                 if html_page not in [None, '']:
+                     hasher = hasher_proto.copy()
+                     hasher.update(current_url.encode('utf-8'))
+                     doc_id = hasher.hexdigest()
+                     sohu_data_coll = mongo_client.msohu.webpages
+                     if not sohu_data_coll.find_one({'_id': doc_id}):
+                         sohu_data_coll.insert_one({
+                             '_id': doc_id,
+                             'url': current_url,
+                             'page': Binary(zlib.compress(pickle.dumps(html_page)))
+                         })
+                     self.spider.parse(html_page)
            self.spider.status = SpiderStatus.IDLE


@@ -158,19 +358,22 @@ def is_any_alive(spider_threads):
                for spider_thread in spider_threads])


- visited_urls = set()
+ thread_local = local()
+ hasher_proto = sha1()


def main():
-     task_queue = Queue()
-     task_queue.put('http://m.sohu.com/')
-     spider_threads = [SpiderThread('thread-%d' % i, Spider(), task_queue)
+     redis_client = redis.Redis(host='1.2.3.4', port=6379, password='1qaz2wsx')
+     if not redis_client.exists('m_sohu_task'):
+         redis_client.rpush('m_sohu_task', 'http://m.sohu.com/')
+
+     spider_threads = [SpiderThread('thread-%d' % i, Spider())
                      for i in range(10)]
    for spider_thread in spider_threads:
        spider_thread.start()

-     while not task_queue.empty() or is_any_alive(spider_threads):
-         sleep(5)
+     while redis_client.exists('m_sohu_task') or is_any_alive(spider_threads):
+         pass

    print('Over!')