使用 Python 進行併發程式設計

讓計算機程式並發的執行是一個經常被討論的話題,今天我想討論一下Python下的各種併發方式。

多執行緒幾乎是每一個程式猿在使用每一種語言時都會首先想到用於解決並發的工具(JS程式員請迴避),使用多執行緒可以有效的利用CPU資源(Python例外)。然而多執行緒所帶來的程式的複雜度也不可避免,尤其是對競爭資源的同步問題。

然而在python中由於使用了全局解釋鎖(GIL)的原因,程式碼並不能同時在多核上並發的執行,也就是說,Python的多執行緒不能併發,很多人會發現使用多執行緒來改進自己的Python程式碼後,程式的執行效率卻下降了,這是多麼蛋疼的一件事呀!如果想瞭解更多細節,推薦閱讀這篇文章(http://www.jeffknupp.com/blog/2012/03/31/pythons-hardest-problem/)。實際上使用多執行緒的程式設計模型是很困難的,程式員很容易犯錯,這並不是程式員的錯誤,因為並行思維是反人類的,我們大多數人的思維是序列(精神分裂不討論),而且馮諾依曼設計的計算機架構也是以順序執行為基礎的。所以如果你總是不能把你的多執行緒程式搞定,恭喜你,你是個思維正常的程式猿:)

ADVERTISEMENT

Python提供兩組執行緒的介面,一組是thread模組,提供基礎的,低等級(Low Level)介面,使用Function作為執行緒的執行體。還有一組是threading模組,提供更容易使用的基於物件的介面(類似於Java),可以繼承Thread物件來實現執行緒,還提供了其它一些執行緒相關的物件,例如Timer,Lock

importthread

defworker:

importthreading

defworker:

importthreading

classworker(threading.Thread):

def__init__(self):

pass

defrun:

"""thread worker function"""

print'Worker'

t=worker

ADVERTISEMENT

t.start

由於前文提到的全局解釋鎖的問題,Python下比較好的並行方式是使用多程式,這樣可以非常有效的使用CPU資源,並實現真正意義上的併發。當然,程式的開銷比執行緒要大,也就是說如果你要建立數量驚人的並發程式的話,需要考慮一下你的機器是不是有一顆強大的心。

frommultiprocessingimportProcess

defworker:

"""thread worker function"""

print'Worker'

p=Process(target=worker)

p.start

p.join

由於執行緒共享相同的地址空間和記憶體,所以執行緒之間的通訊是非常容易的,然而程式之間的通訊就要複雜一些了。常見的程式間通訊有,管道,訊息佇列,Socket介面(TCP/IP)等等。

ADVERTISEMENT

Python的mutliprocess模組提供了封裝好的管道和佇列,可以方便的在程式間傳遞訊息。

Python程式間的同步使用鎖,這一點喝執行緒是一樣的。

另外,Python還提供了程式池Pool物件,可以方便的管理和控制執行緒。

遠端分散式主機 (Distributed Node)

隨著大資料時代的到臨,摩爾定理在單機上似乎已經失去了效果,資料的計算和處理需要分散式的計算機網路來執行,程式並行的執行在多個主機節點上,已經是現在的軟體架構所必需考慮的問題。

遠端主機間的程式間通訊有幾種常見的方式

- TCP/IP

TCP/IP是所有遠端通訊的基礎,然而API比較低階別,使用起來比較繁瑣,所以一般不會考慮

- 遠端方法呼叫 Remote Function Call

RPC是早期的遠端程式間通訊的手段。Python下有一個開源的實現RPyC

ADVERTISEMENT

- 遠端物件 Remote Object

遠端物件是更高階別的封裝,程式可以想操作本地物件一樣去操作一個遠端物件在本地的代理。遠端物件最廣為使用的規範CORBA,CORBA最大的好處是可以在不同語言和平臺中進行通訊。當讓不用的語言和平臺還有一些各自的遠端物件實現,例如Java的RMI,MS的DCOM

Python的開源實現,有許多對遠端物件的支援

- 訊息佇列 Message Queue

比起RPC或者遠端物件,訊息是一種更為靈活的通訊手段,常見的支援Python介面的訊息機制有

在遠端主機上執行併發和本地的多程式並沒有非常大的差異,都需要解決程式間通訊的問題。當然對遠端程式的管理和協調比起本地要複雜。

Python下有許多開源的框架來支援分散式的併發,提供有效的管理手段包括:

- Celery

Celery是一個非常成熟的Python分散式框架,可以在分散式的系統中,非同步的執行任務,並提供有效的管理和排程功能。參考這裡(http://my.oschina.net/taogang/blog/386077)

- SCOOP

SCOOP (Scalable COncurrent Operations in Python)提供簡單易用的分散式呼叫介面,使用Future介面來進行併發。

- Dispy

相比起Celery和SCOOP,Dispy提供更為輕量級的分散式並行服務

- Asyncoro

Asyncoro是另一個利用Generator實現分散式並發的Python框架,

當然還有許多其它的系統,我沒有一一列出

另外,許多的分散式系統多提供了對Python介面的支援,例如Spark

還有一種併發手段並不常見,我們可以稱之為偽執行緒,就是看上去像是執行緒,使用的介面類似執行緒介面,但是實際使用非執行緒的方式,對應的執行緒開銷也不存的。

fromgreenletimportgreenlet

deftest1:

print12

gr2.switch

print34

deftest2:

print56

gr1.switch

print78

gr1=greenlet(test1)

gr2=greenlet(test2)

gr1.switch

偽執行緒gr1 switch會列印12,然後呼叫gr2 switch得到56,然後switch回到gr1,列印34,然後偽執行緒gr1結束,程式退出,所以78永遠不會被列印。通過這個例子我們可以看出,使用偽執行緒,我們可以有效的控製程式的執行流程,但是偽執行緒並不存在真正意義上的併發。

eventlet,gevent和concurence都是基於greenlet提供並發的。

eventlet是一個提供網路呼叫並發的Python庫,使用者可以以非阻塞的方式來呼叫阻塞的IO操作。

importeventlet

fromeventlet.greenimporturllib2

urls=['http://www.google.com''http://www.example.com''http://www.python.org']

deffetch(url):

returnurllib2.urlopen(url).read

pool=eventlet.GreenPool

forbodyinpool.imap(fetchurls):

('got body'17629)

('got body'1270)

eventlet為了支援generator的操作對urllib2做了修改,介面和urllib2是一致的。這裡的GreenPool和Python的Pool介面一致。

importgevent

fromgeventimportsocket

urls=['www.google.com''www.example.com''www.python.org']

jobs=[gevent.spawn(socket.gethostbynameurl)forurlinurls]

gevent.joinall(jobstimeout=2)

print[job.valueforjobinjobs]

執行結果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']

concurence是另外一個利用greenlet提供網路並發的開源庫,我沒有用過,大家可以自己嘗試一下。

通常需要用到並發的場合有兩種,一種是計算密集型,也就是說你的程式需要大量的CPU資源;另一種是IO密集型,程式可能有大量的讀寫操作,包括讀寫檔案,收發網路請求等等。

對應計算密集型的應用,我們選用著名的蒙特卡洛演算法來計算PI值。基本原理如下

蒙特卡洛演算法利用統計學原理來模擬計算圓周率,在一個正方形中,一個隨機的點落在1/4圓的區域(紅色點)的概率與其面積成正比。也就該概率 p = Pi * R*R /4 : R* R , 其中R是正方形的邊長,圓的半徑。也就是說該概率是圓周率的1/4, 利用這個結論,只要我們模擬出點落在四分之一圓上的概率就可以知道圓周率了,為了得到這個概率,我們可以通過大量的實驗,也就是生成大量的點,看看這個點在哪個區域,然後統計出結果。

frommathimporthypot

fromrandomimportrandom

這裡test方法做了n(tries)次試驗,返回落在四分之一圓中的點的個數。判斷方法是檢查該點到圓心的距離,如果小於R則是在圓上。

通過大量的併發,我們可以快速的執行多次試驗,試驗的次數越多,結果越接近真實的圓周率。

deftest(tries):

returnsum(hypot(random,random)<1for_inrange(tries))

defcalcPi(nbFuturestries):

ts=time.time

result=map(test[tries]*nbFutures)

ret=4.*sum(result)/float(nbFutures*tries)

span=time.time-ts

print"time spend "span

returnret

為了使用執行緒池,我們用multiprocessing的dummy包,它是對多執行緒的一個封裝。注意這裡程式碼雖然一個字的沒有提到執行緒,但它千真萬確是多執行緒。

通過測試我們開(jing)心(ya)的發現,果然不出所料,當執行緒池為1是,它的執行結果和沒有併發時一樣,當我們把執行緒池數字設定為5時,耗時幾乎是沒有並發的2倍,我的測試資料從5秒到9秒。所以對於計算密集型的任務,還是放棄多執行緒吧。

frommultiprocessing.dummyimportPool

frommathimporthypot

fromrandomimportrandom

importtime

deftest(tries):

ts=time.time

p=Pool(1)

result=p.map(test[tries]*nbFutures)

returnret

if__name__=='__main__':

p=Pool

print("pi = {}".format(calcPi(30004000)))

理論上對於計算密集型的任務,使用多程式併發比較合適,在以下的例子中,程式池的規模設定為5,修改程式池的大小可以看到對結果的影響,當程式池設定為1時,和多執行緒的結果所需的時間類似,因為這時候並不存在併發;當設定為2時,響應時間有了明顯的改進,是之前沒有並發的一半;然而繼續擴大程式池對效能影響並不大,甚至有所下降,也許我的Apple Air的CPU只有兩個核?

當心,如果你設定一個非常大的程式池,你會遇到 Resource temporarily unavailable的錯誤,系統並不能支援建立太多的程式,畢竟資源是有限的。

frommultiprocessingimportPool

frommathimporthypot

fromrandomimportrandom

deftest(tries):

ts=time.time

p=Pool(5)

returnret

if__name__=='__main__':

不論是gevent還是eventlet,因為不存在實際的併發,響應時間和沒有併發區別不大,這個和測試結果一致。

importgevent

importtime

deftest(tries):

jobs=[gevent.spawn(testt)fortin[tries]*nbFutures]

gevent.joinall(jobstimeout=2)

ret=4.*sum([job.valueforjobinjobs])/float(nbFutures*tries)

returnret

deftest(tries):

ts=time.time

pool=eventlet.GreenPool

result=pool.imap(test[tries]*nbFutures)

returnret

SCOOP中的Future介面符合PEP-3148的定義,也就是在Python3中提供的Future介面。

在預設的SCOOP配置環境下(單機,4個Worker),並發的效能有提高,但是不如兩個程式池配置的多程式。

frommathimporthypot

fromrandomimportrandom

fromscoopimportfutures

deftest(tries):

expr=futures.map(test[tries]*nbFutures)

ret=4.*sum(expr)/float(nbFutures*tries)

returnret

if__name__=="__main__":

fromceleryimportCelery

frommathimporthypot

fromrandomimportrandom

app=Celery('tasks'backend='amqp'broker='amqp:[email protected]//')

app.conf.CELERY_RESULT_BACKEND='db+sqlite:///results.sqlite'

deftest(tries):

fromceleryimportgroup

fromtasksimporttest

result=group(test.s(tries)foriinxrange(nbFutures)).get

returnret

使用Celery做並發的測試結果出乎意料(環境是單機,4frefork的併發,訊息broker是rabbitMQ),是所有測試用例裡最糟糕的,響應時間是沒有並發的5~6倍。這也許是因為控制協調的開銷太大。對於這樣的計算任務,Celery也許不是一個好的選擇。

importasyncoro

importtime

deftest(tries):

yieldsum(hypot(random,random)<1for_inrange(tries))

coros=[asyncoro.Coro(testt)fortin[tries]*nbFutures]

ret=4.*sum([job.valueforjobincoros])/float(nbFutures*tries)

returnret

IO密集型的任務是另一種常見的用例,例如網路WEB伺服器就是一個例子,每秒鐘能處理多少個請求時WEB伺服器的重要指標。

frommathimporthypot

importtime

importurllib2

deftest(url):

deftestIO(nbFutures):

map(testurls*nbFutures)

span=time.time-ts

在不同併發庫下的程式碼,由於比較類似,我就不一一列出。大家可以參考計算密集型中程式碼做參考。

通過測試我們可以發現,對於IO密集型的任務,使用多執行緒,或者是多程式都可以有效的提高程式的效率,而使用偽執行緒效能提升非常顯著,eventlet比沒有並發的情況下,響應時間從9秒提高到0.03秒。同時eventlet/gevent提供了非阻塞的非同步呼叫模式,非常方便。這裡推薦使用執行緒或者偽執行緒,因為在響應時間類似的情況下,執行緒和偽執行緒消耗的資源更少。

Python提供了不同的併發方式,對應於不同的場景,我們需要選擇不同的方式進行併發。選擇合適的方式,不但要對該方法的原理有所瞭解,還應該做一些測試和試驗,資料才是你做選擇的最好參考。


» 程式源
喜歡這篇文章嗎?立刻分享出去讓更多人知道~