Monday, December 16, 2013

Integrating File System Watcher with Windows Workflow Foundation

Today I want to share how to integrate a file system watcher into a workflow. There are a few ways to do it. Option 1 (the simple way): whenever a file is dropped into a folder, the file system watcher kicks in and spawns a new workflow instance to process the file. Option 2 (the hard way): integrate the file system watcher into the workflow itself, which means a single workflow instance waits for the file watcher event to fire and then processes the file. I want to share the hard way.

Concept

Here is the flow of the concept. A picture is worth a thousand words.


Challenges

To implement the flow above, I need a custom workflow activity to handle the logic. I chose NativeActivity because I need its bookmarking feature. A bookmark can induce idle, and I want the workflow instance to stay idle whenever there is no file in the folder.
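The bookmark and idle behaviour can be tried in isolation before bringing in the file system watcher. The following is a minimal sketch, assuming a .NET Framework 4.x project that references System.Activities. The names WaitForValue and BookmarkDemo are made up for illustration and are not part of my project.

```csharp
using System;
using System.Activities;
using System.Threading;

// Hypothetical activity for illustration only: it waits on a named bookmark
// and returns whatever value the host passes in when resuming it.
public sealed class WaitForValue : NativeActivity<string>
{
    // Required for an activity that creates bookmarks.
    protected override bool CanInduceIdle
    {
        get { return true; }
    }

    protected override void Execute(NativeActivityContext context)
    {
        // Execute returns right after this call. The pending bookmark keeps
        // the activity from completing, so the instance goes idle.
        context.CreateBookmark("WaitForValue", OnResumed);
    }

    private void OnResumed(NativeActivityContext context, Bookmark bookmark, object value)
    {
        // The value supplied by the host becomes the activity result.
        Result.Set(context, (string)value);
    }
}

// Hypothetical host helper: runs the activity, waits for idle, then resumes it.
public static class BookmarkDemo
{
    public static string Run(string input)
    {
        string output = null;
        var idle = new ManualResetEvent(false);
        var completed = new ManualResetEvent(false);

        var app = new WorkflowApplication(new WaitForValue());
        app.Idle = e => idle.Set();
        app.Completed = e =>
        {
            output = (string)e.Outputs["Result"];
            completed.Set();
        };

        app.Run();
        idle.WaitOne();                             // the bookmark is now pending
        app.ResumeBookmark("WaitForValue", input);  // wake the instance up
        completed.WaitOne();
        return output;
    }
}
```

Calling BookmarkDemo.Run("hello") should hand the same string back once the bookmark is resumed. In this sketch the host resumes the bookmark by name; in my activity the file system watcher event takes the place of the host.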

The next challenge is that once an instance has gone idle, the original activity context is no longer available when the file system watcher event fires to wake the instance and resume processing. Therefore, I need a custom workflow instance extension, implemented through IWorkflowInstanceExtension, to resume the bookmark.

Implementation

First, prepare the custom workflow instance extension that supports resuming a bookmark.

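using System;
using System.Activities;
using System.Activities.Hosting;
using System.Collections.Generic;
using System.IO;

public class FileReceiveExtension : IWorkflowInstanceExtension
{
    private WorkflowInstanceProxy _proxy;

    public IEnumerable<object> GetAdditionalExtensions()
    {
        //No additional extensions are required
        return null;
    }

    public void SetInstance(WorkflowInstanceProxy instance)
    {
        //Keep the proxy so that the bookmark can be resumed outside the activity context
        _proxy = instance;
    }

    public void ResumeBookmark(Bookmark bookmark, FileInfo file)
    {
        _proxy.BeginResumeBookmark(bookmark, file,
            (asyncResult) =>
            {
                //Complete the asynchronous call so that a failure does not go unnoticed
                BookmarkResumptionResult result = _proxy.EndResumeBookmark(asyncResult);
                if (result != BookmarkResumptionResult.Success)
                    Console.WriteLine("Resume bookmark failed: " + result);
            }, null);
    }
}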

Next, create a custom NativeActivity, then register the custom workflow instance extension in the activity's CacheMetadata method.

protected override void CacheMetadata(NativeActivityMetadata metadata)
{
    metadata.AddDefaultExtensionProvider<FileReceiveExtension>(() => new FileReceiveExtension());
    base.CacheMetadata(metadata);
}

Override the CanInduceIdle property to return true.

protected override bool CanInduceIdle
{
    get
    {
        return true;
    }
}


Write the custom activity implementation to create the file system watcher and register the necessary event handlers.

protected override void Execute(NativeActivityContext context)
{
    //Get the folder path from the activity parameter
    string monitoringPath = context.GetValue(this.MonitoringPath);

    //Prepare the extension and the bookmark first so that
    //watcher_Created never runs before they are set
    _extension = context.GetExtension<FileReceiveExtension>();
    _bookmark = context.CreateBookmark(BookmarkResumed);

    FileSystemWatcher watcher = new FileSystemWatcher();
    watcher.Path = monitoringPath;
    watcher.NotifyFilter = NotifyFilters.FileName | NotifyFilters.LastWrite;
    watcher.Created += watcher_Created;
    watcher.Error += watcher_Error;

    //Start raising events only after everything else is ready
    watcher.EnableRaisingEvents = true;
}

Write the handler for the file system watcher Created event.

private void watcher_Created(object sender, FileSystemEventArgs e)
{
    //Resume the bookmark and pass the FileInfo object as the bookmark value
    _extension.ResumeBookmark(_bookmark, new FileInfo(e.FullPath));
}

Write the callback that runs when the bookmark is resumed.

private void BookmarkResumed(NativeActivityContext context, Bookmark bookmark, object value)
{
    FileInfo file = (FileInfo)value;

    //Call your own file processor method
    //Note: the processing is asynchronous so that this callback returns quickly (optional)
    Task task = _fileProcessor.ProcessAsync(file);

    //Log the error if the processing fails
    task.ContinueWith((e) =>
    {
        if (e.IsFaulted)
        {
            Console.WriteLine(e.Exception);
        }
    });

    //Create a new bookmark right away to wait for the next file
    //(the task above may still be running at this point)
    _bookmark = context.CreateBookmark(BookmarkResumed);
}

Draw the workflow with the new custom activity. The Start and Stop Receive activities let me control the file system watcher process through service calls.



Start hosting the workflow service, then call the Start web method to test it out. You only need to call the method once to spawn a single workflow instance. Because the custom file receive activity accepts a monitoring path, you can spawn another workflow instance to run a second file system watcher that monitors a different folder.


If you are interested in the source code, feel free to download it from HERE.



Saturday, December 7, 2013

How to Limit Transactions Per Second (TPS) in WCF?

Service throttling is one of the most useful features in WCF. You can configure your service with a custom service behavior to limit the number of concurrent calls, sessions and instances by using the following configuration:

      <serviceBehaviors>
        <behavior name="MyBehavior">
          <serviceThrottling 
            maxConcurrentCalls="1" 
            maxConcurrentSessions="1" 
            maxConcurrentInstances="1"
          />
        </behavior>
      </serviceBehaviors>

However, WCF service throttling only limits concurrency at the service level. What if your service must accept no more than 3 requests per second? Service throttling cannot enforce that kind of rate limit precisely. Therefore, we have to control the transaction limit in code instead. The idea comes from Serena Yeoh.

First, we have to create a WCF service. My service setup is as follows:

[ServiceContract]
public interface ISampleService
{
    [OperationContract]
    string GetData(string value);

}

GetData is a very simple and dumb operation. It accepts any string value and returns the same value back to the client.

Next, the service has to be a singleton. Why? Because the counter and the timer live in the service instance, I can only enforce the limit when every request goes through the same instance.

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single)]
public class SampleService : ISampleService


The following is the method I used to control the transaction limit:

//Lock an object so that only one thread at a time can execute the following code
lock (countLock)
{
    //StopWatch timer started in constructor
    //Check if the timer elapsed time is less than 1 second
    if (watch.ElapsedMilliseconds <= 1000)
    {
        //If there are 3 hits in less than 1 second
        if (counter >= 3)
        {
            //Prepare to sleep or wait until the next second time up
            //Calculate the sleep time by using 1 second time minus the elapsed time
            int sleepTime = (int)(1000 - watch.ElapsedMilliseconds);
            counter = 1; //reset back the hit counter

            Debug.WriteLine("Sleep : " + sleepTime + " ms");

            if (sleepTime >= 0)
                Thread.Sleep(sleepTime);

            watch.Restart();
        }
        else
        {
            //The hit is still within the limit, let it pass
            counter++;
        }
    }
    else
    {
        //The elapsed time has exceeded 1 second
        //Reset the counter and timer
        //Proceed to process
        counter = 1;
        watch.Restart();
    }

}
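The snippet above relies on fields that are not shown (countLock, counter and watch). For reference, here is a self-contained sketch of the same logic wrapped in a small helper class. The class name PerSecondLimiter, the method name WaitForSlot and the configurable limit are assumptions made for this sketch; they are not taken from my source code.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical wrapper around the throttling logic shown above.
public sealed class PerSecondLimiter
{
    private readonly object _countLock = new object();
    private readonly Stopwatch _watch = Stopwatch.StartNew();
    private readonly int _limit;
    private int _counter;

    public PerSecondLimiter(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException("limit");

        _limit = limit;
    }

    // Blocks the caller until it is allowed into a one-second window.
    public void WaitForSlot()
    {
        lock (_countLock)
        {
            long elapsed = _watch.ElapsedMilliseconds;

            if (elapsed > 1000)
            {
                // More than a second has passed: start a new window.
                _counter = 1;
                _watch.Restart();
            }
            else if (_counter >= _limit)
            {
                // The window is full: wait for the rest of the second.
                int sleepTime = (int)(1000 - elapsed);
                if (sleepTime > 0)
                    Thread.Sleep(sleepTime);

                // This call becomes the first hit of the new window.
                _counter = 1;
                _watch.Restart();
            }
            else
            {
                // Still within the limit, let the call pass.
                _counter++;
            }
        }
    }
}
```

With this helper, the singleton service would keep one PerSecondLimiter instance (for example new PerSecondLimiter(3)) in a field and call WaitForSlot() at the top of GetData. Because the caller sleeps while holding the lock, every other request queues behind it, which is what keeps the rate down.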


In order to test my service, I have created a console program to call my WCF service.

class Program
{
    static void Main(string[] args)
    {
        //Hit the service concurrently to test whether
        //it processes more than 3 transactions per second
        Parallel.For(0, 100, i =>
        {
            SampleServiceClient proxy = new SampleServiceClient();
            Console.WriteLine(string.Format(
                "{0} Service call {1} : {2}",
                DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"),
                i,
                proxy.GetData("Test")));
        });

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }
}

This is the result that I got. Notice that there are only 3 transactions per second, even though I used a parallel for loop to call my WCF service.



If you are interested in my source code, feel free to download it from HERE.

