Skip to main content
deleted 49 characters in body
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238

Good Implementation of Queue with TPL?task parallel library

So I am new to programming using the .Net TPL FrameworkFramework and multi-threading in general. I have googled up some tutorials and articles and put together the solution below.

//Console app that starts the Queue and Scheduler
class Program
    {        
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            Parallel.Invoke(
                () => clJobQueue.Instance.Populate(),
                () => clJobRunner.Instance.Start()
                );

            var autoResetEvent = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                autoResetEvent.Set();
            };

            autoResetEvent.WaitOne();
            
            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }
        
        private static void Stop()
        {            
            Parallel.Invoke(
             () => clJobQueue.Instance.CloseQueue(),
             () => clJobRunner.Instance.Stop()
             ); 
        }
    }

//Singelton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retreives new items
public sealed class clJobQueue
{
    private static volatile clJobQueue instance;
    private static volatile BlockingCollection<clJob> queue;
    private static volatile CancellationTokenSource cts;

    private static object syncRoot = new Object();

    static clJobQueue() { }
    private clJobQueue() { }

    public static clJobQueue Instance
    {
        get
        {
            if (instance == null)
            {
                lock (syncRoot)
                {
                    if (instance == null)
                    {
                        instance = new clJobQueue();
                        queue = new BlockingCollection<clJob>(10);
                        cts = new CancellationTokenSource();
                    }
                }
            }

            return instance;
        }
    }

    public bool Add(clJob jobItem)
    {
        queue.Add(jobItem, cts.Token);
        jobItem.setJobStatus(JobStatus.Queued);
        jobItem.JobQueue = this;
        return true;
    }

    public clJob GetNextJob()
    {
        try
        {
            clJob nextJob = queue.Take(cts.Token);
            nextJob.setJobStatus(JobStatus.Scheduled);
            return nextJob;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Cancelling Queue");
        }
        return null;
    }

    public void CloseQueue()
    {
        cts.Cancel();
        Console.WriteLine("Closing Queue");
        while (!IsEmpty)
        {
            clJob job = queue.Take();
            job.setJobStatus(JobStatus.Scheduled);
        }
    }

    public int Count
    {
        get { return queue.Count; }
    }

    public bool IsEmpty
    {
        get { return Count <= 0; }
    }

    public void Populate()
    {
        Parallel.Invoke(
                () => {
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine("Looking for X Job");
                        clJob job = clXJob.getJob();
                        if (job.JobID == 0)
                        {
                            Console.WriteLine("No X Jobs in DB, Sleeping for 60s");
                            Thread.Sleep(TimeSpan.FromSeconds(60));
                        }
                        else
                        {
                            Console.WriteLine("Adding X Job to Queue - {0} {1}", job.JobID, job.JobName);
                            Add(job);
                        }
                    }
                },
            () => {
                while (!cts.IsCancellationRequested)
                {
                    Console.WriteLine("Looking for Y Job");
                    clJob job = clYJob.getJob();
                    if (job == null || job.JobID == 0)
                    {
                        Console.WriteLine("No Y Jobs in DB, Sleeping for 60s");
                        Thread.Sleep(TimeSpan.FromSeconds(60));
                    }
                    else
                    {
                        Console.WriteLine("Adding Y Job to Queue - {0} {1}", job.JobID, job.JobName);
                        Add(job);
                    }
                }
            });
    }
}

//Polls the queue for items and initiates the items Run() method as a new Task
public sealed class clJobRunner
    {
        private static volatile clJobRunner instance;
        private static volatile CancellationTokenSource cts;

        static clJobRunner() { }
        private clJobRunner() { }

        public static clJobRunner Instance
        {
            get
            {
                if (instance == null)
                {
                    instance = new clJobRunner();
                    cts = new CancellationTokenSource();
                }

                return instance;
            }
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job;
                    if ((job = clJobQueue.Instance.GetNextJob()) == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");
                        Thread.Sleep(TimeSpan.FromSeconds(30));
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);
                    Task.Run(() => job.Run());
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
}

Thanks Mobin

//Console app that starts the Queue and Scheduler
class Program
    {        
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            Parallel.Invoke(
                () => clJobQueue.Instance.Populate(),
                () => clJobRunner.Instance.Start()
                );

            var autoResetEvent = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                autoResetEvent.Set();
            };

            autoResetEvent.WaitOne();
            
            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }
        
        private static void Stop()
        {            
            Parallel.Invoke(
             () => clJobQueue.Instance.CloseQueue(),
             () => clJobRunner.Instance.Stop()
             ); 
        }
    }

//Singelton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retreives new items
public sealed class clJobQueue
{
    private static volatile clJobQueue instance;
    private static volatile BlockingCollection<clJob> queue;
    private static volatile CancellationTokenSource cts;

    private static object syncRoot = new Object();

    static clJobQueue() { }
    private clJobQueue() { }

    public static clJobQueue Instance
    {
        get
        {
            if (instance == null)
            {
                lock (syncRoot)
                {
                    if (instance == null)
                    {
                        instance = new clJobQueue();
                        queue = new BlockingCollection<clJob>(10);
                        cts = new CancellationTokenSource();
                    }
                }
            }

            return instance;
        }
    }

    public bool Add(clJob jobItem)
    {
        queue.Add(jobItem, cts.Token);
        jobItem.setJobStatus(JobStatus.Queued);
        jobItem.JobQueue = this;
        return true;
    }

    public clJob GetNextJob()
    {
        try
        {
            clJob nextJob = queue.Take(cts.Token);
            nextJob.setJobStatus(JobStatus.Scheduled);
            return nextJob;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Cancelling Queue");
        }
        return null;
    }

    public void CloseQueue()
    {
        cts.Cancel();
        Console.WriteLine("Closing Queue");
        while (!IsEmpty)
        {
            clJob job = queue.Take();
            job.setJobStatus(JobStatus.Scheduled);
        }
    }

    public int Count
    {
        get { return queue.Count; }
    }

    public bool IsEmpty
    {
        get { return Count <= 0; }
    }

    public void Populate()
    {
        Parallel.Invoke(
                () => {
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine("Looking for X Job");
                        clJob job = clXJob.getJob();
                        if (job.JobID == 0)
                        {
                            Console.WriteLine("No X Jobs in DB, Sleeping for 60s");
                            Thread.Sleep(TimeSpan.FromSeconds(60));
                        }
                        else
                        {
                            Console.WriteLine("Adding X Job to Queue - {0} {1}", job.JobID, job.JobName);
                            Add(job);
                        }
                    }
                },
            () => {
                while (!cts.IsCancellationRequested)
                {
                    Console.WriteLine("Looking for Y Job");
                    clJob job = clYJob.getJob();
                    if (job == null || job.JobID == 0)
                    {
                        Console.WriteLine("No Y Jobs in DB, Sleeping for 60s");
                        Thread.Sleep(TimeSpan.FromSeconds(60));
                    }
                    else
                    {
                        Console.WriteLine("Adding Y Job to Queue - {0} {1}", job.JobID, job.JobName);
                        Add(job);
                    }
                }
            });
    }
}

//Polls the queue for items and initiates the items Run() method as a new Task
public sealed class clJobRunner
    {
        private static volatile clJobRunner instance;
        private static volatile CancellationTokenSource cts;

        static clJobRunner() { }
        private clJobRunner() { }

        public static clJobRunner Instance
        {
            get
            {
                if (instance == null)
                {
                    instance = new clJobRunner();
                    cts = new CancellationTokenSource();
                }

                return instance;
            }
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job;
                    if ((job = clJobQueue.Instance.GetNextJob()) == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");
                        Thread.Sleep(TimeSpan.FromSeconds(30));
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);
                    Task.Run(() => job.Run());
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
}

Good Implementation of Queue with TPL?

So I am new to programming using the .Net TPL Framework and multi-threading in general. I have googled up some tutorials and articles and put together the solution below.

//Console app that starts the Queue and Scheduler
class Program
    {        
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            Parallel.Invoke(
                () => clJobQueue.Instance.Populate(),
                () => clJobRunner.Instance.Start()
                );

            var autoResetEvent = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                autoResetEvent.Set();
            };

            autoResetEvent.WaitOne();
            
            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }
        
        private static void Stop()
        {            
            Parallel.Invoke(
             () => clJobQueue.Instance.CloseQueue(),
             () => clJobRunner.Instance.Stop()
             ); 
        }
    }

//Singelton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retreives new items
public sealed class clJobQueue
{
    private static volatile clJobQueue instance;
    private static volatile BlockingCollection<clJob> queue;
    private static volatile CancellationTokenSource cts;

    private static object syncRoot = new Object();

    static clJobQueue() { }
    private clJobQueue() { }

    public static clJobQueue Instance
    {
        get
        {
            if (instance == null)
            {
                lock (syncRoot)
                {
                    if (instance == null)
                    {
                        instance = new clJobQueue();
                        queue = new BlockingCollection<clJob>(10);
                        cts = new CancellationTokenSource();
                    }
                }
            }

            return instance;
        }
    }

    public bool Add(clJob jobItem)
    {
        queue.Add(jobItem, cts.Token);
        jobItem.setJobStatus(JobStatus.Queued);
        jobItem.JobQueue = this;
        return true;
    }

    public clJob GetNextJob()
    {
        try
        {
            clJob nextJob = queue.Take(cts.Token);
            nextJob.setJobStatus(JobStatus.Scheduled);
            return nextJob;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Cancelling Queue");
        }
        return null;
    }

    public void CloseQueue()
    {
        cts.Cancel();
        Console.WriteLine("Closing Queue");
        while (!IsEmpty)
        {
            clJob job = queue.Take();
            job.setJobStatus(JobStatus.Scheduled);
        }
    }

    public int Count
    {
        get { return queue.Count; }
    }

    public bool IsEmpty
    {
        get { return Count <= 0; }
    }

    public void Populate()
    {
        Parallel.Invoke(
                () => {
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine("Looking for X Job");
                        clJob job = clXJob.getJob();
                        if (job.JobID == 0)
                        {
                            Console.WriteLine("No X Jobs in DB, Sleeping for 60s");
                            Thread.Sleep(TimeSpan.FromSeconds(60));
                        }
                        else
                        {
                            Console.WriteLine("Adding X Job to Queue - {0} {1}", job.JobID, job.JobName);
                            Add(job);
                        }
                    }
                },
            () => {
                while (!cts.IsCancellationRequested)
                {
                    Console.WriteLine("Looking for Y Job");
                    clJob job = clYJob.getJob();
                    if (job == null || job.JobID == 0)
                    {
                        Console.WriteLine("No Y Jobs in DB, Sleeping for 60s");
                        Thread.Sleep(TimeSpan.FromSeconds(60));
                    }
                    else
                    {
                        Console.WriteLine("Adding Y Job to Queue - {0} {1}", job.JobID, job.JobName);
                        Add(job);
                    }
                }
            });
    }
}

//Polls the queue for items and initiates the items Run() method as a new Task
public sealed class clJobRunner
    {
        private static volatile clJobRunner instance;
        private static volatile CancellationTokenSource cts;

        static clJobRunner() { }
        private clJobRunner() { }

        public static clJobRunner Instance
        {
            get
            {
                if (instance == null)
                {
                    instance = new clJobRunner();
                    cts = new CancellationTokenSource();
                }

                return instance;
            }
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job;
                    if ((job = clJobQueue.Instance.GetNextJob()) == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");
                        Thread.Sleep(TimeSpan.FromSeconds(30));
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);
                    Task.Run(() => job.Run());
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
}

Thanks Mobin

Queue with task parallel library

I am new to programming using the .Net TPL Framework and multi-threading in general. I have googled up some tutorials and articles and put together the solution below.

//Console app that starts the Queue and Scheduler
class Program
    {        
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            Parallel.Invoke(
                () => clJobQueue.Instance.Populate(),
                () => clJobRunner.Instance.Start()
                );

            var autoResetEvent = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                autoResetEvent.Set();
            };

            autoResetEvent.WaitOne();
            
            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }
        
        private static void Stop()
        {            
            Parallel.Invoke(
             () => clJobQueue.Instance.CloseQueue(),
             () => clJobRunner.Instance.Stop()
             ); 
        }
    }

//Singelton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retreives new items
public sealed class clJobQueue
{
    private static volatile clJobQueue instance;
    private static volatile BlockingCollection<clJob> queue;
    private static volatile CancellationTokenSource cts;

    private static object syncRoot = new Object();

    static clJobQueue() { }
    private clJobQueue() { }

    public static clJobQueue Instance
    {
        get
        {
            if (instance == null)
            {
                lock (syncRoot)
                {
                    if (instance == null)
                    {
                        instance = new clJobQueue();
                        queue = new BlockingCollection<clJob>(10);
                        cts = new CancellationTokenSource();
                    }
                }
            }

            return instance;
        }
    }

    public bool Add(clJob jobItem)
    {
        queue.Add(jobItem, cts.Token);
        jobItem.setJobStatus(JobStatus.Queued);
        jobItem.JobQueue = this;
        return true;
    }

    public clJob GetNextJob()
    {
        try
        {
            clJob nextJob = queue.Take(cts.Token);
            nextJob.setJobStatus(JobStatus.Scheduled);
            return nextJob;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Cancelling Queue");
        }
        return null;
    }

    public void CloseQueue()
    {
        cts.Cancel();
        Console.WriteLine("Closing Queue");
        while (!IsEmpty)
        {
            clJob job = queue.Take();
            job.setJobStatus(JobStatus.Scheduled);
        }
    }

    public int Count
    {
        get { return queue.Count; }
    }

    public bool IsEmpty
    {
        get { return Count <= 0; }
    }

    public void Populate()
    {
        Parallel.Invoke(
                () => {
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine("Looking for X Job");
                        clJob job = clXJob.getJob();
                        if (job.JobID == 0)
                        {
                            Console.WriteLine("No X Jobs in DB, Sleeping for 60s");
                            Thread.Sleep(TimeSpan.FromSeconds(60));
                        }
                        else
                        {
                            Console.WriteLine("Adding X Job to Queue - {0} {1}", job.JobID, job.JobName);
                            Add(job);
                        }
                    }
                },
            () => {
                while (!cts.IsCancellationRequested)
                {
                    Console.WriteLine("Looking for Y Job");
                    clJob job = clYJob.getJob();
                    if (job == null || job.JobID == 0)
                    {
                        Console.WriteLine("No Y Jobs in DB, Sleeping for 60s");
                        Thread.Sleep(TimeSpan.FromSeconds(60));
                    }
                    else
                    {
                        Console.WriteLine("Adding Y Job to Queue - {0} {1}", job.JobID, job.JobName);
                        Add(job);
                    }
                }
            });
    }
}

//Polls the queue for items and initiates the items Run() method as a new Task
public sealed class clJobRunner
    {
        private static volatile clJobRunner instance;
        private static volatile CancellationTokenSource cts;

        static clJobRunner() { }
        private clJobRunner() { }

        public static clJobRunner Instance
        {
            get
            {
                if (instance == null)
                {
                    instance = new clJobRunner();
                    cts = new CancellationTokenSource();
                }

                return instance;
            }
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job;
                    if ((job = clJobQueue.Instance.GetNextJob()) == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");
                        Thread.Sleep(TimeSpan.FromSeconds(30));
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);
                    Task.Run(() => job.Run());
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
}
Source Link

Good Implementation of Queue with TPL?

So I am new to programming using the .Net TPL Framework and multi-threading in general. I have googled up some tutorials and articles and put together the solution below.

It is a simple multithreaded queue and a scheduler that polls the queue and processes each item as a seperate task.

It would be great if you'll can comment on the solution and point out areas where I am doing things wrong or how it can be done more efficiently.

//Console app that starts the Queue and Scheduler
class Program
    {        
        static void Main(string[] args)
        {
            Console.WriteLine("Job Scheduler Started. Press Ctrl-C to end");

            Parallel.Invoke(
                () => clJobQueue.Instance.Populate(),
                () => clJobRunner.Instance.Start()
                );

            var autoResetEvent = new AutoResetEvent(false);
            Console.CancelKeyPress += (sender, eventArgs) =>
            {
                eventArgs.Cancel = true;
                autoResetEvent.Set();
            };

            autoResetEvent.WaitOne();
            
            Stop();
            Console.WriteLine("Job Scheduler Shutting Down");
            Console.ReadLine();
        }
        
        private static void Stop()
        {            
            Parallel.Invoke(
             () => clJobQueue.Instance.CloseQueue(),
             () => clJobRunner.Instance.Stop()
             ); 
        }
    }

//Singelton pattern Queue using a BlockingCollection.
//Contains a Populate method that polls a db and retreives new items
public sealed class clJobQueue
{
    private static volatile clJobQueue instance;
    private static volatile BlockingCollection<clJob> queue;
    private static volatile CancellationTokenSource cts;

    private static object syncRoot = new Object();

    static clJobQueue() { }
    private clJobQueue() { }

    public static clJobQueue Instance
    {
        get
        {
            if (instance == null)
            {
                lock (syncRoot)
                {
                    if (instance == null)
                    {
                        instance = new clJobQueue();
                        queue = new BlockingCollection<clJob>(10);
                        cts = new CancellationTokenSource();
                    }
                }
            }

            return instance;
        }
    }

    public bool Add(clJob jobItem)
    {
        queue.Add(jobItem, cts.Token);
        jobItem.setJobStatus(JobStatus.Queued);
        jobItem.JobQueue = this;
        return true;
    }

    public clJob GetNextJob()
    {
        try
        {
            clJob nextJob = queue.Take(cts.Token);
            nextJob.setJobStatus(JobStatus.Scheduled);
            return nextJob;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Cancelling Queue");
        }
        return null;
    }

    public void CloseQueue()
    {
        cts.Cancel();
        Console.WriteLine("Closing Queue");
        while (!IsEmpty)
        {
            clJob job = queue.Take();
            job.setJobStatus(JobStatus.Scheduled);
        }
    }

    public int Count
    {
        get { return queue.Count; }
    }

    public bool IsEmpty
    {
        get { return Count <= 0; }
    }

    public void Populate()
    {
        Parallel.Invoke(
                () => {
                    while (!cts.IsCancellationRequested)
                    {
                        Console.WriteLine("Looking for X Job");
                        clJob job = clXJob.getJob();
                        if (job.JobID == 0)
                        {
                            Console.WriteLine("No X Jobs in DB, Sleeping for 60s");
                            Thread.Sleep(TimeSpan.FromSeconds(60));
                        }
                        else
                        {
                            Console.WriteLine("Adding X Job to Queue - {0} {1}", job.JobID, job.JobName);
                            Add(job);
                        }
                    }
                },
            () => {
                while (!cts.IsCancellationRequested)
                {
                    Console.WriteLine("Looking for Y Job");
                    clJob job = clYJob.getJob();
                    if (job == null || job.JobID == 0)
                    {
                        Console.WriteLine("No Y Jobs in DB, Sleeping for 60s");
                        Thread.Sleep(TimeSpan.FromSeconds(60));
                    }
                    else
                    {
                        Console.WriteLine("Adding Y Job to Queue - {0} {1}", job.JobID, job.JobName);
                        Add(job);
                    }
                }
            });
    }
}

//Polls the queue for items and initiates the items Run() method as a new Task
public sealed class clJobRunner
    {
        private static volatile clJobRunner instance;
        private static volatile CancellationTokenSource cts;

        static clJobRunner() { }
        private clJobRunner() { }

        public static clJobRunner Instance
        {
            get
            {
                if (instance == null)
                {
                    instance = new clJobRunner();
                    cts = new CancellationTokenSource();
                }

                return instance;
            }
        }

        public void Start()
        {
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    clJob job;
                    if ((job = clJobQueue.Instance.GetNextJob()) == null)
                    {
                        Console.WriteLine("Queue Empty, Sleeping for 30s");
                        Thread.Sleep(TimeSpan.FromSeconds(30));
                        continue;
                    }

                    Console.WriteLine("Running Job - {0} {1}", job.JobID, job.JobName);
                    Task.Run(() => job.Run());
                }
                Console.WriteLine("CancellationRequested!");
            }, cts.Token);
        }

        public void Stop()
        {
            cts.Cancel();
            Console.WriteLine("Stopping Job Runner");
        }
}

Thanks Mobin