In my previous post about transferring data in the async manner. I was talking about designing a small data exchange protocol to transfer a message over the network. All I/O was done in the async manner using BeginXXX/EndXXX pattern. Code on that post was handling single message only. However, in the real world it rarely happens that only one message is being transferred over single connection. It is more common to expect that several messages can be received by the peer.
Data exchange protocol contains messages prefixed by the size. Size prefix has fixed length.
Prefixing data with its size is the corner stone of the simple data transfer protocol introduced in the previous example. There is no problem transferring multiple messages over single connection. Remote peer should have no problem distinguishing separate messages from the data stream.
This post will provide code sample how to read multiple messages from the network.
What to expect when multiple messages arrive at the server?
While dealing with multiple messages one has to remember that receive operation can return arbitrary number of bytes being read from the net. Typically that size is from 0 to specified buffer length in the Receive or BeginReceive methods.
Our data exchange format is illustrated on the image above.
Peer code after receiving number of bytes should be able to answer what part of the message it has just received. Is it part of the size prefix or it is a message body?
Sometimes, several messages can be received at one Receive call.
Let's see what situations we can encounter while processing incoming data:
- received data contain only data size prefix
- received data contain part of the data size prefix
- received data contain prefix and part of the data
- received data contain prefix, message data and part of the prefix of the next message
- received data contain prefix, message, prefix of the next message and part of its body.
When developing data processing code one has to expect the above illustrated scenarios will happen.
Here's the server code that handles conditions described above. I present here only server callback function. Client sending code can be obtained from the previous post .
private void ServerReadCallback(IAsyncResult ar)
{
try
{
ServerState state = (ServerState)ar.AsyncState;
Socket client = state.Client;
SocketError socketError;
int dataRead = client.EndReceive(ar, out socketError);
int dataOffset = 0; //to simplify logic
int restOfData = 0;
if (socketError != SocketError.Success)
{
client.Close();
return;
}
if (dataRead <= 0)
{
client.Close();
return;
}
while (dataRead > 0)
{
//check to determine what income data contain: size prefix or message
if (!state.DataSizeReceived)
{
//there is already some data in the buffer
if (state.Data.Length > 0)
{
restOfData = PrefixSize - (int)state.Data.Length;
state.Data.Write(state.Buffer, dataOffset, restOfData);
dataRead -= restOfData;
dataOffset += restOfData;
}
else if (dataRead >= PrefixSize)
{ //store whole data size prefix
state.Data.Write(state.Buffer, dataOffset, PrefixSize);
dataRead -= PrefixSize;
dataOffset += PrefixSize;
}
else
{ // store only part of the size prefix
state.Data.Write(state.Buffer, dataOffset, dataRead);
dataOffset += dataRead;
dataRead = 0;
}
if (state.Data.Length == PrefixSize )
{ //we received data size prefix
state.DataSize = BitConverter.ToInt32(state.Data.GetBuffer(), 0);
state.DataSizeReceived = true;
//reset internal data stream
state.Data.Position = 0;
state.Data.SetLength(0);
}
else
{ //we received just part of the prefix information
//issue another read
client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
SocketFlags.None, new AsyncCallback(ServerReadCallback),
state);
return;
}
}
//at this point we know the size of the pending data
if ((state.Data.Length + dataRead) >= state.DataSize)
{ //we have all the data for this message
restOfData = state.DataSize - (int)state.Data.Length;
state.Data.Write(state.Buffer, dataOffset, restOfData);
Console.WriteLine("Data message received. Size: {0}",
state.DataSize);
//store received messages
//lock(messages)
// messages.Add(state.Data.ToArray());
dataOffset += restOfData;
dataRead -= restOfData;
//message received - cleanup internal memory stream
state.Data.SetLength(0);
state.Data.Position = 0;
state.DataSizeReceived = false;
state.DataSize = 0;
if (dataRead == 0)
{ //no more data remaining to process - issue another receive
client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
SocketFlags.None, new AsyncCallback(ServerReadCallback),
state);
return;
}
else
continue; //there's still some data to process in the buffers
}
else
{ //there is still data pending, store what we've
//received and issue another BeginReceive
state.Data.Write(state.Buffer, dataOffset, dataRead);
client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
SocketFlags.None, new AsyncCallback(ServerReadCallback), state);
dataRead = 0;
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
Does your code handles the size restriction of total number of byte length per data when there is any limitation on the size of data per packet say 255 bytes.
ReplyDeleteAt present code handles int.MaxValue data size as data prefix has 4 bytes length.
ReplyDeleteWhat do you mean by "imitation on the size of data per packet"?
Hi thanks for this brilliant article.
ReplyDeleteI just want to point out what could be an issue with protocols
that have a fixed length header/prefix that is larger that just a couple of bytes.
You could get this situatoin:
1. Receive part of the prefix
|-------->
| Prefix <
|-------->
2. Receive second part of the prefix and maybe some of the message
|-------->
| Prefix < ... Maybe a message here
|-------->
You would need to modify you code above like this:
//there is already some data in the buffer
if (state.Data.Length > 0)
{
restOfData = PrefixSize - (int)state.Data.Length;
// If only part of the prefix was read, truncate rest of data to the amount
if (restOfData > dataRead)
{
restOfData = dataRead;
}
state.Data.Write(state.Buffer, dataOffset, restOfData);
dataRead -= restOfData;
dataOffset += restOfData;
}
Would this be the correct thing to do, or don't you think this scenario would ever occur?
2 Anonymous:
ReplyDeleteScenario where part of the prefix will be received is very common.
Code that processes (deserializes) packets from the network should work well even if data is received by one byte.
Did you test your the fix you propose?
Good code, but please fix the comment made by Anonymous about partial "prefix" reception. His point is right !
ReplyDeletehi, could you tell me where did you declare "PrefixSize" ? is it in the ServerState class?
ReplyDeletePrefixSize is just a constant. You can declare it anywhere.
ReplyDeleteHi, I don't understand well. The "prefixsize" is first 4 bytes in the message that tell the server the size of the data to be receive..and is sent from the client right? if so..why is it a constant? Thanks. =)
ReplyDeleteIt is a code constant like const int PrefixSize = 4;
ReplyDeleteIf in the future prefix size will change, say it will become 5 bytes, then only PrefixSize value should be changed. No other code will require any adjustment to accommodate that change.
Hi, Thanks for your help. I also wonder how could we handle data if client1 sends screen capture images data and file data to the server at the same time? Should we handle this situation by changing the prefix size for each type of data or should we use a byte delimiter? And what If client1 and client2 are sending screen capture image data to the server at the same time? Thank! =)
ReplyDeleteIt depends on your protocol. Prefixing data with size will only tell you how much data you will receive, but does not tell anything about data. One way to handle this is add another byte after the prefix that will indicate data type. In this case your data packet will look like [SizePrefix][DataType][Data]. [SizePrefix] - 4 bytes and [DataType] - 1 byte.
ReplyDeleteThis is old but helped me today. If created much more shorter way:
ReplyDeleteconst int PrefixSize = 4;
bool prefixSet = false;
public int DataSize = 0;
int RecLength = 0;
byte[] Received = new byte[2048];
try {
int dataRead = socket.EndReceive(ar);
if (dataRead > 0) {
Buffer.BlockCopy(buffer, 0, Received, RecLength , dataRead);
RecLength += dataRead;
if (RecLength >= PrefixSize) {
if (!prefixSet) {
prefixSet = true;
DataSize = BitConverter.ToInt32(Received, 0);
}
if (RecLength >= DataSize) {
RecLength = 0; prefixSet = false;
byte[] data = new byte[DataSize];
Buffer.BlockCopy(Received, 4, data, 0, DataSize);
ReceiveNextMessage();
Message m = new Message(this, data);
}
else {
ReceiveNextMessage();
}
}
else {
ReceiveNextMessage();
}
}
}
catch (Exception ex) {
Console.WriteLine(ex.ToString());
}