About Store Forum Documentation Contact



Post Reply 
Reliable UDP
Author Message
Fex Offline
Gold Supporter

Post: #1
Reliable UDP
I noticed there was no UDP (FastConnection) tutorial included with EE.

I have been converting my game to RUDP and decided to make a little tutorial from what I learned. It's useful for seeing how to use UDP in EE, as well as a simple way to do reliable-ordered UDP. See the attached esenthelproject; I have also included the code for people who are not using the latest EE.

Features
Multiple clients
Auto dropping timed-out clients
Can send packets both reliably and unreliably
Ping calculation
Packet loss calculation

Project includes a ReliableFastConnectionClass, a Demo Client, and Demo Server.

Why use this over RakNet? Because it's simpler! 100% EE Script code.

What it is missing that would be nice to have:
Handshakes, encryption, packet combining, limiting packets to 1500 bytes (or auto-fragmenting larger ones into smaller packets), sequential delivery as an alternative to strictly ordered delivery, etc.

I didn't put more effort into it than I needed to. I only use EE on the client, which is why the server isn't more complete (it does not test for malformed packets, buffer overruns, etc.); a malicious client could kill it pretty easily!

The class
Code:
// communication constants
   const Byte RFC_PING = 0;
   const Byte RFC_PING_REPLY = 1;
   const Byte RFC_UNRELIABLE = 2;
   const Byte RFC_RELIABLE = 3;
   const Byte RFC_CONFIRM_RELIABLE = 4;

class ReliableFastConnection : FastConnection // UDP connection with optional reliable-ordered delivery
{
   // things that can be tweaked..
   Int pingFrequency = 10; //times per second to ping, 0 = server, > 0 = client
   Int reliableTimeout = 100; // time in milliseconds from last reliable message to try sending it again
   Int connectionTimeout = 15; // time in seconds that if no ping is received toremoved the ActiveDestination from remoteAddresses
  
   // things not to be tweaked..
   SockAddr localListenAddress;
   Flt lastPingTime = 0; //seconds since app start, used to decide if we need to send another ping or not (client only)
   Flt ping = 0; //round trip latency ms
   Memc<Flt> ping_times; // container for storing the last 10 ping times
   Flt packet_loss = 0; // percentage of ping packets that don't make it across the wire averaged over 10 seconds
   Memc<Flt> pings_received_last_10_secs; // pings that have been recieved in last 10 seconds
   ULong reliable_received = 0; // data in bytes received
   ULong unreliable_received = 0;
   ULong reliable_sent = 0;
   ULong unreliable_sent = 0;
  
   struct PacketAddr
   {
      Byte data[65536];
      UShort length;
      SockAddr address;
      
      void from_file(File &f,  SockAddr addr)
      {
         address = addr;
         f.pos(0);
         length = f.size();
         f.get(data, length);
      }
   }
  
   struct ActiveDestination
   {
      SockAddr address;
      Flt received_ping_at_time = 0; // the last time we got a ping in local time, not remote tag time
      ULong local_synchro_on_local  = 0; // what our last reliable packet sent out was tagged with
      ULong local_synchro_on_remote = 0; // the last confirmed received reliable packet from the remote connection
      ULong remote_synchro_on_local = 0; // the last reliable packet recieved from the remote connection was tagged with this
      Memc<PacketAddr> reliable_queue;
      Flt lastReliableTime = 0;
   }
  
   Memc<ActiveDestination> remoteAddresses;
   Memc<PacketAddr> received_queue; // this is currently not put into ActiveDestination like reliable_queue, but it certainly could be..
  
   // connect(..) doesn't actually send anything to the remote host, its really just setting the remote address and opening the local UDP
   // port for listening, after all, UDP is really connection-less, we establish a "connection" through keep-alive pinging in the update loop.
   void connect(SockAddr remoteaddr,  SockAddr listenaddr) // only the client uses this
   {
      ActiveDestination actdest;
      actdest.address = remoteaddr;
      remoteAddresses.add(actdest); // client will only have 1 destination address.
      listen(listenaddr);
      return;
   }
  
   void listen(SockAddr addr) // server will just listen, not use connect
   {
      create(addr);
      localListenAddress = addr;
      return;
   }
  
   void send_reliable(SockAddr addr, File &f) // this actually just adds to a queue
   {
      PacketAddr padd;
      padd.from_file(f, addr);
      if(remoteAddresses_has_remote(addr))
      {
         get_remoteAddress(addr).reliable_queue.add(padd); // add to end of queue
      }
      return;
   }
  
   void send_unreliable(SockAddr destination, File &f)
   {
      File f2;
      f2.writeMem().putByte(RFC_UNRELIABLE).putUShort(f.size());
      f.pos(0); // make sure it is at the beginning of the file before copying
      f.copy(f2);
      f2.pos(0);
      send(destination, f2);
      unreliable_sent += f.size(); // we ar enot including the RFC header and UShort size
   }
  
   Bool update()
   {
      Int bytes_received = 0;
      Byte datarec[65536];
      SockAddr sender_addr;
      bytes_received = receive(sender_addr, datarec);
      while(bytes_received > -1) // needs to be loop more than one command recieved at once is possible? seems like it shoudl onyl return 1 packet at a time, but maybe need multiple calls to receive if low framerate?
      {
         File f;f.readMem(datarec, bytes_received);
         //while(!f.end())  // should not need this? we will only send 1 command per packet, doing multiple commands per packet  makes it harder when we decide to ignore the rest of a packet like in RFS_RELIABLE case below, we would need to make sure to read out the rest of the data in the packet otherwise will will get buggy stuff
         {
            Byte command = f.getByte();
            switch(command)
            {
               case RFC_PING: // only server will receive this, client pings server, not vice-versa
                  {
                    Flt ptime = f.getFlt();
                    ULong loc_synchro_on_remote = f.getULong(); // learn what was the last reliable client got from the server was, lets us know if we need to resend
                    if(remoteAddresses_has_remote(sender_addr))
                     {
                        ActiveDestination & adest = get_remoteAddress(sender_addr);
                        adest.local_synchro_on_remote = loc_synchro_on_remote;
                        adest.received_ping_at_time = Time.realTime();
                        
                        File f2;
                        f2.writeMem().putByte(RFC_PING_REPLY).putFlt(ptime).putULong(adest.remote_s​ynchro_on_local).pos(0); // let the client know what the last reliable the server got from it was
                        send(sender_addr,  f2);
                     }
                     else
                     {
                        ActiveDestination actdest;
                        actdest.address = sender_addr;
                        actdest.received_ping_at_time = Time.realTime();
                        remoteAddresses.add(actdest);
                        
                        File f2;
                        f2.writeMem().putByte(RFC_PING_REPLY).putFlt(ptime).putULong(actdest.remote​_synchro_on_local).pos(0); // let the client know what the last reliable the server got from it was
                        send(sender_addr,  f2);
                     }
                  }  
                  break;
               case RFC_PING_REPLY: // client will recieve this
                  {
                    Flt lastPingTimeReceived = f.getFlt();
                    
                    // for packet loss calculation
                    pings_received_last_10_secs.add(lastPingTimeReceived);
                  
                    // calculate ping
                    if(ping_times.elms() > 9)
                     {
                        ping_times.add(Time.realTime() - lastPingTimeReceived);
                        ping_times.remove(0, true);
                        Flt ping_time_total = 0;
                        REPA(ping_times){ping_time_total +=ping_times[i];}
                        ping = (ping_time_total / ping_times.elms()) * 1000; // make it in milliseconds
                     }
                     else
                     {
                        ping_times.add(Time.realTime() - lastPingTimeReceived);
                     }
                    
                     // check to see if we need to increase our local synchro
                     test_local_synchro_on_remote(f.getULong(), remoteAddresses[0].local_synchro_on_remote, remoteAddresses[0]);
                  }
                  break;
               case RFC_UNRELIABLE:
                  {
                     UShort length_of_packet = f.getUShort(); // datarec may contain more than one packet, so we need to only get one at a time
                     File f2;
                     f2.writeMem();
                     f.copy(f2, length_of_packet); // hopefully f.copy advances pos() of f.. need ot maek sure otherwise this wont work, it will just keep reading the same packet over and over
                     PacketAddr padd;
                     padd.from_file(f2, sender_addr);
                     received_queue.add(padd);
                     unreliable_received += length_of_packet;
                  }
                  break;
               case RFC_RELIABLE:
                  {
                     ULong synchro = f.getULong();
                     if(remoteAddresses_has_remote(sender_addr))
                     {
                        ActiveDestination & adest = get_remoteAddress(sender_addr);
                        
                        if(synchro ==  (adest.remote_synchro_on_local + 1)) // is it the next synchro message? we need to make sure its not a repeat of something we already received
                        {
                           UShort length_of_packet = f.getUShort(); // datarec may contain more than one packet, so we need to only get one at a time
                           File f2;
                           f2.writeMem();
                           f.copy(f2, length_of_packet); // hopefully f.copy advances pos() of f.. need ot maek sure otherwise this wont work, it will just keep reading the same packet over and over
                           PacketAddr padd;
                           padd.from_file(f2, sender_addr);
                           received_queue.add(padd);
                           adest.remote_synchro_on_local += 1;
                           reliable_received += length_of_packet;
                          
                           // now we need to send a confirmation back immediately so remote doesn't have to wait for next ping
                           File f3;
                           f3.writeMem().putByte(RFC_CONFIRM_RELIABLE).putULong(adest.remote_synchro_on_loc​al).pos(0); // let the remote know what the last reliable the server got from it was
                           send(sender_addr,  f3);
                        }
                     }
                  }
               case RFC_CONFIRM_RELIABLE:
                  {
                     REPA(remoteAddresses)if(remoteAddresses[i].address == sender_addr)
                     test_local_synchro_on_remote(f.getULong(), remoteAddresses[i].local_synchro_on_remote, remoteAddresses[i]); // check to see if we need to increase our local synchro
                  }  
                  break;
               default:
                  Exit("RFC Default, something went wrong, no matching command!");
                  break;
            }
         bytes_received = receive(sender_addr, datarec); // read the next packet, if it exists
         }
       }
      send_ping(); // currently only works for client
      send_from_reliable_queues();
      if(pingFrequency == 0)remove_dead_remote_addresses(); // only server should do this for now, unless server->client pings are implemented
      return true;
   }
  
private:
  
   void remove_dead_remote_addresses()
   {
      Bool removed_one = false;
      REPA(remoteAddresses)
      {
         if(((Time.realTime() - remoteAddresses[i].received_ping_at_time) > connectionTimeout) && !removed_one)
         {
            remoteAddresses.remove(i);
            removed_one =true;
            break;
         }
      }
   }
  
   void send_from_reliable_queues()
   {
      REPA(remoteAddresses)
      if(remoteAddresses[i].reliable_queue.elms()) // if there is stuff in the queue
         if(remoteAddresses[i].local_synchro_on_local == remoteAddresses[i].local_synchro_on_remote) // if the destination address has received  all previous reliable messages
         {
            actually_send_reliable(remoteAddresses[i].reliable_queue.first(),remoteAddresses[i].lastReliableTime, ++remoteAddresses[i].local_synchro_on_local);
         }
         else if((Time.realTime() - remoteAddresses[i].lastReliableTime) > (reliableTimeout/1000) ) // destination has not received our last sent reliable, and timeout has occured to send it again
         {
            actually_send_reliable(remoteAddresses[i].reliable_queue.first(),remoteAddresses[i].lastReliableTime, remoteAddresses[i].local_synchro_on_local);
         }
   }
  
   void send_ping()
   {
      if(pingFrequency > 0) // is a client so ping
         if((Time.realTime() - lastPingTime) > (1/pingFrequency))
         {
            File f;
            f.writeMem().putByte(RFC_PING).putFlt(Time.realTime()).putULong(remoteAddre​sses[0].remote_synchro_on_local).pos(0); //remote_synchro_on_local let the server know what the last reliable the client got from it was
            //send(destinationAddress,  f);
            send(remoteAddresses[0].address,  f);
            lastPingTime = Time.realTime();
            
            // update pack loss stuff only for client
            update_packet_loss();
            packet_loss = ((pingFrequency * 10) - pings_received_last_10_secs.elms()) / 100.0;
            if(packet_loss < 0) packet_loss = 0.0;
            if(Time.realTime() < 10.0)packet_loss = 0; // jus tignor eit the first 10 secs of app opening
         }
   }

   void actually_send_reliable(PacketAddr &padd, Flt &last_reliable_sent_time, ULong synchro_to_tag_packet_with)
   {
        File f;
        f.writeMem().putByte(RFC_RELIABLE).putULong(synchro_to_tag_packet_with).put​UShort(padd.length); // tag as reliable, put the synchro, add the length of the packet
        f.put(padd.data, padd.length); // add packet
        f.pos(0);
        send(padd.address,  f);
        last_reliable_sent_time = Time.realTime();
        reliable_sent += padd.length; // we are not including the RFC header and UShort size
   }
  
   void update_packet_loss()
   {
      REPA(pings_received_last_10_secs)
      {
         Bool removed_one = false;
         if(Time.realTime() - pings_received_last_10_secs[i] > 10 &&  !removed_one)
         {
            pings_received_last_10_secs.remove(i);
            removed_one = true;
            update_packet_loss();
         }
      }
   }
  
   void test_local_synchro_on_remote(ULong test_synchro, ULong &loc_synchro_on_remote,  ActiveDestination &adest)
   {
      if(test_synchro == loc_synchro_on_remote + 1) // server has confirmed a new reliable
      {
         loc_synchro_on_remote += 1;
         adest.reliable_queue.remove(0, true); // this will shift memory, is this too slow?
      }
   }
  
   Bool remoteAddresses_has_remote(SockAddr address) // is the remote address in remoteAddresses?
   {
      Bool found_remote = false;
      REPA(remoteAddresses)if(address == remoteAddresses[i].address){ found_remote = true; break;}
      return found_remote;
   }
  
   ActiveDestination& get_remoteAddress(SockAddr address)
   {
      REPA(remoteAddresses)if(address == remoteAddresses[i].address) return remoteAddresses[i];
      Exit("Couldn't find remote addess, make sure to callremoteAddresses_has_remote first");
      return remoteAddresses[0];
   }
}

The client demo

Code:
/******************************************************************************

   This tutorial presents a sample  Reliable UDP Client Connection.

   It can be used together with second tutorial "Demo Server"

/******************************************************************************/
// Server to talk to.
Str destinationIP   = "127.0.0.1";
Int destinationPort = 8791;

// There is no separate listen port setting (formerly: Int listenPort = 8790).
// Init() derives the local port from destinationPort: when the server runs on localhost the two
// ports must differ, because only one process can listen to a given UDP port at a time.
// Otherwise they can be the same, and probably should be, to help with NAT traversal
// (so that end users don't need to forward ports on their router).

Str last_message_string = "nothing"; // text of the most recently received packet, shown by Draw()

ReliableFastConnection connection;
/******************************************************************************/
void InitPre()
{
   EE_INIT();
   App.flag=APP_WORK_IN_BACKGROUND|APP_NO_PAUSE_ON_WINDOW_MOVE_SIZE; // specify work in background flag to work also when not focused
   App.x=1;
   D.mode(500, 400);
   D.scale(1.25);
}
bool Init()
{
   SockAddr remoteAddress;
   remoteAddress.setIP(destinationIP, destinationPort);
      
   if(Contains(destinationIP, "127.0.0.1")) //can't listen and send from 2 apps on same ip:port over UDP
   {
      SockAddr address;
      address.setIP(destinationIP, destinationPort);
      SockAddr localAddress;
      localAddress.setLocalFast(destinationPort + Random(1, 1000)); // choose random port, hopefully no conflicts
      connection.connect(remoteAddress, localAddress);
   }
   else
   {
      SockAddr localAddress;
      localAddress.setLocalFast(destinationPort); // if we are not running server and client on same PC it is OK to use the same listen port on server and client
      connection.connect(remoteAddress, localAddress);
   }
   return true;
}
/******************************************************************************/
void Shut()
{
   // intentionally empty: this demo performs no explicit cleanup
}
/******************************************************************************/
bool Update()
{
   if(Kb.bp(KB_ESC))return false;
   connection.update();
  
   if(Kb.b(KB_R)) //flood reliable
   {
      File f; f.writeMem().putStr(S+ "Reliable Random # :" + Random(0, 1000)).pos(0);
      connection.send_reliable(connection.remoteAddresses[0].address, f); //connection.remoteAddresses[0].address is kind of awkward, clients will only have 1 remoteAddress
   }
   if(Kb.b(KB_U)) //flood unreliable
   {
      File f; f.writeMem().putStr(S+ "UNreliable Random # :" + Random(0, 1000)).pos(0);
      connection.send_unreliable(connection.remoteAddresses[0].address, f);
   }
   return true;
}
/******************************************************************************/
void Draw()
{
   D.clear(TURQ);
   {
      D.text(0, 0.7, S+"Press 'U' to send unreliable packets, 'R' to send reliable packets.");
      
      if(connection.received_queue.elms())
      {
         File f;
         f.writeMem();
         f.put(connection.received_queue.last().data, connection.received_queue.last().length);
         f.pos(0);
         last_message_string = f.getStr();
         connection.received_queue.clear();
      }
      
      D.text(0, 0.6, S+"Last Packet Received Had String: " + last_message_string);
      
      D.text(0, 0.5, S+"packet_loss "+ connection.packet_loss * 100 + "%");
      D.text(0, 0.4, S+"connection rec: "+ connection.received());
      D.text(0, 0.3, S+"connection sent: "+ connection.sent());
      D.text(0, 0.2, S+"connectdes: "+connection.remoteAddresses[0].address.asText());
      D.text(0, 0.1, S+"connectloc: "+connection.localListenAddress.asText());
      D.text(0, 0.0, S+"FPS "+ Time.fps());
      D.text(0, -0.1, S+"remote_synchro_on_local "+ connection.remoteAddresses[0].remote_synchro_on_local);
      D.text(0, -0.2, S+"local_synchro_on_remote "+ connection.remoteAddresses[0].local_synchro_on_remote);
      D.text(0, -0.3, S+"local_synchro_on_local "+ connection.remoteAddresses[0].local_synchro_on_local);
      D.text(0, -0.4, S+"ping "+ connection.ping);
      D.text(0, -0.5, S+"reliable_sent "+ connection.reliable_sent);
      D.text(0, -0.6, S+"unreliable_sent "+ connection.unreliable_sent);
      
   }
}

The server demo
Code:
/******************************************************************************

   This tutorial presents a sample  Reliable UDP Server Connection.

   It can be used together with second tutorial "Demo Client"

/******************************************************************************/
Int listenPort = 8791;               // UDP port the server listens on

Str last_message_string = "nothing"; // text of the most recently received packet, shown by Draw()

ReliableFastConnection connection;
/******************************************************************************/
void InitPre()
{
   EE_INIT();
   App.flag=APP_WORK_IN_BACKGROUND|APP_NO_PAUSE_ON_WINDOW_MOVE_SIZE; // specify work in background flag to work also when not focused
   App.x=1;
   D.mode(500, 400);
   D.scale(1.25);
}
bool Init()
{
   SockAddr addr;
   addr.setLocalFast(listenPort);
   connection.listen(addr);
   connection.pingFrequency = 0; // server doesn't send pings
   return true;
}
/******************************************************************************/
void Shut()
{
   // intentionally empty: this demo performs no explicit cleanup
}
/******************************************************************************/
bool Update()
{
   if(Kb.bp(KB_ESC))return false;
   connection.update();
  
   if(Kb.b(KB_R)) //flood reliable to all connected (have received recent ping) clients
   {
      File f; f.writeMem().putStr(S+ "From Server Reliable# :" + Random(0, 1000)).pos(0);
      REPA(connection.remoteAddresses)connection.send_reliable(connection.remoteA​ddresses[i].address, f);
   }
   if(Kb.b(KB_U)) //flood unreliable to all connected (have received recent ping) clients
   {
      File f; f.writeMem().putStr(S+ "From Server UNreliable# :" + Random(0, 1000)).pos(0);
      REPA(connection.remoteAddresses)connection.send_unreliable(connection.remot​eAddresses[i].address, f);
   }
  
   return true;
}
/******************************************************************************/
void Draw()
{
   D.clear(TURQ);
   {
      D.text(0, 0.7, S+"Press 'U' to send unreliable packets, 'R' to send reliable packets.");
      
      if(connection.received_queue.elms())
      {
         File f;
         f.writeMem();
         f.put(connection.received_queue.last().data, connection.received_queue.last().length);
         f.pos(0);
         last_message_string = f.getStr();
         connection.received_queue.clear();
      }
      
      D.text(0, 0.6, S+"Last Packet Received Had String: " + last_message_string);
      D.text(0, 0.5, S+"FPS "+ Time.fps());
      D.text(0, 0.4, S+"connection.received()"+ connection.received());
      D.text(0, 0.3, S+"connection.sent()"+ connection.sent());
      D.text(0, 0.2, S+"connection.unreliable_received "+ connection.unreliable_received);
      D.text(0, 0.1, S+"connection.reliable_received "+ connection.reliable_received);
      D.text(0, 0.0, S+"number of clients: "+connection.remoteAddresses.elms());
      D.text(0, -0.1, S+"connection.localListenAddress: "+connection.localListenAddress.asText());
      //D.text(0, -0.2, S+"remote_synchro_on_local "+ connection.remote_synchro_on_local);
      //D.text(0, -0.3, S+"local_synchro_on_remote "+ connection.local_synchro_on_remote);
      //D.text(0, -0.4, S+"local_synchro_on_local "+ connection.local_synchro_on_local);
      
      //D.text(0, 0.0, S+"Received Data: "+data);
   }
}


Attached File(s)
.zip  ReliableFastConnection7.zip (Size: 6.82 KB / Downloads: 16)
12-10-2014 01:31 AM
Find all posts by this user Quote this message in a reply
Pherael Offline
Member

Post: #2
RE: Reliable UDP
Wow, nice! Many Thanks for sharing!
12-10-2014 08:49 AM
Find all posts by this user Quote this message in a reply
Post Reply