Ярлыки

.Net (17) (9) 1с8 (4) 1с8.2 УП (3) документирование (1) интеграция (10) карта (1) собеседование (1) ado (1) ajax (11) ASP (1) asp.net (10) authentication (1) c# (14) coding (2) Crm Ribbon (2) csv (1) datareader (1) delegate (5) dhtml (5) exam (1) excel (10) ext (1) extjs (8) google maps (1) iis (3) javascript (33) JSON (5) linq (1) LN (5) log (1) lotus notes (5) mail (1) MS CRM (63) MS CRM 2015 (3) MS CRM 5 (48) MS CRM 6 (28) ms office (2) msi (1) MVC (1) namespace (1) oData (3) outlook (1) parent-child (2) plugin (1) program (4) Project Management (1) remote debugger (1) REST (1) SharePoint (1) SharePoint 2010 (2) Silverlight (2) soft (1) sql (13) sql reporting service (8) sql2005 (3) ssrs (2) Thread (4) tree (1) vb (6) vba (1) VSTO (1) WCF (4) wmi (1) wsc (2) xml (1) Xrm.Page (1) xslt (1)

среда, 9 мая 2012 г.

.Net: Про потоки

Здесь кратко о том, как работать с потоками на C#. Создание, стыковка, APM, синхронизация/блокировка. В хронологическом порядке от версии net 1.1 к 4.0.


1. Схема процесса .Net
2.1. Просто поток
2.2. Поток из пула.
2.3. Фоновые и активные потоки
3. APM (Asynchronous Programming Model)
4.1. Синхронизация потоков
4.2. Обновление элементов форм из неосновного потока
4.3. Класс BackgroundWorker
4.4. Асинхронные операции в веб-службах ASP.NET
4.5. Асинхронные операции на страницах ASP.NET
4.6. Назначение асинхронных моделей
5. Новое из Framework 4.
5.1. Делегаты Action & Func
5.2. Task (задания)
5.3. Пример заданий
5.4. Пример - массив одновременно исполняющихся заданий
5.5. Последовательно исполняющиеся задания, где входными данными для следующего является результат предыдущего задания
5.6. Отмена задания (особенности отладчика)
5.7. Дочерние задания
5.8. Фабрика заданий
5.9. Преобразование IAsyncResult в Task
5.10. Преобразование EAP в Task
6. Класс Parallel
7. Ссылки

1. Схема процесса .Net


Некоторые используемые термины
  • Поток - последовательно исполняемые команды. Здесь это можно представить себе как трубу, по которой вместо воды протекает код нашей программы. В каждой программе есть хотя бы одна такая труба. В заметке обзорно рассматривается как написать программу с несколькими такими трубами.
  • Блокировка потока - код в потоке не исполняется, кто-то закрыл вентиль на трубе.
  • Синхронизация потоков - подразумевает правила поведения потоков при использовании общих данных. То есть это защита общих ресурсов/данных от одновременного изменения. В сравнении с трубой это управление вентилями.
Зачем вообще нужны потоки:
  • изоляция одного кода от другого;
  • параллельное выполнение;
  • иногда потоки упрощают код.

2.1. Просто поток

using System;
using System.Threading;

namespace Sample_
{
    class Worker
    {
        public void DoWork()
        {
            Console.WriteLine("ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
            for (int i = 0; i < 600; i++)
                Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
            Console.WriteLine();
        }
    }

    class Program
    {
        delegate void DoWorkDelegate();

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            DoWorkDelegate dw = new DoWorkDelegate(w.DoWork);

            Thread backGroundThread1 = new Thread(new ThreadStart(dw));
            //backGroundThread1.IsBackground = true;

            backGroundThread1.Start();

            Console.ReadLine();
        }
    }
}
Просто в отдельном потоке что-то делаю. При этом много (машинного) времени потрачено на создание потока. Таким потоком можно управлять, например, подождать пока он выполнится (Join) или приостановить. Свойство потока IsBackground позволяет завершиться основному потоку (процессу/программе)  до того, как завершится backGroundThread1.

2.2. Поток из пула.

using System;
using System.Threading;

namespace Sample_pool
{
    class Worker
    {
        public void DoWork(object state)
        {
            Console.WriteLine("ID потока Worker: {0}, state: {1}", Thread.CurrentThread.GetHashCode(), state);
            for (int i = 0; i < 600; i++)
                Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
            Console.WriteLine();
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            WaitCallback wc = w.DoWork;
            ThreadPool.QueueUserWorkItem(wc, "MyState");
            
            Console.ReadLine();
        }
    }
}
Здесь новый поток получен из пула. Это значит, что время на его создание не тратилось (ну условно, конечно.). После того, как поток выполнит работу, он не уничтожится, а вернется обратно в пул. Не имеется встроенных средств управления таким потоком.
Вывод этих программ (2.1, 2.2) одинаков.

2.3. Фоновые и активные потоки

Потоки из пула всегда фоновые, поток созданный явно - активный. Этим признаком можно управлять (IsBackground). Разница такая - программа не завершится пока все активные потоки не завершатся. А фоновые потоки не мешают программе завершиться.

3. APM (Asynchronous Programming Model)

using System;
using System.Threading;

namespace Sample20
{
    sealed class Worker
    {
        public void DoWork()
        {
            Console.WriteLine("ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
            for (int i = 0; i < 600; i++)
                Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
            Console.WriteLine();
        }
    }

    sealed class Program
    {
        delegate void DoWorkDelegate();

        static void DoWorkIsDone(IAsyncResult ar)
        {
            // показывает, что этот метод выполняется в соответствующем асинхронном потоке
            Console.WriteLine("ID потока DoWorkIsDone: {0}", Thread.CurrentThread.GetHashCode());
            ((DoWorkDelegate)ar.AsyncState).EndInvoke(ar);
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            DoWorkDelegate dw = null;
            dw += new DoWorkDelegate(w.DoWork);

            dw.BeginInvoke(DoWorkIsDone, dw);

            Console.ReadLine();
        }
    }
}

У делегатов имеется пара методов: BeginInvoke и EndInvoke. BeginInvoke использует поток из пула для выполнения dw (w.DoWork). Когда работа выполнится в том же потоке будет вызван метод DoWorkIsDone. А уже в нем вызывается EndInvoke.
Что надо знать про EndInvoke:
  • EndInvoke приостанавливает вызывающий поток до завершения асинхронного потока
  • EndInvoke обязан быть вызван и только один раз (он высвобождает ресурсы, захваченные BeginInvoke).
  • EndInvoke должен быть вызван от того же делегата, что и BeginInvoke.
  • В случае возникновения исключений о них станет известно при вызове EndInvoke.
Ввиду вышесказанного, иногда встречающаяся в коде конструкция
dw.EndInvoke(dw.BeginInvoke(DoWorkIsDone, dw)),
не имеет смысла.

Этот пример показывает, что EndInvoke приостанавливает поток

using System;
using System.Threading;

namespace Sample21
{
    sealed class Worker
    {
        public string DoWork()
        {
            //Thread.Sleep(1000);
            Console.WriteLine("ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
            for (int i = 0; i < 60; i++)
                Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
            Console.WriteLine();
            return ("DoWork()");
        }
    }

    sealed class Program
    {
        delegate string DoWorkDelegate();

        static void DoWorkIsDone(IAsyncResult ar)
        {
            Thread.Sleep(1000);
            // показывает, что этот метод выполняется в соответствующем асинхронном потоке
            Console.WriteLine("ID потока в методе DoWorkIsDone: {0}", Thread.CurrentThread.GetHashCode());
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            DoWorkDelegate dw = null;
            dw += new DoWorkDelegate(w.DoWork);

            IAsyncResult ar = dw.BeginInvoke(DoWorkIsDone, dw);
            Console.WriteLine("Делаю полезную работу в основном потоке");
            string str = string.Empty;
            
            // демонстрация приостановки потока
            for (int i = 0; i < 10000000; i++)
            {
                str = dw.EndInvoke(ar); //приостановил основной поток до окончания асинхронного потока
                Console.WriteLine(str); // напечатал результат

                if (str == string.Empty)
                    Console.WriteLine("i: {0}", i); // !!! эта строка никогда не будет напечатана
                else break;
            }
            Console.WriteLine("BB");

            Console.ReadLine();
        }
    }
}

Этот пример показывает, что EndInvoke приостанавливает вызывающий поток, .

using System;
using System.Threading;

namespace Sample20
{
    sealed class Worker
    {
        public void DoWork()
        {
            Thread.Sleep(500);
            Console.WriteLine("ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
            for (int i = 0; i < 60; i++)
                Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
            Console.WriteLine();
        }
    }

    sealed class Program
    {
        delegate void DoWorkDelegate();

        static void DoWorkIsDone(IAsyncResult ar)
        {
            // показывает, что этот метод выполняется в соответствующем асинхронном потоке
            Console.WriteLine("ID потока в методе DoWorkIsDone: {0}", Thread.CurrentThread.GetHashCode());
            ((DoWorkDelegate)ar.AsyncState).EndInvoke(ar);
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            DoWorkDelegate dw = null;
            dw += new DoWorkDelegate(w.DoWork);

            dw.BeginInvoke(DoWorkIsDone, dw);

            // в течении секунды каждые 10мс чего-то делает
            for (int i = 0; i < 100; i++)
            {
                Console.Write("i: {0}. ", i);
                Thread.Sleep(10);
            }

            Console.ReadLine();
        }
    }
}

4.1. Синхронизация потоков

Что будет, если в предыдущих примерах BeginInvoke сделать несколько раз подряд? На консоле будет неразбериха. То есть, в этих примерах консоль - это объект, который надо защитить от одновременной записи (изменения состояния) и от чтения во время изменения.

using System;
using System.Threading;

namespace Sample2
{
    sealed class Worker
    {
        // object m_lock = new object();
        public void DoWork()
        {
            lock (Program.ConsoleLock)
            {
                Console.WriteLine("ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
                for (int i = 0; i < Program.Iterations; i++)
                    Console.Write("П{0,2}:Н{1,-3}  ", Thread.CurrentThread.GetHashCode(), i);
                Console.WriteLine();
            }
        }
    }

    sealed class Program
    {
        internal static object ConsoleLock = new object();
        public static readonly int Iterations = 30;
        delegate void DoWorkDelegate();

        static void DoWorkIsDone(IAsyncResult ar)
        {
            // показывает, что этот метод выполняется в соответствующем асинхронном потоке
            // но когда пишет в консоль и нуждается в блокировке
            //Console.WriteLine("ID потока DoWorkIsDone: {0}", Thread.CurrentThread.GetHashCode());
            ((DoWorkDelegate)ar.AsyncState).EndInvoke(ar);
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            DoWorkDelegate dw = null;
            dw += new DoWorkDelegate(w.DoWork);
            
            dw.BeginInvoke(DoWorkIsDone, dw);
            dw.BeginInvoke(DoWorkIsDone, dw);
            dw.BeginInvoke(DoWorkIsDone, dw);

            lock (Program.ConsoleLock)
            {
                for (int i = 0; i < Program.Iterations; i++)
                    Console.Write(" НЕ НРАВИТСЯ?");
                Console.WriteLine();
            }

            Console.ReadLine();
        }
    }
}
Тогда все будет аккуратно.
Для блокирования в двух (и более) местах надо использовать только одну (закрытую) переменную ConsoleLock.

Еще один пример на передачу параметров и возврат значений

using System;
using System.Threading;

namespace Sample4
{
    sealed class Worker
    {
        public int TwoPower(int num)
        {
            return ((int)Math.Pow(2, num));
        }
    }

    sealed class Program
    {
        internal static object ConsoleLock = new object();
        public delegate int TwoPowerDelegate(int num);

        static void TwoPowerIsDone(IAsyncResult ar)
        {
            // в этом методе неизвестно в какую степень возвели двойку
            int res = ((TwoPowerDelegate)ar.AsyncState).EndInvoke(ar);
            lock (ConsoleLock)
            {
                Console.WriteLine("П{0,2}:Результат={1,-3}  ", Thread.CurrentThread.GetHashCode(), res);
            }
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            TwoPowerDelegate tpd = w.TwoPower;

            tpd.BeginInvoke(9, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(2, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(3, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(5, TwoPowerIsDone, tpd);

            Console.ReadLine();
        }
    }
}

В этом примере в методе TwoPowerIsDone неизвестно в какую степень возвели двойку. Когда эта инфо нужна можно делать так:

using System;
using System.Threading;

namespace Sample4
{
    sealed class TwoPowerResult
    {
        public int Result;
        public int Power;
    }
    sealed class Worker
    {
        public TwoPowerResult TwoPower(int num)
        {
            return (new TwoPowerResult() { Result = (int)Math.Pow(2, num), Power = num });
        }
    }

    sealed class Program
    {
        internal static object ConsoleLock = new object();
        public delegate TwoPowerResult TwoPowerDelegate(int num);

        static void TwoPowerIsDone(IAsyncResult ar)
        {
            TwoPowerResult res = ((TwoPowerDelegate)ar.AsyncState).EndInvoke(ar);
            lock (ConsoleLock)
            {
                Console.WriteLine("П{0,2}:Результат={1,-3}:Степень={2}", Thread.CurrentThread.GetHashCode(), res.Result, res.Power);
            }
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Worker w = new Worker();

            TwoPowerDelegate tpd = w.TwoPower;

            tpd.BeginInvoke(9, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(2, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(3, TwoPowerIsDone, tpd);
            tpd.BeginInvoke(5, TwoPowerIsDone, tpd);

            Console.ReadLine();
        }
    }
}

4.2. Обновление элементов форм из неосновного потока

.Net (Windows Forms, WPF, Silverlight) сделан так, что обновление элементов формы возможно только из главного GUI потока.
В классе System.Windows.Forms.Control есть метод Invoke для синхронного обновления контрола и BeginInvoke для асинхронного. Этими методами пользуются для обновления элементов формы из неосновного потока (не GUI потока). В Silverlight применяется класс System.Windows.Threading.Dispatcher (так же и просто в нет с версии 3.5).

Чтобы без проблем перемещаться между потоками я пользуюсь SynchronizationContext так:


private static AsyncCallback SyncContextCallback(AsyncCallback callback)
{
    SynchronizationContext sc = SynchronizationContext.Current;
    if (sc == null) return callback;
    return asyncResult => sc.Post(result => callback((IAsyncResult)result), asyncResult);
}

void OnButtonClick()
{
    var webRequest = WebRequest.Create("http://...");
    webRequest.BeginGetResponse(SyncContextCallback(ProcessWebResponse), webRequest);
}

void ProcessWebResponse(IAsyncResult result)
{
    // здесь всега GUI поток - обновляю форму
    var webRequest = (WebRequest)result.AsyncState;
    using (var webResponse = webRequest.EndGetResponse(result))
    {
        Label1.Text = "Len : " + webResponse.ContentLength;
    }
}

4.3. Класс BackgroundWorker

System.ComponentModel.BackgroundWorker выполняет операцию в отдельном потоке. В ряде случаев он существенно упрощает взаимодействие нового потока с основным GUI потоком за счет поддержки событий (реализует шаблон Event-Based Asynchronous Pattern (EAP) ). Событие  ProgressChanged инициируется GUI потоком, что разрешает в обработчике работать с формой (благодаря SynchronizationContext, который и накладывает прикладную модель на потоковую). BackgroundWorker поддерживает отмену. Предназначен для вычислительных операций, а не для операций ввода-вывода.

using System;
using System.ComponentModel;
using System.Windows.Forms;

namespace BW
{
    static class Program
    {
        [STAThread]
        static void Main()
        {
            Application.EnableVisualStyles();
            Application.SetCompatibleTextRenderingDefault(false);
            Application.Run(new Form1());
        }
    }

    public partial class Form1 : Form
    {
        Label resultLabel;
        Button startAsyncButton;
        Button cancelAsyncButton;
        BackgroundWorker backgroundWorker1;

        public Form1()
        {
            this.Text = "BackgroundWorker Sample";
            this.Height = 140; this.Width = 222;
            resultLabel = new Label() { Text = "", Top = 30, Left = 20, Width = 200 };
            startAsyncButton = new Button() { Text = "Start", Top = 60, Left = 20, Width = 70};
            cancelAsyncButton = new Button() {Text="Cancel", Top = 60, Left = 100, Width = 70 };
            
            startAsyncButton.Click += startAsyncButton_Click;
            cancelAsyncButton.Click += cancelAsyncButton_Click;

            this.Controls.AddRange(new Control[] { resultLabel, startAsyncButton, cancelAsyncButton });

            backgroundWorker1 = new BackgroundWorker()
            {
                WorkerReportsProgress = true,
                WorkerSupportsCancellation = true
            };
            backgroundWorker1.ProgressChanged += backgroundWorker1_ProgressChanged;
            backgroundWorker1.DoWork += backgroundWorker1_DoWork;
            backgroundWorker1.RunWorkerCompleted += backgroundWorker1_RunWorkerCompleted;
        }

        private void startAsyncButton_Click(object sender, EventArgs e)
        {
            if (backgroundWorker1.IsBusy != true)
            {
                backgroundWorker1.RunWorkerAsync();
            }
        }

        private void cancelAsyncButton_Click(object sender, EventArgs e)
        {
            if (backgroundWorker1.WorkerSupportsCancellation == true)
            {
                backgroundWorker1.CancelAsync();
            }
        }

        private void backgroundWorker1_DoWork(object sender, DoWorkEventArgs e)
        {
            BackgroundWorker worker = sender as BackgroundWorker;

            for (int i = 1; i <= 10; i++)
            {
                if (worker.CancellationPending == true)
                {
                    e.Cancel = true;
                    break;
                }
                else
                {
                    System.Threading.Thread.Sleep(500);
                    worker.ReportProgress(i * 10);
                }
            }
        }

        // в этом методе без проблем работаю с формой во время выполнения асинх. опрерации.
        private void backgroundWorker1_ProgressChanged(object sender, ProgressChangedEventArgs e)
        {
            resultLabel.Text = (e.ProgressPercentage.ToString() + "%");
        }

        private void backgroundWorker1_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
        {
            if (e.Cancelled == true)
            {
                resultLabel.Text = "Canceled!";
            }
            else if (e.Error != null)
            {
                resultLabel.Text = "Error: " + e.Error.Message;
            }
            else
            {
                resultLabel.Text = "Done!";
            }
        }
    }
}

4.4. Асинхронные операции в веб-службах ASP.NET

Веб-службы имеют встроенную поддержку асинхронных операций. Для этого надо реализовать пару методов: BeginXxx и EndXxx в сигнатуре, как показано в примере.

<%@ WebService Language="C#" Class="WebAsync.WebServiceAsync" %>
using System;
using System.Threading;
using System.Web;
using System.Web.Services;

namespace WebAsync
{
    [WebService(Namespace = "http://tempuri.org/")]
    [WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
    [System.ComponentModel.ToolboxItem(false)]
    [System.Web.Script.Services.ScriptService]
    public class WebServiceAsync : System.Web.Services.WebService
    {
        delegate string DoWorkDelegate(object o);
        string ResultString = string.Format("ID основного потока: {0}<br />", Thread.CurrentThread.GetHashCode());

        [WebMethod]
        public IAsyncResult BeginDoWorker(string param1, AsyncCallback ac, object o)
        {
            DoWorkDelegate doWorkDelegate = DoWork;
            return doWorkDelegate.BeginInvoke(param1 == string.Empty ? "  Вот и параметр!  " : param1, 
                ac, doWorkDelegate);
        }

        [WebMethod]
        public string EndDoWorker(IAsyncResult ar)
        {
            return ((ar.AsyncState as DoWorkDelegate).EndInvoke(ar));
        }

        [WebMethod]
        public string SyncDoWork()
        {
            DoWork(null);
            return ResultString;
        }

        string DoWork(object o)
        {
            ResultString += string.Format(o.ToString()+"ID потока Worker: {0}", Thread.CurrentThread.GetHashCode());
            return ResultString;
        }

    }
}


А на практике при асинхронном вызове DoWork может быть потерян context исполнения. То есть новый поток будет трудится от другого пользователя. Чтобы это исправить надо вручную установить правильную имперсонализацию. Для этого до вызова DoWork устанавливаются переменные:

WindowsIdentity winId = (WindowsIdentity)HttpContext.Current.User.Identity;
WindowsImpersonationContext ctx = null;

, а в метод DoWork (или SyncDoWork) включается имперсонализация:

try
        {
            // требуется для асинхронного вызова
            ctx = winId.Impersonate();
            ... // старый код
          }
finally{ct.Undo();}

Еще заметка - чтобы имперсонализация работала в web.conFig секция httpModules должна быть по умолчанию, то есть без <clear />.


4.5. Асинхронные операции на страницах ASP.NET

Страница поддерживает асинхронные вызовы. Для этого надо в директиве @Page указать Async=true и в коде зарегистрировать пару методов с помощью AddOnPreRenderCompleteAsync.
Можно зарегистрировать несколько пар методов, но выполняться эти пары будут последовательно.  Если требуется выполнять несколько асинхронных методов одновременно, необходимо зарегистрировать одну пару и внутри запустить несколько асинхронных операций. В примере ниже показано как это сделать и как состыковать потоки.

<%@ Page Language="C#" Async="true" AutoEventWireup="true"  %>

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">

<html xmlns="http://www.w3.org/1999/xhtml" >
  <head id="Head1" runat="server">
    <title>
      Page.AddOnPreRenderCompleteAsync Example</title>
  </head>
  <body>
    <form id="form1" runat="server">
    <asp:Label ID="Label1" runat="server" Text="Label1"></asp:Label>
    </form>
  </body>
</html>

<script runat="server">
    string DoWork(object o)
    {
        System.Threading.Thread.Sleep(25); //долго работает, но скорее DoWork1
        return string.Format("<b>DoWork</b> <span style=color:green>{1}</span>, ID потока DoWork: {0}",
            System.Threading.Thread.CurrentThread.GetHashCode(), o) + " <br/>";
    }
    
    string DoWork1(object o)
    {
        System.Threading.Thread.Sleep(100); //долго работает, но дольше DoWork
        return string.Format("<b>DoWork1</b> <span style=color:green>{1}</span>, ID потока DoWork: {0}",
            System.Threading.Thread.CurrentThread.GetHashCode(), o) + " <br/>";
    }
    void EndDoWork1(IAsyncResult ar)
    {
        Label1.Text += (ar.AsyncState as DoWorkDelegate).EndInvoke(ar); 
        EndDoWork1Complete = true;
    }
    
    delegate string DoWorkDelegate(object o);
    IAsyncResult arDoWork1;
    bool EndDoWork1Complete = false;

    protected void Page_Load(object sender, EventArgs e)
    {
        BeginEventHandler bh = new BeginEventHandler(BeginAsync);
        EndEventHandler eh = new EndEventHandler(EndAsync);

        AddOnPreRenderCompleteAsync(bh, eh, "param [Page_Load]");
    }


    IAsyncResult BeginAsync(Object src, EventArgs args, AsyncCallback cb, Object state)
    {
        Label1.Text = "BeginAsync: thread " + System.Threading.Thread.CurrentThread.GetHashCode() + " <br/>";
        DoWorkDelegate doWorkDelegate = DoWork; 
        DoWorkDelegate doWorkDelegate1 = DoWork1; // вложенный асинх. поток
        arDoWork1 = doWorkDelegate1.BeginInvoke(state, EndDoWork1, doWorkDelegate1); // начинаю исполнять
        // начинаю основную (в контексте AddOnPreRenderCompleteAsync) полезную асинхронную опреацию.
        return doWorkDelegate.BeginInvoke(state, cb /*ссылка на EndAsync*/, doWorkDelegate); 
    }

    void EndAsync(IAsyncResult ar)
    {
        while (!EndDoWork1Complete) { System.Threading.Thread.Sleep(10); } // стыковка потоков
        Label1.Text += (ar.AsyncState as DoWorkDelegate).EndInvoke(ar); 
        Label1.Text += "EndAsync: thread " + System.Threading.Thread.CurrentThread.GetHashCode();
    }
</script>

4.6. Назначение асинхронных моделей

APM и EPM (IAsyncResult) предназначен для ввода-вывода, другие модели - для вычислений.

5. Новое из Framework 4.

Теперь можно рассмотреть Action/Func, Task, поддержку отмены асинхронной операции. На класс Parallel, на модель асинхр. прогр. на базе событий. Потом можно посмотреть на то, как использовать эти знания при работе с WCF. Но это не сейчас.

5.1. Делегаты Action & Func

Action - это делегат на метод, возвращающий void (на процедуру).
Func - это делегат на метод, возвращающий значение (на функцию).
В .Net много одинаковых по сигнатуре делегатов с разными именами, так что Action и Func как бы собирательный образ. Имеются перегрузки и обобщенные версии. Активно используются в модели программирования на основе заданий (Task).

5.2. Task (задания)

Запустить асинхронную операцию можно такими способами.
  1. ThreadPool.QueueUserWorkItem(voidMethod, 55); // запуск асинх. операции
  2. new Task(voidMethod, 55).Start(); // аналог предыдущей строки
У первого (смотри п.2.2.) есть ограничения: нет встроенных способов 
  1. узнать о завершении операции;
  2. получить возвращаемое значение.
Кроме того задания имеют встроенную поддержку отмены, управлением способом выполнения задания, могут собираться в группы (что позволит управлять всеми заниями сразу, а не каждым в отдельности).

5.3. Пример 

using System;
using System.Threading;
using System.Threading.Tasks;


namespace Sample5
{
    class Program
    {
        // метод для делегата Action
        static void DoWork1(object p)
        {
            Console.WriteLine("do1 (Action) -> П{0,2}:Парам={1}", Thread.CurrentThread.GetHashCode(), p);
        }
        // метод для делегата Func
        static int DoWork2(object p)
        {
            Console.WriteLine("do2 (Func) П{0,2}:Парам={1}", Thread.CurrentThread.GetHashCode(), p); return 7;
        }

        static byte DoWorkIsDone(Task<int> tsk)
        {
            Console.WriteLine("ID потока в методе DoWorkIsDone: {0}", Thread.CurrentThread.GetHashCode());
            Console.WriteLine("ar.IsCompleted={0}", tsk.IsCompleted);
            Console.WriteLine("ar.Result={0}", tsk.Result);
            return ((byte)tsk.Result);
        }

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

            Action<object> DoWork1Action = DoWork1;

            Console.WriteLine("\nЗадание для делегата DoWork1Action");
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n");

            Task t0 = new Task(DoWork1Action, 202);
            t0.Start(); // Task сам решает в каком потоке исполнить метод (может и в основном)
            t0.Wait(); 
            //t0.RunSynchronously(); //- исполнить синхронно в этом же потоке
            Console.WriteLine("\n");

            Func<object, int> DoWork2Func = DoWork2;
            Console.WriteLine("Задания для делегата DoWork2Func");
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n");

            Console.WriteLine("Синхронный вызов DoWork2Func(100), ={0}\n", DoWork2Func(100));
            Console.WriteLine("Асинхронный вызов DoWork2Func(101) (APM), ={0}\n", 
                DoWork2Func.EndInvoke(DoWork2Func.BeginInvoke(101, null, DoWork2Func)));
            Task<int> t1 = new Task<int>(DoWork2Func, 102);
            t1.Start(); // Task сам решает в каком потоке исполнить метод (может и в основном)

            // инфо об Exceptions всплывет только в вызове Result / Wait 
            Console.WriteLine("Асинхронный вызов DoWork2Func(102) (EPM), ={0}, \nпо завершении - синхронный DoWorkIsDone\n", 
                t1.ContinueWith<byte>(DoWorkIsDone, TaskContinuationOptions.ExecuteSynchronously).Result);

            Console.WriteLine("Асинхронный вызов DoWork2Func(103) в фабрике, ={0}\n", 
                Task.Factory.StartNew(DoWork2Func, 103).Result);

            Console.ReadLine();
        }
    }
}

5.4. Пример - массив одновременно исполняющихся заданий

static int Multiplication3(int factor) { Thread.Sleep(50); Console.WriteLine("factor=" + factor + " id=" + Thread.CurrentThread.GetHashCode()); return factor * 3; }

// демо - массив одновременно исполняющихся заданий
static void DemoTask1()
{
    var sw = Stopwatch.StartNew();
    Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());
    var taskArr = new[]
    {
        new Task<int>( n => Multiplication3((int) n), 2),
        new Task<int>( n => Multiplication3((int) n), 4),
        new Task<int>( n => Multiplication3((int) n), 6)
    };
    Array.ForEach(taskArr, tsk => tsk.Start()); // надо запустить вручную
    Task.WaitAll(taskArr); // тут жду в демо целях
    // и показываю результаты
    Array.ForEach(taskArr, tsk => Console.WriteLine(tsk.Result));
    sw.Stop();
    Console.WriteLine("исполнялось {0}ms",sw.ElapsedMilliseconds);
    Console.ReadKey();
}

5.5. Последовательно исполняющиеся задания, где входными данными для следующего является результат предыдущего задания

// демо - последовательно исполняющиеся задания, где входными данными для следующего является результат предыдущего задания
static void DemoTask2()
{
    var sw = Stopwatch.StartNew();

    Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());
    
    var task0 = new Task<int>(n => Multiplication3((int)n), 2);
    Task<int> task1 = null, task2 = null;
    var taskArr = new Task<int>[3];
    taskArr[0] = task0;

    task0.ContinueWith(n =>
    {
        task1 = new Task<int>(n1 => Multiplication3((int)n1), n.Result);
        task1.Start(); // после объявления задания уже не важно где его запустить
        taskArr[1] = task1;
        //task1.Start(); // после объявления задания уже не важно где его запустить
        task1.ContinueWith(nn =>
        {
            task2 = new Task<int>(n1 => Multiplication3((int)n1), nn.Result);
            taskArr[2] = task2;
            task2.Start();
        });
        //task1.Start(); // после объявления задания уже не важно где его запустить
    });

    task0.Start();
    // жду объявление последнего задания
    while (task2 == null) { Thread.Sleep(3); }

    //Task.WaitAll(taskArr); // тут жду в демо целях
    // и показываю результаты
    Array.ForEach(taskArr, tsk => tsk
        .ContinueWith(n => Console.WriteLine(n.Result), TaskContinuationOptions.OnlyOnRanToCompletion));
    sw.Stop();
    Console.WriteLine("исполнялось {0}ms", sw.ElapsedMilliseconds);
    Console.ReadKey();
}

5.6. Отмена задания (особенности отладчика)

Как сделать отмену - смотрим пример. В коде отмечены различия исполнения кода в режиме отладчика.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.ComponentModel;
using System.Windows.Forms;

namespace TaskCancel
{
    static class Program
    {
        [STAThread]
        static void Main()
        {
            Application.EnableVisualStyles();
            Application.Run(new Form1());
        }
    }

    public partial class Form1 : Form
    {
        Label resultLabel;
        Button startAsyncButton;
        Button cancelAsyncButton;

        Task tsk;
        CancellationTokenSource cts;

        public Form1()
        {
            this.Text = "Task Cancel";
            this.Height = 140; this.Width = 222;
            resultLabel = new Label() { Text = "", Top = 30, Left = 20, Width = 200 };
            startAsyncButton = new Button() { Text = "Start", Top = 60, Left = 20, Width = 70 };
            cancelAsyncButton = new Button() { Text = "Cancel", Top = 60, Left = 100, Width = 70 };

            startAsyncButton.Click += startAsyncButton_Click;
            cancelAsyncButton.Click += cancelAsyncButton_Click;

            this.Controls.AddRange(new Control[] { resultLabel, startAsyncButton, cancelAsyncButton });

        }

        // потокобезопасно изменяет текст метки
        void SafeUpdateLabelText(string msg)
        {
            Form.ActiveForm.BeginInvoke(new WaitCallback(
                    n => { resultLabel.Text = n.ToString(); }
                ), msg);
        }

        private void startAsyncButton_Click(object sender, EventArgs e)
        {
            cts = new CancellationTokenSource();
            tsk = new Task(() => tsk_DoWork(cts.Token));
            tsk.Start();            

            //!! F5 (дебуггер) будет всегда тут
            tsk.ContinueWith(
                n => { 
                    if(cts.IsCancellationRequested)
                        SafeUpdateLabelText(resultLabel.Text + "  Cancelled.");
                    else
                        SafeUpdateLabelText(resultLabel.Text + "  Done!"); 
                }
                , TaskContinuationOptions.OnlyOnRanToCompletion);

            //!! это не появится по F5 (дебуггер), 
            //а при простом выполнении сработает OnlyOnFaulted 
            //(а не ожидаемый OnlyOnCanceled, ибо внизу Throw)
            tsk.ContinueWith(
                n => { SafeUpdateLabelText(resultLabel.Text + "  CANCELLED."); }
                , TaskContinuationOptions.NotOnRanToCompletion);

        }

        private void cancelAsyncButton_Click(object sender, EventArgs e)
        {
            if (tsk != null) if (tsk.Status == TaskStatus.Running) cts.Cancel();
        }

        private void tsk_DoWork(CancellationToken ct)
        {
            for (int i = 1; i <= 10; i++)
            {
                if (ct.IsCancellationRequested)
                {
                    //break;
                    if(ct.CanBeCanceled)
                        //!! по F5 на следущей строке будет СТОП
                        ct.ThrowIfCancellationRequested(); 
                }
                System.Threading.Thread.Sleep(150);
                SafeUpdateLabelText(i * 10 + "%");
            }
        }
    }
}

5.7. Дочерние задания

У заданий имеется поддержка отношения предок-потомок, которое задается флагом TaskCreationOptions.AttachedToParent. Родительское задание будет завершено только по завершению всех его потомков.

// демо - дочерние задания, исполняющиеся одновременно
static void DemoTask3()
{
    var sw = Stopwatch.StartNew();

    Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

    Task<int[]> rootTask = new Task<int[]>(() =>
        {
            var results = new int[3]; // массив результатов
            new Task(() => results[0] = Multiplication3(5), TaskCreationOptions.AttachedToParent).Start();
            new Task(() => results[1] = Multiplication3(7), TaskCreationOptions.AttachedToParent).Start();
            new Task(() => results[2] = Multiplication3(9), TaskCreationOptions.AttachedToParent).Start();
            // возвращаю ссылку на массив результатов
            return (results);
        });

    var cwt = rootTask.ContinueWith(parentTask => Array.ForEach(parentTask.Result, Console.WriteLine))
        .ContinueWith(n =>
        {
            sw.Stop();
            Console.WriteLine("исполнялось {0}ms", sw.ElapsedMilliseconds);
        });

    rootTask.Start();
    Console.ReadKey();
}

5.8. Фабрика заданий

Позволяют получить набор заданий, находящихся в одном и том же состоянии.

// демо - фабрика заданий
static void DemoTask4()
{
    var sw = Stopwatch.StartNew();

    Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());

    Task rootTask = new Task(() =>
    {
        var cts = new CancellationTokenSource();
        var tf = new TaskFactory<int>(cts.Token, TaskCreationOptions.AttachedToParent,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        var childTasks = new[]{
            tf.StartNew(() => Multiplication3(5)), 
            tf.StartNew(() => Multiplication3(7)), 
            tf.StartNew(() => Multiplication3(9))
        };

        // после завершения дочерних
        tf.ContinueWhenAll(
            childTasks,
            // для нормально отработавших заданий
            completedTasks => completedTasks.Where(t => !t.IsCanceled && !t.IsFaulted)
                // пишу результат в консоль
                .Select(t => { Console.WriteLine(t.Result.ToString()); return t.Result; })
                // возвращаю максимум 
                //(а вот FirstOrDefault здесь нельзя применить - будет напечатан только первый результат)
                .Max(),
            CancellationToken.None).ContinueWith(t => Console.WriteLine("Max = {0}", t.Result));
    });

    var cwt = rootTask
        .ContinueWith(n =>
        {
            sw.Stop();
            Console.WriteLine("исполнялось {0}ms", sw.ElapsedMilliseconds);
        });

    rootTask.Start();
    Console.ReadKey();
}


5.9. Преобразование IAsyncResult в Task


using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Linq;

namespace ThreadTaskFromIAR
{
    class Program
    {
        //синхронный метод
        static int Multiplication3(int factor) { Thread.Sleep(50); Console.WriteLine("factor=" + factor + " id=" + Thread.CurrentThread.GetHashCode()); return factor * 3; }
        #region асинх обертка
        static IAsyncResult BeginMultiplication3(AsyncCallback ac, object factor)
        {
            Multiplication3Delegate m3d = Multiplication3;
            return m3d.BeginInvoke(int.Parse(factor.ToString()), ac, m3d);
        }
        
        static int EndMultiplication3(IAsyncResult ar) {
            var ii = (ar.AsyncState as Multiplication3Delegate).EndInvoke(ar);
            
            Console.WriteLine(" id=" + Thread.CurrentThread.GetHashCode());
            Console.WriteLine(" result=" + ii);

            return ii;
        }

        delegate int Multiplication3Delegate(int f);
        #endregion

        static void Main(string[] args)
        {
            Console.WriteLine("ID основного потока: {0}", Thread.CurrentThread.GetHashCode());
            int result = 0;
            Task.Factory.FromAsync<int>(BeginMultiplication3, EndMultiplication3, "7")
                .ContinueWith(res => { result = res.Result; Console.WriteLine("RESULT: {0}", res.Result); });//.Wait();
            //Console.WriteLine("result111: {0}", result);
            Console.ReadKey();
        }
    }
}

5.10. Преобразование EAP в Task



using System;
using System.ComponentModel;
using System.Drawing;
using System.Net;
using System.Windows.Forms;
using System.Threading;
using System.Threading.Tasks;

namespace ThreadTaskFromEAP
{
    public partial class ThreadTaskFromEAP_Form : Form
    {
        private System.ComponentModel.IContainer components = null;
        private System.Windows.Forms.Panel panelTop;
        private System.Windows.Forms.Button btnGetString;
        private System.Windows.Forms.Panel panelFill;
        private System.Windows.Forms.TextBox txtString;

        private void InitializeComponent()
        {
            this.panelTop = new System.Windows.Forms.Panel();
            this.btnGetString = new System.Windows.Forms.Button();
            this.panelFill = new System.Windows.Forms.Panel();
            this.txtString = new System.Windows.Forms.TextBox();
            this.panelTop.SuspendLayout();
            this.panelFill.SuspendLayout();
            this.SuspendLayout();
            // 
            // panelTop
            // 
            this.panelTop.Controls.Add(this.btnGetString);
            this.panelTop.Dock = System.Windows.Forms.DockStyle.Top;
            this.panelTop.Location = new System.Drawing.Point(0, 0);
            this.panelTop.Name = "panelTop";
            this.panelTop.Size = new System.Drawing.Size(275, 50);
            this.panelTop.TabIndex = 0;
            // 
            // btnGetString
            // 
            this.btnGetString.Location = new System.Drawing.Point(12, 12);
            this.btnGetString.Name = "btnGetString";
            this.btnGetString.Size = new System.Drawing.Size(75, 23);
            this.btnGetString.TabIndex = 0;
            this.btnGetString.Text = "btnGetString";
            this.btnGetString.UseVisualStyleBackColor = true;
            this.btnGetString.Click += new System.EventHandler(this.btnGetString_Click);
            // 
            // panelFill
            // 
            this.panelFill.Controls.Add(this.txtString);
            this.panelFill.Dock = System.Windows.Forms.DockStyle.Fill;
            this.panelFill.Location = new System.Drawing.Point(0, 50);
            this.panelFill.Name = "panelFill";
            this.panelFill.Padding = new System.Windows.Forms.Padding(5);
            this.panelFill.Size = new System.Drawing.Size(275, 80);
            this.panelFill.TabIndex = 1;
            // 
            // txtString
            // 
            this.txtString.Dock = System.Windows.Forms.DockStyle.Fill;
            this.txtString.Location = new System.Drawing.Point(5, 5);
            this.txtString.Multiline = true;
            this.txtString.Name = "txtString";
            this.txtString.ScrollBars = System.Windows.Forms.ScrollBars.Both;
            this.txtString.Size = new System.Drawing.Size(265, 70);
            this.txtString.TabIndex = 0;
            // 
            // ThreadTaskFromEAP_Form
            // 
            this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F);
            this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font;
            this.ClientSize = new System.Drawing.Size(275, 130);
            this.Controls.Add(this.panelFill);
            this.Controls.Add(this.panelTop);
            this.Name = "ThreadTaskFromEAP_Form";
            this.Text = "ThreadTaskFromEAP";
            this.panelTop.ResumeLayout(false);
            this.panelFill.ResumeLayout(false);
            this.panelFill.PerformLayout();
            this.ResumeLayout(false);

        }

        public ThreadTaskFromEAP_Form()
        {
            InitializeComponent();
        }

        private void btnGetString_Click(object sender, EventArgs e)
        {
            WebClient wc = new WebClient(); // поддерживает события
            var tcs = new TaskCompletionSource<string>();
            wc.DownloadStringCompleted += (sender1, ea) =>
            {
                // этот код всегда выполняется в GUI потоке
                // руками задаю свойства задания
                if (ea.Cancelled) tcs.SetCanceled();
                else if (ea.Error != null) tcs.SetException(ea.Error);
                else tcs.SetResult(ea.Result);
            };

            //теперь продолжаю задание
            tcs.Task.ContinueWith(t =>
            {
                try {
                    txtString.Text = t.Result;
                }
                catch (AggregateException aex)
                {
                    txtString.Text = aex.GetBaseException().Message;
                }
                // ! флаг ExecuteSynchronously обязателен (иначе выйду из GUI-потока)
            }, TaskContinuationOptions.ExecuteSynchronously);
            // начинаю асинх. загрузку
            wc.DownloadStringAsync(new Uri("http://ya.ru"));
        }
    }
}


6. Класс Parallel

Методы этого класса выполняют указанные им методы параллельно, и при этом используют задания. Имеются перегрузки для методов For, ForEach, Invoke.Выглядит это просто, но о синхронизации потоков заботится программист.
  • Parallel.For(0,19999, i=>DoWork(i))
  • Parallel.ForEach(collection, item=>DoWork(item))
  • Parallel.Invoke(()=>M1(), ()=>M2(), ...)
For и ForEach могут принять по три делегата:
  1. инициализация потока (а уж сколько потоков получится решит метод For)
  2. обработка каждого элемента
  3. завершение потока

Комментариев нет: