using System;
using System.Threading;
using Chernobyl.DesignPatterns.Visitor;
using Chernobyl.Threading.Pools.Visitors;
namespace Chernobyl.Threading.Pools
{
///
/// A pool used for synchronous jobs. This pool features
/// the following:
///
/// - Synchronizes with other SynchronizedPools so that,
/// when a pool is depleted of all of it's jobs, it waits for other
/// pools to be depleted as well before "refreshing" it's job list.
/// - While it waits for other pools to deplete their job lists, it
/// attempts to get jobs from other pools to help free them of work.
/// - It does not return any jobs it takes from other pools. Instead
/// it assumes that those pools were to full and keeps the job.
///
///
public class SynchronizedPool : ThreadPool, IHost>
{
///
/// Constructor.
///
/// The next pool in the circular reference
/// chain of pools.
public SynchronizedPool(SynchronizedPool nextCachedJobPool)
{
AddSynchronizedPoolVisitor visitor = new AddSynchronizedPoolVisitor(nextCachedJobPool);
visitor.Visit(this);
}
///
/// Constructor.
///
public SynchronizedPool()
{}
///
/// Takes an object from the pool. If the pool has
/// no more objects to give or it does not want to
/// give an object out then null will be returned.
///
/// The object to fill with
/// the taken object.
/// True if there was an object that could be
/// taken from the pool and that object was assigned to
/// , false if otherwise.
public override bool Take(out Job objectToTake)
{
// are their any jobs left?
uint poolCount = 0;
objectToTake = RequestJob(this, this, ref poolCount);
if (objectToTake == null)
{
// their are no jobs left, we need to wait for other pools...
SynchronizingObject.Wait(poolCount);
}
lock (UncompletedJobs)
{
objectToTake = UncompletedJobs.First.Value;
}
return true;
}
///
/// Requests a job from this job pool. If this job pool
/// does not have a job then RequestJob() is called on the
/// next Synchronized pool.
///
/// The first pool to call this method. This is
/// the pool that wants a job.
/// The pool that is calling this function. You should
/// always pass 'this' as the parameter.
/// The number of times this method was called.
/// A job if one is available from our pool or any of the other pools;
/// null if no pool has a job.
public virtual Job RequestJob(SynchronizedPool requestingPool, SynchronizedPool callingPool, ref uint poolCount)
{
// check to see if we have looped back around to ourself
// requestingPool and callingPool will be equal if we
// have called this function on ourself.
if (requestingPool != callingPool)
if (requestingPool == this)
return null; // we have looped all the way around
// increment the pool count for this pool
poolCount++;
// check our uncompleted job list for jobs
lock(UncompletedJobs)
{
if (UncompletedJobs.First != null)
{
// we have an uncompleted job; send it back.
Job theJob = UncompletedJobs.First.Value;
UncompletedJobs.RemoveLast();
return theJob;
}
}
// we don't have a job; so lets ask the next
// job pool if he does.
if (Next == null)
throw new Exception("SynchronizedPools must be linked together in a circular referencing fashion.");
return Next.RequestJob(requestingPool, this, ref poolCount);
}
///
/// Takes in a visitor.
///
/// The visitor to take in.
public virtual void Accept(IVisitor visitor)
{
visitor.Visit(this);
}
///
/// The next job pool in the list of circular jobs pools.
///
public SynchronizedPool Next
{
get { return _Next; }
set { _Next = value; }
}
///
/// The backing field for .
///
SynchronizedPool _Next = null;
///
/// The object used to synchronize all of the SynchronizedPools.
///
public Synchronizer SynchronizingObject { get; internal set; }
///
/// The number of synchronized pools in this pool's circular
/// reference chain.
///
public uint SynchronizedPoolCount { get; internal set; }
///
/// The object used to synchronize SynchronizedPools
///
public class Synchronizer
{
///
/// Calling this methods causes a SynchronizedPool to wait
/// at it. When all of the other SynchronizedPools have called
/// this method, they will all be released (return) from this method.
///
/// The number of pools that will need to call this
/// method before all the pools will be released from it.
public void Wait(uint poolCount)
{
lock (_locker)
{
WaitCount++;
if (WaitCount >= poolCount)
{
// reset the clock in count
WaitCount = 0;
// tell all of the workers to begin working again
Monitor.PulseAll(_locker);
}
else
{
// this thread has to wait for all the other
// threads to finish; when the last one has
// it will call PulseAll.
Monitor.Wait(_locker);
}
}
}
///
/// The number of times the
///
uint WaitCount { get; set; }
///
/// The object used to 'lock' the threads in the
/// Wait() method.
///
readonly object _locker = new object();
}
}
}