Tuesday, May 06, 2008

Sample code for TCP server using completion ports

As I have promised in my previous post I'm presenting sample code of TCP server that is receiving variable length messages in specific format. Data transfer protocol implies that network messages consist of prefix holding body size and body part.

At first, code defines application state object

/// 
/// Server state holds current state of the client socket
///

class AsyncServerState
{
public byte[] Buffer = new byte[512]; //buffer for network i/o
public int DataSize = 0; //data size to be received by the server

//flag that indicates whether prefix was received
public bool DataSizeReceived = false;

public MemoryStream Data = new MemoryStream(); //place where data is stored
public SocketAsyncEventArgs ReadEventArgs = new SocketAsyncEventArgs();
public Socket Client;
}

To preserve application state between async operations SocketAsyncEventArgs.UserToken is used.
/// 
/// Async server sample, demonstates usage of XxxAsync methods
///

class AsyncServer
{
Socket listeningSocket;
List messages = new List();
const int PrefixSize = 4;

SocketAsyncEventArgs acceptEvtArgs;

public AsyncServer()
{
this.listeningSocket = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
this.acceptEvtArgs = new SocketAsyncEventArgs();
}

public void Start(IPEndPoint listeningAddress)
{
acceptEvtArgs.Completed += new EventHandler(
Accept_Completed);

listeningSocket.Bind(listeningAddress);
listeningSocket.Listen(1);

ProcessAccept(acceptEvtArgs);
}

///
/// Accept completion handler
///

void Accept_Completed(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
Socket client = e.AcceptSocket;
AsyncServerState state = new AsyncServerState();
state.ReadEventArgs.AcceptSocket = client;
state.ReadEventArgs.Completed += new EventHandler(
IO_Completed);
state.ReadEventArgs.UserToken = state;
state.Client = client;
state.ReadEventArgs.SetBuffer(state.Buffer, 0, state.Buffer.Length);

if (!client.ReceiveAsync(state.ReadEventArgs))
{ //call completed synchonously
ProcessReceive(state.ReadEventArgs);
}
}
ProcessAccept(e);
}

private void ProcessAccept(SocketAsyncEventArgs e)
{
e.AcceptSocket = null;
if (!listeningSocket.AcceptAsync(acceptEvtArgs))
{ //operation completed synchronously
Accept_Completed(null, acceptEvtArgs);
}
}

///
/// Genereic I/O completion handler
///

void IO_Completed(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new NotImplementedException("The code will "
+"handle only receive and send operations");
}
}

///
/// In future will process server send operations
///

private void ProcessSend(SocketAsyncEventArgs e) { }

///
/// Implements server receive logic
///

private void ProcessReceive(SocketAsyncEventArgs e)
{
//single message can be received using several receive operation
AsyncServerState state = e.UserToken as AsyncServerState;

if (e.BytesTransferred <= 0 || e.SocketError != SocketError.Success) { CloseConnection(e); } int dataRead = e.BytesTransferred; int dataOffset = 0; int restOfData = 0; while (dataRead > 0)
{
if (!state.DataSizeReceived)
{
//there is already some data in the buffer
if (state.Data.Length > 0)
{
restOfData = PrefixSize - (int)state.Data.Length;
state.Data.Write(state.Buffer, dataOffset, restOfData);
dataRead -= restOfData;
dataOffset += restOfData;
}
else if (dataRead >= PrefixSize)
{ //store whole data size prefix
state.Data.Write(state.Buffer, dataOffset, PrefixSize);
dataRead -= PrefixSize;
dataOffset += PrefixSize;
}
else
{ // store only part of the size prefix
state.Data.Write(state.Buffer, dataOffset, dataRead);
dataOffset += dataRead;
dataRead = 0;
}

if (state.Data.Length == PrefixSize)
{ //we received data size prefix
state.DataSize = BitConverter.ToInt32(state.Data.GetBuffer(), 0);
state.DataSizeReceived = true;

state.Data.Position = 0;
state.Data.SetLength(0);
}
else
{ //we received just part of the headers information
//issue another read
if (!state.Client.ReceiveAsync(state.ReadEventArgs))
ProcessReceive(state.ReadEventArgs);
return;
}
}

//at this point we know the size of the pending data
if ((state.Data.Length + dataRead) >= state.DataSize)
{ //we have all the data for this message

restOfData = state.DataSize - (int)state.Data.Length;

state.Data.Write(state.Buffer, dataOffset, restOfData);
Console.WriteLine("Data message received. Size: {0}",
state.DataSize);

dataOffset += restOfData;
dataRead -= restOfData;

state.Data.SetLength(0);
state.Data.Position = 0;
state.DataSizeReceived = false;
state.DataSize = 0;

if (dataRead == 0)
{
if (!state.Client.ReceiveAsync(state.ReadEventArgs))
ProcessReceive(state.ReadEventArgs);
return;
}
else
continue;
}
else
{ //there is still data pending, store what we've
//received and issue another BeginReceive
state.Data.Write(state.Buffer, dataOffset, dataRead);

if (!state.Client.ReceiveAsync(state.ReadEventArgs))
ProcessReceive(state.ReadEventArgs);

dataRead = 0;
}
}
}

private void CloseConnection(SocketAsyncEventArgs e)
{
AsyncServerState state = e.UserToken as AsyncServerState;

try
{
state.Client.Shutdown(SocketShutdown.Send);
}
catch (Exception) { }

state.Client.Close();
}
}

Code sample above gives basic idea how completion ports asynchronous pattern can be used in TCP server development.

19 comments:

  1. Hi there,
    Its a nice blog about the port completion process in asynchronous operations with sockets, but I need your openion on something ...

    You see we are using the SocketEventArgs object to provide data to the asynchrounous methods ... which is really good ... and WE CAN USE THE OBEJCT OVER AND OVER AGAIN for various other async operations ... well ... my question is how we are going to use it multiple times through the operations ?

    The trick starts with the the AcceptAsync method ... we create a brand new object for it ... now when we start accepting other clients we will use the same object (what changes to the SocketEventArgs object we should make to accept again ?) ... and now we want to receive data from the accepted client ... now shall we use the same object or a new object for it ?? and if we wanted to send data ... shall i create a new object as well ???

    All this is to maximize the performance as much as we can ... so if you can help fasten the answer for me I will appreciate it ...

    Thanks in Advance !

    ReplyDelete
  2. To maximize performance you can create a pool of SocketEventArgs objects. Say "accept pool" will have 5 objects cached. Send/Receive object pools can have great number of cached object. Typically Send or Receive objects pools will contain the amount of connections your application is able to process simultaneously.

    To reuse SocketEventArgs again you have to clean its AcceptSocket property (just assign null to it).

    ReplyDelete
  3. Great article, this is the first article I seen describing how the *Async is different from Begin*.

    I have a general question: I want to have an IRC style server to wich clients initiate a connection, and server keeps a list of client and works on a spoke-hub model. I tried keeping a List<Socket> from accepted sockets, but they seem to disconnect (without server or client requesting that). Can I do that : can I keep a list of open sockets to send on them whenever I get a new message? Do I have no guarantee that the sockets stays open? Otherwise how can I keep one open or reopen it without having a new SYN? I need to keep my established connection since most networks are either firewalled or NATed; so creating a new "reverse" connection is not an option.

    ReplyDelete
  4. Generally, if you want detect closed Socket you have to do an operation on it. For instance, if other peer has closed connection without notification, then to validate socket you should perform some kind of I/O (recv or send) on it. On the other hand you can check socket properties like Connected.
    But from my experience these properties aren't accurate.

    The best method is to implement keep-alive feature. It can be sending small piece of data, just to see that other peer is still there.

    ReplyDelete
  5. Nice Article.

    well i want to develop client application using SocketAsyncEventArgs API that can create multiple( 100 client connection) and send/receive that data from server.

    Actually my means is to calculate Txn Per Sceond(TPS) of my server. so do u have any about it?

    Thanks
    Ravi
    mcamail2002@gmail.com

    ReplyDelete
  6. What does "Txn" stand for?

    If you want count number of connections opened on server - global counter can do.

    Increase when connection is open, decrease when it is closed or I/O error on it happens.

    ReplyDelete
  7. Txn stands for Transaction.

    Actually my mean here i want to create a client application (Load generator) which can send serveral request messages simultaneously for a particular time of period.
    And calculate the "Transaction Per Second".
    For instance

    Client sent 2000 requests in 90000miliseconds then tps will be 2000/90 = 22.22 requests handled per second by the server.

    Thanks
    Ravi

    ReplyDelete
  8. IMO it will be more simpler if you will start with simple synchronous sockets.

    You can create several threads. In each thread there will be a socket that will connect to the server. After sending request and getting response global request counter is updated and the process is repeated.

    ReplyDelete
  9. What's AsyncServerState?

    ReplyDelete
  10. Ignore my question about AsyncServerState.

    I missed your definition of it at the very top.

    ReplyDelete
  11. How to make this example work for UDP?

    ReplyDelete
  12. I found this site using [url=http://google.com]google.com[/url] And i want to thank you for your work. You have done really very good site. Great work, great site! Thank you!

    Sorry for offtopic

    ReplyDelete
  13. Who knows where to download XRumer 5.0 Palladium?
    Help, please. All recommend this program to effectively advertise on the Internet, this is the best program!

    ReplyDelete
  14. What magnificent phrase


    I suggest you to visit a site on which there is a lot of information on a theme interesting you. Hot Health

    ReplyDelete
  15. hi

    nice work ...:)

    1. can you give an example for the process send function ?

    2. how can I handle network connection problems in this kind of implementation ?

    ReplyDelete
  16. Nice Post! I am desperatly looking for a sample multicast TCP server based on asynchronous Events.
    Everything i found until now where unicast(echo) samples..

    Would you please tell me, how i can do it?

    ReplyDelete
  17. [url=http://conservativethoughts.us/wp-content/themes/post/view.php]Cialis soft pills[/url]

    ReplyDelete
  18. In it something is also to me it seems it is excellent idea. Completely with you I will agree.

    ReplyDelete