Author: cmihail Date: Mon Jul 25 18:45:59 2011 New Revision: 52864
URL: http://svn.reactos.org/svn/reactos?rev=52864&view=rev Log: [lwIP/TCPIP] - Add queue for connections objects in order to buffer packets that have arrived but have no corresponding receive requests to be assigned to. We buffer these packets to avoid giving a timeout that could cause throughput slowdowns.
Modified: branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c
Modified: branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h URL: http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/drivers/ne... ============================================================================== --- branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h [iso-8859-1] (original) +++ branches/GSoC_2011/TcpIpDriver/drivers/network/tcpip/include/titypes.h [iso-8859-1] Mon Jul 25 18:45:59 2011 @@ -268,6 +268,8 @@ LIST_ENTRY SendRequest; /* Queued send requests */ LIST_ENTRY ShutdownRequest;/* Queued shutdown requests */
+ LIST_ENTRY PacketQueue; /* Queued received packets waiting to be processed */ + /* Signals */ UINT SignalState; /* Active signals from oskit */
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c URL: http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/driver... ============================================================================== --- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c [iso-8859-1] (original) +++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/event.c [iso-8859-1] Mon Jul 25 18:45:59 2011 @@ -144,18 +144,32 @@ TCPFinEventHandler(void *arg, err_t err) { PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg; + const NTSTATUS status = TCPTranslateError(err);
DbgPrint("[IP, TCPFinEventHandler] Called for Connection( 0x%x )-> SocketContext = pcb (0x%x)\n", Connection, Connection->SocketContext);
/* Only clear the pointer if the shutdown was caused by an error */ - if (err != ERR_OK) + if ((err != ERR_OK))// && (status != STATUS_REMOTE_DISCONNECT)) { /* We're already closed by the error so we don't want to call lwip_close */ DbgPrint("[IP, TCPFinEventHandler] MAKING Connection( 0x%x )-> SocketContext = pcb (0x%x) NULL\n", Connection, Connection->SocketContext); + + // close all possible callbacks + /*tcp_arg((PTCP_PCB)Connection->SocketContext, NULL); + + if (((PTCP_PCB)Connection->SocketContext)->state != LISTEN) + { + tcp_recv((PTCP_PCB)Connection->SocketContext, NULL); + tcp_sent((PTCP_PCB)Connection->SocketContext, NULL); + tcp_err((PTCP_PCB)Connection->SocketContext, NULL); + } + + tcp_accept((PTCP_PCB)Connection->SocketContext, NULL);*/ + Connection->SocketContext = NULL; } - - FlushAllQueues(Connection, TCPTranslateError(err)); + + FlushAllQueues(Connection, status);
DbgPrint("[IP, TCPFinEventHandler] Done\n"); } @@ -305,7 +319,7 @@ u32_t TCPRecvEventHandler(void *arg, struct pbuf *p) { - PCONNECTION_ENDPOINT Connection = arg; + PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)arg; PTDI_BUCKET Bucket; PLIST_ENTRY Entry; PIRP Irp;
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c URL: http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/driver... ============================================================================== --- branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c [iso-8859-1] (original) +++ branches/GSoC_2011/TcpIpDriver/lib/drivers/ip/transport/tcp/tcp.c [iso-8859-1] Mon Jul 25 18:45:59 2011 @@ -25,7 +25,7 @@
VOID ConnectionFree(PVOID Object) { - PCONNECTION_ENDPOINT Connection = Object; + PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT)Object; KIRQL OldIrql;
TI_DbgPrint(DEBUG_TCP, ("Freeing TCP Endpoint\n")); @@ -41,7 +41,7 @@
PCONNECTION_ENDPOINT TCPAllocateConnectionEndpoint( PVOID ClientContext ) { - PCONNECTION_ENDPOINT Connection = + PCONNECTION_ENDPOINT Connection = (PCONNECTION_ENDPOINT) ExAllocatePoolWithTag(NonPagedPool, sizeof(CONNECTION_ENDPOINT), CONN_ENDPT_TAG); if (!Connection) @@ -58,6 +58,7 @@ InitializeListHead(&Connection->ReceiveRequest); InitializeListHead(&Connection->SendRequest); InitializeListHead(&Connection->ShutdownRequest); + InitializeListHead(&Connection->PacketQueue);
/* Save client context pointer */ Connection->ClientContext = ClientContext; @@ -407,6 +408,9 @@ { PTDI_BUCKET Bucket; KIRQL OldIrql; + PUCHAR DataBuffer; + UINT DataLen, Received; + NTSTATUS Status;
TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Called for %d bytes (on socket %x)\n", ReceiveLength, Connection->SocketContext)); @@ -414,29 +418,38 @@ DbgPrint("[IP, TCPReceiveData] Called for %d bytes (on Connection->SocketContext = 0x%x)\n", ReceiveLength, Connection->SocketContext);
- LockObject(Connection, &OldIrql); - - /* Freed in TCPSocketState */ - Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), TDI_BUCKET_TAG ); - if( !Bucket ) - { - TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate bucket\n")); + NdisQueryBuffer(Buffer, &DataBuffer, &DataLen); + + Status = LibTCPGetDataFromConnectionQueue(Connection, DataBuffer, DataLen, &Received); + + if (Status == STATUS_PENDING) + { + LockObject(Connection, &OldIrql); + + /* Freed in TCPSocketState */ + Bucket = ExAllocatePoolWithTag( NonPagedPool, sizeof(*Bucket), TDI_BUCKET_TAG ); + if( !Bucket ) + { + TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Failed to allocate bucket\n")); + UnlockObject(Connection, OldIrql); + + return STATUS_NO_MEMORY; + } + + Bucket->Request.RequestNotifyObject = Complete; + Bucket->Request.RequestContext = Context; + *BytesReceived = 0; + + InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry ); + TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n")); + UnlockObject(Connection, OldIrql);
- return STATUS_NO_MEMORY; - } - - Bucket->Request.RequestNotifyObject = Complete; - Bucket->Request.RequestContext = Context; - *BytesReceived = 0; - - InsertTailList( &Connection->ReceiveRequest, &Bucket->Entry ); - TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Queued read irp\n")); - - UnlockObject(Connection, OldIrql); - - TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status = STATUS_PENDING\n")); - DbgPrint("[IP, TCPReceiveData] Leaving. Status = STATUS_PENDING\n"); + TI_DbgPrint(DEBUG_TCP,("[IP, TCPReceiveData] Leaving. Status = STATUS_PENDING\n")); + } + + DbgPrint("[IP, TCPReceiveData] Leaving. Status = %s\n", + Status == STATUS_PENDING? "STATUS_PENDING" : "STATUS_SUCCESS");
return STATUS_PENDING; }
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h URL: http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/driver... ============================================================================== --- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h [iso-8859-1] (original) +++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/include/rosip.h [iso-8859-1] Mon Jul 25 18:45:59 2011 @@ -6,7 +6,19 @@ #include "lwip/ip_addr.h" #include "tcpip.h"
+#ifndef LWIP_TAG + #define LWIP_TAG 'PIwl' +#endif + typedef struct tcp_pcb* PTCP_PCB; + +typedef struct _QUEUE_ENTRY +{ + struct pbuf *p; + LIST_ENTRY ListEntry; +} QUEUE_ENTRY, *PQUEUE_ENTRY; + +NTSTATUS LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, PUCHAR RecvBuffer, UINT RecvLen, UINT *Received);
/* External TCP event handlers */ extern void TCPConnectEventHandler(void *arg, const err_t err);
Modified: branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c URL: http://svn.reactos.org/svn/reactos/branches/GSoC_2011/TcpIpDriver/lib/driver... ============================================================================== --- branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] (original) +++ branches/GSoC_2011/TcpIpDriver/lib/drivers/lwip/src/rostcp.c [iso-8859-1] Mon Jul 25 18:45:59 2011 @@ -30,6 +30,93 @@ extern KEVENT TerminationEvent;
static +void +LibTCPEmptyQueue(PCONNECTION_ENDPOINT Connection) +{ + PLIST_ENTRY Entry; + PQUEUE_ENTRY qp = NULL; + + ReferenceObject(Connection); + + + while (!IsListEmpty(&Connection->PacketQueue)) + { + DbgPrint("[lwIP, LibTCPEmptyQueue] Removed packet off queue++++\n"); + + Entry = RemoveHeadList(&Connection->PacketQueue); + qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry); + + // reenable this later + //pbuf_free(qp->p); + + ExFreePoolWithTag(qp, LWIP_TAG); + } + + DereferenceObject(Connection); +} + +void LibTCPEnqueuePacket(PCONNECTION_ENDPOINT Connection, struct pbuf *p) +{ + PQUEUE_ENTRY qp; + + qp = (PQUEUE_ENTRY)ExAllocatePoolWithTag(NonPagedPool, sizeof(QUEUE_ENTRY), LWIP_TAG); + qp->p = p; + + ExInterlockedInsertTailList(&Connection->PacketQueue, &qp->ListEntry, &Connection->Lock); +} + +PQUEUE_ENTRY LibTCPDequeuePacket(PCONNECTION_ENDPOINT Connection) +{ + PLIST_ENTRY Entry; + PQUEUE_ENTRY qp = NULL; + + Entry = ExInterlockedRemoveHeadList(&Connection->PacketQueue, &Connection->Lock); + + qp = CONTAINING_RECORD(Entry, QUEUE_ENTRY, ListEntry); + + return qp; +} + +NTSTATUS LibTCPGetDataFromConnectionQueue(PCONNECTION_ENDPOINT Connection, PUCHAR RecvBuffer, UINT RecvLen, UINT *Received) +{ + PQUEUE_ENTRY qp; + struct pbuf* p; + NTSTATUS Status = STATUS_PENDING; + + if (!IsListEmpty(&Connection->PacketQueue)) + { + DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] Getting packet off the queue\n"); + + qp = LibTCPDequeuePacket(Connection); + p = qp->p; + + RecvLen = MIN(p->tot_len, RecvLen); + + for ((*Received) = 0; (*Received) < RecvLen; *Received += p->len, p = p->next) + { + DbgPrint("[lwIP, LibTCPGetDataFromConnectionQueue] 0x%x: Copying %d bytes to 0x%x from 0x%x\n", + p, p->len, ((PUCHAR)RecvBuffer) + (*Received), p->payload); + + RtlCopyMemory(RecvBuffer + (*Received), p->payload, p->len); + } + + // reenable this later + //pbuf_free(qp->p); + ExFreePoolWithTag(qp, LWIP_TAG); + + Status = STATUS_SUCCESS; + } + else + { + DbgPrint("[lwIP, LibTCPGetPacketFromQueue] Queue is EMPTY\n"); + + Status = STATUS_PENDING; + } + + return Status; +} + +static BOOLEAN WaitForEventSafely(PRKEVENT Event) { @@ -56,7 +143,7 @@
static err_t -InternalSendEventHandler(void *arg, struct tcp_pcb *pcb, const u16_t space) +InternalSendEventHandler(void *arg, PTCP_PCB pcb, const u16_t space) { DbgPrint("[lwIP, InternalSendEventHandler] SendEvent (0x%x, 0x%x, %d)\n", arg, pcb, (unsigned int)space); @@ -75,7 +162,7 @@
static err_t -InternalRecvEventHandler(void *arg, struct tcp_pcb *pcb, struct pbuf *p, const err_t err) +InternalRecvEventHandler(void *arg, PTCP_PCB pcb, struct pbuf *p, const err_t err) { u32_t len;
@@ -125,8 +212,13 @@ else { /* We want lwIP to store the pbuf on its queue for later */ - DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT\n"); - return ERR_TIMEOUT; + DbgPrint("[lwIP, InternalRecvEventHandler] Done ERR_TIMEOUT queuing pbuf\n"); + + LibTCPEnqueuePacket((PCONNECTION_ENDPOINT)arg, p); + + tcp_recved(pcb, p->tot_len); + + return ERR_OK;//ERR_TIMEOUT;// } } else if (err == ERR_OK) @@ -145,7 +237,7 @@
static err_t -InternalAcceptEventHandler(void *arg, struct tcp_pcb *newpcb, const err_t err) +InternalAcceptEventHandler(void *arg, PTCP_PCB newpcb, const err_t err) { DbgPrint("[lwIP, InternalAcceptEventHandler] AcceptEvent arg = 0x%x, newpcb = 0x%x, err = %d\n", arg, newpcb, (unsigned int)err); @@ -168,7 +260,7 @@
static err_t -InternalConnectEventHandler(void *arg, struct tcp_pcb *pcb, const err_t err) +InternalConnectEventHandler(void *arg, PTCP_PCB pcb, const err_t err) { DbgPrint("[lwIP, InternalConnectEventHandler] ConnectEvent (0x%x, pcb = 0x%x, err = %d)\n", arg, pcb, (unsigned int)err); @@ -686,8 +778,8 @@ if (pcb->state != LISTEN) { tcp_recv(pcb, NULL); - //tcp_sent(pcb, NULL); - //tcp_err(pcb, NULL); + tcp_sent(pcb, NULL); + tcp_err(pcb, NULL); }
tcp_accept(pcb, NULL); @@ -705,13 +797,15 @@ { DbgPrint("[lwIP, LibTCPCloseCallback] NULL pcb...bail, bail!!!\n");
- ASSERT(FALSE); + //ASSERT(FALSE);
msg->Error = ERR_OK; return; }
CloseCallbacks((PTCP_PCB)msg->Connection->SocketContext); + + LibTCPEmptyQueue(msg->Connection);
if (((PTCP_PCB)msg->Connection->SocketContext)->state == LISTEN) { @@ -752,6 +846,9 @@ if (safe) { CloseCallbacks((PTCP_PCB)Connection->SocketContext); + + LibTCPEmptyQueue(Connection); + if ( ((PTCP_PCB)Connection->SocketContext)->state == LISTEN ) { DbgPrint("[lwIP, LibTCPClose] Closing a listener\n");