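1. Overview
Concurrency manages the execution order of multiple tasks so that a system stays responsive under heavy load; parallelism uses multiple processors or cores to execute tasks at the same instant and thereby speed up computation. Both play an essential role in high-performance computing, real-time systems, and interactive applications.
In the multi-core era, traditional serial programming can no longer exploit the hardware fully. Parallel computing shortens computation time by splitting work across cores. Neither concurrency nor parallelism comes for free, however: they introduce problems such as race conditions, deadlocks, and load imbalance, which demand solid theoretical grounding and practical experience from developers.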
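2. Concurrency and Parallelism
2.1 Definitions
Concurrency:
- The ability of a system to manage multiple tasks over a period of time. Concurrency is about coordinating and interleaving tasks, using techniques such as time slicing on one or more processors, so concurrent tasks appear to run simultaneously but need not execute at the same instant.
- Concurrency emphasizes the logical organization and coordination of tasks.
- Example: a web server handles many client requests concurrently, switching between them quickly so that each request gets a timely response.
Parallelism:
- Multiple tasks truly executing at the same instant, usually on a multi-core processor or a distributed system. Its main goal is faster computation, achieved by decomposing a problem into independent subtasks that are processed simultaneously. Parallelism suits compute-intensive work.
- Parallelism is about simultaneity of physical execution.
- Example: in parallel matrix multiplication, different cores compute different parts of the matrix at the same time, which shortens the total computation time considerably. Scientific simulation and image processing are similar cases; the benefit depends on hardware support from multi-core processors, GPUs, or distributed systems.
2.2 Differences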

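The fundamental difference between concurrency and parallelism lies in when tasks execute and what resources they need:
- Execution model: parallelism means truly simultaneous execution, whereas concurrency creates the impression of simultaneity by switching between tasks.
- Hardware dependence: parallelism requires multiple processors or cores, whereas concurrency can be achieved on a single core.
- Goal: parallelism aims to speed up computation, whereas concurrency focuses on responsiveness and the ability to handle many tasks.
For example, on a single-core system the operating system schedules several threads with round-robin time slices; on a multi-core system the threads can be assigned to different cores and run in parallel.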
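3. Ways to Implement Concurrency
3.1 Concurrency through parallel execution
On a multi-core processor, tasks can be assigned to different cores and run in parallel, which gives efficient concurrency. For example, a web server handles client requests in parallel on multiple threads.
Code example: parallel processing with multiple threads. It uses System.Threading.ThreadPool to queue the work items and ManualResetEventSlim to wait for all of them to finish.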
using System;
using System.Collections.Generic;
using System.Threading;
class Program
{
static void Main(string[] args)
{
List<Request> requests = new List<Request>
{
new Request { Data = "Request1" },
new Request { Data = "Request2" },
new Request { Data = "Request3" }
};
process_requests(requests);
Console.WriteLine("All requests processed.");
}
static void process_requests(List<Request> requests)
{
int num_cores = Environment.ProcessorCount;
ManualResetEventSlim[] mres = new ManualResetEventSlim[requests.Count];
for (int i = 0; i < requests.Count; i++)
{
int index = i;
mres[index] = new ManualResetEventSlim(false);
ThreadPool.QueueUserWorkItem((state) =>
{
handle_request(requests[index]);
mres[index].Set();
});
}
// ManualResetEventSlim has no static WaitAll, so wait on each event in turn
foreach (ManualResetEventSlim m in mres)
{
m.Wait();
}
}
static void handle_request(Request request)
{
Response response = process(request);
send_response(response);
}
static Response process(Request request)
{
Console.WriteLine($"Processing request: {request.Data}");
Thread.Sleep(1000);
return new Response { Data = $"Response for {request.Data}" };
}
static void send_response(Response response)
{
Console.WriteLine($"Sending response: {response.Data}");
}
}
class Request
{
public string Data { get; set; }
}
class Response
{
public string Data { get; set; }
}
======================================================================================================================
The same example implemented with Task.Run and Task.WhenAll:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
List<Request> requests = new List<Request>
{
new Request { Data = "Request1" },
new Request { Data = "Request2" },
new Request { Data = "Request3" }
};
await process_requests(requests);
Console.WriteLine("All requests processed.");
}
static async Task process_requests(List<Request> requests)
{
List<Task> tasks = new List<Task>();
foreach (Request request in requests)
{
Task task = Task.Run(() => handle_request(request));
tasks.Add(task);
}
await Task.WhenAll(tasks);
}
static async Task handle_request(Request request)
{
Response response = await process(request);
send_response(response);
}
static async Task<Response> process(Request request)
{
Console.WriteLine($"Processing request: {request.Data}");
await Task.Delay(1000);
return new Response { Data = $"Response for {request.Data}" };
}
static void send_response(Response response)
{
Console.WriteLine($"Sending response: {response.Data}");
}
}
class Request
{
public string Data { get; set; }
}
class Response
{
public string Data { get; set; }
}
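3.2 Task scheduling
On a single-core processor, concurrency is achieved with scheduling algorithms such as round-robin time slicing. The operating system switches between tasks quickly, creating the illusion of simultaneous execution.
Code example: round-robin time slicing. The example uses Task and CancellationToken to simulate round-robin scheduling. Note that user code cannot actually preempt a .NET task: all three tasks start running as soon as they are created, and the scheduler only takes turns waiting on each of them for one time slice.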
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
List<Task> tasks = new List<Task>
{
run_task("Task1", 5000),
run_task("Task2", 3000),
run_task("Task3", 7000)
};
int time_slice = 1000;
await scheduler(tasks, time_slice);
Console.WriteLine("All tasks processed.");
}
static async Task scheduler(List<Task> tasks, int time_slice)
{
List<Task> runningTasks = new List<Task>();
List<Task> remainingTasks = new List<Task>(tasks);
while (remainingTasks.Count > 0 || runningTasks.Count > 0)
{
if (remainingTasks.Count > 0)
{
runningTasks.Add(remainingTasks[0]);
remainingTasks.RemoveAt(0);
}
List<Task> currentRunningTasks = new List<Task>(runningTasks);
foreach (Task task in currentRunningTasks)
{
if (!task.IsCompleted)
{
await run_task_for_time_slice(task, time_slice);
if (task.IsCompleted)
{
runningTasks.Remove(task);
}
else
{
remainingTasks.Add(task);
runningTasks.Remove(task);
}
}
}
}
}
static async Task run_task_for_time_slice(Task task, int time_slice)
{
// The token only cancels the wait; the underlying task keeps running in the background
using (CancellationTokenSource cts = new CancellationTokenSource(time_slice))
{
try
{
await task.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine($"Task {task.Id} preempted after {time_slice} ms");
}
}
}
static Task run_task(string taskName, int total_time)
{
return Task.Run(async () =>
{
int elapsedTime = 0;
int time_slice = 1000;
while (elapsedTime < total_time)
{
Console.WriteLine($"{taskName} is running. Elapsed time: {elapsedTime} ms");
await Task.Delay(time_slice);
elapsedTime += time_slice;
}
Console.WriteLine($"{taskName} is completed.");
});
}
}
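3.3 Multithreading
Multithreading achieves concurrency by creating multiple units of execution. Threads share the resources of their process and coordinate access through synchronization mechanisms such as mutexes.
Code example: thread synchronization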
using System;
using System.Collections.Generic;
using System.Text; // StringBuilder
using System.Threading;
using System.Threading.Tasks;
namespace Test.EventBus
{
public class DemoB
{
private static Mutex mutex = new Mutex();
private static StringBuilder sharedData = new StringBuilder();
public void ShowMsg(string name, string msg)
{
Console.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
var eventMsg = new EventMessage
{
Name = name,
Msg = msg,
CreatedDate = DateTime.Now
};
EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
}
public static void RunDemo()
{
List<Task> tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
int taskId = i;
tasks.Add(Task.Run(() => thread_function($"Task{taskId}")));
}
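// Task.WaitAll takes an array (the IEnumerable overload only exists in newer .NET versions)
Task.WaitAll(tasks.ToArray());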
}
static void thread_function(string name)
{
for (int i = 0; i < 5; i++)
{
modify_shared_data(name, i);
}
}
static void modify_shared_data(string name, int iteration)
{
mutex.WaitOne();
try
{
sharedData.AppendLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
Console.WriteLine($"{name} is modifying shared data at iteration {iteration} on {DateTime.Now}");
}
finally
{
mutex.ReleaseMutex();
}
}
}
public class EventMessage
{
public string Name { get; set; }
public string Msg { get; set; }
public DateTime CreatedDate { get; set; }
}
public static class EventPublishSubscribeUtils
{
public static void PublishEvent(EventMessage eventMsg, string eventName)
{
Console.WriteLine($"Published event: {eventName} => Name: {eventMsg.Name}, Msg: {eventMsg.Msg}, CreatedDate: {eventMsg.CreatedDate}");
}
}
class Program
{
static void Main(string[] args)
{
DemoB.RunDemo();
Console.WriteLine("All threads completed.");
}
}
}
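3.4 Asynchronous programming
Asynchronous programming handles I/O-bound work through an event loop and callbacks, so the main thread is never blocked while waiting.
Code example: asynchronous I/O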
using System;
using System.IO;
using System.Threading.Tasks;
namespace AsyncIOExample
{
class Program
{
static async Task Main(string[] args)
{
string filePath1 = "example1.txt";
string filePath2 = "example2.txt";
File.WriteAllText(filePath1, "Data from example1.txt");
File.WriteAllText(filePath2, "Data from example2.txt");
await async_read(filePath1, data => callback(data, filePath1));
await async_read(filePath2, data => callback(data, filePath2));
Console.WriteLine("All asynchronous read operations completed.");
}
static async Task async_read(string file, Action<string> callback)
{
Console.WriteLine($"Starting asynchronous read for file: {file}");
string data = await read_file(file);
callback(data);
}
static async Task<string> read_file(string file)
{
using (StreamReader reader = new StreamReader(file))
{
string data = await reader.ReadToEndAsync();
return data;
}
}
static void callback(string data, string file)
{
Console.WriteLine($"Data read from {file}: {data}");
}
}
}
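3.5 Coroutines
Coroutines achieve concurrency within a single thread through a yield/resume mechanism. They suit I/O-bound tasks and have low overhead.
Code example: coroutine (C# has no first-class coroutines; async iterators are the closest built-in equivalent)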
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CoroutineExample
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // A C# iterator cannot receive values through yield, so the inputs are passed in up front
            string[] inputs = { "Data1", "Data2", "Data3" };
            Console.WriteLine("Coroutine started.");
            await foreach (string result in coroutine_example(inputs))
            {
                // Control returns here at every yield; the iterator resumes on the next iteration
                Console.WriteLine($"Caller resumed with: {result}");
            }
            Console.WriteLine("Coroutine finished.");
        }
        static async IAsyncEnumerable<string> coroutine_example(IEnumerable<string> inputs)
        {
            foreach (string data in inputs)
            {
                await Task.Delay(100); // simulated asynchronous wait
                yield return process(data); // suspend and hand the result to the caller
            }
        }
        static string process(string data)
        {
            if (data != null)
            {
                Console.WriteLine($"Processed data: {data}");
                return $"Result for {data}";
            }
            Console.WriteLine("No data to process.");
            return "No result";
        }
    }
}
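3.6 Event-driven programming
Event-driven programming uses an event loop to listen for events and dispatch them to handlers. It suits GUI and network applications.
Code example: event-driven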
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace EventDrivenExample
{
class Program
{
static async Task Main(string[] args)
{
EventLoop eventLoop = new EventLoop();
eventLoop.RegisterHandler("Event1", Event1Handler);
eventLoop.RegisterHandler("Event2", Event2Handler);
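eventLoop.TriggerEvent(new Event { Type = "Event1", Data = "Data for Event1" });
eventLoop.TriggerEvent(new Event { Type = "Event2", Data = "Data for Event2" });
// Start the loop, give it time to drain the queue, then stop it; without Stop() the loop never returns
Task loopTask = eventLoop.Start();
await Task.Delay(500);
eventLoop.Stop();
await loopTask;
Console.WriteLine("Event loop completed.");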
}
static void Event1Handler(Event e)
{
Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
}
static void Event2Handler(Event e)
{
Console.WriteLine($"Handling {e.Type} with data: {e.Data}");
}
}
public class Event
{
public string Type { get; set; }
public string Data { get; set; }
}
public class EventLoop
{
private Queue<Event> _eventQueue = new Queue<Event>();
private Dictionary<string, Action<Event>> _handlers = new Dictionary<string, Action<Event>>();
// volatile: Stop() may be called from a different thread than the one running the loop
private volatile bool _running = false;
public void RegisterHandler(string eventType, Action<Event> handler)
{
if (_handlers.ContainsKey(eventType))
{
_handlers[eventType] += handler;
}
else
{
_handlers[eventType] = handler;
}
}
public void TriggerEvent(Event e)
{
lock (_eventQueue)
{
_eventQueue.Enqueue(e);
}
}
public async Task Start()
{
_running = true;
while (_running)
{
Event e = null;
lock (_eventQueue)
{
if (_eventQueue.Count > 0)
{
e = _eventQueue.Dequeue();
}
}
if (e != null)
{
if (_handlers.TryGetValue(e.Type, out Action<Event> handler))
{
handler(e);
}
else
{
Console.WriteLine($"No handler registered for event type: {e.Type}");
}
}
else
{
await Task.Delay(100);
}
}
}
public void Stop()
{
_running = false;
}
}
}
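3.7 Multiple processes
Multiprocessing achieves concurrency by creating independent processes that communicate through IPC (for example pipes or message queues). It suits CPU-bound tasks.
In C#, independent processes are created and managed with the System.Diagnostics.Process class. Inter-process communication can be implemented in several ways, such as named pipes (System.IO.Pipes) or memory-mapped files (System.IO.MemoryMappedFiles). This example uses named pipes.
Code example: multiple processes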
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;
namespace MultiProcessExample
{
class Program
{
static async Task Main(string[] args)
{
// Note: the Worker class below has its own Main; it is assumed to be built as the
// entry point of the assembly launched here
int num_processes = 3;
List<Process> processes = new List<Process>();
List<string> results = new List<string>();
using (var server = new NamedPipeServerStream("testpipe", PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous))
{
for (int i = 0; i < num_processes; i++)
{
Process p = new Process();
p.StartInfo.FileName = "dotnet";
p.StartInfo.Arguments = $"MultiProcessExample.dll worker {i}";
p.StartInfo.UseShellExecute = false;
p.StartInfo.CreateNoWindow = true;
p.Start();
processes.Add(p);
}
// One server instance serves one client at a time, so read the results one after another
for (int i = 0; i < num_processes; i++)
{
results.Add(await read_from_pipe(server));
}
}
foreach (var process in processes)
{
process.WaitForExit();
}
foreach (var result in results)
{
Console.WriteLine($"Received result: {result}");
}
}
static async Task<string> read_from_pipe(NamedPipeServerStream server)
{
// Accept the next worker, read its message, then free the instance for the next one
await server.WaitForConnectionAsync();
byte[] buffer = new byte[1024];
int bytesRead = await server.ReadAsync(buffer, 0, buffer.Length);
server.Disconnect();
return Encoding.UTF8.GetString(buffer, 0, bytesRead);
}
}
class Worker
{
static async Task Main(string[] args)
{
if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
{
Console.WriteLine("Invalid arguments.");
return;
}
using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
{
try
{
await pipeClient.ConnectAsync();
string result = compute(id);
send_result(pipeClient, result);
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
static string compute(int id)
{
Console.WriteLine($"Worker {id} is computing...");
Task.Delay(1000).Wait();
return $"Result from Worker {id}";
}
static void send_result(NamedPipeClientStream pipeClient, string result)
{
try
{
byte[] resultBytes = Encoding.UTF8.GetBytes(result);
pipeClient.Write(resultBytes, 0, resultBytes.Length);
pipeClient.Flush();
}
catch (Exception ex)
{
Console.WriteLine($"Error sending result: {ex.Message}");
}
}
}
}
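4. Techniques for Implementing Parallelism
4.1 Multithreading
Multithreading achieves parallelism by running several threads on one or more processor cores. On a multi-core processor the threads can truly run in parallel; on a single core, time slicing gives only pseudo-parallelism. Multithreading suits both I/O-bound and compute-bound tasks and improves resource utilization and responsiveness.
Code example: it uses System.Threading.Thread to create and manage several threads, and a lock to protect the shared result list.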
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThreadExample
{
class Program
{
static void Main(string[] args)
{
int N = 3;
List<Thread> threads = new List<Thread>();
List<string> results = new List<string>();
object lockObject = new object();
for (int i = 0; i < N; i++)
{
int id = i;
Thread thread = new Thread(() => task_function(id, results, lockObject));
threads.Add(thread);
thread.Start();
}
foreach (Thread thread in threads)
{
thread.Join();
}
foreach (string result in results)
{
Console.WriteLine($"Result from thread: {result}");
}
Console.WriteLine("All threads completed.");
}
static void task_function(int id, List<string> results, object lockObject)
{
string result = perform_task(id);
lock (lockObject)
{
results.Add(result);
}
}
static string perform_task(int id)
{
Console.WriteLine($"Thread {id} is processing.");
Thread.Sleep(1000);
return $"Result from Thread {id}";
}
}
}
The same example implemented with Task and async/await:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace MultiThreadExample
{
class Program
{
static async Task Main(string[] args)
{
int N = 3;
List<Task<string>> tasks = new List<Task<string>>();
for (int i = 0; i < N; i++)
{
int id = i;
Task<string> task = Task.Run(() => task_function(id));
tasks.Add(task);
}
string[] results = await Task.WhenAll(tasks);
foreach (string result in results)
{
Console.WriteLine($"Result from task: {result}");
}
Console.WriteLine("All tasks completed.");
}
static string task_function(int id)
{
string result = perform_task(id);
return result;
}
static string perform_task(int id)
{
Console.WriteLine($"Task {id} is processing.");
Task.Delay(1000).Wait();
return $"Result from Task {id}";
}
}
}
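4.2 Multiprocessing
Multiprocessing achieves parallelism by creating several independent processes, each of which can be scheduled on a different processor core. The processes coordinate through communication mechanisms such as pipes or message queues. Multiprocessing suits tasks that need strong isolation and safety, such as scientific computing and server applications.
Code example: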
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;
namespace MultiProcessExample
{
class Program
{
static async Task Main(string[] args)
{
int N = 3;
List<Process> processes = new List<Process>();
List<Task<string>> readTasks = new List<Task<string>>();
using (NamedPipeServerStream pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.In, N, PipeTransmissionMode.Message, PipeOptions.Asynchronous))
{
for (int i = 0; i < N; i++)
{
Process process = create_process(i);
processes.Add(process);
process.Start();
}
// A single NamedPipeServerStream instance serves one client at a time,
// so the results are read one after another instead of from concurrent tasks
List<string> results = new List<string>();
for (int i = 0; i < N; i++)
{
results.Add(await read_from_pipe(pipeServer));
}
foreach (var process in processes)
{
process.WaitForExit();
}
foreach (var result in results)
{
Console.WriteLine($"Received result: {result}");
}
pipeServer.Close();
}
Console.WriteLine("All processes completed.");
}
static Process create_process(int id)
{
Process process = new Process();
process.StartInfo.FileName = "dotnet";
process.StartInfo.Arguments = $"MultiProcessExample.dll worker {id}";
process.StartInfo.UseShellExecute = false;
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.CreateNoWindow = true;
return process;
}
static async Task<string> read_from_pipe(NamedPipeServerStream pipeServer)
{
await pipeServer.WaitForConnectionAsync();
byte[] buffer = new byte[1024];
int bytesRead = await pipeServer.ReadAsync(buffer, 0, buffer.Length);
string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
pipeServer.Disconnect();
return result;
}
}
}
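The Worker class (it has its own Main, so it must be built as the entry point of the assembly that the parent process launches):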
using System;
using System.IO.Pipes;
using System.Text; // Encoding
using System.Threading.Tasks;
namespace MultiProcessExample
{
class Worker
{
static async Task Main(string[] args)
{
if (args.Length != 2 || args[0] != "worker" || !int.TryParse(args[1], out int id))
{
Console.WriteLine("Invalid arguments.");
return;
}
using (NamedPipeClientStream pipeClient = new NamedPipeClientStream(".", "testpipe", PipeDirection.Out, PipeOptions.Asynchronous))
{
try
{
await pipeClient.ConnectAsync();
string result = compute(id);
send_result(pipeClient, result);
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
}
static string compute(int id)
{
Console.WriteLine($"Worker {id} is computing...");
Task.Delay(1000).Wait();
return $"Result from Worker {id}";
}
static void send_result(NamedPipeClientStream pipeClient, string result)
{
try
{
byte[] resultBytes = Encoding.UTF8.GetBytes(result);
pipeClient.Write(resultBytes, 0, resultBytes.Length);
pipeClient.Flush();
}
catch (Exception ex)
{
Console.WriteLine($"Error sending result: {ex.Message}");
}
}
}
}
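4.3 Distributed Computing
Distributed computing assigns tasks to several computers on a network and runs them in parallel, usually communicating through the Message Passing Interface (MPI). It suits large-scale data processing and complex computations such as weather forecasting and distributed databases.
To keep the implementation simple, the behavior of MPI can be imitated with named pipes and Task. The example below uses named pipes for inter-process communication and simulates the data exchange between a master node and worker nodes on a single machine.
Code example: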
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;
namespace DistributedComputingExample
{
class Program
{
static async Task Main(string[] args)
{
if (args.Length == 0 || args[0] != "worker")
{
// Master: only this branch launches workers, otherwise every worker would spawn more workers
int num_workers = 3;
string[] chunks = load_data(num_workers).Split('|');
Console.WriteLine("Data loaded.");
List<Process> workers = new List<Process>();
List<NamedPipeServerStream> sendPipes = new List<NamedPipeServerStream>();
List<NamedPipeServerStream> receivePipes = new List<NamedPipeServerStream>();
for (int i = 1; i <= num_workers; i++)
{
// Create both pipes before the worker starts so that its ConnectAsync calls can succeed
sendPipes.Add(new NamedPipeServerStream($"sendpipe_{i}", PipeDirection.Out, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous));
receivePipes.Add(new NamedPipeServerStream($"receivepipe_{i}", PipeDirection.In, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous));
Process worker = create_worker_process(i);
workers.Add(worker);
worker.Start();
}
List<Task<string>> readTasks = new List<Task<string>>();
for (int i = 0; i < num_workers; i++)
{
int index = i; // copy the loop variable so each lambda captures its own value
await sendPipes[index].WaitForConnectionAsync(); // a server pipe is writable only once a client has connected
send_data(sendPipes[index], chunks[index]);
readTasks.Add(Task.Run(() => read_from_pipe(receivePipes[index])));
}
string[] results = await Task.WhenAll(readTasks);
foreach (var worker in workers)
{
worker.WaitForExit();
}
string final_result = aggregate(results);
Console.WriteLine($"Final result: {final_result}");
}
else
{
int id = int.Parse(args[1]);
Console.WriteLine($"Worker {id} started.");
using (NamedPipeClientStream receivePipe = new NamedPipeClientStream(".", $"sendpipe_{id}", PipeDirection.In, PipeOptions.Asynchronous))
{
await receivePipe.ConnectAsync();
string data_chunk = receive_data(receivePipe);
Console.WriteLine($"Worker {id} received data: {data_chunk}");
string result = process(data_chunk);
Console.WriteLine($"Worker {id} processed data: {result}");
using (NamedPipeClientStream sendPipe = new NamedPipeClientStream(".", $"receivepipe_{id}", PipeDirection.Out, PipeOptions.Asynchronous))
{
await sendPipe.ConnectAsync();
send_result(sendPipe, result);
}
}
}
}
static Process create_worker_process(int id)
{
Process process = new Process();
process.StartInfo.FileName = "dotnet";
process.StartInfo.Arguments = $"DistributedComputingExample.dll worker {id}";
process.StartInfo.UseShellExecute = false;
process.StartInfo.RedirectStandardOutput = true;
process.StartInfo.CreateNoWindow = true;
return process;
}
static string load_data(int num_chunks)
{
string data = "DataChunk1|DataChunk2|DataChunk3";
return data;
}
static void send_data(NamedPipeServerStream pipe, string data)
{
try
{
byte[] dataBytes = Encoding.UTF8.GetBytes(data);
pipe.Write(dataBytes, 0, dataBytes.Length);
pipe.Flush();
// Leave the pipe connected: disconnecting right after the write can discard data the worker has not read yet
}
catch (Exception ex)
{
Console.WriteLine($"Error sending data: {ex.Message}");
}
}
static string receive_data(NamedPipeClientStream pipe)
{
try
{
byte[] buffer = new byte[1024];
int bytesRead = pipe.Read(buffer, 0, buffer.Length);
return Encoding.UTF8.GetString(buffer, 0, bytesRead);
}
catch (Exception ex)
{
Console.WriteLine($"Error receiving data: {ex.Message}");
return null;
}
}
static string process(string data_chunk)
{
Console.WriteLine($"Processing data chunk: {data_chunk}");
Task.Delay(1000).Wait();
return $"Processed {data_chunk}";
}
static void send_result(NamedPipeClientStream pipe, string result)
{
try
{
byte[] resultBytes = Encoding.UTF8.GetBytes(result);
pipe.Write(resultBytes, 0, resultBytes.Length);
pipe.Flush();
}
catch (Exception ex)
{
Console.WriteLine($"Error sending result: {ex.Message}");
}
}
static async Task<string> read_from_pipe(NamedPipeServerStream pipe)
{
await pipe.WaitForConnectionAsync();
byte[] buffer = new byte[1024];
int bytesRead = await pipe.ReadAsync(buffer, 0, buffer.Length);
string result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
pipe.Disconnect();
return result;
}
static string aggregate(string[] results)
{
StringBuilder finalResult = new StringBuilder();
foreach (string result in results)
{
finalResult.AppendLine(result);
}
return finalResult.ToString();
}
}
}
===================================================================
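Using a third-party MPI library
Install an MPI implementation first: OpenMPI or Microsoft MPI. The listing below assumes a .NET binding exposed through the MPI namespace (such as MPI.NET) and is normally started through the MPI launcher with one master and at least one worker process.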
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using MPI;
namespace DistributedComputingExample
{
    class Program
    {
        static void Main(string[] args)
        {
            // Assumes the MPI.NET binding: MPI.Environment initializes MPI and finalizes it on Dispose
            using (new MPI.Environment(ref args))
            {
                Intracommunicator comm = Communicator.world;
                int rank = comm.Rank;
                int size = comm.Size;
                if (rank == 0)
                {
                    // Rank 0 is the master: one chunk per worker rank
                    string[] chunks = load_data(size - 1);
                    Console.WriteLine("Data loaded.");
                    for (int i = 1; i < size; i++)
                    {
                        // MPI.NET serializes managed objects, so a string can be sent directly
                        comm.Send(chunks[i - 1], i, 0);
                    }
                    List<string> results = new List<string>();
                    for (int i = 1; i < size; i++)
                    {
                        results.Add(comm.Receive<string>(i, 0));
                    }
                    Console.WriteLine($"Final result: {aggregate(results)}");
                }
                else
                {
                    string data_chunk = comm.Receive<string>(0, 0);
                    Console.WriteLine($"Worker {rank} received data: {data_chunk}");
                    string result = process(data_chunk);
                    Console.WriteLine($"Worker {rank} processed data: {result}");
                    comm.Send(result, 0, 0);
                }
            }
        }
        static string[] load_data(int num_chunks)
        {
            string[] chunks = new string[num_chunks];
            for (int i = 0; i < num_chunks; i++)
            {
                chunks[i] = $"DataChunk{i + 1}";
            }
            return chunks;
        }
        static string process(string data_chunk)
        {
            Console.WriteLine($"Processing data chunk: {data_chunk}");
            Thread.Sleep(1000); // simulated work
            return $"Processed {data_chunk}";
        }
        static string aggregate(List<string> results)
        {
            StringBuilder finalResult = new StringBuilder();
            foreach (string result in results)
            {
                finalResult.AppendLine(result);
            }
            return finalResult.ToString();
        }
    }
}
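4.4 GPU Parallel Computing
GPU parallel computing exploits the many-core architecture of the graphics processing unit (GPU) through technologies such as CUDA or OpenCL to achieve massive parallelism. It suits data-intensive tasks such as image processing and machine learning.
Code example:
An example of GPU parallel computing with CUDAfy.NET. It assumes a simple task in which each GPU thread processes one input element and produces the corresponding output element.
Install CUDAfy.NET first.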
using System;
using Cudafy;
using Cudafy.Host;
using Cudafy.Translator;
namespace GpuParallelComputingExample
{
class Program
{
static void Main(string[] args)
{
int[] input = { 1, 2, 3, 4, 5 };
int[] output = new int[input.Length];
// Translate the [Cudafy] methods of this class to GPU code and load the resulting module
CudafyModule km = CudafyTranslator.Cudafy();
GPGPU gpu = CudafyHost.GetDevice(eGPUType.Cuda);
gpu.LoadModule(km);
// Device buffers are represented by array handles sized like the host arrays
int[] d_input = gpu.Allocate<int>(input);
int[] d_output = gpu.Allocate<int>(output);
gpu.CopyToDevice(input, d_input);
// One block with one thread per element
gpu.Launch(1, input.Length).gpu_kernel(d_input, d_output);
gpu.Synchronize();
gpu.CopyFromDevice(d_output, output);
Console.WriteLine("Input: " + string.Join(", ", input));
Console.WriteLine("Output: " + string.Join(", ", output));
gpu.FreeAll();
}
[Cudafy]
public static void gpu_kernel(GThread thread, int[] input, int[] output)
{
int tid = thread.threadIdx.x;
if (tid < input.Length)
{
output[tid] = compute(input[tid]);
}
}
[Cudafy]
public static int compute(int value)
{
return value * value;
}
}
}
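If you prefer OpenCL to CUDA, you can use a managed OpenCL binding. The host-side calls below follow the original article's Managed OpenCL listing and should be checked against the binding you actually install.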
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ManagedOpenCL;
namespace GpuParallelComputingExample
{
class Program
{
static void Main(string[] args)
{
int[] input = { 1, 2, 3, 4, 5 };
int[] output = new int[input.Length];
CLPlatform platform = CLPlatform.GetPlatformIDs()[0];
CLDevice device = platform.GetDeviceIDs()[0];
CLContext context = CLContext.CreateContext(new[] { device });
CLCommandQueue queue = context.CreateCommandQueue(device, CLCommandQueueProperties.None);
string kernelCode = @"
int compute(int value)
{
return value * value;
}
__kernel void gpu_kernel(__global const int* input, __global int* output)
{
int tid = get_global_id(0);
output[tid] = compute(input[tid]);
}
";
CLProgram program = context.CreateProgramWithSource(new[] { kernelCode });
program.BuildProgram(new[] { device }, null, null, null);
CLMemoryBuffer<int> d_input = context.CreateBuffer(CLMemoryFlags.CopyHostPtr, input);
CLMemoryBuffer<int> d_output = context.CreateBuffer(CLMemoryFlags.WriteOnly, output.Length);
CLKernel kernel = program.CreateKernel("gpu_kernel");
kernel.SetMemoryArgument(0, d_input);
kernel.SetMemoryArgument(1, d_output);
queue.EnqueueNDRangeKernel(kernel, null, new[] { (long)input.Length }, null);
queue.EnqueueReadBuffer(d_output, true, 0, output.Length, output);
queue.Finish();
Console.WriteLine("Input: " + string.Join(", ", input));
Console.WriteLine("Output: " + string.Join(", ", output));
d_input.Dispose();
d_output.Dispose();
queue.Dispose();
program.Dispose();
context.Dispose();
}
}
}
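4.5 Task Parallelism
Task parallelism decomposes a large job into several independent subtasks and runs them in parallel. It suits situations with few dependencies between tasks, such as a compiler processing several files at once.
Code example: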
using System;
using System.Collections.Generic;
using System.Threading; // Thread.CurrentThread
using System.Threading.Tasks;
namespace TaskParallelExample
{
class Program
{
static async Task Main(string[] args)
{
List<Task> tasks = new List<Task>
{
task1(),
task2(),
task3()
};
await Task.WhenAll(tasks);
Console.WriteLine("All tasks completed.");
}
static async Task task1()
{
Console.WriteLine($"Task1 started on thread: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000);
Console.WriteLine($"Task1 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
}
static async Task task2()
{
Console.WriteLine($"Task2 started on thread: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000);
Console.WriteLine($"Task2 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
}
static async Task task3()
{
Console.WriteLine($"Task3 started on thread: {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(1000);
Console.WriteLine($"Task3 completed on thread: {Thread.CurrentThread.ManagedThreadId}");
}
}
}
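4.6 Data Parallelism
Data parallelism splits the data into several parts, each processed in parallel by a different processor or thread. It suits data-intensive tasks such as matrix operations and image processing.
Code example: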
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace DataParallelExample
{
class Program
{
static void Main(string[] args)
{
int N = 10;
List<int> input = Enumerable.Range(0, N).ToList();
int[] output = new int[N];
Parallel.ForEach(input, (i, loopState) =>
{
output[i] = compute(i);
Console.WriteLine($"Processed element {i} on thread: {Task.CurrentId}");
});
Console.WriteLine("Input: " + string.Join(", ", input));
Console.WriteLine("Output: " + string.Join(", ", output));
}
static int compute(int value)
{
Console.WriteLine($"Computing value: {value} on thread: {Task.CurrentId}");
Task.Delay(100).Wait();
return value * value;
}
}
}
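4.7 Pipeline Parallelism
Pipeline parallelism decomposes a job into a sequence of stages, each handled by a different processor or thread, forming a processing pipeline. It suits scenarios such as stream processing and video encoding.
Code example: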
using System;
using System.Threading.Tasks;
namespace PipelineParallelExample
{
class Program
{
static async Task Main(string[] args)
{
string input = "Initial Data";
string finalOutput = await StartPipeline(input);
Console.WriteLine($"Final Output: {finalOutput}");
}
static async Task<string> StartPipeline(string input)
{
string stage1Output = await stage1(input);
string stage2Output = await stage2(stage1Output);
string finalOutput = await stage3(stage2Output);
return finalOutput;
}
static async Task<string> stage1(string input)
{
Console.WriteLine($"Stage 1 started with input: {input}");
await Task.Delay(1000);
string intermediate1 = $"Stage1: Processed {input}";
Console.WriteLine($"Stage 1 completed with output: {intermediate1}");
return intermediate1;
}
static async Task<string> stage2(string intermediate1)
{
Console.WriteLine($"Stage 2 started with input: {intermediate1}");
await Task.Delay(1000);
string intermediate2 = $"Stage2: Processed {intermediate1}";
Console.WriteLine($"Stage 2 completed with output: {intermediate2}");
return intermediate2;
}
static async Task<string> stage3(string intermediate2)
{
Console.WriteLine($"Stage 3 started with input: {intermediate2}");
await Task.Delay(1000);
string output = $"Stage3: Processed {intermediate2}";
Console.WriteLine($"Stage 3 completed with output: {output}");
return output;
}
}
}
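The listing above pushes a single input through the three stages one after another, so the stages never overlap. A pipeline pays off once several items are in flight at the same time. The following is a minimal sketch of that idea, assuming bounded BlockingCollection queues between stages; the class name PipelineSketch, the queue capacity of 4, and the "S1(...)" labels are illustrative choices, not part of the original example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Runs three stages concurrently; bounded queues connect neighbouring stages.
    public static List<string> Run(IEnumerable<string> inputs)
    {
        var q1 = new BlockingCollection<string>(boundedCapacity: 4);
        var q2 = new BlockingCollection<string>(boundedCapacity: 4);
        var output = new List<string>();

        Task stage1 = Task.Run(() =>
        {
            foreach (string item in inputs) q1.Add("S1(" + item + ")");
            q1.CompleteAdding(); // tells stage 2 that no more items will arrive
        });
        Task stage2 = Task.Run(() =>
        {
            foreach (string item in q1.GetConsumingEnumerable()) q2.Add("S2(" + item + ")");
            q2.CompleteAdding();
        });
        Task stage3 = Task.Run(() =>
        {
            // Only this stage writes to the list, and the caller reads it after WaitAll
            foreach (string item in q2.GetConsumingEnumerable()) output.Add("S3(" + item + ")");
        });

        Task.WaitAll(stage1, stage2, stage3);
        return output;
    }

    public static void Main()
    {
        foreach (string line in Run(new[] { "a", "b", "c" })) Console.WriteLine(line);
    }
}
```

While stage 3 finishes item "a", stage 2 can already work on "b" and stage 1 on "c". The bounded capacity provides back-pressure: a fast stage blocks instead of filling memory when the next stage is slower.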
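4.8 Actor Model
The Actor model is a model of concurrent computation that decomposes a system into independently executing actors to achieve both concurrency and parallelism. Actors communicate with one another only by passing messages, which avoids shared memory and locks. Common implementations include Orleans, Akka, and Erlang.
Code example: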
using System;
using System.Threading.Tasks;
using Akka.Actor;
namespace ActorModelExample
{
class Program
{
static async Task Main(string[] args)
{
var system = ActorSystem.Create("ActorSystem");
var actor1 = system.ActorOf(Props.Create(() => new Actor1(system)), "Actor1");
var actor2 = system.ActorOf(Props.Create(() => new Actor2(system)), "Actor2");
actor1.Tell(new PingMessage(actor2));
await Task.Delay(2000);
actor1.Tell(new StopMessage());
actor2.Tell(new StopMessage());
await Task.Delay(1000);
await system.Terminate();
}
}
public class PingMessage
{
public IActorRef TargetActor { get; }
public PingMessage(IActorRef targetActor)
{
TargetActor = targetActor;
}
}
public class PongMessage
{
public IActorRef TargetActor { get; }
public PongMessage(IActorRef targetActor)
{
TargetActor = targetActor;
}
}
public class StopMessage { }
public class Actor1 : ReceiveActor
{
private readonly ActorSystem _system;
public Actor1(ActorSystem system)
{
_system = system;
Receive<PingMessage>(ping =>
{
Console.WriteLine($"Actor1 received Ping from Actor {Sender.Path}");
// Forward a Ping to the target; Actor2 only handles PingMessage and replies with a Pong
ping.TargetActor.Tell(new PingMessage(Self));
});
Receive<PongMessage>(pong =>
{
Console.WriteLine($"Actor1 received Pong from Actor {pong.TargetActor.Path}");
});
Receive<StopMessage>(_ =>
{
Console.WriteLine("Actor1 stopping.");
Context.Stop(Self);
});
}
}
public class Actor2 : ReceiveActor
{
private readonly ActorSystem _system;
public Actor2(ActorSystem system)
{
_system = system;
Receive<PingMessage>(ping =>
{
Console.WriteLine($"Actor2 received Ping from Actor {Sender.Path}");
ping.TargetActor.Tell(new PongMessage(Self));
});
Receive<StopMessage>(_ =>
{
Console.WriteLine("Actor2 stopping.");
Context.Stop(Self);
});
}
}
}
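5. Practical Applications
5.1 Parallelism in software development
Parallelism is widely used wherever high computing power is needed, including:
- Scientific simulation: weather forecasting, molecular dynamics, and similar tasks solve large numbers of equations and speed up considerably when parallelized.
- Machine learning: training deep neural networks relies on matrix operations; frameworks such as TensorFlow and PyTorch use GPU parallelism to accelerate training.
- Image and video processing: 3D rendering or video filters can distribute their work across multiple cores or a GPU.
Common parallel programming models include:
- TPL: the Task Parallel Library, the main library for parallel programming in .NET.
- OpenMP: a directive-based shared-memory parallel API for C/C++ and Fortran.
- MPI (Message Passing Interface): the standard for distributed-memory parallelism, used on high-performance computing clusters.
- CUDA: NVIDIA's parallel computing platform, supporting fine-grained parallelism on GPUs.
5.2 Concurrency in software development
Concurrency is essential in systems that must handle many tasks or events, for example:
- Web servers: servers such as Apache and Nginx handle large numbers of client requests concurrently through multithreaded, multi-process, or event-driven architectures.
- Graphical user interfaces (GUI): concurrency keeps the interface responsive to user input while background work such as data loading is running.
- Database systems: concurrency-control mechanisms such as locks and transactions manage simultaneous access to data by many users.
Common concurrency models include:
- Multithreading: C#, Java, and C++ provide thread libraries (System.Threading.Thread, java.lang.Thread, std::thread) for concurrency.
- Asynchronous programming: Node.js and Python's asyncio support non-blocking code and suit I/O-bound tasks.
- Actor model: Erlang and the Akka framework achieve concurrency through independent actors and message passing, avoiding shared-memory problems.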
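6. Challenges of Concurrent and Parallel Programming
6.1 Concurrency challenges
Concurrency introduces several difficult problems:
- Race conditions: several threads access a shared resource at the same time, which can produce unpredictable results. For example, unsynchronized increments of a counter can lose updates.
- Deadlocks: threads wait for each other to release resources and block forever. For example, two threads each hold a lock that the other needs.
- Livelocks: threads keep reacting to each other without making progress, for example by repeatedly yielding a resource.
- Starvation: some threads never obtain a resource because scheduling is unfair.
These problems are usually addressed with synchronization primitives such as mutexes and semaphores, but excessive synchronization can hurt performance.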
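The lost-update race mentioned in the list can be reproduced in a few lines. The sketch below is an illustration under assumptions: the class name CounterSketch and its three methods are hypothetical, and the unsynchronized variant may or may not lose updates on a given run, which is exactly what makes such bugs hard to find.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CounterSketch
{
    private static readonly object Gate = new object();

    // Unsynchronized: counter++ is a read-modify-write sequence, so updates can be lost.
    public static int CountUnsafe(int workers, int perWorker)
    {
        int counter = 0;
        Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < perWorker; i++) counter++;
        });
        return counter;
    }

    // A mutual-exclusion lock makes each increment atomic with respect to the others.
    public static int CountWithLock(int workers, int perWorker)
    {
        int counter = 0;
        Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < perWorker; i++)
            {
                lock (Gate) { counter++; }
            }
        });
        return counter;
    }

    // Interlocked performs the increment as one atomic operation, without a lock.
    public static int CountInterlocked(int workers, int perWorker)
    {
        int counter = 0;
        Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < perWorker; i++) Interlocked.Increment(ref counter);
        });
        return counter;
    }

    public static void Main()
    {
        Console.WriteLine($"unsafe:      {CountUnsafe(8, 100000)} (may be less than 800000)");
        Console.WriteLine($"lock:        {CountWithLock(8, 100000)}");
        Console.WriteLine($"interlocked: {CountInterlocked(8, 100000)}");
    }
}
```

The locked and interlocked variants always return workers × perWorker. The lock version illustrates the trade-off noted above: it is correct, but every increment now serializes on the same lock.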
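6.2 Parallelism challenges
Parallel computing has its own difficulties:
- Load balancing: all processors or cores should share the work evenly so that none sits idle.
- Communication overhead: in distributed systems the cost of communication between nodes can cancel out the gains from parallelism.
- Scalability: as the number of processors grows, synchronization overhead or the serial portion of the program can lead to diminishing returns.
Parallel algorithms must be designed carefully, using techniques such as dynamic load balancing or work stealing to meet these challenges.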
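The diminishing returns caused by the serial portion can be quantified with Amdahl's law, which the text does not name but which is the standard formulation: with a parallelizable fraction p and n processors, the speedup is 1 / ((1 - p) + p / n). The helper below is a small sketch; the class and method names are illustrative.

```csharp
using System;

public static class AmdahlSketch
{
    // Amdahl's law: speedup = 1 / ((1 - p) + p / n)
    public static double Speedup(double parallelFraction, int processors)
    {
        if (parallelFraction < 0.0 || parallelFraction > 1.0)
            throw new ArgumentOutOfRangeException(nameof(parallelFraction));
        if (processors < 1)
            throw new ArgumentOutOfRangeException(nameof(processors));
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / processors);
    }

    public static void Main()
    {
        // With 90% of the work parallelizable, the speedup can never exceed 10x
        foreach (int n in new[] { 1, 2, 4, 8, 16, 1024 })
        {
            Console.WriteLine($"n = {n,4}: speedup = {Speedup(0.9, n):F2}");
        }
    }
}
```

Even with 1024 processors, a program that is 90% parallel stays below a 10x speedup, which is why reducing the serial part often matters more than adding cores.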
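7. Tools and Techniques for Managing Parallelism and Concurrency
7.1 Debugging and testing
The non-determinism of concurrent and parallel programs makes them unusually hard to debug. Commonly used tools include:
- Static analysis: tools such as FindBugs detect potential problems without running the code.
- Runtime verification: tools such as Valgrind's Helgrind and Intel Inspector monitor a running program for synchronization errors.
- Test frameworks: JUnit or pytest can be extended for concurrency testing by simulating multithreaded scenarios.
7.2 Design patterns
Design patterns offer solutions to recurring problems:
- Thread pool: a fixed number of threads executes the submitted tasks, which reduces thread creation and destruction overhead.
- Producer-consumer: producers generate data and consumers process it, coordinated through a synchronized queue.
- Map-Reduce: a task is mapped over partitions of the data and the partial results are reduced, which suits big-data processing.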
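The producer-consumer pattern from the list can be sketched with a bounded channel playing the role of the synchronized queue. This is one possible realization, assuming System.Threading.Channels (available in .NET Core 3.0 and later); the class name ProducerConsumerSketch and the capacity of 8 are illustrative.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Produces the integers 1..itemCount and returns the sum computed by the consumer.
    public static async Task<int> RunAsync(int itemCount)
    {
        Channel<int> channel = Channel.CreateBounded<int>(8);

        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= itemCount; i++)
            {
                await channel.Writer.WriteAsync(i); // waits while the queue is full
            }
            channel.Writer.Complete(); // lets the consumer's loop end
        });

        Task<int> consumer = Task.Run(async () =>
        {
            int sum = 0;
            await foreach (int item in channel.Reader.ReadAllAsync())
            {
                sum += item;
            }
            return sum;
        });

        await producer;
        return await consumer;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(100)); // prints 5050
    }
}
```

The bounded capacity keeps a fast producer from running arbitrarily far ahead of the consumer, and completing the writer is what signals "no more data" without a sentinel value.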
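7.3 Programming language support
Modern languages have built-in support for parallelism and concurrency:
- C#: libraries such as the TPL and System.Collections.Concurrent simplify concurrent and parallel programming.
- Go: goroutines and channels simplify concurrent programming.
- Rust: the ownership model prevents data races at compile time.
- Java: the java.util.concurrent package provides high-level tools such as thread pools and concurrent collections.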
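As an illustration of the C# entry, the sketch below combines Parallel.ForEach from the TPL with ConcurrentDictionary from System.Collections.Concurrent to count words without explicit locks. The class name WordCountSketch and the sample input are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WordCountSketch
{
    // Counts space-separated words across all lines, processing lines in parallel.
    public static ConcurrentDictionary<string, int> Count(IEnumerable<string> lines)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(lines, line =>
        {
            foreach (string word in line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries))
            {
                // AddOrUpdate inserts 1 for a new word or atomically increments an existing count
                counts.AddOrUpdate(word, 1, (_, current) => current + 1);
            }
        });
        return counts;
    }

    public static void Main()
    {
        var counts = Count(new[] { "a b a", "b a", "c" });
        foreach (KeyValuePair<string, int> pair in counts)
        {
            Console.WriteLine($"{pair.Key}: {pair.Value}");
        }
    }
}
```

The concurrent collection handles synchronization internally, so the loop body contains no lock statement; the update delegate may be retried under contention, which is safe here because it has no side effects.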
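8. Trade-offs Between Parallelism and Concurrency
8.1 Complexity versus performance
Parallelism and concurrency improve performance but also increase code complexity:
- Multithreading: offers fine-grained control but easily introduces race conditions.
- Asynchronous programming: avoids thread overhead but can lead to callback hell or convoluted logic.
8.2 Shared memory versus message passing
Concurrency models fall into two groups:
- Shared memory: threads share data and must synchronize to avoid conflicts; efficient but error-prone.
- Message passing: communication through messages avoids shared state; safer but may add latency.
Which one to choose depends on performance, safety, and application requirements.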
Reposted from https://www.cnblogs.com/chenshibao/p/18865227
This article was last edited on 2025/9/9 10:42:22