- Put the wait entry into the DriverContext of the irp instead to allocated it from pool. - Lock the data list on both ends of the pipe, if we disconnect the pipe. - Implemented a read and a write event on each end of the pipe. - Implemented a list for read requests to deliver the requests in the correct sequence. - Do not end a read request if the pipe was connected and if the buffer wasn't filled completely. Modified: trunk/reactos/drivers/fs/np/create.c Modified: trunk/reactos/drivers/fs/np/fsctrl.c Modified: trunk/reactos/drivers/fs/np/npfs.c Modified: trunk/reactos/drivers/fs/np/npfs.h Modified: trunk/reactos/drivers/fs/np/rw.c _____
Modified: trunk/reactos/drivers/fs/np/create.c --- trunk/reactos/drivers/fs/np/create.c 2005-03-28 18:37:39 UTC (rev 14371) +++ trunk/reactos/drivers/fs/np/create.c 2005-03-28 18:42:53 UTC (rev 14372) @@ -50,20 +50,21 @@
PLIST_ENTRY CurrentEntry; PNPFS_WAITER_ENTRY Waiter; KIRQL oldIrql; + PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink; while (CurrentEntry != &Pipe->WaiterListHead) { Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry); - if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE && - !Waiter->Irp->Cancel) + Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext); + if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE) { DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
IoAcquireCancelSpinLock(&oldIrql); - if (!Waiter->Irp->Cancel) + if (!Irp->Cancel) { - IoSetCancelRoutine(Waiter->Irp, NULL); + IoSetCancelRoutine(Irp, NULL); IoReleaseCancelSpinLock(oldIrql); return Waiter->Fcb; } @@ -83,6 +84,7 @@ { PLIST_ENTRY CurrentEntry; PNPFS_WAITER_ENTRY Waiter; + PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink; while (CurrentEntry != &Pipe->WaiterListHead) @@ -92,13 +94,12 @@ { DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
- Waiter->Irp->IoStatus.Status = STATUS_PIPE_CONNECTED; - Waiter->Irp->IoStatus.Information = 0; - IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT); - RemoveEntryList(&Waiter->Entry); - ExFreePool(Waiter); - return; + Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext); + Irp->IoStatus.Status = STATUS_PIPE_CONNECTED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + break; } CurrentEntry = CurrentEntry->Flink; } @@ -175,7 +176,10 @@ ClientFcb->PipeEnd = FILE_PIPE_CLIENT_END; ClientFcb->OtherSide = NULL; ClientFcb->PipeState = SpecialAccess ? 0 : FILE_PIPE_DISCONNECTED_STATE; + InitializeListHead(&ClientFcb->ReadRequestListHead);
+ DPRINT("Fcb: %x\n", ClientFcb); + /* Initialize data list. */ if (Pipe->OutboundQuota) { @@ -202,8 +206,10 @@ ClientFcb->MaxDataLength = Pipe->OutboundQuota; ExInitializeFastMutex(&ClientFcb->DataListLock); KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE); - KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE); + KeInitializeEvent(&ClientFcb->ReadEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&ClientFcb->WriteEvent, SynchronizationEvent, FALSE);
+ /* * Step 3. Search for listening server FCB. */ @@ -489,6 +495,7 @@ Fcb->ReadDataAvailable = 0; Fcb->WriteQuotaAvailable = Pipe->InboundQuota; Fcb->MaxDataLength = Pipe->InboundQuota; + InitializeListHead(&Fcb->ReadRequestListHead); ExInitializeFastMutex(&Fcb->DataListLock);
Pipe->CurrentInstances++; @@ -498,13 +505,11 @@ Fcb->PipeState = FILE_PIPE_LISTENING_STATE; Fcb->OtherSide = NULL;
- KeInitializeEvent(&Fcb->ConnectEvent, - SynchronizationEvent, - FALSE); + DPRINT("Fcb: %x\n", Fcb);
- KeInitializeEvent(&Fcb->Event, - SynchronizationEvent, - FALSE); + KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&Fcb->ReadEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&Fcb->WriteEvent, SynchronizationEvent, FALSE);
KeLockMutex(&Pipe->FcbListLock); InsertTailList(&Pipe->ServerFcbListHead, &Fcb->FcbListEntry); @@ -528,7 +533,7 @@ PNPFS_DEVICE_EXTENSION DeviceExt; PIO_STACK_LOCATION IoStack; PFILE_OBJECT FileObject; - PNPFS_FCB Fcb; + PNPFS_FCB Fcb, OtherSide; PNPFS_PIPE Pipe; BOOL Server;
@@ -566,19 +571,38 @@ { DPRINT("Client\n"); } - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) { - if (Fcb->OtherSide) + OtherSide = Fcb->OtherSide; + /* Lock the server first */ + if (Server) { - Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; - Fcb->OtherSide->OtherSide = NULL; - /* - * Signaling the write event. If is possible that an other - * thread waits for an empty buffer. - */ - KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + ExAcquireFastMutex(&Fcb->DataListLock); + ExAcquireFastMutex(&OtherSide->DataListLock); } + else + { + ExAcquireFastMutex(&OtherSide->DataListLock); + ExAcquireFastMutex(&Fcb->DataListLock); + } + OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + OtherSide->OtherSide = NULL; + /* + * Signaling the write event. If is possible that an other + * thread waits for an empty buffer. + */ + KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); + KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + if (Server) + { + ExReleaseFastMutex(&Fcb->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + else + { + ExReleaseFastMutex(&OtherSide->DataListLock); + ExReleaseFastMutex(&Fcb->DataListLock); + } } else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE) { @@ -586,6 +610,7 @@ PNPFS_WAITER_ENTRY WaitEntry = NULL; BOOLEAN Complete = FALSE; KIRQL oldIrql; + PIRP tmpIrp;
Entry = Fcb->Pipe->WaiterListHead.Flink; while (Entry != &Fcb->Pipe->WaiterListHead) @@ -594,28 +619,25 @@ if (WaitEntry->Fcb == Fcb) { RemoveEntryList(Entry); + tmpIrp = CONTAINING_RECORD(WaitEntry, IRP, Tail.Overlay.DriverContext); IoAcquireCancelSpinLock(&oldIrql); - if (!Irp->Cancel) + if (!tmpIrp->Cancel) { - IoSetCancelRoutine(WaitEntry->Irp, NULL); + IoSetCancelRoutine(tmpIrp, NULL); Complete = TRUE; } IoReleaseCancelSpinLock(oldIrql); + if (Complete) + { + tmpIrp->IoStatus.Status = STATUS_PIPE_BROKEN; + tmpIrp->IoStatus.Information = 0; + IoCompleteRequest(tmpIrp, IO_NO_INCREMENT); + } break; } Entry = Entry->Flink; }
- if (Entry != &Fcb->Pipe->WaiterListHead) - { - if (Complete) - { - WaitEntry->Irp->IoStatus.Status = STATUS_PIPE_BROKEN; - WaitEntry->Irp->IoStatus.Information = 0; - IoCompleteRequest(WaitEntry->Irp, IO_NO_INCREMENT); - } - ExFreePool(WaitEntry); - } } Fcb->PipeState = FILE_PIPE_CLOSING_STATE;
_____
Modified: trunk/reactos/drivers/fs/np/fsctrl.c --- trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-28 18:37:39 UTC (rev 14371) +++ trunk/reactos/drivers/fs/np/fsctrl.c 2005-03-28 18:42:53 UTC (rev 14372) @@ -26,18 +26,18 @@
DPRINT1("NpfsListeningCancelRoutine() called\n");
+ Waiter = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext; + IoReleaseCancelSpinLock(Irp->CancelIrql);
- Waiter = Irp->Tail.Overlay.DriverContext[0];
- KeLockMutex(&Waiter->Pipe->FcbListLock); + KeLockMutex(&Waiter->Fcb->Pipe->FcbListLock); RemoveEntryList(&Waiter->Entry); - KeUnlockMutex(&Waiter->Pipe->FcbListLock); + KeUnlockMutex(&Waiter->Fcb->Pipe->FcbListLock);
Irp->IoStatus.Status = STATUS_CANCELLED; Irp->IoStatus.Information = 0; IoCompleteRequest(Irp, IO_NO_INCREMENT); - ExFreePool(Waiter); }
@@ -48,18 +48,13 @@ PNPFS_WAITER_ENTRY Entry; KIRQL oldIrql;
- Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY)); - if (Entry == NULL) - return STATUS_INSUFFICIENT_RESOURCES; + Entry = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
- Entry->Irp = Irp; Entry->Fcb = Fcb; - Entry->Pipe = Fcb->Pipe;
KeLockMutex(&Fcb->Pipe->FcbListLock);
IoMarkIrpPending(Irp); - Irp->Tail.Overlay.DriverContext[0] = Entry; InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
IoAcquireCancelSpinLock(&oldIrql); @@ -78,7 +73,6 @@ Irp->IoStatus.Information = 0; IoCompleteRequest(Irp, IO_NO_INCREMENT); KeUnlockMutex(&Fcb->Pipe->FcbListLock); - ExFreePool(Entry);
return STATUS_CANCELLED; } @@ -173,38 +167,100 @@ static NTSTATUS NpfsDisconnectPipe(PNPFS_FCB Fcb) { - DPRINT("NpfsDisconnectPipe()\n"); + NTSTATUS Status; + PNPFS_FCB OtherSide; + PNPFS_PIPE Pipe; + BOOL Server;
- if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE) - return STATUS_SUCCESS; + DPRINT("NpfsDisconnectPipe()\n");
- if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - /* FIXME: Shouldn't this be FILE_PIPE_CLOSING_STATE? */ - Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + Pipe = Fcb->Pipe; + KeLockMutex(&Pipe->FcbListLock);
- /* FIXME: remove data queue(s) */ - - Fcb->OtherSide->OtherSide = NULL; + if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE) + { + DPRINT("Pipe is already disconnected\n"); + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) + { + Server = (Fcb->PipeEnd == FILE_PIPE_SERVER_END); + OtherSide = Fcb->OtherSide; Fcb->OtherSide = NULL; + /* Lock the server first */ + if (Server) + { + ExAcquireFastMutex(&Fcb->DataListLock); + ExAcquireFastMutex(&OtherSide->DataListLock); + } + else + { + ExAcquireFastMutex(&OtherSide->DataListLock); + ExAcquireFastMutex(&Fcb->DataListLock); + } + OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + OtherSide->OtherSide = NULL; + /* + * Signaling the write event. If is possible that an other + * thread waits for an empty buffer. + */ + KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); + KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + if (Server) + { + ExReleaseFastMutex(&Fcb->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + else + { + ExReleaseFastMutex(&OtherSide->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE) + { + PLIST_ENTRY Entry; + PNPFS_WAITER_ENTRY WaitEntry = NULL; + BOOLEAN Complete = FALSE; + PIRP Irp = NULL;
- DPRINT("Pipe disconnected\n"); - return STATUS_SUCCESS; - } + Entry = Fcb->Pipe->WaiterListHead.Flink; + while (Entry != &Fcb->Pipe->WaiterListHead) + { + WaitEntry = CONTAINING_RECORD(Entry, NPFS_WAITER_ENTRY, Entry); + if (WaitEntry->Fcb == Fcb) + { + RemoveEntryList(Entry); + Irp = CONTAINING_RECORD(Entry, IRP, Tail.Overlay.DriverContext); + Complete = (NULL == IoSetCancelRoutine(Irp, NULL)); + break; + } + Entry = Entry->Flink; + }
- if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE) - { + if (Irp) + { + if (Complete) + { + Irp->IoStatus.Status = STATUS_PIPE_BROKEN; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + } + } Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - Fcb->OtherSide = NULL; - - /* FIXME: remove data queue(s) */ - - DPRINT("Pipe disconnected\n"); - return STATUS_SUCCESS; - } - - return STATUS_UNSUCCESSFUL; + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE) + { + Status = STATUS_PIPE_CLOSING; + } + else + { + Status = STATUS_UNSUCCESSFUL; + } + KeUnlockMutex(&Pipe->FcbListLock); + return Status; }
_____
Modified: trunk/reactos/drivers/fs/np/npfs.c --- trunk/reactos/drivers/fs/np/npfs.c 2005-03-28 18:37:39 UTC (rev 14371) +++ trunk/reactos/drivers/fs/np/npfs.c 2005-03-28 18:42:53 UTC (rev 14372) @@ -27,6 +27,9 @@
NTSTATUS Status;
DPRINT("Named Pipe FSD 0.0.2\n"); + + ASSERT (sizeof(NPFS_CONTEXT) <= sizeof (((PIRP)NULL)->Tail.Overlay.DriverContext)); + ASSERT (sizeof(NPFS_WAITER_ENTRY) <= sizeof(((PIRP)NULL)->Tail.Overlay.DriverContext));
DriverObject->MajorFunction[IRP_MJ_CREATE] = NpfsCreate; DriverObject->MajorFunction[IRP_MJ_CREATE_NAMED_PIPE] = @@ -74,8 +77,7 @@ DeviceExtension = DeviceObject->DeviceExtension; InitializeListHead(&DeviceExtension->PipeListHead); InitializeListHead(&DeviceExtension->ThreadListHead); - KeInitializeMutex(&DeviceExtension->PipeListLock, - 0); + KeInitializeMutex(&DeviceExtension->PipeListLock, 0); DeviceExtension->EmptyWaiterCount = 0;
/* set the size quotas */ _____
Modified: trunk/reactos/drivers/fs/np/npfs.h --- trunk/reactos/drivers/fs/np/npfs.h 2005-03-28 18:37:39 UTC (rev 14371) +++ trunk/reactos/drivers/fs/np/npfs.h 2005-03-28 18:42:53 UTC (rev 14372) @@ -42,12 +42,15 @@
struct ETHREAD *Thread; PNPFS_PIPE Pipe; KEVENT ConnectEvent; - KEVENT Event; + KEVENT ReadEvent; + KEVENT WriteEvent; ULONG PipeEnd; ULONG PipeState; ULONG ReadDataAvailable; ULONG WriteQuotaAvailable;
+ LIST_ENTRY ReadRequestListHead; + PVOID Data; PVOID ReadPtr; PVOID WritePtr; @@ -58,11 +61,8 @@
typedef struct _NPFS_CONTEXT { - PDEVICE_OBJECT DeviceObject; - PIRP Irp; - PNPFS_FCB Fcb; - UCHAR MajorFunction; - BOOLEAN AllocatedFromPool; + LIST_ENTRY ListEntry; + PKEVENT WaitEvent; } NPFS_CONTEXT, *PNPFS_CONTEXT;
typedef struct _NPFS_THREAD_CONTEXT @@ -73,14 +73,12 @@ LIST_ENTRY ListEntry; PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS]; KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS]; - PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS]; + PIRP WaitIrpArray[MAXIMUM_WAIT_OBJECTS]; } NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
typedef struct _NPFS_WAITER_ENTRY { LIST_ENTRY Entry; - PIRP Irp; - PNPFS_PIPE Pipe; PNPFS_FCB Fcb; } NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
_____
Modified: trunk/reactos/drivers/fs/np/rw.c --- trunk/reactos/drivers/fs/np/rw.c 2005-03-28 18:37:39 UTC (rev 14371) +++ trunk/reactos/drivers/fs/np/rw.c 2005-03-28 18:42:53 UTC (rev 14372) @@ -46,61 +46,90 @@
} #endif
-static NTSTATUS -NpfsReadFromPipe(PNPFS_CONTEXT Context); - static VOID STDCALL -NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject, - IN PIRP Irp) +NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { PNPFS_CONTEXT Context; PNPFS_DEVICE_EXTENSION DeviceExt; + PIO_STACK_LOCATION IoStack; + PNPFS_FCB Fcb; + BOOLEAN Complete = FALSE;
- DPRINT1("NpfsWaitingCancelRoutine() called\n"); + DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n", DeviceObject, Irp);
IoReleaseCancelSpinLock(Irp->CancelIrql);
- Context = Irp->Tail.Overlay.DriverContext[0]; - DeviceExt = Context->DeviceObject->DeviceExtension; + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension; + IoStack = IoGetCurrentIrpStackLocation(Irp); + Fcb = IoStack->FileObject->FsContext;
KeLockMutex(&DeviceExt->PipeListLock); - KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE); + ExAcquireFastMutex(&Fcb->DataListLock); + switch(IoStack->MajorFunction) + { + case IRP_MJ_READ: + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* we are not the first in the list, remove an complete us */ + RemoveEntryList(&Context->ListEntry); + Complete = TRUE; + } + else + { + KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE); + } + break; + default: + KEBUGCHECK(0); + } + ExReleaseFastMutex(&Fcb->DataListLock); KeUnlockMutex(&DeviceExt->PipeListLock); + if (Complete) + { + Irp->IoStatus.Status = STATUS_CANCELLED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + } }
static VOID STDCALL -NpfsWaiterThread(PVOID Context) +NpfsWaiterThread(PVOID InitContext) { - PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context; - ULONG CurrentCount, Count = 0; - PNPFS_CONTEXT WaitContext = NULL; + PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext; + ULONG CurrentCount; + ULONG Count = 0; + PIRP Irp = NULL; + PIRP NextIrp; NTSTATUS Status; BOOLEAN Terminate = FALSE; BOOLEAN Cancel = FALSE; - KIRQL oldIrql; + PIO_STACK_LOCATION IoStack = NULL; + PNPFS_CONTEXT Context; + PNPFS_CONTEXT NextContext; + PNPFS_FCB Fcb;
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
while (1) { CurrentCount = ThreadContext->Count; - KeResetEvent(&ThreadContext->Event); KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock); - if (WaitContext) + if (Irp) { if (Cancel) { - WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED; - WaitContext->Irp->IoStatus.Information = 0; - IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT); - ExFreePool(WaitContext); + Irp->IoStatus.Status = STATUS_CANCELLED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); } else { - switch (WaitContext->MajorFunction) + switch (IoStack->MajorFunction) { case IRP_MJ_READ: - NpfsReadFromPipe(WaitContext); + NpfsRead(IoStack->DeviceObject, Irp); break; default: KEBUGCHECK(0); @@ -119,30 +148,56 @@ FALSE, NULL, ThreadContext->WaitBlockArray); - KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); if (!NT_SUCCESS(Status)) { KEBUGCHECK(0); } + KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); Count = Status - STATUS_SUCCESS; - ASSERT (Count <= CurrentCount); + ASSERT (Count < CurrentCount); if (Count > 0) - { - WaitContext = ThreadContext->WaitContextArray[Count]; - ThreadContext->Count--; - ThreadContext->DeviceExt->EmptyWaiterCount++; - ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; - ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count]; - IoAcquireCancelSpinLock(&oldIrql); - Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL); - IoReleaseCancelSpinLock(oldIrql); - } - else - { + { + Irp = ThreadContext->WaitIrpArray[Count]; + ThreadContext->Count--; + ThreadContext->DeviceExt->EmptyWaiterCount++; + ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; + ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count]; + + Cancel = (NULL == IoSetCancelRoutine(Irp, NULL)); + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + IoStack = IoGetCurrentIrpStackLocation(Irp); + + if (Cancel) + { + Fcb = IoStack->FileObject->FsContext; + ExAcquireFastMutex(&Fcb->DataListLock); + RemoveEntryList(&Context->ListEntry); + switch (IoStack->MajorFunction) + { + case IRP_MJ_READ: + if (!IsListEmpty(&Fcb->ReadRequestListHead)) + { + /* put the next request on the wait list */ + NextContext = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry); + ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent; + NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext); + ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp; + ThreadContext->Count++; + ThreadContext->DeviceExt->EmptyWaiterCount--; + } + break; + default: + KEBUGCHECK(0); + } + ExReleaseFastMutex(&Fcb->DataListLock); + } + } + else + { /* someone has add a new wait request */ - WaitContext = NULL; - } - if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS) + Irp = NULL; + } + if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS) { /* it exist an other thread with empty wait slots, we can remove our thread from the list */ RemoveEntryList(&ThreadContext->ListEntry); @@ -155,14 +210,20 @@ }
static NTSTATUS -NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb) +NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { PLIST_ENTRY ListEntry; - PNPFS_THREAD_CONTEXT ThreadContext; + PNPFS_THREAD_CONTEXT ThreadContext = NULL; NTSTATUS Status; HANDLE hThread; KIRQL oldIrql;
+ PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;; + + DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp); + KeLockMutex(&DeviceExt->PipeListLock);
ListEntry = DeviceExt->ThreadListHead.Flink; @@ -184,12 +245,12 @@ return STATUS_NO_MEMORY; } ThreadContext->DeviceExt = DeviceExt; - KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE); + KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE); ThreadContext->Count = 1; ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
- DPRINT("Creating a new system thread for waiting read requests\n"); + DPRINT("Creating a new system thread for waiting read/write requests\n");
Status = PsCreateSystemThread(&hThread, THREAD_ALL_ACCESS, @@ -207,21 +268,20 @@ InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry); DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1; } - IoMarkIrpPending(Context->Irp); - Context->Irp->Tail.Overlay.DriverContext[0] = Context; + IoMarkIrpPending(Irp);
IoAcquireCancelSpinLock(&oldIrql); - if (Context->Irp->Cancel) + if (Irp->Cancel) { IoReleaseCancelSpinLock(oldIrql); Status = STATUS_CANCELLED; } else { - IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine); + IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine); IoReleaseCancelSpinLock(oldIrql); - ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event; - ThreadContext->WaitContextArray[ThreadContext->Count] = Context; + ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent; + ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp; ThreadContext->Count++; DeviceExt->EmptyWaiterCount--; KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE); @@ -231,130 +291,182 @@ return Status; }
-static NTSTATUS -NpfsReadFromPipe(PNPFS_CONTEXT Context) +NTSTATUS STDCALL +NpfsRead(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { - PIO_STACK_LOCATION IoStack; PFILE_OBJECT FileObject; NTSTATUS Status; - ULONG Information; + NTSTATUS OriginalStatus = STATUS_SUCCESS; PNPFS_FCB Fcb; - PNPFS_FCB WriterFcb; - PNPFS_PIPE Pipe; + PNPFS_CONTEXT Context; + KEVENT Event; ULONG Length; - PVOID Buffer; + ULONG Information; ULONG CopyLength; ULONG TempLength; + BOOLEAN IsOriginalRequest = TRUE; + PVOID Buffer;
- DPRINT("NpfsReadFromPipe(Context %p)\n", Context); + DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
- IoStack = IoGetCurrentIrpStackLocation(Context->Irp); - FileObject = IoStack->FileObject; + if (Irp->MdlAddress == NULL) + { + DPRINT("Irp->MdlAddress == NULL\n"); + Status = STATUS_UNSUCCESSFUL; + Irp->IoStatus.Information = 0; + goto done; + } + + FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject; Fcb = FileObject->FsContext; - Pipe = Fcb->Pipe; - WriterFcb = Fcb->OtherSide; + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
if (Fcb->Data == NULL) - { - DPRINT("Pipe is NOT readable!\n"); - Status = STATUS_UNSUCCESSFUL; - Information = 0; - goto done; - } + { + DPRINT1("Pipe is NOT readable!\n"); + Status = STATUS_UNSUCCESSFUL; + Irp->IoStatus.Information = 0; + goto done; + }
- Status = STATUS_SUCCESS; - Length = IoStack->Parameters.Read.Length; - Information = 0; + ExAcquireFastMutex(&Fcb->DataListLock);
- Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress); - ExAcquireFastMutex(&Fcb->DataListLock); - while (1) - { - if (Fcb->ReadDataAvailable == 0) + if (IoIsOperationSynchronous(Irp)) + { + InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry); + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + KeInitializeEvent(&Event, SynchronizationEvent, FALSE); + Context->WaitEvent = &Event; + ExReleaseFastMutex(&Fcb->DataListLock); + Status = KeWaitForSingleObject(&Event, + Executive, + KernelMode, + FALSE, + NULL); + if (!NT_SUCCESS(Status)) { - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); - } - ExReleaseFastMutex(&Fcb->DataListLock); - if (Information > 0) - { - Status = STATUS_SUCCESS; + KEBUGCHECK(0); + } + ExAcquireFastMutex(&Fcb->DataListLock); + } + Irp->IoStatus.Information = 0; + } + else + { + KIRQL oldIrql; + if (IsListEmpty(&Fcb->ReadRequestListHead) || + Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* this is a new request */ + Irp->IoStatus.Information = 0; + Context->WaitEvent = &Fcb->ReadEvent; + InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry); + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* there was already a request on the list */ + IoAcquireCancelSpinLock(&oldIrql); + if (Irp->Cancel) + { + IoReleaseCancelSpinLock(oldIrql); + RemoveEntryList(&Context->ListEntry); + ExReleaseFastMutex(&Fcb->DataListLock); + Status = STATUS_CANCELLED; goto done; - } + } + IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine); + IoReleaseCancelSpinLock(oldIrql); + ExReleaseFastMutex(&Fcb->DataListLock); + IoMarkIrpPending(Irp); + Status = STATUS_PENDING; + goto done; + } + } + }
- if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) - { + while (1) + { + Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress); + Information = Irp->IoStatus.Information; + Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length; + ASSERT (Information <= Length); + Buffer += Information; + Length -= Information; + Status = STATUS_SUCCESS; + + while (1) + { + if (Fcb->ReadDataAvailable == 0) + { + if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) + { + KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + } + if (Information > 0 && + (Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE || + Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)) + { + break; + } + if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) + { DPRINT("PipeState: %x\n", Fcb->PipeState); Status = STATUS_PIPE_BROKEN; - goto done; - } + break; + } + ExReleaseFastMutex(&Fcb->DataListLock); + if (IoIsOperationSynchronous(Irp)) + { + /* Wait for ReadEvent to become signaled */
- if (IoIsOperationSynchronous(Context->Irp)) - { - /* Wait for ReadEvent to become signaled */ - DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); - Status = KeWaitForSingleObject(&Fcb->Event, + DPRINT("Waiting for readable data (%wZ)\n", &Fcb->Pipe->PipeName); + Status = KeWaitForSingleObject(&Fcb->ReadEvent, UserRequest, KernelMode, FALSE, NULL); - DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); - } - else - { - PNPFS_CONTEXT NewContext; - - NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT)); - if (NewContext == NULL) - { - Status = STATUS_NO_MEMORY; - goto done; - } - memcpy(NewContext, Context, sizeof(NPFS_CONTEXT)); - NewContext->AllocatedFromPool = TRUE; - NewContext->Fcb = Fcb; - NewContext->MajorFunction = IRP_MJ_READ; - - Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb); + DPRINT("Finished waiting (%wZ)! Status: %x\n", &Fcb->Pipe->PipeName, Status); + ExAcquireFastMutex(&Fcb->DataListLock); + } + else + { + PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + + Context->WaitEvent = &Fcb->ReadEvent; + Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp); if (NT_SUCCESS(Status)) - { - Status = STATUS_PENDING; - } - else - { - ExFreePool(NewContext); - } - goto done; - } - - ExAcquireFastMutex(&Fcb->DataListLock); - } - - if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE) - { - DPRINT("Byte stream mode\n"); - /* Byte stream mode */ - while (Length > 0 && Fcb->ReadDataAvailable > 0) - { + { + Status = STATUS_PENDING; + } + ExAcquireFastMutex(&Fcb->DataListLock); + break; + } + } + if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE) + { + DPRINT("Byte stream mode\n"); + /* Byte stream mode */ + while (Length > 0 && Fcb->ReadDataAvailable > 0) + { CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length); if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength) - { - memcpy(Buffer, Fcb->ReadPtr, CopyLength); - Fcb->ReadPtr += CopyLength; - if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength) - { - Fcb->ReadPtr = Fcb->Data; - } - } - else - { - TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr; - memcpy(Buffer, Fcb->ReadPtr, TempLength); - memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength); - Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength; - } + { + memcpy(Buffer, Fcb->ReadPtr, CopyLength); + Fcb->ReadPtr += CopyLength; + if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength) + { + Fcb->ReadPtr = Fcb->Data; + } + } + else + { + TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr; + memcpy(Buffer, Fcb->ReadPtr, TempLength); + memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength); + Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength; + }
Buffer += CopyLength; Length -= CopyLength; [truncated at 1000 lines; 243 more skipped]