Dispatching & Queue Rewrite II:
- Rewrote wait code. It is now cleaner, more optimized and faster. All waiting functions are now clearly differentiated instead of sharing code. These functions are called up to a dozen times a second, so having dedicated code for each of them is a real boost in speed. - Fixed several queue issues, made a dedicated queue wait/wake function (you are not supposed to use KeWaitFor on a queue, and this is also a speed boost), and make it compatible with new wait code. - Optimized Work Queue code to be much smaller and better organized, by using an array instead of hard-coded multiple static variables. Also, add support for the real NT structures and implementation, paving the road for Dynamic Work Items, which also have timeouts, and deadlock dection + debug info. - Simplified PsBlockThread and made it compatible with wait code. - Added support for priority boosting when unwaiting a thread; will use later, as well as put proper boosting for dispatch objects. - Inlined all dispatcher lock functions and header initialization for speed. - Moved executive wait code into ob. Modified: trunk/reactos/config Modified: trunk/reactos/include/ddk/extypes.h Modified: trunk/reactos/ntoskrnl/Makefile Modified: trunk/reactos/ntoskrnl/ex/init.c Modified: trunk/reactos/ntoskrnl/ex/work.c Modified: trunk/reactos/ntoskrnl/include/internal/ex.h Modified: trunk/reactos/ntoskrnl/include/internal/ke.h Modified: trunk/reactos/ntoskrnl/include/internal/ps.h Modified: trunk/reactos/ntoskrnl/ke/event.c Modified: trunk/reactos/ntoskrnl/ke/i386/tskswitch.S Modified: trunk/reactos/ntoskrnl/ke/kthread.c Modified: trunk/reactos/ntoskrnl/ke/mutex.c Modified: trunk/reactos/ntoskrnl/ke/queue.c Modified: trunk/reactos/ntoskrnl/ke/sem.c Modified: trunk/reactos/ntoskrnl/ke/timer.c Modified: trunk/reactos/ntoskrnl/ke/wait.c Modified: trunk/reactos/ntoskrnl/ntoskrnl.def Modified: trunk/reactos/ntoskrnl/ps/kill.c Modified: trunk/reactos/ntoskrnl/ps/thread.c _____
Modified: trunk/reactos/config --- trunk/reactos/config 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/config 2005-03-14 05:54:32 UTC (rev 14047) @@ -5,7 +5,6 @@
# Possible values in the future: alpha,i386,m68k,mips,powerpc ARCH := i386
- # # Which cpu should reactos optimize for # example : i486, i586, pentium, pentium2, pentium3, pentium4 @@ -33,11 +32,6 @@ CONFIG_SMP := 0
# -# whether to use a 3GB User, 1GB Kernel memory map -# -3GB := 0 - -# # Which version of NDIS do we support up to? # #NDISVERSION=NDIS50 _____
Modified: trunk/reactos/include/ddk/extypes.h --- trunk/reactos/include/ddk/extypes.h 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/include/ddk/extypes.h 2005-03-14 05:54:32 UTC (rev 14047) @@ -30,6 +30,22 @@
MaximumWorkQueue } WORK_QUEUE_TYPE;
+typedef struct _EX_QUEUE_WORKER_INFO { + UCHAR QueueDisabled:1; + UCHAR MakeThreadsAsNecessary:1; + UCHAR WaitMode:1; + ULONG WorkerCount:29; +} EX_QUEUE_WORKER_INFO, *PEX_QUEUE_WORKER_INFO; + +typedef struct _EX_WORK_QUEUE { + KQUEUE WorkerQueue; + ULONG DynamicThreadCount; + ULONG WorkItemsProcessed; + ULONG WorkItemsProcessedLastPass; + ULONG QueueDepthLastPass; + EX_QUEUE_WORKER_INFO Info; +} EX_WORK_QUEUE, *PEX_WORK_QUEUE; + typedef ULONG_PTR ERESOURCE_THREAD, *PERESOURCE_THREAD;
typedef struct _OWNER_ENTRY _____
Modified: trunk/reactos/ntoskrnl/Makefile --- trunk/reactos/ntoskrnl/Makefile 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/Makefile 2005-03-14 05:54:32 UTC (rev 14047) @@ -212,7 +212,8 @@
ob/object.o \ ob/sdcache.o \ ob/security.o \ - ob/symlink.o + ob/symlink.o \ + ob/wait.o
# Process Manager (Ps) OBJECTS_PS = \ _____
Modified: trunk/reactos/ntoskrnl/ex/init.c --- trunk/reactos/ntoskrnl/ex/init.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ex/init.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -27,11 +27,9 @@
extern ULONG_PTR LastKernelAddress; extern LOADER_MODULE KeLoaderModules[64]; extern PRTL_MESSAGE_RESOURCE_DATA KiBugCodeMessages; -#if 0 extern LIST_ENTRY KiProfileListHead; extern LIST_ENTRY KiProfileSourceListHead; extern KSPIN_LOCK KiProfileLock; -#endif
VOID PspPostInitSystemProcess(VOID);
@@ -427,12 +425,10 @@ /* Bring back the IRQL to Passive */ KeLowerIrql(PASSIVE_LEVEL);
-#if 0 /* Initialize Profiling */ InitializeListHead(&KiProfileListHead); InitializeListHead(&KiProfileSourceListHead); KeInitializeSpinLock(&KiProfileLock); -#endif
/* Load basic Security for other Managers */ if (!SeInit1()) KEBUGCHECK(SECURITY_INITIALIZATION_FAILED); @@ -488,7 +484,7 @@ PspPostInitSystemProcess();
/* initialize the worker threads */ - ExInitializeWorkerThreads(); + ExpInitializeWorkerThreads();
/* initialize callbacks */ ExpInitializeCallbacks(); _____
Modified: trunk/reactos/ntoskrnl/ex/work.c --- trunk/reactos/ntoskrnl/ex/work.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ex/work.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -1,11 +1,11 @@
-/* $Id$ - * +/* * COPYRIGHT: See COPYING in the top level directory * PROJECT: ReactOS kernel * FILE: ntoskrnl/ex/work.c * PURPOSE: Manage system work queues * - * PROGRAMMERS: David Welch (welch@mcmail.com) + * PROGRAMMERS: Alex Ionescu - Used correct work queue array and added some fixes and checks. + * Gunnar Dalsnes - Implemented */
/* INCLUDES ******************************************************************/ @@ -25,17 +25,10 @@ /* * PURPOSE: Queue of items waiting to be processed at normal priority */ -KQUEUE EiNormalWorkQueue; +EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
-KQUEUE EiCriticalWorkQueue; - -KQUEUE EiHyperCriticalWorkQueue; - /* FUNCTIONS ****************************************************************/
-//static NTSTATUS STDCALL -static VOID STDCALL -ExWorkerThreadEntryPoint(IN PVOID context) /* * FUNCTION: Entry point for a worker thread * ARGUMENTS: @@ -44,161 +37,138 @@ * NOTE: To kill a worker thread you must queue an item whose callback * calls PsTerminateSystemThread */ +static +VOID +STDCALL +ExpWorkerThreadEntryPoint(IN PVOID Context) { - - PWORK_QUEUE_ITEM item; - PLIST_ENTRY current; + PWORK_QUEUE_ITEM WorkItem; + PLIST_ENTRY QueueEntry; + WORK_QUEUE_TYPE WorkQueueType; + PEX_WORK_QUEUE WorkQueue;
- while (TRUE) - { - current = KeRemoveQueue( (PKQUEUE)context, KernelMode, NULL ); + /* Get Queue Type and Worker Queue */ + WorkQueueType = (WORK_QUEUE_TYPE)Context; + WorkQueue = &ExWorkerQueue[WorkQueueType]; + + /* Loop forever */ + while (TRUE) { + + /* Wait for Something to Happen on the Queue */ + QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode, NULL);
- /* can't happend since we do a KernelMode wait (and we're a system thread) */ - ASSERT((NTSTATUS)current != STATUS_USER_APC); + /* Can't happen since we do a KernelMode wait (and we're a system thread) */ + ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
- /* this should never happend either, since we wait with NULL timeout, - * but there's a slight possibility that STATUS_TIMEOUT is returned - * at queue rundown in NT (unlikely) -Gunnar - */ - ASSERT((NTSTATUS)current != STATUS_TIMEOUT); - - /* based on INVALID_WORK_QUEUE_ITEM bugcheck desc. */ - if (current->Flink == NULL || current->Blink == NULL) - { - KeBugCheck(INVALID_WORK_QUEUE_ITEM); - } - - /* "reinitialize" item (same as done in ExInitializeWorkItem) */ - current->Flink = NULL; - - item = CONTAINING_RECORD( current, WORK_QUEUE_ITEM, List); - item->WorkerRoutine(item->Parameter); - - if (KeGetCurrentIrql() != PASSIVE_LEVEL) - { - KeBugCheck(IRQL_NOT_LESS_OR_EQUAL); - } - } - + /* this should never happen either, since we wait with NULL timeout, + * but there's a slight possibility that STATUS_TIMEOUT is returned + * at queue rundown in NT (unlikely) -Gunnar + */ + ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT); + + /* Increment Processed Work Items */ + InterlockedIncrement(&WorkQueue->WorkItemsProcessed); + + /* Get the Work Item */ + WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List); + + /* Call the Worker Routine */ + WorkItem->WorkerRoutine(WorkItem->Parameter); + + /* Make sure it returned at right IRQL */ + if (KeGetCurrentIrql() != PASSIVE_LEVEL) { + + /* FIXME: Make this an Ex */ + KEBUGCHECK(WORKER_THREAD_RETURNED_AT_BAD_IRQL); + } + + /* Make sure it returned with Impersionation Disabled */ + if (PsGetCurrentThread()->ActiveImpersonationInfo) { + + /* FIXME: Make this an Ex */ + KEBUGCHECK(IMPERSONATING_WORKER_THREAD); + } + } }
-static VOID ExInitializeWorkQueue(PKQUEUE WorkQueue, - KPRIORITY Priority) +static +VOID +STDCALL +ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType, + KPRIORITY Priority) { ULONG i; PETHREAD Thread; HANDLE hThread; - NTSTATUS Status; - - PAGED_CODE(); - + /* Loop through how many threads we need to create */ for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) { - + /* Create the System Thread */ - Status = PsCreateSystemThread(&hThread, - THREAD_ALL_ACCESS, - NULL, - NULL, - NULL, - ExWorkerThreadEntryPoint, - (PVOID)WorkQueue); - if(NT_SUCCESS(Status)) - { - /* Get the Thread */ - Status = ObReferenceObjectByHandle(hThread, - THREAD_SET_INFORMATION, - PsThreadType, - KernelMode, - (PVOID*)&Thread, - NULL); - - if(NT_SUCCESS(Status)) - { - /* Set the Priority */ - KeSetPriorityThread(&Thread->Tcb, Priority); - - /* Dereference and close handle */ - ObDereferenceObject(Thread); - ZwClose(hThread); - } - else - { - DPRINT1("Unable to reference worker thread handle 0x%x, Status: 0x%x!\n", hThread, Status); - KEBUGCHECK(0); - } - } - else - { - DPRINT1("Unable to create worker thread, Status: 0x%x!\n", Status); - KEBUGCHECK(0); - } + PsCreateSystemThread(&hThread, + THREAD_ALL_ACCESS, + NULL, + NULL, + NULL, + ExpWorkerThreadEntryPoint, + (PVOID)WorkQueueType); + + /* Get the Thread */ + ObReferenceObjectByHandle(hThread, + THREAD_SET_INFORMATION, + PsThreadType, + KernelMode, + (PVOID*)&Thread, + NULL); + + /* Set the Priority */ + KeSetPriorityThread(&Thread->Tcb, Priority); + + /* Dereference and close handle */ + ObDereferenceObject(Thread); + ZwClose(hThread); } }
-VOID INIT_FUNCTION -ExInitializeWorkerThreads(VOID) +VOID +INIT_FUNCTION +ExpInitializeWorkerThreads(VOID) { - KeInitializeQueue( &EiNormalWorkQueue, NUMBER_OF_WORKER_THREADS ); - KeInitializeQueue( &EiCriticalWorkQueue , NUMBER_OF_WORKER_THREADS ); - KeInitializeQueue( &EiHyperCriticalWorkQueue , NUMBER_OF_WORKER_THREADS ); - - ExInitializeWorkQueue(&EiNormalWorkQueue, - LOW_PRIORITY); - ExInitializeWorkQueue(&EiCriticalWorkQueue, - LOW_REALTIME_PRIORITY); - ExInitializeWorkQueue(&EiHyperCriticalWorkQueue, - HIGH_PRIORITY); + ULONG WorkQueueType; + + /* Initialize the Array */ + for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) { + + RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE)); + KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0); + } + + /* Create the built-in worker threads for each work queue */ + ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY); + ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY); + ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY); }
/* * @implemented - */ -VOID STDCALL -ExQueueWorkItem (PWORK_QUEUE_ITEM WorkItem, - WORK_QUEUE_TYPE QueueType) -/* + * * FUNCTION: Inserts a work item in a queue for one of the system worker * threads to process * ARGUMENTS: * WorkItem = Item to insert * QueueType = Queue to insert it in */ +VOID +STDCALL +ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem, + WORK_QUEUE_TYPE QueueType) { ASSERT(WorkItem!=NULL); ASSERT_IRQL(DISPATCH_LEVEL); ASSERT(WorkItem->List.Flink == NULL); - /* - * Insert the item in the appropiate queue and wake up any thread - * waiting for something to do - */ - switch(QueueType) - { - case DelayedWorkQueue: - KeInsertQueue ( - &EiNormalWorkQueue, - &WorkItem->List - ); - break; - - case CriticalWorkQueue: - KeInsertQueue ( - &EiCriticalWorkQueue, - &WorkItem->List - ); - break; - - case HyperCriticalWorkQueue: - KeInsertQueue ( - &EiHyperCriticalWorkQueue, - &WorkItem->List - ); - break; - - default: - break; - - } + + /* Insert the Queue */ + KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List); }
/* EOF */ _____
Modified: trunk/reactos/ntoskrnl/include/internal/ex.h --- trunk/reactos/ntoskrnl/include/internal/ex.h 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/include/internal/ex.h 2005-03-14 05:54:32 UTC (rev 14047) @@ -86,8 +86,10 @@
extern ULONG ExpTimeZoneId;
extern POBJECT_TYPE ExEventPairObjectType; +extern POBJECT_TYPE EXPORTED ExMutantObjectType; +extern POBJECT_TYPE EXPORTED ExSemaphoreObjectType; +extern POBJECT_TYPE EXPORTED ExTimerType;
- /* INITIALIZATION FUNCTIONS *************************************************/
VOID @@ -100,7 +102,7 @@ VOID ExpInitTimeZoneInfo(VOID); VOID -ExInitializeWorkerThreads(VOID); +ExpInitializeWorkerThreads(VOID); VOID ExpInitLookasideLists(VOID); VOID _____
Modified: trunk/reactos/ntoskrnl/include/internal/ke.h --- trunk/reactos/ntoskrnl/include/internal/ke.h 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/include/internal/ke.h 2005-03-14 05:54:32 UTC (rev 14047) @@ -139,34 +139,39 @@
IN KPROFILE_SOURCE Source );
-VOID KiInsertProfileIntoProcess(PLIST_ENTRY ListHead, PKPROFILE Profile); -VOID KiInsertProfile(PKPROFILE Profile); -VOID KiRemoveProfile(PKPROFILE Profile); -VOID STDCALL KiDeleteProfile(PVOID ObjectBody);
- VOID STDCALL KeUpdateSystemTime(PKTRAP_FRAME TrapFrame, KIRQL Irql); VOID STDCALL KeUpdateRunTime(PKTRAP_FRAME TrapFrame, KIRQL Irql);
VOID STDCALL KiExpireTimers(PKDPC Dpc, PVOID DeferredContext, PVOID SystemArgument1, PVOID SystemArgument2);
-KIRQL KeAcquireDispatcherDatabaseLock(VOID); -VOID KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID); -VOID KeReleaseDispatcherDatabaseLock(KIRQL Irql); -VOID KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID); +KIRQL inline FASTCALL KeAcquireDispatcherDatabaseLock(VOID); +VOID inline FASTCALL KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID); +VOID inline FASTCALL KeReleaseDispatcherDatabaseLock(KIRQL Irql); +VOID inline FASTCALL KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID);
BOOLEAN KiDispatcherObjectWake(DISPATCHER_HEADER* hdr, KPRIORITY increment); VOID STDCALL KeExpireTimers(PKDPC Apc, PVOID Arg1, PVOID Arg2, PVOID Arg3); -VOID KeInitializeDispatcherHeader(DISPATCHER_HEADER* Header, ULONG Type, - ULONG Size, ULONG SignalState); +VOID inline FASTCALL KeInitializeDispatcherHeader(DISPATCHER_HEADER* Header, ULONG Type, + ULONG Size, ULONG SignalState); VOID KeDumpStackFrames(PULONG Frame); BOOLEAN KiTestAlert(VOID);
-BOOLEAN KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS WaitStatus); +VOID FASTCALL KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS WaitStatus); + +BOOLEAN STDCALL KiInsertTimer(PKTIMER Timer, LARGE_INTEGER DueTime);
+VOID inline FASTCALL KiSatisfyObjectWait(PDISPATCHER_HEADER Object, PKTHREAD Thread); + +BOOLEAN inline FASTCALL KiIsObjectSignaled(PDISPATCHER_HEADER Object, PKTHREAD Thread); + +VOID inline FASTCALL KiSatisifyMultipleObjectWaits(PKWAIT_BLOCK WaitBlock); + +VOID FASTCALL KiWaitTest(PDISPATCHER_HEADER Object, KPRIORITY Increment); + PULONG KeGetStackTopThread(struct _ETHREAD* Thread); VOID KeContextToTrapFrame(PCONTEXT Context, PKTRAP_FRAME TrapFrame); VOID STDCALL KiDeliverApc(KPROCESSOR_MODE PreviousMode, @@ -191,6 +196,7 @@ KeTestAlertThread(IN KPROCESSOR_MODE AlertMode);
BOOLEAN STDCALL KeRemoveQueueApc (PKAPC Apc); +VOID FASTCALL KiWakeQueue(IN PKQUEUE Queue); PLIST_ENTRY STDCALL KeRundownQueue(IN PKQUEUE Queue);
extern LARGE_INTEGER SystemBootTime; @@ -202,7 +208,7 @@ VOID KeInitTimer(VOID); VOID KeInitDpc(struct _KPRCB* Prcb); VOID KeInitDispatcher(VOID); -VOID KeInitializeDispatcher(VOID); +VOID inline FASTCALL KeInitializeDispatcher(VOID); VOID KiInitializeSystemClock(VOID); VOID KiInitializeBugCheck(VOID); VOID Phase1Initialization(PVOID Context); _____
Modified: trunk/reactos/ntoskrnl/include/internal/ps.h --- trunk/reactos/ntoskrnl/include/internal/ps.h 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/include/internal/ps.h 2005-03-14 05:54:32 UTC (rev 14047) @@ -525,8 +525,11 @@
ULONG PsEnumThreadsByProcess(PEPROCESS Process); PEPROCESS PsGetNextProcess(PEPROCESS OldProcess); VOID -PsBlockThread(PNTSTATUS Status, UCHAR Alertable, ULONG WaitMode, - BOOLEAN DispatcherLock, KIRQL WaitIrql, UCHAR WaitReason); +STDCALL +PsBlockThread(PNTSTATUS Status, + UCHAR Alertable, + ULONG WaitMode, + UCHAR WaitReason); VOID PsUnblockThread(PETHREAD Thread, PNTSTATUS WaitStatus, KPRIORITY Increment); VOID _____
Modified: trunk/reactos/ntoskrnl/ke/event.c --- trunk/reactos/ntoskrnl/ke/event.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ke/event.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -91,7 +91,7 @@
Event->Header.SignalState = 1;
/* Wake the Event */ - KiDispatcherObjectWake(&Event->Header, Increment); + KiWaitTest(&Event->Header, Increment); }
/* Unsignal it */ @@ -196,7 +196,7 @@ /* We must do a full wait satisfaction */ DPRINT("Notification Event or WaitAll, Wait on the Event and Signal\n"); Event->Header.SignalState = 1; - KiDispatcherObjectWake(&Event->Header, Increment); + KiWaitTest(&Event->Header, Increment); }
} else { _____
Modified: trunk/reactos/ntoskrnl/ke/i386/tskswitch.S --- trunk/reactos/ntoskrnl/ke/i386/tskswitch.S 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ke/i386/tskswitch.S 2005-03-14 05:54:32 UTC (rev 14047) @@ -204,7 +204,7 @@
*/ sti
- call _KeReleaseDispatcherDatabaseLockFromDpcLevel + call @KeReleaseDispatcherDatabaseLockFromDpcLevel@0
cmpl $0, _PiNrThreadsAwaitingReaping je 5f _____
Modified: trunk/reactos/ntoskrnl/ke/kthread.c --- trunk/reactos/ntoskrnl/ke/kthread.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ke/kthread.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -56,7 +56,7 @@
/* Signal and satisfy */ Thread->SuspendSemaphore.Header.SignalState++; - KiDispatcherObjectWake(&Thread->SuspendSemaphore.Header, IO_NO_INCREMENT); + KiWaitTest(&Thread->SuspendSemaphore.Header, IO_NO_INCREMENT); } }
_____
Modified: trunk/reactos/ntoskrnl/ke/mutex.c --- trunk/reactos/ntoskrnl/ke/mutex.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ke/mutex.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -177,7 +177,7 @@
/* Wake the Mutant */ DPRINT("Waking the Mutant\n"); - KiDispatcherObjectWake(&Mutant->Header, Increment); + KiWaitTest(&Mutant->Header, Increment); } }
_____
Modified: trunk/reactos/ntoskrnl/ke/queue.c --- trunk/reactos/ntoskrnl/ke/queue.c 2005-03-14 04:51:51 UTC (rev 14046) +++ trunk/reactos/ntoskrnl/ke/queue.c 2005-03-14 05:54:32 UTC (rev 14047) @@ -16,245 +16,483 @@
/* FUNCTIONS *****************************************************************/
- +LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head); + /* * @implemented */ -VOID STDCALL +VOID +STDCALL KeInitializeQueue(IN PKQUEUE Queue, - IN ULONG Count OPTIONAL) + IN ULONG Count OPTIONAL) { - KeInitializeDispatcherHeader(&Queue->Header, - QueueObject, - sizeof(KQUEUE)/sizeof(ULONG), - 0); - InitializeListHead(&Queue->EntryListHead); - InitializeListHead(&Queue->ThreadListHead); - Queue->CurrentCount = 0; - Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count; + DPRINT("KeInitializeQueue %x\n", Queue); + + /* Initialize the Header */ + KeInitializeDispatcherHeader(&Queue->Header, + QueueObject, + sizeof(KQUEUE)/sizeof(ULONG), + 0); + + /* Initialize the Lists */ + InitializeListHead(&Queue->EntryListHead); + InitializeListHead(&Queue->ThreadListHead); + + /* Set the Current and Maximum Count */ + Queue->CurrentCount = 0; + Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count; }
- /* * @implemented - * - * Returns number of entries in the queue */ -LONG STDCALL -KeReadStateQueue(IN PKQUEUE Queue) +LONG +STDCALL +KeInsertHeadQueue(IN PKQUEUE Queue, + IN PLIST_ENTRY Entry) { - return(Queue->Header.SignalState); -} - -/* - * Returns the previous number of entries in the queue - */ -LONG STDCALL -KiInsertQueue( - IN PKQUEUE Queue, - IN PLIST_ENTRY Entry, - BOOLEAN Head - ) -{ - ULONG InitialState; - - DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry); + LONG PreviousState; + KIRQL OldIrql; + + DPRINT("KeInsertHeadQueue %x\n", Queue); + + /* Lock the Dispatcher Database */ + OldIrql = KeAcquireDispatcherDatabaseLock(); + + /* Insert the Queue */ + PreviousState = KiInsertQueue(Queue, Entry, TRUE); + + /* Release the Dispatcher Lock */ + KeReleaseDispatcherDatabaseLock(OldIrql);
- InitialState = Queue->Header.SignalState; - - if (Head) - { - InsertHeadList(&Queue->EntryListHead, Entry); - } - else - { - InsertTailList(&Queue->EntryListHead, Entry); - } - - //inc. num entries in queue - Queue->Header.SignalState++; - - /* Why the KeGetCurrentThread()->Queue != Queue? - * KiInsertQueue might be called from an APC for the current thread. - * -Gunnar - */ - if (Queue->CurrentCount < Queue->MaximumCount && - !IsListEmpty(&Queue->Header.WaitListHead) && - KeGetCurrentThread()->Queue != Queue) - { - KiDispatcherObjectWake(&Queue->Header, IO_NO_INCREMENT); - } - - return InitialState; + /* Return previous State */ + return PreviousState; }
- - /* * @implemented */ LONG STDCALL -KeInsertHeadQueue(IN PKQUEUE Queue, - IN PLIST_ENTRY Entry) +KeInsertQueue(IN PKQUEUE Queue, + IN PLIST_ENTRY Entry) { - LONG Result; - KIRQL OldIrql; + LONG PreviousState; + KIRQL OldIrql; + + DPRINT("KeInsertQueue %x\n", Queue); + + /* Lock the Dispatcher Database */ + OldIrql = KeAcquireDispatcherDatabaseLock(); + + /* Insert the Queue */ + PreviousState = KiInsertQueue(Queue, Entry, FALSE); + + /* Release the Dispatcher Lock */ + KeReleaseDispatcherDatabaseLock(OldIrql);
- OldIrql = KeAcquireDispatcherDatabaseLock(); - Result = KiInsertQueue(Queue,Entry,TRUE); - KeReleaseDispatcherDatabaseLock(OldIrql); - - return Result; + /* Return previous State */ + return PreviousState; }
- /* * @implemented + * + * Returns number of entries in the queue */ -LONG STDCALL -KeInsertQueue(IN PKQUEUE Queue, - IN PLIST_ENTRY Entry) +LONG +STDCALL +KeReadStateQueue(IN PKQUEUE Queue) { - LONG Result; - KIRQL OldIrql; - - OldIrql = KeAcquireDispatcherDatabaseLock(); - Result = KiInsertQueue(Queue,Entry,FALSE); - KeReleaseDispatcherDatabaseLock(OldIrql); - - return Result; + /* Returns the Signal State */ + return(Queue->Header.SignalState); }
- /* * @implemented */ -PLIST_ENTRY STDCALL +PLIST_ENTRY +STDCALL KeRemoveQueue(IN PKQUEUE Queue, - IN KPROCESSOR_MODE WaitMode, - IN PLARGE_INTEGER Timeout OPTIONAL) + IN KPROCESSOR_MODE WaitMode, + IN PLARGE_INTEGER Timeout OPTIONAL) {
- PLIST_ENTRY ListEntry; - NTSTATUS Status; - PKTHREAD Thread = KeGetCurrentThread(); - KIRQL OldIrql; + PLIST_ENTRY ListEntry; + NTSTATUS Status; + PKTHREAD Thread = KeGetCurrentThread(); + KIRQL OldIrql; + PKQUEUE PreviousQueue; + PKWAIT_BLOCK WaitBlock; + PKWAIT_BLOCK TimerWaitBlock; + PKTIMER Timer;
- OldIrql = KeAcquireDispatcherDatabaseLock (); + DPRINT("KeRemoveQueue %x\n", Queue); + + /* Check if the Lock is already held */ + if (Thread->WaitNext) { + + DPRINT("Lock is already held\n"); + + } else { + + /* Lock the Dispatcher Database */ + DPRINT("Lock not held, acquiring\n"); + OldIrql = KeAcquireDispatcherDatabaseLock(); + Thread->WaitIrql = OldIrql; + }
- if (Thread->Queue != Queue) - { - /* - * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the - * Queue->ThreadListHead when the thread registers with the queue and unlinked when - * the thread registers with a new queue. The Thread->Queue already tells us what - * queue the thread is registered with. - * -Gunnar - */ + /* This is needed so that we can set the new queue right here, before additional processing */ + PreviousQueue = Thread->Queue; + Thread->Queue = Queue;
- //unregister thread from previous queue (if any) - if (Thread->Queue) - { - RemoveEntryList(&Thread->QueueListEntry); - Thread->Queue->CurrentCount--; - - if (Thread->Queue->CurrentCount < Thread->Queue->MaximumCount && - !IsListEmpty(&Thread->Queue->EntryListHead)) - { - KiDispatcherObjectWake(&Thread->Queue->Header, 0); - } + /* Check if this is a different queue */ + if (Queue != PreviousQueue) { + + /* + * INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the + * Queue->ThreadListHead when the thread registers with the queue and unlinked when + * the thread registers with a new queue. The Thread->Queue already tells us what + * queue the thread is registered with. + * -Gunnar + */ + DPRINT("Different Queue\n"); + if (PreviousQueue) { + + /* Remove from this list */ + DPRINT("Removing Old Queue\n"); + RemoveEntryList(&Thread->QueueListEntry); + + /* Wake the queue */ + DPRINT("Activating new thread\n"); + KiWakeQueue(PreviousQueue); }
- // register thread with this queue - InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry); - Thread->Queue = Queue; - } - else /* if (Thread->Queue == Queue) */ - { - //dec. num running threads - Queue->CurrentCount--; - } + /* Insert in this new Queue */ + DPRINT("Inserting new Queue!\n"); + InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
- - - - while (TRUE) - { - if (Queue->CurrentCount < Queue->MaximumCount && !IsListEmpty(&Queue->EntryListHead)) - { - ListEntry = RemoveHeadList(&Queue->EntryListHead); - //dec. num entries in queue - Queue->Header.SignalState--; - //inc. num running threads - Queue->CurrentCount++; - - KeReleaseDispatcherDatabaseLock(OldIrql); - return ListEntry; - } - else - { - //inform KeWaitXxx that we are holding disp. lock - Thread->WaitNext = TRUE; - Thread->WaitIrql = OldIrql; + } else { + + /* Same queue, decrement waiting threads */ + DPRINT("Same Queue!\n"); + Queue->CurrentCount--; + } + + /* Loop until the queue is processed */ + while (TRUE) { + + /* Get the Entry */ + ListEntry = Queue->EntryListHead.Flink; + + /* Check if the counts are valid and if there is still a queued entry */ + if ((Queue->CurrentCount < Queue->MaximumCount) && + (ListEntry != &Queue->EntryListHead)) { + + /* Remove the Entry and Save it */ + DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n", + Queue->CurrentCount, Queue->MaximumCount); + ListEntry = RemoveHeadList(&Queue->EntryListHead); + + /* Decrease the number of entries */ + Queue->Header.SignalState--; + + /* Increase numbef of running threads */ + Queue->CurrentCount++; + + /* Check if the entry is valid. If not, bugcheck */ + if (!ListEntry->Flink || !ListEntry->Blink) { + + KEBUGCHECK(INVALID_WORK_QUEUE_ITEM); + } + + /* Remove the Entry */ + RemoveEntryList(ListEntry); + ListEntry->Flink = NULL; + + /* Nothing to wait on */ + break; + + } else { + + /* Do the wait */ + DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n", + Queue->CurrentCount, Queue->MaximumCount); + + /* Use the Thread's Wait Block, it's big enough */ + Thread->WaitBlockList = &Thread->WaitBlock[0]; + + /* Fail if there's an APC Pending */ + if (WaitMode == UserMode && Thread->ApcState.UserApcPending) { + + /* Return the status and increase the pending threads */ + ListEntry = (PLIST_ENTRY)STATUS_USER_APC; + Queue->CurrentCount++; + + /* Nothing to wait on */ + break; + } + + /* Build the Wait Block */ + WaitBlock = &Thread->WaitBlock[0]; + WaitBlock->Object = (PVOID)Queue; + WaitBlock->WaitKey = STATUS_SUCCESS; + WaitBlock->WaitType = WaitAny; + WaitBlock->Thread = Thread; + WaitBlock->NextWaitBlock = NULL; + + Thread->WaitStatus = STATUS_SUCCESS; + + /* We need to wait for the object... check if we have a timeout */ + if (Timeout) { + + /* If it's zero, then don't do any waiting */ + if (!Timeout->QuadPart) { + + /* Instant Timeout, return the status and increase the pending threads */ + DPRINT("Queue Wait has timed out\n"); + ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT; + Queue->CurrentCount++; + + /* Nothing to wait on */ + break; + } + + /* + * Set up the Timer. We'll use the internal function so that we can + * hold on to the dispatcher lock. + */ + Timer = &Thread->Timer; + TimerWaitBlock = &Thread->WaitBlock[1];
- Status = KeWaitForSingleObject(Queue, - WrQueue, - WaitMode, - TRUE, //bAlertable - Timeout); - - if (Status == STATUS_TIMEOUT || Status == STATUS_USER_APC) - { - return (PVOID)Status; - } - - OldIrql = KeAcquireDispatcherDatabaseLock (); - } - } + /* Set up the Timer Wait Block */ + TimerWaitBlock->Object = (PVOID)Timer; + TimerWaitBlock->Thread = Thread; + TimerWaitBlock->WaitKey = STATUS_TIMEOUT; + TimerWaitBlock->WaitType = WaitAny; + TimerWaitBlock->NextWaitBlock = NULL; + + /* Link the timer to this Wait Block */ + InitializeListHead(&Timer->Header.WaitListHead); [truncated at 1000 lines; 2258 more skipped]