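Preface
A thread is the smallest unit of execution that an operating system can schedule. Threads are contained in a process and are the units that actually carry out the process's work. In the default CPython build, the GIL (global interpreter lock) lets only one thread execute Python bytecode at a time. Multiple threads therefore do not speed up CPU-bound Python code. Threads are mostly used for I/O-bound tasks, where they spend most of their time waiting on the network, the disk, or timers.
In Python, multithreaded programs are usually written with the standard library module threading.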
Basic usage
Method 1: create instances of the threading.Thread class

```python
import threading
import time


def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)  # simulated I/O wait: 3 seconds per count
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # target is the function the thread runs, args its positional arguments (a tuple)
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    # join() blocks the main thread until the given thread has finished
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
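Sample output. Each thread sleeps 3 seconds per count, so the threads finish after 9, 15 and 21 seconds. The whole program takes 21 seconds instead of the 45 seconds a sequential run would need: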
main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
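threading.Thread also accepts kwargs and name arguments, and it discards whatever the target function returns. The sketch below assumes a hypothetical fetch() task that stands in for real I/O. It shows how to pass keyword arguments, name threads, and collect results through a shared dict. It also checks that the sleeps overlap.

```python
import threading
import time

results: dict[str, int] = {}  # shared result store; each thread writes its own key


def fetch(key: str, delay: float = 0.1) -> None:
    """Hypothetical I/O-bound task: wait, then record a value."""
    time.sleep(delay)  # sleeping releases the GIL, so the threads' waits overlap
    results[key] = len(key)  # Thread ignores return values, so store the result instead


threads = [
    threading.Thread(target=fetch, args=(key,), kwargs={"delay": 0.3}, name=f"worker-{key}")
    for key in ("a", "bb", "ccc")
]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print([t.name for t in threads], sorted(results.items()), f"{elapsed:.2f}s")
```

Three 0.3-second waits run concurrently, so the total should be close to 0.3 seconds rather than 0.9.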
Method 2: subclass threading.Thread and override the __init__() and run() methods

```python
import threading
import time


class MyThread(threading.Thread):
    def __init__(self, counter: int):
        # a subclass that overrides __init__ must call Thread.__init__() first
        super().__init__()
        self.counter = counter

    def run(self):
        # run() is executed in the new thread once start() is called
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
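A subclass is also a convenient place to keep a thread's result, because the object is still accessible after join() returns. The sketch below uses a hypothetical CountdownThread that stores how many ticks it counted in a result attribute. Calling run() directly would execute it in the current thread, which is why the example only calls start().

```python
import threading
import time


class CountdownThread(threading.Thread):
    """Hypothetical subclass that stores its result on the instance."""

    def __init__(self, counter: int, interval: float = 0.05) -> None:
        super().__init__(name=f"countdown-{counter}")  # initialize Thread first
        self.counter = counter
        self.interval = interval
        self.result = None  # filled in by run()

    def run(self) -> None:
        ticks = 0
        num = self.counter
        while num > 0:
            time.sleep(self.interval)
            num -= 1
            ticks += 1
        self.result = ticks


workers = [CountdownThread(n) for n in (3, 2, 1)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # once join() returns, run() has finished and result is set
print([(w.name, w.result) for w in workers])
```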
A threading.Thread subclass can also be written like this, so that it calls an external function:

```python
import threading
import time


def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args

    def run(self):
        # call the external function with the stored arguments
        self.target(*self.args)


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
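Thread.__init__ already accepts target and args, and the default Thread.run() calls target(*args, **kwargs). The wrapper subclass above could therefore just forward these arguments. Overriding run() is then only needed for extra behavior, such as the hypothetical logging in this sketch.

```python
import threading

log: list[str] = []
log_lock = threading.Lock()  # protects the shared log list


def task1(counter: int) -> None:
    with log_lock:
        log.append(f"task1({counter})")


class LoggingThread(threading.Thread):
    """Hypothetical subclass: forwards target/args to Thread and adds logging."""

    def __init__(self, target, args: tuple = ()) -> None:
        super().__init__(target=target, args=args)

    def run(self) -> None:
        with log_lock:
            log.append(f"{self.name} start")
        super().run()  # the default Thread.run() calls target(*args)
        with log_lock:
            log.append(f"{self.name} end")


threads = [LoggingThread(target=task1, args=(n,)) for n in (7, 5, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(log)
```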
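Thread synchronization
If several threads modify the same data, the result can be unpredictable, so some synchronization mechanism is needed. In the code below, each thread adds a value and then subtracts it again, so num should end up at 0. However, num = num + counter is a read-modify-write: a thread can be switched out after reading num and before writing it back, overwriting another thread's update. The final result is therefore not guaranteed.

On my PC, Python 3.13 printed 0 in every run, while the older Python 3.6 really did print random values. This is most likely because newer CPython versions consider switching threads at fewer points in the bytecode. That makes the race much harder to observe, but the code is still not thread-safe.

```python
import threading
import time

num = 0


def task1(counter: int):
    global num
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    # a large loop count makes a thread switch in the middle of an update likely;
    # this takes a while to run
    for _ in range(100000000):
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    print(f"num: {num}")  # expected 0, but not guaranteed without a lock
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```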
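The lost update can be made reproducible by splitting the read-modify-write into explicit steps and sleeping between them. In this sketch the sleep is artificial; it only forces a thread switch at the worst possible moment. All threads then read the same stale value, and most of the increments are lost.

```python
import threading
import time

counter = 0


def unsafe_increment() -> None:
    """Read-modify-write split into steps, with a forced switch in the middle."""
    global counter
    current = counter      # 1. read
    time.sleep(0.05)       # 2. yield: other threads read the same stale value
    counter = current + 1  # 3. write back, overwriting the other threads' updates


threads = [threading.Thread(target=unsafe_increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"expected 10, got {counter}")
```

Wrapping the three steps in a lock would make the result 10 again.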
Lock
A mutex (mutual-exclusion lock) lets one thread access the data while other threads are blocked until the lock is released. The threading module provides Lock() and RLock() for this. RLock is re-entrant: the thread that holds it may acquire it again, and must release it the same number of times. A second acquire() on a plain Lock blocks, even in the thread that already holds it.

In this example the whole loop runs under the lock, so the threads effectively take turns. In real code, keep the locked section as short as possible.

```python
import threading
import time

num = 0
mutex = threading.Lock()


def task1(counter: int):
    global num
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    # `with` acquires the lock and releases it even if the body raises;
    # it is equivalent to mutex.acquire() ... mutex.release() in try/finally
    with mutex:
        for _ in range(100000):
            num = num + counter
            num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print(f"num: {num}")  # always 0 with the lock
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
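The difference between RLock and Lock shows up when the same thread needs the lock twice, for example in a recursive function. In this sketch, walk() is a hypothetical recursive function that re-acquires an RLock on every call. The plain Lock is only probed with a non-blocking acquire, because a blocking second acquire would deadlock.

```python
import threading

rlock = threading.RLock()
visited: list[int] = []


def walk(depth: int) -> None:
    """Recursive function that re-acquires the same lock on every call."""
    with rlock:  # the thread holding an RLock may acquire it again
        visited.append(depth)
        if depth > 0:
            walk(depth - 1)


t = threading.Thread(target=walk, args=(3,))
t.start()
t.join()

# A plain Lock is not re-entrant: a blocking second acquire from the same thread
# would wait forever, so probe it without blocking instead.
lock = threading.Lock()
lock.acquire()
second = lock.acquire(blocking=False)  # False: the lock is already held
lock.release()
print(visited, second)
```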
Semaphore
A mutex lets only one thread access the shared data at a time, whereas a semaphore lets up to a fixed number of threads access it simultaneously. Think of a bank with 5 counters: 5 customers can be served at the same time, and everyone else waits until a counter becomes free. The example uses BoundedSemaphore, which works like Semaphore but raises ValueError if release() would push the counter above its initial value.

```python
import threading
import time
from random import randint

# at most 5 threads may hold the semaphore at once (the 5 bank counters)
semaphore = threading.BoundedSemaphore(5)


def business(name: str):
    # `with` takes a slot (waiting if all 5 are in use) and always gives it back
    with semaphore:
        print(f"{time.strftime('%F %T')} {name} is handling")
        time.sleep(randint(3, 10))
        print(f"{time.strftime('%F %T')} {name} is done")


if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
```
Output:
main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30
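The limit can also be checked in code rather than read off the timestamps. This sketch shrinks the bank to 3 counters and short sleeps. It records the highest number of threads that were inside the semaphore at the same time, using hypothetical counters active and max_active guarded by a separate lock. It then shows BoundedSemaphore rejecting an extra release().

```python
import threading
import time

semaphore = threading.BoundedSemaphore(3)
state_lock = threading.Lock()  # protects active / max_active
active = 0
max_active = 0


def business(name: str) -> None:
    global active, max_active
    with semaphore:  # at most 3 threads get past this point at once
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)  # simulated work
        with state_lock:
            active -= 1


threads = [threading.Thread(target=business, args=(f"thread-{i}",)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# BoundedSemaphore refuses a release() that has no matching acquire()
try:
    semaphore.release()
    over_release = "allowed"
except ValueError:
    over_release = "ValueError"
print(max_active, over_release)
```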
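Condition
A Condition object lets a thread A stop and wait until other threads notify it, after which A continues. A thread must hold the condition's lock, for example inside with cond:, to call wait() or notify_all(). While wait() blocks it releases the lock, and it re-acquires the lock before returning.

A notification is not stored. notify_all() only wakes threads that are already waiting, so a thread that reaches wait() after the notification keeps waiting. The example below relies on the employees being started before the boss.

```python
import threading
import time
import random


class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        super().__init__()
        self.username = username
        self.cond = cond

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} arrived at the office")
            self.cond.wait()  # releases the lock while waiting, re-acquires it when notified
            print(f"{time.strftime('%F %T')} {self.username} started working")
            time.sleep(random.randint(1, 5))  # still holding the lock here
            print(f"{time.strftime('%F %T')} {self.username} finished working")


class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        super().__init__()
        self.username = username
        self.cond = cond

    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} sent the notification")
            self.cond.notify_all()  # wakes every thread currently waiting on cond
        time.sleep(2)


if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("Lao Wang", cond)
    employees = []
    for i in range(5):
        employees.append(Employee(f"employee{i}", cond))
    # the employees are started first so that they are (normally) already waiting
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()
```

Output. Each employee does its work inside the with block, i.e. while still holding the lock, so the employees start working one at a time:

2024-10-26 21:16:20 employee0 arrived at the office
2024-10-26 21:16:20 employee1 arrived at the office
2024-10-26 21:16:20 employee2 arrived at the office
2024-10-26 21:16:20 employee3 arrived at the office
2024-10-26 21:16:20 employee4 arrived at the office
2024-10-26 21:16:20 Lao Wang sent the notification
2024-10-26 21:16:20 employee4 started working
2024-10-26 21:16:23 employee4 finished working
2024-10-26 21:16:23 employee1 started working
2024-10-26 21:16:28 employee1 finished working
2024-10-26 21:16:28 employee2 started working
2024-10-26 21:16:30 employee2 finished working
2024-10-26 21:16:30 employee0 started working
2024-10-26 21:16:31 employee0 finished working
2024-10-26 21:16:31 employee3 started working
2024-10-26 21:16:32 employee3 finished working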
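Both limitations of the example above can be avoided. One is that a notification sent too early is lost; the other is that work happens while the lock is held. This sketch assumes a hypothetical shared flag, notified, guarded by the condition, and waits with Condition.wait_for(), which checks the predicate before and after every wait. The boss is deliberately started first, and the employees do their simulated work after leaving the with block, so they run concurrently.

```python
import threading
import time

cond = threading.Condition()
notified = False  # shared state guarded by cond; wait_for() checks it
finished: list[str] = []


def employee(name: str) -> None:
    with cond:
        # the predicate is checked before waiting, so a notification that
        # happened before this thread arrived is not missed
        cond.wait_for(lambda: notified)
    # the lock is released here, so the simulated work runs concurrently
    time.sleep(0.2)
    with cond:
        finished.append(name)


def boss() -> None:
    global notified
    with cond:
        notified = True  # change the state first, then notify
        cond.notify_all()


start = time.perf_counter()
boss_thread = threading.Thread(target=boss)
boss_thread.start()  # started first on purpose: no employee is waiting yet
workers = [threading.Thread(target=employee, args=(f"employee{i}",)) for i in range(5)]
for w in workers:
    w.start()
boss_thread.join()
for w in workers:
    w.join()
elapsed = time.perf_counter() - start
print(sorted(finished), f"{elapsed:.2f}s")
```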
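Event
In Python's threading module, Event is a thread synchronization primitive for simple communication between threads. An Event object keeps an internal flag, and threads can block in wait() until another thread calls set() to set the flag to True. Once the flag is True, all waiting threads are woken up and continue. Unlike Condition.notify_all(), set() is not lost if no thread is waiting yet: a thread that calls wait() after set() returns immediately.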
Main methods of Event:
- set(): sets the internal flag to True and wakes up all waiting threads.
- clear(): resets the internal flag to False.
- is_set(): returns True if the internal flag is True.
- wait(timeout=None): if the internal flag is False, blocks the current thread until the flag is set to True or until the timeout (in seconds, if given) expires. It returns the flag's value on exit, so False means it timed out.

```python
import threading
import time
import random


class Employee(threading.Thread):
    def __init__(self, username: str, event: threading.Event):
        super().__init__()
        self.username = username
        self.event = event

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} arrived at the office")
        self.event.wait()  # blocks until the flag is set; returns at once if already set
        print(f"{time.strftime('%F %T')} {self.username} started working")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} finished working")


class Boss(threading.Thread):
    def __init__(self, username: str, event: threading.Event):
        super().__init__()
        self.username = username
        self.event = event

    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} sent the notification")
        self.event.set()  # wakes all waiting employees


if __name__ == "__main__":
    event = threading.Event()
    boss = Boss("Lao Wang", event)
    employees = []
    for i in range(5):
        employees.append(Employee(f"employee{i}", event))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()
```

Output. No lock is held while working, so all employees start at the same time:

2024-10-26 21:22:28 employee0 arrived at the office
2024-10-26 21:22:28 employee1 arrived at the office
2024-10-26 21:22:28 employee2 arrived at the office
2024-10-26 21:22:28 employee3 arrived at the office
2024-10-26 21:22:28 employee4 arrived at the office
2024-10-26 21:22:28 Lao Wang sent the notification
2024-10-26 21:22:28 employee0 started working
2024-10-26 21:22:28 employee1 started working
2024-10-26 21:22:28 employee3 started working
2024-10-26 21:22:28 employee4 started working
2024-10-26 21:22:28 employee2 started working
2024-10-26 21:22:30 employee3 finished working
2024-10-26 21:22:31 employee4 finished working
2024-10-26 21:22:31 employee2 finished working
2024-10-26 21:22:32 employee0 finished working
2024-10-26 21:22:32 employee1 finished working
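The return value of wait() and the clear() method from the list above can be seen in a few lines. In this sketch a threading.Timer plays the role of the boss, calling event.set() after 50 ms.

```python
import threading

event = threading.Event()

# the flag is not set, so wait() times out and returns False
timed_out = event.wait(timeout=0.1)

setter = threading.Timer(0.05, event.set)  # calls event.set() after 50 ms in another thread
setter.start()
woke = event.wait(timeout=2)  # returns True as soon as set() is called
setter.join()

event.clear()  # reset the flag so the event can be reused
print(timed_out, woke, event.is_set())
```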
Using queues
Python's queue module provides synchronized, thread-safe queue classes. The following example implements a producer-consumer model with queue.Queue. The producers stop once the queue is full, and the consumers stop once any consumer has found the queue empty three times in a row. Two Event objects carry these stop signals.

full() and empty() only report a snapshot: another thread may change the queue between the check and the next put() or get(). A blocking get() on a queue that no producer will refill waits forever. The code therefore uses put_nowait() and get_nowait() and handles queue.Full and queue.Empty, instead of checking first.

```python
import threading
import time
import random
import queue


class Producer(threading.Thread):
    """Multithreaded producer."""

    def __init__(self, tname: str, channel: queue.Queue, done: threading.Event) -> None:
        super().__init__()
        self.tname = tname
        self.channel = channel
        self.done = done  # stop signal shared by all producers

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(f"{time.strftime('%F %T')} {self.tname} received the stop signal")
                break
            num = random.randint(100, 1000)
            try:
                # raises queue.Full instead of blocking; no check-then-put race
                self.channel.put_nowait(f"{self.tname}-{num}")
            except queue.Full:
                print(f"{time.strftime('%F %T')} {self.tname} report: queue is full, all producers stop")
                self.done.set()
                continue
            print(f"{time.strftime('%F %T')} {self.tname} produced {num}, queue size: {self.channel.qsize()}")
            time.sleep(random.randint(1, 5))


class Consumer(threading.Thread):
    """Multithreaded consumer."""

    def __init__(self, tname: str, channel: queue.Queue, done: threading.Event) -> None:
        super().__init__()
        self.tname = tname
        self.channel = channel
        self.done = done  # stop signal shared by all consumers
        self.counter = 0  # how many times in a row this consumer found the queue empty

    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(f"{time.strftime('%F %T')} {self.tname} received the stop signal")
                break
            if self.counter >= 3:
                print(f"{time.strftime('%F %T')} {self.tname} report: all consumers stop")
                self.done.set()
                continue
            try:
                # raises queue.Empty instead of blocking forever when another
                # consumer has already taken the last item
                data = self.channel.get_nowait()
            except queue.Empty:
                print(f"{time.strftime('%F %T')} {self.tname} report: queue is empty, counter: {self.counter}")
                self.counter += 1
                time.sleep(1)
                continue
            print(f"{time.strftime('%F %T')} {self.tname} consumed {data}, queue size: {self.channel.qsize()}")
            time.sleep(random.randint(1, 5))
            self.counter = 0


if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)  # bounded queue, maxsize=30
    threads_producer = []
    threads_consumer = []
    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))
    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
    for t in threads_producer:
        t.start()
    for t in threads_consumer:
        t.start()
    for t in threads_producer:
        t.join()
    for t in threads_consumer:
        t.join()
```
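A common alternative to stop events is a sentinel value combined with Queue.task_done() and Queue.join(). In this sketch, None is a hypothetical "no more data" marker, and one is enqueued per consumer after all producers have finished. Blocking put() and get() are safe here because every consumer is guaranteed to receive a sentinel eventually.

```python
import queue
import threading

SENTINEL = None  # hypothetical stop marker; one per consumer
channel: queue.Queue = queue.Queue(maxsize=10)
consumed: list[int] = []
consumed_lock = threading.Lock()


def producer(items: range) -> None:
    for item in items:
        channel.put(item)  # blocks while the queue is full (back-pressure)


def consumer() -> None:
    while True:
        item = channel.get()  # blocks while the queue is empty
        try:
            if item is SENTINEL:
                return
            with consumed_lock:
                consumed.append(item * 2)  # stand-in for real processing
        finally:
            channel.task_done()  # pair every get() with one task_done()


consumers = [threading.Thread(target=consumer) for _ in range(3)]
producers = [threading.Thread(target=producer, args=(range(i * 10, i * 10 + 10),)) for i in range(2)]
for t in consumers + producers:
    t.start()
for p in producers:
    p.join()  # every real item is now in the queue
for _ in consumers:
    channel.put(SENTINEL)  # FIFO order: sentinels come after all real items
channel.join()  # returns once every item, sentinels included, is marked done
for c in consumers:
    c.join()
print(len(consumed), sum(consumed))
```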
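Thread pools
In object-oriented programming, creating and destroying objects takes time, because creating an object requires memory and possibly other resources. Threads are no different: each one needs its own stack and operating-system resources. Creating a new thread for every task and destroying it afterwards is therefore inefficient, and thread pools exist to solve this by reusing a fixed set of threads.

Tasks added to the pool are assigned to an idle thread. When all threads are busy, the remaining tasks wait until a thread becomes free. In Python, a thread pool can be created with Pool from multiprocessing.dummy, which returns a multiprocessing.pool.ThreadPool. It has the same API as multiprocessing.Pool but uses threads; multiprocessing.Pool itself creates a pool of processes, not threads.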
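```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def foo(n):
    time.sleep(2)  # simulated I/O-bound work


if __name__ == "__main__":
    # sequential: five 2-second calls one after another, roughly 10 s in total
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)

    # pool of 5 threads: the five calls sleep concurrently, roughly 2 s in total
    start = time.time()
    t_pool = ThreadPool(processes=5)
    rst = t_pool.map(foo, range(5))  # blocks until all tasks finish; returns a list of results
    t_pool.close()  # no more tasks will be submitted
    t_pool.join()   # wait for the worker threads to exit
    print("thread pool time: ", time.time() - start)
```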
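map() on a thread pool returns the tasks' return values in input order, regardless of which task finishes first. This sketch imports the same class directly as multiprocessing.pool.ThreadPool and uses a hypothetical square_after_delay() task whose later inputs finish earlier. It uses the pool as a context manager. Leaving the with block calls terminate(), which is harmless here because map() has already returned.

```python
import time
from multiprocessing.pool import ThreadPool  # the class multiprocessing.dummy.Pool returns


def square_after_delay(n: int) -> int:
    """Hypothetical I/O-bound task: wait, then return a value."""
    time.sleep(0.1 * (5 - n))  # later inputs finish first
    return n * n


start = time.perf_counter()
with ThreadPool(processes=5) as pool:
    # map() blocks until all tasks are done and keeps the input order
    squares = pool.map(square_after_delay, range(5))
elapsed = time.perf_counter() - start
print(squares, f"{elapsed:.2f}s")
```

The longest task sleeps 0.5 seconds, so the whole map should take about that long instead of the 1.5 seconds a sequential loop would need.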
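Thread pool executor
Python's built-in concurrent.futures module provides the ThreadPoolExecutor class. It combines a pool of worker threads with an internal task queue and can run tasks concurrently. This helps with I/O-bound work; because of the GIL, CPU-bound Python code still does not run in parallel.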
```python
import time
from random import randint
from concurrent.futures import ThreadPoolExecutor


def foo() -> int:
    time.sleep(2)  # simulated I/O-bound work
    return randint(1, 100)


if __name__ == "__main__":
    start = time.time()
    futures = []
    # leaving the with block calls executor.shutdown(wait=True)
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # returns a Future immediately
        for future in futures:
            print(future.result())  # blocks until this particular task has finished
    print("thread pool executor time: ", time.time() - start)
```
Output. The numbers are random; 10 two-second tasks on 5 workers take about 4 seconds:
44
19
86
48
35
74
59
99
58
53
thread pool executor time: 4.001955032348633
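The main advantage of ThreadPoolExecutor is that submit() returns a Future object for each task. When the caller calls result() on that Future, ThreadPoolExecutor re-raises in the caller any exception the task raised, so errors in worker threads are not silently lost.

Its drawback is limited scalability for I/O. Every worker is an operating-system thread with its own stack, and the threads still compete for the GIL. Even with max_workers set to 100, it does not handle large numbers of concurrent I/O operations efficiently. For more demanding I/O workloads, consider asynchronous coroutines (asyncio) instead.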
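The exception propagation described above can be seen with a task that fails for some inputs. In this sketch, parse() is a hypothetical task that raises ValueError on non-numeric input. concurrent.futures.as_completed() yields the futures in the order they finish, and result() re-raises each worker's exception in the calling thread.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse(value: str) -> int:
    """Hypothetical task: raises ValueError for non-numeric input."""
    return int(value)


inputs = ["1", "2", "oops", "4"]
ok: dict[str, int] = {}
failed: dict[str, str] = {}

with ThreadPoolExecutor(max_workers=4) as executor:
    future_to_input = {executor.submit(parse, v): v for v in inputs}
    for future in as_completed(future_to_input):  # yields futures as they finish
        value = future_to_input[future]
        try:
            ok[value] = future.result()  # re-raises the worker's exception here
        except ValueError as exc:
            failed[value] = type(exc).__name__

print(ok, failed)
```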
Reposted from https://www.cnblogs.com/XY-Heruo/p/18514316
This article was edited on 2025/1/17 10:06:25.