python多進(jìn)程基礎(chǔ)詳解
進(jìn)程
什么是進(jìn)程
進(jìn)程指的是一個程序的運(yùn)行過程,或者說一個正在執(zhí)行的程序
所以說進(jìn)程一種虛擬的概念,該虛擬概念起源操作系統(tǒng)
一個CPU 同一時刻只能執(zhí)行一件事
開啟一個進(jìn)程
from multiprocessing import Process
import time
def task(name):
print('%s is running'%name)
time.sleep(3)
print('%s is done'%name)
# 開啟子進(jìn)程的操作必須放到
# if __name__ == '__main__'的子代碼中
# 子進(jìn)程不會再次加載
if __name__ == '__main__':
p=Process(target=task,args=('小王',))
# p=Process(target=task,kwargs={'name':'小王'})
# print(p)
p.start()
# 主進(jìn)程只是向操作系統(tǒng)發(fā)送了一個開啟子進(jìn)程的信號
# p.start()
# 1.操作系統(tǒng)先申請內(nèi)存空間
# 2.把主進(jìn)程的數(shù)據(jù)拷貝到子進(jìn)程里面
# 3.調(diào)用cup才能運(yùn)行里面的代碼
# 創(chuàng)造進(jìn)程的開銷大
print('主')

JOIN方法
當(dāng)前進(jìn)程jion別的進(jìn)程。當(dāng)前進(jìn)程就會等到別的進(jìn)程執(zhí)行完畢了才會繼續(xù)開始往下執(zhí)行

from multiprocessing import Process
import time
def task(name, n):
print('%s is running' % name)
time.sleep(n)
print('%s is done' % name)
if __name__ == '__main__':
start = time.time()
p_l = []
for i in range(1, 4):
p = Process(target=task, args=('小王%s' % i, i))
p_l.append(p)
p.start()
# 主進(jìn)程等待子進(jìn)程
for p in p_l:
p.join()
print('主', (time.time() - start))
進(jìn)程之間空間隔離
from multiprocessing import Process # 這個n是主進(jìn)程里面的值 n = 100 def task(): global n # 改的是子進(jìn)程里面的全局變量 # 主進(jìn)程里面沒有改 n = 0 if __name__ == '__main__': p=Process(target=task) p.start() p.join() print(n)

進(jìn)程的常用方法
current_process 查看pid(進(jìn)程id)
# 1. 進(jìn)程pid:每一個進(jìn)程在操作系統(tǒng)內(nèi)都有一個唯一的id號,稱之為pid
from multiprocessing import Process, current_process
import time
def task():
print('%s is running' % current_process().pid)
time.sleep(3)
print('%s is done' % current_process().pid)
# 開啟子進(jìn)程的操作必須放到
# if __name__ == '__main__'的子代碼中
# 子進(jìn)程不會再次加載
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主', current_process().pid)

os.getpid() 查看進(jìn)程id
# os模塊也可以
from multiprocessing import Process, current_process
import time, os
def task():
print('%s is running 爹是%s' % (os.getpid(), os.getppid()))
time.sleep(3)
print('%s is done爹是%s' % (os.getpid(), os.getppid()))
# 開啟子進(jìn)程的操作必須放到
# if __name__ == '__main__'的子代碼中
# 子進(jìn)程不會再次加載
if __name__ == '__main__':
p = Process(target=task)
p.start()
# 誰把主進(jìn)程創(chuàng)造出來的
#用pycharm就是pycharm創(chuàng)造的
print('主%s爹是%s' % (os.getpid(), os.getppid()))

進(jìn)程其他方法和屬性
from multiprocessing import Process,current_process
import time,os
def task():
print('%s is running 爹是%s'%(os.getpid(),os.getppid()))
time.sleep(30)
print('%s is done爹是%s'%(os.getpid(),os.getppid()))
# 開啟子進(jìn)程的操作必須放到
# if __name__ == '__main__'的子代碼中
# 子進(jìn)程不會再次加載
if __name__ == '__main__':
p=Process(target=task)
p.start()
# 誰把主進(jìn)程創(chuàng)造出來的
# 用pycharm就是pycharm創(chuàng)造的
# 進(jìn)程的名字
print(p.name)
# 殺死子進(jìn)程
p.terminate()
# 需要時間
time.sleep(0.1)
# 判斷子進(jìn)程是否存活
print(p.is_alive())
print('主%s爹是%s'%(os.getpid(),os.getppid()))

守護(hù)進(jìn)程
本質(zhì)就是一個"子進(jìn)程",該"子進(jìn)程"的生命周期<=被守護(hù)進(jìn)程的生命周期
當(dāng)被守護(hù)的進(jìn)程執(zhí)行完了。它也會被殺死
# 主進(jìn)程運(yùn)行完了,子進(jìn)程沒有存在的意義
# 皇帝和太監(jiān)不是同生,但是是同死
from multiprocessing import Process
import time
def task(name):
print('%s活著'%name)
time.sleep(3)
print('%s正常死亡'%name)
if __name__ == '__main__':
p1=Process(target=task,args=('老太監(jiān)',))
# 聲明子進(jìn)程為守護(hù)進(jìn)程
p1.daemon = True
p1.start()
time.sleep(1)
print('皇帝正在死亡')

互斥鎖
進(jìn)程之間內(nèi)存空間互相隔離,怎樣實現(xiàn)共享數(shù)據(jù)
進(jìn)程之間內(nèi)存數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問同一個文件,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結(jié)果就是錯亂,如何控制,就是加鎖處理
'''
搶票
查票
購票
互斥鎖:
在程序中進(jìn)行加鎖處理
必須要釋放鎖下一個鎖才能獲取,所以程序在合適的時候必須要有釋放鎖
所以用文件來處理共享數(shù)據(jù)
1.速度慢
2.必須有互斥鎖
'''
import json
import time,random
from multiprocessing import Process,Lock
# 查票
def search(name):
with open('db.json','rt',encoding='utf-8')as f:
dic = json.load(f)
# 模擬查票時間
time.sleep(1)
print('%s 查看到余票為 %s'%(name,dic['count']))
# 購票
# 第二個get子進(jìn)程不會是第一個get子進(jìn)程修改后count的結(jié)果
# 加互斥鎖,把這一部分并發(fā)變成串行,
# 但是犧牲了效率,保證了數(shù)據(jù)安全
def get(name):
with open('db.json','rt',encoding='utf-8')as f:
dic = json.load(f)
if dic['count']>0:
dic['count']-=1
time.sleep(random.randint(1,3))
with open('db.json', 'wt', encoding='utf-8')as f:
json.dump(dic,f)
print('%s 購票成功'%name)
else:
print('%s 查看到?jīng)]有票了'%name)
def task(name,mutex):
# 并發(fā)
search(name)
# 串行
# 加互斥鎖
mutex.acquire()
get(name)
# 釋放互斥鎖
mutex.release()
# if __name__ == '__main__':
# for i in range(10):
#p=Process(target=task,args=('路人%s'%i,))
#p.start()
## join只能將進(jìn)程的任務(wù)整體變成串行
## 互斥鎖可以局部串行
#p.join()
## 數(shù)據(jù)安全,是指讀的時候無所謂,寫的(改的)時候必須安全
## 寫的時候是串行,讀的時候并發(fā)
# 加鎖
if __name__ == '__main__':
# 主進(jìn)程加鎖
mutex=Lock()
for i in range(10):
# 鎖傳入子進(jìn)程
p=Process(target=task,args=('路人%s'%i,mutex))
p.start()
# join只能將進(jìn)程的任務(wù)整體變成串行
# 互斥鎖可以局部串行
# p.join()
# 數(shù)據(jù)安全,是指讀的時候無所謂,寫的(改的)時候必須安全
# 寫的時候是串行,讀的時候并發(fā)
db.json 中只有10張票。如果沒有加鎖。則可能會出現(xiàn)票唄多賣的情況
進(jìn)程間通信(IPC機(jī)制)
'''
速度快
鎖問題解決
ipc機(jī)制
進(jìn)程彼此之間互相隔離,要實現(xiàn)進(jìn)程間通信(IPC),
multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的
共享內(nèi)存空間
隊列=管道+鎖
'''
from multiprocessing import Queue
# 占用的內(nèi)存,最好小數(shù)據(jù),消息數(shù)據(jù),下載地址
# Queue(限制隊列里面的個數(shù))
# 先進(jìn)先出
q=Queue(3)
# 添加
q.put('a')
q.put('b')
q.put({'x':2})
print('籃子滿了')
# 隊列滿了,相當(dāng)于鎖了
# q.put({'x':2})
# 提取
print(q.get())
print(q.get())
print(q.get())
# # 隊列為空,等待加入,也會阻塞,相當(dāng)于鎖了
print('隊列為空')
print(q.get())
隊列被取完了 后面的q.get() 會阻塞直到有新的元素。所以程序不會結(jié)束

JoinableQueue 來實現(xiàn)生產(chǎn)消費(fèi)者
JoinableQueue#task_done()方法當(dāng)隊列里面沒有元素會結(jié)束線程
'''
小王和小周每人生產(chǎn)10分包子和土豆絲
小戴和小楊一直吃,當(dāng)隊列里面沒有食物時。終結(jié)進(jìn)程
'''
import time, random
from multiprocessing import Process, JoinableQueue
def producer(name, food, q):
for i in range(10):
res = '%s%s' % (food, i)
# 模擬生產(chǎn)數(shù)據(jù)的時間
time.sleep(random.randint(1, 3))
q.put(res)
print('廚師%s生成了%s' % (name, res))
def consumer(name, q):
while True:
# 訂單都沒了還在等,隊列里面空了
res = q.get()
# 模擬處理數(shù)據(jù)的時間
time.sleep(random.randint(1, 3))
print('吃貨%s吃了%s' % (name, res))
# 1每次完成隊列取一次,往q.join() ,取干凈了q.join()運(yùn)行完
q.task_done()
# 多個生產(chǎn)者和消費(fèi)者
if __name__ == '__main__':
q = JoinableQueue()
# 生產(chǎn)者
p1 = Process(target=producer, args=('小王', '包子', q))
p3 = Process(target=producer, args=('小周', '土豆絲', q))
# 消費(fèi)者
c1 = Process(target=consumer, args=('小戴', q))
c2 = Process(target=consumer, args=('小楊', q))
# #3.守護(hù)進(jìn)程的作用: 主進(jìn)程死了,消費(fèi)者子進(jìn)程也跟著死
# #把消費(fèi)者變成守護(hù)進(jìn)程
c1.daemon = True
c2.daemon = True
p1.start()
p3.start()
c1.start()
c2.start()
p1.join()
p3.join()
# 2消費(fèi)者task_done給q.join()發(fā)信號
q.join()
print('主')
# 生產(chǎn)者運(yùn)行完?1,2
# 消費(fèi)者運(yùn)行完?1,2
當(dāng)隊列為空時,不會傻傻等待而是結(jié)束進(jìn)程

總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注本站的更多內(nèi)容!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非maisonbaluchon.cn所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。

關(guān)注官方微信