Author: greatlrd
Date: Tue Jul 4 01:40:20 2006
New Revision: 22811
URL:
http://svn.reactos.org/svn/reactos?rev=22811&view=rev
Log:
forget this file sorry
Added:
trunk/reactos/lib/rtl/workitem.c (with props)
Added: trunk/reactos/lib/rtl/workitem.c
URL:
http://svn.reactos.org/svn/reactos/trunk/reactos/lib/rtl/workitem.c?rev=228…
==============================================================================
--- trunk/reactos/lib/rtl/workitem.c (added)
+++ trunk/reactos/lib/rtl/workitem.c Tue Jul 4 01:40:20 2006
@@ -1,0 +1,881 @@
+/*
+ * COPYRIGHT: See COPYING in the top level directory
+ * PROJECT: ReactOS system libraries
+ * PURPOSE: Work Item implementation
+ * FILE: lib/rtl/workitem.c
+ * PROGRAMMER:
+ */
+
+/* INCLUDES *****************************************************************/
+
+#include <rtl.h>
+
+#define NDEBUG
+#include <debug.h>
+
+/* FUNCTIONS ***************************************************************/
+
+#define MAX_WORKERTHREADS 0x100
+#define WORKERTHREAD_CREATION_THRESHOLD 0x5
+
+typedef struct _RTLP_IOWORKERTHREAD
+{
+ LIST_ENTRY ListEntry;
+ HANDLE ThreadHandle;
+ ULONG Flags;
+} RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
+
+typedef struct _RTLP_WORKITEM
+{
+ WORKERCALLBACKFUNC Function;
+ PVOID Context;
+ ULONG Flags;
+ HANDLE TokenHandle;
+} RTLP_WORKITEM, *PRTLP_WORKITEM;
+
+static LONG ThreadPoolInitialized = 0;
+static RTL_CRITICAL_SECTION ThreadPoolLock;
+static PRTLP_IOWORKERTHREAD PersistentIoThread;
+static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
+static HANDLE ThreadPoolCompletionPort;
+static LONG ThreadPoolWorkerThreads;
+static LONG ThreadPoolWorkerThreadsRequests;
+static LONG ThreadPoolWorkerThreadsLongRequests;
+static LONG ThreadPoolIOWorkerThreads;
+static LONG ThreadPoolIOWorkerThreadsRequests;
+static LONG ThreadPoolIOWorkerThreadsLongRequests;
+
+#define IsThreadPoolInitialized() ((volatile LONG)ThreadPoolInitialized == 1)
+
+static NTSTATUS
+RtlpInitializeThreadPool(VOID)
+{
+ NTSTATUS Status = STATUS_SUCCESS;
+ LONG InitStatus;
+
+ do
+ {
+ InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized,
+ 2,
+ 0);
+ if (InitStatus == 0)
+ {
+ /* We're the first thread to initialize the thread pool */
+
+ InitializeListHead(&ThreadPoolIOWorkerThreadsList);
+
+ PersistentIoThread = NULL;
+
+ ThreadPoolWorkerThreads = 0;
+ ThreadPoolWorkerThreadsRequests = 0;
+ ThreadPoolWorkerThreadsLongRequests = 0;
+ ThreadPoolIOWorkerThreads = 0;
+ ThreadPoolIOWorkerThreadsRequests = 0;
+ ThreadPoolIOWorkerThreadsLongRequests = 0;
+
+ /* Initialize the lock */
+ Status = RtlInitializeCriticalSection(&ThreadPoolLock);
+ if (!NT_SUCCESS(Status))
+ goto Finish;
+
+ /* Create the complection port */
+ Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
+ IO_COMPLETION_ALL_ACCESS,
+ NULL,
+ 0);
+ if (!NT_SUCCESS(Status))
+ {
+ RtlDeleteCriticalSection(&ThreadPoolLock);
+ goto Finish;
+ }
+
+Finish:
+ /* Initialization done */
+ InterlockedExchange(&ThreadPoolInitialized,
+ 1);
+ break;
+ }
+ else if (InitStatus == 2)
+ {
+ LARGE_INTEGER Timeout;
+
+ /* Another thread is currently initializing the thread pool!
+ Poll after a short period of time to see if the initialization
+ was completed */
+
+ Timeout.QuadPart = -10000000LL; /* Wait for a second */
+ NtDelayExecution(FALSE,
+ &Timeout);
+ }
+ } while (InitStatus != 1);
+
+ return Status;
+}
+
+static NTSTATUS
+RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
+{
+ NTSTATUS Status;
+
+ Status = NtOpenThreadToken(NtCurrentThread(),
+ TOKEN_IMPERSONATE,
+ TRUE,
+ TokenHandle);
+ if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
+ {
+ *TokenHandle = NULL;
+ Status = STATUS_SUCCESS;
+ }
+
+ return Status;
+}
+
+static NTSTATUS
+RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
+{
+ NTSTATUS Status;
+ HANDLE ThreadHandle;
+ LARGE_INTEGER Timeout;
+ volatile LONG WorkerInitialized = 0;
+
+ Timeout.QuadPart = -10000LL; /* Wait for 100ms */
+
+ /* Start the thread */
+ Status = RtlCreateUserThread(NtCurrentProcess(),
+ NULL,
+ FALSE,
+ 0,
+ 0,
+ 0,
+ StartRoutine,
+ (PVOID)&WorkerInitialized,
+ &ThreadHandle,
+ NULL);
+
+ if (NT_SUCCESS(Status))
+ {
+ /* Poll until the thread got a chance to initialize */
+ while (WorkerInitialized == 0)
+ {
+ NtDelayExecution(FALSE,
+ &Timeout);
+ }
+
+ NtClose(ThreadHandle);
+ }
+
+ return Status;
+}
+
+static VOID
+NTAPI
+RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
+ IN OUT PVOID SystemArgument1,
+ IN OUT PVOID SystemArgument2)
+{
+ NTSTATUS Status;
+ BOOLEAN Impersonated = FALSE;
+ RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
+
+ RtlFreeHeap(RtlGetProcessHeap(),
+ 0,
+ SystemArgument2);
+
+ if (WorkItem.TokenHandle != NULL)
+ {
+ Status = NtSetInformationThread(NtCurrentThread(),
+ ThreadImpersonationToken,
+ &WorkItem.TokenHandle,
+ sizeof(HANDLE));
+
+ NtClose(WorkItem.TokenHandle);
+
+ if (NT_SUCCESS(Status))
+ {
+ Impersonated = TRUE;
+ }
+ }
+
+ _SEH_TRY
+ {
+ DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p
ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context,
WorkItem.TokenHandle);
+
+ /* Execute the function */
+ WorkItem.Function(WorkItem.Context);
+ }
+ _SEH_HANDLE
+ {
+ DPRINT1("Exception 0x%x while executing IO work item 0x%p\n",
_SEH_GetExceptionCode(), WorkItem.Function);
+ }
+ _SEH_END;
+
+ if (Impersonated)
+ {
+ WorkItem.TokenHandle = NULL;
+ Status = NtSetInformationThread(NtCurrentThread(),
+ ThreadImpersonationToken,
+ &WorkItem.TokenHandle,
+ sizeof(HANDLE));
+ if (!NT_SUCCESS(Status))
+ {
+ DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n",
Status);
+ }
+ }
+
+ /* update the requests counter */
+ InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
+
+ if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
+ {
+ InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
+ }
+}
+
+
+static NTSTATUS
+RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
+{
+ NTSTATUS Status = STATUS_SUCCESS;
+
+ InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
+
+ if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
+ {
+ InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
+ }
+
+ if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
+ {
+ Status = RtlpInitializeTimerThread();
+
+ if (NT_SUCCESS(Status))
+ {
+ /* Queue an APC in the timer thread */
+ Status = NtQueueApcThread(TimerThreadHandle,
+ RtlpExecuteWorkItem,
+ NULL,
+ NULL,
+ WorkItem);
+ }
+ }
+ else
+ {
+ /* Queue an IO completion message */
+ Status = NtSetIoCompletion(ThreadPoolCompletionPort,
+ RtlpExecuteWorkItem,
+ WorkItem,
+ STATUS_SUCCESS,
+ 0);
+ }
+
+ if (!NT_SUCCESS(Status))
+ {
+ InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
+
+ if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
+ {
+ InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
+ }
+ }
+
+ return Status;
+}
+
+static VOID
+NTAPI
+RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
+ IN OUT PVOID SystemArgument1,
+ IN OUT PVOID SystemArgument2)
+{
+ NTSTATUS Status;
+ BOOLEAN Impersonated = FALSE;
+ PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
+ RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
+
+ ASSERT(IoThread != NULL);
+
+ RtlFreeHeap(RtlGetProcessHeap(),
+ 0,
+ SystemArgument2);
+
+ if (WorkItem.TokenHandle != NULL)
+ {
+ Status = NtSetInformationThread(NtCurrentThread(),
+ ThreadImpersonationToken,
+ &WorkItem.TokenHandle,
+ sizeof(HANDLE));
+
+ NtClose(WorkItem.TokenHandle);
+
+ if (NT_SUCCESS(Status))
+ {
+ Impersonated = TRUE;
+ }
+ }
+
+ _SEH_TRY
+ {
+ DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p
ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context,
WorkItem.TokenHandle);
+
+ /* Execute the function */
+ WorkItem.Function(WorkItem.Context);
+ }
+ _SEH_HANDLE
+ {
+ DPRINT1("Exception 0x%x while executing IO work item 0x%p\n",
_SEH_GetExceptionCode(), WorkItem.Function);
+ }
+ _SEH_END;
+
+ if (Impersonated)
+ {
+ WorkItem.TokenHandle = NULL;
+ Status = NtSetInformationThread(NtCurrentThread(),
+ ThreadImpersonationToken,
+ &WorkItem.TokenHandle,
+ sizeof(HANDLE));
+ if (!NT_SUCCESS(Status))
+ {
+ DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n",
Status);
+ }
+ }
+
+ /* remove the long function flag */
+ if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
+ {
+ Status = RtlEnterCriticalSection(&ThreadPoolLock);
+ if (NT_SUCCESS(Status))
+ {
+ IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
+ RtlLeaveCriticalSection(&ThreadPoolLock);
+ }
+ }
+
+ /* update the requests counter */
+ InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
+
+ if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
+ {
+ InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
+ }
+}
+
+static NTSTATUS
+RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
+{
+ PLIST_ENTRY CurrentEntry;
+ PRTLP_IOWORKERTHREAD IoThread = NULL;
+ NTSTATUS Status = STATUS_SUCCESS;
+
+ if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
+ {
+ if (PersistentIoThread != NULL)
+ {
+ /* We already have a persistent IO worker thread */
+ IoThread = PersistentIoThread;
+ }
+ else
+ {
+ /* We're not aware of any persistent IO worker thread. Search for a
unused
+ worker thread that doesn't have a long function queued */
+ CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
+ while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
+ {
+ IoThread = CONTAINING_RECORD(CurrentEntry,
+ RTLP_IOWORKERTHREAD,
+ ListEntry);
+
+ if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
+ break;
+
+ CurrentEntry = CurrentEntry->Flink;
+ }
+
+ if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
+ {
+ /* Found a worker thread we can use. */
+ ASSERT(IoThread != NULL);
+
+ IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
+ PersistentIoThread = IoThread;
+ }
+ else
+ {
+ DPRINT1("Failed to find a worker thread for the persistent IO
thread!\n");
+ return STATUS_NO_MEMORY;
+ }
+ }
+ }
+ else
+ {
+ /* Find a worker thread that is not currently executing a long function */
+ CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
+ while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
+ {
+ IoThread = CONTAINING_RECORD(CurrentEntry,
+ RTLP_IOWORKERTHREAD,
+ ListEntry);
+
+ if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
+ {
+ /* if we're trying to queue a long function then make sure we're
not dealing
+ with the persistent thread */
+ if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) &&
!(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
+ {
+ /* found a candidate */
+ break;
+ }
+ }
+
+ CurrentEntry = CurrentEntry->Flink;
+ }
+
+ if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
+ {
+ /* Couldn't find an appropriate thread, see if we can use the persistent
thread (if it exists) for now */
+ if (ThreadPoolIOWorkerThreads == 0)
+ {
+ DPRINT1("Failed to find a worker thread for the work item
0x%p!\n");
+ ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
+ return STATUS_NO_MEMORY;
+ }
+ else
+ {
+ /* pick the first worker thread */
+ CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
+ IoThread = CONTAINING_RECORD(CurrentEntry,
+ RTLP_IOWORKERTHREAD,
+ ListEntry);
+
+ /* Since this might be the persistent worker thread, don't run as a
+ long function */
+ WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
+ }
+ }
+
+ /* Move the picked thread to the end of the list. Since we're always
searching
+ from the beginning, this improves distribution of work items */
+ RemoveEntryList(&IoThread->ListEntry);
+ InsertTailList(&ThreadPoolIOWorkerThreadsList,
+ &IoThread->ListEntry);
+ }
+
+ ASSERT(IoThread != NULL);
+
+ InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
+
+ if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
+ {
+ /* We're about to queue a long function, mark the thread */
+ IoThread->Flags |= WT_EXECUTELONGFUNCTION;
+
+ InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
+ }
+
+ /* It's time to queue the work item */
+ Status = NtQueueApcThread(IoThread->ThreadHandle,
+ RtlpExecuteIoWorkItem,
+ IoThread,
+ NULL,
+ WorkItem);
+ if (!NT_SUCCESS(Status))
+ {
+ DPRINT1("Failed to queue APC for work item 0x%p\n",
WorkItem->Function);
+ InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
+
+ if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
+ {
+ InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
+ }
+ }
+
+ return Status;
+}
+
+static BOOLEAN
+RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)
+{
+ NTSTATUS Status;
+ ULONG IoPending;
+ BOOLEAN CreatedHandle = FALSE;
+ BOOLEAN IsIoPending = TRUE;
+
+ if (ThreadHandle == NULL)
+ {
+ Status = NtDuplicateObject(NtCurrentProcess(),
+ NtCurrentThread(),
+ NtCurrentProcess(),
+ &ThreadHandle,
+ 0,
+ 0,
+ DUPLICATE_SAME_ACCESS);
+ if (!NT_SUCCESS(Status))
+ {
+ return IsIoPending;
+ }
+
+ CreatedHandle = TRUE;
+ }
+
+ Status = NtQueryInformationThread(ThreadHandle,
+ ThreadIsIoPending,
+ &IoPending,
+ sizeof(IoPending),
+ NULL);
+ if (NT_SUCCESS(Status) && IoPending == 0)
+ {
+ IsIoPending = FALSE;
+ }
+
+ if (CreatedHandle)
+ {
+ NtClose(ThreadHandle);
+ }
+
+ return IsIoPending;
+}
+
+static ULONG
+NTAPI
+RtlpIoWorkerThreadProc(IN PVOID Parameter)
+{
+ volatile RTLP_IOWORKERTHREAD ThreadInfo;
+ LARGE_INTEGER Timeout;
+ BOOLEAN Terminate;
+ NTSTATUS Status = STATUS_SUCCESS;
+
+ if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
+ {
+ /* Oops, too many worker threads... */
+ goto InitFailed;
+ }
+
+ /* Get a thread handle to ourselves */
+ Status = NtDuplicateObject(NtCurrentProcess(),
+ NtCurrentThread(),
+ NtCurrentProcess(),
+ (PHANDLE)&ThreadInfo.ThreadHandle,
+ 0,
+ 0,
+ DUPLICATE_SAME_ACCESS);
+ if (!NT_SUCCESS(Status))
+ {
+ DPRINT1("Failed to create handle to own thread! Status: 0x%x\n",
Status);
+
+InitFailed:
+ InterlockedDecrement(&ThreadPoolIOWorkerThreads);
+
+ /* Signal initialization completion */
+ InterlockedExchange((PLONG)Parameter,
+ 1);
+
+ RtlExitUserThread(Status);
+ return 0;
+ }
+
+ ThreadInfo.Flags = 0;
+
+ /* Insert the thread into the list */
+ InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
+ (PLIST_ENTRY)&ThreadInfo.ListEntry);
+
+ /* Signal initialization completion */
+ InterlockedExchange((PLONG)Parameter,
+ 1);
+
+ for (;;)
+ {
+ Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
+
+Wait:
+ do
+ {
+ /* Perform an alertable wait, the work items are going to be executed as APCs
*/
+ Status = NtDelayExecution(TRUE,
+ &Timeout);
+
+ /* Loop as long as we executed an APC */
+ } while (Status != STATUS_SUCCESS);
+
+ /* We timed out, let's see if we're allowed to terminate */
+ Terminate = FALSE;
+
+ Status = RtlEnterCriticalSection(&ThreadPoolLock);
+ if (NT_SUCCESS(Status))
+ {
+ if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
+ {
+ /* This thread is supposed to be persistent. Don't terminate! */
+ RtlLeaveCriticalSection(&ThreadPoolLock);
+
+ Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
+ goto Wait;
+ }
+
+ /* FIXME - figure out an effective method to determine if it's
appropriate to
+ lower the number of threads. For now let's always terminate if
there's
+ at least one thread and no queued items. */
+ Terminate = ((volatile LONG)ThreadPoolIOWorkerThreads - (volatile
LONG)ThreadPoolIOWorkerThreadsLongRequests >= WORKERTHREAD_CREATION_THRESHOLD)
&&
+ ((volatile LONG)ThreadPoolIOWorkerThreadsRequests == 0);
+
+ if (Terminate)
+ {
+ /* Prevent termination as long as IO is pending */
+ Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
+ }
+
+ if (Terminate)
+ {
+ /* Rundown the thread and unlink it from the list */
+ InterlockedDecrement(&ThreadPoolIOWorkerThreads);
+ RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
+ }
+
+ RtlLeaveCriticalSection(&ThreadPoolLock);
+
+ if (Terminate)
+ {
+ /* Break the infinite loop and terminate */
+ Status = STATUS_SUCCESS;
+ break;
+ }
+ }
+ else
+ {
+ DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n",
Status);
+ break;
+ }
+ }
+
+ NtClose(ThreadInfo.ThreadHandle);
+ RtlExitUserThread(Status);
+ return 0;
+}
+
+static ULONG
+NTAPI
+RtlpWorkerThreadProc(IN PVOID Parameter)
+{
+ LARGE_INTEGER Timeout;
+ BOOLEAN Terminate;
+ PVOID SystemArgument2;
+ IO_STATUS_BLOCK IoStatusBlock;
+ ULONG TimeoutCount = 0;
+ PKNORMAL_ROUTINE ApcRoutine;
+ NTSTATUS Status = STATUS_SUCCESS;
+
+ if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
+ {
+ /* Signal initialization completion */
+ InterlockedExchange((PLONG)Parameter,
+ 1);
+
+ /* Oops, too many worker threads... */
+ RtlExitUserThread(Status);
+ return 0;
+ }
+
+ /* Signal initialization completion */
+ InterlockedExchange((PLONG)Parameter,
+ 1);
+
+ for (;;)
+ {
+ Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
+
+ /* Dequeue a completion message */
+ Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
+ (PVOID*)&ApcRoutine,
+ &SystemArgument2,
+ &IoStatusBlock,
+ &Timeout);
+
+ if (Status == STATUS_SUCCESS)
+ {
+ TimeoutCount = 0;
+
+ _SEH_TRY
+ {
+ /* Call the APC routine */
+ ApcRoutine(NULL,
+ (PVOID)IoStatusBlock.Information,
+ SystemArgument2);
+ }
+ _SEH_HANDLE
+ {
+ }
+ _SEH_END;
+ }
+ else
+ {
+ Terminate = FALSE;
+
+ if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
+ continue;
+
+ /* FIXME - this should be optimized, check if there's requests, etc */
+
+ if (Status == STATUS_TIMEOUT)
+ {
+ /* FIXME - we might want to optimize this */
+ if (TimeoutCount++ > 2 &&
+ (volatile LONG)ThreadPoolWorkerThreads - (volatile
LONG)ThreadPoolWorkerThreadsLongRequests >= WORKERTHREAD_CREATION_THRESHOLD)
+ {
+ Terminate = TRUE;
+ }
+ }
+ else
+ Terminate = TRUE;
+
+ RtlLeaveCriticalSection(&ThreadPoolLock);
+
+ if (Terminate)
+ {
+ /* Prevent termination as long as IO is pending */
+ Terminate = !RtlpIsIoPending(NULL);
+ }
+
+ if (Terminate)
+ {
+ InterlockedDecrement(&ThreadPoolWorkerThreads);
+ Status = STATUS_SUCCESS;
+ break;
+ }
+ }
+ }
+
+ RtlExitUserThread(Status);
+ return 0;
+
+}
+
+/*
+ * @implemented
+ */
+NTSTATUS
+NTAPI
+RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
+ IN PVOID Context OPTIONAL,
+ IN ULONG Flags)
+{
+ LONG FreeWorkers;
+ NTSTATUS Status;
+ PRTLP_WORKITEM WorkItem;
+
+ DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
+
+ /* Initialize the thread pool if not already initialized */
+ if (!IsThreadPoolInitialized())
+ {
+ Status = RtlpInitializeThreadPool();
+
+ if (!NT_SUCCESS(Status))
+ return Status;
+ }
+
+ /* Allocate a work item */
+ WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
+ 0,
+ sizeof(RTLP_WORKITEM));
+ if (WorkItem == NULL)
+ return STATUS_NO_MEMORY;
+
+ WorkItem->Function = Function;
+ WorkItem->Context = Context;
+ WorkItem->Flags = Flags;
+
+ if (Flags & WT_TRANSFER_IMPERSONATION)
+ {
+ Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
+
+ if (!NT_SUCCESS(Status))
+ {
+ DPRINT1("Failed to get impersonation token! Status: 0x%x\n",
Status);
+ goto Cleanup;
+ }
+ }
+ else
+ WorkItem->TokenHandle = NULL;
+
+ Status = RtlEnterCriticalSection(&ThreadPoolLock);
+ if (NT_SUCCESS(Status))
+ {
+ if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD |
WT_EXECUTEINPERSISTENTIOTHREAD))
+ {
+ /* FIXME - We should optimize the algorithm used to determine whether to grow
the thread pool! */
+
+ FreeWorkers = ThreadPoolIOWorkerThreads -
ThreadPoolIOWorkerThreadsLongRequests;
+
+ if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION))
== WT_EXECUTELONGFUNCTION) &&
+ PersistentIoThread != NULL)
+ {
+ /* We shouldn't queue a long function into the persistent IO thread
*/
+ FreeWorkers--;
+ }
+
+ /* See if it's a good idea to grow the pool */
+ if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
+ (FreeWorkers <= 0 || ThreadPoolIOWorkerThreads -
ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
+ {
+ /* Grow the thread pool */
+ Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
+
+ if (!NT_SUCCESS(Status) && (volatile
LONG)ThreadPoolIOWorkerThreads != 0)
+ {
+ /* We failed to create the thread, but there's at least one there
so
+ we can at least queue the request */
+ Status = STATUS_SUCCESS;
+ }
+ }
+
+ if (NT_SUCCESS(Status))
+ {
+ /* Queue a IO worker thread */
+ Status = RtlpQueueIoWorkerThread(WorkItem);
+ }
+ }
+ else
+ {
+ /* FIXME - We should optimize the algorithm used to determine whether to grow
the thread pool! */
+
+ FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
+
+ /* See if it's a good idea to grow the pool */
+ if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
+ (FreeWorkers <= 0 || ThreadPoolWorkerThreads -
ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
+ {
+ /* Grow the thread pool */
+ Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
+
+ if (!NT_SUCCESS(Status) && (volatile LONG)ThreadPoolWorkerThreads
!= 0)
+ {
+ /* We failed to create the thread, but there's at least one there
so
+ we can at least queue the request */
+ Status = STATUS_SUCCESS;
+ }
+ }
+
+ if (NT_SUCCESS(Status))
+ {
+ /* Queue a normal worker thread */
+ Status = RtlpQueueWorkerThread(WorkItem);
+ }
+ }
+
+ RtlLeaveCriticalSection(&ThreadPoolLock);
+ }
+
+ if (!NT_SUCCESS(Status))
+ {
+ if (WorkItem->TokenHandle != NULL)
+ {
+ NtClose(WorkItem->TokenHandle);
+ }
+
+Cleanup:
+ RtlFreeHeap(RtlGetProcessHeap(),
+ 0,
+ WorkItem);
+ }
+
+ return Status;
+}
Propchange: trunk/reactos/lib/rtl/workitem.c
------------------------------------------------------------------------------
svn:eol-style = native