--- trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:29:39 UTC (rev 20553)
+++ trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:34:19 UTC (rev 20554)
@@ -1,11 +1,9 @@
/*
* COPYRIGHT: See COPYING in the top level directory
- * PROJECT: ReactOS kernel
+ * PROJECT: ReactOS Kernel
* FILE: ntoskrnl/ex/work.c
- * PURPOSE: Manage system work queues
- *
- * PROGRAMMERS: Alex Ionescu - Used correct work queue array and added some fixes and checks.
- * Gunnar Dalsnes - Implemented
+ * PURPOSE: Manage system work queues and worker threads
+ * PROGRAMMER: Alex Ionescu (alex@relsoft.net)
*/
/* INCLUDES ******************************************************************/
@@ -18,70 +16,167 @@
#pragma alloc_text(INIT, ExpInitializeWorkerThreads)
#endif
-/* DEFINES *******************************************************************/
+/* DATA **********************************************************************/
-#define NUMBER_OF_WORKER_THREADS (5)
+/* Number of worker threads for each Queue */
+#define EX_HYPERCRITICAL_WORK_THREADS 1
+#define EX_DELAYED_WORK_THREADS 3
+#define EX_CRITICAL_WORK_THREADS 5
-/* TYPES *********************************************************************/
+/* Magic flag for dynamic worker threads */
+#define EX_DYNAMIC_WORK_THREAD 0x80000000
-/* GLOBALS *******************************************************************/
+/* Worker thread priorities */
+#define EX_HYPERCRITICAL_QUEUE_PRIORITY 7
+#define EX_CRITICAL_QUEUE_PRIORITY 5
+#define EX_DELAYED_QUEUE_PRIORITY 4
-/*
- * PURPOSE: Queue of items waiting to be processed at normal priority
- */
+/* The actual worker queue array */
EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
-/* FUNCTIONS ****************************************************************/
+/* Accounting of the total threads and registry hacked threads */
+ULONG ExpCriticalWorkerThreads;
+ULONG ExpDelayedWorkerThreads;
+ULONG ExpAdditionalCriticalWorkerThreads;
+ULONG ExpAdditionalDelayedWorkerThreads;
-/*
- * FUNCTION: Entry point for a worker thread
- * ARGUMENTS:
- * context = Parameters
- * RETURNS: Status
- * NOTE: To kill a worker thread you must queue an item whose callback
- * calls PsTerminateSystemThread
- */
-static
+/* Future support for stack swapping worker threads */
+BOOLEAN ExpWorkersCanSwap;
+LIST_ENTRY ExpWorkerListHead;
+KMUTANT ExpWorkerSwapinMutex;
+
+/* The worker balance set manager events */
+KEVENT ExpThreadSetManagerEvent;
+KEVENT ExpThreadSetManagerShutdownEvent;
+
+/* Thread pointers for future worker thread shutdown support */
+PETHREAD ExpWorkerThreadBalanceManagerPtr;
+PETHREAD ExpLastWorkerThread;
+
+/* PRIVATE FUNCTIONS *********************************************************/
+
+/*++
+ * @name ExpWorkerThreadEntryPoint
+ *
+ * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
+ * worker thread created by teh system.
+ *
+ * @param Context
+ * Contains the work queue type masked with a flag specifing whether the
+ * thread is dynamic or not.
+ *
+ * @return None.
+ *
+ * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
+ * while a static thread will never timeout.
+ *
+ * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
+ * active impersonation info, and must not have disabled APCs.
+ *
+ * NB: We will re-enable APCs for broken threads but all other cases
+ * will generate a bugcheck.
+ *
+ *--*/
VOID
-STDCALL
+NTAPI
ExpWorkerThreadEntryPoint(IN PVOID Context)
{
PWORK_QUEUE_ITEM WorkItem;
PLIST_ENTRY QueueEntry;
WORK_QUEUE_TYPE WorkQueueType;
PEX_WORK_QUEUE WorkQueue;
+ LARGE_INTEGER Timeout;
+ PLARGE_INTEGER TimeoutPointer = NULL;
+ PETHREAD Thread = PsGetCurrentThread();
+ KPROCESSOR_MODE WaitMode;
+ EX_QUEUE_WORKER_INFO OldValue, NewValue;
+ /* Check if this is a dyamic thread */
+ if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
+ {
+ /* It is, which means we will eventually time out after 10 minutes */
+ Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
+ TimeoutPointer = &Timeout;
+ }
+
/* Get Queue Type and Worker Queue */
- WorkQueueType = (WORK_QUEUE_TYPE)Context;
+ WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
+ ~EX_DYNAMIC_WORK_THREAD);
WorkQueue = &ExWorkerQueue[WorkQueueType];
+ /* Select the wait mode */
+ WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
+
+ /* Nobody should have initialized this yet, do it now */
+ ASSERT(Thread->ExWorkerCanWaitUser == 0);
+ if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
+
+ /* If we shouldn't swap, disable that feature */
+ if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
+
+ /* Set the worker flags */
+ do
+ {
+ /* Check if the queue is being disabled */
+ if (WorkQueue->Info.QueueDisabled)
+ {
+ /* Re-enable stack swapping and kill us */
+ KeSetKernelStackSwapEnable(TRUE);
+ PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+ }
+
+ /* Increase the worker count */
+ OldValue = WorkQueue->Info;
+ NewValue = OldValue;
+ NewValue.WorkerCount++;
+ }
+ while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+ *(PLONG)&NewValue,
+ *(PLONG)&OldValue) != *(PLONG)&OldValue);
+
+ /* Success, you are now officially a worker thread! */
+ Thread->ActiveExWorker = TRUE;
+
/* Loop forever */
- while (TRUE) {
-
+ProcessLoop:
+ for (;;)
+ {
/* Wait for Something to Happen on the Queue */
- QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode, NULL);
+ QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
+ WaitMode,
+ TimeoutPointer);
- /* Can't happen since we do a KernelMode wait (and we're a system thread) */
- ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
+ /* Check if we timed out and quit this loop in that case */
+ if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
- /* this should never happen either, since we wait with NULL timeout,
- * but there's a slight possibility that STATUS_TIMEOUT is returned
- * at queue rundown in NT (unlikely) -Gunnar
- */
- ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT);
-
/* Increment Processed Work Items */
InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
/* Get the Work Item */
WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
+ /* Make sure nobody is trying to play smart with us */
+ ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
+
/* Call the Worker Routine */
WorkItem->WorkerRoutine(WorkItem->Parameter);
+ /* Make sure APCs are not disabled */
+ if (Thread->Tcb.SpecialApcDisable)
+ {
+ /* We're nice and do it behind your back */
+ DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
+ "with APCs disabled!\n",
+ WorkItem->WorkerRoutine,
+ WorkItem->Parameter,
+ WorkItem);
+ Thread->Tcb.SpecialApcDisable = 0;
+ }
+
/* Make sure it returned at right IRQL */
- if (KeGetCurrentIrql() != PASSIVE_LEVEL) {
-
+ if (KeGetCurrentIrql() != PASSIVE_LEVEL)
+ {
+ /* It didn't, bugcheck! */
KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
(ULONG_PTR)WorkItem->WorkerRoutine,
KeGetCurrentIrql(),
@@ -90,8 +185,9 @@
}
/* Make sure it returned with Impersionation Disabled */
- if (PsGetCurrentThread()->ActiveImpersonationInfo) {
-
+ if (Thread->ActiveImpersonationInfo)
+ {
+ /* It didn't, bugcheck! */
KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
(ULONG_PTR)WorkItem->WorkerRoutine,
(ULONG_PTR)WorkItem->Parameter,
@@ -99,87 +195,470 @@
0);
}
}
+
+ /* This is a dynamic thread. Terminate it unless IRPs are pending */
+ if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
+
+ /* Don't terminate it if the queue is disabled either */
+ if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
+
+ /* Set the worker flags */
+ do
+ {
+ /* Decrease the worker count */
+ OldValue = WorkQueue->Info;
+ NewValue = OldValue;
+ NewValue.WorkerCount--;
+ }
+ while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+ *(PLONG)&NewValue,
+ *(PLONG)&OldValue) != *(PLONG)&OldValue);
+
+ /* Decrement dynamic thread count */
+ InterlockedDecrement(&WorkQueue->DynamicThreadCount);
+
+ /* We're not a worker thread anymore */
+ Thread->ActiveExWorker = FALSE;
+
+ /* Re-enable the stack swap */
+ KeSetKernelStackSwapEnable(TRUE);
+ return;
}
-static
+/*++
+ * @name ExpCreateWorkerThread
+ *
+ * The ExpCreateWorkerThread routine creates a new worker thread for the
+ * specified queue.
+ **
+ * @param QueueType
+ * Type of the queue to use for this thread. Valid values are:
+ * - DelayedWorkQueue
+ * - CriticalWorkQueue
+ * - HyperCriticalWorkQueue
+ *
+ * @param Dynamic
+ * Specifies whether or not this thread is a dynamic thread.
+ *
+ * @return None.
+ *
+ * @remarks HyperCritical work threads run at priority 7; Critical work threads
+ * run at priority 5, and delayed work threads run at priority 4.
+ *
+ * This, worker threads cannot pre-empty a normal user-mode thread.
+ *
+ *--*/
VOID
-STDCALL
-ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType,
- KPRIORITY Priority)
+NTAPI
+ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
+ IN BOOLEAN Dynamic)
{
- ULONG i;
PETHREAD Thread;
HANDLE hThread;
+ ULONG Context;
+ KPRIORITY Priority;
- /* Loop through how many threads we need to create */
- for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
+ /* Check if this is going to be a dynamic thread */
+ Context = WorkQueueType;
- /* Create the System Thread */
- PsCreateSystemThread(&hThread,
- THREAD_ALL_ACCESS,
- NULL,
- NULL,
- NULL,
- ExpWorkerThreadEntryPoint,
- (PVOID)WorkQueueType);
+ /* Add the dynamic mask */
+ if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
- /* Get the Thread */
- ObReferenceObjectByHandle(hThread,
- THREAD_SET_INFORMATION,
- PsThreadType,
+ /* Create the System Thread */
+ PsCreateSystemThread(&hThread,
+ THREAD_ALL_ACCESS,
+ NULL,
+ NULL,
+ NULL,
+ ExpWorkerThreadEntryPoint,
+ (PVOID)Context);
+
+ /* If the thread is dynamic */
+ if (Dynamic)
+ {
+ /* Increase the count */
+ InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
+ }
+
+ /* Set the priority */
+ if (WorkQueueType == DelayedWorkQueue)
+ {
+ /* Priority == 4 */
+ Priority = EX_DELAYED_QUEUE_PRIORITY;
+ }
+ else if (WorkQueueType == CriticalWorkQueue)
+ {
+ /* Priority == 5 */
+ Priority = EX_CRITICAL_QUEUE_PRIORITY;
+ }
+ else
+ {
+ /* Priority == 7 */
+ Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY;
+ }
+
+ /* Get the Thread */
+ ObReferenceObjectByHandle(hThread,
+ THREAD_SET_INFORMATION,
+ PsThreadType,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+
+ /* Set the Priority */
+ KeSetPriorityThread(&Thread->Tcb, Priority);
+
+ /* Dereference and close handle */
+ ObDereferenceObject(Thread);
+ ZwClose(hThread);
+}
+
+/*++
+ * @name ExpCheckDynamicThreadCount
+ *
+ * The ExpCheckDynamicThreadCount routine checks every queue and creates a
+ * dynamic thread if the queue seems to be deadlocked.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created is
+ * based on wether the queue has processed no new items in the last
+ * second, and new items are still enqueued.
+ *
+ *--*/
+VOID
+NTAPI
+ExpDetectWorkerThreadDeadlock(VOID)
+{
+ ULONG i;
+ PEX_WORK_QUEUE Queue;
+
+ /* Loop the 3 queues */
+ for (i = 0; i < MaximumWorkQueue; i++)
+ {
+ /* Get the queue */
+ Queue = &ExWorkerQueue[i];
+ ASSERT(Queue->DynamicThreadCount <= 16);
+
+ /* Check if stuff is on the queue that still is unprocessed */
+ if ((Queue->QueueDepthLastPass) &&
+ (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
+ (Queue->DynamicThreadCount < 16))
+ {
+ /* Stuff is still on the queue and nobody did anything about it */
+ DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
+ ExpCreateWorkerThread(i, TRUE);
+ DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
+ }
+
+ /* Update our data */
+ Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
+ Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
+ }
+}
+
+/*++
+ * @name ExpCheckDynamicThreadCount
+ *
+ * The ExpCheckDynamicThreadCount routine checks every queue and creates a
+ * dynamic thread if the queue requires one.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created is
+ * documented in the ExQueueWorkItem routine.
+ *
+ *--*/
+VOID
+NTAPI
+ExpCheckDynamicThreadCount(VOID)
+{
+ ULONG i;
+ PEX_WORK_QUEUE Queue;
+
+ /* Loop the 3 queues */
+ for (i = 0; i < MaximumWorkQueue; i++)
+ {
+ /* Get the queue */
+ Queue = &ExWorkerQueue[i];
+
+ /* Check if still need a new thread. See ExQueueWorkItem */
+ if ((Queue->Info.MakeThreadsAsNecessary) &&
+ (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
+ (Queue->WorkerQueue.CurrentCount <
+ Queue->WorkerQueue.MaximumCount) &&
+ (Queue->DynamicThreadCount < 16))
+ {
+ /* Create a new thread */
+ DPRINT1("EX: Creating new dynamic thread as requested\n");
+ ExpCreateWorkerThread(i, TRUE);
+ }
+ }
+}
+
+/*++
+ * @name ExpWorkerThreadBalanceManager
+ *
+ * The ExpWorkerThreadBalanceManager routine is the entrypoint for the
+ * worker thread balance set manager.
+ *
+ * @param Context
+ * Unused.
+ *
+ * @return None.
+ *
+ * @remarks The worker thread balance set manager listens every second, but can
+ * also be woken up by an event when a new thread is needed, or by the
+ * special shutdown event. This thread runs at priority 7.
+ *
+ * This routine must run at IRQL == PASSIVE_LEVEL.
+ *
+ *--*/
+VOID
+NTAPI
+ExpWorkerThreadBalanceManager(IN PVOID Context)
+{
+ KTIMER Timer;
+ LARGE_INTEGER Timeout;
+ NTSTATUS Status;
+ PVOID WaitEvents[2];
+ PAGED_CODE();
+ UNREFERENCED_PARAMETER(Context);
+
+ /* Raise our priority above all other worker threads */
+ KeSetBasePriorityThread(KeGetCurrentThread(),
+ EX_CRITICAL_QUEUE_PRIORITY + 1);
+
+ /* Setup the timer */
+ KeInitializeTimer(&Timer);
+ Timeout.QuadPart = Int32x32To64(-1, 10000000);
+
+ /* We'll wait on the periodic timer and also the emergency event */
+ WaitEvents[0] = &Timer;
+ WaitEvents[1] = &ExpThreadSetManagerEvent;
+ WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
+
+ /* Start wait loop */
+ for (;;)
+ {
+ /* Wait for the timer */
+ KeSetTimer(&Timer, Timeout, NULL);
+ Status = KeWaitForMultipleObjects(3,
+ WaitEvents,
+ WaitAny,
+ Executive,
+ KernelMode,
+ FALSE,
+ NULL,
+ NULL);
+ if (Status == 0)
+ {
+ /* Our timer expired. Check for deadlocks */
+ ExpDetectWorkerThreadDeadlock();
+ }
+ else if (Status == 1)
+ {
+ /* Someone notified us, verify if we should create a new thread */
+ ExpCheckDynamicThreadCount();
+ }
+ else if (Status == 2)
+ {
+ /* We are shutting down. Cancel the timer */
+ DPRINT1("System shutdown\n");
+ KeCancelTimer(&Timer);
+
+ /* Make sure we have a final thread */
+ ASSERT(ExpLastWorkerThread);
+
+ /* Wait for it */
+ KeWaitForSingleObject(ExpLastWorkerThread,
+ Executive,
KernelMode,
- (PVOID*)&Thread,
+ FALSE,
NULL);
- /* Set the Priority */
- KeSetPriorityThread(&Thread->Tcb, Priority);
-
- /* Dereference and close handle */
- ObDereferenceObject(Thread);
- ZwClose(hThread);
+ /* Dereference it and kill us */
+ ObDereferenceObject(ExpLastWorkerThread);
+ PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+ }
}
}
+/*++
+ * @name ExpInitializeWorkerThreads
+ *
+ * The ExpInitializeWorkerThreads routine initializes worker thread and
+ * work queue support.
+ *
+ * @param None.
+ *
+ * @return None.
+ *
+ * @remarks This routine is only called once during system initialization.
+ *
+ *--*/
VOID
INIT_FUNCTION
-STDCALL
+NTAPI
ExpInitializeWorkerThreads(VOID)
{
ULONG WorkQueueType;
+ ULONG CriticalThreads, DelayedThreads;
+ HANDLE ThreadHandle;
+ PETHREAD Thread;
+ ULONG i;
+ /* Setup the stack swap support */
+ KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE);
+ InitializeListHead(&ExpWorkerListHead);
+ ExpWorkersCanSwap = TRUE;
+
+ /* Set the number of critical and delayed threads. We shouldn't hardcode */
+ DelayedThreads = EX_DELAYED_WORK_THREADS;
+ CriticalThreads = EX_CRITICAL_WORK_THREADS;
+
+ /* Protect against greedy registry modifications */
+ ExpAdditionalDelayedWorkerThreads =
+ min(ExpAdditionalCriticalWorkerThreads, 16);
+ ExpAdditionalCriticalWorkerThreads =
+ min(ExpAdditionalCriticalWorkerThreads, 16);
+
+ /* Calculate final count */
+ DelayedThreads += ExpAdditionalDelayedWorkerThreads;
+ CriticalThreads += ExpAdditionalCriticalWorkerThreads;
+
/* Initialize the Array */
- for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) {
-
+ for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
+ {
+ /* Clear the structure and initialize the queue */
RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
}
- /* Create the built-in worker threads for each work queue */
- ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY);
- ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY);
- ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY);
+ /* Dynamic threads are only used for the critical queue */
+ ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
+
+ /* Initialize the balance set manager events */
+ KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
+ KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
+ NotificationEvent,
+ FALSE);
+
+ /* Create the built-in worker threads for the critical queue */
+ for (i = 0; i < CriticalThreads; i++)
+ {
+ /* Create the thread */
+ ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
+ ExpCriticalWorkerThreads++;
+ }
+
+ /* Create the built-in worker threads for the delayed queue */
+ for (i = 0; i < DelayedThreads; i++)
+ {
+ /* Create the thread */
+ ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
+ ExpDelayedWorkerThreads++;
+ }
+
+ /* Create the built-in worker thread for the hypercritical queue */
+ ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
+
+ /* Create the balance set manager thread */
+ PsCreateSystemThread(&ThreadHandle,
+ THREAD_ALL_ACCESS,
+ NULL,
+ 0,
+ NULL,
+ ExpWorkerThreadBalanceManager,
+ NULL);
+
+ /* Get a pointer to it for the shutdown process */
+ ObReferenceObjectByHandle(ThreadHandle,
+ THREAD_ALL_ACCESS,
+ NULL,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+ ExpWorkerThreadBalanceManagerPtr = Thread;
+
+ /* Close the handle and return */
+ ZwClose(ThreadHandle);
}
-/*
- * @implemented
+/* PUBLIC FUNCTIONS **********************************************************/
+
+/*++
+ * @name ExQueueWorkItem
+ * @implemented NT4
*
- * FUNCTION: Inserts a work item in a queue for one of the system worker
- * threads to process
- * ARGUMENTS:
- * WorkItem = Item to insert
- * QueueType = Queue to insert it in
- */
+ * The ExQueueWorkItem routine acquires rundown protection for
+ * the specified descriptor.
+ *
+ * @param WorkItem
+ * Pointer to an initialized Work Queue Item structure. This structure
+ * must be located in nonpaged pool memory.
+ *
+ * @param QueueType
+ * Type of the queue to use for this item. Can be one of the following:
+ * - DelayedWorkQueue
+ * - CriticalWorkQueue
+ * - HyperCriticalWorkQueue
+ *
+ * @return None.
+ *
+ * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
+ *
+ * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
+ *
+ *--*/
VOID
-STDCALL
+NTAPI
ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
WORK_QUEUE_TYPE QueueType)
{
- ASSERT(WorkItem!=NULL);
- ASSERT_IRQL(DISPATCH_LEVEL);
+ PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
+ ASSERT(QueueType < MaximumWorkQueue);
ASSERT(WorkItem->List.Flink == NULL);
+ /* Don't try to trick us */
+ if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
+ {
+ /* Bugcheck the system */
+ KEBUGCHECKEX(WORKER_INVALID,
+ 1,
+ (ULONG_PTR)WorkItem,
+ (ULONG_PTR)WorkItem->WorkerRoutine,
+ 0);
+ }
+
/* Insert the Queue */
- KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List);
+ KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
+ ASSERT(!WorkQueue->Info.QueueDisabled);
+
+ /*
+ * Check if we need a new thread. Our decision is as follows:
+ * - This queue type must support Dynamic Threads (duh!)
+ * - It actually has to have unprocessed items
+ * - We have CPUs which could be handling another thread
+ * - We haven't abused our usage of dynamic threads.
+ */
+ if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
+ (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
+ (WorkQueue->WorkerQueue.CurrentCount <
+ WorkQueue->WorkerQueue.MaximumCount) &&
+ (WorkQueue->DynamicThreadCount < 16))
+ {
+ /* Let the balance manager know about it */
+ DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
+ WorkQueue->WorkerQueue.CurrentCount,
+ WorkQueue->WorkerQueue.MaximumCount);
+ KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
+ }
}
/* EOF */
+