LOGO OA教程 ERP教程 模切知識交流 PMS教程 CRM教程 開發文檔 其他文檔  
 
網站管理員

Python多線程快速入門

freeflydom
2025年1月17日 9:57 本文熱度 373

前言

線程是操作系統能夠進行運算調度的最小單位,它被包含在進程之中,是進程中的實際運作單位。由于CPython的GIL限制,多線程實際為單線程,大多只用來處理IO密集型任務。

Python一般用標準庫threading來進行多線程編程。

基本使用

  • 方式1,創建threading.Thread類的示例
import threading
import time
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

執行輸出示例

main thread: MainThread, start time: 2024-10-26 12:42:37
thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
main thread: MainThread, end time: 2024-10-26 12:42:58
  • 方式2,繼承threading.Thread類,重寫run()__init__()方法
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, counter: int):
        super().__init__()
        self.counter = counter
    def run(self):
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
        num = self.counter
        while num > 0:
            time.sleep(3)
            num -= 1
        print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創建三個線程
    t1 = MyThread(7)
    t2 = MyThread(5)
    t3 = MyThread(3)
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

繼承threading.Thread類也可以寫成這樣,調用外部函數。

import threading
import time
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    num = counter
    while num > 0:
        time.sleep(3)
        num -= 1
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
class MyThread(threading.Thread):
    def __init__(self, target, args: tuple):
        super().__init__()
        self.target = target
        self.args = args
    
    def run(self):
        self.target(*self.args)
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創建三個線程
    t1 = MyThread(target=task1, args=(7,))
    t2 = MyThread(target=task1, args=(5,))
    t3 = MyThread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執行完畢
    t1.join()
    t2.join()
    t3.join()
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

多線程同步

如果多個線程共同對某個數據修改,則可能出現不可預料的后果,這時候就需要某些同步機制。比如如下代碼,結果是隨機的(個人電腦用python3.13實測結果都是0,而低版本的python3.6運行結果的確是隨機的)

import threading
import time
num = 0
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    for _ in range(100000000):
        num = num + counter
        num = num - counter
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    t4 = threading.Thread(target=task1, args=(6,))
    t5 = threading.Thread(target=task1, args=(8,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()
    # join() 用于阻塞主線程, 等待子線程執行完畢
    t1.join()
    t2.join()
    t3.join()
    t4.join()
    t5.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Lock-鎖

使用互斥鎖可以在一個線程訪問數據時,拒絕其它線程訪問,直到解鎖。threading.Thread中的Lock()Rlock()可以提供鎖功能。

import threading
import time
num = 0
mutex = threading.Lock()
def task1(counter: int):
    print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
    global num
    mutex.acquire()
    for _ in range(100000):
        num = num + counter
        num = num - counter
    mutex.release()
    print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    # 創建三個線程
    t1 = threading.Thread(target=task1, args=(7,))
    t2 = threading.Thread(target=task1, args=(5,))
    t3 = threading.Thread(target=task1, args=(3,))
    # 啟動線程
    t1.start()
    t2.start()
    t3.start()
    # join() 用于阻塞主線程, 等待子線程執行完畢
    t1.join()
    t2.join()
    t3.join()
    
    print(f"num: {num}")
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

Semaphore-信號量

互斥鎖是只允許一個線程訪問共享數據,而信號量是同時允許一定數量的線程訪問共享數據。比如銀行有5個窗口,允許同時有5個人辦理業務,后面的人只能等待,待柜臺有空閑才可以進入。

import threading
import time
from random import randint
semaphore = threading.BoundedSemaphore(5)
def business(name: str):
    semaphore.acquire()
    print(f"{time.strftime('%F %T')} {name} is handling")
    time.sleep(randint(3, 10))
    print(f"{time.strftime('%F %T')} {name} is done")
    semaphore.release()
if __name__ == "__main__":
    print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
    threads = []
    for i in range(10):
        t = threading.Thread(target=business, args=(f"thread-{i}",))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    
    print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")

執行輸出

main thread: MainThread, start time: 2024-10-26 17:40:10
2024-10-26 17:40:10 thread-0 is handling
2024-10-26 17:40:10 thread-1 is handling
2024-10-26 17:40:10 thread-2 is handling
2024-10-26 17:40:10 thread-3 is handling
2024-10-26 17:40:10 thread-4 is handling
2024-10-26 17:40:15 thread-2 is done
2024-10-26 17:40:15 thread-5 is handling
2024-10-26 17:40:16 thread-0 is done
2024-10-26 17:40:16 thread-6 is handling
2024-10-26 17:40:19 thread-3 is done
2024-10-26 17:40:19 thread-4 is done
2024-10-26 17:40:19 thread-7 is handling
2024-10-26 17:40:19 thread-8 is handling
2024-10-26 17:40:20 thread-1 is done
2024-10-26 17:40:20 thread-9 is handling
2024-10-26 17:40:21 thread-6 is done
2024-10-26 17:40:23 thread-7 is done
2024-10-26 17:40:24 thread-5 is done
2024-10-26 17:40:24 thread-8 is done
2024-10-26 17:40:30 thread-9 is done
main thread: MainThread, end time: 2024-10-26 17:40:30

Condition-條件對象

Condition對象能讓一個線程A停下來,等待其他線程,其他線程通知后線程A繼續運行。

import threading
import time
import random
class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 到達公司")
            self.cond.wait()  # 等待通知
            print(f"{time.strftime('%F %T')} {self.username} 開始工作")
            time.sleep(random.randint(1, 5))
            print(f"{time.strftime('%F %T')} {self.username} 工作完成")
class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Condition):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        with self.cond:
            print(f"{time.strftime('%F %T')} {self.username} 發出通知")
            self.cond.notify_all()  # 通知所有線程
        time.sleep(2)
if __name__ == "__main__":
    cond = threading.Condition()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"員工{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()

執行輸出

2024-10-26 21:16:20 員工0 到達公司
2024-10-26 21:16:20 員工1 到達公司
2024-10-26 21:16:20 員工2 到達公司
2024-10-26 21:16:20 員工3 到達公司
2024-10-26 21:16:20 員工4 到達公司
2024-10-26 21:16:20 老王 發出通知
2024-10-26 21:16:20 員工4 開始工作
2024-10-26 21:16:23 員工4 工作完成
2024-10-26 21:16:23 員工1 開始工作
2024-10-26 21:16:28 員工1 工作完成
2024-10-26 21:16:28 員工2 開始工作
2024-10-26 21:16:30 員工2 工作完成
2024-10-26 21:16:30 員工0 開始工作
2024-10-26 21:16:31 員工0 工作完成
2024-10-26 21:16:31 員工3 開始工作
2024-10-26 21:16:32 員工3 工作完成

Event-事件

在 Python 的 threading 模塊中,Event 是一個線程同步原語,用于在多個線程之間進行簡單的通信。Event 對象維護一個內部標志,線程可以使用 wait() 方法阻塞,直到另一個線程調用 set() 方法將標志設置為 True。一旦標志被設置為 True,所有等待的線程將被喚醒并繼續執行。

Event 的主要方法

  1. set():將事件的內部標志設置為 True,并喚醒所有等待的線程。
  2. clear():將事件的內部標志設置為 False
  3. is_set():返回事件的內部標志是否為 True
  4. wait(timeout=None):如果事件的內部標志為 False,則阻塞當前線程,直到標志被設置為 True 或超時(如果指定了 timeout)。
import threading
import time
import random
class Employee(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 到達公司")
        self.cond.wait()  # 等待事件標志為True
        print(f"{time.strftime('%F %T')} {self.username} 開始工作")
        time.sleep(random.randint(1, 5))
        print(f"{time.strftime('%F %T')} {self.username} 工作完成")
class Boss(threading.Thread):
    def __init__(self, username: str, cond: threading.Event):
        self.username = username
        self.cond = cond
        super().__init__()
    def run(self):
        print(f"{time.strftime('%F %T')} {self.username} 發出通知")
        self.cond.set()
if __name__ == "__main__":
    cond = threading.Event()
    boss = Boss("老王", cond)
    
    employees = []
    for i in range(5):
        employees.append(Employee(f"員工{i}", cond))
    for employee in employees:
        employee.start()
    boss.start()
    boss.join()
    for employee in employees:
        employee.join()

執行輸出

2024-10-26 21:22:28 員工0 到達公司
2024-10-26 21:22:28 員工1 到達公司
2024-10-26 21:22:28 員工2 到達公司
2024-10-26 21:22:28 員工3 到達公司
2024-10-26 21:22:28 員工4 到達公司
2024-10-26 21:22:28 老王 發出通知
2024-10-26 21:22:28 員工0 開始工作
2024-10-26 21:22:28 員工1 開始工作
2024-10-26 21:22:28 員工3 開始工作
2024-10-26 21:22:28 員工4 開始工作
2024-10-26 21:22:28 員工2 開始工作
2024-10-26 21:22:30 員工3 工作完成
2024-10-26 21:22:31 員工4 工作完成
2024-10-26 21:22:31 員工2 工作完成
2024-10-26 21:22:32 員工0 工作完成
2024-10-26 21:22:32 員工1 工作完成

使用隊列

Python的queue模塊提供同步、線程安全的隊列類。以下示例為使用queue實現的生產消費者模型

import threading
import time
import random
import queue
class Producer(threading.Thread):
    """多線程生產者類."""
    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        super().__init__()
    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信號事件"
                )
                break
            if self.channel.full():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 隊列已滿, 全部停止生產"
                )
                self.done.set()
            else:
                num = random.randint(100, 1000)
                self.channel.put(f"{self.tname}-{num}")
                print(
                    f"{time.strftime('%F %T')} {self.tname} 生成數據 {num}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
class Consumer(threading.Thread):
    """多線程消費者類."""
    def __init__(
        self, tname: str, channel: queue.Queue, done: threading.Event
    ):
        self.tname = tname
        self.channel = channel
        self.done = done
        self.counter = 0
        super().__init__()
    def run(self) -> None:
        """Method representing the thread's activity."""
        while True:
            if self.done.is_set():
                print(
                    f"{time.strftime('%F %T')} {self.tname} 收到停止信號事件"
                )
                break
            if self.counter >= 3:
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 全部停止消費"
                )
                self.done.set()
                continue
            if self.channel.empty():
                print(
                    f"{time.strftime('%F %T')} {self.tname} report: 隊列為空, counter: {self.counter}"
                )
                self.counter += 1
                time.sleep(1)
                continue
            else:
                data = self.channel.get()
                print(
                    f"{time.strftime('%F %T')} {self.tname} 消費數據 {data}, queue size: {self.channel.qsize()}"
                )
                time.sleep(random.randint(1, 5))
                self.counter = 0
if __name__ == "__main__":
    done_p = threading.Event()
    done_c = threading.Event()
    channel = queue.Queue(30)
    threads_producer = []
    threads_consumer = []
    for i in range(8):
        threads_producer.append(Producer(f"producer-{i}", channel, done_p))
    for i in range(6):
        threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
    for t in threads_producer:
        t.start()
    for t in threads_consumer:
        t.start()
    for t in threads_producer:
        t.join()
    for t in threads_consumer:
        t.join()

線程池

在面向對象編程中,創建和銷毀對象是很費時間的,因為創建一個對象要獲取內存資源或其他更多資源。在多線程程序中,生成一個新線程之后銷毀,然后再創建一個,這種方式就很低效。池化多線程,也就是線程池就為此而生。

將任務添加到線程池中,線程池會自動指定一個空閑的線程去執行任務,當超過最大線程數時,任務需要等待有新的空閑線程才會被執行。Python一般可以使用multiprocessing模塊中的Pool來創建線程池。

import time
from multiprocessing.dummy import Pool as ThreadPool
def foo(n):
    time.sleep(2)
if __name__ == "__main__":
    start = time.time()
    for n in range(5):
        foo(n)
    print("single thread time: ", time.time() - start)
    start = time.time()
    t_pool = ThreadPool(processes=5)  # 創建線程池, 指定池中的線程數為5(默認為CPU數)
    rst = t_pool.map(foo, range(5))  # 使用map為每個元素應用到foo函數
    t_pool.close()  # 阻止任何新的任務提交到線程池
    t_pool.join()  # 等待所有已提交的任務完成
    print("thread pool time: ", time.time() - start)

線程池執行器

python的內置模塊concurrent.futures提供了ThreadPoolExecutor類。這個類結合了線程和隊列的優勢,可以用來平行執行任務。

import time
from random import randint
from concurrent.futures import ThreadPoolExecutor
def foo() -> None:
    time.sleep(2)
    return randint(1,100)
if __name__ == "__main__":
    start = time.time()
    futures = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for n in range(10):
            futures.append(executor.submit(foo))  # Fan out
            
    for future in futures:  # Fan in
        print(future.result())
    print("thread pool executor time: ", time.time() - start)

執行輸出

44
19
86
48
35
74
59
99
58
53
thread pool executor time:  4.001955032348633

ThreadPoolExecutor類的最大優點在于:如果調用者通過submit方法把某項任務交給它執行,那么會獲得一個與該任務相對應的Future實例,當調用者在這個實例上通過result方法獲取執行結果時,ThreadPoolExecutor會把它在執行任務的過程中所遇到的異常自動拋給調用者。而ThreadPoolExecutor類的缺點是IO并行能力不高,即便把max_worker設為100,也無法高效處理任務。更高需求的IO任務可以考慮換異步協程方案。

轉自https://www.cnblogs.com/XY-Heruo/p/18514316


該文章在 2025/1/17 10:06:25 編輯過
關鍵字查詢
相關文章
正在查詢...
點晴ERP是一款針對中小制造業的專業生產管理軟件系統,系統成熟度和易用性得到了國內大量中小企業的青睞。
點晴PMS碼頭管理系統主要針對港口碼頭集裝箱與散貨日常運作、調度、堆場、車隊、財務費用、相關報表等業務管理,結合碼頭的業務特點,圍繞調度、堆場作業而開發的。集技術的先進性、管理的有效性于一體,是物流碼頭及其他港口類企業的高效ERP管理信息系統。
點晴WMS倉儲管理系統提供了貨物產品管理,銷售管理,采購管理,倉儲管理,倉庫管理,保質期管理,貨位管理,庫位管理,生產管理,WMS管理系統,標簽打印,條形碼,二維碼管理,批號管理軟件。
點晴免費OA是一款軟件和通用服務都免費,不限功能、不限時間、不限用戶的免費OA協同辦公管理系統。
Copyright 2010-2025 ClickSun All Rights Reserved

黄频国产免费高清视频,久久不卡精品中文字幕一区,激情五月天AV电影在线观看,欧美国产韩国日本一区二区
中出一区二区免费视频 | 日本最新免费的一区二区 | 中出仑乱中文字幕在线 | 日本免费午夜啪视频 | 欧美成αⅴ人在线观看 | 午夜福利不卡片在线播放免费 |