Geeks With Blogs
Doug.Instance Improving the world one post at a time

I'm working on an application that lets users run tasks that are "long-running" (could take 5-10 seconds depending on the server load). For one or two users, this would practically be real time. However, when I start to scale I need to be able to let users know how long they will need to wait and optionally throttle how often they can run these tasks. Since I also have a general need for push messages from the server, I decided to give WebSockets a try. I understand that this is kind of like using a sledgehammer to drive in a thumbtack, but I figured I would eventually abstract the messaging system so I could replace it with COMET or SignalR later. What I was looking for in the process looks something like this:

The key here is that I don't want to host a separate process to sit and listen to a message queue. I want all the clients to be able to basically sit and watch the queue themselves and then run their own task and somewhat elegantly receive the response.  Not shown in this diagram is another key concept - I want to recognize when the client disconnects and mark the task as abandoned so it doesn't run.

The first step of the process is to queue the task in an action in my web API controller:

        public bool QueueTask([FromBody] TaskInfo taskInfo)
        {
            // Only allow one item in queue per account.
            AccountInfo account = AccountService.GetAccount(User.Identity.Name);

            if (account == null || TaskQueue.Any(i => i.AccountId == account.Id))
            {
                return false;
            }

            taskInfo.AccountId = account.Id;

            TaskQueue.Enqueue(taskInfo);

            return true;
        }

In this code, AccountService is application-specific code that is injected into my API controller and lets me get info on the current user. TaskQueue is an in-memory FIFO queue that is basically just a wrapper around the ConcurrentQueue<T> class. It would only take some minor changes to implement this solution using a ConcurrentQueue<T> instance directly instead of this custom class. The method simply returns true if the data (defined by the TaskInfo class) is added to the queue successfully, and false if the account is unknown or already has a task queued.
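The one-item-per-account rule is easy to see in a small model. The sketch below is written in JavaScript (the client language used later in this post) rather than C#, and the `AccountQueue` class and its method names are hypothetical. It is not the TaskQueue wrapper described above, just an illustration of the same FIFO behavior.

```javascript
// Hypothetical model of a FIFO queue that allows one pending task per account.
class AccountQueue {
  constructor() {
    this.items = [];
  }

  // Returns false (and queues nothing) if the account already has a task waiting.
  enqueue(task) {
    if (this.items.some(function (i) { return i.accountId === task.accountId; })) {
      return false;
    }
    this.items.push(task);
    return true;
  }

  // Look at the head of the queue without removing it (undefined when empty).
  peek() {
    return this.items[0];
  }

  // Remove and return the head of the queue (undefined when empty).
  dequeue() {
    return this.items.shift();
  }
}

const queue = new AccountQueue();
console.log(queue.enqueue({ accountId: 1 })); // true
console.log(queue.enqueue({ accountId: 1 })); // false, account 1 is already queued
console.log(queue.enqueue({ accountId: 2 })); // true
```

Unlike this single-threaded model, the server-side check and enqueue run on multiple request threads, so the real wrapper would need to make the "check, then add" step atomic to fully guarantee one item per account.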

Next, we need to open the socket so we can return the response once the task is processed from the queue:

        [HttpGet]
        public HttpResponseMessage OpenSocket()
        {
            HttpContext.Current.AcceptWebSocketRequest(ProcessQueue);

            return new HttpResponseMessage(HttpStatusCode.SwitchingProtocols);
        }

This is all it takes to open a socket.  The magic then happens in the ProcessQueue method:

        private async Task ProcessQueue(AspNetWebSocketContext context)
        {
            WebSocket socket = context.WebSocket;

            AccountInfo account = null;

            // Use new instance since service is disposed on API thread
            using (Services.Sql.AccountService accountService = 
                new Services.Sql.AccountService())
            {
                account = accountService.GetAccount(User.Identity.Name);
            }

            if (account == null)
            {
                return;
            }

            while (true)
            {
                try
                {

                    if (socket.State == WebSocketState.Open)
                    {
                        TaskInfo taskInfo = TaskQueue.Peek();

                        // Break out of loop if queue is empty (shouldn't happen).
                        // Checked first so an empty queue never leads to a null
                        // dereference (assumes Peek returns null when empty).
                        if (taskInfo == null || !TaskQueue.Any())
                        {
                            break;
                        }
                        // Run task and dequeue if it is for current user
                        else if (taskInfo.AccountId == account.Id)
                        {
                            await RunTask(context, taskInfo);
                            TaskQueue.Dequeue();
                            break;
                        }
                        // Dequeue item if it is abandoned (possible) or complete (shouldn't happen)
                        else if (taskInfo.IsAbandoned || taskInfo.IsComplete)
                        {
                            TaskQueue.Dequeue();
                        }
                        // Wait for queue to process
                        else
                        {
                            await Task.Delay(100);
                        }
                    }
                    else
                    {
                        // Any state other than Open (Aborted, Closed, or a close
                        // handshake in progress) means the client is going away.
                        // Mark the task abandoned and stop; without the break the
                        // loop would spin here with no delay.
                        AbandonTask(account.Id);
                        break;
                    }
                }
                catch (Exception exp)
                {
                    // Log any errors and mark task abandoned
                    Logger.Error("Error processing task queue.", exp);

                    if (account != null)
                    {
                        AbandonTask(account.Id);
                    }

                    break;
                }
            }
        }

Really, there are only three cases to consider in the loop: the task at the head of the queue belongs to the current user, it has been abandoned, or it belongs to someone else and we wait. Since sockets dequeue their own items when they are complete, the queue should never be empty and the next item should never be complete.
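To make the cases concrete, the decision each waiting socket makes on every pass can be modelled as a pure function. This is only a sketch in JavaScript, not the C# from the controller; `nextAction` and the action names it returns are hypothetical.

```javascript
// Hypothetical helper: decide what a waiting socket should do with the head item.
// `head` is the item at the front of the queue (or undefined if the queue is empty).
function nextAction(head, accountId) {
  if (!head) {
    return 'stop';      // empty queue (shouldn't happen)
  }
  if (head.accountId === accountId) {
    return 'run';       // our task is at the front: run it, then dequeue
  }
  if (head.isAbandoned || head.isComplete) {
    return 'discard';   // clear a dead item so the queue keeps moving
  }
  return 'wait';        // someone else's task is still pending
}

console.log(nextAction({ accountId: 1 }, 1));                    // run
console.log(nextAction({ accountId: 2, isAbandoned: true }, 1)); // discard
console.log(nextAction({ accountId: 2 }, 1));                    // wait
```

Keeping the decision separate from the socket and queue plumbing also makes it easy to unit test the cases that "shouldn't happen".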

Another key to making this work is that the task itself runs in a loop. Each pass does one slice of the work and then asynchronously sends the latest entry back to the client, which keeps data flowing over the socket so it doesn't close in the middle of the process:

        public async Task RunTask(AspNetWebSocketContext context, TaskInfo taskInfo)
        {
            TaskRunner taskRunner = new TaskRunner(taskInfo);

            while (!taskRunner.IsComplete)
            {
                taskRunner.Tick();
                await SendMessage(context, taskRunner.Entries[taskRunner.Entries.Count - 1]);
            }

            taskInfo.IsComplete = true;
        }
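The TaskRunner class itself is application-specific and not shown here. As a rough illustration of the tick-and-send pattern only, here is a synchronous JavaScript sketch; `SliceRunner`, `runInSlices`, and the shape of each entry are all hypothetical, and the real C# version awaits each send.

```javascript
// Hypothetical runner: does `total` slices of work, recording one entry per slice.
function SliceRunner(total) {
  this.total = total;
  this.entries = [];
}

SliceRunner.prototype.isComplete = function () {
  return this.entries.length >= this.total;
};

SliceRunner.prototype.tick = function () {
  this.entries.push({ step: this.entries.length + 1, of: this.total });
};

// Run the task one tick at a time, handing the newest entry to `send` after each tick.
function runInSlices(runner, send) {
  while (!runner.isComplete()) {
    runner.tick();
    send(runner.entries[runner.entries.length - 1]);
  }
}

var sent = [];
runInSlices(new SliceRunner(3), function (entry) { sent.push(entry); });
console.log(sent.length); // 3
```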

For anyone interested, the code to send the messages looks like this:

        private async Task SendMessage(AspNetWebSocketContext context, object message)
        {
            await SendMessage(context, JsonConvert.SerializeObject(message));
        }

        private async Task SendMessage(AspNetWebSocketContext context, string message)
        {

            try
            {
                if (context != null && context.WebSocket.State == WebSocketState.Open)
                {
                    byte[] messageData = Encoding.UTF8.GetBytes(message);
                    var outputBuffer = new ArraySegment<byte>(messageData);
                    await context.WebSocket.SendAsync(outputBuffer, WebSocketMessageType.Text,
                        true, CancellationToken.None);
                }
            }
            catch (Exception exp)
            {
                Logger.Error("Error sending practice results.", exp);
            }
        }

Since I am using AngularJS for the client, I simply queue the task with HTTP POST and then watch the socket for a response:

    function queueTask(taskInfo) {
      $http.post('api/TaskData/QueueTask',
        taskInfo
      ).then(function(result) {
        if (result.data) {
          openSocket();
        }
      });
    }
    
    function openSocket() {
      var results = {
        Entries: []
      };
      
      // Open a fresh socket for each task. The original guard
      // (!svc.socket || true) was always true, so behavior is unchanged.
      svc.socket = new WebSocket('ws://localhost:54320/api/TaskData/OpenSocket');
      svc.socket.onopen = function () {
        console.log("connected");
      };
      svc.socket.onmessage = function (evt) {
        results.Entries.push(JSON.parse(evt.data));
        console.log(evt.data);
      };
      svc.socket.onerror = function (evt) {
        // WebSocket error events carry no message text, so log the event itself.
        console.log(evt);
      };
      svc.socket.onclose = function (evt) {
        svc.results = results;
        toaster.pop('success', 'Task complete');
        console.log("disconnected");
      };
    }
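The socket address in the client code is hard-coded to my local development server. One way to avoid that is to derive it from the page's own location. This is a minimal sketch, assuming the API is served from the root of the same host as the page; `socketUrl` is a hypothetical helper, not part of the code above.

```javascript
// Hypothetical helper: build a ws:// or wss:// URL from the page's own location.
// `loc` only needs `protocol` and `host`, so window.location works in the browser.
function socketUrl(loc, path) {
  var scheme = loc.protocol === 'https:' ? 'wss:' : 'ws:';
  return scheme + '//' + loc.host + '/' + path.replace(/^\//, '');
}

// In the browser: socketUrl(window.location, 'api/TaskData/OpenSocket')
console.log(socketUrl({ protocol: 'http:', host: 'localhost:54320' }, 'api/TaskData/OpenSocket'));
// ws://localhost:54320/api/TaskData/OpenSocket
```

Pages served over HTTPS need the wss: scheme, because browsers block insecure WebSocket connections from secure pages.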

That's it! Unfortunately, I didn't find out until after I completed this implementation that GoDaddy does not support WebSockets, so I will be moving to a different communication mechanism sooner rather than later (most likely COMET).

Posted on Saturday, January 2, 2016 10:48 AM


Comments on this post: In-Memory Queue with Web API 2 and Websockets

No comments posted yet.

Copyright © Doug Lampe | Powered by: GeeksWithBlogs.net