- Put the wait entry into the DriverContext of the irp instead to
allocated it from pool.
- Lock the data list on both ends of the pipe, if we disconnect the
pipe.
- Implemented a read and a write event on each end of the pipe.
- Implemented a list for read requests to deliver the requests in the
correct sequence.
- Do not end a read request if the pipe was connected and if the buffer
wasn't filled completely.
Modified: trunk/reactos/drivers/fs/np/create.c
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
Modified: trunk/reactos/drivers/fs/np/npfs.c
Modified: trunk/reactos/drivers/fs/np/npfs.h
Modified: trunk/reactos/drivers/fs/np/rw.c
_____
Modified: trunk/reactos/drivers/fs/np/create.c
--- trunk/reactos/drivers/fs/np/create.c 2005-03-28 18:37:39 UTC
(rev 14371)
+++ trunk/reactos/drivers/fs/np/create.c 2005-03-28 18:42:53 UTC
(rev 14372)
@@ -50,20 +50,21 @@
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
KIRQL oldIrql;
+ PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
{
Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY,
Entry);
- if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE &&
- !Waiter->Irp->Cancel)
+ Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext);
+ if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
IoAcquireCancelSpinLock(&oldIrql);
- if (!Waiter->Irp->Cancel)
+ if (!Irp->Cancel)
{
- IoSetCancelRoutine(Waiter->Irp, NULL);
+ IoSetCancelRoutine(Irp, NULL);
IoReleaseCancelSpinLock(oldIrql);
return Waiter->Fcb;
}
@@ -83,6 +84,7 @@
{
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
+ PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
@@ -92,13 +94,12 @@
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
- Waiter->Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
- Waiter->Irp->IoStatus.Information = 0;
- IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
-
RemoveEntryList(&Waiter->Entry);
- ExFreePool(Waiter);
- return;
+ Irp = CONTAINING_RECORD(Waiter, IRP,
Tail.Overlay.DriverContext);
+ Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ break;
}
CurrentEntry = CurrentEntry->Flink;
}
@@ -175,7 +176,10 @@
ClientFcb->PipeEnd = FILE_PIPE_CLIENT_END;
ClientFcb->OtherSide = NULL;
ClientFcb->PipeState = SpecialAccess ? 0 :
FILE_PIPE_DISCONNECTED_STATE;
+ InitializeListHead(&ClientFcb->ReadRequestListHead);
+ DPRINT("Fcb: %x\n", ClientFcb);
+
/* Initialize data list. */
if (Pipe->OutboundQuota)
{
@@ -202,8 +206,10 @@
ClientFcb->MaxDataLength = Pipe->OutboundQuota;
ExInitializeFastMutex(&ClientFcb->DataListLock);
KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent,
FALSE);
- KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
+ KeInitializeEvent(&ClientFcb->ReadEvent, SynchronizationEvent,
FALSE);
+ KeInitializeEvent(&ClientFcb->WriteEvent, SynchronizationEvent,
FALSE);
+
/*
* Step 3. Search for listening server FCB.
*/
@@ -489,6 +495,7 @@
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
Fcb->MaxDataLength = Pipe->InboundQuota;
+ InitializeListHead(&Fcb->ReadRequestListHead);
ExInitializeFastMutex(&Fcb->DataListLock);
Pipe->CurrentInstances++;
@@ -498,13 +505,11 @@
Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
Fcb->OtherSide = NULL;
- KeInitializeEvent(&Fcb->ConnectEvent,
- SynchronizationEvent,
- FALSE);
+ DPRINT("Fcb: %x\n", Fcb);
- KeInitializeEvent(&Fcb->Event,
- SynchronizationEvent,
- FALSE);
+ KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE);
+ KeInitializeEvent(&Fcb->ReadEvent, SynchronizationEvent, FALSE);
+ KeInitializeEvent(&Fcb->WriteEvent, SynchronizationEvent, FALSE);
KeLockMutex(&Pipe->FcbListLock);
InsertTailList(&Pipe->ServerFcbListHead, &Fcb->FcbListEntry);
@@ -528,7 +533,7 @@
PNPFS_DEVICE_EXTENSION DeviceExt;
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
- PNPFS_FCB Fcb;
+ PNPFS_FCB Fcb, OtherSide;
PNPFS_PIPE Pipe;
BOOL Server;
@@ -566,19 +571,38 @@
{
DPRINT("Client\n");
}
-
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
- if (Fcb->OtherSide)
+ OtherSide = Fcb->OtherSide;
+ /* Lock the server first */
+ if (Server)
{
- Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
- Fcb->OtherSide->OtherSide = NULL;
- /*
- * Signaling the write event. If is possible that an other
- * thread waits for an empty buffer.
- */
- KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ ExAcquireFastMutex(&OtherSide->DataListLock);
}
+ else
+ {
+ ExAcquireFastMutex(&OtherSide->DataListLock);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+ OtherSide->OtherSide = NULL;
+ /*
+ * Signaling the write event. If is possible that an other
+ * thread waits for an empty buffer.
+ */
+ KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
+ KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+ if (Server)
+ {
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ ExReleaseFastMutex(&OtherSide->DataListLock);
+ }
+ else
+ {
+ ExReleaseFastMutex(&OtherSide->DataListLock);
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ }
}
else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
{
@@ -586,6 +610,7 @@
PNPFS_WAITER_ENTRY WaitEntry = NULL;
BOOLEAN Complete = FALSE;
KIRQL oldIrql;
+ PIRP tmpIrp;
Entry = Fcb->Pipe->WaiterListHead.Flink;
while (Entry != &Fcb->Pipe->WaiterListHead)
@@ -594,28 +619,25 @@
if (WaitEntry->Fcb == Fcb)
{
RemoveEntryList(Entry);
+ tmpIrp = CONTAINING_RECORD(WaitEntry, IRP,
Tail.Overlay.DriverContext);
IoAcquireCancelSpinLock(&oldIrql);
- if (!Irp->Cancel)
+ if (!tmpIrp->Cancel)
{
- IoSetCancelRoutine(WaitEntry->Irp, NULL);
+ IoSetCancelRoutine(tmpIrp, NULL);
Complete = TRUE;
}
IoReleaseCancelSpinLock(oldIrql);
+ if (Complete)
+ {
+ tmpIrp->IoStatus.Status = STATUS_PIPE_BROKEN;
+ tmpIrp->IoStatus.Information = 0;
+ IoCompleteRequest(tmpIrp, IO_NO_INCREMENT);
+ }
break;
}
Entry = Entry->Flink;
}
- if (Entry != &Fcb->Pipe->WaiterListHead)
- {
- if (Complete)
- {
- WaitEntry->Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
- WaitEntry->Irp->IoStatus.Information = 0;
- IoCompleteRequest(WaitEntry->Irp, IO_NO_INCREMENT);
- }
- ExFreePool(WaitEntry);
- }
}
Fcb->PipeState = FILE_PIPE_CLOSING_STATE;
_____
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
--- trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-28 18:37:39 UTC
(rev 14371)
+++ trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-28 18:42:53 UTC
(rev 14372)
@@ -26,18 +26,18 @@
DPRINT1("NpfsListeningCancelRoutine() called\n");
+ Waiter = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
+
IoReleaseCancelSpinLock(Irp->CancelIrql);
- Waiter = Irp->Tail.Overlay.DriverContext[0];
- KeLockMutex(&Waiter->Pipe->FcbListLock);
+ KeLockMutex(&Waiter->Fcb->Pipe->FcbListLock);
RemoveEntryList(&Waiter->Entry);
- KeUnlockMutex(&Waiter->Pipe->FcbListLock);
+ KeUnlockMutex(&Waiter->Fcb->Pipe->FcbListLock);
Irp->IoStatus.Status = STATUS_CANCELLED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
- ExFreePool(Waiter);
}
@@ -48,18 +48,13 @@
PNPFS_WAITER_ENTRY Entry;
KIRQL oldIrql;
- Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
- if (Entry == NULL)
- return STATUS_INSUFFICIENT_RESOURCES;
+ Entry = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
- Entry->Irp = Irp;
Entry->Fcb = Fcb;
- Entry->Pipe = Fcb->Pipe;
KeLockMutex(&Fcb->Pipe->FcbListLock);
IoMarkIrpPending(Irp);
- Irp->Tail.Overlay.DriverContext[0] = Entry;
InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
IoAcquireCancelSpinLock(&oldIrql);
@@ -78,7 +73,6 @@
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
KeUnlockMutex(&Fcb->Pipe->FcbListLock);
- ExFreePool(Entry);
return STATUS_CANCELLED;
}
@@ -173,38 +167,100 @@
static NTSTATUS
NpfsDisconnectPipe(PNPFS_FCB Fcb)
{
- DPRINT("NpfsDisconnectPipe()\n");
+ NTSTATUS Status;
+ PNPFS_FCB OtherSide;
+ PNPFS_PIPE Pipe;
+ BOOL Server;
- if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
- return STATUS_SUCCESS;
+ DPRINT("NpfsDisconnectPipe()\n");
- if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
- {
- Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
- /* FIXME: Shouldn't this be FILE_PIPE_CLOSING_STATE? */
- Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+ Pipe = Fcb->Pipe;
+ KeLockMutex(&Pipe->FcbListLock);
- /* FIXME: remove data queue(s) */
-
- Fcb->OtherSide->OtherSide = NULL;
+ if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
+ {
+ DPRINT("Pipe is already disconnected\n");
+ Status = STATUS_SUCCESS;
+ }
+ else if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+ {
+ Server = (Fcb->PipeEnd == FILE_PIPE_SERVER_END);
+ OtherSide = Fcb->OtherSide;
Fcb->OtherSide = NULL;
+ /* Lock the server first */
+ if (Server)
+ {
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ ExAcquireFastMutex(&OtherSide->DataListLock);
+ }
+ else
+ {
+ ExAcquireFastMutex(&OtherSide->DataListLock);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+ OtherSide->OtherSide = NULL;
+ /*
+ * Signaling the write event. If is possible that an other
+ * thread waits for an empty buffer.
+ */
+ KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
+ KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+ if (Server)
+ {
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ ExReleaseFastMutex(&OtherSide->DataListLock);
+ }
+ else
+ {
+ ExReleaseFastMutex(&OtherSide->DataListLock);
+ ExReleaseFastMutex(&OtherSide->DataListLock);
+ }
+ Status = STATUS_SUCCESS;
+ }
+ else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
+ {
+ PLIST_ENTRY Entry;
+ PNPFS_WAITER_ENTRY WaitEntry = NULL;
+ BOOLEAN Complete = FALSE;
+ PIRP Irp = NULL;
- DPRINT("Pipe disconnected\n");
- return STATUS_SUCCESS;
- }
+ Entry = Fcb->Pipe->WaiterListHead.Flink;
+ while (Entry != &Fcb->Pipe->WaiterListHead)
+ {
+ WaitEntry = CONTAINING_RECORD(Entry, NPFS_WAITER_ENTRY,
Entry);
+ if (WaitEntry->Fcb == Fcb)
+ {
+ RemoveEntryList(Entry);
+ Irp = CONTAINING_RECORD(Entry, IRP,
Tail.Overlay.DriverContext);
+ Complete = (NULL == IoSetCancelRoutine(Irp, NULL));
+ break;
+ }
+ Entry = Entry->Flink;
+ }
- if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
- {
+ if (Irp)
+ {
+ if (Complete)
+ {
+ Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ }
+ }
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
- Fcb->OtherSide = NULL;
-
- /* FIXME: remove data queue(s) */
-
- DPRINT("Pipe disconnected\n");
- return STATUS_SUCCESS;
- }
-
- return STATUS_UNSUCCESSFUL;
+ Status = STATUS_SUCCESS;
+ }
+ else if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
+ {
+ Status = STATUS_PIPE_CLOSING;
+ }
+ else
+ {
+ Status = STATUS_UNSUCCESSFUL;
+ }
+ KeUnlockMutex(&Pipe->FcbListLock);
+ return Status;
}
_____
Modified: trunk/reactos/drivers/fs/np/npfs.c
--- trunk/reactos/drivers/fs/np/npfs.c 2005-03-28 18:37:39 UTC (rev
14371)
+++ trunk/reactos/drivers/fs/np/npfs.c 2005-03-28 18:42:53 UTC (rev
14372)
@@ -27,6 +27,9 @@
NTSTATUS Status;
DPRINT("Named Pipe FSD 0.0.2\n");
+
+ ASSERT (sizeof(NPFS_CONTEXT) <= sizeof
(((PIRP)NULL)->Tail.Overlay.DriverContext));
+ ASSERT (sizeof(NPFS_WAITER_ENTRY) <=
sizeof(((PIRP)NULL)->Tail.Overlay.DriverContext));
DriverObject->MajorFunction[IRP_MJ_CREATE] = NpfsCreate;
DriverObject->MajorFunction[IRP_MJ_CREATE_NAMED_PIPE] =
@@ -74,8 +77,7 @@
DeviceExtension = DeviceObject->DeviceExtension;
InitializeListHead(&DeviceExtension->PipeListHead);
InitializeListHead(&DeviceExtension->ThreadListHead);
- KeInitializeMutex(&DeviceExtension->PipeListLock,
- 0);
+ KeInitializeMutex(&DeviceExtension->PipeListLock, 0);
DeviceExtension->EmptyWaiterCount = 0;
/* set the size quotas */
_____
Modified: trunk/reactos/drivers/fs/np/npfs.h
--- trunk/reactos/drivers/fs/np/npfs.h 2005-03-28 18:37:39 UTC (rev
14371)
+++ trunk/reactos/drivers/fs/np/npfs.h 2005-03-28 18:42:53 UTC (rev
14372)
@@ -42,12 +42,15 @@
struct ETHREAD *Thread;
PNPFS_PIPE Pipe;
KEVENT ConnectEvent;
- KEVENT Event;
+ KEVENT ReadEvent;
+ KEVENT WriteEvent;
ULONG PipeEnd;
ULONG PipeState;
ULONG ReadDataAvailable;
ULONG WriteQuotaAvailable;
+ LIST_ENTRY ReadRequestListHead;
+
PVOID Data;
PVOID ReadPtr;
PVOID WritePtr;
@@ -58,11 +61,8 @@
typedef struct _NPFS_CONTEXT
{
- PDEVICE_OBJECT DeviceObject;
- PIRP Irp;
- PNPFS_FCB Fcb;
- UCHAR MajorFunction;
- BOOLEAN AllocatedFromPool;
+ LIST_ENTRY ListEntry;
+ PKEVENT WaitEvent;
} NPFS_CONTEXT, *PNPFS_CONTEXT;
typedef struct _NPFS_THREAD_CONTEXT
@@ -73,14 +73,12 @@
LIST_ENTRY ListEntry;
PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];
KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];
- PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];
+ PIRP WaitIrpArray[MAXIMUM_WAIT_OBJECTS];
} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
typedef struct _NPFS_WAITER_ENTRY
{
LIST_ENTRY Entry;
- PIRP Irp;
- PNPFS_PIPE Pipe;
PNPFS_FCB Fcb;
} NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
_____
Modified: trunk/reactos/drivers/fs/np/rw.c
--- trunk/reactos/drivers/fs/np/rw.c 2005-03-28 18:37:39 UTC (rev
14371)
+++ trunk/reactos/drivers/fs/np/rw.c 2005-03-28 18:42:53 UTC (rev
14372)
@@ -46,61 +46,90 @@
}
#endif
-static NTSTATUS
-NpfsReadFromPipe(PNPFS_CONTEXT Context);
-
static VOID STDCALL
-NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
- IN PIRP Irp)
+NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
{
PNPFS_CONTEXT Context;
PNPFS_DEVICE_EXTENSION DeviceExt;
+ PIO_STACK_LOCATION IoStack;
+ PNPFS_FCB Fcb;
+ BOOLEAN Complete = FALSE;
- DPRINT1("NpfsWaitingCancelRoutine() called\n");
+ DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n",
DeviceObject, Irp);
IoReleaseCancelSpinLock(Irp->CancelIrql);
- Context = Irp->Tail.Overlay.DriverContext[0];
- DeviceExt = Context->DeviceObject->DeviceExtension;
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
+ IoStack = IoGetCurrentIrpStackLocation(Irp);
+ Fcb = IoStack->FileObject->FsContext;
KeLockMutex(&DeviceExt->PipeListLock);
- KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ switch(IoStack->MajorFunction)
+ {
+ case IRP_MJ_READ:
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ /* we are not the first in the list, remove an complete us
*/
+ RemoveEntryList(&Context->ListEntry);
+ Complete = TRUE;
+ }
+ else
+ {
+ KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+ }
+ break;
+ default:
+ KEBUGCHECK(0);
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
KeUnlockMutex(&DeviceExt->PipeListLock);
+ if (Complete)
+ {
+ Irp->IoStatus.Status = STATUS_CANCELLED;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
+ }
}
static VOID STDCALL
-NpfsWaiterThread(PVOID Context)
+NpfsWaiterThread(PVOID InitContext)
{
- PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
- ULONG CurrentCount, Count = 0;
- PNPFS_CONTEXT WaitContext = NULL;
+ PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT)
InitContext;
+ ULONG CurrentCount;
+ ULONG Count = 0;
+ PIRP Irp = NULL;
+ PIRP NextIrp;
NTSTATUS Status;
BOOLEAN Terminate = FALSE;
BOOLEAN Cancel = FALSE;
- KIRQL oldIrql;
+ PIO_STACK_LOCATION IoStack = NULL;
+ PNPFS_CONTEXT Context;
+ PNPFS_CONTEXT NextContext;
+ PNPFS_FCB Fcb;
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
while (1)
{
CurrentCount = ThreadContext->Count;
- KeResetEvent(&ThreadContext->Event);
KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
- if (WaitContext)
+ if (Irp)
{
if (Cancel)
{
- WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
- WaitContext->Irp->IoStatus.Information = 0;
- IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
- ExFreePool(WaitContext);
+ Irp->IoStatus.Status = STATUS_CANCELLED;
+ Irp->IoStatus.Information = 0;
+ IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
else
{
- switch (WaitContext->MajorFunction)
+ switch (IoStack->MajorFunction)
{
case IRP_MJ_READ:
- NpfsReadFromPipe(WaitContext);
+ NpfsRead(IoStack->DeviceObject, Irp);
break;
default:
KEBUGCHECK(0);
@@ -119,30 +148,56 @@
FALSE,
NULL,
ThreadContext->WaitBlockArray);
- KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
if (!NT_SUCCESS(Status))
{
KEBUGCHECK(0);
}
+ KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
Count = Status - STATUS_SUCCESS;
- ASSERT (Count <= CurrentCount);
+ ASSERT (Count < CurrentCount);
if (Count > 0)
- {
- WaitContext = ThreadContext->WaitContextArray[Count];
- ThreadContext->Count--;
- ThreadContext->DeviceExt->EmptyWaiterCount++;
- ThreadContext->WaitObjectArray[Count] =
ThreadContext->WaitObjectArray[ThreadContext->Count];
- ThreadContext->WaitContextArray[Count] =
ThreadContext->WaitContextArray[ThreadContext->Count];
- IoAcquireCancelSpinLock(&oldIrql);
- Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
- IoReleaseCancelSpinLock(oldIrql);
- }
- else
- {
+ {
+ Irp = ThreadContext->WaitIrpArray[Count];
+ ThreadContext->Count--;
+ ThreadContext->DeviceExt->EmptyWaiterCount++;
+ ThreadContext->WaitObjectArray[Count] =
ThreadContext->WaitObjectArray[ThreadContext->Count];
+ ThreadContext->WaitIrpArray[Count] =
ThreadContext->WaitIrpArray[ThreadContext->Count];
+
+ Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ IoStack = IoGetCurrentIrpStackLocation(Irp);
+
+ if (Cancel)
+ {
+ Fcb = IoStack->FileObject->FsContext;
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ RemoveEntryList(&Context->ListEntry);
+ switch (IoStack->MajorFunction)
+ {
+ case IRP_MJ_READ:
+ if (!IsListEmpty(&Fcb->ReadRequestListHead))
+ {
+ /* put the next request on the wait list */
+ NextContext =
CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT,
ListEntry);
+
ThreadContext->WaitObjectArray[ThreadContext->Count] =
NextContext->WaitEvent;
+ NextIrp = CONTAINING_RECORD(NextContext, IRP,
Tail.Overlay.DriverContext);
+ ThreadContext->WaitIrpArray[ThreadContext->Count]
= NextIrp;
+ ThreadContext->Count++;
+ ThreadContext->DeviceExt->EmptyWaiterCount--;
+ }
+ break;
+ default:
+ KEBUGCHECK(0);
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ }
+ }
+ else
+ {
/* someone has add a new wait request */
- WaitContext = NULL;
- }
- if (ThreadContext->Count == 1 &&
ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+ Irp = NULL;
+ }
+ if (ThreadContext->Count == 1 &&
ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
{
/* it exist an other thread with empty wait slots, we can
remove our thread from the list */
RemoveEntryList(&ThreadContext->ListEntry);
@@ -155,14 +210,20 @@
}
static NTSTATUS
-NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT
Context, PNPFS_FCB Fcb)
+NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
{
PLIST_ENTRY ListEntry;
- PNPFS_THREAD_CONTEXT ThreadContext;
+ PNPFS_THREAD_CONTEXT ThreadContext = NULL;
NTSTATUS Status;
HANDLE hThread;
KIRQL oldIrql;
+ PNPFS_CONTEXT Context =
(PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+ PNPFS_DEVICE_EXTENSION DeviceExt =
(PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;;
+
+ DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n",
DeviceObject, Irp);
+
KeLockMutex(&DeviceExt->PipeListLock);
ListEntry = DeviceExt->ThreadListHead.Flink;
@@ -184,12 +245,12 @@
return STATUS_NO_MEMORY;
}
ThreadContext->DeviceExt = DeviceExt;
- KeInitializeEvent(&ThreadContext->Event, NotificationEvent,
FALSE);
+ KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent,
FALSE);
ThreadContext->Count = 1;
ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
- DPRINT("Creating a new system thread for waiting read
requests\n");
+ DPRINT("Creating a new system thread for waiting read/write
requests\n");
Status = PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
@@ -207,21 +268,20 @@
InsertHeadList(&DeviceExt->ThreadListHead,
&ThreadContext->ListEntry);
DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
}
- IoMarkIrpPending(Context->Irp);
- Context->Irp->Tail.Overlay.DriverContext[0] = Context;
+ IoMarkIrpPending(Irp);
IoAcquireCancelSpinLock(&oldIrql);
- if (Context->Irp->Cancel)
+ if (Irp->Cancel)
{
IoReleaseCancelSpinLock(oldIrql);
Status = STATUS_CANCELLED;
}
else
{
- IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
+ IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
IoReleaseCancelSpinLock(oldIrql);
- ThreadContext->WaitObjectArray[ThreadContext->Count] =
&Fcb->Event;
- ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
+ ThreadContext->WaitObjectArray[ThreadContext->Count] =
Context->WaitEvent;
+ ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
ThreadContext->Count++;
DeviceExt->EmptyWaiterCount--;
KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
@@ -231,130 +291,182 @@
return Status;
}
-static NTSTATUS
-NpfsReadFromPipe(PNPFS_CONTEXT Context)
+NTSTATUS STDCALL
+NpfsRead(IN PDEVICE_OBJECT DeviceObject,
+ IN PIRP Irp)
{
- PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
NTSTATUS Status;
- ULONG Information;
+ NTSTATUS OriginalStatus = STATUS_SUCCESS;
PNPFS_FCB Fcb;
- PNPFS_FCB WriterFcb;
- PNPFS_PIPE Pipe;
+ PNPFS_CONTEXT Context;
+ KEVENT Event;
ULONG Length;
- PVOID Buffer;
+ ULONG Information;
ULONG CopyLength;
ULONG TempLength;
+ BOOLEAN IsOriginalRequest = TRUE;
+ PVOID Buffer;
- DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
+ DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
- IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
- FileObject = IoStack->FileObject;
+ if (Irp->MdlAddress == NULL)
+ {
+ DPRINT("Irp->MdlAddress == NULL\n");
+ Status = STATUS_UNSUCCESSFUL;
+ Irp->IoStatus.Information = 0;
+ goto done;
+ }
+
+ FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
Fcb = FileObject->FsContext;
- Pipe = Fcb->Pipe;
- WriterFcb = Fcb->OtherSide;
+ Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
if (Fcb->Data == NULL)
- {
- DPRINT("Pipe is NOT readable!\n");
- Status = STATUS_UNSUCCESSFUL;
- Information = 0;
- goto done;
- }
+ {
+ DPRINT1("Pipe is NOT readable!\n");
+ Status = STATUS_UNSUCCESSFUL;
+ Irp->IoStatus.Information = 0;
+ goto done;
+ }
- Status = STATUS_SUCCESS;
- Length = IoStack->Parameters.Read.Length;
- Information = 0;
+ ExAcquireFastMutex(&Fcb->DataListLock);
- Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
- ExAcquireFastMutex(&Fcb->DataListLock);
- while (1)
- {
- if (Fcb->ReadDataAvailable == 0)
+ if (IoIsOperationSynchronous(Irp))
+ {
+ InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
+ Context->WaitEvent = &Event;
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ Status = KeWaitForSingleObject(&Event,
+ Executive,
+ KernelMode,
+ FALSE,
+ NULL);
+ if (!NT_SUCCESS(Status))
{
- if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
- {
- KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
- }
- ExReleaseFastMutex(&Fcb->DataListLock);
- if (Information > 0)
- {
- Status = STATUS_SUCCESS;
+ KEBUGCHECK(0);
+ }
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ Irp->IoStatus.Information = 0;
+ }
+ else
+ {
+ KIRQL oldIrql;
+ if (IsListEmpty(&Fcb->ReadRequestListHead) ||
+ Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ /* this is a new request */
+ Irp->IoStatus.Information = 0;
+ Context->WaitEvent = &Fcb->ReadEvent;
+ InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+ if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+ {
+ /* there was already a request on the list */
+ IoAcquireCancelSpinLock(&oldIrql);
+ if (Irp->Cancel)
+ {
+ IoReleaseCancelSpinLock(oldIrql);
+ RemoveEntryList(&Context->ListEntry);
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ Status = STATUS_CANCELLED;
goto done;
- }
+ }
+ IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
+ IoReleaseCancelSpinLock(oldIrql);
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ IoMarkIrpPending(Irp);
+ Status = STATUS_PENDING;
+ goto done;
+ }
+ }
+ }
- if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
- {
+ while (1)
+ {
+ Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
+ Information = Irp->IoStatus.Information;
+ Length =
IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
+ ASSERT (Information <= Length);
+ Buffer += Information;
+ Length -= Information;
+ Status = STATUS_SUCCESS;
+
+ while (1)
+ {
+ if (Fcb->ReadDataAvailable == 0)
+ {
+ if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+ {
+ KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT,
FALSE);
+ }
+ if (Information > 0 &&
+ (Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
+ Fcb->PipeState != FILE_PIPE_CONNECTED_STATE))
+ {
+ break;
+ }
+ if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+ {
DPRINT("PipeState: %x\n", Fcb->PipeState);
Status = STATUS_PIPE_BROKEN;
- goto done;
- }
+ break;
+ }
+ ExReleaseFastMutex(&Fcb->DataListLock);
+ if (IoIsOperationSynchronous(Irp))
+ {
+ /* Wait for ReadEvent to become signaled */
- if (IoIsOperationSynchronous(Context->Irp))
- {
- /* Wait for ReadEvent to become signaled */
- DPRINT("Waiting for readable data (%S)\n",
Pipe->PipeName.Buffer);
- Status = KeWaitForSingleObject(&Fcb->Event,
+ DPRINT("Waiting for readable data (%wZ)\n",
&Fcb->Pipe->PipeName);
+ Status = KeWaitForSingleObject(&Fcb->ReadEvent,
UserRequest,
KernelMode,
FALSE,
NULL);
- DPRINT("Finished waiting (%S)! Status: %x\n",
Pipe->PipeName.Buffer, Status);
- }
- else
- {
- PNPFS_CONTEXT NewContext;
-
- NewContext = ExAllocatePool(NonPagedPool,
sizeof(NPFS_CONTEXT));
- if (NewContext == NULL)
- {
- Status = STATUS_NO_MEMORY;
- goto done;
- }
- memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
- NewContext->AllocatedFromPool = TRUE;
- NewContext->Fcb = Fcb;
- NewContext->MajorFunction = IRP_MJ_READ;
-
- Status =
NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext,
Fcb);
+ DPRINT("Finished waiting (%wZ)! Status: %x\n",
&Fcb->Pipe->PipeName, Status);
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ }
+ else
+ {
+ PNPFS_CONTEXT Context =
(PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+
+ Context->WaitEvent = &Fcb->ReadEvent;
+ Status = NpfsAddWaitingReadWriteRequest(DeviceObject,
Irp);
if (NT_SUCCESS(Status))
- {
- Status = STATUS_PENDING;
- }
- else
- {
- ExFreePool(NewContext);
- }
- goto done;
- }
-
- ExAcquireFastMutex(&Fcb->DataListLock);
- }
-
- if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
- {
- DPRINT("Byte stream mode\n");
- /* Byte stream mode */
- while (Length > 0 && Fcb->ReadDataAvailable > 0)
- {
+ {
+ Status = STATUS_PENDING;
+ }
+ ExAcquireFastMutex(&Fcb->DataListLock);
+ break;
+ }
+ }
+ if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+ {
+ DPRINT("Byte stream mode\n");
+ /* Byte stream mode */
+ while (Length > 0 && Fcb->ReadDataAvailable > 0)
+ {
CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
if (Fcb->ReadPtr + CopyLength <= Fcb->Data +
Fcb->MaxDataLength)
- {
- memcpy(Buffer, Fcb->ReadPtr, CopyLength);
- Fcb->ReadPtr += CopyLength;
- if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
- {
- Fcb->ReadPtr = Fcb->Data;
- }
- }
- else
- {
- TempLength = Fcb->Data + Fcb->MaxDataLength -
Fcb->ReadPtr;
- memcpy(Buffer, Fcb->ReadPtr, TempLength);
- memcpy(Buffer + TempLength, Fcb->Data, CopyLength -
TempLength);
- Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
- }
+ {
+ memcpy(Buffer, Fcb->ReadPtr, CopyLength);
+ Fcb->ReadPtr += CopyLength;
+ if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
+ {
+ Fcb->ReadPtr = Fcb->Data;
+ }
+ }
+ else
+ {
+ TempLength = Fcb->Data + Fcb->MaxDataLength -
Fcb->ReadPtr;
+ memcpy(Buffer, Fcb->ReadPtr, TempLength);
+ memcpy(Buffer + TempLength, Fcb->Data, CopyLength -
TempLength);
+ Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
+ }
Buffer += CopyLength;
Length -= CopyLength;
[truncated at 1000 lines; 243 more skipped]