Attached is an experimental implementation of work queues using KQUEUEs. It depends on the KQUEUE patch I sent...
Regards, Filip
Index: ntoskrnl/ex/work.c =================================================================== RCS file: /CVS/ReactOS/reactos/ntoskrnl/ex/work.c,v retrieving revision 1.22 diff -u -r1.22 work.c --- ntoskrnl/ex/work.c 17 Nov 2004 23:55:36 -0000 1.22 +++ ntoskrnl/ex/work.c 21 Nov 2004 13:26:34 -0000 @@ -24,19 +24,9 @@ typedef struct _WORK_QUEUE { /* - * PURPOSE: Head of the list of waiting work items + * PURPOSE: The actual queue for the work items */ - LIST_ENTRY Head; - - /* - * PURPOSE: Sychronize access to the work queue - */ - KSPIN_LOCK Lock; - - /* - * PURPOSE: Worker threads with nothing to do wait on this event - */ - KSEMAPHORE Sem; + KQUEUE WorkerQueue;
/* * PURPOSE: Thread associated with work queue @@ -75,22 +65,12 @@
for(;;) { - current = ExInterlockedRemoveHeadList(&queue->Head, - &queue->Lock); + current = KeRemoveQueue(&queue->WorkerQueue, KernelMode, NULL); if (current!=NULL) { item = CONTAINING_RECORD(current,WORK_QUEUE_ITEM,List); item->WorkerRoutine(item->Parameter); } - else - { - KeWaitForSingleObject((PVOID)&queue->Sem, - Executive, - KernelMode, - FALSE, - NULL); - DPRINT("Woke from wait\n"); - } } }
@@ -100,11 +80,7 @@ ULONG i; PETHREAD Thread;
- InitializeListHead(&WorkQueue->Head); - KeInitializeSpinLock(&WorkQueue->Lock); - KeInitializeSemaphore(&WorkQueue->Sem, - 0, - 256); + KeInitializeQueue(&WorkQueue->WorkerQueue, NUMBER_OF_WORKER_THREADS); for (i=0; i<NUMBER_OF_WORKER_THREADS; i++) { PsCreateSystemThread(&WorkQueue->Thread[i], @@ -154,47 +130,26 @@ ASSERT(WorkItem!=NULL); ASSERT_IRQL(DISPATCH_LEVEL);
- /* - * Insert the item in the appropiate queue and wake up any thread - * waiting for something to do - */ + /* + * Insert the item in the appropriate queue and wake up any thread + * waiting for something to do + */ switch(QueueType) { - case DelayedWorkQueue: - ExInterlockedInsertTailList(&EiNormalWorkQueue.Head, - &WorkItem->List, - &EiNormalWorkQueue.Lock); - KeReleaseSemaphore(&EiNormalWorkQueue.Sem, - IO_NO_INCREMENT, - 1, - FALSE); - break; + case DelayedWorkQueue: + KeInsertQueue(&EiNormalWorkQueue.WorkerQueue, + &WorkItem->List); + break; - case CriticalWorkQueue: - ExInterlockedInsertTailList(&EiCriticalWorkQueue.Head, - &WorkItem->List, - &EiCriticalWorkQueue.Lock); - KeReleaseSemaphore(&EiCriticalWorkQueue.Sem, - IO_NO_INCREMENT, - 1, - FALSE); - break; - - case HyperCriticalWorkQueue: - ExInterlockedInsertTailList(&EiHyperCriticalWorkQueue.Head, - &WorkItem->List, - &EiHyperCriticalWorkQueue.Lock); - KeReleaseSemaphore(&EiHyperCriticalWorkQueue.Sem, - IO_NO_INCREMENT, - 1, - FALSE); - break; - -#ifdef __USE_W32API - case MaximumWorkQueue: - // Unimplemented - break; -#endif + case CriticalWorkQueue: + KeInsertQueue(&EiCriticalWorkQueue.WorkerQueue, + &WorkItem->List); + break; + + case HyperCriticalWorkQueue: + KeInsertQueue(&EiHyperCriticalWorkQueue.WorkerQueue, + &WorkItem->List); + break; } }