Coding筆記(10): Multi-Thread 多執行緒

JianJie
15 min readOct 28, 2021

--

現在所使用的CPU通常具有多核多執行緒的功能,而執行程式通常只會使用到一顆核心,但想要充分的發揮多核心硬體的運算能力,就需要使用多執行緒(Multi-Threading)或多行程 (Mult-Processing)等的平行運算技術

Thread

執行緒 Thread為作業系統(OS) 進行運算排程的最小單位,它被包含在行程(Process),是行程(Process)中實際運作單位。一條執行緒指的是行程中一個單一順序的控制流,一個行程中可以並行多個執行緒,每條執行緒並列執行不同的任務。

但Python在使用CPython直譯時受到GIL(Global Interpreter Lock)的影響,使Python在執行時只會運行單一Thread來保證Thread-Safe,因此在一般運算時,反而會有反效果,執行速度較慢。

但依然可以在IO密集型計算產生正面效果,當需要等待IO連接時,會有一段時間不執行任何Python程式,這時就可以讓其他執行緒繼續執行,因此Multi-Thread還是有較好的效果。

Thread-Safe是在多執行緒運行時,對同一資源進行讀寫操作的修改時,必須保證各執行緒之間不會衝突,數據修改不會發生錯誤。

Python Multi-Thread執行方式

在Python中,若要使用多執行緒,可以使用threading這個模組來建立子執行緒,而threading是Python標準函式庫裡面的模組,因此不需要額外安裝。

建立子執行緒

import threading
import time
# 子執行緒Function
def ChildThread():
for i in range(5):
print("Child Thread:", i)
time.sleep(1)
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread)
# 執行子執行緒
CT.start()
# 主執行緒繼續執行自己的工作
for i in range(3):
print("Main Thread:", i)
time.sleep(0.5)
# 等待 CT 這個子執行緒結束
CT.join()
print("Finish")

首先需要定義一個子執行緒的Function,並且使用threading.Thread建立子執行緒,其中target就是需要使用子執行緒執行的Function。

建立子執行緒後,就使用start()開始執行,在子執行緒執行的同時,主執行緒依然可以繼續執行其他任務。

而需要將子執行緒的任務執行完後再繼續執行主執行緒時,可以使用join(),主執行緒會等待子執行緒執行完成後,再繼續執行主執行緒。

執行結果:

Child thread: 0
Main thread: 0
Main thread: 1
Child thread: 1
Main thread: 2
Child thread: 2
Child thread: 3
Child thread: 4
Done.

setDaemon

與join()不同,join()是讓主執行緒等待子執行緒完成後,再回到主執行緒繼續執行。而setDaemon()則是在主執行緒結束的同時,讓子執行緒一起結束。而setDaemon()需要位於start()之前,否則setDaemon()依然為False

import threading
import time
# 子執行緒Function
def ChildThread(index, semaphore):
for i in range(2):
semaphore.acquire()
print("Child thread %d: start"%index)
print("Child thread %d:"%index, i)
time.sleep(0.5)
semaphore.release()
print("Child thread %d: end"%index)
if __name__ == '__main__':
# 建立 Semaphore
semaphore = threading.Semaphore(2)

thread_list = []
for i in range(3):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,semaphore))
thread_list.append(CT)
for i in range(3):
# 執行該子執行緒
thread_list[i].start()
for i in range(3):
thread_list[i].join()
print("Finish")

執行結果:

Child Thread: 0
Main Thread: 0
Main Thread: 1
Child Thread: 1
Main Thread: 2
Finish

多子執行緒

我們可以同時建立多個子執行緒,並分別傳入不同參數,使每個執行緒執行不同的任務,可以使用List來儲存多個執行緒。

import threading
import time
# 子執行緒Function
def ChildThread(index):
for i in range(2):
print("Child thread %d:"%index, i)
time.sleep(0.5)
if __name__ == '__main__':

thread_list = []
for i in range(5):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,))
thread_list.append(CT)
for i in range(5):
# 執行該子執行緒
thread_list[i].start()
# 主執行緒繼續執行自己的工作
for i in range(5):
print("Main thread:", i)
time.sleep(0.5)
for i in range(5):
thread_list[i].join()
print("Finish")

跟前面一樣,先建立一個子執行緒,而其中args就是傳入子執行緒Function中的參數。以上方式可以使每個子執行緒使用不同參數來執行不同功能的Function。

注意: args的格式為 (變數,) ,需要指定為Tuple才可以執行

執行結果:

Child thread 0: 0
Child thread 1: 0
Child thread 2: 0
Child thread 3: 0
Child thread 4: 0
Main thread: 0
Child thread 2: 1
Main thread: 1
Child thread 0: 1
Child thread 3: 1
Child thread 1: 1
Child thread 4: 1
Main thread: 2
Main thread: 3
Main thread: 4
Finish

物件導向

可以使用Python物件導向的方式來改寫

import threading
import time
#繼承父類threading.Thread
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
#把要執行的Function寫在run內,會在建立子執行緒後直接執行run
def run(self):
print("Starting " + self.name)
while self.counter:
print("%s: %d" %(self.name, self.counter))
time.sleep(1)
self.counter -= 1
print("Exiting " + self.name)
# 建立子執行緒
thread1 = myThread(1, "Thread-1", 2)
thread2 = myThread(2, "Thread-2", 3)

# 開始子執行緒
thread1.start()
thread2.start()

print("Exiting Main Thread")

其概念與前面的差不多,繼承threading.Thread。比較有差別的地方是在建立子執行緒後,開始執行子執行緒時,會直接呼叫自己run Function。因此可以直接把需要執行的Function寫在run內即可。

執行結果:

Starting Thread-1
Thread-1: 2
Starting Thread-2
Exiting Main Thread
Thread-2: 3
Thread-1: 1
Thread-2: 2
Thread-2: 1
Exiting Thread-1
Exiting Thread-2

對列 (Queue)

當有許多工作需要分配給多個Thread時,較為簡單的方式是使用Queue,Dictionary、List並不適合用於多執行緒,在多執行緒中,數據是共享的,在多執行緒中難以確認數據的安全性。Queue為一種數據結構,用來解決多執行緒間數據的交換。Queue為Python的標準函式庫,可以直接import使用。

Queue有三種形式: FIFO(先進先出)、LIFO(後進先出)、優先隊列

import time
import threading
import queue
# Worker 類別,負責處理資料
class Worker(threading.Thread):
def __init__(self, queue, num):
threading.Thread.__init__(self)
self.queue = queue
self.num = num
def run(self):
while self.queue.qsize() > 0:
# 取得新的資料
msg = self.queue.get()
# 處理資料
print("Worker %d: %s" % (self.num, msg))
time.sleep(1)
# 建立佇列
my_queue = queue.Queue()
# 將資料放入佇列
for i in range(10):
my_queue.put("Data %d" % i)
# 建立兩個 Worker
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)
# 讓 Worker 開始處理資料
my_worker1.start()
my_worker2.start()
# 等待所有 Worker 結束
my_worker1.join()
my_worker2.join()
print("Done.")

先宣告一個隊列Queue,並將資料放入隊列中Queue.put()。之後在子執行緒中取出資料Queue.get(),先執行完的執行緒就會繼續從隊列中取出資料繼續執行,直到隊列中無資料,結束子執行緒。

執行結果:

Worker 1: Data 0
Worker 2: Data 1
Worker 1: Data 2
Worker 2: Data 3
Worker 1: Data 4
Worker 2: Data 5
Worker 2: Data 6
Worker 1: Data 7
Worker 1: Data 8
Worker 2: Data 9
Done.

鎖定(Lock)

在平行化的多執行緒中,為了避免同時修改同一數據的問題,可以使用Lock(),使得一次只能運行一個執行緒,但由於GIL(Global Interpreter Lock),使用CPython時就已經避免多執行緒的衝突問題,其更多用途是可以手動安排執行緒的執行順序

import threading
import time
# 子執行緒Function
def ChildThread(index, lock):
lock.acquire()
for i in range(3):
print("Child thread %d:"%index, i)
time.sleep(0.5)
lock.release()
if __name__ == '__main__':
# 建立 lock
lock = threading.Lock()

thread_list = []
for i in range(2):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,lock))
thread_list.append(CT)
for i in range(2):
# 執行該子執行緒
thread_list[i].start()
for i in range(2):
thread_list[i].join()
print("Finish")

當要啟用該執行緒時,需要先取得執行權Lock.acquire(),並再執行完成後釋放執行權Lock.release()。

執行結果:

Child thread 0: 0
Child thread 0: 1
Child thread 0: 2
Child thread 1: 0
Child thread 1: 1
Child thread 1: 2
Finish

旗標(Semaphore)

在程式執行時,有時候遇到一些情況需要對運算資源進行一些限制,限定有限個執行緒同時進行,但不同於Lock,Lock限定一次運行一個執行緒,而Semaphore可以執行多個執行緒。

import threading
import time
# 子執行緒Function
def ChildThread(index, semaphore):
for i in range(2):
semaphore.acquire()
print("Child thread %d: start"%index)
print("Child thread %d:"%index, i)
time.sleep(0.5)
semaphore.release()
print("Child thread %d: end"%index)
if __name__ == '__main__':
# 建立 Semaphore
semaphore = threading.Semaphore(2)

thread_list = []
for i in range(3):
# 建立一個子執行緒
CT = threading.Thread(target = ChildThread, args = (i,semaphore))
thread_list.append(CT)
for i in range(3):
# 執行該子執行緒
thread_list[i].start()
for i in range(3):
thread_list[i].join()
print("Finish")

當Semaphore設定為2時,一次只能執行2個執行緒,因此下方執行的結果為,一開始有2個start,之後需要等到end之後才會有下一個start。

執行結果:

Child thread 0: start
Child thread 0: 0
Child thread 1: start
Child thread 1: 0
Child thread 1: end
Child thread 1: start
Child thread 1: 1
Child thread 2: start
Child thread 0: end
Child thread 2: 0
Child thread 1: end
Child thread 0: start
Child thread 0: 1
Child thread 2: end
Child thread 2: start
Child thread 2: 1
Child thread 0: end
Child thread 2: end
Finish

重複鎖定(RLock)

其功能與Lock類似,差別是Lock只能呼叫一次,當呼叫兩次acquire()或是release()時,就會發生錯誤。而RLock()內部則是有計數器,當呼叫acquire()時計數器+1,呼叫release()時計數器-1,而當計數器為0時,就會釋放鎖定,讓其他執行緒執行。在計數器大於0時,其他執行緒都無法執行。

而因為CPython中GIL的問題,造成Multi-Thread多工的效能不高,因此可以使用Multi-processing來進行多工處裡。

Multi-processing (多處理程序/多進程):

  • 資料在彼此間傳遞變得更加複雜及花時間,因為一個 process 在作業系統的管理下是無法去存取別的 process 的 memory
  • 適合需要 CPU 密集,像是迴圈計算

Multi-threading (多執行緒/多線程):

  • 資料彼此傳遞簡單,因為多執行緒的 memory 之間是共用的,但也因此要避免會有 Race Condition 問題
  • 適合需要 I/O 密集,像是爬蟲需要時間等待 request 回覆

參考文獻:

https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/

--

--

JianJie

Image Processing / Computer Vision / Deep Learning