пятница, 24 июля 2009 г.

TaskCollection - многопоточная обработка в линейных алгоритмах

Выполнение задач в несколько потоков требует управление этими потоками, поэтому приходится отойти от идеи линейного кодирования что усложняет код.

Однако. Мы можем создать классы реализующие один и тот же интерфейс IRunable но различные по своей функциональности. Передать в них работу, загрузить их в TaskCollection, и получить результаты параллельного выполнения не отходя от линейного алгоритма исходной задачи.

Сложность состоит в ПОЛНОМ распараллеливании всей задачи исключив взаимосвязи. А после отработки собрать назад готовые результаты воедино.

Пример класса может быть таким:





class Task : IRunable
{
public Task(string str)
{
this.str = str;
}

string str = null;
/// <summary>
/// Отсюда можно забрать результаты
/// </summary>
public int Len { get; private set; }

public void Run()
{
//фоновые операции
Len = str.Length;
//завершение
}
}




Таких классов может быть создано множество. Далее пишем:

TaskCollection<IRunable> tasks = new TaskCollection<IRunable>(2); // 2=число потоков

tasks.Add(new Task("Первая"));
tasks.Add(new Task("Вторая задача"));
...
tasks.Add(new Task("Трёхсотая задача"));

Замечу что можно класть ЛЮБЫЕ задачи реализовавшие интерфейс IRunable:

class CalcProg : IRunable {...}
class Task666 : IRunable {...}

tasks.Add(new CalcProg(333,600,Color.Black));
tasks.Add(new Task666("Третья задача", 666));

Затем вызывается метод RunWait() выполняющий пуск обработки.
Благодаря интерфейсам задачи могут быть совершенно разные, а выполнены будут параллельно.

tasks.RunWait();

Текущий поток ждёт завершения всех задач. В случае ошибки она попадает сюда
tasks.ErrorResults // завершения с ошибками, здесь же хранятся Exception
в случае успеха в этот список
tasks.SuccessResults // успешные завершения


Когда вы кладёте задачу то можете задать приоритет её выполнения:
tasks.Add(new Task("текст"), 0);
tasks.Add(new Task("текст"), 15);
tasks.Add(new Task("текст"), 10);

Если это будет работать в 2 потока, то первыми выполнятся 2 и 3 задачи, потом 1.
В 1-поточном варианте порядок будет 2,3,1.




public interface IRunable
{
void Run();
}

public class TaskCollection<T> : IEnumerable<T> where T : IRunable
{
#region IEnumerable<T> support
protected class ItemContainer
{
public ItemContainer(T item, int priority)
{
Item = item; Priority = priority; Used = false;
}
public T Item { get; private set; }
public int Priority { get; set; }
public bool Used { get; set; }
}
protected List<ItemContainer> itemsPriority = new List<ItemContainer>();
protected List<T> items = new List<T>();


IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
return items.GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return items.GetEnumerator();
}
#endregion

static EventWaitHandle waitHandle = new ManualResetEvent(false);

public TaskCollection(uint maxParallel)
{
if (maxParallel == 0) throw new Exception("Параметр конструктора uint maxParallel не может быть равен нулю");
if (maxParallel > int.MaxValue) throw new Exception("Параметр конструктора uint maxParallel не может быть таким большим");
this.maxParallel = (int)maxParallel;
}
int maxParallel = 1;
int curr = 0;

void add(T item, int priority)
{
if (items.Contains(item) == false)
{
items.Add(item);
itemsPriority.Add(new TaskCollection<T>.ItemContainer(item, priority));
}
}
void remove(T item)
{
items.Remove(item);
itemsPriority.RemoveAll(z => (object)z.Item == (object)item);
}

public void RunWait()
{


runThread();
waitHandle.Reset(); //сброс всех статусов
waitHandle.WaitOne(); // ожидание завершения
}
public void Clear()
{
lock (locker)
{
waitHandle.WaitOne(); // нужно быть уверенным что "МОЖНО" стереть
items.Clear();
itemsPriority.Clear();
succesed.Clear();
errored.Clear();
}
}


object locker = new object();
public void Add(T item, int priority)
{
lock (locker)
{
add(item, priority);
}
}
public void Add(T item)
{
lock (locker)
{
add(item, 0);
}
}
public void Remove(T item)
{
lock (locker)
{
remove(item);
}
}


void runThread()
{
lock (locker)
{
if (curr < maxParallel)
{
ItemContainer[] itemCont = itemsPriority.Where(w2 => w2.Used == false)
.Where(z => z.Priority ==
itemsPriority.Where(w => w.Used == false).Max(m => m.Priority)
).Take(maxParallel - curr).ToArray();
if (itemCont.Length != 0)
{
foreach (ItemContainer cont in itemCont)
{
cont.Used = true;
curr++;
ThreadPool.QueueUserWorkItem(threadBody, cont);
}
}
}
}
}
void threadBody(object obj)
{
Exception exception = null;
ItemContainer itemContainer = obj as ItemContainer;
IRunable r = itemContainer.Item as IRunable;
try
{
if (r != null) r.Run();
else throw new Exception("Объект типа " + typeof(T).ToString() + " не реализует интерфейс IRunable");
}
catch (Exception ex)
{
exception = ex;
}
finally
{
bool needAddTasks = false;
lock (locker)
{
if (exception != null)
errored.Add(new Rezult<T>((T)r, exception));
else
succesed.Add((T)r);

curr--;
bool hasUnusedItems = itemsPriority.FirstOrDefault(z => z.Used == false) != null && curr < maxParallel;
if (curr > 0 && hasUnusedItems == false) needAddTasks = false;
else if (curr < maxParallel && hasUnusedItems == true) needAddTasks = true;

if (curr == 0 && hasUnusedItems == false) waitHandle.Set(); //завершаю
}
if (needAddTasks) runThread(); //продолжаю
}
}

List<T> succesed = new List<T>();
List<Rezult<T>> errored = new List<Rezult<T>>();

/// <summary>
/// Задачи выполненные без ошибок
/// </summary>
public IEnumerable<T> SuccessResults { get { return succesed; } }

/// <summary>
/// Задачи с ошибкой
/// </summary>
public IEnumerable<Rezult<T>> ErrorResults { get { return errored; } }
}
public class Rezult<T>
{
public Rezult(T item, Exception err)
{
Item = item; Error = err;
}
public T Item { get; private set; }
public Exception Error { get; private set; }
}

ThreadInvoke - выполнение делегата в другом потоке

Иногда требуется выполнить делегат в другом потоке. Приведу пример самого простого способа. Здесь, когда мы пишем
threadInvoke.Run(methodToCall, ...
компилятор создаёт статический делегат GenericDelegate соотвествующий methodToCall.
ThreadInvoke выполняет ожидание завершения работы делегата, и передаёт управление назад в асинхронный поток. Объявлено 2 типа GenericDelegate - возвращающий назад значение и не возвращающий.

Пример использования класс Test.
Класс ThreadInvoke содержит рабочий код.







class Test
{
ThreadInvoke threadInvoke = new ThreadInvoke();

/// <summary>
/// Метод который будет вызван из асинхронного потока
/// </summary>
/// <param name="msg">Параметр переданный из асинъронного потока</param>
void methodToCall(string msg)
{
System.Windows.Forms.MessageBox.Show("Сообщение из асинхронного потока: " + msg);
}

/// <summary>
/// Метод который будет вызван из асинхронного потока
/// </summary>
/// <param name="msg">Параметр переданный из асинъронного потока</param>
/// <returns>Возвращает в асинхронный поток число</returns>
int methodToCall2(string msg)
{
return msg.Length;
}

/// <summary>
/// Точка входа в пример
/// </summary>
public void start()
{
new System.Threading.Thread(new System.Threading.ThreadStart(asyncMethod)).Start();
}

void asyncMethod()
{
// получение длины текста используя другой поток
int len = threadInvoke.Run<string, int>(methodToCall2, "асинхронный поток");
// передача параметра без ответа
threadInvoke.Run(methodToCall, "асинхронный поток получил значение " + len);
// Функиця methodToCall отработала т.к. threadInvoke.Run выполняет ожидание
}
}




/// <summary>
/// Привязывается к потоку и вызывает методы с ожиданием в другом потоке
/// </summary>
/// <typeparam name="TIn">Тип входящего параметра</typeparam>
/// <typeparam name="TOut">Тип выходящего параметра</typeparam>
public class ThreadInvoke
{
static EventWaitHandle thread = new AutoResetEvent(false);
SynchronizationContext sync;
public ThreadInvoke()
{
sync = AsyncOperationManager.SynchronizationContext; //сопоставил контекст
}

/// <summary>
/// Поменять рабочий поток на текущий
/// </summary>
public void ChangeThread()
{
sync = AsyncOperationManager.SynchronizationContext; //сопоставил контекст
}

/// <summary>
/// Поменять рабочий поток на контекст указанного потока
/// </summary>
/// <param name="context"></param>
public void ChangeThread(SynchronizationContext context)
{
sync = context; //сопоставил контекст
}

//SendOrPostCallback callback = new SendOrPostCallback(threadBody);
object locker = new object();

/// <summary>
/// Выполняет делегат на потоке и возвращает результат
/// </summary>
/// <typeparam name="TInput">Тип входящего значения</typeparam>
/// <typeparam name="TOutput">Тип выходящего значения</typeparam>
/// <param name="method">Делегат для запуска</param>
/// <param name="param">Передаваемый параметр</param>
/// <returns></returns>
public TOutput Run<TInput, TOutput>(GenericDelegate<TInput, TOutput> method, TInput param)
{
lock (locker)
{
EventWaitHandle thread = new AutoResetEvent(false);
//thread.Reset(); // включаю ожидание
Container1<TInput, TOutput> container = new Container1<TInput, TOutput>(param, method);
sync.Post(delegate(object perem)
{
Container1<TInput, TOutput> cont = perem as Container1<TInput, TOutput>;
try
{
object ret = cont.Method.Invoke(cont.InValue);
try
{
cont.OutValue = (TOutput)ret;
}
catch (Exception)
{
throw new Exception("Ошибка преобразрвания возвращённого значения к типу " + typeof(TOutput).ToString());
}
}
catch (Exception ex)
{
cont.Error = ex;
}
finally
{
thread.Set();// конец операции
}


}, container);
thread.WaitOne(-1);//жду когда окончится
if (container.Error != null) throw container.Error;
return container.OutValue;
}
}

public void Run<TInput>(GenericDelegate<TInput> method, TInput param)
{
lock (locker)
{
EventWaitHandle thread = new AutoResetEvent(false);
//thread.Reset(); // включаю ожидание
Container2<TInput> container = new Container2<TInput>(param, method);
sync.Post(delegate(object perem)
{
Container2<TInput> cont = perem as Container2<TInput>;
try
{
cont.Method.Invoke(cont.InValue);
}
catch (Exception ex)
{
cont.Error = ex;
}
finally
{
thread.Set();// конец операции
}


}, container);
thread.WaitOne(-1);//жду когда окончится
if (container.Error != null) throw container.Error;
}
}

class Container1<TIn1, TOut1>
{
public Container1(TIn1 inValue, GenericDelegate<TIn1, TOut1> method)
{
InValue = inValue;
Method = method;
}
public TIn1 InValue { get; private set; }
public TOut1 OutValue { get; set; }
public Exception Error { get; set; }
public GenericDelegate<TIn1, TOut1> Method { get; private set; }
}
class Container2<TIn1>
{
public Container2(TIn1 inValue, GenericDelegate<TIn1> method)
{
InValue = inValue;
Method = method;
}
public TIn1 InValue { get; private set; }
public Exception Error { get; set; }
public GenericDelegate<TIn1> Method { get; private set; }
}

}

/// <summary>
/// Делегат для запуска чего-либо на другом потоке
/// </summary>
/// <typeparam name="TInput">Тип входящего значения</typeparam>
/// <typeparam name="TOutput">Тип выходящего значения</typeparam>
/// <param name="perem">Параметр передаваемый в метод</param>
/// <returns>Возвращаемое значение</returns>
public delegate TOutput GenericDelegate<TInput, TOutput>(TInput perem);
public delegate void GenericDelegate<TInput>(TInput perem);