7
\$\begingroup\$

I am in the process of writing a cloud application (mostly as a hobby and for learning) that requires a very fast cache, queue, and messaging system. I looked at a few different options from Microsoft (hosting is on Azure), and all of them seemed too slow for my needs. Then I found Redis, and the speed was right where I needed it to be.

Another consideration is that I want to keep the number of components I depend on to a minimum, so that if I need to move from Azure to bare metal, for example, I can always host my own Redis.

I decided, both for learning and for sport, to write a queue system that can work as either AtMostOnce or AtLeastOnce and that stays reliable across system failures. This class should also be able to run on multiple machines (in this case worker roles) and be instantiated either by IoC or manually.

The following is what I have so far, before I take care of some things I have not implemented yet (CancellationTokens and a shared ConnectionMultiplexer, for instance). The code does work: I have tested it on 3 different WorkerRole instances, including crashes and reboots. My concerns are aimed more at problems that I don't see, performance issues, and my general lack of experience. Feel free to tell me if I'm doing anything wrong, but note that I know there are packages out there already. I just like to do things myself.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace CitySurvival.WorkerCommon
{
    /// <summary>
    /// Needed: (2) Redis queues 1 for new messages, 1 for currently processing messages
    /// Needed: processing messages list is FILO
    /// 
    /// The queues will only contain the key to the message in redis, which is stored as
    /// a single entity for quick lookup
    /// 
    /// jobQueue  -- processingQueue
    /// job:1        job:2
    /// 
    /// job:1 (job to do index 1)
    /// job:2 (job to do index 2)
    /// 
    /// Finish method, will LREM key, and Remove Key from database
    /// 
    /// ON adding a new job, send a Publish to say a new job is added
    /// 
    /// ON taking a job, RPOPLPUSH from jobQueue to processingQueue
    /// 
    /// Checking for failed jobs, expiration time 10 seconds (this should be long enough
    /// to process anything)
    /// If job stays in processingQueue for longer than timeout, RPOPLPUSH to jobQueue
    /// 
    /// TODO: CancellationTokens (bring in with Autofac for global token or use Factory param)
    /// TODO: Get ConnectionMultiplexer from Constructor, or Factory
    /// </summary>
    public class RedisJobQueue
    {
        public delegate RedisJobQueue Factory(string jobName);
        private IConnectionMultiplexer ConnectionMultiplexer => _lazyConnection.Value;

        private readonly Lazy<IConnectionMultiplexer> _lazyConnection = new Lazy<IConnectionMultiplexer>(() => StackExchange.Redis.ConnectionMultiplexer.Connect("ConnectionString"));

        private readonly string _jobQueue;
        private readonly string _processingQueue;
        private readonly string _subChannel;
        private readonly string _jobName;

        private Task _managementTask;

        private bool _receiving;

        public event EventHandler<JobReceivedEventArgs> OnJobReceived; 

        public RedisJobQueue(/*ConnectionMultiplexer multiplexer, */string jobName)
        {
            //_connectionMultiplexer = multiplexer;
            _jobQueue = jobName + ":jobs";
            _processingQueue = jobName + ":process";
            _subChannel = jobName + ":channel";
            _jobName = jobName;
        }



        private IDatabase Database => ConnectionMultiplexer.GetDatabase();

        /// <summary>
        /// When a job is finished, remove it from the processingQueue and from the
        /// cache database.
        /// </summary>
        /// <param name="key"></param>
        /// <param name="failed">Operation failed, requeue for another attempt</param>
        public async Task Finish(string key, bool failed = false)
        {
            var db = Database;
            await db.ListRemoveAsync(_processingQueue, key);

            if (failed)
            {
                // How many times to fail before dead
                if (await db.HashExistsAsync(key, "failedcount"))
                {
                    var count = await db.HashGetAsync(key, "failedcount");
                    if (count.IsInteger)
                    {
                        if ((int) count >= 10)
                        {
                            // for now, delete the key, later we might integrate a dead message
                            // queue
                            await db.KeyDeleteAsync(key);
                            return;
                        }
                    }
                }

                db.HashIncrement(key, "failedcount");
                db.HashDelete(key, "active");
                db.ListRightPush(_jobQueue, key);

                ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            }
            else
            {
                // Job was successfully run, remove the key
                await db.KeyDeleteAsync(key);
            }
        }

        /// <summary>
        /// Do we consume messages from the queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsConsumer()
        {
            var sub = ConnectionMultiplexer.GetSubscriber();
            sub.Subscribe(_subChannel, (channel, value) => HandleNewJobs());

            // Assume on starting that we have jobs waiting to be handled
            HandleNewJobs();
            return this;
        }

        /// <summary>
        /// Runs a Task every 10 seconds to see if any remaining items are in
        /// processing queue
        /// </summary>
        /// <returns></returns>
        public RedisJobQueue AsManager()
        {
            _managementTask = Task.Factory.StartNew(async () =>
            {
                while (true)
                {
                    await Task.Delay(10000);
                    var timeToKill = (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds - 10000;
                    RedisValue[] values = Database.ListRange(_processingQueue);
                    foreach (var value in from value in values let activeTime = (double)Database.HashGet((string)value, "active") where activeTime < timeToKill select value)
                    {
                        await Finish(value, true);
                    }
                }
            });

            return this;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        /// 
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private Dictionary<RedisValue, RedisValue> GetJob()
        {
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = Database.ListRightPopLeftPush(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                Database.HashSet(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = Database.HashGetAll(key).ToDictionary();

                if (value.Count == 0)
                {
                    Database.ListRemove(_processingQueue, key);
                    continue;
                }
                value.Add("key", key);
                break;
            }
            return value;
        }

        /// <summary>
        /// Move key from JobQueue to processingQueue, get key value from cache.
        /// 
        /// Also set the active field. Indicates when job was retrieved so we can monitor
        /// its time.
        /// </summary>
        /// <returns></returns>
        private async Task<Dictionary<RedisValue, RedisValue>> GetJobAsync()
        {
            var db = Database;
            Dictionary<RedisValue, RedisValue> value;
            while (true)
            {
                string key = await db.ListRightPopLeftPushAsync(_jobQueue, _processingQueue);
                // If key is null, then nothing was there to get, so no value is available
                if (string.IsNullOrEmpty(key))
                {
                    value = new Dictionary<RedisValue, RedisValue>();
                    break;
                }

                await db.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
                value = (await db.HashGetAllAsync(key)).ToDictionary();

                // if Count is 0, remove it and check for the next job
                if (value.Count == 0)
                {
                    await db.ListRemoveAsync(_processingQueue, key);
                    continue;
                }

                value.Add("key", key);

                break;
            }
            return value;
        }

        /// <summary>
        /// We have received an indicator that new jobs are available
        /// We process until we are out of jobs.
        /// </summary>
        private async void HandleNewJobs()
        {
            if (_receiving)
            {
                Trace.WriteLine("Already Receiving Jobs...");
                return;
            }
            _receiving = true;
            Trace.WriteLine("Trying to get jobs...");
            var job = await GetJobAsync();
            // If a valid job cannot be found, it will return an empty Dictionary
            while (job.Count != 0)
            {
                // Fire the Event
                OnJobReceived?.Invoke(this, new JobReceivedEventArgs(job, job["key"]));
                // Get a new job if there is one
                job = await GetJobAsync();
            }
            _receiving = false;
        }

        /// <summary>
        /// Add a job to the Queue
        /// </summary>
        /// <param name="job"></param>
        public void AddJob(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = Database.StringIncrement(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            Database.HashSet(key, "payload", job);
            Database.ListLeftPush(_jobQueue, key);
            ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
            Trace.WriteLine("Added Job");
        }

        /// <summary>
        /// Add a job to the Queue (async)
        /// </summary>
        /// <param name="job"></param>
        public async Task AddJobAsync(RedisValue job)
        {
            if (job.IsNullOrEmpty) return;

            var id = await Database.StringIncrementAsync(_jobName + ":jobid");
            var key = _jobName + ":" + id;
            await Database.HashSetAsync(key, "payload", job);
            await Database.ListLeftPushAsync(_jobQueue, key);
            await ConnectionMultiplexer.GetSubscriber().PublishAsync(_subChannel, "");
            Trace.WriteLine("Added Job");
        }
    }
}

New Code

I have added and edited a few items from the original code and have also incorporated some of the changes suggested below. The new code can be found on GitHub in this gist: GitHub Gist for Redis Job/Message Queue

\$\endgroup\$

2 Answers

3
\$\begingroup\$
  • Dead code, such as the commented-out constructor argument of RedisJobQueue, should be deleted. To keep track of changes, use a version control system like Git or SVN.

  • You are using C# 6.0, so you can make use of string interpolation with the $ prefix, like so:

    public RedisJobQueue(string jobName)
    {
        _jobQueue =  $"{jobName}:jobs";
        _processingQueue = $"{jobName}:process";
        _subChannel = $"{jobName}:channel";
        _jobName = jobName;
    }  
    
  • Where possible, reduce horizontal spacing (nesting) by using guard conditions. For the Finish() method this is easily done by inverting the if condition and returning early, like so:

    public async Task Finish(string key, bool failed = false)
    {
        var db = Database;
        await db.ListRemoveAsync(_processingQueue, key);
    
        if (!failed)
        {
            await db.KeyDeleteAsync(key);
            return;
        }
    
        // How many times to fail before dead
        if (await db.HashExistsAsync(key, "failedcount"))
        {
            var count = await db.HashGetAsync(key, "failedcount");
            if (count.IsInteger)
            {
                if ((int) count >= 10)
                {
                    // for now, delete the key, later we might integrate a dead message
                    // queue
                    await db.KeyDeleteAsync(key);
                    return;
                }
            }
        }
    
        db.HashIncrement(key, "failedcount");
        db.HashDelete(key, "active");
        db.ListRightPush(_jobQueue, key);
    
        ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
    }
    

    You could consider changing the parameter failed to success, with a default value of true, to avoid the negated check !failed, but this would involve some major changes to the callers of this method.

  • You can remove the duplicated code where you have both synchronous and asynchronous versions of a method, like so:

    private Dictionary<RedisValue, RedisValue> GetJob()
    {
        return Task.Run(GetJobAsync).Result;
    }  
    

    See also: calling-async-method-synchronously
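
    The comment below reports that the method-group form was rejected as an ambiguous invocation by the asker's compiler. A minimal sketch of the lambda form, which names the overload explicitly, could look like this. SyncOverAsyncExample and the body of GetJobAsync are stand-ins invented for illustration; the real method reads from Redis.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncOverAsyncExample
{
    // Hypothetical stand-in for the real GetJobAsync, which talks to Redis.
    private static async Task<string> GetJobAsync()
    {
        await Task.Delay(10);
        return "job:1";
    }

    // Synchronous wrapper that reuses the asynchronous implementation.
    public static string GetJob()
    {
        // The lambda makes the delegate type unambiguous (Func<Task<string>>).
        // Task.Run moves the call to the thread pool, and Result blocks
        // the calling thread until the job has been fetched.
        return Task.Run(() => GetJobAsync()).Result;
    }
}
```

    Calling SyncOverAsyncExample.GetJob() blocks the caller for the duration of the asynchronous call, so the wrapper is best kept for call sites that cannot be made async.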

\$\endgroup\$
1
  • \$\begingroup\$ I will be working on this some more later tonight, but one suggestion doesn't seem to work for me. I had originally thought that 'Task.Run(GetJobAsync).Result' would work myself; I have seen that pattern often enough :). But for me it simply returns "Ambiguous invocation" errors, so I have to use Run(() => GetJobAsync()) instead. Not a big deal. \$\endgroup\$ Commented Sep 24, 2015 at 6:58
2
\$\begingroup\$

In AsManager you start a long-running task. You should pass TaskCreationOptions.LongRunning to StartNew to help the framework make better decisions.

Also, the task runs in a while (true) loop without an obvious bailout condition. It would probably be a bit cleaner to pass a CancellationToken to it, so you can cancel the task and end it when shutting down.
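
The two suggestions above could be combined roughly as follows. This is only a sketch under stated assumptions, not the asker's code: ManagementLoop, Start, sweep and interval are hypothetical names, and sweep is assumed to be a synchronous callback, whereas the original loop awaits Finish and would need adapting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ManagementLoop
{
    // Hypothetical helper: runs sweep once per interval on a dedicated
    // thread until the token is cancelled.
    public static Task Start(Action sweep, TimeSpan interval, CancellationToken token)
    {
        return Task.Factory.StartNew(() =>
        {
            // WaitOne returns false when the interval elapses (time to sweep)
            // and true once the token has been cancelled (time to stop).
            while (!token.WaitHandle.WaitOne(interval))
            {
                sweep();
            }
        }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
}
```

The caller keeps a CancellationTokenSource, passes its Token to Start, and calls Cancel() on shutdown. The returned task then completes, so it can be waited on before the process exits.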

\$\endgroup\$
1
  • \$\begingroup\$ Thanks for the advice. I have already changed all the while(true) loops to use CancellationTokens (check out the gist I linked at the bottom). I wish CR would allow the use of gists in questions. I didn't think about the TaskCreationOptions; I will look into that. \$\endgroup\$ Commented Sep 24, 2015 at 8:39
