Problem
Have you ever written a multithreaded program that spawns too many threads to process files? We have multi-core, multi-threaded processors, but not multi-threaded disks. File processing depends on disk I/O, and a single disk can effectively perform only one operation at a time, unless you have multiple disks with each thread accessing a different one, or you are using RAID 0. When too many threads try to read from or write to a single disk, they flood the disk's command queue, and each thread has to wait its turn to access the data.
So, how can we limit the number of threads that access the disk at once? In today's post, I will cover how to use a Semaphore to make threads wait, so that only a fixed number of them access the disk at any time.
Solution
Use a Semaphore to control the threads. A Semaphore holds a count of free slots, which you configure in its constructor. Each thread calls the WaitOne method, which lets the thread through if a slot is free and otherwise blocks it at that point. Calling the Release method hands slots back, so that the corresponding number of waiting threads can continue executing.
The code example in this post is a file processor that limits the number of threads used to process files. The following code allows 2 threads to process 2 files at a time: the initial count is 2, and the maximum count is based on the number of logical processors you have.
private static Semaphore _semaphore;

static void Main(string[] args)
{
    // initial count: 2 free slots; maximum count: number of logical processors
    _semaphore = new Semaphore(2, Environment.ProcessorCount);
    InitializeFileSystemWatcher();
    Console.ReadKey();
}
When many files are dropped into a folder that is monitored by a file system watcher, only 2 files are processed at a time.
private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    // when a file is detected, start a new task to process it
    Task.Run(() =>
    {
        // threads rendezvous here
        _semaphore.WaitOne();
        try
        {
            ProcessFile(e.FullPath);
        }
        finally
        {
            // always give the slot back, even if ProcessFile throws
            _semaphore.Release();
        }
    });
}
private static void ProcessFile(string filename)
{
    Console.WriteLine("[" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss.fff") + "] Process file " + filename);
    // assume it takes 3 seconds to process a file
    Thread.Sleep(3000);
}
This is how the files are processed. Take note of the timestamps: only 2 files are processed at a time.
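To see the throttling without setting up a watched folder, here is a minimal, self-contained sketch. It is not the file processor from this post: the ThrottleDemo class, the MaxObservedConcurrency method and the 50 ms sleep that stands in for ProcessFile are all made up for illustration. It pushes a handful of simulated jobs through a Semaphore and reports the highest number of jobs that were inside the guarded section together.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original file processor.
public static class ThrottleDemo
{
    // Runs `jobs` simulated file jobs through a semaphore with `limit` slots
    // and returns the highest number of jobs seen running at the same time.
    public static int MaxObservedConcurrency(int limit, int jobs)
    {
        using (var semaphore = new Semaphore(limit, limit))
        {
            int running = 0;      // jobs currently inside the guarded section
            int maxObserved = 0;  // high-water mark of `running`
            object gate = new object();

            Task[] tasks = Enumerable.Range(0, jobs).Select(i => Task.Run(() =>
            {
                semaphore.WaitOne();              // block until a slot is free
                try
                {
                    lock (gate)
                    {
                        running++;
                        if (running > maxObserved) maxObserved = running;
                    }
                    Thread.Sleep(50);             // stand-in for ProcessFile
                    lock (gate) { running--; }
                }
                finally
                {
                    semaphore.Release();          // always hand the slot back
                }
            })).ToArray();

            Task.WaitAll(tasks);                  // finish before the semaphore is disposed
            return maxObserved;
        }
    }

    public static void Main()
    {
        // The printed value should never be greater than the limit of 2.
        Console.WriteLine("Max concurrent jobs: " + MaxObservedConcurrency(2, 6));
    }
}
```

The exact number printed depends on how the thread pool schedules the tasks, but it should never exceed the limit passed in.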
There is another way to play around with the Semaphore. You can initialize the Semaphore with an initial count of 0, so that all the threads wait until you call the Release method with a count from the main thread to release them, like below:
static void Main(string[] args)
{
    // initial count of 0: every thread that calls WaitOne blocks
    _semaphore = new Semaphore(0, Environment.ProcessorCount);
    InitializeFileSystemWatcher();
    while (true)
    {
        Console.ReadKey();
        // release 2 threads only
        _semaphore.Release(2);
    }
}
private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    // when a file is detected, start a new task to process it
    Task.Run(() =>
    {
        // threads rendezvous here
        _semaphore.WaitOne();
        ProcessFile(e.FullPath);
    });
}
Do you notice that the behavior is like a horse race, where all the horses wait for the signal to start running? Here, though, only 2 horses are released at a time. Also, you cannot release more slots than the Semaphore's maximum count allows; if you try, Release throws a SemaphoreFullException.
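The error case can be reproduced in a few lines. This is only a sketch: the ReleaseLimitDemo class and the ReleaseOverflows helper are hypothetical names, not part of the file processor. It creates a Semaphore with an initial count of 0 and reports whether a single Release call pushed the count past the maximum.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, not part of the original file processor.
public static class ReleaseLimitDemo
{
    // Returns true if releasing `releaseCount` slots on a semaphore that
    // starts at 0 with the given maximum throws SemaphoreFullException.
    public static bool ReleaseOverflows(int maximumCount, int releaseCount)
    {
        using (var semaphore = new Semaphore(0, maximumCount))
        {
            try
            {
                semaphore.Release(releaseCount);
                return false;   // the count stayed within the maximum
            }
            catch (SemaphoreFullException)
            {
                return true;    // the release would have exceeded the maximum
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(ReleaseOverflows(2, 2)); // False
        Console.WriteLine(ReleaseOverflows(2, 3)); // True
    }
}
```

In the key-press loop above, the same exception can occur if you press a key repeatedly while no threads are waiting, because every unused Release(2) raises the count.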
There are 2 types of Semaphore. The basic one, as in the sample code above, is a local Semaphore, used within a single process. The other is a named system Semaphore, which you can use to coordinate between processes. To define a named system Semaphore, you just need to specify a name in the Semaphore constructor, like this:
_semaphore = new Semaphore(0, Environment.ProcessorCount, "MySemaphore");
Then, in another process, to use the same existing system Semaphore, call the OpenExisting method, like this:
_semaphore = Semaphore.OpenExisting("MySemaphore");
By the way, there is a lightweight Semaphore called SemaphoreSlim. It is a local Semaphore, so you may use it if you do not intend to create or use the Semaphore from other processes.
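As a rough sketch of how SemaphoreSlim could be used for the same kind of throttling, the example below waits with WaitAsync, so a waiting job does not block a thread. The SlimThrottleDemo class, its method name and the Task.Delay that stands in for file processing are assumptions made for illustration, not code from the file processor in this post.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original file processor.
public static class SlimThrottleDemo
{
    // Runs `jobs` simulated asynchronous jobs through a SemaphoreSlim with
    // `limit` slots and returns the highest number seen running at once.
    public static async Task<int> MaxObservedConcurrencyAsync(int limit, int jobs)
    {
        using (var semaphore = new SemaphoreSlim(limit, limit))
        {
            int running = 0;      // jobs currently inside the guarded section
            int maxObserved = 0;  // high-water mark of `running`
            object gate = new object();

            Task[] tasks = Enumerable.Range(0, jobs).Select(async i =>
            {
                await semaphore.WaitAsync();      // wait for a slot without blocking a thread
                try
                {
                    lock (gate)
                    {
                        running++;
                        if (running > maxObserved) maxObserved = running;
                    }
                    await Task.Delay(50);         // stand-in for asynchronous file work
                    lock (gate) { running--; }
                }
                finally
                {
                    semaphore.Release();          // always hand the slot back
                }
            }).ToArray();

            await Task.WhenAll(tasks);            // finish before the semaphore is disposed
            return maxObserved;
        }
    }

    public static void Main()
    {
        int max = MaxObservedConcurrencyAsync(2, 6).GetAwaiter().GetResult();
        // The printed value should never be greater than the limit of 2.
        Console.WriteLine("Max concurrent jobs: " + max);
    }
}
```

SemaphoreSlim also has a blocking Wait method, so it can be swapped in for the WaitOne calls shown earlier when only one process is involved.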
Summary
If you are aware that the Task.Run method uses threads from the thread pool, you may notice that a Semaphore behaves somewhat like a thread pool, in that both limit how much work runs at once. I suspect that the ThreadPool uses something like a Semaphore internally to limit and control its threads. Do you think so?
The file processor in this post limits file processing to the allowed number of threads. If you want a solution that processes all the files at the same time, bit by bit, you may want to refer to my other post, Round Robin File Processor.