Однако. Мы можем создать классы реализующие один и тот же интерфейс IRunable но различные по своей функциональности. Передать в них работу, загрузить их в TaskCollection, и получить результаты параллельного выполнения не отходя от линейного алгоритма исходной задачи.
Сложность состоит в ПОЛНОМ распараллеливании всей задачи исключив взаимосвязи. А после отработки собрать назад готовые результаты воедино.
Пример класса может быть таким:
class Task : IRunable
{
public Task(string str)
{
this.str = str;
}
string str = null;
/// <summary>
/// Отсюда можно забрать результаты
/// </summary>
public int Len { get; private set; }
public void Run()
{
//фоновые операции
Len = str.Length;
//завершение
}
}
Таких классов может быть создано множество. Далее пишем:
TaskCollection<IRunable> tasks = new TaskCollection<IRunable>(2); // 2=число потоков
tasks.Add(new Task("Первая"));
tasks.Add(new Task("Вторая задача"));
...
tasks.Add(new Task("Трёхсотая задача"));
Замечу что можно класть ЛЮБЫЕ задачи реализовавшие интерфейс IRunable:
class CalcProg : IRunable {...}
class Task666 : IRunable {...}
tasks.Add(new CalcProg(333,600,Color.Black));
tasks.Add(new Task666("Третья задача", 666));
Затем вызывается метод RunWait() выполняющий пуск обработки.
Благодаря интерфейсам задачи могут быть совершенно разные, а выполнены будут параллельно.
tasks.RunWait();
Текущий поток ждёт завершения всех задач. В случае ошибки она попадает сюда
tasks.ErrorResults // завершения с ошибками, здесь же хранятся Exception
в случае успеха в этот список
tasks.SuccessResults // успешные завершения
Когда вы кладёте задачу то можете задать приоритет её выполнения:
tasks.Add(new Task("текст"), 0);
tasks.Add(new Task("текст"), 15);
tasks.Add(new Task("текст"), 10);
Если это будет работать в 2 потока, то первыми выполнятся 2 и 3 задачи, потом 1.
В 1-поточном варианте порядок будет 2,3,1.
public interface IRunable
{
void Run();
}
public class TaskCollection<T> : IEnumerable<T> where T : IRunable
{
#region IEnumerable<T> support
protected class ItemContainer
{
public ItemContainer(T item, int priority)
{
Item = item; Priority = priority; Used = false;
}
public T Item { get; private set; }
public int Priority { get; set; }
public bool Used { get; set; }
}
protected List<ItemContainer> itemsPriority = new List<ItemContainer>();
protected List<T> items = new List<T>();
IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
return items.GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return items.GetEnumerator();
}
#endregion
static EventWaitHandle waitHandle = new ManualResetEvent(false);
public TaskCollection(uint maxParallel)
{
if (maxParallel == 0) throw new Exception("Параметр конструктора uint maxParallel не может быть равен нулю");
if (maxParallel > int.MaxValue) throw new Exception("Параметр конструктора uint maxParallel не может быть таким большим");
this.maxParallel = (int)maxParallel;
}
int maxParallel = 1;
int curr = 0;
void add(T item, int priority)
{
if (items.Contains(item) == false)
{
items.Add(item);
itemsPriority.Add(new TaskCollection<T>.ItemContainer(item, priority));
}
}
void remove(T item)
{
items.Remove(item);
itemsPriority.RemoveAll(z => (object)z.Item == (object)item);
}
public void RunWait()
{
runThread();
waitHandle.Reset(); //сброс всех статусов
waitHandle.WaitOne(); // ожидание завершения
}
public void Clear()
{
lock (locker)
{
waitHandle.WaitOne(); // нужно быть уверенным что "МОЖНО" стереть
items.Clear();
itemsPriority.Clear();
succesed.Clear();
errored.Clear();
}
}
object locker = new object();
public void Add(T item, int priority)
{
lock (locker)
{
add(item, priority);
}
}
public void Add(T item)
{
lock (locker)
{
add(item, 0);
}
}
public void Remove(T item)
{
lock (locker)
{
remove(item);
}
}
void runThread()
{
lock (locker)
{
if (curr < maxParallel)
{
ItemContainer[] itemCont = itemsPriority.Where(w2 => w2.Used == false)
.Where(z => z.Priority ==
itemsPriority.Where(w => w.Used == false).Max(m => m.Priority)
).Take(maxParallel - curr).ToArray();
if (itemCont.Length != 0)
{
foreach (ItemContainer cont in itemCont)
{
cont.Used = true;
curr++;
ThreadPool.QueueUserWorkItem(threadBody, cont);
}
}
}
}
}
void threadBody(object obj)
{
Exception exception = null;
ItemContainer itemContainer = obj as ItemContainer;
IRunable r = itemContainer.Item as IRunable;
try
{
if (r != null) r.Run();
else throw new Exception("Объект типа " + typeof(T).ToString() + " не реализует интерфейс IRunable");
}
catch (Exception ex)
{
exception = ex;
}
finally
{
bool needAddTasks = false;
lock (locker)
{
if (exception != null)
errored.Add(new Rezult<T>((T)r, exception));
else
succesed.Add((T)r);
curr--;
bool hasUnusedItems = itemsPriority.FirstOrDefault(z => z.Used == false) != null && curr < maxParallel;
if (curr > 0 && hasUnusedItems == false) needAddTasks = false;
else if (curr < maxParallel && hasUnusedItems == true) needAddTasks = true;
if (curr == 0 && hasUnusedItems == false) waitHandle.Set(); //завершаю
}
if (needAddTasks) runThread(); //продолжаю
}
}
List<T> succesed = new List<T>();
List<Rezult<T>> errored = new List<Rezult<T>>();
/// <summary>
/// Задачи выполненные без ошибок
/// </summary>
public IEnumerable<T> SuccessResults { get { return succesed; } }
/// <summary>
/// Задачи с ошибкой
/// </summary>
public IEnumerable<Rezult<T>> ErrorResults { get { return errored; } }
}
public class Rezult<T>
{
public Rezult(T item, Exception err)
{
Item = item; Error = err;
}
public T Item { get; private set; }
public Exception Error { get; private set; }
}