- Put the wait entry into the DriverContext of the irp instead to allocated it from pool.  
- Lock the data list on both ends of the pipe, if we disconnect the pipe.    
- Implemented a read and a write event on each end of the pipe.  
- Implemented a list for read requests to deliver the requests in the correct sequence.  
- Do not end a read request if the pipe was connected and if the buffer wasn't filled completely.
Modified: trunk/reactos/drivers/fs/np/create.c
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
Modified: trunk/reactos/drivers/fs/np/npfs.c
Modified: trunk/reactos/drivers/fs/np/npfs.h
Modified: trunk/reactos/drivers/fs/np/rw.c

Modified: trunk/reactos/drivers/fs/np/create.c
--- trunk/reactos/drivers/fs/np/create.c	2005-03-28 18:37:39 UTC (rev 14371)
+++ trunk/reactos/drivers/fs/np/create.c	2005-03-28 18:42:53 UTC (rev 14372)
@@ -50,20 +50,21 @@
   PLIST_ENTRY CurrentEntry;
   PNPFS_WAITER_ENTRY Waiter;
   KIRQL oldIrql;
+  PIRP Irp;
 
   CurrentEntry = Pipe->WaiterListHead.Flink;
   while (CurrentEntry != &Pipe->WaiterListHead)
     {
       Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
-      if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE &&
-          !Waiter->Irp->Cancel)
+      Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext);
+      if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
 	{
 	  DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
   
 	  IoAcquireCancelSpinLock(&oldIrql);
-          if (!Waiter->Irp->Cancel)
+          if (!Irp->Cancel)
 	    {
-	      IoSetCancelRoutine(Waiter->Irp, NULL);
+	      IoSetCancelRoutine(Irp, NULL);
               IoReleaseCancelSpinLock(oldIrql);
               return Waiter->Fcb;
             }
@@ -83,6 +84,7 @@
 {
   PLIST_ENTRY CurrentEntry;
   PNPFS_WAITER_ENTRY Waiter;
+  PIRP Irp;
 
   CurrentEntry = Pipe->WaiterListHead.Flink;
   while (CurrentEntry != &Pipe->WaiterListHead)
@@ -92,13 +94,12 @@
 	{
 	  DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
 
-	  Waiter->Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
-	  Waiter->Irp->IoStatus.Information = 0;
-	  IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
-
 	  RemoveEntryList(&Waiter->Entry);
-	  ExFreePool(Waiter);
-	  return;
+	  Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext);
+	  Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
+	  Irp->IoStatus.Information = 0;
+	  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+	  break;
 	}
       CurrentEntry = CurrentEntry->Flink;
     }
@@ -175,7 +176,10 @@
   ClientFcb->PipeEnd = FILE_PIPE_CLIENT_END;
   ClientFcb->OtherSide = NULL;
   ClientFcb->PipeState = SpecialAccess ? 0 : FILE_PIPE_DISCONNECTED_STATE;
+  InitializeListHead(&ClientFcb->ReadRequestListHead);
 
+  DPRINT("Fcb: %x\n", ClientFcb);
+
   /* Initialize data list. */
   if (Pipe->OutboundQuota)
     {
@@ -202,8 +206,10 @@
   ClientFcb->MaxDataLength = Pipe->OutboundQuota;
   ExInitializeFastMutex(&ClientFcb->DataListLock);
   KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE);
-  KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
+  KeInitializeEvent(&ClientFcb->ReadEvent, SynchronizationEvent, FALSE);
+  KeInitializeEvent(&ClientFcb->WriteEvent, SynchronizationEvent, FALSE);
 
+
   /*
    * Step 3. Search for listening server FCB.
    */
@@ -489,6 +495,7 @@
    Fcb->ReadDataAvailable = 0;
    Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
    Fcb->MaxDataLength = Pipe->InboundQuota;
+   InitializeListHead(&Fcb->ReadRequestListHead);
    ExInitializeFastMutex(&Fcb->DataListLock);
 
    Pipe->CurrentInstances++;
@@ -498,13 +505,11 @@
    Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
    Fcb->OtherSide = NULL;
 
-   KeInitializeEvent(&Fcb->ConnectEvent,
-		     SynchronizationEvent,
-		     FALSE);
+   DPRINT("Fcb: %x\n", Fcb);
 
-   KeInitializeEvent(&Fcb->Event,
-		     SynchronizationEvent,
-		     FALSE);
+   KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE);
+   KeInitializeEvent(&Fcb->ReadEvent, SynchronizationEvent, FALSE);
+   KeInitializeEvent(&Fcb->WriteEvent, SynchronizationEvent, FALSE);
 
    KeLockMutex(&Pipe->FcbListLock);
    InsertTailList(&Pipe->ServerFcbListHead, &Fcb->FcbListEntry);
@@ -528,7 +533,7 @@
    PNPFS_DEVICE_EXTENSION DeviceExt;
    PIO_STACK_LOCATION IoStack;
    PFILE_OBJECT FileObject;
-   PNPFS_FCB Fcb;
+   PNPFS_FCB Fcb, OtherSide;
    PNPFS_PIPE Pipe;
    BOOL Server;
 
@@ -566,19 +571,38 @@
    {
       DPRINT("Client\n");
    }
-
    if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
    {
-      if (Fcb->OtherSide)
+      OtherSide = Fcb->OtherSide;
+      /* Lock the server first */
+      if (Server)
       {
-         Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
-         Fcb->OtherSide->OtherSide = NULL;
-         /*
-          * Signaling the write event. If is possible that an other
-          * thread waits for an empty buffer.
-          */
-         KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
+         ExAcquireFastMutex(&Fcb->DataListLock);
+	 ExAcquireFastMutex(&OtherSide->DataListLock);
       }
+      else
+      {
+	 ExAcquireFastMutex(&OtherSide->DataListLock);
+         ExAcquireFastMutex(&Fcb->DataListLock);
+      }
+      OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+      OtherSide->OtherSide = NULL;
+      /*
+       * Signaling the write event. If is possible that an other
+       * thread waits for an empty buffer.
+       */
+      KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
+      KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+      if (Server)
+      {
+         ExReleaseFastMutex(&Fcb->DataListLock);
+	 ExReleaseFastMutex(&OtherSide->DataListLock);
+      }
+      else
+      {
+	 ExReleaseFastMutex(&OtherSide->DataListLock);
+	 ExReleaseFastMutex(&Fcb->DataListLock);
+      }
    }
    else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
    {
@@ -586,6 +610,7 @@
       PNPFS_WAITER_ENTRY WaitEntry = NULL;
       BOOLEAN Complete = FALSE; 
       KIRQL oldIrql;
+      PIRP tmpIrp;
 
       Entry = Fcb->Pipe->WaiterListHead.Flink;
       while (Entry != &Fcb->Pipe->WaiterListHead)
@@ -594,28 +619,25 @@
 	 if (WaitEntry->Fcb == Fcb)
 	 {
             RemoveEntryList(Entry);
+	    tmpIrp = CONTAINING_RECORD(WaitEntry, IRP, Tail.Overlay.DriverContext);
 	    IoAcquireCancelSpinLock(&oldIrql);
-	    if (!Irp->Cancel)
+	    if (!tmpIrp->Cancel)
 	    {
-               IoSetCancelRoutine(WaitEntry->Irp, NULL);
+               IoSetCancelRoutine(tmpIrp, NULL);
 	       Complete = TRUE;
 	    }
 	    IoReleaseCancelSpinLock(oldIrql);
+            if (Complete)
+	    {
+	       tmpIrp->IoStatus.Status = STATUS_PIPE_BROKEN;
+               tmpIrp->IoStatus.Information = 0;
+               IoCompleteRequest(tmpIrp, IO_NO_INCREMENT);
+	    }
 	    break;
 	 }
 	 Entry = Entry->Flink;
       }
 
-      if (Entry != &Fcb->Pipe->WaiterListHead)
-      {
-         if (Complete)
-	 {
-	    WaitEntry->Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
-            WaitEntry->Irp->IoStatus.Information = 0;
-            IoCompleteRequest(WaitEntry->Irp, IO_NO_INCREMENT);
-	 }
-         ExFreePool(WaitEntry);
-      }
    }
    Fcb->PipeState = FILE_PIPE_CLOSING_STATE;
 

Modified: trunk/reactos/drivers/fs/np/fsctrl.c
--- trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-28 18:37:39 UTC (rev 14371)
+++ trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-28 18:42:53 UTC (rev 14372)
@@ -26,18 +26,18 @@
 
   DPRINT1("NpfsListeningCancelRoutine() called\n");
 
+  Waiter = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
+
   IoReleaseCancelSpinLock(Irp->CancelIrql);
 
-  Waiter = Irp->Tail.Overlay.DriverContext[0];
 
-  KeLockMutex(&Waiter->Pipe->FcbListLock);
+  KeLockMutex(&Waiter->Fcb->Pipe->FcbListLock);
   RemoveEntryList(&Waiter->Entry);
-  KeUnlockMutex(&Waiter->Pipe->FcbListLock);
+  KeUnlockMutex(&Waiter->Fcb->Pipe->FcbListLock);
 
   Irp->IoStatus.Status = STATUS_CANCELLED;
   Irp->IoStatus.Information = 0;
   IoCompleteRequest(Irp, IO_NO_INCREMENT);
-  ExFreePool(Waiter);
 }
 
 
@@ -48,18 +48,13 @@
   PNPFS_WAITER_ENTRY Entry;
   KIRQL oldIrql;
 
-  Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
-  if (Entry == NULL)
-    return STATUS_INSUFFICIENT_RESOURCES;
+  Entry = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
 
-  Entry->Irp = Irp;
   Entry->Fcb = Fcb;
-  Entry->Pipe = Fcb->Pipe;
 
   KeLockMutex(&Fcb->Pipe->FcbListLock);
 
   IoMarkIrpPending(Irp);
-  Irp->Tail.Overlay.DriverContext[0] = Entry;
   InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
 
   IoAcquireCancelSpinLock(&oldIrql);
@@ -78,7 +73,6 @@
   Irp->IoStatus.Information = 0;
   IoCompleteRequest(Irp, IO_NO_INCREMENT);
   KeUnlockMutex(&Fcb->Pipe->FcbListLock);
-  ExFreePool(Entry);
 
   return STATUS_CANCELLED;
 }
@@ -173,38 +167,100 @@
 static NTSTATUS
 NpfsDisconnectPipe(PNPFS_FCB Fcb)
 {
-  DPRINT("NpfsDisconnectPipe()\n");
+   NTSTATUS Status;
+   PNPFS_FCB OtherSide;
+   PNPFS_PIPE Pipe;
+   BOOL Server;
 
-  if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
-    return STATUS_SUCCESS;
+   DPRINT("NpfsDisconnectPipe()\n");
 
-  if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
-    {
-      Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
-      /* FIXME: Shouldn't this be FILE_PIPE_CLOSING_STATE? */
-      Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+   Pipe = Fcb->Pipe;
+   KeLockMutex(&Pipe->FcbListLock);
 
-      /* FIXME: remove data queue(s) */
-
-      Fcb->OtherSide->OtherSide = NULL;
+   if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
+   {
+      DPRINT("Pipe is already disconnected\n");
+      Status = STATUS_SUCCESS;
+   }
+   else if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+   {
+      Server = (Fcb->PipeEnd == FILE_PIPE_SERVER_END);
+      OtherSide = Fcb->OtherSide;
       Fcb->OtherSide = NULL;
+      /* Lock the server first */
+      if (Server)
+      {
+         ExAcquireFastMutex(&Fcb->DataListLock);
+	 ExAcquireFastMutex(&OtherSide->DataListLock);
+      }
+      else
+      {
+	 ExAcquireFastMutex(&OtherSide->DataListLock);
+         ExAcquireFastMutex(&Fcb->DataListLock);
+      }
+      OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
+      OtherSide->OtherSide = NULL;
+      /*
+       * Signaling the write event. If is possible that an other
+       * thread waits for an empty buffer.
+       */
+      KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
+      KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+      if (Server)
+      {
+         ExReleaseFastMutex(&Fcb->DataListLock);
+	 ExReleaseFastMutex(&OtherSide->DataListLock);
+      }
+      else
+      {
+	 ExReleaseFastMutex(&OtherSide->DataListLock);
+	 ExReleaseFastMutex(&OtherSide->DataListLock);
+      }
+      Status = STATUS_SUCCESS;
+   }
+   else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
+   {
+      PLIST_ENTRY Entry;
+      PNPFS_WAITER_ENTRY WaitEntry = NULL;
+      BOOLEAN Complete = FALSE; 
+      PIRP Irp = NULL;
 
-      DPRINT("Pipe disconnected\n");
-      return STATUS_SUCCESS;
-    }
+      Entry = Fcb->Pipe->WaiterListHead.Flink;
+      while (Entry != &Fcb->Pipe->WaiterListHead)
+      {
+         WaitEntry = CONTAINING_RECORD(Entry, NPFS_WAITER_ENTRY, Entry);
+	 if (WaitEntry->Fcb == Fcb)
+	 {
+            RemoveEntryList(Entry);
+	    Irp = CONTAINING_RECORD(Entry, IRP, Tail.Overlay.DriverContext);
+	    Complete = (NULL == IoSetCancelRoutine(Irp, NULL));
+            break;
+	 }
+	 Entry = Entry->Flink;
+      }
 
-  if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
-    {
+      if (Irp)
+      {
+         if (Complete)
+	 {
+	    Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
+            Irp->IoStatus.Information = 0;
+            IoCompleteRequest(Irp, IO_NO_INCREMENT);
+	 }
+      }
       Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
-      Fcb->OtherSide = NULL;
-
-      /* FIXME: remove data queue(s) */
-
-      DPRINT("Pipe disconnected\n");
-      return STATUS_SUCCESS;
-    }
-
-  return STATUS_UNSUCCESSFUL;
+      Status = STATUS_SUCCESS;
+   }
+   else if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
+   {
+      Status = STATUS_PIPE_CLOSING;
+   }
+   else
+   {
+      Status = STATUS_UNSUCCESSFUL;
+   }
+   KeUnlockMutex(&Pipe->FcbListLock);
+   return Status;
 }
 
 

Modified: trunk/reactos/drivers/fs/np/npfs.c
--- trunk/reactos/drivers/fs/np/npfs.c	2005-03-28 18:37:39 UTC (rev 14371)
+++ trunk/reactos/drivers/fs/np/npfs.c	2005-03-28 18:42:53 UTC (rev 14372)
@@ -27,6 +27,9 @@
    NTSTATUS Status;
    
    DPRINT("Named Pipe FSD 0.0.2\n");
+
+   ASSERT (sizeof(NPFS_CONTEXT) <= sizeof (((PIRP)NULL)->Tail.Overlay.DriverContext));
+   ASSERT (sizeof(NPFS_WAITER_ENTRY) <= sizeof(((PIRP)NULL)->Tail.Overlay.DriverContext));
    
    DriverObject->MajorFunction[IRP_MJ_CREATE] = NpfsCreate;
    DriverObject->MajorFunction[IRP_MJ_CREATE_NAMED_PIPE] =
@@ -74,8 +77,7 @@
    DeviceExtension = DeviceObject->DeviceExtension;
    InitializeListHead(&DeviceExtension->PipeListHead);
    InitializeListHead(&DeviceExtension->ThreadListHead);
-   KeInitializeMutex(&DeviceExtension->PipeListLock,
-		     0);
+   KeInitializeMutex(&DeviceExtension->PipeListLock, 0);
    DeviceExtension->EmptyWaiterCount = 0;
 
    /* set the size quotas */

Modified: trunk/reactos/drivers/fs/np/npfs.h
--- trunk/reactos/drivers/fs/np/npfs.h	2005-03-28 18:37:39 UTC (rev 14371)
+++ trunk/reactos/drivers/fs/np/npfs.h	2005-03-28 18:42:53 UTC (rev 14372)
@@ -42,12 +42,15 @@
   struct ETHREAD *Thread;
   PNPFS_PIPE Pipe;
   KEVENT ConnectEvent;
-  KEVENT Event;
+  KEVENT ReadEvent;
+  KEVENT WriteEvent;
   ULONG PipeEnd;
   ULONG PipeState;
   ULONG ReadDataAvailable;
   ULONG WriteQuotaAvailable;
 
+  LIST_ENTRY ReadRequestListHead;
+
   PVOID Data;
   PVOID ReadPtr;
   PVOID WritePtr;
@@ -58,11 +61,8 @@
 
 typedef struct _NPFS_CONTEXT
 {
-  PDEVICE_OBJECT DeviceObject;
-  PIRP Irp;
-  PNPFS_FCB Fcb;
-  UCHAR MajorFunction;
-  BOOLEAN AllocatedFromPool;
+  LIST_ENTRY ListEntry;
+  PKEVENT WaitEvent;
 } NPFS_CONTEXT, *PNPFS_CONTEXT;
 
 typedef struct _NPFS_THREAD_CONTEXT
@@ -73,14 +73,12 @@
   LIST_ENTRY ListEntry;
   PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];
   KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];
-  PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];
+  PIRP WaitIrpArray[MAXIMUM_WAIT_OBJECTS];
 } NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
 
 typedef struct _NPFS_WAITER_ENTRY
 {
   LIST_ENTRY Entry;
-  PIRP Irp;
-  PNPFS_PIPE Pipe;
   PNPFS_FCB Fcb;
 } NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
 

Modified: trunk/reactos/drivers/fs/np/rw.c
--- trunk/reactos/drivers/fs/np/rw.c	2005-03-28 18:37:39 UTC (rev 14371)
+++ trunk/reactos/drivers/fs/np/rw.c	2005-03-28 18:42:53 UTC (rev 14372)
@@ -46,61 +46,90 @@
 }
 #endif
 
-static NTSTATUS
-NpfsReadFromPipe(PNPFS_CONTEXT Context);
-
 static VOID STDCALL 
-NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
-                         IN PIRP Irp)
+NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+                           IN PIRP Irp)
 {
    PNPFS_CONTEXT Context;
    PNPFS_DEVICE_EXTENSION DeviceExt;
+   PIO_STACK_LOCATION IoStack;
+   PNPFS_FCB Fcb;
+   BOOLEAN Complete = FALSE;
 
-   DPRINT1("NpfsWaitingCancelRoutine() called\n");
+   DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n", DeviceObject, Irp);
 
    IoReleaseCancelSpinLock(Irp->CancelIrql);
 
-   Context = Irp->Tail.Overlay.DriverContext[0];
-   DeviceExt = Context->DeviceObject->DeviceExtension;
+   Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+   DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
+   IoStack = IoGetCurrentIrpStackLocation(Irp);
+   Fcb = IoStack->FileObject->FsContext;
 
    KeLockMutex(&DeviceExt->PipeListLock);
-   KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
+   ExAcquireFastMutex(&Fcb->DataListLock);
+   switch(IoStack->MajorFunction)   
+   {
+      case IRP_MJ_READ:
+         if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+	 {
+	    /* we are not the first in the list, remove an complete us */
+	    RemoveEntryList(&Context->ListEntry);
+	    Complete = TRUE;
+	 }
+	 else
+	 {
+	    KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE);
+	 }
+	 break;
+      default:
+         KEBUGCHECK(0);
+   }
+   ExReleaseFastMutex(&Fcb->DataListLock);
    KeUnlockMutex(&DeviceExt->PipeListLock);
+   if (Complete)
+   {
+      Irp->IoStatus.Status = STATUS_CANCELLED;
+      Irp->IoStatus.Information = 0;
+      IoCompleteRequest(Irp, IO_NO_INCREMENT);
+   }
 }
 
 static VOID STDCALL
-NpfsWaiterThread(PVOID Context)
+NpfsWaiterThread(PVOID InitContext)
 {
-   PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
-   ULONG CurrentCount, Count = 0;
-   PNPFS_CONTEXT WaitContext = NULL;
+   PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
+   ULONG CurrentCount;
+   ULONG Count = 0;
+   PIRP Irp = NULL;
+   PIRP NextIrp;
    NTSTATUS Status;
    BOOLEAN Terminate = FALSE;
    BOOLEAN Cancel = FALSE;
-   KIRQL oldIrql;
+   PIO_STACK_LOCATION IoStack = NULL;
+   PNPFS_CONTEXT Context;
+   PNPFS_CONTEXT NextContext;
+   PNPFS_FCB Fcb;
 
    KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
 
    while (1)
      {
        CurrentCount = ThreadContext->Count;
-       KeResetEvent(&ThreadContext->Event);
        KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
-       if (WaitContext)
+       if (Irp)
          {
            if (Cancel)
              {
-	       WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
-               WaitContext->Irp->IoStatus.Information = 0;
-               IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
-	       ExFreePool(WaitContext);
+	       Irp->IoStatus.Status = STATUS_CANCELLED;
+               Irp->IoStatus.Information = 0;
+               IoCompleteRequest(Irp, IO_NO_INCREMENT);
 	     }
 	   else
 	     {
-	       switch (WaitContext->MajorFunction)
+               switch (IoStack->MajorFunction)
 	         {
 	           case IRP_MJ_READ:
-                     NpfsReadFromPipe(WaitContext);
+                     NpfsRead(IoStack->DeviceObject, Irp);
 		     break;
 		   default:
 		     KEBUGCHECK(0);
@@ -119,30 +148,56 @@
 					 FALSE,
 					 NULL,
 					 ThreadContext->WaitBlockArray);
-       KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
        if (!NT_SUCCESS(Status))
          {
            KEBUGCHECK(0);
          }
+       KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
        Count = Status - STATUS_SUCCESS;
-       ASSERT (Count <= CurrentCount);
+       ASSERT (Count < CurrentCount);
        if (Count > 0)
-         {
-	   WaitContext = ThreadContext->WaitContextArray[Count];
-	   ThreadContext->Count--;
-	   ThreadContext->DeviceExt->EmptyWaiterCount++;
-	   ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
-	   ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
-           IoAcquireCancelSpinLock(&oldIrql);
-	   Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
-           IoReleaseCancelSpinLock(oldIrql);
-        }
-      else
-        {
+       {
+	  Irp = ThreadContext->WaitIrpArray[Count];
+	  ThreadContext->Count--;
+	  ThreadContext->DeviceExt->EmptyWaiterCount++;
+	  ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
+	  ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
+
+          Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
+	  Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+	  IoStack = IoGetCurrentIrpStackLocation(Irp);
+
+	  if (Cancel)
+	  {
+	     Fcb = IoStack->FileObject->FsContext;
+ 	     ExAcquireFastMutex(&Fcb->DataListLock);
+	     RemoveEntryList(&Context->ListEntry);
+	     switch (IoStack->MajorFunction)
+	     {
+	        case IRP_MJ_READ:
+                   if (!IsListEmpty(&Fcb->ReadRequestListHead))
+		   {
+		      /* put the next request on the wait list */
+                      NextContext = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
+		      ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
+		      NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
+		      ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
+		      ThreadContext->Count++;
+                      ThreadContext->DeviceExt->EmptyWaiterCount--;		   
+		   }
+		   break;
+		default:
+		   KEBUGCHECK(0);
+	     }
+	     ExReleaseFastMutex(&Fcb->DataListLock);
+	  }	    
+       }
+       else
+       {  
 	  /* someone has add a new wait request */
-          WaitContext = NULL;
-        }
-      if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+          Irp = NULL;
+       }
+       if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
         {
           /* it exist an other thread with empty wait slots, we can remove our thread from the list */
           RemoveEntryList(&ThreadContext->ListEntry);
@@ -155,14 +210,20 @@
 }
 
 static NTSTATUS
-NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
+NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
+			       IN PIRP Irp)
 {
    PLIST_ENTRY ListEntry;
-   PNPFS_THREAD_CONTEXT ThreadContext;
+   PNPFS_THREAD_CONTEXT ThreadContext = NULL;
    NTSTATUS Status;
    HANDLE hThread;
    KIRQL oldIrql;
 
+   PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+   PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;;
+
+   DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
+
    KeLockMutex(&DeviceExt->PipeListLock);
 
    ListEntry = DeviceExt->ThreadListHead.Flink;
@@ -184,12 +245,12 @@
            return STATUS_NO_MEMORY;
          }
        ThreadContext->DeviceExt = DeviceExt;
-       KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
+       KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
        ThreadContext->Count = 1;
        ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
 
    
-       DPRINT("Creating a new system thread for waiting read requests\n");
+       DPRINT("Creating a new system thread for waiting read/write requests\n");
       
        Status = PsCreateSystemThread(&hThread,
 		                     THREAD_ALL_ACCESS,
@@ -207,21 +268,20 @@
        InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
        DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;       
      }
-   IoMarkIrpPending(Context->Irp);
-   Context->Irp->Tail.Overlay.DriverContext[0] = Context;
+   IoMarkIrpPending(Irp);
 
    IoAcquireCancelSpinLock(&oldIrql);
-   if (Context->Irp->Cancel)
+   if (Irp->Cancel)
      {
        IoReleaseCancelSpinLock(oldIrql);
        Status = STATUS_CANCELLED;
      }
    else
      {
-       IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
+       IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
        IoReleaseCancelSpinLock(oldIrql);
-       ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
-       ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
+       ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
+       ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
        ThreadContext->Count++;
        DeviceExt->EmptyWaiterCount--;
        KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
@@ -231,130 +291,182 @@
    return Status;
 }
 
-static NTSTATUS
-NpfsReadFromPipe(PNPFS_CONTEXT Context)
+NTSTATUS STDCALL
+NpfsRead(IN PDEVICE_OBJECT DeviceObject,
+	 IN PIRP Irp)
 {
-  PIO_STACK_LOCATION IoStack;
   PFILE_OBJECT FileObject;
   NTSTATUS Status;
-  ULONG Information;
+  NTSTATUS OriginalStatus = STATUS_SUCCESS;
   PNPFS_FCB Fcb;
-  PNPFS_FCB WriterFcb;
-  PNPFS_PIPE Pipe;
+  PNPFS_CONTEXT Context;
+  KEVENT Event;
   ULONG Length;
-  PVOID Buffer;
+  ULONG Information;
   ULONG CopyLength;
   ULONG TempLength;
+  BOOLEAN IsOriginalRequest = TRUE;
+  PVOID Buffer;
 
-  DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
+  DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
 
-  IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
-  FileObject = IoStack->FileObject;
+  if (Irp->MdlAddress == NULL)
+  {
+     DPRINT("Irp->MdlAddress == NULL\n");
+     Status = STATUS_UNSUCCESSFUL;
+     Irp->IoStatus.Information = 0;
+     goto done;
+  }
+
+  FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
   Fcb = FileObject->FsContext;
-  Pipe = Fcb->Pipe;
-  WriterFcb = Fcb->OtherSide;
+  Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
 
   if (Fcb->Data == NULL)
-    {
-      DPRINT("Pipe is NOT readable!\n");
-      Status = STATUS_UNSUCCESSFUL;
-      Information = 0;
-      goto done;
-    }
+  {
+     DPRINT1("Pipe is NOT readable!\n");
+     Status = STATUS_UNSUCCESSFUL;
+     Irp->IoStatus.Information = 0;
+     goto done;
+  }
 
-  Status = STATUS_SUCCESS;
-  Length = IoStack->Parameters.Read.Length;
-  Information = 0;
+  ExAcquireFastMutex(&Fcb->DataListLock);
 
-  Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
-  ExAcquireFastMutex(&Fcb->DataListLock);
-  while (1)
-    {
-      if (Fcb->ReadDataAvailable == 0)
+  if (IoIsOperationSynchronous(Irp))
+  {
+     InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+     if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+     {
+        KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
+	Context->WaitEvent = &Event;
+        ExReleaseFastMutex(&Fcb->DataListLock);
+        Status = KeWaitForSingleObject(&Event,
+	                               Executive,
+				       KernelMode,
+				       FALSE,
+				       NULL);
+	if (!NT_SUCCESS(Status))
 	{
-	  if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
-	    {
-	      KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
-	    }
-	  ExReleaseFastMutex(&Fcb->DataListLock);
-	  if (Information > 0)
-	    {
-	      Status = STATUS_SUCCESS;
+	   KEBUGCHECK(0);
+	}
+        ExAcquireFastMutex(&Fcb->DataListLock);
+     }
+     Irp->IoStatus.Information = 0;
+  }
+  else
+  {
+     KIRQL oldIrql;
+     if (IsListEmpty(&Fcb->ReadRequestListHead) ||
+	 Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+     {
+        /* this is a new request */
+        Irp->IoStatus.Information = 0;
+	Context->WaitEvent = &Fcb->ReadEvent;
+        InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
+	if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
+	{
+	   /* there was already a request on the list */
+           IoAcquireCancelSpinLock(&oldIrql);
+           if (Irp->Cancel)
+           {
+	      IoReleaseCancelSpinLock(oldIrql);
+	      RemoveEntryList(&Context->ListEntry);
+	      ExReleaseFastMutex(&Fcb->DataListLock);
+	      Status = STATUS_CANCELLED;
 	      goto done;
-	    }
+           }
+           IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
+           IoReleaseCancelSpinLock(oldIrql);
+	   ExReleaseFastMutex(&Fcb->DataListLock);
+           IoMarkIrpPending(Irp);
+	   Status = STATUS_PENDING;
+	   goto done;
+	}
+     }
+  }
 
-	  if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
-	    {
+  while (1)
+  {
+     Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
+     Information = Irp->IoStatus.Information;
+     Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
+     ASSERT (Information <= Length);
+     Buffer += Information;
+     Length -= Information;
+     Status = STATUS_SUCCESS;
+
+     while (1)
+     {
+        if (Fcb->ReadDataAvailable == 0)
+        {
+	   if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
+	   {
+	      KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
+	   }
+	   if (Information > 0 &&
+	       (Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
+	        Fcb->PipeState != FILE_PIPE_CONNECTED_STATE))
+	   {
+	      break;
+	   }
+      	   if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+	   {
 	      DPRINT("PipeState: %x\n", Fcb->PipeState);
 	      Status = STATUS_PIPE_BROKEN;
-	      goto done;
-	    }
+	      break;
+           }
+	   ExReleaseFastMutex(&Fcb->DataListLock);
+	   if (IoIsOperationSynchronous(Irp))
+	   {
+	      /* Wait for ReadEvent to become signaled */
 
-	  if (IoIsOperationSynchronous(Context->Irp))
-	    {
-	      /* Wait for ReadEvent to become signaled */
-	      DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
-	      Status = KeWaitForSingleObject(&Fcb->Event,
+	      DPRINT("Waiting for readable data (%wZ)\n", &Fcb->Pipe->PipeName);
+	      Status = KeWaitForSingleObject(&Fcb->ReadEvent,
 				             UserRequest,
 				             KernelMode,
 				             FALSE,
 				             NULL);
-	      DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
-	    }
-	  else
-	    {
-	      PNPFS_CONTEXT NewContext;
-
-	      NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
-	      if (NewContext == NULL)
-	        {
-		   Status = STATUS_NO_MEMORY;
-		   goto done;
-		}
-	      memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
-	      NewContext->AllocatedFromPool = TRUE;
-	      NewContext->Fcb = Fcb;
-	      NewContext->MajorFunction = IRP_MJ_READ;
-
-	      Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
+	      DPRINT("Finished waiting (%wZ)! Status: %x\n", &Fcb->Pipe->PipeName, Status);
+	      ExAcquireFastMutex(&Fcb->DataListLock);
+	   }
+	   else
+	   {
+              PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
+	    
+              Context->WaitEvent = &Fcb->ReadEvent;
+	      Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
 	         
 	      if (NT_SUCCESS(Status))
-	        {
-		  Status = STATUS_PENDING;
-		}
-	      else
-	        {
-		  ExFreePool(NewContext);
-		}
-	      goto done;
-	    }
-
-	  ExAcquireFastMutex(&Fcb->DataListLock);
-	}
-
-      if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
-	{
-	  DPRINT("Byte stream mode\n");
-	  /* Byte stream mode */
-	  while (Length > 0 && Fcb->ReadDataAvailable > 0)
-	    {
+	      {
+	         Status = STATUS_PENDING;
+	      }
+	      ExAcquireFastMutex(&Fcb->DataListLock);
+	      break;
+	   }
+        } 
+        if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+        {
+	   DPRINT("Byte stream mode\n");
+	   /* Byte stream mode */
+	   while (Length > 0 && Fcb->ReadDataAvailable > 0)
+	   {
 	      CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
 	      if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength)
-		{
-		  memcpy(Buffer, Fcb->ReadPtr, CopyLength);
-		  Fcb->ReadPtr += CopyLength;
-		  if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
-		    {
-		      Fcb->ReadPtr = Fcb->Data;
-		    }
-		}
-	      else
-		{
-		  TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
-		  memcpy(Buffer, Fcb->ReadPtr, TempLength);
-		  memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
-		  Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
-		}
+	      {
+	         memcpy(Buffer, Fcb->ReadPtr, CopyLength);
+	         Fcb->ReadPtr += CopyLength;
+	         if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
+	         {
+		    Fcb->ReadPtr = Fcb->Data;
+	         }
+	      }
+              else
+	      {
+	         TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
+	         memcpy(Buffer, Fcb->ReadPtr, TempLength);
+	         memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
+	         Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
+	      }
 
 	      Buffer += CopyLength;
 	      Length -= CopyLength;
[truncated at 1000 lines; 243 more skipped]