Author: janderwald Date: Wed Jul 22 23:27:44 2009 New Revision: 42147
URL: http://svn.reactos.org/svn/reactos?rev=42147&view=rev Log: - Fix totally broken KSWORKER implementation (it may have worked, but it didn't do what it should have) - Re-Implement KsQueueWorkItem, KsIncrementCountedWorker, KsDecrementCountedWorker, KsRegisterCountedWorker, KsUnregisterWorker, KsRegisterWorker
Modified: trunk/reactos/drivers/ksfilter/ks/worker.c
Modified: trunk/reactos/drivers/ksfilter/ks/worker.c URL: http://svn.reactos.org/svn/reactos/trunk/reactos/drivers/ksfilter/ks/worker.... ============================================================================== --- trunk/reactos/drivers/ksfilter/ks/worker.c [iso-8859-1] (original) +++ trunk/reactos/drivers/ksfilter/ks/worker.c [iso-8859-1] Wed Jul 22 23:27:44 2009 @@ -15,14 +15,64 @@
typedef struct { + WORK_QUEUE_ITEM WorkItem; + KEVENT Event; KSPIN_LOCK Lock; WORK_QUEUE_TYPE Type; LONG Counter; + LONG QueuedWorkItemCount; + LIST_ENTRY QueuedWorkItems; + + PWORK_QUEUE_ITEM CountedWorkItem; +}KSIWORKER, *PKSIWORKER; + +VOID +NTAPI +WorkItemRoutine( + IN PVOID Context) +{ + PKSIWORKER KsWorker; + KIRQL OldLevel; PWORK_QUEUE_ITEM WorkItem; - ULONG WorkItemActive; - ULONG DeleteInProgress; -}KS_WORKER; + PLIST_ENTRY Entry; + + + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Context; + + /* acquire back the lock */ + KeAcquireSpinLock(&KsWorker->Lock, &OldLevel); + + do + { + /* remove first entry */ + Entry = RemoveHeadList(&KsWorker->QueuedWorkItems); + /* get offset to work item */ + WorkItem = (PWORK_QUEUE_ITEM)CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List); + + /* release lock as the callback might call one KsWorker functions */ + KeReleaseSpinLock(&KsWorker->Lock, OldLevel); + + /* now dispatch the work */ + WorkItem->WorkerRoutine(WorkItem->Parameter); + + /* acquire back the lock */ + KeAcquireSpinLock(&KsWorker->Lock, &OldLevel); + + /* decrement queued work item count */ + InterlockedDecrement(&KsWorker->QueuedWorkItemCount); + + }while(KsWorker->QueuedWorkItemCount); + + /* release the lock */ + KeReleaseSpinLock(&KsWorker->Lock, OldLevel); + + /* signal completion event */ + KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE); + +} +
/* @implemented @@ -34,8 +84,8 @@ IN WORK_QUEUE_TYPE WorkQueueType, OUT PKSWORKER* Worker) { - KS_WORKER * KsWorker; - UNIMPLEMENTED; + PKSIWORKER KsWorker; +
if (WorkQueueType != CriticalWorkQueue && WorkQueueType != DelayedWorkQueue && @@ -44,16 +94,22 @@ return STATUS_INVALID_PARAMETER; }
- KsWorker = ExAllocatePool(NonPagedPool, sizeof(KS_WORKER)); + /* allocate worker context */ + KsWorker = ExAllocatePool(NonPagedPool, sizeof(KSIWORKER)); if (!KsWorker) return STATUS_INSUFFICIENT_RESOURCES;
+ /* initialze the work ctx */ + ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker); + /* setup type */ KsWorker->Type = WorkQueueType; + /* set counter to zero */ KsWorker->Counter = 0; - KsWorker->WorkItemActive = 0; - KsWorker->WorkItem = NULL; - KsWorker->DeleteInProgress = FALSE; + /* Initialize work item queue */ + InitializeListHead(&KsWorker->QueuedWorkItems); + /* initialize work item lock */ KeInitializeSpinLock(&KsWorker->Lock); + /* initialize event */ KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
*Worker = KsWorker; @@ -63,53 +119,68 @@ /* @implemented */ -KSDDKAPI VOID NTAPI +KSDDKAPI +VOID +NTAPI KsUnregisterWorker( IN PKSWORKER Worker) { - KS_WORKER * KsWorker; + PKSIWORKER KsWorker; KIRQL OldIrql;
if (!Worker) return;
- KsWorker = (KS_WORKER *)Worker; - + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Worker; + /* acquire spinlock */ KeAcquireSpinLock(&KsWorker->Lock, &OldIrql); - - KsWorker->DeleteInProgress = TRUE; - - if (KsWorker->WorkItemActive) - { + /* fake status running to avoid work items to be queued by the counted worker */ + KsWorker->Counter = 1; + /* is there currently a work item active */ + if (KsWorker->QueuedWorkItemCount) + { + /* release the lock */ KeReleaseSpinLock(&KsWorker->Lock, OldIrql); + /* wait for the worker routine to finish */ KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE, NULL); } else { + /* no work item active, just release the lock */ KeReleaseSpinLock(&KsWorker->Lock, OldIrql); } - - ExFreePool(KsWorker); -} - -/* - @implemented -*/ -KSDDKAPI NTSTATUS NTAPI + /* free worker context */ + FreeItem(KsWorker); +} + +/* + @implemented +*/ +KSDDKAPI +NTSTATUS +NTAPI KsRegisterCountedWorker( IN WORK_QUEUE_TYPE WorkQueueType, IN PWORK_QUEUE_ITEM CountedWorkItem, OUT PKSWORKER* Worker) { NTSTATUS Status; - KS_WORKER * KsWorker; - + PKSIWORKER KsWorker; + + /* check for counted work item parameter */ + if (!CountedWorkItem) + return STATUS_INVALID_PARAMETER_2; + + /* create the work ctx */ Status = KsRegisterWorker(WorkQueueType, Worker); - + /* check for success */ if (NT_SUCCESS(Status)) { - KsWorker = (KS_WORKER *)Worker; - KsWorker->WorkItem = CountedWorkItem; + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Worker; + /* store counted work item */ + KsWorker->CountedWorkItem = CountedWorkItem; }
return Status; @@ -124,21 +195,18 @@ KsDecrementCountedWorker( IN PKSWORKER Worker) { - KS_WORKER * KsWorker; + PKSIWORKER KsWorker; LONG Counter;
+ /* did the caller pass a work ctx */ if (!Worker) return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker; + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Worker; + /* decrement counter */ Counter = InterlockedDecrement(&KsWorker->Counter); - - if (KsWorker->DeleteInProgress) - { - /* signal that we are done */ - KeSetEvent(&KsWorker->Event, 0, 0); - } - + /* return result */ return Counter; }
@@ -151,19 +219,24 @@ KsIncrementCountedWorker( IN PKSWORKER Worker) { - KS_WORKER * KsWorker; + PKSIWORKER KsWorker; LONG Counter;
+ /* did the caller pass a work ctx */ if (!Worker) return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker; - + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Worker; + /* increment counter */ Counter = InterlockedIncrement(&KsWorker->Counter); if (Counter == 1) { - KsQueueWorkItem(Worker, KsWorker->WorkItem); - } + /* this is the first work item in list, so queue a real work item */ + KsQueueWorkItem(Worker, KsWorker->CountedWorkItem); + } + + /* return current counter */ return Counter; }
@@ -177,25 +250,31 @@ IN PKSWORKER Worker, IN PWORK_QUEUE_ITEM WorkItem) { - KS_WORKER * KsWorker; + PKSIWORKER KsWorker; KIRQL OldIrql; - NTSTATUS Status = STATUS_SUCCESS; - + + /* check for all parameters */ if (!Worker || !WorkItem) return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker; + /* get ks worker implementation */ + KsWorker = (PKSIWORKER)Worker; + /* lock the work queue */ KeAcquireSpinLock(&KsWorker->Lock, &OldIrql); - - if (!KsWorker->DeleteInProgress) - { + /* insert work item to list */ + InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List); + /* increment active count */ + InterlockedIncrement(&KsWorker->QueuedWorkItemCount); + /* is this the first work item */ + if (KsWorker->QueuedWorkItemCount == 1) + { + /* clear event */ + KeClearEvent(&KsWorker->Event); + /* it is, queue it */ ExQueueWorkItem(WorkItem, KsWorker->Type); } - else - { - Status = STATUS_UNSUCCESSFUL; - } - + /* release lock */ KeReleaseSpinLock(&KsWorker->Lock, OldIrql); - return Status; -} + + return STATUS_SUCCESS; +}