Author: janderwald
Date: Wed Jul 22 23:27:44 2009
New Revision: 42147
URL:
http://svn.reactos.org/svn/reactos?rev=42147&view=rev
Log:
- Fix totally broken KSWORKER implementation (it may have worked, but it didn't do what it
should have)
- Re-Implement KsQueueWorkItem, KsIncrementCountedWorker, KsDecrementCountedWorker,
KsRegisterCountedWorker, KsUnregisterWorker, KsRegisterWorker
Modified:
trunk/reactos/drivers/ksfilter/ks/worker.c
Modified: trunk/reactos/drivers/ksfilter/ks/worker.c
URL:
http://svn.reactos.org/svn/reactos/trunk/reactos/drivers/ksfilter/ks/worker…
==============================================================================
--- trunk/reactos/drivers/ksfilter/ks/worker.c [iso-8859-1] (original)
+++ trunk/reactos/drivers/ksfilter/ks/worker.c [iso-8859-1] Wed Jul 22 23:27:44 2009
@@ -15,14 +15,64 @@
typedef struct
{
+ WORK_QUEUE_ITEM WorkItem;
+
KEVENT Event;
KSPIN_LOCK Lock;
WORK_QUEUE_TYPE Type;
LONG Counter;
+ LONG QueuedWorkItemCount;
+ LIST_ENTRY QueuedWorkItems;
+
+ PWORK_QUEUE_ITEM CountedWorkItem;
+}KSIWORKER, *PKSIWORKER;
+
+VOID
+NTAPI
+WorkItemRoutine(
+ IN PVOID Context)
+{
+ PKSIWORKER KsWorker;
+ KIRQL OldLevel;
PWORK_QUEUE_ITEM WorkItem;
- ULONG WorkItemActive;
- ULONG DeleteInProgress;
-}KS_WORKER;
+ PLIST_ENTRY Entry;
+
+
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Context;
+
+ /* acquire back the lock */
+ KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
+
+ do
+ {
+ /* remove first entry */
+ Entry = RemoveHeadList(&KsWorker->QueuedWorkItems);
+ /* get offset to work item */
+ WorkItem = (PWORK_QUEUE_ITEM)CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List);
+
+ /* release lock as the callback might call one KsWorker functions */
+ KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
+
+ /* now dispatch the work */
+ WorkItem->WorkerRoutine(WorkItem->Parameter);
+
+ /* acquire back the lock */
+ KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
+
+ /* decrement queued work item count */
+ InterlockedDecrement(&KsWorker->QueuedWorkItemCount);
+
+ }while(KsWorker->QueuedWorkItemCount);
+
+ /* release the lock */
+ KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
+
+ /* signal completion event */
+ KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE);
+
+}
+
/*
@implemented
@@ -34,8 +84,8 @@
IN WORK_QUEUE_TYPE WorkQueueType,
OUT PKSWORKER* Worker)
{
- KS_WORKER * KsWorker;
- UNIMPLEMENTED;
+ PKSIWORKER KsWorker;
+
if (WorkQueueType != CriticalWorkQueue &&
WorkQueueType != DelayedWorkQueue &&
@@ -44,16 +94,22 @@
return STATUS_INVALID_PARAMETER;
}
- KsWorker = ExAllocatePool(NonPagedPool, sizeof(KS_WORKER));
+ /* allocate worker context */
+ KsWorker = ExAllocatePool(NonPagedPool, sizeof(KSIWORKER));
if (!KsWorker)
return STATUS_INSUFFICIENT_RESOURCES;
+ /* initialze the work ctx */
+ ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker);
+ /* setup type */
KsWorker->Type = WorkQueueType;
+ /* set counter to zero */
KsWorker->Counter = 0;
- KsWorker->WorkItemActive = 0;
- KsWorker->WorkItem = NULL;
- KsWorker->DeleteInProgress = FALSE;
+ /* Initialize work item queue */
+ InitializeListHead(&KsWorker->QueuedWorkItems);
+ /* initialize work item lock */
KeInitializeSpinLock(&KsWorker->Lock);
+ /* initialize event */
KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
*Worker = KsWorker;
@@ -63,53 +119,68 @@
/*
@implemented
*/
-KSDDKAPI VOID NTAPI
+KSDDKAPI
+VOID
+NTAPI
KsUnregisterWorker(
IN PKSWORKER Worker)
{
- KS_WORKER * KsWorker;
+ PKSIWORKER KsWorker;
KIRQL OldIrql;
if (!Worker)
return;
- KsWorker = (KS_WORKER *)Worker;
-
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Worker;
+ /* acquire spinlock */
KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
-
- KsWorker->DeleteInProgress = TRUE;
-
- if (KsWorker->WorkItemActive)
- {
+ /* fake status running to avoid work items to be queued by the counted worker */
+ KsWorker->Counter = 1;
+ /* is there currently a work item active */
+ if (KsWorker->QueuedWorkItemCount)
+ {
+ /* release the lock */
KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
+ /* wait for the worker routine to finish */
KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE,
NULL);
}
else
{
+ /* no work item active, just release the lock */
KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
}
-
- ExFreePool(KsWorker);
-}
-
-/*
- @implemented
-*/
-KSDDKAPI NTSTATUS NTAPI
+ /* free worker context */
+ FreeItem(KsWorker);
+}
+
+/*
+ @implemented
+*/
+KSDDKAPI
+NTSTATUS
+NTAPI
KsRegisterCountedWorker(
IN WORK_QUEUE_TYPE WorkQueueType,
IN PWORK_QUEUE_ITEM CountedWorkItem,
OUT PKSWORKER* Worker)
{
NTSTATUS Status;
- KS_WORKER * KsWorker;
-
+ PKSIWORKER KsWorker;
+
+ /* check for counted work item parameter */
+ if (!CountedWorkItem)
+ return STATUS_INVALID_PARAMETER_2;
+
+ /* create the work ctx */
Status = KsRegisterWorker(WorkQueueType, Worker);
-
+ /* check for success */
if (NT_SUCCESS(Status))
{
- KsWorker = (KS_WORKER *)Worker;
- KsWorker->WorkItem = CountedWorkItem;
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Worker;
+ /* store counted work item */
+ KsWorker->CountedWorkItem = CountedWorkItem;
}
return Status;
@@ -124,21 +195,18 @@
KsDecrementCountedWorker(
IN PKSWORKER Worker)
{
- KS_WORKER * KsWorker;
+ PKSIWORKER KsWorker;
LONG Counter;
+ /* did the caller pass a work ctx */
if (!Worker)
return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker;
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Worker;
+ /* decrement counter */
Counter = InterlockedDecrement(&KsWorker->Counter);
-
- if (KsWorker->DeleteInProgress)
- {
- /* signal that we are done */
- KeSetEvent(&KsWorker->Event, 0, 0);
- }
-
+ /* return result */
return Counter;
}
@@ -151,19 +219,24 @@
KsIncrementCountedWorker(
IN PKSWORKER Worker)
{
- KS_WORKER * KsWorker;
+ PKSIWORKER KsWorker;
LONG Counter;
+ /* did the caller pass a work ctx */
if (!Worker)
return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker;
-
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Worker;
+ /* increment counter */
Counter = InterlockedIncrement(&KsWorker->Counter);
if (Counter == 1)
{
- KsQueueWorkItem(Worker, KsWorker->WorkItem);
- }
+ /* this is the first work item in list, so queue a real work item */
+ KsQueueWorkItem(Worker, KsWorker->CountedWorkItem);
+ }
+
+ /* return current counter */
return Counter;
}
@@ -177,25 +250,31 @@
IN PKSWORKER Worker,
IN PWORK_QUEUE_ITEM WorkItem)
{
- KS_WORKER * KsWorker;
+ PKSIWORKER KsWorker;
KIRQL OldIrql;
- NTSTATUS Status = STATUS_SUCCESS;
-
+
+ /* check for all parameters */
if (!Worker || !WorkItem)
return STATUS_INVALID_PARAMETER;
- KsWorker = (KS_WORKER *)Worker;
+ /* get ks worker implementation */
+ KsWorker = (PKSIWORKER)Worker;
+ /* lock the work queue */
KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
-
- if (!KsWorker->DeleteInProgress)
- {
+ /* insert work item to list */
+ InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List);
+ /* increment active count */
+ InterlockedIncrement(&KsWorker->QueuedWorkItemCount);
+ /* is this the first work item */
+ if (KsWorker->QueuedWorkItemCount == 1)
+ {
+ /* clear event */
+ KeClearEvent(&KsWorker->Event);
+ /* it is, queue it */
ExQueueWorkItem(WorkItem, KsWorker->Type);
}
- else
- {
- Status = STATUS_UNSUCCESSFUL;
- }
-
+ /* release lock */
KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
- return Status;
-}
+
+ return STATUS_SUCCESS;
+}