Support asynchronous (aka overlapped) connect, read and write requests.
Modified: trunk/reactos/drivers/fs/np/create.c
Modified: trunk/reactos/drivers/fs/np/fsctrl.c
Modified: trunk/reactos/drivers/fs/np/npfs.h
Modified: trunk/reactos/drivers/fs/np/rw.c

Modified: trunk/reactos/drivers/fs/np/create.c
--- trunk/reactos/drivers/fs/np/create.c	2005-03-05 11:38:48 UTC (rev 13825)
+++ trunk/reactos/drivers/fs/np/create.c	2005-03-05 12:08:50 UTC (rev 13826)
@@ -48,17 +48,18 @@
 NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
 {
   PLIST_ENTRY CurrentEntry;
-  PNPFS_FCB ServerFcb;
+  PNPFS_WAITER_ENTRY Waiter;
 
-  CurrentEntry = Pipe->ServerFcbListHead.Flink;
-  while (CurrentEntry != &Pipe->ServerFcbListHead)
+  CurrentEntry = Pipe->WaiterListHead.Flink;
+  while (CurrentEntry != &Pipe->WaiterListHead)
     {
-      ServerFcb = CONTAINING_RECORD(CurrentEntry, NPFS_FCB, FcbListEntry);
-      if (ServerFcb->PipeState == FILE_PIPE_LISTENING_STATE)
+      Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
+      if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
 	{
-	  DPRINT("Server found! Fcb %p\n", ServerFcb);
-	  return ServerFcb;
+	  DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
+	  return Waiter->Fcb;
 	}
+
       CurrentEntry = CurrentEntry->Flink;
     }
 
@@ -66,6 +67,35 @@
 }
 
 
+/* Complete the listen IRP queued for Fcb (if any) and free its waiter entry.
+ * NpfsCreate calls this before it releases Pipe->FcbListLock. */
+static VOID
+NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe, PNPFS_FCB Fcb)
+{
+  PLIST_ENTRY CurrentEntry;
+  PNPFS_WAITER_ENTRY Waiter;
+
+  CurrentEntry = Pipe->WaiterListHead.Flink;
+  while (CurrentEntry != &Pipe->WaiterListHead)
+    {
+      Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
+      if (Waiter->Fcb == Fcb)
+	{
+	  DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
+	  RemoveEntryList(&Waiter->Entry);
+	  /* Report through Irp->IoStatus; IoCompleteRequest() delivers it. */
+	  IoSetCancelRoutine(Waiter->Irp, NULL);
+	  Waiter->Irp->IoStatus.Status = STATUS_SUCCESS;
+	  Waiter->Irp->IoStatus.Information = 0;
+	  IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
+	  ExFreePool(Waiter);
+	  return;
+	}
+      CurrentEntry = CurrentEntry->Flink;
+    }
+}
+
+
 NTSTATUS STDCALL
 NpfsCreate(PDEVICE_OBJECT DeviceObject,
 	   PIRP Irp)
@@ -206,9 +236,8 @@
       ClientFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
       ServerFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
 
-      /* Wake server thread */
-      DPRINT("Setting the ConnectEvent for %x\n", ServerFcb);
-      KeSetEvent(&ServerFcb->ConnectEvent, 0, FALSE);
+      /* Signal the server thread and remove it from the waiter list */
+      NpfsSignalAndRemoveListeningServerInstance(Pipe, ServerFcb);
     }
 
   KeUnlockMutex(&Pipe->FcbListLock);
@@ -318,6 +347,7 @@
 
        InitializeListHead(&Pipe->ServerFcbListHead);
        InitializeListHead(&Pipe->ClientFcbListHead);
+       InitializeListHead(&Pipe->WaiterListHead);
        KeInitializeMutex(&Pipe->FcbListLock, 0);
 
        Pipe->PipeType = Buffer->NamedPipeType;

Modified: trunk/reactos/drivers/fs/np/fsctrl.c
--- trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-05 11:38:48 UTC (rev 13825)
+++ trunk/reactos/drivers/fs/np/fsctrl.c	2005-03-05 12:08:50 UTC (rev 13826)
@@ -18,9 +18,67 @@
 
 /* FUNCTIONS *****************************************************************/
 
+static VOID
+NpfsListeningCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
+                           IN PIRP Irp)
+{
+  PNPFS_WAITER_ENTRY Waiter;
+  /* Cancel routine for a queued listen IRP: drop its waiter entry, then complete it. */
+  DPRINT1("NpfsListeningCancelRoutine() called\n");
+  /* FIXME: Not tested. */
+
+  Waiter = Irp->Tail.Overlay.DriverContext[0]; /* stored by NpfsAddListeningServerInstance() */
+  /* NOTE(review): unlinks without taking any lock visible here -- confirm this cannot race. */
+  RemoveEntryList(&Waiter->Entry);
+  ExFreePool(Waiter);
+  /* Release the cancel spin lock (IRQL from Irp->CancelIrql) before completing the IRP. */
+  IoReleaseCancelSpinLock(Irp->CancelIrql);
+
+  Irp->IoStatus.Status = STATUS_CANCELLED;
+  Irp->IoStatus.Information = 0;
+  IoCompleteRequest(Irp, IO_NO_INCREMENT);
+}
+
+
 static NTSTATUS
-NpfsConnectPipe(PNPFS_FCB Fcb)
+NpfsAddListeningServerInstance(PIRP Irp,
+			       PNPFS_FCB Fcb)
 {
+  PNPFS_WAITER_ENTRY Entry;
+  KIRQL OldIrql;
+  /* Queue Irp as a listen waiter on Fcb's pipe; returns STATUS_PENDING once queued. */
+  Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
+  if (Entry == NULL)
+    return STATUS_INSUFFICIENT_RESOURCES;
+  /* Record the request and append it to the pipe's waiter list. */
+  Entry->Irp = Irp;
+  Entry->Fcb = Fcb; /* NOTE(review): Entry->Pipe is never assigned here */
+  InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
+  /* Check Irp->Cancel and install the cancel routine while holding the cancel spin lock. */
+  IoAcquireCancelSpinLock(&OldIrql);
+  if (!Irp->Cancel)
+    {
+      Irp->Tail.Overlay.DriverContext[0] = Entry; /* read back by the cancel routine */
+      IoMarkIrpPending(Irp);
+      IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
+      IoReleaseCancelSpinLock(OldIrql);
+      return STATUS_PENDING;
+    }
+  /* IRP has already been cancelled */
+  IoReleaseCancelSpinLock(OldIrql);
+  /* Undo the insertion done above; the entry is unlinked and freed right here. */
+  DPRINT1("FIXME: Remove waiter entry!\n");
+  RemoveEntryList(&Entry->Entry);
+  ExFreePool(Entry);
+
+  return STATUS_CANCELLED;
+}
+
+
+static NTSTATUS
+NpfsConnectPipe(PIRP Irp,
+                PNPFS_FCB Fcb)
+{
   PNPFS_PIPE Pipe;
   PLIST_ENTRY current_entry;
   PNPFS_FCB ClientFcb;
@@ -88,29 +146,18 @@
       current_entry = current_entry->Flink;
     }
 
-  KeUnlockMutex(&Pipe->FcbListLock);
-
   /* no listening client fcb found */
   DPRINT("No listening client fcb found -- waiting for client\n");
 
   Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
 
-  Status = KeWaitForSingleObject(&Fcb->ConnectEvent,
-				 UserRequest,
-				 KernelMode,
-				 FALSE,
-				 NULL);
-  if (!NT_SUCCESS(Status))
-    {
-      DPRINT("KeWaitForSingleObject() failed (Status %lx)\n", Status);
-      return Status;
-    }
+  Status = NpfsAddListeningServerInstance(Irp, Fcb);
 
-  Fcb->PipeState = FILE_PIPE_CONNECTED_STATE;
+  KeUnlockMutex(&Pipe->FcbListLock);
 
-  DPRINT("Client Fcb: %p\n", Fcb->OtherSide);
+  DPRINT("NpfsConnectPipe() done (Status %lx)\n", Status);
 
-  return STATUS_PIPE_CONNECTED;
+  return Status;
 }
 
 
@@ -327,7 +374,6 @@
 }
 
 
-
 NTSTATUS STDCALL
 NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
 		      PIRP Irp)
@@ -366,7 +412,7 @@
 
       case FSCTL_PIPE_LISTEN:
 	DPRINT("Connecting pipe %wZ\n", &Pipe->PipeName);
-	Status = NpfsConnectPipe(Fcb);
+	Status = NpfsConnectPipe(Irp, Fcb);
 	break;
 
       case FSCTL_PIPE_PEEK:
@@ -439,12 +485,15 @@
 	Status = STATUS_UNSUCCESSFUL;
     }
 
-  Irp->IoStatus.Status = Status;
-  Irp->IoStatus.Information = 0;
+  if (Status != STATUS_PENDING)
+    {
+      Irp->IoStatus.Status = Status;
+      Irp->IoStatus.Information = 0;
+ 
+      IoCompleteRequest(Irp, IO_NO_INCREMENT);
+    }
 
-  IoCompleteRequest(Irp, IO_NO_INCREMENT);
-
-  return(Status);
+  return Status;
 }
 
 

Modified: trunk/reactos/drivers/fs/np/npfs.h
--- trunk/reactos/drivers/fs/np/npfs.h	2005-03-05 11:38:48 UTC (rev 13825)
+++ trunk/reactos/drivers/fs/np/npfs.h	2005-03-05 12:08:50 UTC (rev 13826)
@@ -19,6 +19,7 @@
   KMUTEX FcbListLock;
   LIST_ENTRY ServerFcbListHead;
   LIST_ENTRY ClientFcbListHead;
+  LIST_ENTRY WaiterListHead;
   ULONG PipeType;
   ULONG ReadMode;
   ULONG WriteMode;
@@ -52,7 +53,15 @@
   KSPIN_LOCK DataListLock;	/* Data queue lock */
 } NPFS_FCB, *PNPFS_FCB;
 
+typedef struct _NPFS_WAITER_ENTRY /* one queued listen request on a pipe */
+{
+  LIST_ENTRY Entry; /* link in NPFS_PIPE.WaiterListHead */
+  PIRP Irp;         /* pending IRP, completed on connect or cancel */
+  PNPFS_PIPE Pipe;  /* NOTE(review): not assigned by any code in this change */
+  PNPFS_FCB Fcb;    /* server FCB the request was issued on */
+} NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
 
+
 extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
 
 

Modified: trunk/reactos/drivers/fs/np/rw.c
--- trunk/reactos/drivers/fs/np/rw.c	2005-03-05 11:38:48 UTC (rev 13825)
+++ trunk/reactos/drivers/fs/np/rw.c	2005-03-05 12:08:50 UTC (rev 13826)
@@ -101,7 +101,6 @@
       /* FIXME: check if in blocking mode */
       if (Fcb->ReadDataAvailable == 0)
 	{
-	  KeResetEvent(&Fcb->Event);
 	  if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
 	    {
 	      KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
@@ -167,6 +166,7 @@
 	  if (Length == 0)
 	    {
 	      KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
+	      KeResetEvent(&Fcb->Event);
 	      break;
 	    }
 	}
@@ -187,8 +187,19 @@
 #endif
 
 	      Information = CopyLength;
-	      Fcb->ReadDataAvailable = 0;
-	      Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+
+	      if (Fcb->ReadDataAvailable > Length)
+	        {
+	          memmove(Fcb->Data, Fcb->Data + Length,
+	                  Fcb->ReadDataAvailable - Length);
+	          Fcb->ReadDataAvailable -= Length;
+	          Status = STATUS_MORE_ENTRIES;
+	        }
+	      else
+	        {
+	          Fcb->ReadDataAvailable = 0;
+	          Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
+	        }
 	    }
 
 	  if (Information > 0)
@@ -197,6 +208,7 @@
 	        {
 	          KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
 	        }
+	      KeResetEvent(&Fcb->Event);
 	      break;
 	    }
 	}
@@ -291,7 +303,6 @@
     {
       if (ReaderFcb->WriteQuotaAvailable == 0)
 	{
-	  KeResetEvent(&Fcb->Event);
 	  KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
 	  KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
 	  if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
@@ -355,6 +366,7 @@
 	  if (Length == 0)
 	    {
 	      KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+	      KeResetEvent(&Fcb->Event);
 	      break;
 	    }
 	}
@@ -374,6 +386,7 @@
 	  if (Information > 0)
 	    {
 	      KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
+	      KeResetEvent(&Fcb->Event);
 	      break;
 	    }
 	}