- Fix shamefully dangerously broken Work Thread/Queue/Item implementation: * Do not pollute the kernel with 10 real-time threads and 5 high-priority threads in order to manage work items. Work threads are very-low priority (< 7) and should never pre-empt user threads like they do now. 1 priority 7, 5 priority 5 and 3 priority 4 threads are now properly created. * Implement a worker thread balance set manager. On SMP systems, it is able to determine when a new thread should be allocated to execute on a free CPU. On both UP and MP, it is also able to detect if a work queue has deadlocked, and will allocate new dynamic threads to unfreeze the queue. * Add check for threads returning with APC disabled, and re-enable APCs if this happens. This hack is used in NT for broken drivers. * Lots of code changes to support dynamic threads, which: - Can terminate. - Use a 10 minute timeout on the kernel queue. * Add skeleton code for swapping worker thread stacks as well as worker thread shutdown (not yet implemented). * Add WORKER_INVALID bugcheck definition. * These changes seem to make ROS a lot more responsive.
- NDK: * Make more compatible with MS IFS * Fix EX_WORK_QUEUE definition. * Fix ETHREAD offsets. * Fix RtlIsNameLegalDOS8Dot3 definition. * Move splay tree defines to IFS. Modified: trunk/reactos/include/ndk/exfuncs.h Modified: trunk/reactos/include/ndk/extypes.h Modified: trunk/reactos/include/ndk/ifssupp.h Modified: trunk/reactos/include/ndk/iofuncs.h Modified: trunk/reactos/include/ndk/obfuncs.h Modified: trunk/reactos/include/ndk/pstypes.h Modified: trunk/reactos/include/ndk/rtlfuncs.h Modified: trunk/reactos/include/ndk/rtltypes.h Modified: trunk/reactos/lib/rtl/dos8dot3.c Modified: trunk/reactos/ntoskrnl/ex/work.c Modified: trunk/reactos/ntoskrnl/ntoskrnl.mc Modified: trunk/reactos/w32api/include/ddk/ntifs.h _____
Modified: trunk/reactos/include/ndk/exfuncs.h --- trunk/reactos/include/ndk/exfuncs.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/exfuncs.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -485,6 +485,7 @@
IN HANDLE EventHandle );
+NTSYSAPI NTSTATUS NTAPI ZwCreateEvent( @@ -741,6 +742,7 @@ IN HANDLE PortHandle );
+NTSYSAPI NTSTATUS NTAPI ZwSetEvent( _____
Modified: trunk/reactos/include/ndk/extypes.h --- trunk/reactos/include/ndk/extypes.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/extypes.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -273,7 +273,7 @@
typedef struct _EX_WORK_QUEUE { KQUEUE WorkerQueue; - ULONG DynamicThreadCount; + LONG DynamicThreadCount; ULONG WorkItemsProcessed; ULONG WorkItemsProcessedLastPass; ULONG QueueDepthLastPass; @@ -318,6 +318,8 @@ // // Executive Pushlock // +#undef EX_PUSH_LOCK +#undef PEX_PUSH_LOCK typedef struct _EX_PUSH_LOCK { union _____
Modified: trunk/reactos/include/ndk/ifssupp.h --- trunk/reactos/include/ndk/ifssupp.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/ifssupp.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -117,6 +117,13 @@
LIST_ENTRY ThreadListHead; } KQUEUE, *PKQUEUE, *RESTRICTED_POINTER PRKQUEUE;
+typedef struct _ACE_HEADER +{ + UCHAR AceType; + UCHAR AceFlags; + USHORT AceSize; +} ACE_HEADER, *PACE_HEADER; + typedef enum _RTL_GENERIC_COMPARE_RESULTS { GenericLessThan, _____
Modified: trunk/reactos/include/ndk/iofuncs.h --- trunk/reactos/include/ndk/iofuncs.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/iofuncs.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -562,6 +562,7 @@
IN PUNICODE_STRING EntryValue );
+NTSYSAPI NTSTATUS NTAPI ZwDeleteFile( @@ -591,6 +592,7 @@ IN ULONG Unknown2 );
+NTSYSAPI NTSTATUS NTAPI ZwFlushBuffersFile( _____
Modified: trunk/reactos/include/ndk/obfuncs.h --- trunk/reactos/include/ndk/obfuncs.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/obfuncs.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -296,6 +296,7 @@
IN BOOLEAN GenerateOnClose );
+NTSYSAPI NTSTATUS NTAPI ZwDuplicateObject( @@ -321,6 +322,7 @@ IN HANDLE Handle );
+NTSYSAPI NTSTATUS NTAPI ZwOpenDirectoryObject( @@ -425,6 +427,7 @@ IN PLARGE_INTEGER Time );
+NTSYSAPI NTSTATUS NTAPI ZwWaitForSingleObject( _____
Modified: trunk/reactos/include/ndk/pstypes.h --- trunk/reactos/include/ndk/pstypes.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/pstypes.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -600,7 +600,8 @@
#include <pshpack4.h> typedef struct _ETHREAD { - KTHREAD Tcb; /* 1B8 */ + KTHREAD Tcb; /* 000 */ + PVOID Padding; /* 1B4 */ LARGE_INTEGER CreateTime; /* 1B8 */ union { _____
Modified: trunk/reactos/include/ndk/rtlfuncs.h --- trunk/reactos/include/ndk/rtlfuncs.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/rtlfuncs.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -157,8 +157,6 @@
#define RtlEqualLuid(L1, L2) (((L1)->HighPart == (L2)->HighPart) && \ ((L1)->LowPart == (L2)->LowPart))
-#endif - // // RTL Splay Tree Functions // @@ -246,6 +244,7 @@ _SplayParent->RightChild = _SplayChild; \ _SplayChild->Parent = _SplayParent; \ } +#endif
// // Error and Exception Functions @@ -1694,9 +1693,9 @@ BOOLEAN NTAPI RtlIsNameLegalDOS8Dot3( - IN PUNICODE_STRING UnicodeName, - IN PANSI_STRING AnsiName, - PBOOLEAN Unknown + IN PCUNICODE_STRING Name, + IN OUT POEM_STRING OemName OPTIONAL, + IN OUT PBOOLEAN NameContainsSpaces OPTIONAL );
NTSYSAPI _____
Modified: trunk/reactos/include/ndk/rtltypes.h --- trunk/reactos/include/ndk/rtltypes.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/include/ndk/rtltypes.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -538,16 +538,6 @@
CSHORT Weekday; } TIME_FIELDS, *PTIME_FIELDS;
-#else -// -// ACE Definitions -// -typedef struct _ACE_HEADER -{ - UCHAR AceType; - UCHAR AceFlags; - USHORT AceSize; -} ACE_HEADER, *PACE_HEADER; #endif typedef struct _ACE { _____
Modified: trunk/reactos/lib/rtl/dos8dot3.c --- trunk/reactos/lib/rtl/dos8dot3.c 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/lib/rtl/dos8dot3.c 2006-01-03 21:34:19 UTC (rev 20554) @@ -232,10 +232,11 @@
/* * @implemented */ -BOOLEAN NTAPI -RtlIsNameLegalDOS8Dot3(IN PUNICODE_STRING UnicodeName, - IN PANSI_STRING AnsiName, - OUT PBOOLEAN SpacesFound) +BOOLEAN +NTAPI +RtlIsNameLegalDOS8Dot3(IN PCUNICODE_STRING UnicodeName, + IN OUT POEM_STRING AnsiName OPTIONAL, + IN OUT PBOOLEAN SpacesFound OPTIONAL) { PANSI_STRING name = AnsiName; ANSI_STRING DummyString; _____
Modified: trunk/reactos/ntoskrnl/ex/work.c --- trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/ntoskrnl/ex/work.c 2006-01-03 21:34:19 UTC (rev 20554) @@ -1,11 +1,9 @@
/* * COPYRIGHT: See COPYING in the top level directory - * PROJECT: ReactOS kernel + * PROJECT: ReactOS Kernel * FILE: ntoskrnl/ex/work.c - * PURPOSE: Manage system work queues - * - * PROGRAMMERS: Alex Ionescu - Used correct work queue array and added some fixes and checks. - * Gunnar Dalsnes - Implemented + * PURPOSE: Manage system work queues and worker threads + * PROGRAMMER: Alex Ionescu (alex@relsoft.net) */
/* INCLUDES ******************************************************************/ @@ -18,70 +16,167 @@ #pragma alloc_text(INIT, ExpInitializeWorkerThreads) #endif
-/* DEFINES *******************************************************************/ +/* DATA **********************************************************************/
-#define NUMBER_OF_WORKER_THREADS (5) +/* Number of worker threads for each Queue */ +#define EX_HYPERCRITICAL_WORK_THREADS 1 +#define EX_DELAYED_WORK_THREADS 3 +#define EX_CRITICAL_WORK_THREADS 5
-/* TYPES *********************************************************************/ +/* Magic flag for dynamic worker threads */ +#define EX_DYNAMIC_WORK_THREAD 0x80000000
-/* GLOBALS *******************************************************************/ +/* Worker thread priorities */ +#define EX_HYPERCRITICAL_QUEUE_PRIORITY 7 +#define EX_CRITICAL_QUEUE_PRIORITY 5 +#define EX_DELAYED_QUEUE_PRIORITY 4
-/* - * PURPOSE: Queue of items waiting to be processed at normal priority - */ +/* The actual worker queue array */ EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
-/* FUNCTIONS ****************************************************************/ +/* Accounting of the total threads and registry hacked threads */ +ULONG ExpCriticalWorkerThreads; +ULONG ExpDelayedWorkerThreads; +ULONG ExpAdditionalCriticalWorkerThreads; +ULONG ExpAdditionalDelayedWorkerThreads;
-/* - * FUNCTION: Entry point for a worker thread - * ARGUMENTS: - * context = Parameters - * RETURNS: Status - * NOTE: To kill a worker thread you must queue an item whose callback - * calls PsTerminateSystemThread - */ -static +/* Future support for stack swapping worker threads */ +BOOLEAN ExpWorkersCanSwap; +LIST_ENTRY ExpWorkerListHead; +KMUTANT ExpWorkerSwapinMutex; + +/* The worker balance set manager events */ +KEVENT ExpThreadSetManagerEvent; +KEVENT ExpThreadSetManagerShutdownEvent; + +/* Thread pointers for future worker thread shutdown support */ +PETHREAD ExpWorkerThreadBalanceManagerPtr; +PETHREAD ExpLastWorkerThread; + +/* PRIVATE FUNCTIONS *********************************************************/ + +/*++ + * @name ExpWorkerThreadEntryPoint + * + * The ExpWorkerThreadEntryPoint routine is the entrypoint for any new + * worker thread created by teh system. + * + * @param Context + * Contains the work queue type masked with a flag specifing whether the + * thread is dynamic or not. + * + * @return None. + * + * @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue + * while a static thread will never timeout. + * + * Worker threads must return at IRQL == PASSIVE_LEVEL, must not have + * active impersonation info, and must not have disabled APCs. + * + * NB: We will re-enable APCs for broken threads but all other cases + * will generate a bugcheck. + * + *--*/ VOID -STDCALL +NTAPI ExpWorkerThreadEntryPoint(IN PVOID Context) { PWORK_QUEUE_ITEM WorkItem; PLIST_ENTRY QueueEntry; WORK_QUEUE_TYPE WorkQueueType; PEX_WORK_QUEUE WorkQueue; + LARGE_INTEGER Timeout; + PLARGE_INTEGER TimeoutPointer = NULL; + PETHREAD Thread = PsGetCurrentThread(); + KPROCESSOR_MODE WaitMode; + EX_QUEUE_WORKER_INFO OldValue, NewValue;
+ /* Check if this is a dyamic thread */ + if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD) + { + /* It is, which means we will eventually time out after 10 minutes */ + Timeout.QuadPart = Int32x32To64(10, -10000000 * 60); + TimeoutPointer = &Timeout; + } + /* Get Queue Type and Worker Queue */ - WorkQueueType = (WORK_QUEUE_TYPE)Context; + WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context & + ~EX_DYNAMIC_WORK_THREAD); WorkQueue = &ExWorkerQueue[WorkQueueType];
+ /* Select the wait mode */ + WaitMode = (UCHAR)WorkQueue->Info.WaitMode; + + /* Nobody should have initialized this yet, do it now */ + ASSERT(Thread->ExWorkerCanWaitUser == 0); + if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE; + + /* If we shouldn't swap, disable that feature */ + if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE); + + /* Set the worker flags */ + do + { + /* Check if the queue is being disabled */ + if (WorkQueue->Info.QueueDisabled) + { + /* Re-enable stack swapping and kill us */ + KeSetKernelStackSwapEnable(TRUE); + PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN); + } + + /* Increase the worker count */ + OldValue = WorkQueue->Info; + NewValue = OldValue; + NewValue.WorkerCount++; + } + while (InterlockedCompareExchange((PLONG)&WorkQueue->Info, + *(PLONG)&NewValue, + *(PLONG)&OldValue) != *(PLONG)&OldValue); + + /* Success, you are now officially a worker thread! */ + Thread->ActiveExWorker = TRUE; + /* Loop forever */ - while (TRUE) { - +ProcessLoop: + for (;;) + { /* Wait for Something to Happen on the Queue */ - QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode, NULL); + QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, + WaitMode, + TimeoutPointer);
- /* Can't happen since we do a KernelMode wait (and we're a system thread) */ - ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC); + /* Check if we timed out and quit this loop in that case */ + if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
- /* this should never happen either, since we wait with NULL timeout, - * but there's a slight possibility that STATUS_TIMEOUT is returned - * at queue rundown in NT (unlikely) -Gunnar - */ - ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT); - /* Increment Processed Work Items */ InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
/* Get the Work Item */ WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
+ /* Make sure nobody is trying to play smart with us */ + ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress); + /* Call the Worker Routine */ WorkItem->WorkerRoutine(WorkItem->Parameter);
+ /* Make sure APCs are not disabled */ + if (Thread->Tcb.SpecialApcDisable) + { + /* We're nice and do it behind your back */ + DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back " + "with APCs disabled!\n", + WorkItem->WorkerRoutine, + WorkItem->Parameter, + WorkItem); + Thread->Tcb.SpecialApcDisable = 0; + } + /* Make sure it returned at right IRQL */ - if (KeGetCurrentIrql() != PASSIVE_LEVEL) { - + if (KeGetCurrentIrql() != PASSIVE_LEVEL) + { + /* It didn't, bugcheck! */ KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL, (ULONG_PTR)WorkItem->WorkerRoutine, KeGetCurrentIrql(), @@ -90,8 +185,9 @@ }
/* Make sure it returned with Impersionation Disabled */ - if (PsGetCurrentThread()->ActiveImpersonationInfo) { - + if (Thread->ActiveImpersonationInfo) + { + /* It didn't, bugcheck! */ KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD, (ULONG_PTR)WorkItem->WorkerRoutine, (ULONG_PTR)WorkItem->Parameter, @@ -99,87 +195,470 @@ 0); } } + + /* This is a dynamic thread. Terminate it unless IRPs are pending */ + if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop; + + /* Don't terminate it if the queue is disabled either */ + if (WorkQueue->Info.QueueDisabled) goto ProcessLoop; + + /* Set the worker flags */ + do + { + /* Decrease the worker count */ + OldValue = WorkQueue->Info; + NewValue = OldValue; + NewValue.WorkerCount--; + } + while (InterlockedCompareExchange((PLONG)&WorkQueue->Info, + *(PLONG)&NewValue, + *(PLONG)&OldValue) != *(PLONG)&OldValue); + + /* Decrement dynamic thread count */ + InterlockedDecrement(&WorkQueue->DynamicThreadCount); + + /* We're not a worker thread anymore */ + Thread->ActiveExWorker = FALSE; + + /* Re-enable the stack swap */ + KeSetKernelStackSwapEnable(TRUE); + return; }
-static +/*++ + * @name ExpCreateWorkerThread + * + * The ExpCreateWorkerThread routine creates a new worker thread for the + * specified queue. + ** + * @param QueueType + * Type of the queue to use for this thread. Valid values are: + * - DelayedWorkQueue + * - CriticalWorkQueue + * - HyperCriticalWorkQueue + * + * @param Dynamic + * Specifies whether or not this thread is a dynamic thread. + * + * @return None. + * + * @remarks HyperCritical work threads run at priority 7; Critical work threads + * run at priority 5, and delayed work threads run at priority 4. + * + * This, worker threads cannot pre-empty a normal user-mode thread. + * + *--*/ VOID -STDCALL -ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType, - KPRIORITY Priority) +NTAPI +ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType, + IN BOOLEAN Dynamic) { - ULONG i; PETHREAD Thread; HANDLE hThread; + ULONG Context; + KPRIORITY Priority;
- /* Loop through how many threads we need to create */ - for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) { + /* Check if this is going to be a dynamic thread */ + Context = WorkQueueType;
- /* Create the System Thread */ - PsCreateSystemThread(&hThread, - THREAD_ALL_ACCESS, - NULL, - NULL, - NULL, - ExpWorkerThreadEntryPoint, - (PVOID)WorkQueueType); + /* Add the dynamic mask */ + if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
- /* Get the Thread */ - ObReferenceObjectByHandle(hThread, - THREAD_SET_INFORMATION, - PsThreadType, + /* Create the System Thread */ + PsCreateSystemThread(&hThread, + THREAD_ALL_ACCESS, + NULL, + NULL, + NULL, + ExpWorkerThreadEntryPoint, + (PVOID)Context); + + /* If the thread is dynamic */ + if (Dynamic) + { + /* Increase the count */ + InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount); + } + + /* Set the priority */ + if (WorkQueueType == DelayedWorkQueue) + { + /* Priority == 4 */ + Priority = EX_DELAYED_QUEUE_PRIORITY; + } + else if (WorkQueueType == CriticalWorkQueue) + { + /* Priority == 5 */ + Priority = EX_CRITICAL_QUEUE_PRIORITY; + } + else + { + /* Priority == 7 */ + Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY; + } + + /* Get the Thread */ + ObReferenceObjectByHandle(hThread, + THREAD_SET_INFORMATION, + PsThreadType, + KernelMode, + (PVOID*)&Thread, + NULL); + + /* Set the Priority */ + KeSetPriorityThread(&Thread->Tcb, Priority); + + /* Dereference and close handle */ + ObDereferenceObject(Thread); + ZwClose(hThread); +} + +/*++ + * @name ExpCheckDynamicThreadCount + * + * The ExpCheckDynamicThreadCount routine checks every queue and creates a + * dynamic thread if the queue seems to be deadlocked. + * + * @param None + * + * @return None. + * + * @remarks The algorithm for deciding if a new thread must be created is + * based on wether the queue has processed no new items in the last + * second, and new items are still enqueued. 
+ * + *--*/ +VOID +NTAPI +ExpDetectWorkerThreadDeadlock(VOID) +{ + ULONG i; + PEX_WORK_QUEUE Queue; + + /* Loop the 3 queues */ + for (i = 0; i < MaximumWorkQueue; i++) + { + /* Get the queue */ + Queue = &ExWorkerQueue[i]; + ASSERT(Queue->DynamicThreadCount <= 16); + + /* Check if stuff is on the queue that still is unprocessed */ + if ((Queue->QueueDepthLastPass) && + (Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) && + (Queue->DynamicThreadCount < 16)) + { + /* Stuff is still on the queue and nobody did anything about it */ + DPRINT1("EX: Work Queue Deadlock detected: %d\n", i); + ExpCreateWorkerThread(i, TRUE); + DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount); + } + + /* Update our data */ + Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed; + Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue); + } +} + +/*++ + * @name ExpCheckDynamicThreadCount + * + * The ExpCheckDynamicThreadCount routine checks every queue and creates a + * dynamic thread if the queue requires one. + * + * @param None + * + * @return None. + * + * @remarks The algorithm for deciding if a new thread must be created is + * documented in the ExQueueWorkItem routine. + * + *--*/ +VOID +NTAPI +ExpCheckDynamicThreadCount(VOID) +{ + ULONG i; + PEX_WORK_QUEUE Queue; + + /* Loop the 3 queues */ + for (i = 0; i < MaximumWorkQueue; i++) + { + /* Get the queue */ + Queue = &ExWorkerQueue[i]; + + /* Check if still need a new thread. 
See ExQueueWorkItem */ + if ((Queue->Info.MakeThreadsAsNecessary) && + (!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) && + (Queue->WorkerQueue.CurrentCount < + Queue->WorkerQueue.MaximumCount) && + (Queue->DynamicThreadCount < 16)) + { + /* Create a new thread */ + DPRINT1("EX: Creating new dynamic thread as requested\n"); + ExpCreateWorkerThread(i, TRUE); + } + } +} + +/*++ + * @name ExpWorkerThreadBalanceManager + * + * The ExpWorkerThreadBalanceManager routine is the entrypoint for the + * worker thread balance set manager. + * + * @param Context + * Unused. + * + * @return None. + * + * @remarks The worker thread balance set manager listens every second, but can + * also be woken up by an event when a new thread is needed, or by the + * special shutdown event. This thread runs at priority 7. + * + * This routine must run at IRQL == PASSIVE_LEVEL. + * + *--*/ +VOID +NTAPI +ExpWorkerThreadBalanceManager(IN PVOID Context) +{ + KTIMER Timer; + LARGE_INTEGER Timeout; + NTSTATUS Status; + PVOID WaitEvents[2]; + PAGED_CODE(); + UNREFERENCED_PARAMETER(Context); + + /* Raise our priority above all other worker threads */ + KeSetBasePriorityThread(KeGetCurrentThread(), + EX_CRITICAL_QUEUE_PRIORITY + 1); + + /* Setup the timer */ + KeInitializeTimer(&Timer); + Timeout.QuadPart = Int32x32To64(-1, 10000000); + + /* We'll wait on the periodic timer and also the emergency event */ + WaitEvents[0] = &Timer; + WaitEvents[1] = &ExpThreadSetManagerEvent; + WaitEvents[2] = &ExpThreadSetManagerShutdownEvent; + + /* Start wait loop */ + for (;;) + { + /* Wait for the timer */ + KeSetTimer(&Timer, Timeout, NULL); + Status = KeWaitForMultipleObjects(3, + WaitEvents, + WaitAny, + Executive, + KernelMode, + FALSE, + NULL, + NULL); + if (Status == 0) + { + /* Our timer expired. 
Check for deadlocks */ + ExpDetectWorkerThreadDeadlock(); + } + else if (Status == 1) + { + /* Someone notified us, verify if we should create a new thread */ + ExpCheckDynamicThreadCount(); + } + else if (Status == 2) + { + /* We are shutting down. Cancel the timer */ + DPRINT1("System shutdown\n"); + KeCancelTimer(&Timer); + + /* Make sure we have a final thread */ + ASSERT(ExpLastWorkerThread); + + /* Wait for it */ + KeWaitForSingleObject(ExpLastWorkerThread, + Executive, KernelMode, - (PVOID*)&Thread, + FALSE, NULL);
- /* Set the Priority */ - KeSetPriorityThread(&Thread->Tcb, Priority); - - /* Dereference and close handle */ - ObDereferenceObject(Thread); - ZwClose(hThread); + /* Dereference it and kill us */ + ObDereferenceObject(ExpLastWorkerThread); + PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN); + } } }
+/*++ + * @name ExpInitializeWorkerThreads + * + * The ExpInitializeWorkerThreads routine initializes worker thread and + * work queue support. + * + * @param None. + * + * @return None. + * + * @remarks This routine is only called once during system initialization. + * + *--*/ VOID INIT_FUNCTION -STDCALL +NTAPI ExpInitializeWorkerThreads(VOID) { ULONG WorkQueueType; + ULONG CriticalThreads, DelayedThreads; + HANDLE ThreadHandle; + PETHREAD Thread; + ULONG i;
+ /* Setup the stack swap support */ + KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE); + InitializeListHead(&ExpWorkerListHead); + ExpWorkersCanSwap = TRUE; + + /* Set the number of critical and delayed threads. We shouldn't hardcode */ + DelayedThreads = EX_DELAYED_WORK_THREADS; + CriticalThreads = EX_CRITICAL_WORK_THREADS; + + /* Protect against greedy registry modifications */ + ExpAdditionalDelayedWorkerThreads = + min(ExpAdditionalCriticalWorkerThreads, 16); + ExpAdditionalCriticalWorkerThreads = + min(ExpAdditionalCriticalWorkerThreads, 16); + + /* Calculate final count */ + DelayedThreads += ExpAdditionalDelayedWorkerThreads; + CriticalThreads += ExpAdditionalCriticalWorkerThreads; + /* Initialize the Array */ - for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) { - + for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) + { + /* Clear the structure and initialize the queue */ RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE)); KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0); }
- /* Create the built-in worker threads for each work queue */ - ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY); - ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY); - ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY); + /* Dynamic threads are only used for the critical queue */ + ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE; + + /* Initialize the balance set manager events */ + KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&ExpThreadSetManagerShutdownEvent, + NotificationEvent, + FALSE); + + /* Create the built-in worker threads for the critical queue */ + for (i = 0; i < CriticalThreads; i++) + { + /* Create the thread */ + ExpCreateWorkerThread(CriticalWorkQueue, FALSE); + ExpCriticalWorkerThreads++; + } + + /* Create the built-in worker threads for the delayed queue */ + for (i = 0; i < DelayedThreads; i++) + { + /* Create the thread */ + ExpCreateWorkerThread(DelayedWorkQueue, FALSE); + ExpDelayedWorkerThreads++; + } + + /* Create the built-in worker thread for the hypercritical queue */ + ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE); + + /* Create the balance set manager thread */ + PsCreateSystemThread(&ThreadHandle, + THREAD_ALL_ACCESS, + NULL, + 0, + NULL, + ExpWorkerThreadBalanceManager, + NULL); + + /* Get a pointer to it for the shutdown process */ + ObReferenceObjectByHandle(ThreadHandle, + THREAD_ALL_ACCESS, + NULL, + KernelMode, + (PVOID*)&Thread, + NULL); + ExpWorkerThreadBalanceManagerPtr = Thread; + + /* Close the handle and return */ + ZwClose(ThreadHandle); }
-/* - * @implemented +/* PUBLIC FUNCTIONS **********************************************************/ + +/*++ + * @name ExQueueWorkItem + * @implemented NT4 * - * FUNCTION: Inserts a work item in a queue for one of the system worker - * threads to process - * ARGUMENTS: - * WorkItem = Item to insert - * QueueType = Queue to insert it in - */ + * The ExQueueWorkItem routine acquires rundown protection for + * the specified descriptor. + * + * @param WorkItem + * Pointer to an initialized Work Queue Item structure. This structure + * must be located in nonpaged pool memory. + * + * @param QueueType + * Type of the queue to use for this item. Can be one of the following: + * - DelayedWorkQueue + * - CriticalWorkQueue + * - HyperCriticalWorkQueue + * + * @return None. + * + * @remarks This routine is obsolete. Use IoQueueWorkItem instead. + * + * Callers of this routine must be running at IRQL <= DISPATCH_LEVEL. + * + *--*/ VOID -STDCALL +NTAPI ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem, WORK_QUEUE_TYPE QueueType) { - ASSERT(WorkItem!=NULL); - ASSERT_IRQL(DISPATCH_LEVEL); + PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType]; + ASSERT(QueueType < MaximumWorkQueue); ASSERT(WorkItem->List.Flink == NULL);
+ /* Don't try to trick us */ + if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress) + { + /* Bugcheck the system */ + KEBUGCHECKEX(WORKER_INVALID, + 1, + (ULONG_PTR)WorkItem, + (ULONG_PTR)WorkItem->WorkerRoutine, + 0); + } + /* Insert the Queue */ - KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List); + KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List); + ASSERT(!WorkQueue->Info.QueueDisabled); + + /* + * Check if we need a new thread. Our decision is as follows: + * - This queue type must support Dynamic Threads (duh!) + * - It actually has to have unprocessed items + * - We have CPUs which could be handling another thread + * - We haven't abused our usage of dynamic threads. + */ + if ((WorkQueue->Info.MakeThreadsAsNecessary) && + (!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) && + (WorkQueue->WorkerQueue.CurrentCount < + WorkQueue->WorkerQueue.MaximumCount) && + (WorkQueue->DynamicThreadCount < 16)) + { + /* Let the balance manager know about it */ + DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n", + WorkQueue->WorkerQueue.CurrentCount, + WorkQueue->WorkerQueue.MaximumCount); + KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE); + } }
/* EOF */ + _____
Modified: trunk/reactos/ntoskrnl/ntoskrnl.mc --- trunk/reactos/ntoskrnl/ntoskrnl.mc 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/ntoskrnl/ntoskrnl.mc 2006-01-03 21:34:19 UTC (rev 20554) @@ -1003,6 +1003,14 @@
WORKER_THREAD_RETURNED_AT_BAD_IRQL .
+MessageId=0xE4 +Severity=Success +Facility=System +SymbolicName=WORKER_INVALID +Language=English +WORKER_INVALID +. + MessageId=0xE2 Severity=Success Facility=System _____
Modified: trunk/reactos/w32api/include/ddk/ntifs.h --- trunk/reactos/w32api/include/ddk/ntifs.h 2006-01-03 21:29:39 UTC (rev 20553) +++ trunk/reactos/w32api/include/ddk/ntifs.h 2006-01-03 21:34:19 UTC (rev 20554) @@ -1551,6 +1551,13 @@
TOKEN_CONTROL ClientTokenControl; } SECURITY_CLIENT_CONTEXT, *PSECURITY_CLIENT_CONTEXT;
+typedef struct _ACE_HEADER +{ + UCHAR AceType; + UCHAR AceFlags; + USHORT AceSize; +} ACE_HEADER, *PACE_HEADER; + typedef struct _TUNNEL { FAST_MUTEX Mutex; PRTL_SPLAY_LINKS Cache; @@ -3370,6 +3377,13 @@ NTKERNELAPI BOOLEAN NTAPI +KeSetKernelStackSwapEnable( + IN BOOLEAN Enable +); + +NTKERNELAPI +BOOLEAN +NTAPI MmCanFileBeTruncated ( IN PSECTION_OBJECT_POINTERS SectionObjectPointer, IN PLARGE_INTEGER NewFileSize @@ -3885,10 +3899,10 @@ NTSYSAPI BOOLEAN NTAPI -RtlIsNameLegalDOS8Dot3 ( [truncated at 1000 lines; 104 more skipped]