- Guarded the calls to IoSetCancelRoutine with
IoAcquireCancelSpinLock/IoReleaseCancelSpinLock.
- Used a fast mutex as the lock for the data queue.
- Used paged pool for the data buffers.
- Allowed the server to read (and to wait) on a listening pipe.
- Implemented non-blocking read operations.
Modified: trunk/reactos/drivers/fs/np/create.c
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
Modified: trunk/reactos/drivers/fs/np/npfs.c
Modified: trunk/reactos/drivers/fs/np/npfs.h
Modified: trunk/reactos/drivers/fs/np/rw.c
_____
Modified: trunk/reactos/drivers/fs/np/create.c
--- trunk/reactos/drivers/fs/np/create.c 2005-03-23 21:51:40 UTC
(rev 14295)
+++ trunk/reactos/drivers/fs/np/create.c 2005-03-23 22:11:20 UTC
(rev 14296)
@@ -49,6 +49,7 @@
{
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
+ KIRQL oldIrql;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
@@ -58,11 +59,15 @@
!Waiter->Irp->Cancel)
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
-
- if (IoSetCancelRoutine(Waiter->Irp, NULL) != NULL)
- {
+
+ IoAcquireCancelSpinLock(&oldIrql);
+ if (!Waiter->Irp->Cancel)
+ {
+ IoSetCancelRoutine(Waiter->Irp, NULL);
+ IoReleaseCancelSpinLock(oldIrql);
return Waiter->Fcb;
}
+ IoReleaseCancelSpinLock(oldIrql);
}
CurrentEntry = CurrentEntry->Flink;
@@ -174,7 +179,7 @@
/* Initialize data list. */
if (Pipe->OutboundQuota)
{
- ClientFcb->Data = ExAllocatePool(NonPagedPool,
Pipe->OutboundQuota);
+ ClientFcb->Data = ExAllocatePool(PagedPool, Pipe->OutboundQuota);
if (ClientFcb->Data == NULL)
{
DPRINT("No memory!\n");
@@ -195,7 +200,7 @@
ClientFcb->ReadDataAvailable = 0;
ClientFcb->WriteQuotaAvailable = Pipe->OutboundQuota;
ClientFcb->MaxDataLength = Pipe->OutboundQuota;
- KeInitializeSpinLock(&ClientFcb->DataListLock);
+ ExInitializeFastMutex(&ClientFcb->DataListLock);
KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent,
FALSE);
KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
@@ -455,13 +460,17 @@
if (Pipe->InboundQuota)
{
- Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota);
+ Fcb->Data = ExAllocatePool(PagedPool, Pipe->InboundQuota);
if (Fcb->Data == NULL)
{
ExFreePool(Fcb);
if (NewPipe)
{
+ /*
+ * FIXME:
+ * Lock the pipelist and remove the pipe from the list.
+ */
RtlFreeUnicodeString(&Pipe->PipeName);
ExFreePool(Pipe);
}
@@ -481,7 +490,7 @@
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
Fcb->MaxDataLength = Pipe->InboundQuota;
- KeInitializeSpinLock(&Fcb->DataListLock);
+ ExInitializeFastMutex(&Fcb->DataListLock);
Pipe->CurrentInstances++;
_____
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
--- trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-23 21:51:40 UTC
(rev 14295)
+++ trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-23 22:11:20 UTC
(rev 14296)
@@ -46,6 +46,7 @@
PNPFS_FCB Fcb)
{
PNPFS_WAITER_ENTRY Entry;
+ KIRQL oldIrql;
Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
if (Entry == NULL)
@@ -61,13 +62,15 @@
Irp->Tail.Overlay.DriverContext[0] = Entry;
InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
- IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
-
+ IoAcquireCancelSpinLock(&oldIrql);
if (!Irp->Cancel)
{
+ IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
+ IoReleaseCancelSpinLock(oldIrql);
KeUnlockMutex(&Fcb->Pipe->FcbListLock);
return STATUS_PENDING;
}
+ IoReleaseCancelSpinLock(oldIrql);
RemoveEntryList(&Entry->Entry);
_____
Modified: trunk/reactos/drivers/fs/np/npfs.c
--- trunk/reactos/drivers/fs/np/npfs.c 2005-03-23 21:51:40 UTC (rev
14295)
+++ trunk/reactos/drivers/fs/np/npfs.c 2005-03-23 22:11:20 UTC (rev
14296)
@@ -73,8 +73,10 @@
/* initialize the device extension */
DeviceExtension = DeviceObject->DeviceExtension;
InitializeListHead(&DeviceExtension->PipeListHead);
+ InitializeListHead(&DeviceExtension->ThreadListHead);
KeInitializeMutex(&DeviceExtension->PipeListLock,
0);
+ DeviceExtension->EmptyWaiterCount = 0;
/* set the size quotas */
DeviceExtension->MinQuota = PAGE_SIZE;
_____
Modified: trunk/reactos/drivers/fs/np/npfs.h
--- trunk/reactos/drivers/fs/np/npfs.h 2005-03-23 21:51:40 UTC (rev
14295)
+++ trunk/reactos/drivers/fs/np/npfs.h 2005-03-23 22:11:20 UTC (rev
14296)
@@ -6,7 +6,9 @@
typedef struct _NPFS_DEVICE_EXTENSION
{
LIST_ENTRY PipeListHead;
+ LIST_ENTRY ThreadListHead;
KMUTEX PipeListLock;
+ ULONG EmptyWaiterCount;
ULONG MinQuota;
ULONG DefaultQuota;
ULONG MaxQuota;
@@ -20,6 +22,7 @@
LIST_ENTRY ServerFcbListHead;
LIST_ENTRY ClientFcbListHead;
LIST_ENTRY WaiterListHead;
+ LIST_ENTRY EmptyBufferListHead;
ULONG PipeType;
ULONG ReadMode;
ULONG WriteMode;
@@ -50,9 +53,29 @@
PVOID WritePtr;
ULONG MaxDataLength;
- KSPIN_LOCK DataListLock; /* Data queue lock */
+ FAST_MUTEX DataListLock; /* Data queue lock */
} NPFS_FCB, *PNPFS_FCB;
+typedef struct _NPFS_CONTEXT /* Per-request state handed to NpfsReadFromPipe and to the waiter thread. */
+{
+ PDEVICE_OBJECT DeviceObject; /* Device the IRP was sent to; used to reach the NPFS device extension. */
+ PIRP Irp; /* The request being serviced. */
+ PNPFS_FCB Fcb; /* FCB whose Event the request waits on; filled in when the request is queued. */
+ UCHAR MajorFunction; /* IRP major code the waiter thread dispatches on (only IRP_MJ_READ is handled). */
+ BOOLEAN AllocatedFromPool; /* TRUE for pool copies made for queued requests (freed with ExFreePool); FALSE for the on-stack context built by NpfsRead. */
+} NPFS_CONTEXT, *PNPFS_CONTEXT;
+
+typedef struct _NPFS_THREAD_CONTEXT /* State of one waiter thread that blocks on behalf of queued read requests. */
+{
+ ULONG Count; /* Number of used slots in WaitObjectArray, including slot 0 (the wake-up Event). */
+ KEVENT Event; /* Stored in WaitObjectArray[0]; set when a new request is added so the thread rebuilds its wait. */
+ PNPFS_DEVICE_EXTENSION DeviceExt; /* Owning device extension; its PipeListLock guards this structure. */
+ LIST_ENTRY ListEntry; /* Links this context into DeviceExt->ThreadListHead. */
+ PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS]; /* Objects passed to KeWaitForMultipleObjects. */
+ KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS]; /* Wait blocks passed to KeWaitForMultipleObjects. */
+ PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS]; /* Request context for the object in the same slot of WaitObjectArray; slot 0 is not used. */
+} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
+
typedef struct _NPFS_WAITER_ENTRY
{
LIST_ENTRY Entry;
_____
Modified: trunk/reactos/drivers/fs/np/rw.c
--- trunk/reactos/drivers/fs/np/rw.c 2005-03-23 21:51:40 UTC (rev
14295)
+++ trunk/reactos/drivers/fs/np/rw.c 2005-03-23 22:11:20 UTC (rev
14296)
@@ -46,16 +46,197 @@
}
#endif
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context);
-NTSTATUS STDCALL
-NpfsRead(PDEVICE_OBJECT DeviceObject,
- PIRP Irp)
+static VOID STDCALL /* Cancel routine that NpfsAddWaitingReader installs on a queued read IRP. */
+NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
+{
+ PNPFS_CONTEXT Context;
+ PNPFS_DEVICE_EXTENSION DeviceExt;
+
+ DPRINT1("NpfsWaitingCancelRoutine() called\n");
+
+ IoReleaseCancelSpinLock(Irp->CancelIrql); /* Drop the cancel spin lock before taking the pipe list mutex below. */
+
+ Context = Irp->Tail.Overlay.DriverContext[0]; /* Request context stored by NpfsAddWaitingReader. */
+ DeviceExt = Context->DeviceObject->DeviceExtension;
+
+ KeLockMutex(&DeviceExt->PipeListLock);
+ KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE); /* Only wakes the waiter thread; the IRP is not completed here. NOTE(review): presumably the waiter thread then completes it with STATUS_CANCELLED - confirm. */
+ KeUnlockMutex(&DeviceExt->PipeListLock);
+}
+
+/*
+ * Worker thread that waits on behalf of queued (asynchronous) read requests.
+ *
+ * Context is the NPFS_THREAD_CONTEXT allocated by NpfsAddWaitingReader.
+ * Slot 0 of the wait array is the thread's own wake-up event; every other
+ * slot is the FCB event of one queued request.  When a request's event is
+ * signaled the request is removed from the arrays and either completed
+ * with STATUS_CANCELLED or handed back to NpfsReadFromPipe.
+ *
+ * The wait arrays and the counters are only touched while PipeListLock is
+ * held.  The lock is released at the top of every iteration, before the
+ * request is processed and before the thread blocks again.
+ */
+static VOID STDCALL
+NpfsWaiterThread(PVOID Context)
+{
+  PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
+  ULONG CurrentCount, Count = 0;
+  PNPFS_CONTEXT WaitContext = NULL;
+  NTSTATUS Status;
+  BOOLEAN Terminate = FALSE;
+  BOOLEAN Cancel = FALSE;
+  KIRQL oldIrql;
+
+  KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+
+  while (1)
+    {
+      /* Snapshot the slot count and re-arm the wake-up event under the lock. */
+      CurrentCount = ThreadContext->Count;
+      KeResetEvent(&ThreadContext->Event);
+      KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+
+      /* Process the request selected by the previous iteration, if any. */
+      if (WaitContext)
+        {
+          if (Cancel)
+            {
+              WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
+              WaitContext->Irp->IoStatus.Information = 0;
+              IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
+              ExFreePool(WaitContext);
+            }
+          else
+            {
+              switch (WaitContext->MajorFunction)
+                {
+                  case IRP_MJ_READ:
+                    NpfsReadFromPipe(WaitContext);
+                    break;
+                  default:
+                    KEBUGCHECK(0);
+                }
+            }
+        }
+      if (Terminate)
+        {
+          /* PipeListLock was already released above. */
+          break;
+        }
+      Status = KeWaitForMultipleObjects(CurrentCount,
+                                        ThreadContext->WaitObjectArray,
+                                        WaitAny,
+                                        Executive,
+                                        KernelMode,
+                                        FALSE,
+                                        NULL,
+                                        ThreadContext->WaitBlockArray);
+      KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+      if (!NT_SUCCESS(Status))
+        {
+          KEBUGCHECK(0);
+        }
+      Count = Status - STATUS_SUCCESS;
+      /* The wait returns the index of the signaled object: 0 .. CurrentCount - 1. */
+      ASSERT (Count < CurrentCount);
+      if (Count > 0)
+        {
+          /* A request's event fired: free its slot by moving the last entry into it. */
+          WaitContext = ThreadContext->WaitContextArray[Count];
+          ThreadContext->Count--;
+          ThreadContext->DeviceExt->EmptyWaiterCount++;
+          ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
+          ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
+          /* A cancel routine that is already gone means the request was cancelled. */
+          IoAcquireCancelSpinLock(&oldIrql);
+          Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
+          IoReleaseCancelSpinLock(oldIrql);
+        }
+      else
+        {
+          /* Someone has added a new wait request. */
+          WaitContext = NULL;
+        }
+      if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+        {
+          /* Another thread with empty wait slots exists, so this thread can be removed from the list. */
+          RemoveEntryList(&ThreadContext->ListEntry);
+          ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
+          Terminate = TRUE;
+        }
+    }
+  /*
+   * The loop is only left through the Terminate break, which happens after
+   * the mutex was released at the top of the iteration.  Releasing it a
+   * second time here would release a mutex this thread does not own.
+   */
+  ExFreePool(ThreadContext);
+}
+
+/*
+ * Queue an asynchronous read request on a waiter thread.
+ *
+ * DeviceExt  NPFS device extension holding the waiter thread list.
+ * Context    Pool-allocated request context; on success it is owned by the
+ *            waiter thread, on failure the caller still owns it.
+ * Fcb        FCB whose Event wakes the request up.
+ *
+ * Returns STATUS_SUCCESS if the request was queued (the IRP is then marked
+ * pending and has a cancel routine), STATUS_CANCELLED if the IRP was
+ * already cancelled, STATUS_NO_MEMORY or the PsCreateSystemThread status
+ * if no waiter thread could be set up.
+ */
+static NTSTATUS
+NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
+{
+  PLIST_ENTRY ListEntry;
+  PNPFS_THREAD_CONTEXT ThreadContext;
+  NTSTATUS Status;
+  HANDLE hThread;
+  KIRQL oldIrql;
+
+  KeLockMutex(&DeviceExt->PipeListLock);
+
+  /* Look for an existing waiter thread with a free wait slot. */
+  ListEntry = DeviceExt->ThreadListHead.Flink;
+  while (ListEntry != &DeviceExt->ThreadListHead)
+    {
+      ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
+      if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
+        {
+          break;
+        }
+      ListEntry = ListEntry->Flink;
+    }
+  if (ListEntry == &DeviceExt->ThreadListHead)
+    {
+      /* None found: create a new waiter thread; slot 0 is its wake-up event. */
+      ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
+      if (ThreadContext == NULL)
+        {
+          KeUnlockMutex(&DeviceExt->PipeListLock);
+          return STATUS_NO_MEMORY;
+        }
+      ThreadContext->DeviceExt = DeviceExt;
+      KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
+      ThreadContext->Count = 1;
+      ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
+
+      DPRINT("Creating a new system thread for waiting read requests\n");
+
+      Status = PsCreateSystemThread(&hThread,
+                                    THREAD_ALL_ACCESS,
+                                    NULL,
+                                    NULL,
+                                    NULL,
+                                    NpfsWaiterThread,
+                                    (PVOID)ThreadContext);
+      if (!NT_SUCCESS(Status))
+        {
+          ExFreePool(ThreadContext);
+          KeUnlockMutex(&DeviceExt->PipeListLock);
+          return Status;
+        }
+      /* The handle is never used again; closing it does not stop the thread. */
+      ZwClose(hThread);
+      InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
+      DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
+    }
+  Context->Irp->Tail.Overlay.DriverContext[0] = Context;
+
+  IoAcquireCancelSpinLock(&oldIrql);
+  if (Context->Irp->Cancel)
+    {
+      /* Not queued: the caller completes the IRP, so it must not be marked pending. */
+      IoReleaseCancelSpinLock(oldIrql);
+      Status = STATUS_CANCELLED;
+    }
+  else
+    {
+      /*
+       * Mark the IRP pending only once it is certain that it will be queued.
+       * The waiter thread cannot pick it up before PipeListLock is released.
+       */
+      IoMarkIrpPending(Context->Irp);
+      IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
+      IoReleaseCancelSpinLock(oldIrql);
+      ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
+      ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
+      ThreadContext->Count++;
+      DeviceExt->EmptyWaiterCount--;
+      /* Wake the waiter thread so it restarts its wait with the new slot. */
+      KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
+      Status = STATUS_SUCCESS;
+    }
+  KeUnlockMutex(&DeviceExt->PipeListLock);
+  return Status;
+}
+
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context)
+{
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
NTSTATUS Status;
- PNPFS_DEVICE_EXTENSION DeviceExt;
- KIRQL OldIrql;
ULONG Information;
PNPFS_FCB Fcb;
PNPFS_FCB WriterFcb;
@@ -65,23 +246,14 @@
ULONG CopyLength;
ULONG TempLength;
- DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
+ DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
- DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
- IoStack = IoGetCurrentIrpStackLocation(Irp);
+ IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
FileObject = IoStack->FileObject;
Fcb = FileObject->FsContext;
Pipe = Fcb->Pipe;
WriterFcb = Fcb->OtherSide;
- if (Irp->MdlAddress == NULL)
- {
- DPRINT("Irp->MdlAddress == NULL\n");
- Status = STATUS_UNSUCCESSFUL;
- Information = 0;
- goto done;
- }
-
if (Fcb->Data == NULL)
{
DPRINT("Pipe is NOT readable!\n");
@@ -94,41 +266,71 @@
Length = IoStack->Parameters.Read.Length;
Information = 0;
- Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
- KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+ Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
+ ExAcquireFastMutex(&Fcb->DataListLock);
while (1)
{
- /* FIXME: check if in blocking mode */
if (Fcb->ReadDataAvailable == 0)
{
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
}
- KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+ ExReleaseFastMutex(&Fcb->DataListLock);
if (Information > 0)
{
Status = STATUS_SUCCESS;
goto done;
}
- if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE &&
+ !(Fcb->PipeState == FILE_PIPE_LISTENING_STATE &&
Fcb->PipeEnd == FILE_PIPE_SERVER_END))
{
DPRINT("PipeState: %x\n", Fcb->PipeState);
Status = STATUS_PIPE_BROKEN;
goto done;
}
- /* Wait for ReadEvent to become signaled */
- DPRINT("Waiting for readable data (%S)\n",
Pipe->PipeName.Buffer);
- Status = KeWaitForSingleObject(&Fcb->Event,
- UserRequest,
- KernelMode,
- FALSE,
- NULL);
- DPRINT("Finished waiting (%S)! Status: %x\n",
Pipe->PipeName.Buffer, Status);
+ if (IoIsOperationSynchronous(Context->Irp))
+ {
+ /* Wait for ReadEvent to become signaled */
+ DPRINT("Waiting for readable data (%S)\n",
Pipe->PipeName.Buffer);
+ Status = KeWaitForSingleObject(&Fcb->Event,
+ UserRequest,
+ KernelMode,
+ FALSE,
+ NULL);
+ DPRINT("Finished waiting (%S)! Status: %x\n",
Pipe->PipeName.Buffer, Status);
+ }
+ else
+ {
+ PNPFS_CONTEXT NewContext;
- KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+ NewContext = ExAllocatePool(NonPagedPool,
sizeof(NPFS_CONTEXT));
+ if (NewContext == NULL)
+ {
+ Status = STATUS_NO_MEMORY;
+ goto done;
+ }
+ memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
+ NewContext->AllocatedFromPool = TRUE;
+ NewContext->Fcb = Fcb;
+ NewContext->MajorFunction = IRP_MJ_READ;
+
+ Status =
NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext,
Fcb);
+
+ if (NT_SUCCESS(Status))
+ {
+ Status = STATUS_PENDING;
+ }
+ else
+ {
+ ExFreePool(NewContext);
+ }
+ goto done;
+ }
+
+ ExAcquireFastMutex(&Fcb->DataListLock);
}
if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -217,20 +419,48 @@
}
}
- KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+ ExReleaseFastMutex(&Fcb->DataListLock);
done:
- Irp->IoStatus.Status = Status;
- Irp->IoStatus.Information = Information;
+ Context->Irp->IoStatus.Status = Status;
+ Context->Irp->IoStatus.Information = Information;
- IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ if (Status != STATUS_PENDING)
+ {
+ IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
+ }
+ if (Context->AllocatedFromPool)
+ {
+ ExFreePool(Context);
+ }
DPRINT("NpfsRead done (Status %lx)\n", Status);
return Status;
}
+/*
+ * IRP_MJ_READ dispatch routine.
+ *
+ * Fails the request with STATUS_UNSUCCESSFUL when the IRP carries no MDL;
+ * otherwise wraps the request in an on-stack context and lets
+ * NpfsReadFromPipe do the work, returning its status.
+ */
+NTSTATUS STDCALL
+NpfsRead(PDEVICE_OBJECT DeviceObject,
+         PIRP Irp)
+{
+  NPFS_CONTEXT ReadContext;
+
+  if (Irp->MdlAddress != NULL)
+    {
+      /* The context lives on this stack frame, so it must not be freed. */
+      ReadContext.DeviceObject = DeviceObject;
+      ReadContext.Irp = Irp;
+      ReadContext.AllocatedFromPool = FALSE;
+      return NpfsReadFromPipe(&ReadContext);
+    }
+
+  DPRINT("Irp->MdlAddress == NULL\n");
+  Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
+  Irp->IoStatus.Information = 0;
+  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+  return STATUS_UNSUCCESSFUL;
+}
+
NTSTATUS STDCALL
NpfsWrite(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
@@ -244,7 +474,6 @@
NTSTATUS Status = STATUS_SUCCESS;
ULONG Length;
ULONG Offset;
- KIRQL OldIrql;
ULONG Information;
ULONG CopyLength;
ULONG TempLength;
@@ -296,7 +525,7 @@
Status = STATUS_SUCCESS;
Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
- KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+ ExAcquireFastMutex(&ReaderFcb->DataListLock);
#ifndef NDEBUG
DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
HexDump(Buffer, Length);
@@ -307,7 +536,7 @@
if (ReaderFcb->WriteQuotaAvailable == 0)
{
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
- KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
Status = STATUS_PIPE_BROKEN;
@@ -332,7 +561,7 @@
Status = STATUS_PIPE_BROKEN;
goto done;
}
- KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+ ExAcquireFastMutex(&ReaderFcb->DataListLock);
}
if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -395,7 +624,7 @@
}
}
- KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+ ExReleaseFastMutex(&ReaderFcb->DataListLock);
done:
Irp->IoStatus.Status = Status;