Friday, May 23, 2014

Round Robin File Processor

Problem


Have you ever had a system requirement to immediately process any file dropped into a specific folder? When one file is big, it takes a long time to process, and the files dropped in after it end up processed late because they have to wait for it. To speed things up, you use multithreading to process all the files at the same time. Later, you realize that multithreading stresses the database, and you start getting command timeout or resource deadlock errors.

Solution

All the files in the folder have to be processed immediately, but you do not want to stress the database. So how about processing every file at the same time, but bit by bit and turn by turn, like a round robin? This idea comes from Serena Yeoh, and the rest of this post shows how I implemented it. The approach creates the impression that all the files are being processed at once, even though none of them has been completely processed yet. It also solves the problem of one big file holding up the queue: smaller files that come in later no longer have to wait for it to finish.


The image above illustrates the behavior of the round robin file processor. There are 6 files waiting to be processed. File 1 is processed first, but only for 3 seconds, and is then paused. The next file is processed for the same 3 seconds and then paused, and the rest of the files get the same treatment. After the last file has had its 3 seconds, the turn goes back to File 1, which resumes where it left off.
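To put numbers on that timeline, here is a simplified, single-threaded model of the idea. It is only a sketch under stated assumptions: each file is reduced to a hypothetical number of seconds of work, a file that finishes early gives up the rest of its slice, and finished files get no further turns (the real scheduler later in this post always waits the full 3 seconds per turn). `RoundRobinModel`, `CompletionTimes` and `FifoCompletionTimes` are illustrative names, not part of the downloadable source.

```csharp
using System;
using System.Linq;

// Simplified model of round robin time slicing (illustration only).
// Each file is represented by a hypothetical amount of work in seconds;
// no real files, threads or timers are involved.
public static class RoundRobinModel
{
    // Second at which each file finishes when files take turns of `sliceSeconds`.
    public static int[] CompletionTimes(int[] workSeconds, int sliceSeconds)
    {
        if (sliceSeconds <= 0) throw new ArgumentOutOfRangeException(nameof(sliceSeconds));

        int[] remaining = (int[])workSeconds.Clone();
        int[] finishedAt = new int[remaining.Length];
        int clock = 0;
        bool anyLeft = remaining.Any(r => r > 0);

        while (anyLeft)
        {
            anyLeft = false;
            for (int i = 0; i < remaining.Length; i++)
            {
                if (remaining[i] <= 0) continue;          // finished files get no more turns
                int run = Math.Min(sliceSeconds, remaining[i]);
                clock += run;                             // this file's turn
                remaining[i] -= run;
                if (remaining[i] == 0) finishedAt[i] = clock;
                else anyLeft = true;                      // it will need another turn
            }
        }
        return finishedAt;
    }

    // The same files processed one after another, for comparison.
    public static int[] FifoCompletionTimes(int[] workSeconds)
    {
        int[] finishedAt = new int[workSeconds.Length];
        int clock = 0;
        for (int i = 0; i < workSeconds.Length; i++)
        {
            clock += Math.Max(0, workSeconds[i]);
            finishedAt[i] = clock;
        }
        return finishedAt;
    }
}
```

For a 9-second file followed by two 3-second files, `CompletionTimes(new[] { 9, 3, 3 }, 3)` gives `{ 15, 6, 9 }` and `FifoCompletionTimes(new[] { 9, 3, 3 })` gives `{ 9, 12, 15 }`. The two small files finish at 6 and 9 seconds instead of 12 and 15, the big file finishes later (15 instead of 9), and the last file still completes at 15 seconds either way.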

Implementation

First, create a FileSystemWatcher that picks up every file dropped into a specific folder.

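//Namespaces used by the snippets in this post
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

//Watch the TestData folder (under the current directory) for new files
FileSystemWatcher watcher = new FileSystemWatcher();
watcher.Path = Path.Combine(Environment.CurrentDirectory, "TestData");
watcher.Filter = "*.*";                   //any file name
watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
watcher.IncludeSubdirectories = false;    //top-level folder only
watcher.Created += watcher_Created;       //a file was dropped into the folder
watcher.Error += watcher_Error;           //the watcher could not keep monitoring
watcher.EnableRaisingEvents = true;       //start watching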
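The snippet subscribes a `watcher_Error` handler that the post does not show. One possible body, assuming all it needs to do is log the problem, is sketched below; `WatcherErrorHandling` and `Describe` are hypothetical names. The Error event is raised when the watcher cannot continue monitoring or its internal buffer overflows, in which case some Created events are lost.

```csharp
using System;
using System.IO;

public static class WatcherErrorHandling
{
    // Hypothetical helper: turns the watcher's error into a log message.
    public static string Describe(ErrorEventArgs e)
    {
        Exception ex = e.GetException();
        if (ex is InternalBufferOverflowException)
            return "Watcher buffer overflowed; some Created events may have been missed: " + ex.Message;
        return "Watcher stopped with an error: " + ex.Message;
    }

    // Signature matches FileSystemWatcher.Error (ErrorEventHandler).
    public static void watcher_Error(object sender, ErrorEventArgs e)
    {
        Console.WriteLine(Describe(e));
    }
}
```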

The following is how I process a file when the Created event is fired. The key to the round robin behavior is that each file's ManualResetEvent is kept in a List (_signalList). A separate long-running task uses these events to pause and resume each file's processing thread.

private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    ProcessFile(e.FullPath);
}

private static void ProcessFile(string filename)
{
    //Create a manual reset event with false signal initial state,
    //so the file waits for its first turn
    ManualResetEvent resetEvent = new ManualResetEvent(false);

    //store the ManualResetEvent reference into the list before starting;
    //lock, because the scheduler task reads the same list from another thread
    lock (_signalList)
        _signalList.Add(resetEvent);

    //Create a new thread to process the file
    //so that it won't block the watcher's thread
    Task task = Task.Run(() =>
    {
        try
        {
            int retry = 0;
            while (retry < 3)
            {
                try
                {
                    string filenameOnly = Path.GetFileName(filename);

                    using (FileStream stream = File.OpenRead(filename))
                    using (StreamReader reader = new StreamReader(stream))
                    {
                        string data = null;
                        long line = 1;
                        while ((data = reader.ReadLine()) != null) //stop at the end of the file (at once if it is empty)
                        {
                            //pause here until it is this file's turn (or until cancellation)
                            WaitHandle.WaitAny(new WaitHandle[] { resetEvent, _cts.Token.WaitHandle });

                            //when the Task is cancelled, stop the processing
                            if (_cts.IsCancellationRequested)
                                break;

                            Console.WriteLine("Processing " + filenameOnly + " - Line " + line.ToString());
                            line++;

                            Thread.Sleep(1000); //just to slow down the process to see the round robin behavior
                        }
                    }

                    break;
                }
                catch (IOException)
                {
                    //file may be locked by another process while it is still being copied
                    //retry reading the file after 1 sec
                    Thread.Sleep(1000);
                    retry++;
                }
            }
        }
        finally
        {
            //this file is finished (or gave up): stop giving it turns
            lock (_signalList)
                _signalList.Remove(resetEvent);
            resetEvent.Dispose();
        }
    }, _cts.Token);
}
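If ManualResetEvent is new to you, here is a minimal sketch of the gate behavior ProcessFile relies on: a worker blocks on `WaitOne()` while the event is reset, runs once another thread calls `Set()`, and the event can be closed again with `Reset()`. `GateDemo`, `StartWorker` and `IsOpen` are hypothetical names, and the value 42 simply stands in for one processed line.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class GateDemo
{
    // Starts a worker that must pass through the gate before doing its "work".
    public static Task<int> StartWorker(ManualResetEvent gate)
    {
        return Task.Run(() =>
        {
            gate.WaitOne();   // blocks while the gate is reset (closed)
            return 42;        // stands in for processing one line of a file
        });
    }

    // WaitOne(0) does not block: it only reports whether the gate is currently open.
    public static bool IsOpen(ManualResetEvent gate) => gate.WaitOne(0);
}
```

A gate created with `new ManualResetEvent(false)` keeps the worker waiting until `Set()` is called; after `Reset()`, the next `WaitOne()` blocks again. That is why ProcessFile checks its event before every line: closing the event pauses the file after the line it is currently on.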


The following is the long-running Task that pauses and resumes the threads. Every 3 seconds, it pauses all of them, keeps track of whose turn it is, and resumes only the thread whose turn has come.

//Long running task
Task.Run(() =>
{
    int counter = 0;
    while (!_cts.IsCancellationRequested)
    {
        try
        {
            //lock, because ProcessFile adds and removes events from other threads
            lock (_signalList)
            {
                //Pause all
                foreach (var signal in _signalList)
                    signal.Reset();

                if (_signalList.Count > 0)
                {
                    //Wrap around; ">=" also covers the list shrinking when a file finishes
                    if (counter >= _signalList.Count)
                        counter = 0;

                    //Resume one
                    var resetEvent = _signalList[counter];
                    resetEvent.Set();
                    counter++;

                    //Resume chunk
                    //Code example below resumes 4 file processing tasks at the same time
                    //(use instead of the "Resume one" lines; requires using System.Linq)
                    //var chunk = _signalList.Skip(counter).Take(4);
                    //foreach (var chunkEvent in chunk)
                    //    chunkEvent.Set();
                    //counter += 4;
                }
            }

            //Every 3 seconds, change turn
            Thread.Sleep(3000);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }
}, _cts.Token);
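The loop above shares `_signalList` between the watcher's threads, which add events, and the scheduler task, which iterates over them, so every access has to be synchronized. As an alternative structure, here is a sketch that keeps the list and its lock in one place and lets a finished file give up its turn. `TurnScheduler`, `Register`, `Unregister` and `NextTurn` are hypothetical names, not from the original source; `NextTurn` does what one pass of the loop body does, without the sleep.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: owns the per-file events and hands out turns.
public sealed class TurnScheduler
{
    private readonly List<ManualResetEvent> _signals = new List<ManualResetEvent>();
    private readonly object _sync = new object();
    private int _counter;   // index of the file whose turn comes next

    public void Register(ManualResetEvent signal)
    {
        lock (_sync) _signals.Add(signal);
    }

    // Called when a file is finished so it stops receiving turns.
    public void Unregister(ManualResetEvent signal)
    {
        lock (_sync)
        {
            int index = _signals.IndexOf(signal);
            if (index < 0) return;
            _signals.RemoveAt(index);
            if (index < _counter) _counter--;   // keep pointing at the same "next" file
        }
    }

    // Pauses every file, resumes the one whose turn it is, and advances the turn.
    // Returns the index that was resumed, or -1 when no file is registered.
    public int NextTurn()
    {
        lock (_sync)
        {
            foreach (var signal in _signals)
                signal.Reset();

            if (_signals.Count == 0) return -1;
            if (_counter >= _signals.Count) _counter = 0;   // wrap around

            int current = _counter;
            _signals[current].Set();
            _counter++;
            return current;
        }
    }
}
```

With this helper, ProcessFile would call `Register` instead of adding to the list directly and `Unregister` when its file is done, and the long-running task would reduce to calling `NextTurn()` followed by `Thread.Sleep(3000)`.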

The counter variable is an index into _signalList that tracks whose turn it is: on every turn, all the ManualResetEvent references in the list are Reset(), and only the one at counter is Set().
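The commented-out "Resume chunk" variant advances the counter by 4 per turn, so the last chunk can be shorter than 4 and the counter can step past the end of the list. Here is a sketch of the index arithmetic, assuming each chunk should instead wrap around to the start of the list so every turn resumes the same number of files (when there are that many). `ChunkTurn.ForTurn` is a hypothetical helper; the caller would `Set()` each returned index.

```csharp
using System;
using System.Collections.Generic;

public static class ChunkTurn
{
    // Indices of the files to resume this turn, starting at `counter` and
    // wrapping around to the start of the list, plus the counter for the next turn.
    public static (List<int> Indices, int NextCounter) ForTurn(int counter, int count, int chunkSize)
    {
        var indices = new List<int>();
        if (count <= 0 || chunkSize <= 0) return (indices, 0);

        int start = counter % count;                 // tolerate a stale counter after removals
        int take = Math.Min(chunkSize, count);       // never resume the same file twice in a turn
        for (int i = 0; i < take; i++)
            indices.Add((start + i) % count);

        return (indices, (start + take) % count);
    }
}
```

With 6 files and a chunk of 4, successive turns resume files 0, 1, 2, 3, then 4, 5, 0, 1, then 2, 3, 4, 5. With a `chunkSize` of 1 it reduces to the single-file rotation used above.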

This is what my console looks like when I drop 3 files into the folder for testing:



If I drop a 4th file in while these 3 files are still being processed, this is what happens:



After that, processing automatically goes back to the first file and resumes it.



Lastly, if you are interested in the complete source code, feel free to download it from HERE.


