module doap.client.messaging.udp;

import doap.client.client : CoapClient;
import core.thread : Thread;
import std.socket : SocketFlags;
import core.sys.posix.sys.socket : MSG_TRUNC;
import doap.protocol;
import doap.exceptions;
import doap.client.request : CoapRequest;
import std.stdio;
import std.socket : Socket, SocketSet, Address, SocketType, ProtocolType, getAddress, parseAddress, InternetAddress, SocketShutdown;
import doap.client.messaging;

/**
 * UDP-based messaging layer
 *
 * Handles the actual sending and receiving
 * of datagrams and fulfilling of requests
 */
public class UDPMessaging : CoapMessagingLayer
{
    /**
     * Reading-loop thread
     */
    private Thread readingThread;

    /**
     * Running status
     *
     * NOTE(review): written by the thread calling
     * `begin()`/`close()` and read by the reading-loop
     * thread without any synchronisation. Consider an
     * atomic/`shared` flag.
     */
    private bool running; // TODO: Check volatility

    /**
     * The datagram socket
     */
    private Socket socket;

    /**
     * Constructs a new messaging layer instance
     * associated with the provided client
     *
     * Params:
     *   client = the client
     */
    this(CoapClient client)
    {
        super(client);
    }

    /**
     * Starts the messaging layer by starting
     * the underlying transport and then the
     * reader loop
     *
     * The socket is created and connected before
     * any state is published: if either step throws,
     * the exception propagates to the caller, the
     * half-open socket is closed again and the layer
     * is left in its not-running state (no reading
     * thread is started).
     */
    public override void begin() // Candidate for Interface
    {
        auto endpoint = getEndpointAddress();

        Socket newSocket = new Socket(endpoint.addressFamily(), SocketType.DGRAM, ProtocolType.UDP);

        // Do not leak the descriptor if connecting (or anything below) fails
        scope(failure) newSocket.close();

        newSocket.connect(endpoint);

        // Only now that the transport is usable do we publish it
        this.socket = newSocket;
        this.running = true;

        // If the thread cannot be started we are not running after all
        scope(failure) this.running = false;

        // Create the reading-loop thread and start it
        this.readingThread = new Thread(&loop);
        this.readingThread.start();
    }

    /**
     * Transmit the provided packet
     *
     * Params:
     *   packet = the `CoapPacket`
     * to transmit
     */
    public override void send(CoapPacket packet) // Candidate for Interface
    {
        // Encode the packet and send the bytes
        // NOTE(review): the result of `Socket.send` is not inspected,
        // so a failed transmission goes unnoticed here.
        ubyte[] encodedPacket = packet.getBytes();
        this.socket.send(encodedPacket);
    }

    /**
     * Stops the messaging layer by
     * stopping the underlying network
     * transport and therefore the
     * reading loop
     *
     * Blocks till the reading loop
     * has terminated
     *
     * Safe to call when `begin()` was never
     * called or did not complete; in that case
     * there is nothing to shut down or join.
     */
    public override void close() // Candidate for Interface
    {
        // Set status to not running (must precede the shutdown so that
        // the reading loop sees it once its receive() is woken up)
        this.running = false;

        if(this.socket !is null)
        {
            // Shutdown the socket (stopping the messaging layer)
            this.socket.shutdown(SocketShutdown.BOTH);

            // Unbind (disallow incoming datagrams to source port (from device))
            this.socket.close();
        }

        // Wait till the reading-loop thread exits
        if(this.readingThread !is null)
        {
            this.readingThread.join();
        }
    }

    /**
     * Reading loop which reads datagrams
     * from the socket
     *
     * Each iteration first peeks at the next
     * datagram to learn its size, then dequeues
     * it into a buffer of exactly that size,
     * decodes it and hands it to `handlePacket`.
     * Datagrams which fail to decode are skipped.
     */
    private void loop()
    {
        // TODO: Ensure below condition works well
        while(this.running)
        {
            /**
             * TODO: Add a call to select() here (on a `SocketSet`
             * containing the socket); if NOTHING is readable
             * then call the client's `onNoNewMessages()`.
             *
             * After this do a timed `receive()` below (this is where
             * the thread gets some rest by doing a timed I/O wait).
             *
             * Recall, however, we don't want to wait forever, as
             * we may now have elapsed over a request time-out
             * for a CoapRequest and should loop back to the top
             * to call `onNoNewMessages()`
             */

            // TODO: Check if socket is readable, if not,
            // ... check timers on outstanding messages
            // ... and do any resends needed

            // Peek (do not dequeue) with MSG_TRUNC so that the returned
            // value is the datagram's length rather than the buffer's.
            // NOTE(review): that MSG_TRUNC semantic is the Linux one,
            // confirm for other POSIX targets.
            SocketFlags flags = cast(SocketFlags)(SocketFlags.PEEK | MSG_TRUNC);
            byte[] data;
            data.length = 1; // At least one else never does underlying recv()
            ptrdiff_t dgramSize = this.socket.receive(data, flags);

            // If we have received something then dequeue it of the peeked length
            if(dgramSize > 0)
            {
                data.length = dgramSize;
                ptrdiff_t dequeued = this.socket.receive(data);

                // The dequeue itself may fail (e.g. socket closed in between);
                // never decode a buffer that was not actually filled
                if(dequeued <= 0)
                {
                    continue;
                }
                data.length = dequeued;

                writeln("received size: ", dgramSize);
                writeln("received bytes: ", data);

                try
                {
                    CoapPacket receivedPacket = CoapPacket.fromBytes(cast(ubyte[])data);
                    writeln("Incoming coap packet: ", receivedPacket);

                    handlePacket(receivedPacket);
                }
                catch(CoapException e)
                {
                    writeln("Skipping malformed coap packet");
                }
            }
            // A zero-length datagram was peeked. It must still be dequeued,
            // otherwise it stays at the head of the queue and every following
            // peek returns 0 again (busy loop, later datagrams never read).
            // Skipped when we are stopping, as a 0 may then simply stem from
            // the socket having been shut down.
            else if(dgramSize == 0 && this.running)
            {
                this.socket.receive(data);
            }
        }
    }

    /**
     * Processes a decoded packet. How this is
     * handled depends on the type of packet
     * received. Normally this means matching
     * it up with a current `CoapRequest`
     * present in the `CoapClient`, filling
     * it up with the received packet and
     * waking it (handled in the client code).
     *
     * Packets which match no request are
     * discarded.
     *
     * Params:
     *   packet = the packet to process
     */
    private void handlePacket(CoapPacket packet)
    {
        CoapRequest request = getClient().yankRequest(packet);
        if(request)
        {
            writeln("Matched response '"~packet.toString()~"' to request '"~request.toString()~"'");
            writeln("Elapsed time: ", request.getElapsedTime());

            // Fulfill the request with the received data and wake up sleepers
            request.future.receiveWake(packet);
        }
        else
        {
            // TODO: pubsub support?
            // TODO: What to do with a received packet that has no match? For now just discard
            writeln("Discarding received packet '"~packet.toString()~"' as it matches no request");
        }
    }
}