Attached is an experimental implementation of work queues using KQUEUEs.
It depends on the KQUEUE patch I sent...
Regards,
Filip
Index: ntoskrnl/ex/work.c
===================================================================
RCS file: /CVS/ReactOS/reactos/ntoskrnl/ex/work.c,v
retrieving revision 1.22
diff -u -r1.22 work.c
--- ntoskrnl/ex/work.c 17 Nov 2004 23:55:36 -0000 1.22
+++ ntoskrnl/ex/work.c 21 Nov 2004 13:26:34 -0000
@@ -24,19 +24,9 @@
typedef struct _WORK_QUEUE
{
/*
- * PURPOSE: Head of the list of waiting work items
+ * PURPOSE: The actual queue for the work items
*/
- LIST_ENTRY Head;
-
- /*
- * PURPOSE: Sychronize access to the work queue
- */
- KSPIN_LOCK Lock;
-
- /*
- * PURPOSE: Worker threads with nothing to do wait on this event
- */
- KSEMAPHORE Sem;
+ KQUEUE WorkerQueue;
/*
* PURPOSE: Thread associated with work queue
@@ -75,22 +65,12 @@
for(;;)
{
- current = ExInterlockedRemoveHeadList(&queue->Head,
- &queue->Lock);
+ current = KeRemoveQueue(&queue->WorkerQueue, KernelMode, NULL);
if (current!=NULL)
{
item = CONTAINING_RECORD(current,WORK_QUEUE_ITEM,List);
item->WorkerRoutine(item->Parameter);
}
- else
- {
- KeWaitForSingleObject((PVOID)&queue->Sem,
- Executive,
- KernelMode,
- FALSE,
- NULL);
- DPRINT("Woke from wait\n");
- }
}
}
@@ -100,11 +80,7 @@
ULONG i;
PETHREAD Thread;
- InitializeListHead(&WorkQueue->Head);
- KeInitializeSpinLock(&WorkQueue->Lock);
- KeInitializeSemaphore(&WorkQueue->Sem,
- 0,
- 256);
+ KeInitializeQueue(&WorkQueue->WorkerQueue, NUMBER_OF_WORKER_THREADS);
for (i=0; i<NUMBER_OF_WORKER_THREADS; i++)
{
PsCreateSystemThread(&WorkQueue->Thread[i],
@@ -154,47 +130,26 @@
ASSERT(WorkItem!=NULL);
ASSERT_IRQL(DISPATCH_LEVEL);
- /*
- * Insert the item in the appropiate queue and wake up any thread
- * waiting for something to do
- */
+ /*
+ * Insert the item in the appropiate queue and wake up any thread
+ * waiting for something to do
+ */
switch(QueueType)
{
- case DelayedWorkQueue:
- ExInterlockedInsertTailList(&EiNormalWorkQueue.Head,
- &WorkItem->List,
- &EiNormalWorkQueue.Lock);
- KeReleaseSemaphore(&EiNormalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
+ case DelayedWorkQueue:
+ KeInsertQueue(&EiNormalWorkQueue.WorkerQueue,
+ &WorkItem->List);
+ break;
- case CriticalWorkQueue:
- ExInterlockedInsertTailList(&EiCriticalWorkQueue.Head,
- &WorkItem->List,
- &EiCriticalWorkQueue.Lock);
- KeReleaseSemaphore(&EiCriticalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
-
- case HyperCriticalWorkQueue:
- ExInterlockedInsertTailList(&EiHyperCriticalWorkQueue.Head,
- &WorkItem->List,
- &EiHyperCriticalWorkQueue.Lock);
- KeReleaseSemaphore(&EiHyperCriticalWorkQueue.Sem,
- IO_NO_INCREMENT,
- 1,
- FALSE);
- break;
-
-#ifdef __USE_W32API
- case MaximumWorkQueue:
- // Unimplemented
- break;
-#endif
+ case CriticalWorkQueue:
+ KeInsertQueue(&EiCriticalWorkQueue.WorkerQueue,
+ &WorkItem->List);
+ break;
+
+ case HyperCriticalWorkQueue:
+ KeInsertQueue(&EiHyperCriticalWorkQueue.WorkerQueue,
+ &WorkItem->List);
+ break;
}
}