Шрифт:
Интервал:
Закладка:
internal virtual IMessage ReplyMessage {
get {return _replyMsg;}
}
Извлечение работы из очереди и ее выполнение
Напомним, что вызовы, перехваченные перехватчиком, ассоциированным со свойством синхронизации, инкапсулируются в работу (экземпляр класса WorkItem) и записываются в очередь работ (_workItemQueue). Эта очередь поддерживается свойством синхронизации, и мы уже говорили об ее инициализации как составной части процесса инициализации свойства синхронизации в целом. Тогда же говорилось о том, что в пуле рабочих потоков регистрируется делегат callBackDelegate типа WaitOrTimerCallBack, который будет вызываться и выполняться некоторым рабочим потоком всякий раз, когда будет установлено в состояние signaled событие _asyncWorkEvent.
Забегая вперед можно сказать, что это событие будет устанавливаться в состояние signaled всякий раз, когда можно будет начинать выполнение очередной работы из очереди работ, причем эта работа должна иметь асинхронный тип (инкапсулирует асинхронный вызов). В этом случае свободный или вновь сгенерированный рабочий поток начнет выполнять данную асинхронную работу путем вызова упомянутого делегата.
Делегат callBackDelegate содержит ссылку на функцию
DispatcherCallBack:
WaitOrTimerCallback callBackDelegate =
new WaitOrTimerCallback(this.DispatcherCallBack);
и именно эта функция будет выполняться в рабочем потоке, обрабатывая очередную асинхронную работу.
Кстати, эта же функция будет вызываться и в том случае, когда очередная работа имеет синхронный тип (инкапсулирует синхронный вызов). Правда, в этом случае она будет выполняться в потоке вызова, а не в произвольном рабочем потоке из пула потоков, как было при асинхронном вызове. Но об этом позже.
Код функции DispatcherCallBack представлен ниже:
private void DispatcherCallBack(Object statelgnored,
bool ignored) {
Workltem work;
lock (_workItemQueue) {
work = (Workltem) _workltemQueue.Dequeue();
}
ExecuteWorkltem(work);
HandleWorkCompletion();
}
В соответствии с определением типа WaitorTimerCallBack функция DispatcherCallBack имеет два параметра. Входной параметр типа Object (третий параметр при вызове ThreadPool.RegisterWaitForSingleObject. В нашем случае null) используется для задания подлежащей обработке информации. Выходной параметр типа bool принимает значение true в том случае, если вызов зарегистрированного в пуле рабочих потоков соответствующего делегата (в нашем случае callBackDelegate) произошел по причине истечения времени ожидания (в нашем случае это значение поля _timeOut). Если же вызов упомянутого делегата произошел в связи с тем, что зарегистрированное в этом же пуле событие типа AutoResetEvent (в нашем случае _asyncWorkEvent) перешло в состояние signaled, то второй параметр принимает значение false. В нашем случае оба эти параметра не учитываются.
Выполнение вызова DispatcherCallBack начинается с входа в критическую секцию, доступную при успешной блокировке объекта workItemQueue. В результате, при работе в данной критической секции с очередью работ, поддерживаемой текущим свойством синхронизации, никакой другой код не сможет заблокировать эту очередь и параллельно начать с ней работать.
Единственной операцией, выполняемой в данной критической секции, является извлечение из очереди работ очередной работы (с удалением из очереди). Здесь предполагается, что все необходимые для этого условия уже выполнены. О том, каковы эти условия, мы будем говорить позже.
После выхода из критической секции (освобождающего очередь работ для других потоков), вызываются последовательно ExecuteWorkltem(work) и HandleWorkCompletion ().
Реализация метода ExecuteWorkItem в рассматриваемом классе SynchronizationAttribute Очень проста:
internal void ExecuteWorkltem(Workitem work) {
work.Execute();
}
Тут все сводится к уже рассмотренному методу Execute класса WorkItem. Иными словами, исполняющий этот метод поток переходит в тот контекст, где должен будет выполняться вызов, с этим потоком связывается тот самый контекст вызова, который сопровождал этот вызов до его перехвата. После этого вызов передается следующему перехватчику, который будет его обрабатывать в зависимости от типа вызова (синхронный или асинхронный). В случае асинхронного вызова сразу же после этого происходит возврат потока в исходный контекст и восстанавливается его связь с исходным контекстом вызова. В случае синхронного вызова происходит тоже самое, но только после возврата ответа.
Вызов метода HandleWorkCompletion() должен выполнить некоторую подготовительную работу к выполнению следующей работы из очереди работ. Мы рассмотрим этот метод позже. Сейчас же следует начать с начала, т. е. рассмотреть весь процесс перехвата вызова и включения его в очередь (или, в некоторых случаях, исполнения без включения в очередь).
Перехват входящего вызова
Формирование перехватчика входящих вызовов
Класс SynchronizationAttribute реализует интерфейс IContributeServerContextSink. Благодаря этому факту, при формировании нового контекста синхронизации (в новом или в старом домене синхронизации) автоматически вызывается метод GetServerContextSink, объявленный в данном интерфейсе, для формирования перехватчика входящих вызовов для данного контекста. Ниже приводится код этого метода из Rotor:
public virtual IMessageSink GetserverContextsink {
IMessageSink nextSink) {
InitlfNecessary();
SynchronizedServerContextSink propertySink =
new SynchronizedServerContextSink (
this,
nextSink);
return (IMessageSink) propertySink;
}
Единственным входным параметром является ссылка на перехватчик, последний на данный момент в цепочке перехватчиков входящих вызовов для данного контекста. Возвращаемое значение является ссылкой на новый перехватчик, который будет добавлен в эту цепочку.
Прежде всего выполняется инициализация свойства синхронизации, если это необходимо. Метод InitIfNeccessary был рассмотрен ранее. Реально он будет выполнять инициализацию свойства синхронизации только в том случае, когда текущий контекст является первым контекстом домена синхронизации. В противном случае свойство синхронизации уже было инициализировано ранее.
Далее формируется экземпляр класса SynchronizedServerContextSink, который будет рассмотрен позже. Этот объект и будет выступать в роли перехватчика входящих вызовов. Ему передаются два параметра:
• this — ссылка на свойство синхронизации (экземпляр данного класса SynchronizationAttribute) — новое или ранее сформированное, живущее в первом контексте домена синхронизации
• nextsink — ссылка на следующий в цепочке перехватчик.
И, наконец, возвращается полученная ссылка на новый перехватчик.
Таким образом, хотя свойство синхронизации одно на весь домен синхронизации, для каждого контекста в этом домене формируется свой перехватчик, имеющий ссылку на это свойство синхронизации.
Как перехватчик обрабатывает синхронные вызовы
Теперь рассмотрим класс SynchronizedServerContextSink:
internal class SynchronizedServerContextSink
: InternalSink, IMessageSink {… }
Мы видим, что данный класс наследует классу InternalSink и реализует интерфейс IMessageSink. Класс InternalSink отсутствует в .NET. Интерфейс IMessageSink объявляет методы, которые должны быть реализованы перехватчиками всех типов.
Класс SynchronizedServerContextSinkсодержит два поля данных:
internal IMessageSink _nextSink;
internal SynchronizationAttribute _property;
Первое содержит ссылку на следующий перехватчик, а второе — на свойство синхронизации текущего домена синхронизации.
Единственный конструктор инициирует эти поля:
internal sSynchronizedServerContextSink (
SynchronizationAttribute prop,