https://git.reactos.org/?p=reactos.git;a=commitdiff;h=405ed2b4ceeff7e011e35…
commit 405ed2b4ceeff7e011e35e5308abdfc582f29396
Author: Denis Malikov <filedem(a)gmail.com>
AuthorDate: Thu Apr 25 21:08:41 2024 -0700
Commit: Justin Miller <justinmiller100(a)gmail.com>
CommitDate: Sun Jan 26 23:30:59 2025 -0800
[KERNEL32_VISTA][SDK] Import Threadpool.c from wine-9.7
---
dll/ntdll/nt_0600/ntdll_vista.spec | 29 +
media/doc/WINESYNC.txt | 3 +-
sdk/include/ndk/rtlfuncs.h | 48 +
sdk/include/psdk/winbase.h | 1 +
sdk/include/xdk/winnt_old.h | 7 +
sdk/lib/rtl/CMakeLists.txt | 1 +
sdk/lib/rtl/threadpool.c | 3357 ++++++++++++++++++++++++++++++++++++
7 files changed, 3444 insertions(+), 2 deletions(-)
diff --git a/dll/ntdll/nt_0600/ntdll_vista.spec b/dll/ntdll/nt_0600/ntdll_vista.spec
index d36c6449d8e..23fd88e02c4 100644
--- a/dll/ntdll/nt_0600/ntdll_vista.spec
+++ b/dll/ntdll/nt_0600/ntdll_vista.spec
@@ -13,5 +13,34 @@
@ stdcall RtlRunOnceComplete(ptr long ptr)
@ stdcall RtlRunOnceExecuteOnce(ptr ptr ptr ptr)
+@ stdcall TpAllocCleanupGroup(ptr)
+@ stdcall TpAllocPool(ptr ptr)
+@ stdcall TpAllocTimer(ptr ptr ptr ptr)
+@ stdcall TpAllocWait(ptr ptr ptr ptr)
+@ stdcall TpAllocWork(ptr ptr ptr ptr)
+@ stdcall TpCallbackLeaveCriticalSectionOnCompletion(ptr ptr)
+@ stdcall TpCallbackMayRunLong(ptr)
+@ stdcall TpCallbackReleaseMutexOnCompletion(ptr ptr)
+@ stdcall TpCallbackReleaseSemaphoreOnCompletion(ptr ptr long)
+@ stdcall TpCallbackSetEventOnCompletion(ptr ptr)
+@ stdcall TpCallbackUnloadDllOnCompletion(ptr ptr)
+@ stdcall TpDisassociateCallback(ptr)
+@ stdcall TpIsTimerSet(ptr)
+@ stdcall TpPostWork(ptr)
+@ stdcall TpReleaseCleanupGroup(ptr)
+@ stdcall TpReleaseCleanupGroupMembers(ptr long ptr)
+@ stdcall TpReleasePool(ptr)
+@ stdcall TpReleaseTimer(ptr)
+@ stdcall TpReleaseWait(ptr)
+@ stdcall TpReleaseWork(ptr)
+@ stdcall TpSetPoolMaxThreads(ptr long)
+@ stdcall TpSetPoolMinThreads(ptr long)
+@ stdcall TpSetTimer(ptr ptr long long)
+@ stdcall TpSetWait(ptr long ptr)
+@ stdcall TpSimpleTryPost(ptr ptr ptr)
+@ stdcall TpWaitForTimer(ptr long)
+@ stdcall TpWaitForWait(ptr long)
+@ stdcall TpWaitForWork(ptr long)
+
@ stdcall RtlConnectToSm(ptr ptr long ptr) SmConnectToSm
@ stdcall RtlSendMsgToSm(ptr ptr) SmSendMsgToSm
diff --git a/media/doc/WINESYNC.txt b/media/doc/WINESYNC.txt
index bf5b76bfb55..61e11c67f15 100644
--- a/media/doc/WINESYNC.txt
+++ b/media/doc/WINESYNC.txt
@@ -268,8 +268,7 @@ check Wine current sources first as it may already be fixed.
sdk/lib/3rdparty/strmbase # Synced to WineStaging-3.3
sdk/lib/rtl/actctx.c # Synced to wine-5.18
-sdk/lib/rtl/timerqueue.c # Synced to wine-5.18
-sdk/lib/rtl/wait.c # Synced to wine-5.18
+sdk/lib/rtl/threadpool.c # Synced with wine-9.7
advapi32 -
dll/win32/advapi32/wine/cred.c # Synced to WineStaging-3.3
diff --git a/sdk/include/ndk/rtlfuncs.h b/sdk/include/ndk/rtlfuncs.h
index 3a00f490935..7b128cd440b 100644
--- a/sdk/include/ndk/rtlfuncs.h
+++ b/sdk/include/ndk/rtlfuncs.h
@@ -4745,6 +4745,54 @@ RtlSleepConditionVariableSRW(
_In_ ULONG Flags);
#endif
+//
+// Synchronization functions
+//
+VOID
+NTAPI
+RtlInitializeConditionVariable(OUT PRTL_CONDITION_VARIABLE ConditionVariable);
+
+VOID
+NTAPI
+RtlWakeConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
+
+VOID
+NTAPI
+RtlWakeAllConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
+
+NTSTATUS
+NTAPI
+RtlSleepConditionVariableCS(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
+ IN OUT PRTL_CRITICAL_SECTION CriticalSection,
+ IN const PLARGE_INTEGER TimeOut OPTIONAL);
+
+NTSTATUS
+NTAPI
+RtlSleepConditionVariableSRW(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
+ IN OUT PRTL_SRWLOCK SRWLock,
+ IN PLARGE_INTEGER TimeOut OPTIONAL,
+ IN ULONG Flags);
+
+VOID
+NTAPI
+RtlInitializeSRWLock(OUT PRTL_SRWLOCK SRWLock);
+
+VOID
+NTAPI
+RtlAcquireSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
+
+VOID
+NTAPI
+RtlReleaseSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
+
+VOID
+NTAPI
+RtlAcquireSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
+
+VOID
+NTAPI
+RtlReleaseSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
+
//
// Secure Memory Functions
//
diff --git a/sdk/include/psdk/winbase.h b/sdk/include/psdk/winbase.h
index f5eed964654..4ff919b83f1 100644
--- a/sdk/include/psdk/winbase.h
+++ b/sdk/include/psdk/winbase.h
@@ -4123,6 +4123,7 @@ InitOnceExecuteOnce(
_Inout_opt_ PVOID Parameter,
_Outptr_opt_result_maybenull_ LPVOID *Context);
+typedef VOID (NTAPI
*PTP_WIN32_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PVOID,ULONG,ULONG_PTR,PTP_IO);
#if defined(_SLIST_HEADER_) && !defined(_NTOS_) && !defined(_NTOSP_)
diff --git a/sdk/include/xdk/winnt_old.h b/sdk/include/xdk/winnt_old.h
index 157c3724e80..ee28a933347 100644
--- a/sdk/include/xdk/winnt_old.h
+++ b/sdk/include/xdk/winnt_old.h
@@ -4480,7 +4480,11 @@ DbgRaiseAssertionFailure(VOID)
typedef struct _TP_POOL TP_POOL, *PTP_POOL;
typedef struct _TP_WORK TP_WORK, *PTP_WORK;
typedef struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE, *PTP_CALLBACK_INSTANCE;
+typedef struct _TP_TIMER TP_TIMER, *PTP_TIMER;
+typedef struct _TP_WAIT TP_WAIT, *PTP_WAIT;
+typedef struct _TP_IO TP_IO, *PTP_IO;
+typedef DWORD TP_WAIT_RESULT;
typedef DWORD TP_VERSION, *PTP_VERSION;
typedef enum _TP_CALLBACK_PRIORITY {
@@ -4509,6 +4513,9 @@ typedef VOID
_Inout_opt_ PVOID ObjectContext,
_Inout_opt_ PVOID CleanupContext);
+typedef VOID (NTAPI *PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PTP_TIMER);
+typedef VOID (NTAPI
*PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PTP_WAIT,TP_WAIT_RESULT);
+
#if (_WIN32_WINNT >= _WIN32_WINNT_WIN7)
typedef struct _TP_CALLBACK_ENVIRON_V3 {
TP_VERSION Version;
diff --git a/sdk/lib/rtl/CMakeLists.txt b/sdk/lib/rtl/CMakeLists.txt
index efe2c90a40b..78077605a91 100644
--- a/sdk/lib/rtl/CMakeLists.txt
+++ b/sdk/lib/rtl/CMakeLists.txt
@@ -127,6 +127,7 @@ list(APPEND SOURCE_VISTA
condvar.c
runonce.c
srw.c
+ threadpool.c
utf8.c)
add_library(rtl_vista ${SOURCE_VISTA})
diff --git a/sdk/lib/rtl/threadpool.c b/sdk/lib/rtl/threadpool.c
new file mode 100644
index 00000000000..c6e40fc78f4
--- /dev/null
+++ b/sdk/lib/rtl/threadpool.c
@@ -0,0 +1,3357 @@
+/*
+ * Thread pooling
+ *
+ * Copyright (c) 2006 Robert Shearman
+ * Copyright (c) 2014-2016 Sebastian Lackner
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+ */
+
+#include <assert.h>
+#include <stdarg.h>
+#include <limits.h>
+
+#include "ntstatus.h"
+#define WIN32_NO_STATUS
+#include "winternl.h"
+
+#include "wine/debug.h"
+#include "wine/list.h"
+
+#include "ntdll_misc.h"
+
+WINE_DEFAULT_DEBUG_CHANNEL(threadpool);
+
+/*
+ * Old thread pooling API
+ */
+
+struct rtl_work_item
+{
+ PRTL_WORK_ITEM_ROUTINE function;
+ PVOID context;
+};
+
+#define EXPIRE_NEVER (~(ULONGLONG)0)
+#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
+
+static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug;
+
+static struct
+{
+ HANDLE compl_port;
+ RTL_CRITICAL_SECTION threadpool_compl_cs;
+}
+old_threadpool =
+{
+ NULL, /* compl_port */
+ { &critsect_compl_debug, -1, 0, 0, 0, 0 }, /* threadpool_compl_cs */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG critsect_compl_debug =
+{
+ 0, 0, &old_threadpool.threadpool_compl_cs,
+ { &critsect_compl_debug.ProcessLocksList,
&critsect_compl_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": threadpool_compl_cs") }
+};
+
+struct timer_queue;
+struct queue_timer
+{
+ struct timer_queue *q;
+ struct list entry;
+ ULONG runcount; /* number of callbacks pending execution */
+ RTL_WAITORTIMERCALLBACKFUNC callback;
+ PVOID param;
+ DWORD period;
+ ULONG flags;
+ ULONGLONG expire;
+ BOOL destroy; /* timer should be deleted; once set, never unset */
+ HANDLE event; /* removal event */
+};
+
+struct timer_queue
+{
+ DWORD magic;
+ RTL_CRITICAL_SECTION cs;
+ struct list timers; /* sorted by expiration time */
+ BOOL quit; /* queue should be deleted; once set, never unset */
+ HANDLE event;
+ HANDLE thread;
+};
+
+/*
+ * Object-oriented thread pooling API
+ */
+
+#define THREADPOOL_WORKER_TIMEOUT 5000
+#define MAXIMUM_WAITQUEUE_OBJECTS (MAXIMUM_WAIT_OBJECTS - 1)
+
+/* internal threadpool representation */
+struct threadpool
+{
+ LONG refcount;
+ LONG objcount;
+ BOOL shutdown;
+ CRITICAL_SECTION cs;
+ /* Pools of work items, locked via .cs, order matches TP_CALLBACK_PRIORITY - high,
normal, low. */
+ struct list pools[3];
+ RTL_CONDITION_VARIABLE update_event;
+ /* information about worker threads, locked via .cs */
+ int max_workers;
+ int min_workers;
+ int num_workers;
+ int num_busy_workers;
+ HANDLE compl_port;
+ TP_POOL_STACK_INFORMATION stack_info;
+};
+
+enum threadpool_objtype
+{
+ TP_OBJECT_TYPE_SIMPLE,
+ TP_OBJECT_TYPE_WORK,
+ TP_OBJECT_TYPE_TIMER,
+ TP_OBJECT_TYPE_WAIT,
+ TP_OBJECT_TYPE_IO,
+};
+
+struct io_completion
+{
+ IO_STATUS_BLOCK iosb;
+ ULONG_PTR cvalue;
+};
+
+/* internal threadpool object representation */
+struct threadpool_object
+{
+ void *win32_callback; /* leave space for kernelbase to store win32
callback */
+ LONG refcount;
+ BOOL shutdown;
+ /* read-only information */
+ enum threadpool_objtype type;
+ struct threadpool *pool;
+ struct threadpool_group *group;
+ PVOID userdata;
+ PTP_CLEANUP_GROUP_CANCEL_CALLBACK group_cancel_callback;
+ PTP_SIMPLE_CALLBACK finalization_callback;
+ BOOL may_run_long;
+ HMODULE race_dll;
+ TP_CALLBACK_PRIORITY priority;
+ /* information about the group, locked via .group->cs */
+ struct list group_entry;
+ BOOL is_group_member;
+ /* information about the pool, locked via .pool->cs */
+ struct list pool_entry;
+ RTL_CONDITION_VARIABLE finished_event;
+ RTL_CONDITION_VARIABLE group_finished_event;
+ HANDLE completed_event;
+ LONG num_pending_callbacks;
+ LONG num_running_callbacks;
+ LONG num_associated_callbacks;
+ /* arguments for callback */
+ union
+ {
+ struct
+ {
+ PTP_SIMPLE_CALLBACK callback;
+ } simple;
+ struct
+ {
+ PTP_WORK_CALLBACK callback;
+ } work;
+ struct
+ {
+ PTP_TIMER_CALLBACK callback;
+ /* information about the timer, locked via timerqueue.cs */
+ BOOL timer_initialized;
+ BOOL timer_pending;
+ struct list timer_entry;
+ BOOL timer_set;
+ ULONGLONG timeout;
+ LONG period;
+ LONG window_length;
+ } timer;
+ struct
+ {
+ PTP_WAIT_CALLBACK callback;
+ LONG signaled;
+ /* information about the wait object, locked via waitqueue.cs */
+ struct waitqueue_bucket *bucket;
+ BOOL wait_pending;
+ struct list wait_entry;
+ ULONGLONG timeout;
+ HANDLE handle;
+ DWORD flags;
+ RTL_WAITORTIMERCALLBACKFUNC rtl_callback;
+ } wait;
+ struct
+ {
+ PTP_IO_CALLBACK callback;
+ /* locked via .pool->cs */
+ unsigned int pending_count, skipped_count, completion_count,
completion_max;
+ BOOL shutting_down;
+ struct io_completion *completions;
+ } io;
+ } u;
+};
+
+/* internal threadpool instance representation */
+struct threadpool_instance
+{
+ struct threadpool_object *object;
+ DWORD threadid;
+ BOOL associated;
+ BOOL may_run_long;
+ struct
+ {
+ CRITICAL_SECTION *critical_section;
+ HANDLE mutex;
+ HANDLE semaphore;
+ LONG semaphore_count;
+ HANDLE event;
+ HMODULE library;
+ } cleanup;
+};
+
+/* internal threadpool group representation */
+struct threadpool_group
+{
+ LONG refcount;
+ BOOL shutdown;
+ CRITICAL_SECTION cs;
+ /* list of group members, locked via .cs */
+ struct list members;
+};
+
+/* global timerqueue object */
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+ LONG objcount;
+ BOOL thread_running;
+ struct list pending_timers;
+ RTL_CONDITION_VARIABLE update_event;
+}
+timerqueue =
+{
+ { &timerqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
+ 0, /* objcount */
+ FALSE, /* thread_running */
+ LIST_INIT( timerqueue.pending_timers ), /* pending_timers */
+ RTL_CONDITION_VARIABLE_INIT /* update_event */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG timerqueue_debug =
+{
+ 0, 0, &timerqueue.cs,
+ { &timerqueue_debug.ProcessLocksList, &timerqueue_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": timerqueue.cs") }
+};
+
+/* global waitqueue object */
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+ LONG num_buckets;
+ struct list buckets;
+}
+waitqueue =
+{
+ { &waitqueue_debug, -1, 0, 0, 0, 0 }, /* cs */
+ 0, /* num_buckets */
+ LIST_INIT( waitqueue.buckets ) /* buckets */
+};
+
+static RTL_CRITICAL_SECTION_DEBUG waitqueue_debug =
+{
+ 0, 0, &waitqueue.cs,
+ { &waitqueue_debug.ProcessLocksList, &waitqueue_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": waitqueue.cs") }
+};
+
+struct waitqueue_bucket
+{
+ struct list bucket_entry;
+ LONG objcount;
+ struct list reserved;
+ struct list waiting;
+ HANDLE update_event;
+ BOOL alertable;
+};
+
+/* global I/O completion queue object */
+static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug;
+
+static struct
+{
+ CRITICAL_SECTION cs;
+ LONG objcount;
+ BOOL thread_running;
+ HANDLE port;
+ RTL_CONDITION_VARIABLE update_event;
+}
+ioqueue =
+{
+ .cs = { &ioqueue_debug, -1, 0, 0, 0, 0 },
+};
+
+static RTL_CRITICAL_SECTION_DEBUG ioqueue_debug =
+{
+ 0, 0, &ioqueue.cs,
+ { &ioqueue_debug.ProcessLocksList, &ioqueue_debug.ProcessLocksList },
+ 0, 0, { (DWORD_PTR)(__FILE__ ": ioqueue.cs") }
+};
+
+static inline struct threadpool *impl_from_TP_POOL( TP_POOL *pool )
+{
+ return (struct threadpool *)pool;
+}
+
+static inline struct threadpool_object *impl_from_TP_WORK( TP_WORK *work )
+{
+ struct threadpool_object *object = (struct threadpool_object *)work;
+ assert( object->type == TP_OBJECT_TYPE_WORK );
+ return object;
+}
+
+static inline struct threadpool_object *impl_from_TP_TIMER( TP_TIMER *timer )
+{
+ struct threadpool_object *object = (struct threadpool_object *)timer;
+ assert( object->type == TP_OBJECT_TYPE_TIMER );
+ return object;
+}
+
+static inline struct threadpool_object *impl_from_TP_WAIT( TP_WAIT *wait )
+{
+ struct threadpool_object *object = (struct threadpool_object *)wait;
+ assert( object->type == TP_OBJECT_TYPE_WAIT );
+ return object;
+}
+
+static inline struct threadpool_object *impl_from_TP_IO( TP_IO *io )
+{
+ struct threadpool_object *object = (struct threadpool_object *)io;
+ assert( object->type == TP_OBJECT_TYPE_IO );
+ return object;
+}
+
+static inline struct threadpool_group *impl_from_TP_CLEANUP_GROUP( TP_CLEANUP_GROUP
*group )
+{
+ return (struct threadpool_group *)group;
+}
+
+static inline struct threadpool_instance *impl_from_TP_CALLBACK_INSTANCE(
TP_CALLBACK_INSTANCE *instance )
+{
+ return (struct threadpool_instance *)instance;
+}
+
+static void CALLBACK threadpool_worker_proc( void *param );
+static void tp_object_submit( struct threadpool_object *object, BOOL signaled );
+static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread );
+static void tp_object_prepare_shutdown( struct threadpool_object *object );
+static BOOL tp_object_release( struct threadpool_object *object );
+static struct threadpool *default_threadpool = NULL;
+
+static BOOL array_reserve(void **elements, unsigned int *capacity, unsigned int count,
unsigned int size)
+{
+ unsigned int new_capacity, max_capacity;
+ void *new_elements;
+
+ if (count <= *capacity)
+ return TRUE;
+
+ max_capacity = ~(SIZE_T)0 / size;
+ if (count > max_capacity)
+ return FALSE;
+
+ new_capacity = max(4, *capacity);
+ while (new_capacity < count && new_capacity <= max_capacity / 2)
+ new_capacity *= 2;
+ if (new_capacity < count)
+ new_capacity = max_capacity;
+
+ if (!(new_elements = RtlReAllocateHeap( GetProcessHeap(), 0, *elements, new_capacity
* size )))
+ return FALSE;
+
+ *elements = new_elements;
+ *capacity = new_capacity;
+
+ return TRUE;
+}
+
+static void set_thread_name(const WCHAR *name)
+{
+ THREAD_NAME_INFORMATION info;
+
+ RtlInitUnicodeString(&info.ThreadName, name);
+ NtSetInformationThread(GetCurrentThread(), ThreadNameInformation, &info,
sizeof(info));
+}
+
+static void CALLBACK process_rtl_work_item( TP_CALLBACK_INSTANCE *instance, void
*userdata )
+{
+ struct rtl_work_item *item = userdata;
+
+ TRACE("executing %p(%p)\n", item->function, item->context);
+ item->function( item->context );
+
+ RtlFreeHeap( GetProcessHeap(), 0, item );
+}
+
+/***********************************************************************
+ * RtlQueueWorkItem (NTDLL.@)
+ *
+ * Queues a work item into a thread in the thread pool.
+ *
+ * PARAMS
+ * function [I] Work function to execute.
+ * context [I] Context to pass to the work function when it is executed.
+ * flags [I] Flags. See notes.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ *
+ * NOTES
+ * Flags can be one or more of the following:
+ *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
+ *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
+ *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
+ *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
+ *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
+ */
+NTSTATUS WINAPI RtlQueueWorkItem( PRTL_WORK_ITEM_ROUTINE function, PVOID context, ULONG
flags )
+{
+ TP_CALLBACK_ENVIRON environment;
+ struct rtl_work_item *item;
+ NTSTATUS status;
+
+ TRACE( "%p %p %lu\n", function, context, flags );
+
+ item = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*item) );
+ if (!item)
+ return STATUS_NO_MEMORY;
+
+ memset( &environment, 0, sizeof(environment) );
+ environment.Version = 1;
+ environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
+ environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
+
+ item->function = function;
+ item->context = context;
+
+ status = TpSimpleTryPost( process_rtl_work_item, item, &environment );
+ if (status) RtlFreeHeap( GetProcessHeap(), 0, item );
+ return status;
+}
+
+/***********************************************************************
+ * iocp_poller - get completion events and run callbacks
+ */
+static DWORD CALLBACK iocp_poller(LPVOID Arg)
+{
+ HANDLE cport = Arg;
+
+ while( TRUE )
+ {
+ PRTL_OVERLAPPED_COMPLETION_ROUTINE callback;
+ LPVOID overlapped;
+ IO_STATUS_BLOCK iosb;
+ NTSTATUS res = NtRemoveIoCompletion( cport, (PULONG_PTR)&callback,
(PULONG_PTR)&overlapped, &iosb, NULL );
+ if (res)
+ {
+ ERR("NtRemoveIoCompletion failed: 0x%lx\n", res);
+ }
+ else
+ {
+ DWORD transferred = 0;
+ DWORD err = 0;
+
+ if (iosb.Status == STATUS_SUCCESS)
+ transferred = iosb.Information;
+ else
+ err = RtlNtStatusToDosError(iosb.Status);
+
+ callback( err, transferred, overlapped );
+ }
+ }
+ return 0;
+}
+
+/***********************************************************************
+ * RtlSetIoCompletionCallback (NTDLL.@)
+ *
+ * Binds a handle to a thread pool's completion port, and possibly
+ * starts a non-I/O thread to monitor this port and call functions back.
+ *
+ * PARAMS
+ * FileHandle [I] Handle to bind to a completion port.
+ * Function [I] Callback function to call on I/O completions.
+ * Flags [I] Not used.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ *
+ */
+NTSTATUS WINAPI RtlSetIoCompletionCallback(HANDLE FileHandle,
PRTL_OVERLAPPED_COMPLETION_ROUTINE Function, ULONG Flags)
+{
+ IO_STATUS_BLOCK iosb;
+ FILE_COMPLETION_INFORMATION info;
+
+ if (Flags) FIXME("Unknown value Flags=0x%lx\n", Flags);
+
+ if (!old_threadpool.compl_port)
+ {
+ NTSTATUS res = STATUS_SUCCESS;
+
+ RtlEnterCriticalSection(&old_threadpool.threadpool_compl_cs);
+ if (!old_threadpool.compl_port)
+ {
+ HANDLE cport;
+
+ res = NtCreateIoCompletion( &cport, IO_COMPLETION_ALL_ACCESS, NULL, 0 );
+ if (!res)
+ {
+ /* FIXME native can start additional threads in case of e.g. hung
callback function. */
+ res = RtlQueueWorkItem( iocp_poller, cport, WT_EXECUTEDEFAULT );
+ if (!res)
+ old_threadpool.compl_port = cport;
+ else
+ NtClose( cport );
+ }
+ }
+ RtlLeaveCriticalSection(&old_threadpool.threadpool_compl_cs);
+ if (res) return res;
+ }
+
+ info.CompletionPort = old_threadpool.compl_port;
+ info.CompletionKey = (ULONG_PTR)Function;
+
+ return NtSetInformationFile( FileHandle, &iosb, &info, sizeof(info),
FileCompletionInformation );
+}
+
+static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
+{
+ if (timeout == INFINITE) return NULL;
+ pTime->QuadPart = (ULONGLONG)timeout * -10000;
+ return pTime;
+}
+
+
+/************************** Timer Queue Impl **************************/
+
+static void queue_remove_timer(struct queue_timer *t)
+{
+ /* We MUST hold the queue cs while calling this function. This ensures
+ that we cannot queue another callback for this timer. The runcount
+ being zero makes sure we don't have any already queued. */
+ struct timer_queue *q = t->q;
+
+ assert(t->runcount == 0);
+ assert(t->destroy);
+
+ list_remove(&t->entry);
+ if (t->event)
+ NtSetEvent(t->event, NULL);
+ RtlFreeHeap(GetProcessHeap(), 0, t);
+
+ if (q->quit && list_empty(&q->timers))
+ NtSetEvent(q->event, NULL);
+}
+
+static void timer_cleanup_callback(struct queue_timer *t)
+{
+ struct timer_queue *q = t->q;
+ RtlEnterCriticalSection(&q->cs);
+
+ assert(0 < t->runcount);
+ --t->runcount;
+
+ if (t->destroy && t->runcount == 0)
+ queue_remove_timer(t);
+
+ RtlLeaveCriticalSection(&q->cs);
+}
+
+static DWORD WINAPI timer_callback_wrapper(LPVOID p)
+{
+ struct queue_timer *t = p;
+ t->callback(t->param, TRUE);
+ timer_cleanup_callback(t);
+ return 0;
+}
+
+static inline ULONGLONG queue_current_time(void)
+{
+ LARGE_INTEGER now, freq;
+ NtQueryPerformanceCounter(&now, &freq);
+ return now.QuadPart * 1000 / freq.QuadPart;
+}
+
+static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
+ BOOL set_event)
+{
+ /* We MUST hold the queue cs while calling this function. */
+ struct timer_queue *q = t->q;
+ struct list *ptr = &q->timers;
+
+ assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
+
+ if (time != EXPIRE_NEVER)
+ LIST_FOR_EACH(ptr, &q->timers)
+ {
+ struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
+ if (time < cur->expire)
+ break;
+ }
+ list_add_before(ptr, &t->entry);
+
+ t->expire = time;
+
+ /* If we insert at the head of the list, we need to expire sooner
+ than expected. */
+ if (set_event && &t->entry == list_head(&q->timers))
+ NtSetEvent(q->event, NULL);
+}
+
+static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
+ BOOL set_event)
+{
+ /* We MUST hold the queue cs while calling this function. */
+ list_remove(&t->entry);
+ queue_add_timer(t, time, set_event);
+}
+
+static void queue_timer_expire(struct timer_queue *q)
+{
+ struct queue_timer *t = NULL;
+
+ RtlEnterCriticalSection(&q->cs);
+ if (list_head(&q->timers))
+ {
+ ULONGLONG now, next;
+ t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
+ if (!t->destroy && t->expire <= ((now = queue_current_time())))
+ {
+ ++t->runcount;
+ if (t->period)
+ {
+ next = t->expire + t->period;
+ /* avoid trigger cascade if overloaded / hibernated */
+ if (next < now)
+ next = now + t->period;
+ }
+ else
+ next = EXPIRE_NEVER;
+ queue_move_timer(t, next, FALSE);
+ }
+ else
+ t = NULL;
+ }
+ RtlLeaveCriticalSection(&q->cs);
+
+ if (t)
+ {
+ if (t->flags & WT_EXECUTEINTIMERTHREAD)
+ timer_callback_wrapper(t);
+ else
+ {
+ ULONG flags
+ = (t->flags
+ & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
+ | WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
+ NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
+ if (status != STATUS_SUCCESS)
+ timer_cleanup_callback(t);
+ }
+ }
+}
+
+static ULONG queue_get_timeout(struct timer_queue *q)
+{
+ struct queue_timer *t;
+ ULONG timeout = INFINITE;
+
+ RtlEnterCriticalSection(&q->cs);
+ if (list_head(&q->timers))
+ {
+ t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
+ assert(!t->destroy || t->expire == EXPIRE_NEVER);
+
+ if (t->expire != EXPIRE_NEVER)
+ {
+ ULONGLONG time = queue_current_time();
+ timeout = t->expire < time ? 0 : t->expire - time;
+ }
+ }
+ RtlLeaveCriticalSection(&q->cs);
+
+ return timeout;
+}
+
+static void WINAPI timer_queue_thread_proc(LPVOID p)
+{
+ struct timer_queue *q = p;
+ ULONG timeout_ms;
+
+ set_thread_name(L"wine_threadpool_timer_queue");
+ timeout_ms = INFINITE;
+ for (;;)
+ {
+ LARGE_INTEGER timeout;
+ NTSTATUS status;
+ BOOL done = FALSE;
+
+ status = NtWaitForSingleObject(
+ q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
+
+ if (status == STATUS_WAIT_0)
+ {
+ /* There are two possible ways to trigger the event. Either
+ we are quitting and the last timer got removed, or a new
+ timer got put at the head of the list so we need to adjust
+ our timeout. */
+ RtlEnterCriticalSection(&q->cs);
+ if (q->quit && list_empty(&q->timers))
+ done = TRUE;
+ RtlLeaveCriticalSection(&q->cs);
+ }
+ else if (status == STATUS_TIMEOUT)
+ queue_timer_expire(q);
+
+ if (done)
+ break;
+
+ timeout_ms = queue_get_timeout(q);
+ }
+
+ NtClose(q->event);
+ RtlDeleteCriticalSection(&q->cs);
+ q->magic = 0;
+ RtlFreeHeap(GetProcessHeap(), 0, q);
+ RtlExitUserThread( 0 );
+}
+
+static void queue_destroy_timer(struct queue_timer *t)
+{
+ /* We MUST hold the queue cs while calling this function. */
+ t->destroy = TRUE;
+ if (t->runcount == 0)
+ /* Ensure a timer is promptly removed. If callbacks are pending,
+ it will be removed after the last one finishes by the callback
+ cleanup wrapper. */
+ queue_remove_timer(t);
+ else
+ /* Make sure no destroyed timer masks an active timer at the head
+ of the sorted list. */
+ queue_move_timer(t, EXPIRE_NEVER, FALSE);
+}
+
+/***********************************************************************
+ * RtlCreateTimerQueue (NTDLL.@)
+ *
+ * Creates a timer queue object and returns a handle to it.
+ *
+ * PARAMS
+ * NewTimerQueue [O] The newly created queue.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
+{
+ NTSTATUS status;
+ struct timer_queue *q = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *q);
+ if (!q)
+ return STATUS_NO_MEMORY;
+
+ RtlInitializeCriticalSection(&q->cs);
+ list_init(&q->timers);
+ q->quit = FALSE;
+ q->magic = TIMER_QUEUE_MAGIC;
+ status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL,
SynchronizationEvent, FALSE);
+ if (status != STATUS_SUCCESS)
+ {
+ RtlFreeHeap(GetProcessHeap(), 0, q);
+ return status;
+ }
+ status = RtlCreateUserThread(GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
+ timer_queue_thread_proc, q, &q->thread, NULL);
+ if (status != STATUS_SUCCESS)
+ {
+ NtClose(q->event);
+ RtlFreeHeap(GetProcessHeap(), 0, q);
+ return status;
+ }
+
+ *NewTimerQueue = q;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * RtlDeleteTimerQueueEx (NTDLL.@)
+ *
+ * Deletes a timer queue object.
+ *
+ * PARAMS
+ * TimerQueue [I] The timer queue to destroy.
+ * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
+ * wait until all timers are finished firing before
+ * returning. Otherwise, return immediately and set the
+ * event when all timers are done.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
+{
+ struct timer_queue *q = TimerQueue;
+ struct queue_timer *t, *temp;
+ HANDLE thread;
+ NTSTATUS status;
+
+ if (!q || q->magic != TIMER_QUEUE_MAGIC)
+ return STATUS_INVALID_HANDLE;
+
+ thread = q->thread;
+
+ RtlEnterCriticalSection(&q->cs);
+ q->quit = TRUE;
+ if (list_head(&q->timers))
+ /* When the last timer is removed, it will signal the timer thread to
+ exit... */
+ LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
+ queue_destroy_timer(t);
+ else
+ /* However if we have none, we must do it ourselves. */
+ NtSetEvent(q->event, NULL);
+ RtlLeaveCriticalSection(&q->cs);
+
+ if (CompletionEvent == INVALID_HANDLE_VALUE)
+ {
+ NtWaitForSingleObject(thread, FALSE, NULL);
+ status = STATUS_SUCCESS;
+ }
+ else
+ {
+ if (CompletionEvent)
+ {
+ FIXME("asynchronous return on completion event unimplemented\n");
+ NtWaitForSingleObject(thread, FALSE, NULL);
+ NtSetEvent(CompletionEvent, NULL);
+ }
+ status = STATUS_PENDING;
+ }
+
+ NtClose(thread);
+ return status;
+}
+
+static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
+{
+ static struct timer_queue *default_timer_queue;
+
+ if (TimerQueue)
+ return TimerQueue;
+ else
+ {
+ if (!default_timer_queue)
+ {
+ HANDLE q;
+ NTSTATUS status = RtlCreateTimerQueue(&q);
+ if (status == STATUS_SUCCESS)
+ {
+ PVOID p = InterlockedCompareExchangePointer( (void **)
&default_timer_queue, q, NULL );
+ if (p)
+ /* Got beat to the punch. */
+ RtlDeleteTimerQueueEx(q, NULL);
+ }
+ }
+ return default_timer_queue;
+ }
+}
+
+/***********************************************************************
+ * RtlCreateTimer (NTDLL.@)
+ *
+ * Creates a new timer associated with the given queue.
+ *
+ * PARAMS
+ * TimerQueue [I] The queue to hold the timer.
+ * NewTimer [O] The newly created timer.
+ * Callback [I] The callback to fire.
+ * Parameter [I] The argument for the callback.
+ * DueTime [I] The delay, in milliseconds, before first firing the
+ * timer.
+ * Period [I] The period, in milliseconds, at which to fire the timer
+ * after the first callback. If zero, the timer will only
+ * fire once. It still needs to be deleted with
+ * RtlDeleteTimer.
+ * Flags [I] Flags controlling the execution of the callback. In
+ * addition to the WT_* thread pool flags (see
+ * RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
+ * WT_EXECUTEONLYONCE are supported.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, HANDLE *NewTimer,
+ RTL_WAITORTIMERCALLBACKFUNC Callback,
+ PVOID Parameter, DWORD DueTime, DWORD Period,
+ ULONG Flags)
+{
+ NTSTATUS status;
+ struct queue_timer *t;
+ struct timer_queue *q = get_timer_queue(TimerQueue);
+
+ if (!q) return STATUS_NO_MEMORY;
+ if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
+
+ t = RtlAllocateHeap(GetProcessHeap(), 0, sizeof *t);
+ if (!t)
+ return STATUS_NO_MEMORY;
+
+ t->q = q;
+ t->runcount = 0;
+ t->callback = Callback;
+ t->param = Parameter;
+ t->period = Period;
+ t->flags = Flags;
+ t->destroy = FALSE;
+ t->event = NULL;
+
+ status = STATUS_SUCCESS;
+ RtlEnterCriticalSection(&q->cs);
+ if (q->quit)
+ status = STATUS_INVALID_HANDLE;
+ else
+ queue_add_timer(t, queue_current_time() + DueTime, TRUE);
+ RtlLeaveCriticalSection(&q->cs);
+
+ if (status == STATUS_SUCCESS)
+ *NewTimer = t;
+ else
+ RtlFreeHeap(GetProcessHeap(), 0, t);
+
+ return status;
+}
+
+/***********************************************************************
+ * RtlUpdateTimer (NTDLL.@)
+ *
+ * Changes the time at which a timer expires.
+ *
+ * PARAMS
+ * TimerQueue [I] The queue that holds the timer.
+ * Timer [I] The timer to update.
+ * DueTime [I] The delay, in milliseconds, before next firing the timer.
+ * Period [I] The period, in milliseconds, at which to fire the timer
+ * after the first callback. If zero, the timer will not
+ * refire once. It still needs to be deleted with
+ * RtlDeleteTimer.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
+ DWORD DueTime, DWORD Period)
+{
+ struct queue_timer *t = Timer;
+ struct timer_queue *q = t->q;
+
+ RtlEnterCriticalSection(&q->cs);
+ /* Can't change a timer if it was once-only or destroyed. */
+ if (t->expire != EXPIRE_NEVER)
+ {
+ t->period = Period;
+ queue_move_timer(t, queue_current_time() + DueTime, TRUE);
+ }
+ RtlLeaveCriticalSection(&q->cs);
+
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * RtlDeleteTimer (NTDLL.@)
+ *
+ * Cancels a timer-queue timer.
+ *
+ * PARAMS
+ * TimerQueue [I] The queue that holds the timer.
+ * Timer [I] The timer to update.
+ * CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
+ * wait until the timer is finished firing all pending
+ * callbacks before returning. Otherwise, return
+ * immediately and set the timer is done.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
+ or if the completion event is NULL.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
+ HANDLE CompletionEvent)
+{
+ struct queue_timer *t = Timer;
+ struct timer_queue *q;
+ NTSTATUS status = STATUS_PENDING;
+ HANDLE event = NULL;
+
+ if (!Timer)
+ return STATUS_INVALID_PARAMETER_1;
+ q = t->q;
+ if (CompletionEvent == INVALID_HANDLE_VALUE)
+ {
+ status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent,
FALSE);
+ if (status == STATUS_SUCCESS)
+ status = STATUS_PENDING;
+ }
+ else if (CompletionEvent)
+ event = CompletionEvent;
+
+ RtlEnterCriticalSection(&q->cs);
+ t->event = event;
+ if (t->runcount == 0 && event)
+ status = STATUS_SUCCESS;
+ queue_destroy_timer(t);
+ RtlLeaveCriticalSection(&q->cs);
+
+ if (CompletionEvent == INVALID_HANDLE_VALUE && event)
+ {
+ if (status == STATUS_PENDING)
+ {
+ NtWaitForSingleObject(event, FALSE, NULL);
+ status = STATUS_SUCCESS;
+ }
+ NtClose(event);
+ }
+
+ return status;
+}
+
+/***********************************************************************
+ * timerqueue_thread_proc (internal)
+ */
+static void CALLBACK timerqueue_thread_proc( void *param )
+{
+ ULONGLONG timeout_lower, timeout_upper, new_timeout;
+ struct threadpool_object *other_timer;
+ LARGE_INTEGER now, timeout;
+ struct list *ptr;
+
+ TRACE( "starting timer queue thread\n" );
+ set_thread_name(L"wine_threadpool_timerqueue");
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+ for (;;)
+ {
+ NtQuerySystemTime( &now );
+
+ /* Check for expired timers. */
+ while ((ptr = list_head( &timerqueue.pending_timers )))
+ {
+ struct threadpool_object *timer = LIST_ENTRY( ptr, struct threadpool_object,
u.timer.timer_entry );
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+ assert( timer->u.timer.timer_pending );
+ if (timer->u.timer.timeout > now.QuadPart)
+ break;
+
+ /* Queue a new callback in one of the worker threads. */
+ list_remove( &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = FALSE;
+ tp_object_submit( timer, FALSE );
+
+ /* Insert the timer back into the queue, except it's marked for shutdown.
*/
+ if (timer->u.timer.period && !timer->shutdown)
+ {
+ timer->u.timer.timeout += (ULONGLONG)timer->u.timer.period *
10000;
+ if (timer->u.timer.timeout <= now.QuadPart)
+ timer->u.timer.timeout = now.QuadPart + 1;
+
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (timer->u.timer.timeout < other_timer->u.timer.timeout)
+ break;
+ }
+ list_add_before( &other_timer->u.timer.timer_entry,
&timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = TRUE;
+ }
+ }
+
+ timeout_lower = timeout_upper = MAXLONGLONG;
+
+ /* Determine next timeout and use the window length to optimize wakeup times. */
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (other_timer->u.timer.timeout >= timeout_upper)
+ break;
+
+ timeout_lower = other_timer->u.timer.timeout;
+ new_timeout = timeout_lower +
(ULONGLONG)other_timer->u.timer.window_length * 10000;
+ if (new_timeout < timeout_upper)
+ timeout_upper = new_timeout;
+ }
+
+ /* Wait for timer update events or until the next timer expires. */
+ if (timerqueue.objcount)
+ {
+ timeout.QuadPart = timeout_lower;
+ RtlSleepConditionVariableCS( &timerqueue.update_event,
&timerqueue.cs, &timeout );
+ continue;
+ }
+
+ /* All timers have been destroyed, if no new timers are created
+ * within some amount of time, then we can shutdown this thread. */
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ if (RtlSleepConditionVariableCS( &timerqueue.update_event,
&timerqueue.cs,
+ &timeout ) == STATUS_TIMEOUT && !timerqueue.objcount)
+ {
+ break;
+ }
+ }
+
+ timerqueue.thread_running = FALSE;
+ RtlLeaveCriticalSection( &timerqueue.cs );
+
+ TRACE( "terminating timer queue thread\n" );
+ RtlExitUserThread( 0 );
+}
+
+/***********************************************************************
+ * tp_new_worker_thread (internal)
+ *
+ * Create and account a new worker thread for the desired pool.
+ */
+static NTSTATUS tp_new_worker_thread( struct threadpool *pool )
+{
+ HANDLE thread;
+ NTSTATUS status;
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0,
+ pool->stack_info.StackReserve,
pool->stack_info.StackCommit,
+ threadpool_worker_proc, pool, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ InterlockedIncrement( &pool->refcount );
+ pool->num_workers++;
+ NtClose( thread );
+ }
+ return status;
+}
+
+/***********************************************************************
+ * tp_timerqueue_lock (internal)
+ *
+ * Acquires a lock on the global timerqueue. When the lock is acquired
+ * successfully, it is guaranteed that the timer thread is running.
+ */
+static NTSTATUS tp_timerqueue_lock( struct threadpool_object *timer )
+{
+ NTSTATUS status = STATUS_SUCCESS;
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ timer->u.timer.timer_initialized = FALSE;
+ timer->u.timer.timer_pending = FALSE;
+ timer->u.timer.timer_set = FALSE;
+ timer->u.timer.timeout = 0;
+ timer->u.timer.period = 0;
+ timer->u.timer.window_length = 0;
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ /* Make sure that the timerqueue thread is running. */
+ if (!timerqueue.thread_running)
+ {
+ HANDLE thread;
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
+ timerqueue_thread_proc, NULL, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ timerqueue.thread_running = TRUE;
+ NtClose( thread );
+ }
+ }
+
+ if (status == STATUS_SUCCESS)
+ {
+ timer->u.timer.timer_initialized = TRUE;
+ timerqueue.objcount++;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+ return status;
+}
+
+/***********************************************************************
+ * tp_timerqueue_unlock (internal)
+ *
+ * Releases a lock on the global timerqueue.
+ */
+static void tp_timerqueue_unlock( struct threadpool_object *timer )
+{
+ assert( timer->type == TP_OBJECT_TYPE_TIMER );
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+ if (timer->u.timer.timer_initialized)
+ {
+ /* If timer was pending, remove it. */
+ if (timer->u.timer.timer_pending)
+ {
+ list_remove( &timer->u.timer.timer_entry );
+ timer->u.timer.timer_pending = FALSE;
+ }
+
+ /* If the last timer object was destroyed, then wake up the thread. */
+ if (!--timerqueue.objcount)
+ {
+ assert( list_empty( &timerqueue.pending_timers ) );
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+ }
+
+ timer->u.timer.timer_initialized = FALSE;
+ }
+ RtlLeaveCriticalSection( &timerqueue.cs );
+}
+
+/***********************************************************************
+ * waitqueue_thread_proc (internal)
+ */
+static void CALLBACK waitqueue_thread_proc( void *param )
+{
+ struct threadpool_object *objects[MAXIMUM_WAITQUEUE_OBJECTS];
+ HANDLE handles[MAXIMUM_WAITQUEUE_OBJECTS + 1];
+ struct waitqueue_bucket *bucket = param;
+ struct threadpool_object *wait, *next;
+ LARGE_INTEGER now, timeout;
+ DWORD num_handles;
+ NTSTATUS status;
+
+ TRACE( "starting wait queue thread\n" );
+ set_thread_name(L"wine_threadpool_waitqueue");
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ for (;;)
+ {
+ NtQuerySystemTime( &now );
+ timeout.QuadPart = MAXLONGLONG;
+ num_handles = 0;
+
+ LIST_FOR_EACH_ENTRY_SAFE( wait, next, &bucket->waiting, struct
threadpool_object,
+ u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ if (wait->u.wait.timeout <= now.QuadPart)
+ {
+ /* Wait object timed out. */
+ if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
+ {
+ list_remove( &wait->u.wait.wait_entry );
+ list_add_tail( &bucket->reserved,
&wait->u.wait.wait_entry );
+ }
+ if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD |
WT_EXECUTEINIOTHREAD)))
+ {
+ InterlockedIncrement( &wait->refcount );
+ wait->num_pending_callbacks++;
+ RtlEnterCriticalSection( &wait->pool->cs );
+ tp_object_execute( wait, TRUE );
+ RtlLeaveCriticalSection( &wait->pool->cs );
+ tp_object_release( wait );
+ }
+ else tp_object_submit( wait, FALSE );
+ }
+ else
+ {
+ if (wait->u.wait.timeout < timeout.QuadPart)
+ timeout.QuadPart = wait->u.wait.timeout;
+
+ assert( num_handles < MAXIMUM_WAITQUEUE_OBJECTS );
+ InterlockedIncrement( &wait->refcount );
+ objects[num_handles] = wait;
+ handles[num_handles] = wait->u.wait.handle;
+ num_handles++;
+ }
+ }
+
+ if (!bucket->objcount)
+ {
+ /* All wait objects have been destroyed, if no new wait objects are created
+ * within some amount of time, then we can shutdown this thread. */
+ assert( num_handles == 0 );
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ status = NtWaitForMultipleObjects( 1, &bucket->update_event, TRUE,
bucket->alertable, &timeout );
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ if (status == STATUS_TIMEOUT && !bucket->objcount)
+ break;
+ }
+ else
+ {
+ handles[num_handles] = bucket->update_event;
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ status = NtWaitForMultipleObjects( num_handles + 1, handles, TRUE,
bucket->alertable, &timeout );
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ if (status >= STATUS_WAIT_0 && status < STATUS_WAIT_0 +
num_handles)
+ {
+ wait = objects[status - STATUS_WAIT_0];
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ if (wait->u.wait.bucket)
+ {
+ /* Wait object signaled. */
+ assert( wait->u.wait.bucket == bucket );
+ if ((wait->u.wait.flags & WT_EXECUTEONLYONCE))
+ {
+ list_remove( &wait->u.wait.wait_entry );
+ list_add_tail( &bucket->reserved,
&wait->u.wait.wait_entry );
+ }
+ if ((wait->u.wait.flags & (WT_EXECUTEINWAITTHREAD |
WT_EXECUTEINIOTHREAD)))
+ {
+ wait->u.wait.signaled++;
+ wait->num_pending_callbacks++;
+ RtlEnterCriticalSection( &wait->pool->cs );
+ tp_object_execute( wait, TRUE );
+ RtlLeaveCriticalSection( &wait->pool->cs );
+ }
+ else tp_object_submit( wait, TRUE );
+ }
+ else
+ WARN("wait object %p triggered while object was
destroyed\n", wait);
+ }
+
+ /* Release temporary references to wait objects. */
+ while (num_handles)
+ {
+ wait = objects[--num_handles];
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ tp_object_release( wait );
+ }
+ }
+
+ /* Try to merge bucket with other threads. */
+ if (waitqueue.num_buckets > 1 && bucket->objcount &&
+ bucket->objcount <= MAXIMUM_WAITQUEUE_OBJECTS * 1 / 3)
+ {
+ struct waitqueue_bucket *other_bucket;
+ LIST_FOR_EACH_ENTRY( other_bucket, &waitqueue.buckets, struct
waitqueue_bucket, bucket_entry )
+ {
+ if (other_bucket != bucket && other_bucket->objcount
&& other_bucket->alertable == bucket->alertable &&
+ other_bucket->objcount + bucket->objcount <=
MAXIMUM_WAITQUEUE_OBJECTS * 2 / 3)
+ {
+ other_bucket->objcount += bucket->objcount;
+ bucket->objcount = 0;
+
+ /* Update reserved list. */
+ LIST_FOR_EACH_ENTRY( wait, &bucket->reserved, struct
threadpool_object, u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ wait->u.wait.bucket = other_bucket;
+ }
+ list_move_tail( &other_bucket->reserved,
&bucket->reserved );
+
+ /* Update waiting list. */
+ LIST_FOR_EACH_ENTRY( wait, &bucket->waiting, struct
threadpool_object, u.wait.wait_entry )
+ {
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+ wait->u.wait.bucket = other_bucket;
+ }
+ list_move_tail( &other_bucket->waiting,
&bucket->waiting );
+
+ /* Move bucket to the end, to keep the probability of
+ * newly added wait objects as small as possible. */
+ list_remove( &bucket->bucket_entry );
+ list_add_tail( &waitqueue.buckets, &bucket->bucket_entry
);
+
+ NtSetEvent( other_bucket->update_event, NULL );
+ break;
+ }
+ }
+ }
+ }
+
+ /* Remove this bucket from the list. */
+ list_remove( &bucket->bucket_entry );
+ if (!--waitqueue.num_buckets)
+ assert( list_empty( &waitqueue.buckets ) );
+
+ RtlLeaveCriticalSection( &waitqueue.cs );
+
+ TRACE( "terminating wait queue thread\n" );
+
+ assert( bucket->objcount == 0 );
+ assert( list_empty( &bucket->reserved ) );
+ assert( list_empty( &bucket->waiting ) );
+ NtClose( bucket->update_event );
+
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+ RtlExitUserThread( 0 );
+}
+
+/***********************************************************************
+ * tp_waitqueue_lock (internal)
+ */
+static NTSTATUS tp_waitqueue_lock( struct threadpool_object *wait )
+{
+ struct waitqueue_bucket *bucket;
+ NTSTATUS status;
+ HANDLE thread;
+ BOOL alertable = (wait->u.wait.flags & WT_EXECUTEINIOTHREAD) != 0;
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+ wait->u.wait.signaled = 0;
+ wait->u.wait.bucket = NULL;
+ wait->u.wait.wait_pending = FALSE;
+ wait->u.wait.timeout = 0;
+ wait->u.wait.handle = INVALID_HANDLE_VALUE;
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ /* Try to assign to existing bucket if possible. */
+ LIST_FOR_EACH_ENTRY( bucket, &waitqueue.buckets, struct waitqueue_bucket,
bucket_entry )
+ {
+ if (bucket->objcount < MAXIMUM_WAITQUEUE_OBJECTS &&
bucket->alertable == alertable)
+ {
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+ wait->u.wait.bucket = bucket;
+ bucket->objcount++;
+
+ status = STATUS_SUCCESS;
+ goto out;
+ }
+ }
+
+ /* Create a new bucket and corresponding worker thread. */
+ bucket = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*bucket) );
+ if (!bucket)
+ {
+ status = STATUS_NO_MEMORY;
+ goto out;
+ }
+
+ bucket->objcount = 0;
+ bucket->alertable = alertable;
+ list_init( &bucket->reserved );
+ list_init( &bucket->waiting );
+
+ status = NtCreateEvent( &bucket->update_event, EVENT_ALL_ACCESS,
+ NULL, SynchronizationEvent, FALSE );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+ goto out;
+ }
+
+ status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE, 0, 0, 0,
+ waitqueue_thread_proc, bucket, &thread, NULL );
+ if (status == STATUS_SUCCESS)
+ {
+ list_add_tail( &waitqueue.buckets, &bucket->bucket_entry );
+ waitqueue.num_buckets++;
+
+ list_add_tail( &bucket->reserved, &wait->u.wait.wait_entry );
+ wait->u.wait.bucket = bucket;
+ bucket->objcount++;
+
+ NtClose( thread );
+ }
+ else
+ {
+ NtClose( bucket->update_event );
+ RtlFreeHeap( GetProcessHeap(), 0, bucket );
+ }
+
+out:
+ RtlLeaveCriticalSection( &waitqueue.cs );
+ return status;
+}
+
+/***********************************************************************
+ * tp_waitqueue_unlock (internal)
+ */
+static void tp_waitqueue_unlock( struct threadpool_object *wait )
+{
+ assert( wait->type == TP_OBJECT_TYPE_WAIT );
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+ if (wait->u.wait.bucket)
+ {
+ struct waitqueue_bucket *bucket = wait->u.wait.bucket;
+ assert( bucket->objcount > 0 );
+
+ list_remove( &wait->u.wait.wait_entry );
+ wait->u.wait.bucket = NULL;
+ bucket->objcount--;
+
+ NtSetEvent( bucket->update_event, NULL );
+ }
+ RtlLeaveCriticalSection( &waitqueue.cs );
+}
+
+static void CALLBACK ioqueue_thread_proc( void *param )
+{
+ struct io_completion *completion;
+ struct threadpool_object *io;
+ IO_STATUS_BLOCK iosb;
+ ULONG_PTR key, value;
+ BOOL destroy, skip;
+ NTSTATUS status;
+
+ TRACE( "starting I/O completion thread\n" );
+ set_thread_name(L"wine_threadpool_ioqueue");
+
+ RtlEnterCriticalSection( &ioqueue.cs );
+
+ for (;;)
+ {
+ RtlLeaveCriticalSection( &ioqueue.cs );
+ if ((status = NtRemoveIoCompletion( ioqueue.port, &key, &value,
&iosb, NULL )))
+ ERR("NtRemoveIoCompletion failed, status %#lx.\n", status);
+ RtlEnterCriticalSection( &ioqueue.cs );
+
+ destroy = skip = FALSE;
+ io = (struct threadpool_object *)key;
+
+ TRACE( "io %p, iosb.Status %#lx.\n", io, iosb.Status );
+
+ if (io && (io->shutdown || io->u.io.shutting_down))
+ {
+ RtlEnterCriticalSection( &io->pool->cs );
+ if (!io->u.io.pending_count)
+ {
+ if (io->u.io.skipped_count)
+ --io->u.io.skipped_count;
+
+ if (io->u.io.skipped_count)
+ skip = TRUE;
+ else
+ destroy = TRUE;
+ }
+ RtlLeaveCriticalSection( &io->pool->cs );
+ if (skip) continue;
+ }
+
+ if (destroy)
+ {
+ --ioqueue.objcount;
+ TRACE( "Releasing io %p.\n", io );
+ io->shutdown = TRUE;
+ tp_object_release( io );
+ }
+ else if (io)
+ {
+ RtlEnterCriticalSection( &io->pool->cs );
+
+ TRACE( "pending_count %u.\n", io->u.io.pending_count );
+
+ if (io->u.io.pending_count)
+ {
+ --io->u.io.pending_count;
+ if (!array_reserve((void **)&io->u.io.completions,
&io->u.io.completion_max,
+ io->u.io.completion_count + 1,
sizeof(*io->u.io.completions)))
+ {
+ ERR( "Failed to allocate memory.\n" );
+ RtlLeaveCriticalSection( &io->pool->cs );
+ continue;
+ }
+
+ completion =
&io->u.io.completions[io->u.io.completion_count++];
+ completion->iosb = iosb;
+ completion->cvalue = value;
+
+ tp_object_submit( io, FALSE );
+ }
+ RtlLeaveCriticalSection( &io->pool->cs );
+ }
+
+ if (!ioqueue.objcount)
+ {
+ /* All I/O objects have been destroyed; if no new objects are
+ * created within some amount of time, then we can shutdown this
+ * thread. */
+ LARGE_INTEGER timeout = {.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT *
-10000};
+ if (RtlSleepConditionVariableCS( &ioqueue.update_event, &ioqueue.cs,
+ &timeout) == STATUS_TIMEOUT && !ioqueue.objcount)
+ break;
+ }
+ }
+
+ ioqueue.thread_running = FALSE;
+ RtlLeaveCriticalSection( &ioqueue.cs );
+
+ TRACE( "terminating I/O completion thread\n" );
+
+ RtlExitUserThread( 0 );
+}
+
+static NTSTATUS tp_ioqueue_lock( struct threadpool_object *io, HANDLE file )
+{
+ NTSTATUS status = STATUS_SUCCESS;
+
+ assert( io->type == TP_OBJECT_TYPE_IO );
+
+ RtlEnterCriticalSection( &ioqueue.cs );
+
+ if (!ioqueue.port && (status = NtCreateIoCompletion( &ioqueue.port,
+ IO_COMPLETION_ALL_ACCESS, NULL, 0 )))
+ {
+ RtlLeaveCriticalSection( &ioqueue.cs );
+ return status;
+ }
+
+ if (!ioqueue.thread_running)
+ {
+ HANDLE thread;
+
+ if (!(status = RtlCreateUserThread( GetCurrentProcess(), NULL, FALSE,
+ 0, 0, 0, ioqueue_thread_proc, NULL,
&thread, NULL )))
+ {
+ ioqueue.thread_running = TRUE;
+ NtClose( thread );
+ }
+ }
+
+ if (status == STATUS_SUCCESS)
+ {
+ FILE_COMPLETION_INFORMATION info;
+ IO_STATUS_BLOCK iosb;
+
+ info.CompletionPort = ioqueue.port;
+ info.CompletionKey = (ULONG_PTR)io;
+
+ status = NtSetInformationFile( file, &iosb, &info, sizeof(info),
FileCompletionInformation );
+ }
+
+ if (status == STATUS_SUCCESS)
+ {
+ if (!ioqueue.objcount++)
+ RtlWakeConditionVariable( &ioqueue.update_event );
+ }
+
+ RtlLeaveCriticalSection( &ioqueue.cs );
+ return status;
+}
+
+/***********************************************************************
+ * tp_threadpool_alloc (internal)
+ *
+ * Allocates a new threadpool object.
+ */
+static NTSTATUS tp_threadpool_alloc( struct threadpool **out )
+{
+ IMAGE_NT_HEADERS *nt = RtlImageNtHeader( NtCurrentTeb()->Peb->ImageBaseAddress
);
+ struct threadpool *pool;
+ unsigned int i;
+
+ pool = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*pool) );
+ if (!pool)
+ return STATUS_NO_MEMORY;
+
+ pool->refcount = 1;
+ pool->objcount = 0;
+ pool->shutdown = FALSE;
+
+ RtlInitializeCriticalSectionEx( &pool->cs, 0,
RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
+ pool->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ":
threadpool.cs");
+
+ for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+ list_init( &pool->pools[i] );
+ RtlInitializeConditionVariable( &pool->update_event );
+
+ pool->max_workers = 500;
+ pool->min_workers = 0;
+ pool->num_workers = 0;
+ pool->num_busy_workers = 0;
+ pool->stack_info.StackReserve = nt->OptionalHeader.SizeOfStackReserve;
+ pool->stack_info.StackCommit = nt->OptionalHeader.SizeOfStackCommit;
+
+ TRACE( "allocated threadpool %p\n", pool );
+
+ *out = pool;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * tp_threadpool_shutdown (internal)
+ *
+ * Prepares the shutdown of a threadpool object and notifies all worker
+ * threads to terminate (after all remaining work items have been
+ * processed).
+ */
+static void tp_threadpool_shutdown( struct threadpool *pool )
+{
+ assert( pool != default_threadpool );
+
+ pool->shutdown = TRUE;
+ RtlWakeAllConditionVariable( &pool->update_event );
+}
+
+/***********************************************************************
+ * tp_threadpool_release (internal)
+ *
+ * Releases a reference to a threadpool object.
+ */
+static BOOL tp_threadpool_release( struct threadpool *pool )
+{
+ unsigned int i;
+
+ if (InterlockedDecrement( &pool->refcount ))
+ return FALSE;
+
+ TRACE( "destroying threadpool %p\n", pool );
+
+ assert( pool->shutdown );
+ assert( !pool->objcount );
+ for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+ assert( list_empty( &pool->pools[i] ) );
+
+ pool->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &pool->cs );
+
+ RtlFreeHeap( GetProcessHeap(), 0, pool );
+ return TRUE;
+}
+
+/***********************************************************************
+ * tp_threadpool_lock (internal)
+ *
+ * Acquires a lock on a threadpool, specified with an TP_CALLBACK_ENVIRON
+ * block. When the lock is acquired successfully, it is guaranteed that
+ * there is at least one worker thread to process tasks.
+ */
+static NTSTATUS tp_threadpool_lock( struct threadpool **out, TP_CALLBACK_ENVIRON
*environment )
+{
+ struct threadpool *pool = NULL;
+ NTSTATUS status = STATUS_SUCCESS;
+
+ if (environment)
+ {
+ /* Validate environment parameters. */
+ if (environment->Version == 3)
+ {
+ TP_CALLBACK_ENVIRON_V3 *environment3 = (TP_CALLBACK_ENVIRON_V3
*)environment;
+
+ switch (environment3->CallbackPriority)
+ {
+ case TP_CALLBACK_PRIORITY_HIGH:
+ case TP_CALLBACK_PRIORITY_NORMAL:
+ case TP_CALLBACK_PRIORITY_LOW:
+ break;
+ default:
+ return STATUS_INVALID_PARAMETER;
+ }
+ }
+
+ pool = (struct threadpool *)environment->Pool;
+ }
+
+ if (!pool)
+ {
+ if (!default_threadpool)
+ {
+ status = tp_threadpool_alloc( &pool );
+ if (status != STATUS_SUCCESS)
+ return status;
+
+ if (InterlockedCompareExchangePointer( (void *)&default_threadpool, pool,
NULL ) != NULL)
+ {
+ tp_threadpool_shutdown( pool );
+ tp_threadpool_release( pool );
+ }
+ }
+
+ pool = default_threadpool;
+ }
+
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Make sure that the threadpool has at least one thread. */
+ if (!pool->num_workers)
+ status = tp_new_worker_thread( pool );
+
+ /* Keep a reference, and increment objcount to ensure that the
+ * last thread doesn't terminate. */
+ if (status == STATUS_SUCCESS)
+ {
+ InterlockedIncrement( &pool->refcount );
+ pool->objcount++;
+ }
+
+ RtlLeaveCriticalSection( &pool->cs );
+
+ if (status != STATUS_SUCCESS)
+ return status;
+
+ *out = pool;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * tp_threadpool_unlock (internal)
+ *
+ * Releases a lock on a threadpool.
+ */
+static void tp_threadpool_unlock( struct threadpool *pool )
+{
+ RtlEnterCriticalSection( &pool->cs );
+ pool->objcount--;
+ RtlLeaveCriticalSection( &pool->cs );
+ tp_threadpool_release( pool );
+}
+
+/***********************************************************************
+ * tp_group_alloc (internal)
+ *
+ * Allocates a new threadpool group object.
+ */
+static NTSTATUS tp_group_alloc( struct threadpool_group **out )
+{
+ struct threadpool_group *group;
+
+ group = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*group) );
+ if (!group)
+ return STATUS_NO_MEMORY;
+
+ group->refcount = 1;
+ group->shutdown = FALSE;
+
+ RtlInitializeCriticalSectionEx( &group->cs, 0,
RTL_CRITICAL_SECTION_FLAG_FORCE_DEBUG_INFO );
+ group->cs.DebugInfo->Spare[0] = (DWORD_PTR)(__FILE__ ":
threadpool_group.cs");
+
+ list_init( &group->members );
+
+ TRACE( "allocated group %p\n", group );
+
+ *out = group;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * tp_group_shutdown (internal)
+ *
+ * Marks the group object for shutdown.
+ */
+static void tp_group_shutdown( struct threadpool_group *group )
+{
+ group->shutdown = TRUE;
+}
+
+/***********************************************************************
+ * tp_group_release (internal)
+ *
+ * Releases a reference to a group object.
+ */
+static BOOL tp_group_release( struct threadpool_group *group )
+{
+ if (InterlockedDecrement( &group->refcount ))
+ return FALSE;
+
+ TRACE( "destroying group %p\n", group );
+
+ assert( group->shutdown );
+ assert( list_empty( &group->members ) );
+
+ group->cs.DebugInfo->Spare[0] = 0;
+ RtlDeleteCriticalSection( &group->cs );
+
+ RtlFreeHeap( GetProcessHeap(), 0, group );
+ return TRUE;
+}
+
+/***********************************************************************
+ * tp_object_initialize (internal)
+ *
+ * Initializes members of a threadpool object.
+ */
+static void tp_object_initialize( struct threadpool_object *object, struct threadpool
*pool,
+ PVOID userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ BOOL is_simple_callback = (object->type == TP_OBJECT_TYPE_SIMPLE);
+
+ object->refcount = 1;
+ object->shutdown = FALSE;
+
+ object->pool = pool;
+ object->group = NULL;
+ object->userdata = userdata;
+ object->group_cancel_callback = NULL;
+ object->finalization_callback = NULL;
+ object->may_run_long = 0;
+ object->race_dll = NULL;
+ object->priority = TP_CALLBACK_PRIORITY_NORMAL;
+
+ memset( &object->group_entry, 0, sizeof(object->group_entry) );
+ object->is_group_member = FALSE;
+
+ memset( &object->pool_entry, 0, sizeof(object->pool_entry) );
+ RtlInitializeConditionVariable( &object->finished_event );
+ RtlInitializeConditionVariable( &object->group_finished_event );
+ object->completed_event = NULL;
+ object->num_pending_callbacks = 0;
+ object->num_running_callbacks = 0;
+ object->num_associated_callbacks = 0;
+
+ if (environment)
+ {
+ if (environment->Version != 1 && environment->Version != 3)
+ FIXME( "unsupported environment version %lu\n",
environment->Version );
+
+ object->group = impl_from_TP_CLEANUP_GROUP( environment->CleanupGroup );
+ object->group_cancel_callback = environment->CleanupGroupCancelCallback;
+ object->finalization_callback = environment->FinalizationCallback;
+ object->may_run_long = environment->u.s.LongFunction != 0;
+ object->race_dll = environment->RaceDll;
+ if (environment->Version == 3)
+ {
+ TP_CALLBACK_ENVIRON_V3 *environment_v3 = (TP_CALLBACK_ENVIRON_V3
*)environment;
+
+ object->priority = environment_v3->CallbackPriority;
+ assert( object->priority < ARRAY_SIZE(pool->pools) );
+ }
+
+ if (environment->ActivationContext)
+ FIXME( "activation context not supported yet\n" );
+
+ if (environment->u.s.Persistent)
+ FIXME( "persistent threads not supported yet\n" );
+ }
+
+ if (object->race_dll)
+ LdrAddRefDll( 0, object->race_dll );
+
+ TRACE( "allocated object %p of type %u\n", object, object->type );
+
+ /* For simple callbacks we have to run tp_object_submit before adding this object
+ * to the cleanup group. As soon as the cleanup group members are released
->shutdown
+ * will be set, and tp_object_submit would fail with an assertion. */
+
+ if (is_simple_callback)
+ tp_object_submit( object, FALSE );
+
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+ InterlockedIncrement( &group->refcount );
+
+ RtlEnterCriticalSection( &group->cs );
+ list_add_tail( &group->members, &object->group_entry );
+ object->is_group_member = TRUE;
+ RtlLeaveCriticalSection( &group->cs );
+ }
+
+ if (is_simple_callback)
+ tp_object_release( object );
+}
+
+static void tp_object_prio_queue( struct threadpool_object *object )
+{
+ ++object->pool->num_busy_workers;
+ list_add_tail( &object->pool->pools[object->priority],
&object->pool_entry );
+}
+
+/***********************************************************************
+ * tp_object_submit (internal)
+ *
+ * Submits a threadpool object to the associated threadpool. This
+ * function has to be VOID because TpPostWork can never fail on Windows.
+ */
+static void tp_object_submit( struct threadpool_object *object, BOOL signaled )
+{
+ struct threadpool *pool = object->pool;
+ NTSTATUS status = STATUS_UNSUCCESSFUL;
+
+ assert( !object->shutdown );
+ assert( !pool->shutdown );
+
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Start new worker threads if required. */
+ if (pool->num_busy_workers >= pool->num_workers &&
+ pool->num_workers < pool->max_workers)
+ status = tp_new_worker_thread( pool );
+
+ /* Queue work item and increment refcount. */
+ InterlockedIncrement( &object->refcount );
+ if (!object->num_pending_callbacks++)
+ tp_object_prio_queue( object );
+
+ /* Count how often the object was signaled. */
+ if (object->type == TP_OBJECT_TYPE_WAIT && signaled)
+ object->u.wait.signaled++;
+
+ /* No new thread started - wake up one existing thread. */
+ if (status != STATUS_SUCCESS)
+ {
+ assert( pool->num_workers > 0 );
+ RtlWakeConditionVariable( &pool->update_event );
+ }
+
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
+/***********************************************************************
+ * tp_object_cancel (internal)
+ *
+ * Cancels all currently pending callbacks for a specific object.
+ */
+static void tp_object_cancel( struct threadpool_object *object )
+{
+ struct threadpool *pool = object->pool;
+ LONG pending_callbacks = 0;
+
+ RtlEnterCriticalSection( &pool->cs );
+ if (object->num_pending_callbacks)
+ {
+ pending_callbacks = object->num_pending_callbacks;
+ object->num_pending_callbacks = 0;
+ list_remove( &object->pool_entry );
+
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ object->u.wait.signaled = 0;
+ }
+ if (object->type == TP_OBJECT_TYPE_IO)
+ {
+ object->u.io.skipped_count += object->u.io.pending_count;
+ object->u.io.pending_count = 0;
+ }
+ RtlLeaveCriticalSection( &pool->cs );
+
+ while (pending_callbacks--)
+ tp_object_release( object );
+}
+
+static BOOL object_is_finished( struct threadpool_object *object, BOOL group )
+{
+ if (object->num_pending_callbacks)
+ return FALSE;
+ if (object->type == TP_OBJECT_TYPE_IO && object->u.io.pending_count)
+ return FALSE;
+
+ if (group)
+ return !object->num_running_callbacks;
+ else
+ return !object->num_associated_callbacks;
+}
+
+/***********************************************************************
+ * tp_object_wait (internal)
+ *
+ * Waits until all pending and running callbacks of a specific object
+ * have been processed.
+ */
+static void tp_object_wait( struct threadpool_object *object, BOOL group_wait )
+{
+ struct threadpool *pool = object->pool;
+
+ RtlEnterCriticalSection( &pool->cs );
+ while (!object_is_finished( object, group_wait ))
+ {
+ if (group_wait)
+ RtlSleepConditionVariableCS( &object->group_finished_event,
&pool->cs, NULL );
+ else
+ RtlSleepConditionVariableCS( &object->finished_event,
&pool->cs, NULL );
+ }
+ RtlLeaveCriticalSection( &pool->cs );
+}
+
+static void tp_ioqueue_unlock( struct threadpool_object *io )
+{
+ assert( io->type == TP_OBJECT_TYPE_IO );
+
+ RtlEnterCriticalSection( &ioqueue.cs );
+
+ assert(ioqueue.objcount);
+
+ if (!io->shutdown && !--ioqueue.objcount)
+ NtSetIoCompletion( ioqueue.port, 0, 0, STATUS_SUCCESS, 0 );
+
+ RtlLeaveCriticalSection( &ioqueue.cs );
+}
+
+/***********************************************************************
+ * tp_object_prepare_shutdown (internal)
+ *
+ * Prepares a threadpool object for shutdown.
+ */
+static void tp_object_prepare_shutdown( struct threadpool_object *object )
+{
+ if (object->type == TP_OBJECT_TYPE_TIMER)
+ tp_timerqueue_unlock( object );
+ else if (object->type == TP_OBJECT_TYPE_WAIT)
+ tp_waitqueue_unlock( object );
+ else if (object->type == TP_OBJECT_TYPE_IO)
+ tp_ioqueue_unlock( object );
+}
+
+/***********************************************************************
+ * tp_object_release (internal)
+ *
+ * Releases a reference to a threadpool object.
+ */
+static BOOL tp_object_release( struct threadpool_object *object )
+{
+ if (InterlockedDecrement( &object->refcount ))
+ return FALSE;
+
+ TRACE( "destroying object %p of type %u\n", object, object->type );
+
+ assert( object->shutdown );
+ assert( !object->num_pending_callbacks );
+ assert( !object->num_running_callbacks );
+ assert( !object->num_associated_callbacks );
+
+ /* release reference to the group */
+ if (object->group)
+ {
+ struct threadpool_group *group = object->group;
+
+ RtlEnterCriticalSection( &group->cs );
+ if (object->is_group_member)
+ {
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ }
+ RtlLeaveCriticalSection( &group->cs );
+
+ tp_group_release( group );
+ }
+
+ tp_threadpool_unlock( object->pool );
+
+ if (object->race_dll)
+ LdrUnloadDll( object->race_dll );
+
+ if (object->completed_event && object->completed_event !=
INVALID_HANDLE_VALUE)
+ NtSetEvent( object->completed_event, NULL );
+
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return TRUE;
+}
+
+static struct list *threadpool_get_next_item( const struct threadpool *pool )
+{
+ struct list *ptr;
+ unsigned int i;
+
+ for (i = 0; i < ARRAY_SIZE(pool->pools); ++i)
+ {
+ if ((ptr = list_head( &pool->pools[i] )))
+ break;
+ }
+
+ return ptr;
+}
+
+/***********************************************************************
+ * tp_object_execute (internal)
+ *
+ * Executes a threadpool object callback, object->pool->cs has to be
+ * held.
+ */
+static void tp_object_execute( struct threadpool_object *object, BOOL wait_thread )
+{
+ TP_CALLBACK_INSTANCE *callback_instance;
+ struct threadpool_instance instance;
+ struct io_completion completion;
+ struct threadpool *pool = object->pool;
+ TP_WAIT_RESULT wait_result = 0;
+ NTSTATUS status;
+
+ object->num_pending_callbacks--;
+
+ /* For wait objects check if they were signaled or have timed out. */
+ if (object->type == TP_OBJECT_TYPE_WAIT)
+ {
+ wait_result = object->u.wait.signaled ? WAIT_OBJECT_0 : WAIT_TIMEOUT;
+ if (wait_result == WAIT_OBJECT_0) object->u.wait.signaled--;
+ }
+ else if (object->type == TP_OBJECT_TYPE_IO)
+ {
+ assert( object->u.io.completion_count );
+ completion = object->u.io.completions[--object->u.io.completion_count];
+ }
+
+ /* Leave critical section and do the actual callback. */
+ object->num_associated_callbacks++;
+ object->num_running_callbacks++;
+ RtlLeaveCriticalSection( &pool->cs );
+ if (wait_thread) RtlLeaveCriticalSection( &waitqueue.cs );
+
+ /* Initialize threadpool instance struct. */
+ callback_instance = (TP_CALLBACK_INSTANCE *)&instance;
+ instance.object = object;
+ instance.threadid = GetCurrentThreadId();
+ instance.associated = TRUE;
+ instance.may_run_long = object->may_run_long;
+ instance.cleanup.critical_section = NULL;
+ instance.cleanup.mutex = NULL;
+ instance.cleanup.semaphore = NULL;
+ instance.cleanup.semaphore_count = 0;
+ instance.cleanup.event = NULL;
+ instance.cleanup.library = NULL;
+
+ switch (object->type)
+ {
+ case TP_OBJECT_TYPE_SIMPLE:
+ {
+ TRACE( "executing simple callback %p(%p, %p)\n",
+ object->u.simple.callback, callback_instance, object->userdata
);
+ object->u.simple.callback( callback_instance, object->userdata );
+ TRACE( "callback %p returned\n", object->u.simple.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_WORK:
+ {
+ TRACE( "executing work callback %p(%p, %p, %p)\n",
+ object->u.work.callback, callback_instance, object->userdata,
object );
+ object->u.work.callback( callback_instance, object->userdata, (TP_WORK
*)object );
+ TRACE( "callback %p returned\n", object->u.work.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_TIMER:
+ {
+ TRACE( "executing timer callback %p(%p, %p, %p)\n",
+ object->u.timer.callback, callback_instance, object->userdata,
object );
+ object->u.timer.callback( callback_instance, object->userdata,
(TP_TIMER *)object );
+ TRACE( "callback %p returned\n", object->u.timer.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_WAIT:
+ {
+ TRACE( "executing wait callback %p(%p, %p, %p, %lu)\n",
+ object->u.wait.callback, callback_instance, object->userdata,
object, wait_result );
+ object->u.wait.callback( callback_instance, object->userdata, (TP_WAIT
*)object, wait_result );
+ TRACE( "callback %p returned\n", object->u.wait.callback );
+ break;
+ }
+
+ case TP_OBJECT_TYPE_IO:
+ {
+ TRACE( "executing I/O callback %p(%p, %p, %#Ix, %p, %p)\n",
+ object->u.io.callback, callback_instance, object->userdata,
+ completion.cvalue, &completion.iosb, (TP_IO *)object );
+ object->u.io.callback( callback_instance, object->userdata,
+ (void *)completion.cvalue, &completion.iosb, (TP_IO *)object );
+ TRACE( "callback %p returned\n", object->u.io.callback );
+ break;
+ }
+
+ default:
+ assert(0);
+ break;
+ }
+
+ /* Execute finalization callback. */
+ if (object->finalization_callback)
+ {
+ TRACE( "executing finalization callback %p(%p, %p)\n",
+ object->finalization_callback, callback_instance, object->userdata
);
+ object->finalization_callback( callback_instance, object->userdata );
+ TRACE( "callback %p returned\n", object->finalization_callback );
+ }
+
+ /* Execute cleanup tasks. */
+ if (instance.cleanup.critical_section)
+ {
+ RtlLeaveCriticalSection( instance.cleanup.critical_section );
+ }
+ if (instance.cleanup.mutex)
+ {
+ status = NtReleaseMutant( instance.cleanup.mutex, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.semaphore)
+ {
+ status = NtReleaseSemaphore( instance.cleanup.semaphore,
instance.cleanup.semaphore_count, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.event)
+ {
+ status = NtSetEvent( instance.cleanup.event, NULL );
+ if (status != STATUS_SUCCESS) goto skip_cleanup;
+ }
+ if (instance.cleanup.library)
+ {
+ LdrUnloadDll( instance.cleanup.library );
+ }
+
+skip_cleanup:
+ if (wait_thread) RtlEnterCriticalSection( &waitqueue.cs );
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Simple callbacks are automatically shutdown after execution. */
+ if (object->type == TP_OBJECT_TYPE_SIMPLE)
+ {
+ tp_object_prepare_shutdown( object );
+ object->shutdown = TRUE;
+ }
+
+ object->num_running_callbacks--;
+ if (object_is_finished( object, TRUE ))
+ RtlWakeAllConditionVariable( &object->group_finished_event );
+
+ if (instance.associated)
+ {
+ object->num_associated_callbacks--;
+ if (object_is_finished( object, FALSE ))
+ RtlWakeAllConditionVariable( &object->finished_event );
+ }
+}
+
+/***********************************************************************
+ * threadpool_worker_proc (internal)
+ */
+static void CALLBACK threadpool_worker_proc( void *param )
+{
+ struct threadpool *pool = param;
+ LARGE_INTEGER timeout;
+ struct list *ptr;
+
+ TRACE( "starting worker thread for pool %p\n", pool );
+ set_thread_name(L"wine_threadpool_worker");
+
+ RtlEnterCriticalSection( &pool->cs );
+ for (;;)
+ {
+ while ((ptr = threadpool_get_next_item( pool )))
+ {
+ struct threadpool_object *object = LIST_ENTRY( ptr, struct threadpool_object,
pool_entry );
+ assert( object->num_pending_callbacks > 0 );
+
+ /* If further pending callbacks are queued, move the work item to
+ * the end of the pool list. Otherwise remove it from the pool. */
+ list_remove( &object->pool_entry );
+ if (object->num_pending_callbacks > 1)
+ tp_object_prio_queue( object );
+
+ tp_object_execute( object, FALSE );
+
+ assert(pool->num_busy_workers);
+ pool->num_busy_workers--;
+
+ tp_object_release( object );
+ }
+
+ /* Shutdown worker thread if requested. */
+ if (pool->shutdown)
+ break;
+
+ /* Wait for new tasks or until the timeout expires. A thread only terminates
+ * when no new tasks are available, and the number of threads can be
+ * decreased without violating the min_workers limit. An exception is when
+ * min_workers == 0, then objcount is used to detect if the last thread
+ * can be terminated. */
+ timeout.QuadPart = (ULONGLONG)THREADPOOL_WORKER_TIMEOUT * -10000;
+ if (RtlSleepConditionVariableCS( &pool->update_event, &pool->cs,
&timeout ) == STATUS_TIMEOUT &&
+ !threadpool_get_next_item( pool ) && (pool->num_workers > max(
pool->min_workers, 1 ) ||
+ (!pool->min_workers && !pool->objcount)))
+ {
+ break;
+ }
+ }
+ pool->num_workers--;
+ RtlLeaveCriticalSection( &pool->cs );
+
+ TRACE( "terminating worker thread for pool %p\n", pool );
+ tp_threadpool_release( pool );
+ RtlExitUserThread( 0 );
+}
+
+/***********************************************************************
+ * TpAllocCleanupGroup (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocCleanupGroup( TP_CLEANUP_GROUP **out )
+{
+ TRACE( "%p\n", out );
+
+ return tp_group_alloc( (struct threadpool_group **)out );
+}
+
+/***********************************************************************
+ * TpAllocIoCompletion (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocIoCompletion( TP_IO **out, HANDLE file, PTP_IO_CALLBACK callback,
+ void *userdata, TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ TRACE( "%p %p %p %p %p\n", out, file, callback, userdata, environment );
+
+ if (!(object = RtlAllocateHeap( GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(*object)
)))
+ return STATUS_NO_MEMORY;
+
+ if ((status = tp_threadpool_lock( &pool, environment )))
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_IO;
+ object->u.io.callback = callback;
+ if (!(object->u.io.completions = RtlAllocateHeap( GetProcessHeap(), 0, 8 *
sizeof(*object->u.io.completions) )))
+ {
+ tp_threadpool_unlock( pool );
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ if ((status = tp_ioqueue_lock( object, file )))
+ {
+ tp_threadpool_unlock( pool );
+ RtlFreeHeap( GetProcessHeap(), 0, object->u.io.completions );
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = (TP_IO *)object;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * TpAllocPool (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocPool( TP_POOL **out, PVOID reserved )
+{
+ TRACE( "%p %p\n", out, reserved );
+
+ if (reserved)
+ FIXME( "reserved argument is nonzero (%p)\n", reserved );
+
+ return tp_threadpool_alloc( (struct threadpool **)out );
+}
+
+/***********************************************************************
+ * TpAllocTimer (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocTimer( TP_TIMER **out, PTP_TIMER_CALLBACK callback, PVOID
userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ status = tp_threadpool_lock( &pool, environment );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_TIMER;
+ object->u.timer.callback = callback;
+
+ status = tp_timerqueue_lock( object );
+ if (status)
+ {
+ tp_threadpool_unlock( pool );
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = (TP_TIMER *)object;
+ return STATUS_SUCCESS;
+}
+
+static NTSTATUS tp_alloc_wait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID
userdata,
+ TP_CALLBACK_ENVIRON *environment, DWORD flags )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ status = tp_threadpool_lock( &pool, environment );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_WAIT;
+ object->u.wait.callback = callback;
+ object->u.wait.flags = flags;
+
+ status = tp_waitqueue_lock( object );
+ if (status)
+ {
+ tp_threadpool_unlock( pool );
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = (TP_WAIT *)object;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * TpAllocWait (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocWait( TP_WAIT **out, PTP_WAIT_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
+ return tp_alloc_wait( out, callback, userdata, environment, WT_EXECUTEONLYONCE );
+}
+
+/***********************************************************************
+ * TpAllocWork (NTDLL.@)
+ */
+NTSTATUS WINAPI TpAllocWork( TP_WORK **out, PTP_WORK_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ TRACE( "%p %p %p %p\n", out, callback, userdata, environment );
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ status = tp_threadpool_lock( &pool, environment );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_WORK;
+ object->u.work.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment );
+
+ *out = (TP_WORK *)object;
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * TpCancelAsyncIoOperation (NTDLL.@)
+ */
+void WINAPI TpCancelAsyncIoOperation( TP_IO *io )
+{
+ struct threadpool_object *this = impl_from_TP_IO( io );
+
+ TRACE( "%p\n", io );
+
+ RtlEnterCriticalSection( &this->pool->cs );
+
+ TRACE("pending_count %u.\n", this->u.io.pending_count);
+
+ this->u.io.pending_count--;
+ if (object_is_finished( this, TRUE ))
+ RtlWakeAllConditionVariable( &this->group_finished_event );
+ if (object_is_finished( this, FALSE ))
+ RtlWakeAllConditionVariable( &this->finished_event );
+
+ RtlLeaveCriticalSection( &this->pool->cs );
+}
+
+/***********************************************************************
+ * TpCallbackLeaveCriticalSectionOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackLeaveCriticalSectionOnCompletion( TP_CALLBACK_INSTANCE *instance,
CRITICAL_SECTION *crit )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+
+ TRACE( "%p %p\n", instance, crit );
+
+ if (!this->cleanup.critical_section)
+ this->cleanup.critical_section = crit;
+}
+
+/***********************************************************************
+ * TpCallbackMayRunLong (NTDLL.@)
+ */
+NTSTATUS WINAPI TpCallbackMayRunLong( TP_CALLBACK_INSTANCE *instance )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ struct threadpool_object *object = this->object;
+ struct threadpool *pool;
+ NTSTATUS status = STATUS_SUCCESS;
+
+ TRACE( "%p\n", instance );
+
+ if (this->threadid != GetCurrentThreadId())
+ {
+ ERR("called from wrong thread, ignoring\n");
+ return STATUS_UNSUCCESSFUL; /* FIXME */
+ }
+
+ if (this->may_run_long)
+ return STATUS_SUCCESS;
+
+ pool = object->pool;
+ RtlEnterCriticalSection( &pool->cs );
+
+ /* Start new worker threads if required. */
+ if (pool->num_busy_workers >= pool->num_workers)
+ {
+ if (pool->num_workers < pool->max_workers)
+ {
+ status = tp_new_worker_thread( pool );
+ }
+ else
+ {
+ status = STATUS_TOO_MANY_THREADS;
+ }
+ }
+
+ RtlLeaveCriticalSection( &pool->cs );
+ this->may_run_long = TRUE;
+ return status;
+}
+
+/***********************************************************************
+ * TpCallbackReleaseMutexOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackReleaseMutexOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE
mutex )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+
+ TRACE( "%p %p\n", instance, mutex );
+
+ if (!this->cleanup.mutex)
+ this->cleanup.mutex = mutex;
+}
+
+/***********************************************************************
+ * TpCallbackReleaseSemaphoreOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackReleaseSemaphoreOnCompletion( TP_CALLBACK_INSTANCE *instance,
HANDLE semaphore, DWORD count )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+
+ TRACE( "%p %p %lu\n", instance, semaphore, count );
+
+ if (!this->cleanup.semaphore)
+ {
+ this->cleanup.semaphore = semaphore;
+ this->cleanup.semaphore_count = count;
+ }
+}
+
+/***********************************************************************
+ * TpCallbackSetEventOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackSetEventOnCompletion( TP_CALLBACK_INSTANCE *instance, HANDLE event
)
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+
+ TRACE( "%p %p\n", instance, event );
+
+ if (!this->cleanup.event)
+ this->cleanup.event = event;
+}
+
+/***********************************************************************
+ * TpCallbackUnloadDllOnCompletion (NTDLL.@)
+ */
+VOID WINAPI TpCallbackUnloadDllOnCompletion( TP_CALLBACK_INSTANCE *instance, HMODULE
module )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+
+ TRACE( "%p %p\n", instance, module );
+
+ if (!this->cleanup.library)
+ this->cleanup.library = module;
+}
+
+/***********************************************************************
+ * TpDisassociateCallback (NTDLL.@)
+ */
+VOID WINAPI TpDisassociateCallback( TP_CALLBACK_INSTANCE *instance )
+{
+ struct threadpool_instance *this = impl_from_TP_CALLBACK_INSTANCE( instance );
+ struct threadpool_object *object = this->object;
+ struct threadpool *pool;
+
+ TRACE( "%p\n", instance );
+
+ if (this->threadid != GetCurrentThreadId())
+ {
+ ERR("called from wrong thread, ignoring\n");
+ return;
+ }
+
+ if (!this->associated)
+ return;
+
+ pool = object->pool;
+ RtlEnterCriticalSection( &pool->cs );
+
+ object->num_associated_callbacks--;
+ if (object_is_finished( object, FALSE ))
+ RtlWakeAllConditionVariable( &object->finished_event );
+
+ RtlLeaveCriticalSection( &pool->cs );
+ this->associated = FALSE;
+}
+
+/***********************************************************************
+ * TpIsTimerSet (NTDLL.@)
+ */
+BOOL WINAPI TpIsTimerSet( TP_TIMER *timer )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+ TRACE( "%p\n", timer );
+
+ return this->u.timer.timer_set;
+}
+
+/***********************************************************************
+ * TpPostWork (NTDLL.@)
+ */
+VOID WINAPI TpPostWork( TP_WORK *work )
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+
+ TRACE( "%p\n", work );
+
+ tp_object_submit( this, FALSE );
+}
+
+/***********************************************************************
+ * TpReleaseCleanupGroup (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroup( TP_CLEANUP_GROUP *group )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+
+ TRACE( "%p\n", group );
+
+ tp_group_shutdown( this );
+ tp_group_release( this );
+}
+
+/***********************************************************************
+ * TpReleaseCleanupGroupMembers (NTDLL.@)
+ */
+VOID WINAPI TpReleaseCleanupGroupMembers( TP_CLEANUP_GROUP *group, BOOL cancel_pending,
PVOID userdata )
+{
+ struct threadpool_group *this = impl_from_TP_CLEANUP_GROUP( group );
+ struct threadpool_object *object, *next;
+ struct list members;
+
+ TRACE( "%p %u %p\n", group, cancel_pending, userdata );
+
+ RtlEnterCriticalSection( &this->cs );
+
+ /* Unset group, increase references, and mark objects for shutdown */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &this->members, struct
threadpool_object, group_entry )
+ {
+ assert( object->group == this );
+ assert( object->is_group_member );
+
+ if (InterlockedIncrement( &object->refcount ) == 1)
+ {
+ /* Object is basically already destroyed, but group reference
+ * was not deleted yet. We can safely ignore this object. */
+ InterlockedDecrement( &object->refcount );
+ list_remove( &object->group_entry );
+ object->is_group_member = FALSE;
+ continue;
+ }
+
+ object->is_group_member = FALSE;
+ tp_object_prepare_shutdown( object );
+ }
+
+ /* Move members to a new temporary list */
+ list_init( &members );
+ list_move_tail( &members, &this->members );
+
+ RtlLeaveCriticalSection( &this->cs );
+
+ /* Cancel pending callbacks if requested */
+ if (cancel_pending)
+ {
+ LIST_FOR_EACH_ENTRY( object, &members, struct threadpool_object, group_entry
)
+ {
+ tp_object_cancel( object );
+ }
+ }
+
+ /* Wait for remaining callbacks to finish */
+ LIST_FOR_EACH_ENTRY_SAFE( object, next, &members, struct threadpool_object,
group_entry )
+ {
+ tp_object_wait( object, TRUE );
+
+ if (!object->shutdown)
+ {
+ /* Execute group cancellation callback if defined, and if this was actually a
group cancel. */
+ if (cancel_pending && object->group_cancel_callback)
+ {
+ TRACE( "executing group cancel callback %p(%p, %p)\n",
+ object->group_cancel_callback, object->userdata, userdata
);
+ object->group_cancel_callback( object->userdata, userdata );
+ TRACE( "callback %p returned\n",
object->group_cancel_callback );
+ }
+
+ if (object->type != TP_OBJECT_TYPE_SIMPLE)
+ tp_object_release( object );
+ }
+
+ object->shutdown = TRUE;
+ tp_object_release( object );
+ }
+}
+
+/***********************************************************************
+ * TpReleaseIoCompletion (NTDLL.@)
+ */
+void WINAPI TpReleaseIoCompletion( TP_IO *io )
+{
+ struct threadpool_object *this = impl_from_TP_IO( io );
+ BOOL can_destroy;
+
+ TRACE( "%p\n", io );
+
+ RtlEnterCriticalSection( &this->pool->cs );
+ this->u.io.shutting_down = TRUE;
+ can_destroy = !this->u.io.pending_count && !this->u.io.skipped_count;
+ RtlLeaveCriticalSection( &this->pool->cs );
+
+ if (can_destroy)
+ {
+ tp_object_prepare_shutdown( this );
+ this->shutdown = TRUE;
+ tp_object_release( this );
+ }
+}
+
+/***********************************************************************
+ * TpReleasePool (NTDLL.@)
+ */
+VOID WINAPI TpReleasePool( TP_POOL *pool )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+
+ TRACE( "%p\n", pool );
+
+ tp_threadpool_shutdown( this );
+ tp_threadpool_release( this );
+}
+
+/***********************************************************************
+ * TpReleaseTimer (NTDLL.@)
+ */
+VOID WINAPI TpReleaseTimer( TP_TIMER *timer )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+ TRACE( "%p\n", timer );
+
+ tp_object_prepare_shutdown( this );
+ this->shutdown = TRUE;
+ tp_object_release( this );
+}
+
+/***********************************************************************
+ * TpReleaseWait (NTDLL.@)
+ */
+VOID WINAPI TpReleaseWait( TP_WAIT *wait )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+
+ TRACE( "%p\n", wait );
+
+ tp_object_prepare_shutdown( this );
+ this->shutdown = TRUE;
+ tp_object_release( this );
+}
+
+/***********************************************************************
+ * TpReleaseWork (NTDLL.@)
+ */
+VOID WINAPI TpReleaseWork( TP_WORK *work )
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+
+ TRACE( "%p\n", work );
+
+ tp_object_prepare_shutdown( this );
+ this->shutdown = TRUE;
+ tp_object_release( this );
+}
+
+/***********************************************************************
+ * TpSetPoolMaxThreads (NTDLL.@)
+ */
+VOID WINAPI TpSetPoolMaxThreads( TP_POOL *pool, DWORD maximum )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+
+ TRACE( "%p %lu\n", pool, maximum );
+
+ RtlEnterCriticalSection( &this->cs );
+ this->max_workers = max( maximum, 1 );
+ this->min_workers = min( this->min_workers, this->max_workers );
+ RtlLeaveCriticalSection( &this->cs );
+}
+
+/***********************************************************************
+ * TpSetPoolMinThreads (NTDLL.@)
+ */
+BOOL WINAPI TpSetPoolMinThreads( TP_POOL *pool, DWORD minimum )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+ NTSTATUS status = STATUS_SUCCESS;
+
+ TRACE( "%p %lu\n", pool, minimum );
+
+ RtlEnterCriticalSection( &this->cs );
+
+ while (this->num_workers < minimum)
+ {
+ status = tp_new_worker_thread( this );
+ if (status != STATUS_SUCCESS)
+ break;
+ }
+
+ if (status == STATUS_SUCCESS)
+ {
+ this->min_workers = minimum;
+ this->max_workers = max( this->min_workers, this->max_workers );
+ }
+
+ RtlLeaveCriticalSection( &this->cs );
+ return !status;
+}
+
+/***********************************************************************
+ * TpSetTimer (NTDLL.@)
+ */
+VOID WINAPI TpSetTimer( TP_TIMER *timer, LARGE_INTEGER *timeout, LONG period, LONG
window_length )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+ struct threadpool_object *other_timer;
+ BOOL submit_timer = FALSE;
+ ULONGLONG timestamp;
+
+ TRACE( "%p %p %lu %lu\n", timer, timeout, period, window_length );
+
+ RtlEnterCriticalSection( &timerqueue.cs );
+
+ assert( this->u.timer.timer_initialized );
+ this->u.timer.timer_set = timeout != NULL;
+
+ /* Convert relative timeout to absolute timestamp and handle a timeout
+ * of zero, which means that the timer is submitted immediately. */
+ if (timeout)
+ {
+ timestamp = timeout->QuadPart;
+ if ((LONGLONG)timestamp < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ timestamp = now.QuadPart - timestamp;
+ }
+ else if (!timestamp)
+ {
+ if (!period)
+ timeout = NULL;
+ else
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ timestamp = now.QuadPart + (ULONGLONG)period * 10000;
+ }
+ submit_timer = TRUE;
+ }
+ }
+
+ /* First remove existing timeout. */
+ if (this->u.timer.timer_pending)
+ {
+ list_remove( &this->u.timer.timer_entry );
+ this->u.timer.timer_pending = FALSE;
+ }
+
+ /* If the timer was enabled, then add it back to the queue. */
+ if (timeout)
+ {
+ this->u.timer.timeout = timestamp;
+ this->u.timer.period = period;
+ this->u.timer.window_length = window_length;
+
+ LIST_FOR_EACH_ENTRY( other_timer, &timerqueue.pending_timers,
+ struct threadpool_object, u.timer.timer_entry )
+ {
+ assert( other_timer->type == TP_OBJECT_TYPE_TIMER );
+ if (this->u.timer.timeout < other_timer->u.timer.timeout)
+ break;
+ }
+ list_add_before( &other_timer->u.timer.timer_entry,
&this->u.timer.timer_entry );
+
+ /* Wake up the timer thread when the timeout has to be updated. */
+ if (list_head( &timerqueue.pending_timers ) ==
&this->u.timer.timer_entry )
+ RtlWakeAllConditionVariable( &timerqueue.update_event );
+
+ this->u.timer.timer_pending = TRUE;
+ }
+
+ RtlLeaveCriticalSection( &timerqueue.cs );
+
+ if (submit_timer)
+ tp_object_submit( this, FALSE );
+}
+
+/***********************************************************************
+ * TpSetWait (NTDLL.@)
+ */
+VOID WINAPI TpSetWait( TP_WAIT *wait, HANDLE handle, LARGE_INTEGER *timeout )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+ ULONGLONG timestamp = MAXLONGLONG;
+
+ TRACE( "%p %p %p\n", wait, handle, timeout );
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+
+ assert( this->u.wait.bucket );
+ this->u.wait.handle = handle;
+
+ if (handle || this->u.wait.wait_pending)
+ {
+ struct waitqueue_bucket *bucket = this->u.wait.bucket;
+ list_remove( &this->u.wait.wait_entry );
+
+ /* Convert relative timeout to absolute timestamp. */
+ if (handle && timeout)
+ {
+ timestamp = timeout->QuadPart;
+ if ((LONGLONG)timestamp < 0)
+ {
+ LARGE_INTEGER now;
+ NtQuerySystemTime( &now );
+ timestamp = now.QuadPart - timestamp;
+ }
+ }
+
+ /* Add wait object back into one of the queues. */
+ if (handle)
+ {
+ list_add_tail( &bucket->waiting, &this->u.wait.wait_entry );
+ this->u.wait.wait_pending = TRUE;
+ this->u.wait.timeout = timestamp;
+ }
+ else
+ {
+ list_add_tail( &bucket->reserved, &this->u.wait.wait_entry );
+ this->u.wait.wait_pending = FALSE;
+ }
+
+ /* Wake up the wait queue thread. */
+ NtSetEvent( bucket->update_event, NULL );
+ }
+
+ RtlLeaveCriticalSection( &waitqueue.cs );
+}
+
+/***********************************************************************
+ * TpSimpleTryPost (NTDLL.@)
+ */
+NTSTATUS WINAPI TpSimpleTryPost( PTP_SIMPLE_CALLBACK callback, PVOID userdata,
+ TP_CALLBACK_ENVIRON *environment )
+{
+ struct threadpool_object *object;
+ struct threadpool *pool;
+ NTSTATUS status;
+
+ TRACE( "%p %p %p\n", callback, userdata, environment );
+
+ object = RtlAllocateHeap( GetProcessHeap(), 0, sizeof(*object) );
+ if (!object)
+ return STATUS_NO_MEMORY;
+
+ status = tp_threadpool_lock( &pool, environment );
+ if (status)
+ {
+ RtlFreeHeap( GetProcessHeap(), 0, object );
+ return status;
+ }
+
+ object->type = TP_OBJECT_TYPE_SIMPLE;
+ object->u.simple.callback = callback;
+ tp_object_initialize( object, pool, userdata, environment );
+
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * TpStartAsyncIoOperation (NTDLL.@)
+ */
+void WINAPI TpStartAsyncIoOperation( TP_IO *io )
+{
+ struct threadpool_object *this = impl_from_TP_IO( io );
+
+ TRACE( "%p\n", io );
+
+ RtlEnterCriticalSection( &this->pool->cs );
+
+ this->u.io.pending_count++;
+
+ RtlLeaveCriticalSection( &this->pool->cs );
+}
+
+/***********************************************************************
+ * TpWaitForIoCompletion (NTDLL.@)
+ */
+void WINAPI TpWaitForIoCompletion( TP_IO *io, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_IO( io );
+
+ TRACE( "%p %d\n", io, cancel_pending );
+
+ if (cancel_pending)
+ tp_object_cancel( this );
+ tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
+ * TpWaitForTimer (NTDLL.@)
+ */
+VOID WINAPI TpWaitForTimer( TP_TIMER *timer, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_TIMER( timer );
+
+ TRACE( "%p %d\n", timer, cancel_pending );
+
+ if (cancel_pending)
+ tp_object_cancel( this );
+ tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
+ * TpWaitForWait (NTDLL.@)
+ */
+VOID WINAPI TpWaitForWait( TP_WAIT *wait, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_WAIT( wait );
+
+ TRACE( "%p %d\n", wait, cancel_pending );
+
+ if (cancel_pending)
+ tp_object_cancel( this );
+ tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
+ * TpWaitForWork (NTDLL.@)
+ */
+VOID WINAPI TpWaitForWork( TP_WORK *work, BOOL cancel_pending )
+{
+ struct threadpool_object *this = impl_from_TP_WORK( work );
+
+ TRACE( "%p %u\n", work, cancel_pending );
+
+ if (cancel_pending)
+ tp_object_cancel( this );
+ tp_object_wait( this, FALSE );
+}
+
+/***********************************************************************
+ * TpSetPoolStackInformation (NTDLL.@)
+ */
+NTSTATUS WINAPI TpSetPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION
*stack_info )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+
+ TRACE( "%p %p\n", pool, stack_info );
+
+ if (!stack_info)
+ return STATUS_INVALID_PARAMETER;
+
+ RtlEnterCriticalSection( &this->cs );
+ this->stack_info = *stack_info;
+ RtlLeaveCriticalSection( &this->cs );
+
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * TpQueryPoolStackInformation (NTDLL.@)
+ */
+NTSTATUS WINAPI TpQueryPoolStackInformation( TP_POOL *pool, TP_POOL_STACK_INFORMATION
*stack_info )
+{
+ struct threadpool *this = impl_from_TP_POOL( pool );
+
+ TRACE( "%p %p\n", pool, stack_info );
+
+ if (!stack_info)
+ return STATUS_INVALID_PARAMETER;
+
+ RtlEnterCriticalSection( &this->cs );
+ *stack_info = this->stack_info;
+ RtlLeaveCriticalSection( &this->cs );
+
+ return STATUS_SUCCESS;
+}
+
+static void CALLBACK rtl_wait_callback( TP_CALLBACK_INSTANCE *instance, void *userdata,
TP_WAIT *wait, TP_WAIT_RESULT result )
+{
+ struct threadpool_object *object = impl_from_TP_WAIT(wait);
+ object->u.wait.rtl_callback( userdata, result != STATUS_WAIT_0 );
+}
+
+/***********************************************************************
+ * RtlRegisterWait (NTDLL.@)
+ *
+ * Registers a wait for a handle to become signaled.
+ *
+ * PARAMS
+ * NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
+ * Object [I] Object to wait to become signaled.
+ * Callback [I] Callback function to execute when the wait times out or the handle is
signaled.
+ * Context [I] Context to pass to the callback function when it is executed.
+ * Milliseconds [I] Number of milliseconds to wait before timing out.
+ * Flags [I] Flags. See notes.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ *
+ * NOTES
+ * Flags can be one or more of the following:
+ *|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
+ *|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
+ *|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
+ *|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
+ *|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
+ */
+NTSTATUS WINAPI RtlRegisterWait( HANDLE *out, HANDLE handle, RTL_WAITORTIMERCALLBACKFUNC
callback,
+ void *context, ULONG milliseconds, ULONG flags )
+{
+ struct threadpool_object *object;
+ TP_CALLBACK_ENVIRON environment;
+ LARGE_INTEGER timeout;
+ NTSTATUS status;
+ TP_WAIT *wait;
+
+ TRACE( "out %p, handle %p, callback %p, context %p, milliseconds %lu, flags
%lx\n",
+ out, handle, callback, context, milliseconds, flags );
+
+ memset( &environment, 0, sizeof(environment) );
+ environment.Version = 1;
+ environment.u.s.LongFunction = (flags & WT_EXECUTELONGFUNCTION) != 0;
+ environment.u.s.Persistent = (flags & WT_EXECUTEINPERSISTENTTHREAD) != 0;
+
+ flags &= (WT_EXECUTEONLYONCE | WT_EXECUTEINWAITTHREAD | WT_EXECUTEINIOTHREAD);
+ if ((status = tp_alloc_wait( &wait, rtl_wait_callback, context, &environment,
flags )))
+ return status;
+
+ object = impl_from_TP_WAIT(wait);
+ object->u.wait.rtl_callback = callback;
+
+ RtlEnterCriticalSection( &waitqueue.cs );
+ TpSetWait( (TP_WAIT *)object, handle, get_nt_timeout( &timeout, milliseconds )
);
+
+ *out = object;
+ RtlLeaveCriticalSection( &waitqueue.cs );
+
+ return STATUS_SUCCESS;
+}
+
+/***********************************************************************
+ * RtlDeregisterWaitEx (NTDLL.@)
+ *
+ * Cancels a wait operation and frees the resources associated with calling
+ * RtlRegisterWait().
+ *
+ * PARAMS
+ * WaitObject [I] Handle to the wait object to free.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlDeregisterWaitEx( HANDLE handle, HANDLE event )
+{
+ struct threadpool_object *object = handle;
+ NTSTATUS status;
+
+ TRACE( "handle %p, event %p\n", handle, event );
+
+ if (!object) return STATUS_INVALID_HANDLE;
+
+ TpSetWait( (TP_WAIT *)object, NULL, NULL );
+
+ if (event == INVALID_HANDLE_VALUE) TpWaitForWait( (TP_WAIT *)object, TRUE );
+ else
+ {
+ assert( object->completed_event == NULL );
+ object->completed_event = event;
+ }
+
+ RtlEnterCriticalSection( &object->pool->cs );
+ if (object->num_pending_callbacks + object->num_running_callbacks
+ + object->num_associated_callbacks) status = STATUS_PENDING;
+ else status = STATUS_SUCCESS;
+ RtlLeaveCriticalSection( &object->pool->cs );
+
+ TpReleaseWait( (TP_WAIT *)object );
+ return status;
+}
+
+/***********************************************************************
+ * RtlDeregisterWait (NTDLL.@)
+ *
+ * Cancels a wait operation and frees the resources associated with calling
+ * RtlRegisterWait().
+ *
+ * PARAMS
+ * WaitObject [I] Handle to the wait object to free.
+ *
+ * RETURNS
+ * Success: STATUS_SUCCESS.
+ * Failure: Any NTSTATUS code.
+ */
+NTSTATUS WINAPI RtlDeregisterWait(HANDLE WaitHandle)
+{
+ return RtlDeregisterWaitEx(WaitHandle, NULL);
+}