現在所使用的CPU通常具有多核多執行緒的功能,而執行程式通常只會使用到一顆核心,但想要充分的發揮多核心硬體的運算能力,就需要使用多執行緒(Multi-Threading)或多行程 (Mult-Processing)等的平行運算技術
Thread
執行緒 Thread為作業系統(OS) 進行運算排程的最小單位,它被包含在行程(Process),是行程(Process)中實際運作單位。一條執行緒指的是行程中一個單一順序的控制流,一個行程中可以並行多個執行緒,每條執行緒並列執行不同的任務。
但Python在使用CPython直譯時受到GIL(Global Interpreter Lock)的影響,使Python在執行時只會運行單一Thread來保證Thread-Safe,因此在一般運算時,反而會有反效果,執行速度較慢。
但依然可以在IO密集型計算產生正面效果,當需要等待IO連接時,會有一段時間不執行任何Python程式,這時就可以讓其他執行緒繼續執行,因此Multi-Thread還是有較好的效果。
Thread-Safe是在多執行緒運行時,對同一資源進行讀寫操作的修改時,必須保證各執行緒之間不會衝突,數據修改不會發生錯誤。
在Python中,若要使用多執行緒,可以使用threading這個模組來建立子執行緒,而threading是Python標準函式庫裡面的模組,因此不需要額外安裝。
建立子執行緒
import threading
import time# 子執行緒Function
def ChildThread():
for i in range(5):
print("Child Thread:", i)
time.sleep(1)# 建立一個子執行緒
CT = threading.Thread(target = ChildThread)# 執行子執行緒
CT.start()# 主執行緒繼續執行自己的工作
for i in range(3):
print("Main Thread:", i)
time.sleep(0.5)# 等待 CT 這個子執行緒結束
CT.join()print("Finish")
首先需要定義一個子執行緒的Function,並且使用threading.Thread建立子執行緒,其中target就是需要使用子執行緒執行的Function。
建立子執行緒後,就使用start()開始執行,在子執行緒執行的同時,主執行緒依然可以繼續執行其他任務。
而需要將子執行緒的任務執行完後再繼續執行主執行緒時,可以使用join(),主執行緒會等待子執行緒執行完成後,再繼續執行主執行緒。
執行結果:
Child thread: 0
Main thread: 0
Main thread: 1
Child thread: 1
Main thread: 2
Child thread: 2
Child thread: 3
Child thread: 4
Done.
setDaemon
與join()不同,join()是讓主執行緒等待子執行緒完成後,再回到主執行緒繼續執行。而setDaemon()則是在主執行緒結束的同時,讓子執行緒一起結束。而setDaemon()需要位於start()之前,否則setDaemon()依然為False
import threading
import time# 子執行緒Function
def ChildThread(index, semaphore):
for i in range(2):
semaphore.acquire()
print("Child thread %d: start"%index)
print("Child thread %d:"%index, i)
time.sleep(0.5)
semaphore.release()
print("Child thread %d: end"%index)if __name__ == '__main__':
# 建立 Semaphore
semaphore = threading.Semaphore(2)
thread_list = []
for i in range(3):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,semaphore))
thread_list.append(CT)for i in range(3):
# 執行該子執行緒
thread_list[i].start()for i in range(3):
thread_list[i].join()print("Finish")
執行結果:
Child Thread: 0
Main Thread: 0
Main Thread: 1
Child Thread: 1
Main Thread: 2
Finish
多子執行緒
我們可以同時建立多個子執行緒,並分別傳入不同參數,使每個執行緒執行不同的任務,可以使用List來儲存多個執行緒。
import threading
import time# 子執行緒Function
def ChildThread(index):
for i in range(2):
print("Child thread %d:"%index, i)
time.sleep(0.5)if __name__ == '__main__':
thread_list = []
for i in range(5):# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,))
thread_list.append(CT)for i in range(5):
# 執行該子執行緒
thread_list[i].start()# 主執行緒繼續執行自己的工作
for i in range(5):
print("Main thread:", i)
time.sleep(0.5)for i in range(5):
thread_list[i].join()print("Finish")
跟前面一樣,先建立一個子執行緒,而其中args就是傳入子執行緒Function中的參數。以上方式可以使每個子執行緒使用不同參數來執行不同功能的Function。
注意: args的格式為 (變數,) ,需要指定為Tuple才可以執行
執行結果:
Child thread 0: 0
Child thread 1: 0
Child thread 2: 0
Child thread 3: 0
Child thread 4: 0
Main thread: 0
Child thread 2: 1
Main thread: 1
Child thread 0: 1
Child thread 3: 1
Child thread 1: 1
Child thread 4: 1
Main thread: 2
Main thread: 3
Main thread: 4
Finish
物件導向
可以使用Python物件導向的方式來改寫
import threading
import time#繼承父類threading.Thread
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
#把要執行的Function寫在run內,會在建立子執行緒後直接執行run
def run(self):
print("Starting " + self.name)
while self.counter:
print("%s: %d" %(self.name, self.counter))
time.sleep(1)
self.counter -= 1
print("Exiting " + self.name)# 建立子執行緒
thread1 = myThread(1, "Thread-1", 2)
thread2 = myThread(2, "Thread-2", 3)
# 開始子執行緒
thread1.start()
thread2.start()
print("Exiting Main Thread")
其概念與前面的差不多,繼承threading.Thread。比較有差別的地方是在建立子執行緒後,開始執行子執行緒時,會直接呼叫自己run Function。因此可以直接把需要執行的Function寫在run內即可。
執行結果:
Starting Thread-1
Thread-1: 2
Starting Thread-2
Exiting Main Thread
Thread-2: 3
Thread-1: 1
Thread-2: 2
Thread-2: 1
Exiting Thread-1
Exiting Thread-2
對列 (Queue)
當有許多工作需要分配給多個Thread時,較為簡單的方式是使用Queue,Dictionary、List並不適合用於多執行緒,在多執行緒中,數據是共享的,在多執行緒中難以確認數據的安全性。Queue為一種數據結構,用來解決多執行緒間數據的交換。Queue為Python的標準函式庫,可以直接import使用。
Queue有三種形式: FIFO(先進先出)、LIFO(後進先出)、優先隊列
import time
import threading
import queue# Worker 類別,負責處理資料
class Worker(threading.Thread):
def __init__(self, queue, num):
threading.Thread.__init__(self)
self.queue = queue
self.num = numdef run(self):
while self.queue.qsize() > 0:
# 取得新的資料
msg = self.queue.get()# 處理資料
print("Worker %d: %s" % (self.num, msg))
time.sleep(1)# 建立佇列
my_queue = queue.Queue()# 將資料放入佇列
for i in range(10):
my_queue.put("Data %d" % i)# 建立兩個 Worker
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)# 讓 Worker 開始處理資料
my_worker1.start()
my_worker2.start()# 等待所有 Worker 結束
my_worker1.join()
my_worker2.join()print("Done.")
先宣告一個隊列Queue,並將資料放入隊列中Queue.put()。之後在子執行緒中取出資料Queue.get(),先執行完的執行緒就會繼續從隊列中取出資料繼續執行,直到隊列中無資料,結束子執行緒。
執行結果:
Worker 1: Data 0
Worker 2: Data 1
Worker 1: Data 2
Worker 2: Data 3
Worker 1: Data 4
Worker 2: Data 5
Worker 2: Data 6
Worker 1: Data 7
Worker 1: Data 8
Worker 2: Data 9
Done.
鎖定(Lock)
在平行化的多執行緒中,為了避免同時修改同一數據的問題,可以使用Lock(),使得一次只能運行一個執行緒,但由於GIL(Global Interpreter Lock),使用CPython時就已經避免多執行緒的衝突問題,其更多用途是可以手動安排執行緒的執行順序
import threading
import time# 子執行緒Function
def ChildThread(index, lock):
lock.acquire()
for i in range(3):
print("Child thread %d:"%index, i)
time.sleep(0.5)
lock.release()if __name__ == '__main__':
# 建立 lock
lock = threading.Lock()
thread_list = []
for i in range(2):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,lock))
thread_list.append(CT)for i in range(2):
# 執行該子執行緒
thread_list[i].start()for i in range(2):
thread_list[i].join()print("Finish")
當要啟用該執行緒時,需要先取得執行權Lock.acquire(),並再執行完成後釋放執行權Lock.release()。
執行結果:
Child thread 0: 0
Child thread 0: 1
Child thread 0: 2
Child thread 1: 0
Child thread 1: 1
Child thread 1: 2
Finish
旗標(Semaphore)
在程式執行時,有時候遇到一些情況需要對運算資源進行一些限制,限定有限個執行緒同時進行,但不同於Lock,Lock限定一次運行一個執行緒,而Semaphore可以執行多個執行緒。
import threading
import time# 子執行緒Function
def ChildThread(index, semaphore):
for i in range(2):
semaphore.acquire()
print("Child thread %d: start"%index)
print("Child thread %d:"%index, i)
time.sleep(0.5)
semaphore.release()
print("Child thread %d: end"%index)if __name__ == '__main__':
# 建立 Semaphore
semaphore = threading.Semaphore(2)
thread_list = []
for i in range(3):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,semaphore))
thread_list.append(CT)for i in range(3):
# 執行該子執行緒
thread_list[i].start()for i in range(3):
thread_list[i].join()print("Finish")
當Semaphore設定為2時,一次只能執行2個執行緒,因此下方執行的結果為,一開始有2個start,之後需要等到end之後才會有下一個start。
執行結果:
Child thread 0: start
Child thread 0: 0
Child thread 1: start
Child thread 1: 0
Child thread 1: end
Child thread 1: start
Child thread 1: 1
Child thread 2: start
Child thread 0: end
Child thread 2: 0
Child thread 1: end
Child thread 0: start
Child thread 0: 1
Child thread 2: end
Child thread 2: start
Child thread 2: 1
Child thread 0: end
Child thread 2: end
Finish
重複鎖定(RLock)
其功能與Lock類似,差別是Lock只能呼叫一次,當呼叫兩次acquire()或是release()時,就會發生錯誤。而RLock()內部則是有計數器,當呼叫acquire()時計數器+1,呼叫release()時計數器-1,而當計數器為0時,就會釋放鎖定,讓其他執行緒執行。在計數器大於0時,其他執行緒都無法執行。
而因為CPython中GIL的問題,造成Multi-Thread多工的效能不高,因此可以使用Multi-processing來進行多工處裡。
Multi-processing (多處理程序/多進程):
- 資料在彼此間傳遞變得更加複雜及花時間,因為一個 process 在作業系統的管理下是無法去存取別的 process 的 memory
- 適合需要 CPU 密集,像是迴圈計算
Multi-threading (多執行緒/多線程):
- 資料彼此傳遞簡單,因為多執行緒的 memory 之間是共用的,但也因此要避免會有 Race Condition 問題
- 適合需要 I/O 密集,像是爬蟲需要時間等待 request 回覆
參考文獻:
https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/