Sunday, July 20, 2014

C# - Semaphore

Problem

Have you ever written a multithreaded program that spawns too many threads to process files? We have multi-core, multi-threaded processors, but not multi-threaded disks. File processing relies on disk I/O, and a single disk (especially a spinning hard disk) can only service a limited number of requests at a time, unless you have multiple disks with every thread accessing a different one, or you use RAID 0. When too many threads try to read or write data on a single disk, they flood the disk's command queue, and the threads have to wait for their turn to access the data.

So, how can we prevent this, or at least limit the number of threads accessing the disk? In today's post, I will cover how to use a Semaphore to make the threads wait and allow only a fixed number of them to access the disk at a time.

Solution

Use a Semaphore to control the threads. A Semaphore keeps a count of free slots, which you set when you construct it. Each call to the WaitOne method takes a slot; when no slot is free, the calling thread blocks at that point. Each call to the Release method gives slots back (one by default, or as many as you pass in), which lets waiting threads continue executing.
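To make the counting behaviour concrete, here is a minimal sketch that is not from the original post. It uses WaitOne(0), which returns false immediately instead of blocking when no slot is free; the class name SemaphoreCountingDemo is illustrative.

```csharp
using System;
using System.Threading;

public static class SemaphoreCountingDemo
{
    // Tries to take three slots without blocking from a semaphore that has two,
    // then gives one back. Illustrative only; not part of the original post.
    public static (bool First, bool Second, bool Third, int PreviousCount) Run()
    {
        // initialCount 2: two slots are free; maximumCount 2: at most two can ever be free.
        using var semaphore = new Semaphore(initialCount: 2, maximumCount: 2);

        bool first = semaphore.WaitOne(0);   // takes a slot: count 2 -> 1
        bool second = semaphore.WaitOne(0);  // takes a slot: count 1 -> 0
        bool third = semaphore.WaitOne(0);   // no free slot: returns false instead of blocking

        // Release() frees one slot and returns the count before the call.
        int previousCount = semaphore.Release();   // count 0 -> 1, returns 0
        return (first, second, third, previousCount);
    }
}
```

Calling SemaphoreCountingDemo.Run() should report that the first two attempts succeed, the third fails, and Release() returns 0.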

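The code example in this post is a file processor that limits the number of threads processing files. The following code starts the Semaphore with 2 free slots, so at most 2 threads process 2 files at a time. The second constructor argument is the maximum count, the most slots the Semaphore can ever hold, which here is the number of logical processors. (The initial count must not exceed the maximum count, so this constructor throws an ArgumentException on a single-processor machine.)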

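using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

private static Semaphore _semaphore;

static void Main(string[] args)
{
    //initial count 2: at most 2 threads can be inside the semaphore at once
    //maximum count: the number of logical processors on this machine
    _semaphore = new Semaphore(2, Environment.ProcessorCount);

    //sets up a FileSystemWatcher whose Created event calls watcher_Created (not shown in this post)
    InitializeFileSystemWatcher();

    Console.ReadKey();
}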

When many files are dropped into a folder monitored by the file system watcher, only 2 files are processed at a time; the rest wait for a free slot.

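private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    //when a file is detected, queue a task on the thread pool to process it
    Task.Run(() =>
    {
        //each task blocks here until the semaphore has a free slot
        _semaphore.WaitOne();
        try
        {
            ProcessFile(e.FullPath);
        }
        finally
        {
            //always give the slot back, even if ProcessFile throws
            _semaphore.Release();
        }
    });
}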

private static void ProcessFile(string filename)
{
    //HH is the 24-hour clock, so the timestamps are unambiguous
    Console.WriteLine("[" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff") + "] Process file " + filename);
    Thread.Sleep(3000); //assume it takes 3 seconds to process a file
}
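The post calls InitializeFileSystemWatcher() but does not show it. Below is a hypothetical version, assuming the watched folder and the event handler are passed in as parameters (the original method takes none); WatcherSetup is an illustrative class name.

```csharp
using System.IO;

public static class WatcherSetup
{
    // Hypothetical version of InitializeFileSystemWatcher(); the post does not show it.
    // The folder and handler are passed in here, whereas the original takes no parameters.
    public static FileSystemWatcher InitializeFileSystemWatcher(
        string folder, FileSystemEventHandler onCreated)
    {
        var watcher = new FileSystemWatcher(folder);
        watcher.Created += onCreated;       // e.g. watcher_Created from the post
        watcher.EnableRaisingEvents = true; // start raising events
        return watcher;                     // keep a reference; dispose it when finished
    }
}
```

In Main this might be called as `_watcher = WatcherSetup.InitializeFileSystemWatcher(@"D:\temp\inbox", watcher_Created);`, where both the `_watcher` field and the folder are hypothetical.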

This is how the files are processed. Note the timestamps: only 2 files are processed at a time.



There is another way to use a Semaphore. You can initialize it with an initial count of 0, so that every thread waits until you call the Release method with a count from the main thread, like below:

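static void Main(string[] args)
{
    //initial count 0: every thread blocks in WaitOne until the main thread calls Release
    _semaphore = new Semaphore(0, Environment.ProcessorCount);

    InitializeFileSystemWatcher();

    while (true)
    {
        Console.ReadKey();

        //each key press lets 2 more threads through;
        //throws SemaphoreFullException if the count would exceed the maximum count
        _semaphore.Release(2);
    }
}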

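private static void watcher_Created(object sender, FileSystemEventArgs e)
{
    //when a file is detected, queue a task on the thread pool to process it
    Task.Run(() =>
    {
        //threads wait here for the start signal from the main thread
        _semaphore.WaitOne();

        //no Release here: only the main thread decides when more threads may start
        ProcessFile(e.FullPath);
    });
}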

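Do you notice that the behavior is like a horse race, where all the horses wait for the signal to start running? Except that here only 2 horses are released at a time. Also, you cannot raise the count above the maximum count passed to the constructor; if you try, Release throws a SemaphoreFullException. Note that Release does not need any thread to be waiting: if none is, the count simply goes up, and the next threads that call WaitOne pass straight through.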
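A small sketch of those two rules, assuming a gate created with an initial count of 0 and a maximum count of 2; SemaphoreGateDemo is an illustrative name, not from the post.

```csharp
using System.Threading;

public static class SemaphoreGateDemo
{
    // Illustrative only: shows the maximum-count rule and what Release does
    // when no thread is waiting yet.
    public static (bool FullExceptionThrown, bool LaterThreadPassed) Run()
    {
        // Start closed (count 0) with a ceiling of 2, like the starting-gate example.
        using var gate = new Semaphore(initialCount: 0, maximumCount: 2);

        // Nobody is waiting, so the count simply rises from 0 to 2.
        gate.Release(2);

        bool fullExceptionThrown = false;
        try
        {
            gate.Release(1);   // would make the count 3, above maximumCount
        }
        catch (SemaphoreFullException)
        {
            fullExceptionThrown = true;   // the count stays at 2
        }

        // A thread arriving later finds a free slot and does not wait at all.
        bool laterThreadPassed = gate.WaitOne(0);
        return (fullExceptionThrown, laterThreadPassed);
    }
}
```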

There are 2 types of Semaphore. The basic one, used in the sample code above, is a local Semaphore, which works within a single process. The other is a named system Semaphore, which you can use to coordinate threads across processes. To define a named system Semaphore, just specify a name in the Semaphore constructor like this:

_semaphore = new Semaphore(0, Environment.ProcessorCount, "MySemaphore");

Then, in another process, call the OpenExisting method to open the same existing system Semaphore (it throws a WaitHandleCannotBeOpenedException if no Semaphore with that name exists), like this:

_semaphore = Semaphore.OpenExisting("MySemaphore");

By the way, there is also a lightweight Semaphore called SemaphoreSlim. It is a local Semaphore only, so you may use it if you do not intend to create or use a Semaphore from other processes.
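As a sketch of the same throttling idea with SemaphoreSlim, the version below uses WaitAsync so that queued work does not block a thread-pool thread while it waits. Task.Delay stands in for the real file processing, and SlimThrottleDemo and its parameters are illustrative assumptions; the method reports the highest concurrency it observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SlimThrottleDemo
{
    // Runs `jobs` simulated file jobs, at most `maxConcurrent` at a time,
    // and returns the highest number that were running simultaneously.
    public static async Task<int> RunAsync(int jobs, int maxConcurrent)
    {
        using var throttle = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        var counterLock = new object();
        int running = 0, peak = 0;

        async Task ProcessOneAsync()
        {
            // Waits asynchronously: no thread-pool thread is blocked while queued.
            await throttle.WaitAsync();
            try
            {
                lock (counterLock)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }
                await Task.Delay(50);  // stand-in for the real file processing
            }
            finally
            {
                lock (counterLock) { running--; }
                throttle.Release();    // always hand the slot back
            }
        }

        await Task.WhenAll(Enumerable.Range(0, jobs).Select(_ => ProcessOneAsync()));
        return peak;
    }
}
```

With `RunAsync(6, 2)` the returned peak never exceeds 2, which matches the behaviour of the Semaphore version in this post without parking pool threads in WaitOne.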

Summary

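If you are aware that the Task.Run method actually uses threads from the thread pool, you may notice that a Semaphore behaves somewhat like a thread pool: both cap how many pieces of work run at once, although a Semaphore does not own any threads itself. I suspect that the ThreadPool uses something like a Semaphore internally to limit and control its threads. What do you think?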

The file processor in this post limits file processing by the number of allowed threads. If you want a better solution that processes all the files at the same time, but bit by bit, you may want to refer to my other post, Round Robin File Processor.



Sunday, July 13, 2014

C# - Lock & Monitor

Today's topic is a deep dive into the lock statement. When we write a multithreaded program in which multiple threads access and update the same object, a common problem is that the result gets out of sync (a race condition).

For instance, if you were asked to write a simple program that generates sequence numbers, and the numbers must never be duplicated, how would you code it?

The following is the implementation for a simple sequence number generator.

public class Sequence
{
    public static int CurrentNumber { get; set; }

    public int Next()
    {
        int result = 0;

        result = ++CurrentNumber;

        return result;
    }

}

The above code works fine in a single-threaded program like the one below:

using System;

class Program
{
    static void Main(string[] args)
    {
        Sequence seq = new Sequence();

        for (int i = 0; i < 100; i++)
        {
            Console.WriteLine("Next number: " + seq.Next());
        }

        Console.ReadKey();
    }
}

But have you ever wondered what happens if you run the above code in a multithreaded program? Instead of writing a lot of code to spawn threads to test it, I will just use the Parallel class from the Task Parallel Library, which is good enough. See the following modified program:

using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Sequence seq = new Sequence();

        Parallel.For(0, 100, i =>
        {
            Console.WriteLine("Next number: " + seq.Next());
        });

        Console.ReadKey();
    }
}

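When I ran it, duplicate numbers were generated, which is wrong. The reason is that ++CurrentNumber is not atomic: it reads the current value, adds 1, and writes the result back. If two threads both read the same value before either of them writes, both return the same number. So, how do we make it right? When many threads try to access one object at the same time, we have to make all the threads wait except the one that is allowed to run.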

A block of code that only one thread may execute at a time is called a critical section. To prevent more than one thread from entering the critical section, we use the lock statement (which actually uses the Monitor class) to lock the critical section, so that only one thread can run that code block at a time.

The code that generates the sequence number is where we need to apply the lock. So, how do we use the lock statement? Check this out:

public class Sequence
{
    public static int CurrentNumber { get; set; }
    private static object _lock;

    static Sequence()
    {
        _lock = new object();
    }

    public int Next()
    {
        int result = 0;

        lock (_lock)
        {
            result = ++CurrentNumber;
        }
           
        return result;
    }
}
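For a plain counter like this one, the framework also offers Interlocked.Increment, a lock-free alternative to the lock statement. A minimal sketch, assuming you are free to replace the public CurrentNumber property with a private field (InterlockedSequence is an illustrative name):

```csharp
using System.Threading;

public class InterlockedSequence
{
    // A field (not an auto-property) is required so it can be passed by ref.
    private static int _currentNumber;

    public int Next()
    {
        // Atomically reads, adds 1 and writes back, returning the new value,
        // so two threads can never receive the same number.
        return Interlocked.Increment(ref _currentNumber);
    }
}
```

Running it through the same Parallel.For loop should give 100 distinct numbers. The lock version is still the more general tool once the critical section does more than bump one integer.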

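Also, take note: when using the lock statement, avoid locking on an object that other code can access. Locking an object does not stop anyone from using the object or its members; it only blocks other threads that try to lock the same object. The danger is that code outside your class can also lock a publicly visible object, which causes unnecessary waiting or even deadlocks. Therefore, it is bad to use lock in the following ways: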

lock (this)

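Here, this is the current Sequence instance, and any code that holds a reference to it can lock it too, making Next() wait for unrelated code. Worse, because CurrentNumber is static, two different Sequence instances would take two different locks and still race on the same number.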

lock (typeof(Sequence))

typeof(Sequence) returns a Type object that is shared by all code using the Sequence type, so any other code that locks typeof(Sequence) contends for the same lock.

The best practice is to lock on a private object created only for that purpose, for example private static readonly object _lock = new object();
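To see the lock (this) problem in action, here is a small sketch with illustrative names (ThisLockingSequence, LockThisDemo). Outside code takes lock (seq) on the same instance, and a worker thread calling Next() stalls until that outside lock is released.

```csharp
using System;
using System.Threading;

// Illustrative anti-pattern: Next() locks on `this`.
public class ThisLockingSequence
{
    private int _current;

    public int Next()
    {
        lock (this)
        {
            return ++_current;
        }
    }
}

public static class LockThisDemo
{
    public static (bool Blocked, bool Completed) Run()
    {
        var seq = new ThisLockingSequence();
        var worker = new Thread(() => seq.Next());
        bool blocked;

        // "Outside" code that happens to lock the same instance.
        lock (seq)
        {
            worker.Start();
            // Next() cannot enter lock (this) while we hold lock (seq).
            blocked = !worker.Join(TimeSpan.FromMilliseconds(200));
        }

        // Once the outside lock is released, Next() runs and the thread finishes.
        bool completed = worker.Join(TimeSpan.FromSeconds(5));
        return (blocked, completed);
    }
}
```

A dedicated Thread is used instead of a Task here because waiting on a not-yet-started Task can run it inline on the current thread, and Monitor locks are re-entrant for the thread that already holds them.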

This is what the Intermediate Language (IL) looks like when using the lock statement:

.method public hidebysig instance int32  Next() cil managed
{
  // Code size       46 (0x2e)
  .maxstack  2
  .locals init ([0] int32 result,
           [1] bool '<>s__LockTaken0',
           [2] object CS$2$0000)
  IL_0000:  ldc.i4.0
  IL_0001:  stloc.0
  IL_0002:  ldc.i4.0
  IL_0003:  stloc.1
  .try
  {
    IL_0004:  ldsfld     object LockSample.Sequence::_lock
    IL_0009:  dup
    IL_000a:  stloc.2
    IL_000b:  ldloca.s   '<>s__LockTaken0'
    IL_000d:  call       void [mscorlib]System.Threading.Monitor::Enter(object,
                                                                        bool&)
    IL_0012:  call       int32 LockSample.Sequence::get_CurrentNumber()
    IL_0017:  ldc.i4.1
    IL_0018:  add
    IL_0019:  dup
    IL_001a:  call       void LockSample.Sequence::set_CurrentNumber(int32)
    IL_001f:  stloc.0
    IL_0020:  leave.s    IL_002c
  }  // end .try
  finally
  {
    IL_0022:  ldloc.1
    IL_0023:  brfalse.s  IL_002b
    IL_0025:  ldloc.2
    IL_0026:  call       void [mscorlib]System.Threading.Monitor::Exit(object)
    IL_002b:  endfinally
  }  // end handler
  IL_002c:  ldloc.0
  IL_002d:  ret
} // end of method Sequence::Next

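Note the call to Monitor::Enter(object, bool&) inside the try block, and the call to Monitor::Exit(object) inside the finally block, which is skipped when the lock-taken flag is false. So the lock statement is translated into roughly the following: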

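using System.Threading;

public class Sequence
{
    public static int CurrentNumber { get; set; }
    private static object _lock;

    static Sequence()
    {
        _lock = new object();
    }

    public int Next()
    {
        int result = 0;
        bool lockTaken = false;     //'<>s__LockTaken0' in the IL
        object lockObject = _lock;  //CS$2$0000 in the IL: the lock target is read only once

        try
        {
            //Enter sets lockTaken to true only after the lock has been acquired
            Monitor.Enter(lockObject, ref lockTaken);
            result = ++CurrentNumber;
        }
        finally
        {
            //release only if the lock was actually taken
            if (lockTaken)
                Monitor.Exit(lockObject);
        }

        return result;
    }
}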

The lock statement only uses the Enter and Exit methods to mark the beginning and end of the critical section, acquiring and releasing the lock, which is good enough to control thread access. For more extensive thread control, you can use other methods of the Monitor class, such as Wait, which temporarily releases the current thread's lock and waits for a signal before reacquiring it, or Pulse, which signals one waiting thread that the state of the lock object has changed so that it can queue up to reacquire the lock (PulseAll signals all waiting threads).

I would like to keep this post simple: this sequence number generator does not need complicated thread handling, so there is no point in using the Wait and Pulse methods. However, if you want to know more about these 2 methods, check out the MSDN articles for Wait and Pulse. And be warned: if you make threads interact with each other using Wait and Pulse, it is easy to cause a deadlock, because the Monitor class does not remember whether Pulse has been called. If one thread calls Pulse before another thread calls Wait, the signal is lost, and the waiting thread may wait forever.
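One common way to avoid the lost-pulse trap is to never rely on the pulse alone: keep the real state (here, a queue) under the lock and re-check it in a loop before waiting. A minimal sketch; BlockingQueueSketch is an illustrative class, not something from the post or the framework.

```csharp
using System.Collections.Generic;
using System.Threading;

// Illustrative blocking queue: Take() waits until Add() has supplied an item.
public class BlockingQueueSketch<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _lock = new object();

    public void Add(T item)
    {
        lock (_lock)
        {
            _items.Enqueue(item);
            // Wake one waiting consumer. If nobody is waiting, the pulse is lost,
            // but the item stays in the queue, so nothing important is lost.
            Monitor.Pulse(_lock);
        }
    }

    public T Take()
    {
        lock (_lock)
        {
            // Check the condition first, and again after every wake-up:
            // if Add ran before we got here, the queue is non-empty and we never wait.
            while (_items.Count == 0)
            {
                Monitor.Wait(_lock);   // releases _lock while waiting, re-acquires before returning
            }
            return _items.Dequeue();
        }
    }
}
```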

Enjoy locking~


Monday, July 7, 2014

C# - Memory Mapped File

Problem

Have you ever had a requirement for your application to get data from another application? For instance, you have created application A, and the value of one of its variables has to be shared with applications B, C and D, all running on the same server. What would you do?

Solution

Today's topic covers Memory Mapped Files. Without doubt, the fastest way to access data is directly from memory. .NET Framework 4.0 introduced memory-mapped files: a memory-mapped file maps the contents of a file into a process's virtual memory, so the file content can be read and written as ordinary memory. Any process that maps the same file (or opens the same named map) sees the same memory, so it can serve as shared memory between applications, subject to access permissions and provided the applications coordinate their access.

Memory Mapped Files support persisted and non-persisted modes. A persisted file is associated with a source file on disk, and the data is saved to that file when the last process working with it has finished. A non-persisted file is not associated with a file on disk; when the last process has finished working with it, the data is lost and the file is reclaimed by garbage collection. For more info, refer HERE, and for the concepts, HERE.

A Memory Mapped File can be used to create shared memory; I could create a generic component that lets me store any object and share it with all the applications. But in this post, I will keep it simple and write a shared memory that holds a single string. The string value can be read and changed, and a change made by one application is visible to all the others the next time they read it. Also, the value is persisted: after the process is killed or the server is rebooted, the string value is still available.

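There is one challenge here: access has to be coordinated. In this design, every Get and Set opens the backing file with CreateFromFile, and a second attempt to open the same file at the same moment (from another thread or another process) can fail with an IOException, so only one thread in one process should work with the file at a time. Normally, we can use the lock statement to block other threads from entering a critical section until it has been released, but lock only works within a single process. To lock across processes, we have to use a mutual exclusion object (mutex), specifically a named Mutex.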

Memory Mapped File is available in the System.IO.MemoryMappedFiles namespace as long as your application targets .NET Framework 4.0 or later; no external assembly reference is required. You need the following namespaces (System.Threading is for the Mutex):

using System.IO;
using System.IO.MemoryMappedFiles;
using System.Threading;

This is what the shared memory code looks like:

public class SharedMemory : IDisposable
{
    //Keep the mutex as static to prevent early garbage collection
    private static Mutex _mutex;
    private static object _numLock;

    static SharedMemory()
    {
        _numLock = new object();

        //initiallyOwned must be false: if this thread owned the mutex from the start,
        //it would never fully release it and other processes would wait forever.
        //The constructor opens the mutex if another process has already created it.
        _mutex = new Mutex(false, "sharedMutex");
    }

    public void Set(string value)
    {
        //lock thread
        lock (_numLock)
        {
            //lock process with mutex
            _mutex.WaitOne();
            try
            {
                //access memory mapped file (need persistence); the D:\temp folder must exist
                using (var memMapFile = MemoryMappedFile.CreateFromFile(
                            @"D:\temp\SharedMemoryMap", //file location
                            FileMode.OpenOrCreate,  //create new file if not exist, open if exist
                            "shared", //map name
                            1024)) //size in bytes; the string must fit in it
                {
                    //write the length-prefixed string to the memory view
                    using (var stream = memMapFile.CreateViewStream())
                    using (var writer = new BinaryWriter(stream))
                    {
                        writer.Write(value);
                    }
                }
            }
            finally
            {
                //release the mutex for other processes, even if an exception occurred
                _mutex.ReleaseMutex();
            }
        }
    }

    public string Get()
    {
        string value = null;

        //lock thread
        lock (_numLock)
        {
            //lock process with mutex
            _mutex.WaitOne();
            try
            {
                //access memory mapped file (need persistence)
                using (var memMapFile = MemoryMappedFile.CreateFromFile(
                            @"D:\temp\SharedMemoryMap", //file location
                            FileMode.OpenOrCreate,  //create new file if not exist, open if exist
                            "shared", //map name
                            1024)) //size in bytes
                {
                    //read the string back from the memory view (a brand-new file yields "")
                    using (var stream = memMapFile.CreateViewStream())
                    using (var reader = new BinaryReader(stream))
                    {
                        value = reader.ReadString();
                    }
                }
            }
            finally
            {
                //release the mutex for other processes to access the memory mapped file
                _mutex.ReleaseMutex();
            }
        }
        return value;
    }

    public void Dispose()
    {
        //note: _mutex is static, so after Dispose no other SharedMemory instance can use it
        if (_mutex != null)
            _mutex.Dispose();
    }
}

Now, I created and started a few console applications, all referencing the same shared memory assembly. Each one calls the Get() function to retrieve the value from memory, then the Set() function to change it.

using System;

class Program
{
    static void Main(string[] args)
    {
        bool quit = false;

        using (SharedMemory mem = new SharedMemory())
        {
            do
            {
                Console.WriteLine("Current value: " + mem.Get());
                Console.Write("New value: ");

                string input = Console.ReadLine();
                if (input == "quit")
                    quit = true;
                else
                    mem.Set(input);
            }
            while (!quit);
        }
    }

}

Close all the console windows, then reopen one of them. You will still get the last persisted string value that you entered in the last closed console.

If you go to the memory mapped file location specified in the code, you will see that a file has been created in the folder, with its content in binary form. That is the value from the virtual memory, saved to disk.
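To see what that binary content is, the sketch below writes a string through a file-backed map the same way Set does and then reads the raw file bytes. It assumes a null map name instead of "shared", because named maps are only supported on Windows in .NET Core and .NET 5+; MapFileLayoutDemo is an illustrative name.

```csharp
using System.IO;
using System.IO.MemoryMappedFiles;

public static class MapFileLayoutDemo
{
    // Writes `value` through a file-backed map the same way SharedMemory.Set does,
    // then returns the raw bytes of the backing file for inspection.
    public static byte[] WriteAndReadRaw(string path, string value)
    {
        // mapName is null here (the post uses "shared") so the sketch also runs
        // on platforms without named-map support.
        using (var map = MemoryMappedFile.CreateFromFile(path, FileMode.OpenOrCreate, null, 1024))
        using (var stream = map.CreateViewStream())
        using (var writer = new BinaryWriter(stream))
        {
            // BinaryWriter writes a 7-bit-encoded byte count, then the UTF-8 bytes.
            writer.Write(value);
        }
        return File.ReadAllBytes(path);
    }
}
```

For "hello", the first byte of the file is 5 (the byte count), followed by the five UTF-8 bytes of the string; the rest of the file up to the map capacity is zeros. This length prefix is also why BinaryReader.ReadString in Get() stops at the right place even after a shorter string overwrites a longer one.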

If you are interested in my source code, feel free to download it from HERE.
