using System;
using System.Threading;
using Chernobyl.DesignPatterns.Visitor;
using Chernobyl.Threading.Pools.Visitors;

namespace Chernobyl.Threading.Pools
{
    // NOTE(review): the base-type list below looks like it lost its generic
    // type arguments (note the dangling '>'), presumably during text
    // extraction. The tokens are kept exactly as found; restore the real
    // type arguments from the project history before compiling.

    /// <summary>
    /// A pool used for synchronous jobs.
    /// <para>
    /// A SynchronizedPool is linked to other SynchronizedPools through
    /// <see cref="Next"/>, forming a circular chain. When a pool has no
    /// uncompleted jobs left it first asks the other pools in the chain for
    /// one of theirs, which helps free them of work. A job taken from
    /// another pool is not handed back; the taking pool assumes the other
    /// pool was too full and keeps the job.
    /// </para>
    /// <para>
    /// When no pool in the chain has a job left, the pool waits on the shared
    /// <see cref="SynchronizingObject"/> until every pool in the chain is
    /// depleted as well. The original documentation states that the pools
    /// then "refresh" their job lists; that refresh is not performed by any
    /// code in this file.
    /// </para>
    /// </summary>
    public class SynchronizedPool : ThreadPool, IHost>
    {
        /// <summary>
        /// Constructor. Runs an <c>AddSynchronizedPoolVisitor</c> created from
        /// <paramref name="nextCachedJobPool"/> over this instance.
        /// </summary>
        /// <param name="nextCachedJobPool">The next pool in the circular
        /// reference chain of pools.</param>
        public SynchronizedPool(SynchronizedPool nextCachedJobPool)
        {
            AddSynchronizedPoolVisitor visitor = new AddSynchronizedPoolVisitor(nextCachedJobPool);
            visitor.Visit(this);
        }

        /// <summary>
        /// Constructor. Leaves the pool unlinked; <see cref="Next"/> must be
        /// assigned before the pool runs out of jobs.
        /// </summary>
        public SynchronizedPool()
        {
        }

        /// <summary>
        /// Takes an object from the pool. If the pool has no more objects to
        /// give, or it does not want to give an object out, then null is
        /// assigned to <paramref name="objectToTake"/>.
        /// </summary>
        /// <param name="objectToTake">The object to fill with the taken
        /// object.</param>
        /// <returns>True if there was an object that could be taken from the
        /// pool and that object was assigned to
        /// <paramref name="objectToTake"/>, false otherwise.</returns>
        public override bool Take(out Job objectToTake)
        {
            // Ask this pool first, then every other pool in the chain.
            // poolCount ends up holding the number of pools that were asked.
            uint poolCount = 0;
            objectToTake = RequestJob(this, this, ref poolCount);

            if (objectToTake != null)
            {
                // RequestJob has already removed this job from the pool that
                // owned it, so hand it out as is. Overwriting it here would
                // lose the job.
                return true;
            }

            // There are no jobs left anywhere in the chain; wait until every
            // pool that was asked has reached this point too.
            SynchronizingObject.Wait(poolCount);

            lock (UncompletedJobs)
            {
                if (UncompletedJobs.First == null)
                {
                    // Still nothing to hand out. Report that instead of
                    // dereferencing a missing node.
                    objectToTake = null;
                    return false;
                }

                // NOTE(review): as in the original code, the job is read but
                // not removed from the list on this path - confirm that this
                // is intended.
                objectToTake = UncompletedJobs.First.Value;
            }

            return true;
        }

        /// <summary>
        /// Requests a job from this job pool. If this job pool does not have
        /// a job then RequestJob() is called on the next synchronized pool.
        /// </summary>
        /// <param name="requestingPool">The first pool to call this method.
        /// This is the pool that wants a job.</param>
        /// <param name="callingPool">The pool that is calling this function.
        /// You should always pass 'this' as the parameter.</param>
        /// <param name="poolCount">Incremented once for every pool that is
        /// asked for a job during this request.</param>
        /// <returns>A job if one is available from our pool or any of the
        /// other pools; null if no pool has a job.</returns>
        /// <exception cref="InvalidOperationException">This pool has no job
        /// and <see cref="Next"/> is null, so the chain is not
        /// circular.</exception>
        public virtual Job RequestJob(SynchronizedPool requestingPool, SynchronizedPool callingPool, ref uint poolCount)
        {
            // requestingPool and callingPool are equal only on the very first
            // call, when a pool calls this method on itself. If they differ
            // and we are the requesting pool, the request has travelled all
            // the way around the chain without finding a job.
            if (requestingPool != callingPool && requestingPool == this)
            {
                return null;
            }

            // Count this pool as asked.
            poolCount++;

            // Check our uncompleted job list for jobs.
            lock (UncompletedJobs)
            {
                if (UncompletedJobs.First != null)
                {
                    // Hand out the first job and remove that same node, so
                    // the job is given out once and no other job is dropped.
                    Job job = UncompletedJobs.First.Value;
                    UncompletedJobs.RemoveFirst();
                    return job;
                }
            }

            // We don't have a job, so ask the next pool in the chain.
            if (Next == null)
            {
                throw new InvalidOperationException("SynchronizedPools must be linked together in a circular referencing fashion.");
            }

            return Next.RequestJob(requestingPool, this, ref poolCount);
        }

        /// <summary>
        /// Takes in a visitor and lets it visit this pool.
        /// </summary>
        /// <param name="visitor">The visitor to take in.</param>
        public virtual void Accept(IVisitor visitor)
        {
            visitor.Visit(this);
        }

        /// <summary>
        /// The next job pool in the circular list of job pools.
        /// </summary>
        public SynchronizedPool Next
        {
            get { return _Next; }
            set { _Next = value; }
        }

        /// <summary>
        /// The backing field for <see cref="Next"/>.
        /// </summary>
        SynchronizedPool _Next = null;

        /// <summary>
        /// The object used to synchronize all of the SynchronizedPools.
        /// </summary>
        public Synchronizer SynchronizingObject { get; internal set; }

        /// <summary>
        /// The number of synchronized pools in this pool's circular
        /// reference chain.
        /// </summary>
        public uint SynchronizedPoolCount { get; internal set; }

        /// <summary>
        /// The object used to synchronize SynchronizedPools.
        /// </summary>
        public class Synchronizer
        {
            /// <summary>
            /// Calling this method causes a SynchronizedPool to wait at it.
            /// When all of the other SynchronizedPools have called this
            /// method, they are all released (return) from this method.
            /// </summary>
            /// <param name="poolCount">The number of pools that will need to
            /// call this method before all the pools will be released from
            /// it.</param>
            public void Wait(uint poolCount)
            {
                lock (_locker)
                {
                    WaitCount++;

                    if (WaitCount >= poolCount)
                    {
                        // The last pool has arrived: reset the count for the
                        // next round and release every waiting thread.
                        WaitCount = 0;
                        Monitor.PulseAll(_locker);
                    }
                    else
                    {
                        // This thread has to wait for all the other threads
                        // to arrive; the last one to arrive calls PulseAll.
                        Monitor.Wait(_locker);
                    }
                }
            }

            /// <summary>
            /// The number of calls to <see cref="Wait"/> made since the
            /// waiting threads were last released.
            /// </summary>
            uint WaitCount { get; set; }

            /// <summary>
            /// The object used to 'lock' the threads in the Wait() method.
            /// </summary>
            readonly object _locker = new object();
        }
    }
}