- Guarded the calls to IoSetCancelRoutine with IoAcquireCancelSpinLock/IoReleaseCancelSpinLock.  
- Used a fastmutex as lock for the data queue.  
- Used paged pool for the data buffers.  
- Allowed the server to read (and to wait) on a listening pipe.  
- Implemented non-blocking read operations.
Modified: trunk/reactos/drivers/fs/np/create.c
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
Modified: trunk/reactos/drivers/fs/np/npfs.c
Modified: trunk/reactos/drivers/fs/np/npfs.h
Modified: trunk/reactos/drivers/fs/np/rw.c

Modified: trunk/reactos/drivers/fs/np/create.c
--- trunk/reactos/drivers/fs/np/create.c	2005-03-23 21:51:40 UTC (rev 14295)
+++ trunk/reactos/drivers/fs/np/create.c	2005-03-23 22:11:20 UTC (rev 14296)
@@ -49,6 +49,7 @@
 {
   PLIST_ENTRY CurrentEntry;
   PNPFS_WAITER_ENTRY Waiter;
+  KIRQL oldIrql;
 
   CurrentEntry = Pipe->WaiterListHead.Flink;
   while (CurrentEntry != &Pipe->WaiterListHead)
@@ -58,11 +59,15 @@
           !Waiter->Irp->Cancel)
 	{
 	  DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
-
-          if (IoSetCancelRoutine(Waiter->Irp, NULL) != NULL)
-            {
+  
+	  IoAcquireCancelSpinLock(&oldIrql);
+          if (!Waiter->Irp->Cancel)
+	    {
+	      IoSetCancelRoutine(Waiter->Irp, NULL);
+              IoReleaseCancelSpinLock(oldIrql);
               return Waiter->Fcb;
             }
+          IoReleaseCancelSpinLock(oldIrql);
 	}
 
       CurrentEntry = CurrentEntry->Flink;
@@ -174,7 +179,7 @@
   /* Initialize data list. */
   if (Pipe->OutboundQuota)
     {
-      ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota);
+      ClientFcb->Data = ExAllocatePool(PagedPool, Pipe->OutboundQuota);
       if (ClientFcb->Data == NULL)
         {
           DPRINT("No memory!\n");
@@ -195,7 +200,7 @@
   ClientFcb->ReadDataAvailable = 0;
   ClientFcb->WriteQuotaAvailable = Pipe->OutboundQuota;
   ClientFcb->MaxDataLength = Pipe->OutboundQuota;
-  KeInitializeSpinLock(&ClientFcb->DataListLock);
+  ExInitializeFastMutex(&ClientFcb->DataListLock);
   KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE);
   KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
 
@@ -455,13 +460,17 @@
 
    if (Pipe->InboundQuota)
      {
-       Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota);
+       Fcb->Data = ExAllocatePool(PagedPool, Pipe->InboundQuota);
        if (Fcb->Data == NULL)
          {
            ExFreePool(Fcb);
 
            if (NewPipe)
              {
+               /* 
+                * FIXME:
+	        *   Lock the pipelist and remove the pipe from the list.
+	        */
                RtlFreeUnicodeString(&Pipe->PipeName);
                ExFreePool(Pipe);
              }
@@ -481,7 +490,7 @@
    Fcb->ReadDataAvailable = 0;
    Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
    Fcb->MaxDataLength = Pipe->InboundQuota;
-   KeInitializeSpinLock(&Fcb->DataListLock);
+   ExInitializeFastMutex(&Fcb->DataListLock);
 
    Pipe->CurrentInstances++;
 

Modified: trunk/reactos/drivers/fs/np/fsctrl.c
--- trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-23 21:51:40 UTC (rev 14295)
+++ trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-23 22:11:20 UTC (rev 14296)
@@ -46,6 +46,7 @@
 			       PNPFS_FCB Fcb)
 {
   PNPFS_WAITER_ENTRY Entry;
+  KIRQL oldIrql;
 
   Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
   if (Entry == NULL)
@@ -61,13 +62,15 @@
   Irp->Tail.Overlay.DriverContext[0] = Entry;
   InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
 
-  IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
-  
+  IoAcquireCancelSpinLock(&oldIrql);
   if (!Irp->Cancel)
     {
+      IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
+      IoReleaseCancelSpinLock(oldIrql);
       KeUnlockMutex(&Fcb->Pipe->FcbListLock);
       return STATUS_PENDING;
     }
+  IoReleaseCancelSpinLock(oldIrql);
   
   RemoveEntryList(&Entry->Entry);
   

Modified: trunk/reactos/drivers/fs/np/npfs.c
--- trunk/reactos/drivers/fs/np/npfs.c	2005-03-23 21:51:40 UTC (rev 14295)
+++ trunk/reactos/drivers/fs/np/npfs.c	2005-03-23 22:11:20 UTC (rev 14296)
@@ -73,8 +73,10 @@
    /* initialize the device extension */
    DeviceExtension = DeviceObject->DeviceExtension;
    InitializeListHead(&DeviceExtension->PipeListHead);
+   InitializeListHead(&DeviceExtension->ThreadListHead);
    KeInitializeMutex(&DeviceExtension->PipeListLock,
 		     0);
+   DeviceExtension->EmptyWaiterCount = 0;
 
    /* set the size quotas */
    DeviceExtension->MinQuota = PAGE_SIZE;

Modified: trunk/reactos/drivers/fs/np/npfs.h
--- trunk/reactos/drivers/fs/np/npfs.h	2005-03-23 21:51:40 UTC (rev 14295)
+++ trunk/reactos/drivers/fs/np/npfs.h	2005-03-23 22:11:20 UTC (rev 14296)
@@ -6,7 +6,9 @@
 typedef struct _NPFS_DEVICE_EXTENSION
 {
   LIST_ENTRY PipeListHead;
+  LIST_ENTRY ThreadListHead;
   KMUTEX PipeListLock;
+  ULONG EmptyWaiterCount;
   ULONG MinQuota;
   ULONG DefaultQuota;
   ULONG MaxQuota;
@@ -20,6 +22,7 @@
   LIST_ENTRY ServerFcbListHead;
   LIST_ENTRY ClientFcbListHead;
   LIST_ENTRY WaiterListHead;
+  LIST_ENTRY EmptyBufferListHead;
   ULONG PipeType;
   ULONG ReadMode;
   ULONG WriteMode;
@@ -50,9 +53,29 @@
   PVOID WritePtr;
   ULONG MaxDataLength;
 
-  KSPIN_LOCK DataListLock;	/* Data queue lock */
+  FAST_MUTEX DataListLock;	/* Data queue lock */
 } NPFS_FCB, *PNPFS_FCB;
 
+typedef struct _NPFS_CONTEXT	/* Per-request context passed to NpfsReadFromPipe() */
+{
+  PDEVICE_OBJECT DeviceObject;	/* Device object the request was sent to */
+  PIRP Irp;			/* The request being processed */
+  PNPFS_FCB Fcb;		/* FCB whose Event is signalled for this request (set on pool copies) */
+  UCHAR MajorFunction;		/* IRP_MJ_* code the waiter thread dispatches on */
+  BOOLEAN AllocatedFromPool;	/* TRUE for pool copies; NpfsReadFromPipe() frees those */
+} NPFS_CONTEXT, *PNPFS_CONTEXT;
+
+typedef struct _NPFS_THREAD_CONTEXT	/* State of one NpfsWaiterThread() instance */
+{
+  ULONG Count;			/* Used slots in WaitObjectArray, slot 0 included */
+  KEVENT Event;			/* Slot 0: set by NpfsAddWaitingReader() after it adds a request */
+  PNPFS_DEVICE_EXTENSION DeviceExt;	/* Device extension holding PipeListLock and ThreadListHead */
+  LIST_ENTRY ListEntry;		/* Link in DeviceExt->ThreadListHead */
+  PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];	/* Objects handed to KeWaitForMultipleObjects() */
+  KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];	/* Wait block storage for that call */
+  PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];	/* Request belonging to the same-numbered wait object */
+} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
+
 typedef struct _NPFS_WAITER_ENTRY
 {
   LIST_ENTRY Entry;

Modified: trunk/reactos/drivers/fs/np/rw.c
--- trunk/reactos/drivers/fs/np/rw.c	2005-03-23 21:51:40 UTC (rev 14295)
+++ trunk/reactos/drivers/fs/np/rw.c	2005-03-23 22:11:20 UTC (rev 14296)
@@ -46,16 +46,197 @@
 }
 #endif
 
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context);
 
-NTSTATUS STDCALL
-NpfsRead(PDEVICE_OBJECT DeviceObject,
-	 PIRP Irp)
+static VOID STDCALL /* Cancel routine installed by NpfsAddWaitingReader() on queued read IRPs */
+NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+                         IN PIRP Irp)
 {
+   PNPFS_CONTEXT Context;
+   PNPFS_DEVICE_EXTENSION DeviceExt;
+
+   DPRINT1("NpfsWaitingCancelRoutine() called\n");
+
+   IoReleaseCancelSpinLock(Irp->CancelIrql); /* drop the cancel lock before taking the mutex below */
+
+   Context = Irp->Tail.Overlay.DriverContext[0]; /* stored there by NpfsAddWaitingReader() */
+   DeviceExt = Context->DeviceObject->DeviceExtension;
+
+   KeLockMutex(&DeviceExt->PipeListLock);
+   KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE); /* only signals the event; the IRP is not completed here */
+   KeUnlockMutex(&DeviceExt->PipeListLock);
+}
+
+static VOID STDCALL /* System thread that waits on FCB events on behalf of queued read IRPs */
+NpfsWaiterThread(PVOID Context)
+{
+   PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
+   ULONG CurrentCount, Count = 0;
+   PNPFS_CONTEXT WaitContext = NULL;
+   NTSTATUS Status;
+   BOOLEAN Terminate = FALSE;
+   BOOLEAN Cancel = FALSE;
+   KIRQL oldIrql;
+   /* Context is the NPFS_THREAD_CONTEXT built by NpfsAddWaitingReader(); it is freed when the thread leaves the loop. */
+   KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+   /* PipeListLock is held at the top of each iteration and released around request processing and the wait. */
+   while (1)
+     {
+       CurrentCount = ThreadContext->Count; /* snapshot taken while the lock is still held */
+       KeResetEvent(&ThreadContext->Event);
+       KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+       if (WaitContext) /* a request was picked up by the previous iteration */
+         {
+           if (Cancel)
+             {
+	       WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
+               WaitContext->Irp->IoStatus.Information = 0;
+               IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
+	       ExFreePool(WaitContext);
+	     }
+	   else
+	     {
+	       switch (WaitContext->MajorFunction)
+	         {
+	           case IRP_MJ_READ:
+                     NpfsReadFromPipe(WaitContext); /* retries the read with the queued context */
+		     break;
+		   default:
+		     KEBUGCHECK(0);
+		 }
+	     }
+         }
+       if (Terminate)
+         {
+	   break;
+	 }
+       Status = KeWaitForMultipleObjects(CurrentCount,
+	                                 ThreadContext->WaitObjectArray,
+					 WaitAny,
+					 Executive,
+					 KernelMode,
+					 FALSE,
+					 NULL,
+					 ThreadContext->WaitBlockArray);
+       KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
+       if (!NT_SUCCESS(Status))
+         {
+           KEBUGCHECK(0);
+         }
+       Count = Status - STATUS_SUCCESS; /* index of the wait object that was signalled */
+       ASSERT (Count < CurrentCount); /* valid indexes are 0 .. CurrentCount - 1 */
+       if (Count > 0) /* slot 0 is ThreadContext->Event, every other slot is a queued request */
+         {
+	   WaitContext = ThreadContext->WaitContextArray[Count];
+	   ThreadContext->Count--;
+	   ThreadContext->DeviceExt->EmptyWaiterCount++;
+	   ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; /* move the last entry into the freed slot */
+	   ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
+           IoAcquireCancelSpinLock(&oldIrql);
+	   Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL); /* no routine left installed: treat the IRP as cancelled */
+           IoReleaseCancelSpinLock(oldIrql);
+        }
+      else
+        {
+	  /* someone has added a new wait request */
+          WaitContext = NULL;
+        }
+      if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
+        {
+          /* another thread with empty wait slots exists, so we can remove our thread from the list */
+          RemoveEntryList(&ThreadContext->ListEntry);
+          ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
+	  Terminate = TRUE; /* the request picked up above, if any, is still processed before leaving */
+        }
+     }
+   KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
+   ExFreePool(ThreadContext);
+}
+
+static NTSTATUS /* Queues a read request on a waiter thread, creating a new thread when none has a free slot */
+NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
+{
+   PLIST_ENTRY ListEntry;
+   PNPFS_THREAD_CONTEXT ThreadContext;
+   NTSTATUS Status;
+   HANDLE hThread;
+   KIRQL oldIrql;
+   /* Returns STATUS_SUCCESS when queued, STATUS_CANCELLED, STATUS_NO_MEMORY, or the PsCreateSystemThread() failure status. */
+   KeLockMutex(&DeviceExt->PipeListLock);
+   /* Look for an existing waiter thread that still has a free wait slot. */
+   ListEntry = DeviceExt->ThreadListHead.Flink;
+   while (ListEntry != &DeviceExt->ThreadListHead)
+     {
+       ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
+       if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
+         {
+           break;
+         }
+       ListEntry = ListEntry->Flink;
+     }
+   if (ListEntry == &DeviceExt->ThreadListHead) /* none found: start a new waiter thread */
+     {
+       ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
+       if (ThreadContext == NULL)
+         {
+           KeUnlockMutex(&DeviceExt->PipeListLock);
+           return STATUS_NO_MEMORY;
+         }
+       ThreadContext->DeviceExt = DeviceExt;
+       KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
+       ThreadContext->Count = 1; /* slot 0 is the thread's own wake-up event */
+       ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
+
+       /* NOTE(review): hThread is never closed in this function; confirm whether a ZwClose() is required. */
+       DPRINT("Creating a new system thread for waiting read requests\n");
+      
+       Status = PsCreateSystemThread(&hThread,
+		                     THREAD_ALL_ACCESS,
+				     NULL,
+				     NULL,
+				     NULL,
+				     NpfsWaiterThread,
+				     (PVOID)ThreadContext);
+       if (!NT_SUCCESS(Status))
+         {
+           ExFreePool(ThreadContext);
+           KeUnlockMutex(&DeviceExt->PipeListLock);
+           return Status;
+	 }
+       InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
+       DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;       
+     }
+   Context->Irp->Tail.Overlay.DriverContext[0] = Context; /* read back by NpfsWaitingCancelRoutine() */
+   /* The IRP is marked pending only when it is really queued; on the cancelled path the caller completes it itself. */
+   IoAcquireCancelSpinLock(&oldIrql);
+   if (Context->Irp->Cancel)
+     {
+       IoReleaseCancelSpinLock(oldIrql);
+       Status = STATUS_CANCELLED;
+     }
+   else
+     {
+       IoMarkIrpPending(Context->Irp);
+       IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
+       IoReleaseCancelSpinLock(oldIrql);
+       ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
+       ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
+       ThreadContext->Count++;
+       DeviceExt->EmptyWaiterCount--;
+       KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE); /* wake the thread so it re-reads Count */
+       Status = STATUS_SUCCESS;
+     }
+   KeUnlockMutex(&DeviceExt->PipeListLock);
+   return Status;
+}
+
+static NTSTATUS
+NpfsReadFromPipe(PNPFS_CONTEXT Context)
+{
   PIO_STACK_LOCATION IoStack;
   PFILE_OBJECT FileObject;
   NTSTATUS Status;
-  PNPFS_DEVICE_EXTENSION DeviceExt;
-  KIRQL OldIrql;
   ULONG Information;
   PNPFS_FCB Fcb;
   PNPFS_FCB WriterFcb;
@@ -65,23 +246,14 @@
   ULONG CopyLength;
   ULONG TempLength;
 
-  DPRINT("NpfsRead(DeviceObject %p  Irp %p)\n", DeviceObject, Irp);
+  DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
 
-  DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
-  IoStack = IoGetCurrentIrpStackLocation(Irp);
+  IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
   FileObject = IoStack->FileObject;
   Fcb = FileObject->FsContext;
   Pipe = Fcb->Pipe;
   WriterFcb = Fcb->OtherSide;
 
-  if (Irp->MdlAddress == NULL)
-    {
-      DPRINT("Irp->MdlAddress == NULL\n");
-      Status = STATUS_UNSUCCESSFUL;
-      Information = 0;
-      goto done;
-    }
-
   if (Fcb->Data == NULL)
     {
       DPRINT("Pipe is NOT readable!\n");
@@ -94,41 +266,71 @@
   Length = IoStack->Parameters.Read.Length;
   Information = 0;
 
-  Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
-  KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+  Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
+  ExAcquireFastMutex(&Fcb->DataListLock);
   while (1)
     {
-      /* FIXME: check if in blocking mode */
       if (Fcb->ReadDataAvailable == 0)
 	{
 	  if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
 	    {
 	      KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
 	    }
-	  KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+	  ExReleaseFastMutex(&Fcb->DataListLock);
 	  if (Information > 0)
 	    {
 	      Status = STATUS_SUCCESS;
 	      goto done;
 	    }
 
-	  if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
+	  if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE &&
+	      !(Fcb->PipeState == FILE_PIPE_LISTENING_STATE && Fcb->PipeEnd == FILE_PIPE_SERVER_END))
 	    {
 	      DPRINT("PipeState: %x\n", Fcb->PipeState);
 	      Status = STATUS_PIPE_BROKEN;
 	      goto done;
 	    }
 
-	  /* Wait for ReadEvent to become signaled */
-	  DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
-	  Status = KeWaitForSingleObject(&Fcb->Event,
-				         UserRequest,
-				         KernelMode,
-				         FALSE,
-				         NULL);
-	  DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+	  if (IoIsOperationSynchronous(Context->Irp))
+	    {
+	      /* Wait for ReadEvent to become signaled */
+	      DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
+	      Status = KeWaitForSingleObject(&Fcb->Event,
+				             UserRequest,
+				             KernelMode,
+				             FALSE,
+				             NULL);
+	      DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
+	    }
+	  else
+	    {
+	      PNPFS_CONTEXT NewContext;
 
-	  KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
+	      NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
+	      if (NewContext == NULL)
+	        {
+		   Status = STATUS_NO_MEMORY;
+		   goto done;
+		}
+	      memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
+	      NewContext->AllocatedFromPool = TRUE;
+	      NewContext->Fcb = Fcb;
+	      NewContext->MajorFunction = IRP_MJ_READ;
+
+	      Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
+	         
+	      if (NT_SUCCESS(Status))
+	        {
+		  Status = STATUS_PENDING;
+		}
+	      else
+	        {
+		  ExFreePool(NewContext);
+		}
+	      goto done;
+	    }
+
+	  ExAcquireFastMutex(&Fcb->DataListLock);
 	}
 
       if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -217,20 +419,48 @@
 	}
     }
 
-  KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
+  ExReleaseFastMutex(&Fcb->DataListLock);
 
 done:
-  Irp->IoStatus.Status = Status;
-  Irp->IoStatus.Information = Information;
+  Context->Irp->IoStatus.Status = Status;
+  Context->Irp->IoStatus.Information = Information;
 
-  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+  if (Status != STATUS_PENDING)
+    {
+      IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
+    }
 
+  if (Context->AllocatedFromPool)
+    {
+      ExFreePool(Context);
+    }
   DPRINT("NpfsRead done (Status %lx)\n", Status);
 
   return Status;
 }
 
+NTSTATUS STDCALL /* Read entry point: wraps the IRP in a stack NPFS_CONTEXT and calls NpfsReadFromPipe() */
+NpfsRead(PDEVICE_OBJECT DeviceObject,
+	 PIRP Irp)
+{
+  NPFS_CONTEXT Context;
 
+  Context.AllocatedFromPool = FALSE; /* stack context: NpfsReadFromPipe() must not free it */
+  Context.DeviceObject = DeviceObject;
+  Context.Irp = Irp;
+  /* A request without an MDL is completed right here with STATUS_UNSUCCESSFUL. */
+  if (Irp->MdlAddress == NULL)
+    {
+      DPRINT("Irp->MdlAddress == NULL\n");
+      Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
+      Irp->IoStatus.Information = 0;
+      IoCompleteRequest(Irp, IO_NO_INCREMENT);
+      return STATUS_UNSUCCESSFUL;
+    }
+
+  return NpfsReadFromPipe(&Context); /* its status is returned unchanged */
+}
+
 NTSTATUS STDCALL
 NpfsWrite(PDEVICE_OBJECT DeviceObject,
 	  PIRP Irp)
@@ -244,7 +474,6 @@
   NTSTATUS Status = STATUS_SUCCESS;
   ULONG Length;
   ULONG Offset;
-  KIRQL OldIrql;
   ULONG Information;
   ULONG CopyLength;
   ULONG TempLength;
@@ -296,7 +525,7 @@
   Status = STATUS_SUCCESS;
   Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
 
-  KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+  ExAcquireFastMutex(&ReaderFcb->DataListLock);
 #ifndef NDEBUG
   DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
   HexDump(Buffer, Length);
@@ -307,7 +536,7 @@
       if (ReaderFcb->WriteQuotaAvailable == 0)
 	{
 	  KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
-	  KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+	  ExReleaseFastMutex(&ReaderFcb->DataListLock);
 	  if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
 	    {
 	      Status = STATUS_PIPE_BROKEN;
@@ -332,7 +561,7 @@
 	      Status = STATUS_PIPE_BROKEN;
 	      goto done;
 	    }
-	  KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
+	  ExAcquireFastMutex(&ReaderFcb->DataListLock);
 	}
 
       if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
@@ -395,7 +624,7 @@
 	}
     }
 
-  KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
+  ExReleaseFastMutex(&ReaderFcb->DataListLock);
 
 done:
   Irp->IoStatus.Status = Status;