压缩解压缩、加密解密、正则表达式
网络爬虫、读写数据库
多进程Process(multiprocessing) | 多线程Thread(threading) | 协程Coroutine(asyncio) | |
---|---|---|---|
优点 | 可以利用多核CPU并行运算 | 相比进程更轻量级,占用资源少 | 内存开销最少,启动协程数量最多 |
缺点 | 占用资源最多,可启动数目比线程少 | 相比进程多线程只能并发执行,不能利用多CPU(GIL);相比协程启动数目有限制,占用内存资源,有线程切换开销 | 支持的库有限制,代码实现复杂 |
适用场景 | CPU密集型计算 | IO密集型计算、同时运行的任务数目要求不多 | IO密集型计算、需要超多任务运行但有现成的库支持的场景 |
defmy_func(a,b): do_something(a,b)
import threading t= threading.Thread(target= my_func,args=(100,200))
t.start()
t.join()
import queue
q = queue.Queue()
q.put(item)
item = q.get()
q.qsize()
q.empty()
q.full()
import threadingimport requestsimport queuedefdo_crawl(url_queue,html_queue):whileTrue: url= url_queue.get() html= requests.get(url) html_queue.put(html)print(threading.current_thread().name,f'crawl{url},url_queue.size={url_queue.qsize()}')defdo_parse(html_queue,fout):whileTrue: html= html_queue.get() results= html.textfor resultin results: fout.write(str(result)+'\n')print(threading.current_thread().name,f'results.size={len(results)},html_queue.size={html_queue.qsize()}') url_queue= queue.Queue() html_queue= queue.Queue()for urlin url_list: url_queue.put(url)#启动三个生产者线程for idxinrange(3): t= threading.Thread(target=do_crawl,args=(url_queue,html_queue),name=f'crawl{idx}') t.start() fout=open('test.txt','w')#启动两个消费者线程for idxinrange(2): t= threading.Thread(target=do_parse,args=(html_queue,fout),name=f'parse{idx}') t.start()
import threading lock= threading.Lock() lock.acquire()try:#do somethingfinally: lock.release()
import threading lock= threading.Lock()with lock:#do something
map函数
,较简单,需要注意map的结果和入参是顺序对应的from concurrent.futuresimport ThreadPoolExecutor,as_completedwith ThreadPoolExecutor()as pool: results= pool.map(crawl,urls)for resultin results:print(result)
future模式
,更强大,需要注意如果用as_completed顺序是不定的from concurrent.futuresimport ThreadPoolExecutor,as_completedwith ThreadPoolExecutor()as pool: futures=[pool.submit(crawl,url)for urlin urls]for futurein futures:print(future.result())for futurein as_completed(futures):print(future.result())
多进程的使用方法和多线程比较类似,可以参考多线程的代码。
import asyncio#获取事件循环 loop= asyncio.get_event_loop()#定义协程asyncdefmyfunc(url):await get_url(url)#创建task列表 tasks=[loop.create_task(myfunc(url))for urlin urls]#执行爬虫事件列表 loop.run_until_complete(asyncio.wait(tasks))
信号量(Semaphore)
是一个同步对象,用于保持在0至指定最大值之间的一个计数值sem= asyncio.Semaphore(10)asyncwith sem:#work with shared resourse
sem= asyncio.Semaphore(10)await sem.acquire()try:#work with shared resoursefinally: sem.release()
根据官方文档说明,await 后面的对象必须是如下格式之一:
简而言之,在await后面只能跟三种可等待的对象:协程对象、future对象、task对象
要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源。使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
tasks用于并发调度协程,它是对协程对象的进一步封装,包含了任务的各个状态。通过asyncio.creat_task(协程对象)的方式可以创建Task对象,这样可以让协程加入事件循环中等待被调度执行。除了使用asyncio.creat_task()以外,还可以用低层级的loop.creat_task()或asyncio.ensure_future()。
注意:asyncio.creat_task()在python3.7之后加入。在python3.7之前可以改用低层级的asyncio.ensure_future()或loop.create_task()(loop.create_task()是最低层级)。
例1:
import asyncioimport logging logging.basicConfig(level=logging.INFO,format='%(asctime)s - |%(levelname)s| - %(message)s')asyncdeffunc(): logging.info(1)await asyncio.sleep(3) logging.info(2)return'返回值'asyncdefmain():print('main start!') task1= asyncio.create_task(func()) task2= asyncio.create_task(func())print('main end!') r1=await task1 r2=await task2print(r1,r2) asyncio.run(main())
以上代码的输出为:
代码的分析:
程序内部本质上只有一个线程,在main函数开始之后,到运行到r1 = await task1之前,线程都没有遇到io等待,所以会一瞬间将main函数中的前四行代码执行完毕。
此时算上asyncio.run(main())中注册的main()协程对象,加上task1、task2,相当于在事件循环中有三个任务。当遇到await的时候,会切换到别的任务中执行,所以在main()遇到await即r1 = await task1时,会切换到task1中执行任务,切换到task1中执行任务之后又遇到了io等待,所以又立即切换到了task2中去执行任务。
此时在控制台打印出的效果就是一瞬间实行了task1与task2,此时没有了io等待,所以程序等待task1与task2执行完毕后又回到main()中继续执行任务。
例2:将task放在一个列表中
import asyncioimport logging logging.basicConfig(level=logging.INFO,format='%(asctime)s - |%(levelname)s| - %(message)s')asyncdeffunc(x): logging.info(1)await asyncio.sleep(x) logging.info(2)returnf'返回值{x}'asyncdefmain():print('main start!') task_list=[ asyncio.create_task(func(2),name='f1'), asyncio.create_task(func(5),name='f2')]print('main end!')#写法1,使用asyncio.wait# done,pending = await asyncio.wait(task_list,timeout = None)# print(done)# 此处打印的是一个Task finished对象的集合#写法2,使用asyncio.gather,asyncio.gather依照协程执行的先后顺序来返回一个列表# t = await asyncio.gather(*task_list)# print(t)# 此处打印:['返回值2', '返回值5'] asyncio.run(main())
例3:一种错误的写法
import asyncioimport logging logging.basicConfig(level=logging.INFO,format='%(asctime)s - |%(levelname)s| - %(message)s')asyncdeffunc(x): logging.info(1)await asyncio.sleep(x) logging.info(2)returnf'返回值{x}' task_list=[ asyncio.create_task(func(2),name='f1'), asyncio.create_task(func(5),name='f2')] done, pending= asyncio.run(asyncio.wait(task_list))
这段代码会报错:no running event loop
原因在于,使用asyncio.create_task的时候,事件循环还没有创建,所以会报错。
此时我们需要改写task_list成这样:
task_list=[ func(2), func(5)]
在task_list里面放协程对象,而不是放task对象,改成这样就可以正常运行了。原因在于在asyncio.run的内部其实会帮你创建一个事件循环,然后这个事件循环中,你传进去的协程对象,它会在内部帮你创建成task对象,就解决了事件循环还没有创建而报错的问题。
建议还是按照例2的方式,将asyncio.creak_task放在一个main()的协程对象里就行了。
Future:代表将来执行或没有执行的任务的结果,Future 相较于Task属于更底层的概念。Future,又称 未来对象、期程对象,其本质上是一个容器,用于接受异步执行的结果。
Furture 对象内部封装了一个 _state,这个 _state 维护着四种状态:Pending、Running、Done,Cancelled,如果变成 Done 完成,就不再等待,而是往后执行,这四种状态的存在其实类似与进程的运行态、就绪态、阻塞态,事件循环凭借着四种状态对 Future\协程对象 进行调度。
我们前面讲的 Task 是继承自 Future,Task对象内部await结果的处理是基于Future对象来的。
例1
import asyncioasyncdefmain():# 获取当前事件循环 loop= asyncio.get_event_loop()# 创建一个任务(Future对象,这个任务什么都不干) fut= loop.create_future()# 等待任务最终结果(Future对象),没有结果则会一直等下去await fut asyncio.run( main())
上面代码会一直运行,不会停止。
例2
import asyncioimport logging logging.basicConfig(level=logging.INFO,format='%(asctime)s - |%(levelname)s| - %(message)s')asyncdefset_after(fut): logging.info('enter set_after')await asyncio.sleep(3) fut.set_result('666')asyncdefmain(): logging.info('enter main')#获取当前时间循环 loop= asyncio.get_running_loop()#创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束 fut= loop.create_future()#创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后会给fut赋值。即手动设置future任务的最终结果,那么fut就可结束了await loop.create_task(set_after(fut))#此处分析一下是否加await的区别 logging.info('get data')#等待Future对象获取最终结果,否则一直等下去 data=await fut logging.info(f'fut的返回值为{data}') asyncio.run(main())
上述代码中,在loop.create_task(set_after(fut))前加了await后的输出结果为:
分析如下:
首先进入main()协程对象,打印enter_main,然后
asynio.gather()中的参数可以是task或协程对象,并且得到其列表形式的返回值。其中如果传入的是协程对象,那么首先gather会先把这些协程对象转换为task,
event loop相当于一个大脑,在下面有若干的task,task是没有办法去控制event loop去执行某一个task的,它只能告诉event loop说我在等这个task,最终是由event loop去决定下面要运行哪一个task。而event loop一旦开始运行task,就必须要task显式地把控制权交还给event loop,交还控制权的方式有await和函数运行完毕。所以尽管我们说协程的方式是并发的,但实际上同一时刻只有一段代码在运行,它只是想办法利用了代码中间等待的时间,所以如果我们的代码里面没有等待的话,协程的帮助并不大。
Python:协程中Task和Future的理解及使用
Python爬虫进阶、异步协程
Python异步编程视频讲解
【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。
【2021最新版】Python 并发编程实战,用多线程、多进程、多协程加速程序运行
Python中协程异步IO(asyncio)详解