Problem
Have you ever come across a system requirement where files dropped into a specific folder must be processed immediately? When one file is big, it takes a long time to process, and the files dropped after it end up being processed late. To speed things up, you use multithreading to process all the files at the same time. Later, you realize that multithreading stresses the database, and you start to encounter command timeouts or resource deadlock errors.
Solution
Since all the files in the folder need to be processed immediately, but you do not want to stress the database, how about processing every file at the same time, but bit by bit and turn by turn, like a round robin? This idea comes from Serena Yeoh; later in this post you will see how I implement the concept. It creates an illusion, a user experience where all the files "look like" they are currently being processed, even though none of them is actually complete yet. It also solves the queuing problem: while one big file is being processed, the smaller files that come in later no longer need to wait for it to finish.
The image above illustrates the round robin file processor's behavior. There are 6 files waiting to be processed. File 1 is processed first, but for 3 seconds only, and then it pauses. The next file is processed for the same 3-second duration and then pauses, and the rest of the files get the same treatment. Once the last file has had its 3 seconds, the turn goes back to File 1, which resumes its processing.
Implementation
First, create a file system watcher that will pick up a file to process from a specific folder.
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = Path.Combine(Environment.CurrentDirectory, "TestData");
watcher.Filter = "*.*";
watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
watcher.IncludeSubdirectories = false;
watcher.Created += watcher_Created;
watcher.Error += watcher_Error;
watcher.EnableRaisingEvents = true;
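The watcher also wires up an Error handler, watcher_Error, whose body is not shown in this post. A minimal sketch of what it might do (the body here is my assumption, not the original code):

private static void watcher_Error(object sender, ErrorEventArgs e)
{
    //FileSystemWatcher raises Error when its internal buffer overflows
    //or it can no longer monitor the folder; here we simply log it
    Console.WriteLine("FileSystemWatcher error: " + e.GetException());
}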
The following is how I process the file when the Created event is fired. The key to the round robin behavior is that the ManualResetEvent reference is kept in a List. The ManualResetEvent is used by another long running thread that manages pausing and resuming the processing threads.
private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    ProcessFile(e.FullPath);
}

private static void ProcessFile(string filename)
{
    //Create a manual reset event with false (non-signaled) initial state
    ManualResetEvent resetEvent = new ManualResetEvent(false);

    //Process the file in a new thread so that it won't jam the main thread
    Task task = Task.Run(() =>
    {
        int retry = 0;
        while (retry < 3)
        {
            try
            {
                string filenameOnly = Path.GetFileName(filename);
                using (FileStream stream = File.OpenRead(filename))
                using (StreamReader reader = new StreamReader(stream))
                {
                    string data = null;
                    long line = 1;

                    //read line by line until the end of the file
                    while ((data = reader.ReadLine()) != null)
                    {
                        resetEvent.WaitOne(); //always pause, wait for the signal to resume

                        Console.WriteLine("Processing " + filenameOnly + " - Line " + line.ToString());
                        line++;

                        Thread.Sleep(1000); //just to slow down the process to see the round robin behavior

                        //when the Task is cancelled, stop the processing
                        if (_cts.IsCancellationRequested)
                            break;
                    }
                }
                break;
            }
            catch (IOException)
            {
                //the file may be locked by another process while it is being copied
                //retry reading the file after 1 second
                Thread.Sleep(1000);
                retry++;
            }
        }
    }, _cts.Token);

    //store the ManualResetEvent reference in the list
    _signalList.Add(resetEvent);
}
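Note that _cts and _signalList are shared fields whose declarations are not shown in this post. Based on how they are used, they presumably look something like this (the exact declarations are my assumption):

//assumed declarations, inferred from how the fields are used in the snippets
private static CancellationTokenSource _cts = new CancellationTokenSource();
private static List<ManualResetEvent> _signalList = new List<ManualResetEvent>();

Keep in mind that List&lt;T&gt; is not thread-safe: the Created handler adds to _signalList from a thread pool thread while the long running task below enumerates it, so a production version would want a lock or a concurrent collection around it.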
The following is the long running Task that manages pausing and resuming the threads. Every 3 seconds, it pauses all the threads, keeps track of each thread's turn, and resumes only the thread whose turn has come.
//Long running task
Task.Run(() =>
{
    int counter = 0;
    while (!_cts.IsCancellationRequested)
    {
        try
        {
            //Pause all
            foreach (var signal in _signalList)
                signal.Reset();

            if (_signalList != null && _signalList.Count > 0)
            {
                //Resume one
                var resetEvent = _signalList[counter];
                resetEvent.Set();
                counter++;

                //Resume a chunk:
                //the commented code below resumes 4 files at the same time
                //var resetEvents = _signalList.Skip(counter).Take(4);
                //foreach (var resetEvent in resetEvents)
                //    resetEvent.Set();
                //counter += 4;

                if (counter == _signalList.Count)
                    counter = 0;
            }

            Thread.Sleep(3000);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }
}, _cts.Token);
The counter variable is used as an index to track whose turn it is, and Set() or Reset() is called on the ManualResetEvent references in _signalList accordingly.
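One thing to watch out for in the commented chunk variant: counter += 4 can step past _signalList.Count, so the equality check never resets it and some files would be starved. If you use the chunk variant, a wrap-around check like this (my adjustment, not part of the original code) is safer:

//resume up to 4 files per turn, with a >= wrap-around check instead of ==
var resetEvents = _signalList.Skip(counter).Take(4);
foreach (var resetEvent in resetEvents)
    resetEvent.Set();
counter += 4;
if (counter >= _signalList.Count)
    counter = 0;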
This is what my console looks like when I place 3 files into the folder for testing:
While these 3 files are still being processed, if I drop a 4th file in, this is what happens:
And then, it automatically resumes the first file's processing.
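To put the pieces together, a minimal console host could look like the sketch below. StartScheduler and StartWatcher are hypothetical wrappers around the long running task and the FileSystemWatcher setup shown above; the downloadable source is the authoritative version.

static void Main()
{
    StartScheduler(); //hypothetical wrapper: starts the long running pause/resume task
    StartWatcher();   //hypothetical wrapper: sets up the FileSystemWatcher

    Console.WriteLine("Drop files into the TestData folder. Press Enter to stop.");
    Console.ReadLine();

    _cts.Cancel(); //request cancellation on all processing threads

    //release any paused threads so they can observe the cancellation and exit
    foreach (var signal in _signalList)
        signal.Set();
}

The final Set() loop matters because a paused thread is blocked inside resetEvent.WaitOne() and cannot see the cancellation request until it is released.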
Lastly, if you are interested in my complete source code, feel free to download it from HERE.