- Fix shamefully dangerously broken Work Thread/Queue/Item
implementation:
* Do not pollute the kernel with 10 real-time threads and 5
high-priority threads in order to manage work items. Work threads are
very-low priority (< 7) and should never pre-empt user threads like they
do now. 1 priority 7, 5 priority 5 and 3 priority 4 threads are now
properly created.
* Implement a worker thread balance set manager. On SMP systems, it is
able to determine when a new thread should be allocated to execute on a
free CPU. On both UP and MP, it is also able to detect if a work queue
has deadlocked, and will allocate new dynamic threads to unfreeze the
queue.
* Add check for threads returning with APC disabled, and re-enable
APCs if this happens. This hack is used in NT for broken drivers.
* Lots of code changes to support dynamic threads, which:
- Can terminate.
- Use a 10 minute timeout on the kernel queue.
* Add skeleton code for swapping worker thread stacks as well as
worker thread shutdown (not yet implemented).
* Add WORKER_INVALID bugcheck definition.
* These changes seem to make ROS a lot more responsive.
- NDK:
* Make more compatible with MS IFS
* Fix EX_WORK_QUEUE definition.
* Fix ETHREAD offsets.
* Fix RtlIsNameLegalDOS8Dot3 definition.
* Move splay tree defines to IFS.
Modified: trunk/reactos/include/ndk/exfuncs.h
Modified: trunk/reactos/include/ndk/extypes.h
Modified: trunk/reactos/include/ndk/ifssupp.h
Modified: trunk/reactos/include/ndk/iofuncs.h
Modified: trunk/reactos/include/ndk/obfuncs.h
Modified: trunk/reactos/include/ndk/pstypes.h
Modified: trunk/reactos/include/ndk/rtlfuncs.h
Modified: trunk/reactos/include/ndk/rtltypes.h
Modified: trunk/reactos/lib/rtl/dos8dot3.c
Modified: trunk/reactos/ntoskrnl/ex/work.c
Modified: trunk/reactos/ntoskrnl/ntoskrnl.mc
Modified: trunk/reactos/w32api/include/ddk/ntifs.h
_____
Modified: trunk/reactos/include/ndk/exfuncs.h
--- trunk/reactos/include/ndk/exfuncs.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/exfuncs.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -485,6 +485,7 @@
IN HANDLE EventHandle
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwCreateEvent(
@@ -741,6 +742,7 @@
IN HANDLE PortHandle
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwSetEvent(
_____
Modified: trunk/reactos/include/ndk/extypes.h
--- trunk/reactos/include/ndk/extypes.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/extypes.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -273,7 +273,7 @@
typedef struct _EX_WORK_QUEUE
{
KQUEUE WorkerQueue;
- ULONG DynamicThreadCount;
+ LONG DynamicThreadCount;
ULONG WorkItemsProcessed;
ULONG WorkItemsProcessedLastPass;
ULONG QueueDepthLastPass;
@@ -318,6 +318,8 @@
//
// Executive Pushlock
//
+#undef EX_PUSH_LOCK
+#undef PEX_PUSH_LOCK
typedef struct _EX_PUSH_LOCK
{
union
_____
Modified: trunk/reactos/include/ndk/ifssupp.h
--- trunk/reactos/include/ndk/ifssupp.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/ifssupp.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -117,6 +117,13 @@
LIST_ENTRY ThreadListHead;
} KQUEUE, *PKQUEUE, *RESTRICTED_POINTER PRKQUEUE;
+typedef struct _ACE_HEADER
+{
+ UCHAR AceType;
+ UCHAR AceFlags;
+ USHORT AceSize;
+} ACE_HEADER, *PACE_HEADER;
+
typedef enum _RTL_GENERIC_COMPARE_RESULTS
{
GenericLessThan,
_____
Modified: trunk/reactos/include/ndk/iofuncs.h
--- trunk/reactos/include/ndk/iofuncs.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/iofuncs.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -562,6 +562,7 @@
IN PUNICODE_STRING EntryValue
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwDeleteFile(
@@ -591,6 +592,7 @@
IN ULONG Unknown2
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwFlushBuffersFile(
_____
Modified: trunk/reactos/include/ndk/obfuncs.h
--- trunk/reactos/include/ndk/obfuncs.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/obfuncs.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -296,6 +296,7 @@
IN BOOLEAN GenerateOnClose
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwDuplicateObject(
@@ -321,6 +322,7 @@
IN HANDLE Handle
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwOpenDirectoryObject(
@@ -425,6 +427,7 @@
IN PLARGE_INTEGER Time
);
+NTSYSAPI
NTSTATUS
NTAPI
ZwWaitForSingleObject(
_____
Modified: trunk/reactos/include/ndk/pstypes.h
--- trunk/reactos/include/ndk/pstypes.h 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/include/ndk/pstypes.h 2006-01-03 21:34:19 UTC (rev
20554)
@@ -600,7 +600,8 @@
#include <pshpack4.h>
typedef struct _ETHREAD
{
- KTHREAD Tcb; /* 1B8
*/
+ KTHREAD Tcb; /* 000
*/
+ PVOID Padding; /* 1B4
*/
LARGE_INTEGER CreateTime; /* 1B8
*/
union
{
_____
Modified: trunk/reactos/include/ndk/rtlfuncs.h
--- trunk/reactos/include/ndk/rtlfuncs.h 2006-01-03 21:29:39 UTC
(rev 20553)
+++ trunk/reactos/include/ndk/rtlfuncs.h 2006-01-03 21:34:19 UTC
(rev 20554)
@@ -157,8 +157,6 @@
#define RtlEqualLuid(L1, L2) (((L1)->HighPart == (L2)->HighPart) && \
((L1)->LowPart == (L2)->LowPart))
-#endif
-
//
// RTL Splay Tree Functions
//
@@ -246,6 +244,7 @@
_SplayParent->RightChild = _SplayChild; \
_SplayChild->Parent = _SplayParent; \
}
+#endif
//
// Error and Exception Functions
@@ -1694,9 +1693,9 @@
BOOLEAN
NTAPI
RtlIsNameLegalDOS8Dot3(
- IN PUNICODE_STRING UnicodeName,
- IN PANSI_STRING AnsiName,
- PBOOLEAN Unknown
+ IN PCUNICODE_STRING Name,
+ IN OUT POEM_STRING OemName OPTIONAL,
+ IN OUT PBOOLEAN NameContainsSpaces OPTIONAL
);
NTSYSAPI
_____
Modified: trunk/reactos/include/ndk/rtltypes.h
--- trunk/reactos/include/ndk/rtltypes.h 2006-01-03 21:29:39 UTC
(rev 20553)
+++ trunk/reactos/include/ndk/rtltypes.h 2006-01-03 21:34:19 UTC
(rev 20554)
@@ -538,16 +538,6 @@
CSHORT Weekday;
} TIME_FIELDS, *PTIME_FIELDS;
-#else
-//
-// ACE Definitions
-//
-typedef struct _ACE_HEADER
-{
- UCHAR AceType;
- UCHAR AceFlags;
- USHORT AceSize;
-} ACE_HEADER, *PACE_HEADER;
#endif
typedef struct _ACE
{
_____
Modified: trunk/reactos/lib/rtl/dos8dot3.c
--- trunk/reactos/lib/rtl/dos8dot3.c 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/lib/rtl/dos8dot3.c 2006-01-03 21:34:19 UTC (rev
20554)
@@ -232,10 +232,11 @@
/*
* @implemented
*/
-BOOLEAN NTAPI
-RtlIsNameLegalDOS8Dot3(IN PUNICODE_STRING UnicodeName,
- IN PANSI_STRING AnsiName,
- OUT PBOOLEAN SpacesFound)
+BOOLEAN
+NTAPI
+RtlIsNameLegalDOS8Dot3(IN PCUNICODE_STRING UnicodeName,
+ IN OUT POEM_STRING AnsiName OPTIONAL,
+ IN OUT PBOOLEAN SpacesFound OPTIONAL)
{
PANSI_STRING name = AnsiName;
ANSI_STRING DummyString;
_____
Modified: trunk/reactos/ntoskrnl/ex/work.c
--- trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:34:19 UTC (rev
20554)
@@ -1,11 +1,9 @@
/*
* COPYRIGHT: See COPYING in the top level directory
- * PROJECT: ReactOS kernel
+ * PROJECT: ReactOS Kernel
* FILE: ntoskrnl/ex/work.c
- * PURPOSE: Manage system work queues
- *
- * PROGRAMMERS: Alex Ionescu - Used correct work queue array and
added some fixes and checks.
- * Gunnar Dalsnes - Implemented
+ * PURPOSE: Manage system work queues and worker threads
+ * PROGRAMMER: Alex Ionescu (alex(a)relsoft.net)
*/
/* INCLUDES
******************************************************************/
@@ -18,70 +16,167 @@
#pragma alloc_text(INIT, ExpInitializeWorkerThreads)
#endif
-/* DEFINES
*******************************************************************/
+/* DATA
**********************************************************************/
-#define NUMBER_OF_WORKER_THREADS (5)
+/* Number of worker threads for each Queue */
+#define EX_HYPERCRITICAL_WORK_THREADS 1
+#define EX_DELAYED_WORK_THREADS 3
+#define EX_CRITICAL_WORK_THREADS 5
-/* TYPES
*********************************************************************/
+/* Magic flag for dynamic worker threads */
+#define EX_DYNAMIC_WORK_THREAD 0x80000000
-/* GLOBALS
*******************************************************************/
+/* Worker thread priorities */
+#define EX_HYPERCRITICAL_QUEUE_PRIORITY 7
+#define EX_CRITICAL_QUEUE_PRIORITY 5
+#define EX_DELAYED_QUEUE_PRIORITY 4
-/*
- * PURPOSE: Queue of items waiting to be processed at normal priority
- */
+/* The actual worker queue array */
EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
-/* FUNCTIONS
****************************************************************/
+/* Accounting of the total threads and registry hacked threads */
+ULONG ExpCriticalWorkerThreads;
+ULONG ExpDelayedWorkerThreads;
+ULONG ExpAdditionalCriticalWorkerThreads;
+ULONG ExpAdditionalDelayedWorkerThreads;
-/*
- * FUNCTION: Entry point for a worker thread
- * ARGUMENTS:
- * context = Parameters
- * RETURNS: Status
- * NOTE: To kill a worker thread you must queue an item whose callback
- * calls PsTerminateSystemThread
- */
-static
+/* Future support for stack swapping worker threads */
+BOOLEAN ExpWorkersCanSwap;
+LIST_ENTRY ExpWorkerListHead;
+KMUTANT ExpWorkerSwapinMutex;
+
+/* The worker balance set manager events */
+KEVENT ExpThreadSetManagerEvent;
+KEVENT ExpThreadSetManagerShutdownEvent;
+
+/* Thread pointers for future worker thread shutdown support */
+PETHREAD ExpWorkerThreadBalanceManagerPtr;
+PETHREAD ExpLastWorkerThread;
+
+/* PRIVATE FUNCTIONS
*********************************************************/
+
+/*++
+ * @name ExpWorkerThreadEntryPoint
+ *
+ * The ExpWorkerThreadEntryPoint routine is the entrypoint for any
new
+ * worker thread created by the system.
+ *
+ * @param Context
+ * Contains the work queue type masked with a flag specifying
whether the
+ * thread is dynamic or not.
+ *
+ * @return None.
+ *
+ * @remarks A dynamic thread can timeout after 10 minutes of waiting on
a queue
+ * while a static thread will never timeout.
+ *
+ * Worker threads must return at IRQL == PASSIVE_LEVEL, must
not have
+ * active impersonation info, and must not have disabled APCs.
+ *
+ * NB: We will re-enable APCs for broken threads but all other
cases
+ * will generate a bugcheck.
+ *
+ *--*/
VOID
-STDCALL
+NTAPI
ExpWorkerThreadEntryPoint(IN PVOID Context)
{
PWORK_QUEUE_ITEM WorkItem;
PLIST_ENTRY QueueEntry;
WORK_QUEUE_TYPE WorkQueueType;
PEX_WORK_QUEUE WorkQueue;
+ LARGE_INTEGER Timeout;
+ PLARGE_INTEGER TimeoutPointer = NULL;
+ PETHREAD Thread = PsGetCurrentThread();
+ KPROCESSOR_MODE WaitMode;
+ EX_QUEUE_WORKER_INFO OldValue, NewValue;
+ /* Check if this is a dynamic thread */
+ if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
+ {
+ /* It is, which means we will eventually time out after 10
minutes */
+ Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
+ TimeoutPointer = &Timeout;
+ }
+
/* Get Queue Type and Worker Queue */
- WorkQueueType = (WORK_QUEUE_TYPE)Context;
+ WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
+ ~EX_DYNAMIC_WORK_THREAD);
WorkQueue = &ExWorkerQueue[WorkQueueType];
+ /* Select the wait mode */
+ WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
+
+ /* Nobody should have initialized this yet, do it now */
+ ASSERT(Thread->ExWorkerCanWaitUser == 0);
+ if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
+
+ /* If we shouldn't swap, disable that feature */
+ if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
+
+ /* Set the worker flags */
+ do
+ {
+ /* Check if the queue is being disabled */
+ if (WorkQueue->Info.QueueDisabled)
+ {
+ /* Re-enable stack swapping and kill us */
+ KeSetKernelStackSwapEnable(TRUE);
+ PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+ }
+
+ /* Increase the worker count */
+ OldValue = WorkQueue->Info;
+ NewValue = OldValue;
+ NewValue.WorkerCount++;
+ }
+ while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+ *(PLONG)&NewValue,
+ *(PLONG)&OldValue) !=
*(PLONG)&OldValue);
+
+ /* Success, you are now officially a worker thread! */
+ Thread->ActiveExWorker = TRUE;
+
/* Loop forever */
- while (TRUE) {
-
+ProcessLoop:
+ for (;;)
+ {
/* Wait for Something to Happen on the Queue */
- QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode,
NULL);
+ QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
+ WaitMode,
+ TimeoutPointer);
- /* Can't happen since we do a KernelMode wait (and we're a
system thread) */
- ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
+ /* Check if we timed out and quit this loop in that case */
+ if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
- /* this should never happen either, since we wait with NULL
timeout,
- * but there's a slight possibility that STATUS_TIMEOUT is
returned
- * at queue rundown in NT (unlikely) -Gunnar
- */
- ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT);
-
/* Increment Processed Work Items */
InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
/* Get the Work Item */
WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM,
List);
+ /* Make sure nobody is trying to play smart with us */
+ ASSERT((ULONG_PTR)WorkItem->WorkerRoutine >
MmUserProbeAddress);
+
/* Call the Worker Routine */
WorkItem->WorkerRoutine(WorkItem->Parameter);
+ /* Make sure APCs are not disabled */
+ if (Thread->Tcb.SpecialApcDisable)
+ {
+ /* We're nice and do it behind your back */
+ DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back
"
+ "with APCs disabled!\n",
+ WorkItem->WorkerRoutine,
+ WorkItem->Parameter,
+ WorkItem);
+ Thread->Tcb.SpecialApcDisable = 0;
+ }
+
/* Make sure it returned at right IRQL */
- if (KeGetCurrentIrql() != PASSIVE_LEVEL) {
-
+ if (KeGetCurrentIrql() != PASSIVE_LEVEL)
+ {
+ /* It didn't, bugcheck! */
KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
(ULONG_PTR)WorkItem->WorkerRoutine,
KeGetCurrentIrql(),
@@ -90,8 +185,9 @@
}
/* Make sure it returned with Impersionation Disabled */
- if (PsGetCurrentThread()->ActiveImpersonationInfo) {
-
+ if (Thread->ActiveImpersonationInfo)
+ {
+ /* It didn't, bugcheck! */
KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
(ULONG_PTR)WorkItem->WorkerRoutine,
(ULONG_PTR)WorkItem->Parameter,
@@ -99,87 +195,470 @@
0);
}
}
+
+ /* This is a dynamic thread. Terminate it unless IRPs are pending
*/
+ if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
+
+ /* Don't terminate it if the queue is disabled either */
+ if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
+
+ /* Set the worker flags */
+ do
+ {
+ /* Decrease the worker count */
+ OldValue = WorkQueue->Info;
+ NewValue = OldValue;
+ NewValue.WorkerCount--;
+ }
+ while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
+ *(PLONG)&NewValue,
+ *(PLONG)&OldValue) !=
*(PLONG)&OldValue);
+
+ /* Decrement dynamic thread count */
+ InterlockedDecrement(&WorkQueue->DynamicThreadCount);
+
+ /* We're not a worker thread anymore */
+ Thread->ActiveExWorker = FALSE;
+
+ /* Re-enable the stack swap */
+ KeSetKernelStackSwapEnable(TRUE);
+ return;
}
-static
+/*++
+ * @name ExpCreateWorkerThread
+ *
+ * The ExpCreateWorkerThread routine creates a new worker thread
for the
+ * specified queue.
+ **
+ * @param QueueType
+ * Type of the queue to use for this thread. Valid values are:
+ * - DelayedWorkQueue
+ * - CriticalWorkQueue
+ * - HyperCriticalWorkQueue
+ *
+ * @param Dynamic
+ * Specifies whether or not this thread is a dynamic thread.
+ *
+ * @return None.
+ *
+ * @remarks HyperCritical work threads run at priority 7; Critical work
threads
+ * run at priority 5, and delayed work threads run at priority
4.
+ *
+ * Thus, worker threads cannot pre-empt a normal user-mode
thread.
+ *
+ *--*/
VOID
-STDCALL
-ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType,
- KPRIORITY Priority)
+NTAPI
+ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
+ IN BOOLEAN Dynamic)
{
- ULONG i;
PETHREAD Thread;
HANDLE hThread;
+ ULONG Context;
+ KPRIORITY Priority;
- /* Loop through how many threads we need to create */
- for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
+ /* Check if this is going to be a dynamic thread */
+ Context = WorkQueueType;
- /* Create the System Thread */
- PsCreateSystemThread(&hThread,
- THREAD_ALL_ACCESS,
- NULL,
- NULL,
- NULL,
- ExpWorkerThreadEntryPoint,
- (PVOID)WorkQueueType);
+ /* Add the dynamic mask */
+ if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
- /* Get the Thread */
- ObReferenceObjectByHandle(hThread,
- THREAD_SET_INFORMATION,
- PsThreadType,
+ /* Create the System Thread */
+ PsCreateSystemThread(&hThread,
+ THREAD_ALL_ACCESS,
+ NULL,
+ NULL,
+ NULL,
+ ExpWorkerThreadEntryPoint,
+ (PVOID)Context);
+
+ /* If the thread is dynamic */
+ if (Dynamic)
+ {
+ /* Increase the count */
+
InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
+ }
+
+ /* Set the priority */
+ if (WorkQueueType == DelayedWorkQueue)
+ {
+ /* Priority == 4 */
+ Priority = EX_DELAYED_QUEUE_PRIORITY;
+ }
+ else if (WorkQueueType == CriticalWorkQueue)
+ {
+ /* Priority == 5 */
+ Priority = EX_CRITICAL_QUEUE_PRIORITY;
+ }
+ else
+ {
+ /* Priority == 7 */
+ Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY;
+ }
+
+ /* Get the Thread */
+ ObReferenceObjectByHandle(hThread,
+ THREAD_SET_INFORMATION,
+ PsThreadType,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+
+ /* Set the Priority */
+ KeSetPriorityThread(&Thread->Tcb, Priority);
+
+ /* Dereference and close handle */
+ ObDereferenceObject(Thread);
+ ZwClose(hThread);
+}
+
+/*++
+ * @name ExpDetectWorkerThreadDeadlock
+ *
+ * The ExpDetectWorkerThreadDeadlock routine checks every queue and
creates a
+ * dynamic thread if the queue seems to be deadlocked.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created
is
+ * based on whether the queue has processed no new items in the
last
+ * second, and new items are still enqueued.
+ *
+ *--*/
+VOID
+NTAPI
+ExpDetectWorkerThreadDeadlock(VOID)
+{
+ ULONG i;
+ PEX_WORK_QUEUE Queue;
+
+ /* Loop the 3 queues */
+ for (i = 0; i < MaximumWorkQueue; i++)
+ {
+ /* Get the queue */
+ Queue = &ExWorkerQueue[i];
+ ASSERT(Queue->DynamicThreadCount <= 16);
+
+ /* Check if stuff is on the queue that still is unprocessed */
+ if ((Queue->QueueDepthLastPass) &&
+ (Queue->WorkItemsProcessed ==
Queue->WorkItemsProcessedLastPass) &&
+ (Queue->DynamicThreadCount < 16))
+ {
+ /* Stuff is still on the queue and nobody did anything
about it */
+ DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
+ ExpCreateWorkerThread(i, TRUE);
+ DPRINT1("Dynamic threads queued %d\n",
Queue->DynamicThreadCount);
+ }
+
+ /* Update our data */
+ Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
+ Queue->QueueDepthLastPass =
KeReadStateQueue(&Queue->WorkerQueue);
+ }
+}
+
+/*++
+ * @name ExpCheckDynamicThreadCount
+ *
+ * The ExpCheckDynamicThreadCount routine checks every queue and
creates a
+ * dynamic thread if the queue requires one.
+ *
+ * @param None
+ *
+ * @return None.
+ *
+ * @remarks The algorithm for deciding if a new thread must be created
is
+ * documented in the ExQueueWorkItem routine.
+ *
+ *--*/
+VOID
+NTAPI
+ExpCheckDynamicThreadCount(VOID)
+{
+ ULONG i;
+ PEX_WORK_QUEUE Queue;
+
+ /* Loop the 3 queues */
+ for (i = 0; i < MaximumWorkQueue; i++)
+ {
+ /* Get the queue */
+ Queue = &ExWorkerQueue[i];
+
+ /* Check if still need a new thread. See ExQueueWorkItem */
+ if ((Queue->Info.MakeThreadsAsNecessary) &&
+ (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
+ (Queue->WorkerQueue.CurrentCount <
+ Queue->WorkerQueue.MaximumCount) &&
+ (Queue->DynamicThreadCount < 16))
+ {
+ /* Create a new thread */
+ DPRINT1("EX: Creating new dynamic thread as requested\n");
+ ExpCreateWorkerThread(i, TRUE);
+ }
+ }
+}
+
+/*++
+ * @name ExpWorkerThreadBalanceManager
+ *
+ * The ExpWorkerThreadBalanceManager routine is the entrypoint for
the
+ * worker thread balance set manager.
+ *
+ * @param Context
+ * Unused.
+ *
+ * @return None.
+ *
+ * @remarks The worker thread balance set manager listens every second,
but can
+ * also be woken up by an event when a new thread is needed,
or by the
+ * special shutdown event. This thread runs at priority 7.
+ *
+ * This routine must run at IRQL == PASSIVE_LEVEL.
+ *
+ *--*/
+VOID
+NTAPI
+ExpWorkerThreadBalanceManager(IN PVOID Context)
+{
+ KTIMER Timer;
+ LARGE_INTEGER Timeout;
+ NTSTATUS Status;
+ PVOID WaitEvents[2];
+ PAGED_CODE();
+ UNREFERENCED_PARAMETER(Context);
+
+ /* Raise our priority above all other worker threads */
+ KeSetBasePriorityThread(KeGetCurrentThread(),
+ EX_CRITICAL_QUEUE_PRIORITY + 1);
+
+ /* Setup the timer */
+ KeInitializeTimer(&Timer);
+ Timeout.QuadPart = Int32x32To64(-1, 10000000);
+
+ /* We'll wait on the periodic timer and also the emergency event */
+ WaitEvents[0] = &Timer;
+ WaitEvents[1] = &ExpThreadSetManagerEvent;
+ WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
+
+ /* Start wait loop */
+ for (;;)
+ {
+ /* Wait for the timer */
+ KeSetTimer(&Timer, Timeout, NULL);
+ Status = KeWaitForMultipleObjects(3,
+ WaitEvents,
+ WaitAny,
+ Executive,
+ KernelMode,
+ FALSE,
+ NULL,
+ NULL);
+ if (Status == 0)
+ {
+ /* Our timer expired. Check for deadlocks */
+ ExpDetectWorkerThreadDeadlock();
+ }
+ else if (Status == 1)
+ {
+ /* Someone notified us, verify if we should create a new
thread */
+ ExpCheckDynamicThreadCount();
+ }
+ else if (Status == 2)
+ {
+ /* We are shutting down. Cancel the timer */
+ DPRINT1("System shutdown\n");
+ KeCancelTimer(&Timer);
+
+ /* Make sure we have a final thread */
+ ASSERT(ExpLastWorkerThread);
+
+ /* Wait for it */
+ KeWaitForSingleObject(ExpLastWorkerThread,
+ Executive,
KernelMode,
- (PVOID*)&Thread,
+ FALSE,
NULL);
- /* Set the Priority */
- KeSetPriorityThread(&Thread->Tcb, Priority);
-
- /* Dereference and close handle */
- ObDereferenceObject(Thread);
- ZwClose(hThread);
+ /* Dereference it and kill us */
+ ObDereferenceObject(ExpLastWorkerThread);
+ PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
+ }
}
}
+/*++
+ * @name ExpInitializeWorkerThreads
+ *
+ * The ExpInitializeWorkerThreads routine initializes worker thread
and
+ * work queue support.
+ *
+ * @param None.
+ *
+ * @return None.
+ *
+ * @remarks This routine is only called once during system
initialization.
+ *
+ *--*/
VOID
INIT_FUNCTION
-STDCALL
+NTAPI
ExpInitializeWorkerThreads(VOID)
{
ULONG WorkQueueType;
+ ULONG CriticalThreads, DelayedThreads;
+ HANDLE ThreadHandle;
+ PETHREAD Thread;
+ ULONG i;
+ /* Setup the stack swap support */
+ KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE);
+ InitializeListHead(&ExpWorkerListHead);
+ ExpWorkersCanSwap = TRUE;
+
+ /* Set the number of critical and delayed threads. We shouldn't
hardcode */
+ DelayedThreads = EX_DELAYED_WORK_THREADS;
+ CriticalThreads = EX_CRITICAL_WORK_THREADS;
+
+ /* Protect against greedy registry modifications */
+ ExpAdditionalDelayedWorkerThreads =
+ min(ExpAdditionalCriticalWorkerThreads, 16);
+ ExpAdditionalCriticalWorkerThreads =
+ min(ExpAdditionalCriticalWorkerThreads, 16);
+
+ /* Calculate final count */
+ DelayedThreads += ExpAdditionalDelayedWorkerThreads;
+ CriticalThreads += ExpAdditionalCriticalWorkerThreads;
+
/* Initialize the Array */
- for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue;
WorkQueueType++) {
-
+ for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue;
WorkQueueType++)
+ {
+ /* Clear the structure and initialize the queue */
RtlZeroMemory(&ExWorkerQueue[WorkQueueType],
sizeof(EX_WORK_QUEUE));
KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue,
0);
}
- /* Create the built-in worker threads for each work queue */
- ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY);
- ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY);
- ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY);
+ /* Dynamic threads are only used for the critical queue */
+ ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary =
TRUE;
+
+ /* Initialize the balance set manager events */
+ KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent,
FALSE);
+ KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
+ NotificationEvent,
+ FALSE);
+
+ /* Create the built-in worker threads for the critical queue */
+ for (i = 0; i < CriticalThreads; i++)
+ {
+ /* Create the thread */
+ ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
+ ExpCriticalWorkerThreads++;
+ }
+
+ /* Create the built-in worker threads for the delayed queue */
+ for (i = 0; i < DelayedThreads; i++)
+ {
+ /* Create the thread */
+ ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
+ ExpDelayedWorkerThreads++;
+ }
+
+ /* Create the built-in worker thread for the hypercritical queue */
+ ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
+
+ /* Create the balance set manager thread */
+ PsCreateSystemThread(&ThreadHandle,
+ THREAD_ALL_ACCESS,
+ NULL,
+ 0,
+ NULL,
+ ExpWorkerThreadBalanceManager,
+ NULL);
+
+ /* Get a pointer to it for the shutdown process */
+ ObReferenceObjectByHandle(ThreadHandle,
+ THREAD_ALL_ACCESS,
+ NULL,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+ ExpWorkerThreadBalanceManagerPtr = Thread;
+
+ /* Close the handle and return */
+ ZwClose(ThreadHandle);
}
-/*
- * @implemented
+/* PUBLIC FUNCTIONS
**********************************************************/
+
+/*++
+ * @name ExQueueWorkItem
+ * @implemented NT4
*
- * FUNCTION: Inserts a work item in a queue for one of the system
worker
- * threads to process
- * ARGUMENTS:
- * WorkItem = Item to insert
- * QueueType = Queue to insert it in
- */
+ * The ExQueueWorkItem routine inserts a work item into the
+ * specified work queue for a system worker thread to process.
+ *
+ * @param WorkItem
+ * Pointer to an initialized Work Queue Item structure. This
structure
+ * must be located in nonpaged pool memory.
+ *
+ * @param QueueType
+ * Type of the queue to use for this item. Can be one of the
following:
+ * - DelayedWorkQueue
+ * - CriticalWorkQueue
+ * - HyperCriticalWorkQueue
+ *
+ * @return None.
+ *
+ * @remarks This routine is obsolete. Use IoQueueWorkItem instead.
+ *
+ * Callers of this routine must be running at IRQL <=
DISPATCH_LEVEL.
+ *
+ *--*/
VOID
-STDCALL
+NTAPI
ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
WORK_QUEUE_TYPE QueueType)
{
- ASSERT(WorkItem!=NULL);
- ASSERT_IRQL(DISPATCH_LEVEL);
+ PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
+ ASSERT(QueueType < MaximumWorkQueue);
ASSERT(WorkItem->List.Flink == NULL);
+ /* Don't try to trick us */
+ if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
+ {
+ /* Bugcheck the system */
+ KEBUGCHECKEX(WORKER_INVALID,
+ 1,
+ (ULONG_PTR)WorkItem,
+ (ULONG_PTR)WorkItem->WorkerRoutine,
+ 0);
+ }
+
/* Insert the Queue */
- KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue,
&WorkItem->List);
+ KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
+ ASSERT(!WorkQueue->Info.QueueDisabled);
+
+ /*
+ * Check if we need a new thread. Our decision is as follows:
+ * - This queue type must support Dynamic Threads (duh!)
+ * - It actually has to have unprocessed items
+ * - We have CPUs which could be handling another thread
+ * - We haven't abused our usage of dynamic threads.
+ */
+ if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
+ (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
+ (WorkQueue->WorkerQueue.CurrentCount <
+ WorkQueue->WorkerQueue.MaximumCount) &&
+ (WorkQueue->DynamicThreadCount < 16))
+ {
+ /* Let the balance manager know about it */
+ DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount:
%d\n",
+ WorkQueue->WorkerQueue.CurrentCount,
+ WorkQueue->WorkerQueue.MaximumCount);
+ KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
+ }
}
/* EOF */
+
_____
Modified: trunk/reactos/ntoskrnl/ntoskrnl.mc
--- trunk/reactos/ntoskrnl/ntoskrnl.mc 2006-01-03 21:29:39 UTC (rev
20553)
+++ trunk/reactos/ntoskrnl/ntoskrnl.mc 2006-01-03 21:34:19 UTC (rev
20554)
@@ -1003,6 +1003,14 @@
WORKER_THREAD_RETURNED_AT_BAD_IRQL
.
+MessageId=0xE4
+Severity=Success
+Facility=System
+SymbolicName=WORKER_INVALID
+Language=English
+WORKER_INVALID
+.
+
MessageId=0xE2
Severity=Success
Facility=System
_____
Modified: trunk/reactos/w32api/include/ddk/ntifs.h
--- trunk/reactos/w32api/include/ddk/ntifs.h 2006-01-03 21:29:39 UTC
(rev 20553)
+++ trunk/reactos/w32api/include/ddk/ntifs.h 2006-01-03 21:34:19 UTC
(rev 20554)
@@ -1551,6 +1551,13 @@
TOKEN_CONTROL ClientTokenControl;
} SECURITY_CLIENT_CONTEXT, *PSECURITY_CLIENT_CONTEXT;
+typedef struct _ACE_HEADER
+{
+ UCHAR AceType;
+ UCHAR AceFlags;
+ USHORT AceSize;
+} ACE_HEADER, *PACE_HEADER;
+
typedef struct _TUNNEL {
FAST_MUTEX Mutex;
PRTL_SPLAY_LINKS Cache;
@@ -3370,6 +3377,13 @@
NTKERNELAPI
BOOLEAN
NTAPI
+KeSetKernelStackSwapEnable(
+ IN BOOLEAN Enable
+);
+
+NTKERNELAPI
+BOOLEAN
+NTAPI
MmCanFileBeTruncated (
IN PSECTION_OBJECT_POINTERS SectionObjectPointer,
IN PLARGE_INTEGER NewFileSize
@@ -3885,10 +3899,10 @@
NTSYSAPI
BOOLEAN
NTAPI
-RtlIsNameLegalDOS8Dot3 (
[truncated at 1000 lines; 104 more skipped]