Dispatching & Queue Rewrite II:
- Rewrote wait code. It is now cleaner, more optimized and faster. All
  waiting functions are now clearly differentiated instead of sharing code.
  These functions are called up to a dozen times a second, so having
  dedicated code for each of them is a real boost in speed.
- Fixed several queue issues, made a dedicated queue wait/wake function
  (you are not supposed to use KeWaitFor on a queue, and this is also a
  speed boost), and made it compatible with the new wait code.
- Optimized Work Queue code to be much smaller and better organized, by
  using an array instead of multiple hard-coded static variables. Also,
  added support for the real NT structures and implementation, paving the
  road for Dynamic Work Items, which also have timeouts, and deadlock
  detection + debug info.
- Simplified PsBlockThread and made it compatible with wait code.
- Added support for priority boosting when unwaiting a thread; this will be
  used later, and proper boosting for dispatch objects will be put in as
  well.
- Inlined all dispatcher lock functions and header initialization
for speed.
- Moved executive wait code into ob.
Modified: trunk/reactos/config
Modified: trunk/reactos/include/ddk/extypes.h
Modified: trunk/reactos/ntoskrnl/Makefile
Modified: trunk/reactos/ntoskrnl/ex/init.c
Modified: trunk/reactos/ntoskrnl/ex/work.c
Modified: trunk/reactos/ntoskrnl/include/internal/ex.h
Modified: trunk/reactos/ntoskrnl/include/internal/ke.h
Modified: trunk/reactos/ntoskrnl/include/internal/ps.h
Modified: trunk/reactos/ntoskrnl/ke/event.c
Modified: trunk/reactos/ntoskrnl/ke/i386/tskswitch.S
Modified: trunk/reactos/ntoskrnl/ke/kthread.c
Modified: trunk/reactos/ntoskrnl/ke/mutex.c
Modified: trunk/reactos/ntoskrnl/ke/queue.c
Modified: trunk/reactos/ntoskrnl/ke/sem.c
Modified: trunk/reactos/ntoskrnl/ke/timer.c
Modified: trunk/reactos/ntoskrnl/ke/wait.c
Modified: trunk/reactos/ntoskrnl/ntoskrnl.def
Modified: trunk/reactos/ntoskrnl/ps/kill.c
Modified: trunk/reactos/ntoskrnl/ps/thread.c
_____
Modified: trunk/reactos/config
--- trunk/reactos/config 2005-03-14 04:51:51 UTC (rev 14046)
+++ trunk/reactos/config 2005-03-14 05:54:32 UTC (rev 14047)
@@ -5,7 +5,6 @@
# Possible values in the future: alpha,i386,m68k,mips,powerpc
ARCH := i386
-
#
# Which cpu should reactos optimize for
# example : i486, i586, pentium, pentium2, pentium3, pentium4
@@ -33,11 +32,6 @@
CONFIG_SMP := 0
#
-# whether to use a 3GB User, 1GB Kernel memory map
-#
-3GB := 0
-
-#
# Which version of NDIS do we support up to?
#
#NDISVERSION=NDIS50
_____
Modified: trunk/reactos/include/ddk/extypes.h
--- trunk/reactos/include/ddk/extypes.h 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/include/ddk/extypes.h 2005-03-14 05:54:32 UTC (rev
14047)
@@ -30,6 +30,22 @@
MaximumWorkQueue
} WORK_QUEUE_TYPE;
+typedef struct _EX_QUEUE_WORKER_INFO {
+ UCHAR QueueDisabled:1;
+ UCHAR MakeThreadsAsNecessary:1;
+ UCHAR WaitMode:1;
+ ULONG WorkerCount:29;
+} EX_QUEUE_WORKER_INFO, *PEX_QUEUE_WORKER_INFO;
+
+typedef struct _EX_WORK_QUEUE {
+ KQUEUE WorkerQueue;
+ ULONG DynamicThreadCount;
+ ULONG WorkItemsProcessed;
+ ULONG WorkItemsProcessedLastPass;
+ ULONG QueueDepthLastPass;
+ EX_QUEUE_WORKER_INFO Info;
+} EX_WORK_QUEUE, *PEX_WORK_QUEUE;
+
typedef ULONG_PTR ERESOURCE_THREAD, *PERESOURCE_THREAD;
typedef struct _OWNER_ENTRY
_____
Modified: trunk/reactos/ntoskrnl/Makefile
--- trunk/reactos/ntoskrnl/Makefile 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/Makefile 2005-03-14 05:54:32 UTC (rev
14047)
@@ -212,7 +212,8 @@
ob/object.o \
ob/sdcache.o \
ob/security.o \
- ob/symlink.o
+ ob/symlink.o \
+ ob/wait.o
# Process Manager (Ps)
OBJECTS_PS = \
_____
Modified: trunk/reactos/ntoskrnl/ex/init.c
--- trunk/reactos/ntoskrnl/ex/init.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ex/init.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -27,11 +27,9 @@
extern ULONG_PTR LastKernelAddress;
extern LOADER_MODULE KeLoaderModules[64];
extern PRTL_MESSAGE_RESOURCE_DATA KiBugCodeMessages;
-#if 0
extern LIST_ENTRY KiProfileListHead;
extern LIST_ENTRY KiProfileSourceListHead;
extern KSPIN_LOCK KiProfileLock;
-#endif
VOID PspPostInitSystemProcess(VOID);
@@ -427,12 +425,10 @@
/* Bring back the IRQL to Passive */
KeLowerIrql(PASSIVE_LEVEL);
-#if 0
/* Initialize Profiling */
InitializeListHead(&KiProfileListHead);
InitializeListHead(&KiProfileSourceListHead);
KeInitializeSpinLock(&KiProfileLock);
-#endif
/* Load basic Security for other Managers */
if (!SeInit1()) KEBUGCHECK(SECURITY_INITIALIZATION_FAILED);
@@ -488,7 +484,7 @@
PspPostInitSystemProcess();
/* initialize the worker threads */
- ExInitializeWorkerThreads();
+ ExpInitializeWorkerThreads();
/* initialize callbacks */
ExpInitializeCallbacks();
_____
Modified: trunk/reactos/ntoskrnl/ex/work.c
--- trunk/reactos/ntoskrnl/ex/work.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ex/work.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -1,11 +1,11 @@
-/* $Id$
- *
+/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
* FILE: ntoskrnl/ex/work.c
* PURPOSE: Manage system work queues
*
- * PROGRAMMERS: David Welch (welch(a)mcmail.com)
+ * PROGRAMMERS: Alex Ionescu - Used correct work queue array and
added some fixes and checks.
+ * Gunnar Dalsnes - Implemented
*/
/* INCLUDES
******************************************************************/
@@ -25,17 +25,10 @@
/*
* PURPOSE: Queue of items waiting to be processed at normal priority
*/
-KQUEUE EiNormalWorkQueue;
+EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
-KQUEUE EiCriticalWorkQueue;
-
-KQUEUE EiHyperCriticalWorkQueue;
-
/* FUNCTIONS
****************************************************************/
-//static NTSTATUS STDCALL
-static VOID STDCALL
-ExWorkerThreadEntryPoint(IN PVOID context)
/*
* FUNCTION: Entry point for a worker thread
* ARGUMENTS:
@@ -44,161 +37,138 @@
* NOTE: To kill a worker thread you must queue an item whose callback
* calls PsTerminateSystemThread
*/
+static
+VOID
+STDCALL
+ExpWorkerThreadEntryPoint(IN PVOID Context)
{
-
- PWORK_QUEUE_ITEM item;
- PLIST_ENTRY current;
+ PWORK_QUEUE_ITEM WorkItem;
+ PLIST_ENTRY QueueEntry;
+ WORK_QUEUE_TYPE WorkQueueType;
+ PEX_WORK_QUEUE WorkQueue;
- while (TRUE)
- {
- current = KeRemoveQueue( (PKQUEUE)context, KernelMode, NULL );
+ /* Get Queue Type and Worker Queue */
+ WorkQueueType = (WORK_QUEUE_TYPE)Context;
+ WorkQueue = &ExWorkerQueue[WorkQueueType];
+
+ /* Loop forever */
+ while (TRUE) {
+
+ /* Wait for Something to Happen on the Queue */
+ QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode,
NULL);
- /* can't happend since we do a KernelMode wait (and we're a
system thread) */
- ASSERT((NTSTATUS)current != STATUS_USER_APC);
+ /* Can't happen since we do a KernelMode wait (and we're a
system thread) */
+ ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
- /* this should never happend either, since we wait with NULL
timeout,
- * but there's a slight possibility that STATUS_TIMEOUT is
returned
- * at queue rundown in NT (unlikely) -Gunnar
- */
- ASSERT((NTSTATUS)current != STATUS_TIMEOUT);
-
- /* based on INVALID_WORK_QUEUE_ITEM bugcheck desc. */
- if (current->Flink == NULL || current->Blink == NULL)
- {
- KeBugCheck(INVALID_WORK_QUEUE_ITEM);
- }
-
- /* "reinitialize" item (same as done in ExInitializeWorkItem) */
- current->Flink = NULL;
-
- item = CONTAINING_RECORD( current, WORK_QUEUE_ITEM, List);
- item->WorkerRoutine(item->Parameter);
-
- if (KeGetCurrentIrql() != PASSIVE_LEVEL)
- {
- KeBugCheck(IRQL_NOT_LESS_OR_EQUAL);
- }
- }
-
+ /* this should never happen either, since we wait with NULL
timeout,
+ * but there's a slight possibility that STATUS_TIMEOUT is
returned
+ * at queue rundown in NT (unlikely) -Gunnar
+ */
+ ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT);
+
+ /* Increment Processed Work Items */
+ InterlockedIncrement(&WorkQueue->WorkItemsProcessed);
+
+ /* Get the Work Item */
+ WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM,
List);
+
+ /* Call the Worker Routine */
+ WorkItem->WorkerRoutine(WorkItem->Parameter);
+
+ /* Make sure it returned at right IRQL */
+ if (KeGetCurrentIrql() != PASSIVE_LEVEL) {
+
+ /* FIXME: Make this an Ex */
+ KEBUGCHECK(WORKER_THREAD_RETURNED_AT_BAD_IRQL);
+ }
+
+ /* Make sure it returned with Impersionation Disabled */
+ if (PsGetCurrentThread()->ActiveImpersonationInfo) {
+
+ /* FIXME: Make this an Ex */
+ KEBUGCHECK(IMPERSONATING_WORKER_THREAD);
+ }
+ }
}
-static VOID ExInitializeWorkQueue(PKQUEUE WorkQueue,
- KPRIORITY Priority)
+static
+VOID
+STDCALL
+ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType,
+ KPRIORITY Priority)
{
ULONG i;
PETHREAD Thread;
HANDLE hThread;
- NTSTATUS Status;
-
- PAGED_CODE();
-
+
/* Loop through how many threads we need to create */
for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
-
+
/* Create the System Thread */
- Status = PsCreateSystemThread(&hThread,
- THREAD_ALL_ACCESS,
- NULL,
- NULL,
- NULL,
- ExWorkerThreadEntryPoint,
- (PVOID)WorkQueue);
- if(NT_SUCCESS(Status))
- {
- /* Get the Thread */
- Status = ObReferenceObjectByHandle(hThread,
- THREAD_SET_INFORMATION,
- PsThreadType,
- KernelMode,
- (PVOID*)&Thread,
- NULL);
-
- if(NT_SUCCESS(Status))
- {
- /* Set the Priority */
- KeSetPriorityThread(&Thread->Tcb, Priority);
-
- /* Dereference and close handle */
- ObDereferenceObject(Thread);
- ZwClose(hThread);
- }
- else
- {
- DPRINT1("Unable to reference worker thread handle 0x%x,
Status: 0x%x!\n", hThread, Status);
- KEBUGCHECK(0);
- }
- }
- else
- {
- DPRINT1("Unable to create worker thread, Status: 0x%x!\n",
Status);
- KEBUGCHECK(0);
- }
+ PsCreateSystemThread(&hThread,
+ THREAD_ALL_ACCESS,
+ NULL,
+ NULL,
+ NULL,
+ ExpWorkerThreadEntryPoint,
+ (PVOID)WorkQueueType);
+
+ /* Get the Thread */
+ ObReferenceObjectByHandle(hThread,
+ THREAD_SET_INFORMATION,
+ PsThreadType,
+ KernelMode,
+ (PVOID*)&Thread,
+ NULL);
+
+ /* Set the Priority */
+ KeSetPriorityThread(&Thread->Tcb, Priority);
+
+ /* Dereference and close handle */
+ ObDereferenceObject(Thread);
+ ZwClose(hThread);
}
}
-VOID INIT_FUNCTION
-ExInitializeWorkerThreads(VOID)
+VOID
+INIT_FUNCTION
+ExpInitializeWorkerThreads(VOID)
{
- KeInitializeQueue( &EiNormalWorkQueue, NUMBER_OF_WORKER_THREADS );
- KeInitializeQueue( &EiCriticalWorkQueue , NUMBER_OF_WORKER_THREADS
);
- KeInitializeQueue( &EiHyperCriticalWorkQueue ,
NUMBER_OF_WORKER_THREADS );
-
- ExInitializeWorkQueue(&EiNormalWorkQueue,
- LOW_PRIORITY);
- ExInitializeWorkQueue(&EiCriticalWorkQueue,
- LOW_REALTIME_PRIORITY);
- ExInitializeWorkQueue(&EiHyperCriticalWorkQueue,
- HIGH_PRIORITY);
+ ULONG WorkQueueType;
+
+ /* Initialize the Array */
+ for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue;
WorkQueueType++) {
+
+ RtlZeroMemory(&ExWorkerQueue[WorkQueueType],
sizeof(EX_WORK_QUEUE));
+ KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue,
0);
+ }
+
+ /* Create the built-in worker threads for each work queue */
+ ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY);
+ ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY);
+ ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY);
}
/*
* @implemented
- */
-VOID STDCALL
-ExQueueWorkItem (PWORK_QUEUE_ITEM WorkItem,
- WORK_QUEUE_TYPE QueueType)
-/*
+ *
* FUNCTION: Inserts a work item in a queue for one of the system
worker
* threads to process
* ARGUMENTS:
* WorkItem = Item to insert
* QueueType = Queue to insert it in
*/
+VOID
+STDCALL
+ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
+ WORK_QUEUE_TYPE QueueType)
{
ASSERT(WorkItem!=NULL);
ASSERT_IRQL(DISPATCH_LEVEL);
ASSERT(WorkItem->List.Flink == NULL);
- /*
- * Insert the item in the appropiate queue and wake up any thread
- * waiting for something to do
- */
- switch(QueueType)
- {
- case DelayedWorkQueue:
- KeInsertQueue (
- &EiNormalWorkQueue,
- &WorkItem->List
- );
- break;
-
- case CriticalWorkQueue:
- KeInsertQueue (
- &EiCriticalWorkQueue,
- &WorkItem->List
- );
- break;
-
- case HyperCriticalWorkQueue:
- KeInsertQueue (
- &EiHyperCriticalWorkQueue,
- &WorkItem->List
- );
- break;
-
- default:
- break;
-
- }
+
+ /* Insert the Queue */
+ KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue,
&WorkItem->List);
}
/* EOF */
_____
Modified: trunk/reactos/ntoskrnl/include/internal/ex.h
--- trunk/reactos/ntoskrnl/include/internal/ex.h 2005-03-14
04:51:51 UTC (rev 14046)
+++ trunk/reactos/ntoskrnl/include/internal/ex.h 2005-03-14
05:54:32 UTC (rev 14047)
@@ -86,8 +86,10 @@
extern ULONG ExpTimeZoneId;
extern POBJECT_TYPE ExEventPairObjectType;
+extern POBJECT_TYPE EXPORTED ExMutantObjectType;
+extern POBJECT_TYPE EXPORTED ExSemaphoreObjectType;
+extern POBJECT_TYPE EXPORTED ExTimerType;
-
/* INITIALIZATION FUNCTIONS
*************************************************/
VOID
@@ -100,7 +102,7 @@
VOID
ExpInitTimeZoneInfo(VOID);
VOID
-ExInitializeWorkerThreads(VOID);
+ExpInitializeWorkerThreads(VOID);
VOID
ExpInitLookasideLists(VOID);
VOID
_____
Modified: trunk/reactos/ntoskrnl/include/internal/ke.h
--- trunk/reactos/ntoskrnl/include/internal/ke.h 2005-03-14
04:51:51 UTC (rev 14046)
+++ trunk/reactos/ntoskrnl/include/internal/ke.h 2005-03-14
05:54:32 UTC (rev 14047)
@@ -139,34 +139,39 @@
IN KPROFILE_SOURCE Source
);
-VOID KiInsertProfileIntoProcess(PLIST_ENTRY ListHead, PKPROFILE
Profile);
-VOID KiInsertProfile(PKPROFILE Profile);
-VOID KiRemoveProfile(PKPROFILE Profile);
-VOID STDCALL KiDeleteProfile(PVOID ObjectBody);
-
VOID STDCALL KeUpdateSystemTime(PKTRAP_FRAME TrapFrame, KIRQL Irql);
VOID STDCALL KeUpdateRunTime(PKTRAP_FRAME TrapFrame, KIRQL Irql);
VOID STDCALL KiExpireTimers(PKDPC Dpc, PVOID DeferredContext, PVOID
SystemArgument1, PVOID SystemArgument2);
-KIRQL KeAcquireDispatcherDatabaseLock(VOID);
-VOID KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID);
-VOID KeReleaseDispatcherDatabaseLock(KIRQL Irql);
-VOID KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID);
+KIRQL inline FASTCALL KeAcquireDispatcherDatabaseLock(VOID);
+VOID inline FASTCALL KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID);
+VOID inline FASTCALL KeReleaseDispatcherDatabaseLock(KIRQL Irql);
+VOID inline FASTCALL KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID);
BOOLEAN KiDispatcherObjectWake(DISPATCHER_HEADER* hdr, KPRIORITY
increment);
VOID STDCALL KeExpireTimers(PKDPC Apc,
PVOID Arg1,
PVOID Arg2,
PVOID Arg3);
-VOID KeInitializeDispatcherHeader(DISPATCHER_HEADER* Header, ULONG
Type,
- ULONG Size, ULONG SignalState);
+VOID inline FASTCALL KeInitializeDispatcherHeader(DISPATCHER_HEADER*
Header, ULONG Type,
+ ULONG Size, ULONG SignalState);
VOID KeDumpStackFrames(PULONG Frame);
BOOLEAN KiTestAlert(VOID);
-BOOLEAN KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS
WaitStatus);
+VOID FASTCALL KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS
WaitStatus);
+
+BOOLEAN STDCALL KiInsertTimer(PKTIMER Timer, LARGE_INTEGER DueTime);
+VOID inline FASTCALL KiSatisfyObjectWait(PDISPATCHER_HEADER Object,
PKTHREAD Thread);
+
+BOOLEAN inline FASTCALL KiIsObjectSignaled(PDISPATCHER_HEADER Object,
PKTHREAD Thread);
+
+VOID inline FASTCALL KiSatisifyMultipleObjectWaits(PKWAIT_BLOCK
WaitBlock);
+
+VOID FASTCALL KiWaitTest(PDISPATCHER_HEADER Object, KPRIORITY
Increment);
+
PULONG KeGetStackTopThread(struct _ETHREAD* Thread);
VOID KeContextToTrapFrame(PCONTEXT Context, PKTRAP_FRAME TrapFrame);
VOID STDCALL KiDeliverApc(KPROCESSOR_MODE PreviousMode,
@@ -191,6 +196,7 @@
KeTestAlertThread(IN KPROCESSOR_MODE AlertMode);
BOOLEAN STDCALL KeRemoveQueueApc (PKAPC Apc);
+VOID FASTCALL KiWakeQueue(IN PKQUEUE Queue);
PLIST_ENTRY STDCALL KeRundownQueue(IN PKQUEUE Queue);
extern LARGE_INTEGER SystemBootTime;
@@ -202,7 +208,7 @@
VOID KeInitTimer(VOID);
VOID KeInitDpc(struct _KPRCB* Prcb);
VOID KeInitDispatcher(VOID);
-VOID KeInitializeDispatcher(VOID);
+VOID inline FASTCALL KeInitializeDispatcher(VOID);
VOID KiInitializeSystemClock(VOID);
VOID KiInitializeBugCheck(VOID);
VOID Phase1Initialization(PVOID Context);
_____
Modified: trunk/reactos/ntoskrnl/include/internal/ps.h
--- trunk/reactos/ntoskrnl/include/internal/ps.h 2005-03-14
04:51:51 UTC (rev 14046)
+++ trunk/reactos/ntoskrnl/include/internal/ps.h 2005-03-14
05:54:32 UTC (rev 14047)
@@ -525,8 +525,11 @@
ULONG PsEnumThreadsByProcess(PEPROCESS Process);
PEPROCESS PsGetNextProcess(PEPROCESS OldProcess);
VOID
-PsBlockThread(PNTSTATUS Status, UCHAR Alertable, ULONG WaitMode,
- BOOLEAN DispatcherLock, KIRQL WaitIrql, UCHAR WaitReason);
+STDCALL
+PsBlockThread(PNTSTATUS Status,
+ UCHAR Alertable,
+ ULONG WaitMode,
+ UCHAR WaitReason);
VOID
PsUnblockThread(PETHREAD Thread, PNTSTATUS WaitStatus, KPRIORITY
Increment);
VOID
_____
Modified: trunk/reactos/ntoskrnl/ke/event.c
--- trunk/reactos/ntoskrnl/ke/event.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ke/event.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -91,7 +91,7 @@
Event->Header.SignalState = 1;
/* Wake the Event */
- KiDispatcherObjectWake(&Event->Header, Increment);
+ KiWaitTest(&Event->Header, Increment);
}
/* Unsignal it */
@@ -196,7 +196,7 @@
/* We must do a full wait satisfaction */
DPRINT("Notification Event or WaitAll, Wait on the
Event and Signal\n");
Event->Header.SignalState = 1;
- KiDispatcherObjectWake(&Event->Header, Increment);
+ KiWaitTest(&Event->Header, Increment);
}
} else {
_____
Modified: trunk/reactos/ntoskrnl/ke/i386/tskswitch.S
--- trunk/reactos/ntoskrnl/ke/i386/tskswitch.S 2005-03-14 04:51:51 UTC
(rev 14046)
+++ trunk/reactos/ntoskrnl/ke/i386/tskswitch.S 2005-03-14 05:54:32 UTC
(rev 14047)
@@ -204,7 +204,7 @@
*/
sti
- call _KeReleaseDispatcherDatabaseLockFromDpcLevel
+ call @KeReleaseDispatcherDatabaseLockFromDpcLevel@0
cmpl $0, _PiNrThreadsAwaitingReaping
je 5f
_____
Modified: trunk/reactos/ntoskrnl/ke/kthread.c
--- trunk/reactos/ntoskrnl/ke/kthread.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ke/kthread.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -56,7 +56,7 @@
/* Signal and satisfy */
Thread->SuspendSemaphore.Header.SignalState++;
- KiDispatcherObjectWake(&Thread->SuspendSemaphore.Header,
IO_NO_INCREMENT);
+ KiWaitTest(&Thread->SuspendSemaphore.Header,
IO_NO_INCREMENT);
}
}
_____
Modified: trunk/reactos/ntoskrnl/ke/mutex.c
--- trunk/reactos/ntoskrnl/ke/mutex.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ke/mutex.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -177,7 +177,7 @@
/* Wake the Mutant */
DPRINT("Waking the Mutant\n");
- KiDispatcherObjectWake(&Mutant->Header, Increment);
+ KiWaitTest(&Mutant->Header, Increment);
}
}
_____
Modified: trunk/reactos/ntoskrnl/ke/queue.c
--- trunk/reactos/ntoskrnl/ke/queue.c 2005-03-14 04:51:51 UTC (rev
14046)
+++ trunk/reactos/ntoskrnl/ke/queue.c 2005-03-14 05:54:32 UTC (rev
14047)
@@ -16,245 +16,483 @@
/* FUNCTIONS
*****************************************************************/
-
+LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry,
BOOLEAN Head);
+
/*
* @implemented
*/
-VOID STDCALL
+VOID
+STDCALL
KeInitializeQueue(IN PKQUEUE Queue,
- IN ULONG Count OPTIONAL)
+ IN ULONG Count OPTIONAL)
{
- KeInitializeDispatcherHeader(&Queue->Header,
- QueueObject,
- sizeof(KQUEUE)/sizeof(ULONG),
- 0);
- InitializeListHead(&Queue->EntryListHead);
- InitializeListHead(&Queue->ThreadListHead);
- Queue->CurrentCount = 0;
- Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors :
Count;
+ DPRINT("KeInitializeQueue %x\n", Queue);
+
+ /* Initialize the Header */
+ KeInitializeDispatcherHeader(&Queue->Header,
+ QueueObject,
+ sizeof(KQUEUE)/sizeof(ULONG),
+ 0);
+
+ /* Initialize the Lists */
+ InitializeListHead(&Queue->EntryListHead);
+ InitializeListHead(&Queue->ThreadListHead);
+
+ /* Set the Current and Maximum Count */
+ Queue->CurrentCount = 0;
+ Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors :
Count;
}
-
/*
* @implemented
- *
- * Returns number of entries in the queue
*/
-LONG STDCALL
-KeReadStateQueue(IN PKQUEUE Queue)
+LONG
+STDCALL
+KeInsertHeadQueue(IN PKQUEUE Queue,
+ IN PLIST_ENTRY Entry)
{
- return(Queue->Header.SignalState);
-}
-
-/*
- * Returns the previous number of entries in the queue
- */
-LONG STDCALL
-KiInsertQueue(
- IN PKQUEUE Queue,
- IN PLIST_ENTRY Entry,
- BOOLEAN Head
- )
-{
- ULONG InitialState;
-
- DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
+ LONG PreviousState;
+ KIRQL OldIrql;
+
+ DPRINT("KeInsertHeadQueue %x\n", Queue);
+
+ /* Lock the Dispatcher Database */
+ OldIrql = KeAcquireDispatcherDatabaseLock();
+
+ /* Insert the Queue */
+ PreviousState = KiInsertQueue(Queue, Entry, TRUE);
+
+ /* Release the Dispatcher Lock */
+ KeReleaseDispatcherDatabaseLock(OldIrql);
- InitialState = Queue->Header.SignalState;
-
- if (Head)
- {
- InsertHeadList(&Queue->EntryListHead, Entry);
- }
- else
- {
- InsertTailList(&Queue->EntryListHead, Entry);
- }
-
- //inc. num entries in queue
- Queue->Header.SignalState++;
-
- /* Why the KeGetCurrentThread()->Queue != Queue?
- * KiInsertQueue might be called from an APC for the current thread.
- * -Gunnar
- */
- if (Queue->CurrentCount < Queue->MaximumCount &&
- !IsListEmpty(&Queue->Header.WaitListHead) &&
- KeGetCurrentThread()->Queue != Queue)
- {
- KiDispatcherObjectWake(&Queue->Header, IO_NO_INCREMENT);
- }
-
- return InitialState;
+ /* Return previous State */
+ return PreviousState;
}
-
-
/*
* @implemented
*/
LONG STDCALL
-KeInsertHeadQueue(IN PKQUEUE Queue,
- IN PLIST_ENTRY Entry)
+KeInsertQueue(IN PKQUEUE Queue,
+ IN PLIST_ENTRY Entry)
{
- LONG Result;
- KIRQL OldIrql;
+ LONG PreviousState;
+ KIRQL OldIrql;
+
+ DPRINT("KeInsertQueue %x\n", Queue);
+
+ /* Lock the Dispatcher Database */
+ OldIrql = KeAcquireDispatcherDatabaseLock();
+
+ /* Insert the Queue */
+ PreviousState = KiInsertQueue(Queue, Entry, FALSE);
+
+ /* Release the Dispatcher Lock */
+ KeReleaseDispatcherDatabaseLock(OldIrql);
- OldIrql = KeAcquireDispatcherDatabaseLock();
- Result = KiInsertQueue(Queue,Entry,TRUE);
- KeReleaseDispatcherDatabaseLock(OldIrql);
-
- return Result;
+ /* Return previous State */
+ return PreviousState;
}
-
/*
* @implemented
+ *
+ * Returns number of entries in the queue
*/
-LONG STDCALL
-KeInsertQueue(IN PKQUEUE Queue,
- IN PLIST_ENTRY Entry)
+LONG
+STDCALL
+KeReadStateQueue(IN PKQUEUE Queue)
{
- LONG Result;
- KIRQL OldIrql;
-
- OldIrql = KeAcquireDispatcherDatabaseLock();
- Result = KiInsertQueue(Queue,Entry,FALSE);
- KeReleaseDispatcherDatabaseLock(OldIrql);
-
- return Result;
+ /* Returns the Signal State */
+ return(Queue->Header.SignalState);
}
-
/*
* @implemented
*/
-PLIST_ENTRY STDCALL
+PLIST_ENTRY
+STDCALL
KeRemoveQueue(IN PKQUEUE Queue,
- IN KPROCESSOR_MODE WaitMode,
- IN PLARGE_INTEGER Timeout OPTIONAL)
+ IN KPROCESSOR_MODE WaitMode,
+ IN PLARGE_INTEGER Timeout OPTIONAL)
{
- PLIST_ENTRY ListEntry;
- NTSTATUS Status;
- PKTHREAD Thread = KeGetCurrentThread();
- KIRQL OldIrql;
+ PLIST_ENTRY ListEntry;
+ NTSTATUS Status;
+ PKTHREAD Thread = KeGetCurrentThread();
+ KIRQL OldIrql;
+ PKQUEUE PreviousQueue;
+ PKWAIT_BLOCK WaitBlock;
+ PKWAIT_BLOCK TimerWaitBlock;
+ PKTIMER Timer;
- OldIrql = KeAcquireDispatcherDatabaseLock ();
+ DPRINT("KeRemoveQueue %x\n", Queue);
+
+ /* Check if the Lock is already held */
+ if (Thread->WaitNext) {
+
+ DPRINT("Lock is already held\n");
+
+ } else {
+
+ /* Lock the Dispatcher Database */
+ DPRINT("Lock not held, acquiring\n");
+ OldIrql = KeAcquireDispatcherDatabaseLock();
+ Thread->WaitIrql = OldIrql;
+ }
- if (Thread->Queue != Queue)
- {
- /*
- * INVESTIGATE: What is the Thread->QueueListEntry used for? It's
linked it into the
- * Queue->ThreadListHead when the thread registers with the queue
and unlinked when
- * the thread registers with a new queue. The Thread->Queue
already tells us what
- * queue the thread is registered with.
- * -Gunnar
- */
+ /* This is needed so that we can set the new queue right here,
before additional processing */
+ PreviousQueue = Thread->Queue;
+ Thread->Queue = Queue;
- //unregister thread from previous queue (if any)
- if (Thread->Queue)
- {
- RemoveEntryList(&Thread->QueueListEntry);
- Thread->Queue->CurrentCount--;
-
- if (Thread->Queue->CurrentCount < Thread->Queue->MaximumCount
&&
- !IsListEmpty(&Thread->Queue->EntryListHead))
- {
- KiDispatcherObjectWake(&Thread->Queue->Header, 0);
- }
+ /* Check if this is a different queue */
+ if (Queue != PreviousQueue) {
+
+ /*
+ * INVESTIGATE: What is the Thread->QueueListEntry used for?
It's linked it into the
+ * Queue->ThreadListHead when the thread registers with the
queue and unlinked when
+ * the thread registers with a new queue. The Thread->Queue
already tells us what
+ * queue the thread is registered with.
+ * -Gunnar
+ */
+ DPRINT("Different Queue\n");
+ if (PreviousQueue) {
+
+ /* Remove from this list */
+ DPRINT("Removing Old Queue\n");
+ RemoveEntryList(&Thread->QueueListEntry);
+
+ /* Wake the queue */
+ DPRINT("Activating new thread\n");
+ KiWakeQueue(PreviousQueue);
}
- // register thread with this queue
- InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
- Thread->Queue = Queue;
- }
- else /* if (Thread->Queue == Queue) */
- {
- //dec. num running threads
- Queue->CurrentCount--;
- }
+ /* Insert in this new Queue */
+ DPRINT("Inserting new Queue!\n");
+ InsertTailList(&Queue->ThreadListHead,
&Thread->QueueListEntry);
-
-
-
- while (TRUE)
- {
- if (Queue->CurrentCount < Queue->MaximumCount &&
!IsListEmpty(&Queue->EntryListHead))
- {
- ListEntry = RemoveHeadList(&Queue->EntryListHead);
- //dec. num entries in queue
- Queue->Header.SignalState--;
- //inc. num running threads
- Queue->CurrentCount++;
-
- KeReleaseDispatcherDatabaseLock(OldIrql);
- return ListEntry;
- }
- else
- {
- //inform KeWaitXxx that we are holding disp. lock
- Thread->WaitNext = TRUE;
- Thread->WaitIrql = OldIrql;
+ } else {
+
+ /* Same queue, decrement waiting threads */
+ DPRINT("Same Queue!\n");
+ Queue->CurrentCount--;
+ }
+
+ /* Loop until the queue is processed */
+ while (TRUE) {
+
+ /* Get the Entry */
+ ListEntry = Queue->EntryListHead.Flink;
+
+ /* Check if the counts are valid and if there is still a queued
entry */
+ if ((Queue->CurrentCount < Queue->MaximumCount) &&
+ (ListEntry != &Queue->EntryListHead)) {
+
+ /* Remove the Entry and Save it */
+ DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum
Count: %d\n",
+ Queue->CurrentCount, Queue->MaximumCount);
+ ListEntry = RemoveHeadList(&Queue->EntryListHead);
+
+ /* Decrease the number of entries */
+ Queue->Header.SignalState--;
+
+ /* Increase numbef of running threads */
+ Queue->CurrentCount++;
+
+ /* Check if the entry is valid. If not, bugcheck */
+ if (!ListEntry->Flink || !ListEntry->Blink) {
+
+ KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
+ }
+
+ /* Remove the Entry */
+ RemoveEntryList(ListEntry);
+ ListEntry->Flink = NULL;
+
+ /* Nothing to wait on */
+ break;
+
+ } else {
+
+ /* Do the wait */
+ DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum
Count: %d\n",
+ Queue->CurrentCount, Queue->MaximumCount);
+
+ /* Use the Thread's Wait Block, it's big enough */
+ Thread->WaitBlockList = &Thread->WaitBlock[0];
+
+ /* Fail if there's an APC Pending */
+ if (WaitMode == UserMode &&
Thread->ApcState.UserApcPending) {
+
+ /* Return the status and increase the pending threads
*/
+ ListEntry = (PLIST_ENTRY)STATUS_USER_APC;
+ Queue->CurrentCount++;
+
+ /* Nothing to wait on */
+ break;
+ }
+
+ /* Build the Wait Block */
+ WaitBlock = &Thread->WaitBlock[0];
+ WaitBlock->Object = (PVOID)Queue;
+ WaitBlock->WaitKey = STATUS_SUCCESS;
+ WaitBlock->WaitType = WaitAny;
+ WaitBlock->Thread = Thread;
+ WaitBlock->NextWaitBlock = NULL;
+
+ Thread->WaitStatus = STATUS_SUCCESS;
+
+ /* We need to wait for the object... check if we have a
timeout */
+ if (Timeout) {
+
+ /* If it's zero, then don't do any waiting */
+ if (!Timeout->QuadPart) {
+
+ /* Instant Timeout, return the status and increase
the pending threads */
+ DPRINT("Queue Wait has timed out\n");
+ ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
+ Queue->CurrentCount++;
+
+ /* Nothing to wait on */
+ break;
+ }
+
+ /*
+ * Set up the Timer. We'll use the internal function so
that we can
+ * hold on to the dispatcher lock.
+ */
+ Timer = &Thread->Timer;
+ TimerWaitBlock = &Thread->WaitBlock[1];
- Status = KeWaitForSingleObject(Queue,
- WrQueue,
- WaitMode,
- TRUE, //bAlertable
- Timeout);
-
- if (Status == STATUS_TIMEOUT || Status == STATUS_USER_APC)
- {
- return (PVOID)Status;
- }
-
- OldIrql = KeAcquireDispatcherDatabaseLock ();
- }
- }
+ /* Set up the Timer Wait Block */
+ TimerWaitBlock->Object = (PVOID)Timer;
+ TimerWaitBlock->Thread = Thread;
+ TimerWaitBlock->WaitKey = STATUS_TIMEOUT;
+ TimerWaitBlock->WaitType = WaitAny;
+ TimerWaitBlock->NextWaitBlock = NULL;
+
+ /* Link the timer to this Wait Block */
+ InitializeListHead(&Timer->Header.WaitListHead);
[truncated at 1000 lines; 2258 more skipped]