引言
在 C# 開(kāi)發(fā)中,合理控制并發(fā)任務(wù)數(shù)量是確保應(yīng)用程序高效、穩(wěn)定運(yùn)行的關(guān)鍵。過(guò)多的并發(fā)任務(wù)可能會(huì)耗盡系統(tǒng)資源,導(dǎo)致性能下降和不穩(wěn)定。本文將深入探討幾種有效的方法來(lái)限制 C# 中的并發(fā)任務(wù)數(shù)量,并通過(guò)具體的應(yīng)用場(chǎng)景和示例代碼展示如何實(shí)現(xiàn)這些方法。
使用 SemaphoreSlim
SemaphoreSlim
是一個(gè)輕量級(jí)的同步原語(yǔ),用于控制訪問(wèn)某一資源或資源池的線程數(shù)。通過(guò)它,我們可以很容易地限制并發(fā)任務(wù)的數(shù)量。當(dāng)你有一個(gè)需要訪問(wèn)共享資源(如數(shù)據(jù)庫(kù)連接池)的任務(wù)列表,但希望同時(shí)執(zhí)行的任務(wù)數(shù)量不超過(guò)某個(gè)特定值時(shí),可以使用SemaphoreSlim
。
示例
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
int maxConcurrentTasks = 3;
SemaphoreSlim semaphore = new SemaphoreSlim(maxConcurrentTasks);
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
await semaphore.WaitAsync();
var task = Task.Run(async () =>
{
try
{
// 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù)
Console.WriteLine($"Task {Task.CurrentId} started.");
await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine($"Task {Task.CurrentId} completed.");
}
finally
{
semaphore.Release();
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
Console.WriteLine("All tasks completed.");
}
}
在這個(gè)示例中,我們限制了最多只有 3 個(gè)任務(wù)可以同時(shí)運(yùn)行。通過(guò)對(duì)SemaphoreSlim
的調(diào)用,我們確保了當(dāng)達(dá)到最大并發(fā)任務(wù)數(shù)量時(shí),其他任務(wù)將會(huì)等待直到某個(gè)任務(wù)完成并釋放信號(hào)量。
使用 TPL Dataflow
TPL (Task Parallel Library) Dataflow 提供了一個(gè)更高級(jí)的方式來(lái)處理數(shù)據(jù)流和并發(fā)任務(wù),通過(guò)它可以很容易地限制并發(fā)任務(wù)的數(shù)量。當(dāng)你需要處理一系列的數(shù)據(jù)或任務(wù),并且每個(gè)任務(wù)都可能需要一些時(shí)間來(lái)完成,同時(shí)你想要限制同時(shí)處理這些任務(wù)的數(shù)量時(shí),可以使用 TPL Dataflow。
示例
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3 // 最大并發(fā)任務(wù)數(shù)量
};
var block = new ActionBlock<int>(async n =>
{
Console.WriteLine($"Processing {n}...");
await Task.Delay(TimeSpan.FromSeconds(1)); // 模擬異步操作
Console.WriteLine($"Processed {n}.");
}, options);
for (int i = 0; i < 100; i++)
{
block.Post(i);
}
block.Complete();
await block.Completion;
Console.WriteLine("All tasks completed.");
}
}
在這個(gè)示例中,ActionBlock
被用來(lái)處理一系列的任務(wù),通過(guò)設(shè)置ExecutionDataflowBlockOptions
中的MaxDegreeOfParallelism
屬性,我們限制了最大的并發(fā)任務(wù)數(shù)量。
使用 Parallel.ForEach
Parallel.ForEach
是 .NET 中用于并行處理集合元素的方法,它可以指定MaxDegreeOfParallelism
參數(shù)來(lái)限制并發(fā)任務(wù)的數(shù)量。
示例
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string> { /* 一系列 URL */ };
var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
await Parallel.ForEachAsync(urls, options, async (url, _) =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
});
}
}
在這個(gè)示例中,我們使用Parallel.ForEachAsync
方法來(lái)并行下載多個(gè)網(wǎng)頁(yè)內(nèi)容,并通過(guò)設(shè)置MaxDegreeOfParallelism
為 3 來(lái)限制同時(shí)進(jìn)行的下載任務(wù)數(shù)量。
使用 Polly Bulkhead
Polly 是一個(gè)強(qiáng)大的 .NET 錯(cuò)誤處理和彈性庫(kù),它的 Bulkhead 隔板策略可以限制并發(fā)任務(wù)的數(shù)量,并可以選擇將超過(guò)該數(shù)量的任務(wù)排隊(duì)。
示例
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Polly;
class Program
{
static async Task Main(string[] args)
{
var bulkhead = Policy.BulkheadAsync(3, Int32.MaxValue); // 最大并發(fā)任務(wù)數(shù)量為 3
var urls = new List<string> { /* 一系列 URL */ };
var tasks = new List<Task>();
foreach (var url in urls)
{
var t = bulkhead.ExecuteAsync(async () =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
});
tasks.Add(t);
}
await Task.WhenAll(tasks);
}
}
在這個(gè)示例中,我們使用 Polly 的 Bulkhead 隔板策略來(lái)限制并發(fā)下載任務(wù)的數(shù)量,并將超過(guò)最大并發(fā)數(shù)量的任務(wù)自動(dòng)排隊(duì)。
使用 Task.WhenAny
Task.WhenAny
方法可以用于限制并發(fā)任務(wù)的數(shù)量,其基本思路是維護(hù)一個(gè)任務(wù)列表,當(dāng)任務(wù)數(shù)量達(dá)到閾值時(shí),等待其中一個(gè)任務(wù)完成,然后繼續(xù)添加新任務(wù)。
示例
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string> { /* 一系列 URL */ };
var maxConcurrentTasks = 3;
var tasks = new List<Task>();
foreach (var url in urls)
{
tasks.Add(Task.Run(async () =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
}));
if (tasks.Count >= maxConcurrentTasks)
{
await Task.WhenAny(tasks);
tasks = tasks.Where(t => t.Status == TaskStatus.Running).ToList();
}
}
await Task.WhenAll(tasks);
}
}
在這個(gè)示例中,我們通過(guò)Task.WhenAny
方法來(lái)等待任務(wù)列表中的任意一個(gè)任務(wù)完成,然后移除已完成的任務(wù),從而保持并發(fā)任務(wù)的數(shù)量不超過(guò)閾值。
總結(jié)
限制并發(fā)任務(wù)的數(shù)量是確保應(yīng)用程序穩(wěn)定和高效運(yùn)行的關(guān)鍵。在 C# 中,我們可以使用SemaphoreSlim
、TPL Dataflow、Parallel.ForEach
、Polly Bulkhead 和Task.WhenAny
等多種方法來(lái)輕松實(shí)現(xiàn)這一目標(biāo)。根據(jù)具體的業(yè)務(wù)需求和應(yīng)用場(chǎng)景,選擇最合適的方法來(lái)控制并發(fā)任務(wù)的數(shù)量,可以有效避免資源過(guò)度消耗,提高系統(tǒng)的響應(yīng)速度和可靠性。
該文章在 2024/12/24 11:48:49 編輯過(guò)