1 module doap.client.messaging.udp;
2 
3 import doap.client.client : CoapClient;
4 import core.thread : Thread;
5 import std.socket : SocketFlags;
6 import core.sys.posix.sys.socket : MSG_TRUNC;
7 import doap.protocol;
8 import doap.exceptions;
9 import doap.client.request : CoapRequest;
10 import std.stdio;
11 import std.socket : Socket, SocketSet, Address, SocketType, ProtocolType, getAddress, parseAddress, InternetAddress, SocketShutdown;
12 import doap.client.messaging;
13 
/**
 * UDP-based messaging layer
 *
 * Handles the actual sending and receiving
 * of datagrams and the fulfilling of requests.
 *
 * A dedicated thread (started by `begin()`) runs
 * the reading loop; `close()` tears the socket
 * down and joins that thread.
 */
public class UDPMessaging : CoapMessagingLayer
{
    /** 
     * Reading-loop thread
     */
    private Thread readingThread;

    /** 
     * Running status
     *
     * Written by `begin()`/`close()` and read by
     * the reading-loop thread.
     *
     * NOTE(review): this is a plain field with no
     * atomic access; confirm whether visibility
     * across threads needs to be made explicit.
     */
    private bool running; // TODO: Check volatility

    /** 
     * The datagram socket
     */
    private Socket socket;

    /** 
     * Constructs a new messaging layer instance
     * associated with the provided client
     *
     * Params:
     *   client = the client
     */
    this(CoapClient client)
    {
        super(client);
    }

    /** 
     * Starts the messaging layer by starting
     * the underlying transport and then the
     * reader loop
     *
     * The layer is only marked as running once
     * the socket has been created and connected.
     * Should anything fail along the way the
     * socket is closed, the instance is reset to
     * its not-started state and the error is
     * propagated to the caller.
     */
    public override void begin() // Candidate for Interface
    {
        auto endpoint = getEndpointAddress();

        Socket dgramSocket = new Socket(endpoint.addressFamily(), SocketType.DGRAM, ProtocolType.UDP);

        // Roll back on any failure below so that we never
        // end up "running" on top of a dead transport
        scope(failure)
        {
            this.running = false;
            this.readingThread = null;
            this.socket = null;
            dgramSocket.close();
        }

        dgramSocket.connect(endpoint);

        // Transport is up, publish it and set status to running
        this.socket = dgramSocket;
        this.running = true;

        // Create the reading-loop thread and start it
        this.readingThread = new Thread(&loop);
        this.readingThread.start();
    }

    /** 
     * Transmit the provided packet
     *
     * Params:
     *   packet = the `CoapPacket`
     * to transmit
     */
    public override void send(CoapPacket packet) // Candidate for Interface
    {
        // Encode the packet and send the bytes
        ubyte[] encodedPacket = packet.getBytes();
        this.socket.send(encodedPacket);
    }

    /** 
     * Stops the messaging layer by
     * stopping the underlying network
     * transport and therefore the
     * reading loop
     *
     * Blocks till the reading loop
     * has terminated
     *
     * Calling this before `begin()` (or
     * more than once) is harmless.
     */
    public override void close() // Candidate for Interface
    {
        // Set status to not running
        this.running = false;

        if(this.socket !is null)
        {
            // Shutdown the socket (stopping the messaging layer)
            this.socket.shutdown(SocketShutdown.BOTH);

            // Unbind (disallow incoming datagrams to source port (from device))
            this.socket.close();
        }

        if(this.readingThread !is null)
        {
            // Wait till the reading-loop thread exits
            this.readingThread.join();
        }

        // Only drop the references once the reader is gone,
        // it still dereferences the socket until it exits
        this.readingThread = null;
        this.socket = null;
    }

    /** 
     * Reading loop which reads datagrams
     * from the socket
     *
     * Each iteration peeks at the next datagram
     * to learn its full size, then dequeues it
     * into a buffer of exactly that size, decodes
     * it and hands it to `handlePacket()`.
     * Runs until `running` is cleared.
     */
    private void loop()
    {
        // TODO: Ensure below condition works well
        while(this.running)
        {
            /** 
             * TODO: Add a call to select() here. If NOTHING is
             * readable then call the client's `onNoNewMessages()`
             * (check timers on outstanding messages and do any
             * resends needed).
             *
             * After this do a timed `receive()` below (this is where
             * the thread gets some rest by doing a timed I/O wait).
             *
             * Recall, however, we don't want to wait forever, as
             * we may now have elapsed over a request time-out
             * for a CoapRequest and should loop back to the top
             * to call `onNoNewMessages()`
             */

            // Peek (do not dequeue) and ask for the real datagram
            // length, even though the buffer is only one byte long
            SocketFlags flags = cast(SocketFlags)(SocketFlags.PEEK | MSG_TRUNC);
            byte[] data;
            data.length = 1; // At least one else never does underlying recv()
            ptrdiff_t dgramSize = this.socket.receive(data, flags);

            // Receive error (such as the socket having been closed)
            if(dgramSize < 0)
            {
                continue;
            }

            // Either the socket was shut down or an empty datagram
            // is sitting at the head of the queue
            if(dgramSize == 0)
            {
                if(!this.running)
                {
                    break;
                }

                // Dequeue the empty datagram, a peek alone would
                // leave it queued and we would spin on it forever
                this.socket.receive(data);
                continue;
            }

            // We have received something, dequeue it of the peeked length
            data.length = dgramSize;
            ptrdiff_t receivedSize = this.socket.receive(data);

            // The dequeue itself may fail (socket closed in between),
            // never decode a buffer that was not actually filled
            if(receivedSize <= 0)
            {
                continue;
            }
            data = data[0..receivedSize];

            writeln("received size: ", receivedSize);
            writeln("received bytes: ", data);

            try
            {
                CoapPacket receivedPacket = CoapPacket.fromBytes(cast(ubyte[])data);
                writeln("Incoming coap packet: ", receivedPacket);

                handlePacket(receivedPacket);
            }
            catch(CoapException e)
            {
                writeln("Skipping malformed coap packet");
            }
        }
    }

    /** 
     * Processes a decoded packet. How this is
     * handled depends on the type of packet
     * received. Normally this means matching
     * it up with a current `CoapRequest`
     * present in the `CoapClient`, filling
     * it up with the received packet and
     * waking it (handled in the client code).
     *
     * A packet which matches no outstanding
     * request is logged and discarded.
     *
     * Params:
     *   packet = the packet to process
     */
    private void handlePacket(CoapPacket packet)
    {
        CoapRequest request = getClient().yankRequest(packet);
        if(request)
        {
            writeln("Matched response '"~packet.toString()~"' to request '"~request.toString()~"'");
            writeln("Elapsed time: ", request.getElapsedTime());

            // Fulfill the request with the received data and wake up sleepers
            request.future.receiveWake(packet);
        }
        else
        {
            // TODO: pubsub support?
            // TODO: What to do with a received packet that has no match? For now just discard
            writeln("Discarding received packet '"~packet.toString()~"' as it matches no request");
        }
    }
}