Author: cmihail
Date: Mon Jul 25 18:45:59 2011
New Revision: 52864
URL:
http://svn.reactos.org/svn/reactos?rev=52864&view=rev
Log:
[lwIP/TCPIP]
- Add queue for connections objects in order to buffer packets that have arrived but have
no corresponding receive requests to be assigned to. We buffer these packets to avoid
giving a timeout that could cause throughput slowdowns.
Modified:
branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h
branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c
branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c
branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h
branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c
Modified: branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h
URL:
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/drivers/n…
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h [iso-8859-1]
(original)
+++ branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h [iso-8859-1]
Mon Jul 25 18:45:59 2011
@@ -268,6 +268,8 @@
LIST_ENTRY SendRequest; /* Queued send requests */
LIST_ENTRY ShutdownRequest;/* Queued shutdown requests */
+ LIST_ENTRY PacketQueue; /* Queued received packets waiting to be processed */
+
/* Signals */
UINT SignalState; /* Active signals from oskit */
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c
URL:
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drive…
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c [iso-8859-1]
(original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c [iso-8859-1] Mon
Jul 25 18:45:59 2011
@@ -144,18 +144,32 @@
TCPFinEventHandler(void *arg, err_t err)
{
PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg;
+ const NTSTATUS status = TCPTranslateError(err);
DbgPrint("[IP, TCPFinEventHandler] Called for Connection( 0x%x )->
SocketContext = pcb (0x%x)\n", Connection, Connection->SocketContext);
/* Only clear the pointer if the shutdown was caused by an error */
- if (err != ERR_OK)
+ if ((err != ERR_OK))// && (status != STATUS_REMOTE_DISCONNECT))
{
/* We're already closed by the error so we don't want to call lwip_close
*/
DbgPrint("[IP, TCPFinEventHandler] MAKING Connection( 0x%x )->
SocketContext = pcb (0x%x) NULL\n", Connection, Connection->SocketContext);
+
+ // close all possible callbacks
+ /*tcp_arg((PTCP_PCB)Connection->SocketContext, NULL);
+
+ if (((PTCP_PCB)Connection->SocketContext)->state != LISTEN)
+ {
+ tcp_recv((PTCP_PCB)Connection->SocketContext, NULL);
+ tcp_sent((PTCP_PCB)Connection->SocketContext, NULL);
+ tcp_err((PTCP_PCB)Connection->SocketContext, NULL);
+ }
+
+ tcp_accept((PTCP_PCB)Connection->SocketContext, NULL);*/
+
Connection->SocketContext = NULL;
}
-
- FlushAllQueues(Connection, TCPTranslateError(err));
+
+ FlushAllQueues(Connection, status);
DbgPrint("[IP, TCPFinEventHandler] Done\n");
}
@@ -305,7 +319,7 @@
u32_t
TCPRecvEventHandler(void *arg, struct pbuf *p)
{
- PCONNECTION_ENDPOINT Connection = arg;
+ PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg;
PTDI_BUCKET Bucket;
PLIST_ENTRY Entry;
PIRP Irp;
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c
URL:
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drive…
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c [iso-8859-1]
(original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c [iso-8859-1] Mon Jul
25 18:45:59 2011
@@ -25,7 +25,7 @@
VOID ConnectionFree(PVOID Object)
{
- PCONNECTION_ENDPOINT Connection = Object;
+ PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)Object;
KIRQL OldIrql;
TI_DbgPrint(DEBUG_TCP, ("Freeing TCP Endpoint\n"));
@@ -41,7 +41,7 @@
PCONNECTION_ENDPOINT TCPAllocateConnectionEndpoint( PVOID ClientContext )
{
- PCONNECTION_ENDPOINT Connection =
+ PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)
ExAllocatePoolWithTag(NonPagedPool, sizeof(CONNECTION_ENDPOINT),
CONN_ENDPT_TAG);
if (!Connection)
@@ -58,6 +58,7 @@
InitializeListHead(&Connection->ReceiveRequest);
InitializeListHead(&Connection->SendRequest);
InitializeListHead(&Connection->ShutdownRequest);
+ InitializeListHead(&Connection->PacketQueue);
/* Save client context pointer */
Connection->ClientContext = ClientContext;
@@ -407,6 +408,9 @@
{
PTDI_BUCKET Bucket;
KIRQL OldIrql;
+ PUCHAR DataBuffer;
+ UINT DataLen, Received;
+ NTSTATUS Status;
TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Called for %d bytes (on socket
%x)\n",
ReceiveLength, Connection->SocketContext));
@@ -414,29 +418,38 @@
DbgPrint("[IP, TCPReceiveData] Called for %d bytes (on
Connection->SocketContext = 0x%x)\n",
ReceiveLength, Connection->SocketContext);
- LockObject(Connection, &OldIrql);
-
- /* Freed in TCPSocketState */
- Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), TDI_BUCKET_TAG );
- if( !Bucket )
- {
- TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate
bucket\n"));
+ NdisQueryBuffer(Buffer, &DataBuffer, &DataLen);
+
+ Status = LibTCPGetDataFromConnectionQueue(Connection, DataBuffer, DataLen,
&Received);
+
+ if (Status == STATUS_PENDING)
+ {
+ LockObject(Connection, &OldIrql);
+
+ /* Freed in TCPSocketState */
+ Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), TDI_BUCKET_TAG );
+ if( !Bucket )
+ {
+ TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate
bucket\n"));
+ UnlockObject(Connection, OldIrql);
+
+ return STATUS_NO_MEMORY;
+ }
+
+ Bucket->Request.RequestNotifyObject = Complete;
+ Bucket->Request.RequestContext = Context;
+ *BytesReceived = 0;
+
+ InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry );
+ TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n"));
+
UnlockObject(Connection, OldIrql);
- return STATUS_NO_MEMORY;
- }
-
- Bucket->Request.RequestNotifyObject = Complete;
- Bucket->Request.RequestContext = Context;
- *BytesReceived = 0;
-
- InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry );
- TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n"));
-
- UnlockObject(Connection, OldIrql);
-
- TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status =
STATUS_PENDING\n"));
- DbgPrint("[IP, TCPReceiveData] Leaving. Status = STATUS_PENDING\n");
+ TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status =
STATUS_PENDING\n"));
+ }
+
+ DbgPrint("[IP, TCPReceiveData] Leaving. Status = %s\n",
+ Status == STATUS_PENDING? "STATUS_PENDING" :
"STATUS_SUCCESS");
return STATUS_PENDING;
}
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h
URL:
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drive…
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h [iso-8859-1]
(original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h [iso-8859-1] Mon
Jul 25 18:45:59 2011
@@ -6,7 +6,19 @@
#include "lwip/ip_addr.h"
#include "tcpip.h"
+#ifndef LWIP_TAG
+ #define LWIP_TAG 'PIwl'
+#endif
+
typedef struct tcp_pcb* PTCP_PCB;
+
+typedef struct _QUEUE_ENTRY
+{
+ struct pbuf *p;
+ LIST_ENTRY ListEntry;
+} QUEUE_ENTRY, *PQUEUE_ENTRY;
+
+NTSTATUS LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, PUCHAR
RecvBuffer, UINT RecvLen, UINT *Received);
/* External TCP event handlers */
extern void TCPConnectEventHandler(void *arg, const err_t err);
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c
URL:
http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/drive…
==============================================================================
--- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] (original)
+++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] Mon Jul 25
18:45:59 2011
@@ -30,6 +30,93 @@
extern KEVENT TerminationEvent;
static
+void
+LibTCPEmptyQueue(PCONNECTION_ENDPOINT Connection)
+{
+ PLIST_ENTRY Entry;
+ PQUEUE_ENTRY qp = NULL;
+
+ ReferenceObject(Connection);
+
+
+ while (!IsListEmpty(&Connection->PacketQueue))
+ {
+ DbgPrint("[lwIP, LibTCPEmptyQueue] Removed packet off queue++++\n");
+
+ Entry = RemoveHeadList(&Connection->PacketQueue);
+ qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry);
+
+ // reenable this later
+ //pbuf_free(qp->p);
+
+ ExFreePoolWithTag(qp, LWIP_TAG);
+ }
+
+ DereferenceObject(Connection);
+}
+
+void LibTCPEnqueuePacket(PCONNECTION_ENDPOINT Connection, struct pbuf *p)
+{
+ PQUEUE_ENTRY qp;
+
+ qp = (PQUEUE_ENTRY)ExAllocatePoolWithTag(NonPagedPool, sizeof(QUEUE_ENTRY),
LWIP_TAG);
+ qp->p = p;
+
+ ExInterlockedInsertTailList(&Connection->PacketQueue, &qp->ListEntry,
&Connection->Lock);
+}
+
+PQUEUE_ENTRY LibTCPDequeuePacket(PCONNECTION_ENDPOINT Connection)
+{
+ PLIST_ENTRY Entry;
+ PQUEUE_ENTRY qp = NULL;
+
+ Entry = ExInterlockedRemoveHeadList(&Connection->PacketQueue,
&Connection->Lock);
+
+ qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry);
+
+ return qp;
+}
+
+NTSTATUS LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, PUCHAR
RecvBuffer, UINT RecvLen, UINT *Received)
+{
+ PQUEUE_ENTRY qp;
+ struct pbuf* p;
+ NTSTATUS Status = STATUS_PENDING;
+
+ if (!IsListEmpty(&Connection->PacketQueue))
+ {
+ DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] Getting packet off the
queue\n");
+
+ qp = LibTCPDequeuePacket(Connection);
+ p = qp->p;
+
+ RecvLen = MIN(p->tot_len, RecvLen);
+
+ for ((*Received) = 0; (*Received) < RecvLen; *Received += p->len, p =
p->next)
+ {
+ DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] 0x%x: Copying %d
bytes to 0x%x from 0x%x\n",
+ p, p->len, ((PUCHAR)RecvBuffer) + (*Received), p->payload);
+
+ RtlCopyMemory(RecvBuffer + (*Received), p->payload, p->len);
+ }
+
+ // reenable this later
+ //pbuf_free(qp->p);
+ ExFreePoolWithTag(qp, LWIP_TAG);
+
+ Status = STATUS_SUCCESS;
+ }
+ else
+ {
+ DbgPrint("[lwIP, LibTCPGetPacketFromQueue] Queue is EMPTY\n");
+
+ Status = STATUS_PENDING;
+ }
+
+ return Status;
+}
+
+static
BOOLEAN
WaitForEventSafely(PRKEVENT Event)
{
@@ -56,7 +143,7 @@
static
err_t
-InternalSendEventHandler(void *arg, struct tcp_pcb *pcb, const u16_t space)
+InternalSendEventHandler(void *arg, PTCP_PCB pcb, const u16_t space)
{
DbgPrint("[lwIP, InternalSendEventHandler] SendEvent (0x%x, 0x%x, %d)\n",
arg, pcb, (unsigned int)space);
@@ -75,7 +162,7 @@
static
err_t
-InternalRecvEventHandler(void *arg, struct tcp_pcb *pcb, struct pbuf *p, const err_t
err)
+InternalRecvEventHandler(void *arg, PTCP_PCB pcb, struct pbuf *p, const err_t err)
{
u32_t len;
@@ -125,8 +212,13 @@
else
{
/* We want lwIP to store the pbuf on its queue for later */
- DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT\n");
- return ERR_TIMEOUT;
+ DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT queuing
pbuf\n");
+
+ LibTCPEnqueuePacket((PCONNECTION_ENDPOINT)arg, p);
+
+ tcp_recved(pcb, p->tot_len);
+
+ return ERR_OK;//ERR_TIMEOUT;//
}
}
else if (err == ERR_OK)
@@ -145,7 +237,7 @@
static
err_t
-InternalAcceptEventHandler(void *arg, struct tcp_pcb *newpcb, const err_t err)
+InternalAcceptEventHandler(void *arg, PTCP_PCB newpcb, const err_t err)
{
DbgPrint("[lwIP, InternalAcceptEventHandler] AcceptEvent arg = 0x%x, newpcb =
0x%x, err = %d\n",
arg, newpcb, (unsigned int)err);
@@ -168,7 +260,7 @@
static
err_t
-InternalConnectEventHandler(void *arg, struct tcp_pcb *pcb, const err_t err)
+InternalConnectEventHandler(void *arg, PTCP_PCB pcb, const err_t err)
{
DbgPrint("[lwIP, InternalConnectEventHandler] ConnectEvent (0x%x, pcb = 0x%x,
err = %d)\n",
arg, pcb, (unsigned int)err);
@@ -686,8 +778,8 @@
if (pcb->state != LISTEN)
{
tcp_recv(pcb, NULL);
- //tcp_sent(pcb, NULL);
- //tcp_err(pcb, NULL);
+ tcp_sent(pcb, NULL);
+ tcp_err(pcb, NULL);
}
tcp_accept(pcb, NULL);
@@ -705,13 +797,15 @@
{
DbgPrint("[lwIP, LibTCPCloseCallback] NULL pcb...bail, bail!!!\n");
- ASSERT(FALSE);
+ //ASSERT(FALSE);
msg->Error = ERR_OK;
return;
}
CloseCallbacks((PTCP_PCB)msg->Connection->SocketContext);
+
+ LibTCPEmptyQueue(msg->Connection);
if (((PTCP_PCB)msg->Connection->SocketContext)->state == LISTEN)
{
@@ -752,6 +846,9 @@
if (safe)
{
CloseCallbacks((PTCP_PCB)Connection->SocketContext);
+
+ LibTCPEmptyQueue(Connection);
+
if ( ((PTCP_PCB)Connection->SocketContext)->state == LISTEN )
{
DbgPrint("[lwIP, LibTCPClose] Closing a listener\n");