1 module doap.client.client;
2 
3 import std.socket : Socket, Address, SocketType, ProtocolType, getAddress, parseAddress, InternetAddress, SocketShutdown;
4 import doap.client.messaging;
5 import doap.protocol;
6 import doap.client.request : CoapRequestBuilder, CoapRequest, CoapRequestFuture, RequestState;
7 import doap.client.exceptions : RequestTimeoutException;
8 import core.sync.mutex : Mutex;
9 import core.sync.condition : Condition;
10 import std.container.slist : SList;
11 import core.thread : dur, Duration, Thread;
12 import std.datetime.stopwatch : StopWatch, AutoStart;
13 import doap.utils : findNextFree;
14 
/**
 * A CoAP client
 *
 * Owns the messaging layer used to reach the server, the list
 * of requests that are still awaiting a response and the table
 * of message IDs together with how long each has been in use.
 */
public class CoapClient
{
    /**
     * CoAP server endpoint
     */
    package Address address;

    /**
     * Running status; set to `true` by `init()` and
     * back to `false` by `close()`
     */
    package bool running;

    /**
     * The messaging layer which provides
     * request-response message match-ups
     */
    private CoapMessagingLayer messaging;

    /**
     * The request-response match list, holding every
     * request that has been stored but not yet yanked
     */
    private SList!(CoapRequest) outgoingRequests;

    /**
     * The lock for the request-response match list
     */
    private Mutex requestsLock;

    /**
     * Message IDs mapped to a stopwatch measuring
     * how long each ID has been in use
     */
    private StopWatch[ushort] mids;

    /**
     * Lock for the message ID map
     */
    private Mutex midsLock;

    /**
     * Maximum lifetime of a message ID before
     * it is considered for re-use.
     *
     * The initializer here is only a fallback; the
     * constructor replaces it via `setExchangeLifetime()`.
     */
    private Duration EXCHANGE_LIFETIME = dur!("msecs")(180);

    /**
     * Threshold handed to `CoapRequest.hasTimedOut()` in
     * `onNoNewMessages()`. Nothing in this class assigns it,
     * so it is left at its default value.
     */
    private Duration retransmitTimeout;

    /**
     * Creates a new CoAP client to the
     * provided endpoint address
     *
     * The messaging layer is started before
     * the constructor returns.
     *
     * Params:
     *   address = the CoAP server endpoint
     */
    this(Address address)
    {
        this.address = address;

        import doap.client.messaging.udp : UDPMessaging;
        this.messaging = new UDPMessaging(this); // UDP transport

        this.requestsLock = new Mutex();
        this.midsLock = new Mutex();

        // FIXME: Change to algorithmic later
        this.setExchangeLifetime(dur!("seconds")(5));

        init();
    }

    /**
     * Constructs a new CoAP client to the
     * provided endpoint address and port.
     *
     * The host and port are wrapped in an
     * `InternetAddress`, which provides name
     * resolution on the host part.
     *
     * Params:
     *   host = the CoAP host
     *   port = the CoAP port
     */
    this(string host, ushort port)
    {
        this(new InternetAddress(host, port));
    }

    /**
     * Sets the exchange lifetime. In other words
     * the duration of time that must pass before
     * a message ID is considered free-for-use again
     *
     * Params:
     *   lifetime = the lifetime duration
     */
    public void setExchangeLifetime(Duration lifetime)
    {
        this.EXCHANGE_LIFETIME = lifetime;
    }

    /**
     * Generates a new message ID
     *
     * An ID whose stopwatch has reached the exchange
     * lifetime is recycled first. Only when no such ID
     * exists is a previously unused one registered.
     *
     * Returns: the next message id
     * Throws: `Exception` if every message ID is still
     * within its exchange lifetime
     */
    private final ushort newMid2()
    {
        // Hold the lock for the whole lookup; released on every exit path
        this.midsLock.lock();
        scope(exit)
        {
            this.midsLock.unlock();
        }

        // Message IDs which are still within their lifetime
        ushort[] inUse;

        foreach(ushort occupied; this.mids.keys())
        {
            if(this.mids[occupied].peek() >= EXCHANGE_LIFETIME)
            {
                // Expired, so it can be handed out again (restart its clock first)
                this.mids[occupied].reset();
                return occupied;
            }

            inUse ~= occupied;
        }

        // Nothing to recycle, so look for an ID that is not in use at all.
        // NOTE(review): `findNextFree` is assumed to return `false` when
        // no free value exists - confirm against `doap.utils`.
        ushort newMid;
        if(!findNextFree(inUse, newMid))
        {
            // Handing out `newMid` here would duplicate an ID that is still in flight
            throw new Exception("No free CoAP message ID available, all are within their exchange lifetime");
        }

        // Register the fresh ID and start timing it
        this.mids[newMid] = StopWatch(AutoStart.yes);

        return newMid;
    }

    /**
     * Sets the running status to `true`
     * and then starts the messaging layer
     */
    private void init()
    {
        // Set status to running
        this.running = true;

        // Start the messaging layer
        this.messaging.begin();
    }

    /**
     * Stops this client
     *
     * This results in closing down the
     * messaging layer and ensuring that
     * no new datagrams may arrive on
     * our source port. Afterwards every
     * request still being tracked has
     * its future cancelled.
     */
    public void close()
    {
        // Set status to not running
        this.running = false;

        // Shutdown the messaging layer
        this.messaging.close();

        // Cancel all active request futures; the lock is
        // released even if a cancellation throws
        this.requestsLock.lock();
        scope(exit)
        {
            this.requestsLock.unlock();
        }

        foreach(CoapRequest curReq; outgoingRequests)
        {
            curReq.future.cancel();
        }
    }

    /**
     * Creates a new CoAP request builder
     *
     * Returns: a new `CoapRequestBuilder`
     */
    public CoapRequestBuilder newRequestBuilder()
    {
        return new CoapRequestBuilder(this);
    }

    /**
     * Given the builder this will extract the details required
     * to build the CoAP packet (code, payload and token), assign
     * it a message ID, register a coap request internally,
     * transmit it and return a future for this request.
     *
     * Params:
     *   requestBuilder = the request builder
     * Returns: the future
     * Throws: `Exception` if no message ID is available
     */
    package CoapRequestFuture doRequest(CoapRequestBuilder requestBuilder)
    {
        // Build the packet
        CoapPacket requestPacket = new CoapPacket();
        requestPacket.setCode(requestBuilder.requestCode);
        requestPacket.setPayload(requestBuilder.pyld);
        requestPacket.setToken(requestBuilder.tkn);
        requestPacket.setMessageId(newMid2());

        // Create the future
        CoapRequestFuture future = new CoapRequestFuture();

        // Link the CoapRequest to the future so it can be signalled
        CoapRequest request = new CoapRequest(requestPacket, future);

        // Store before transmitting so the request is
        // already tracked by the time it is sent
        storeRequest(request);
        transmitRequest(request);

        return future;
    }

    /**
     * Stores the request
     *
     * Params:
     *   request = the `CoapRequest` to store in the
     * tracking list
     */
    private void storeRequest(CoapRequest request)
    {
        requestsLock.lock();
        scope(exit)
        {
            requestsLock.unlock();
        }

        // Inserting after the range covering the whole list appends
        outgoingRequests.insertAfter(outgoingRequests[], request);
    }

    /**
     * Given a packet this will try and find an active
     * request with a matching message ID and return it.
     *
     * This will also remove it from the requests queue.
     *
     * Params:
     *   packet = the packet received
     * Returns: the original `CoapRequest` if a match
     * is found, otherwise `null`
     */
    package CoapRequest yankRequest(CoapPacket packet)
    {
        CoapRequest foundRequest = null;

        requestsLock.lock();
        scope(exit)
        {
            requestsLock.unlock();
        }

        foreach(CoapRequest request; outgoingRequests)
        {
            if(request.getMid() == packet.getMessageId())
            {
                foundRequest = request;
                outgoingRequests.linearRemoveElement(foundRequest);

                // Stop right away, the list was just modified
                break;
            }
        }

        return foundRequest;
    }

    /**
     * Transmits the given request's associated
     * packet to the underlying transport
     *
     * Params:
     *   request = the `CoapRequest` to put into
     * flight
     */
    private void transmitRequest(CoapRequest request)
    {
        // Hand the request packet to the messaging layer
        this.messaging.send(request.getRequestPacket());

        // Now start ticking the timer
        request.startTime();
    }

    /**
     * The intention of this method is that
     * some kind-of `CoapMessagingLayer`
     * can call this when it has no new
     * messages to process.
     *
     * This then lets the client handle
     * the checking of potentially timed
     * out requests, and the re-issuing
     * of them to the messaging layer.
     *
     * Retransmission is not implemented yet;
     * timed out requests are currently only
     * detected and nothing is done with them.
     */
    package void onNoNewMessages()
    {
        requestsLock.lock();
        scope(exit)
        {
            requestsLock.unlock();
        }

        foreach(CoapRequest curReq; outgoingRequests)
        {
            if(curReq.hasTimedOut(retransmitTimeout))
            {
                // TODO: Retransmit
            }
        }
    }
}
337 
/**
 * Tests the client
 *
 * For now this only resolves the endpoint and
 * assembles a request packet by hand; nothing
 * is put on the wire.
 *
 * In the future dogfooding should be used and
 * we should test against our own server too.
 */
unittest
{
    // Endpoint of the public test server
    Address endpoint = new InternetAddress("coap.me", 5683);

    // Assemble a confirmable POST carrying a small payload
    CoapPacket outgoing = new CoapPacket();
    outgoing.setCode(Code.POST);
    outgoing.setToken([69]);
    outgoing.setPayload(cast(ubyte[])"My custom payload");
    outgoing.setType(MessageType.CONFIRMABLE);
    outgoing.setMessageId(257);
}
367 
368 version(unittest)
369 {
370     import std.stdio : writeln;
371 }
372 
/**
 * Client testing
 *
 * Tests the rolling of the message id: two
 * requests are issued one after the other and
 * the responses are expected to carry the
 * message IDs 0 and 1 respectively.
 *
 * The exchange lifetime is configured to a
 * value high enough that the first ID cannot
 * expire (and be handed out again) before the
 * second request is built.
 *
 * NOTE: In the future it may be calculated
 * in relation to other variables and we may
 * need a private method accessible here that
 * can override it
 */
unittest
{
    CoapClient client = new CoapClient("coap.me", 5683);

    // Shut the client down on the failure paths as well,
    // otherwise the messaging layer is left running
    scope(exit)
    {
        client.close();
    }

    // Keep message IDs reserved for much longer than a network
    // round trip takes, so that a slow first response cannot free
    // up ID 0 before the second request (TODO: Change this later)
    client.setExchangeLifetime(dur!("seconds")(10));

    CoapRequestFuture future = client.newRequestBuilder()
                              .payload(cast(ubyte[])"First message")
                              .token([69])
                              .post();

    writeln("Future start (first)");
    CoapPacket response = future.get();
    writeln("Future done (first)");
    writeln("Got response (first): ", response);
    assert(response.getMessageId() == 0);

    future = client.newRequestBuilder()
                              .payload(cast(ubyte[])"Second message")
                              .token([69])
                              .post();

    writeln("Future start (second)");
    response = future.get();
    writeln("Future done (second)");
    writeln("Got response (second): ", response);
    assert(response.getMessageId() == 1);
}
423 
424 
425 
/**
 * Client testing
 *
 * This tests building of a request using the builder,
 * finalizing through the client and then waiting on
 * the returned future for a result.
 *
 * We test the blocking example here therefore, i.e.
 * a blocking `get()`.
 *
 * This therefore tests the entire `messaging` module
 * and `client` module.
 */
unittest
{
    CoapClient client = new CoapClient("coap.me", 5683);

    // Shut the client down even if waiting on the future throws,
    // otherwise the messaging layer is left running
    scope(exit)
    {
        client.close();
    }

    CoapRequestFuture future = client.newRequestBuilder()
                              .payload(cast(ubyte[])"Hello this is Tristan!")
                              .token([69])
                              .post();

    writeln("Future start");
    CoapPacket response = future.get();
    writeln("Future done");
    writeln("Got response: ", response);
}
457 
458 version(unittest)
459 {
460     import core.time : dur;
461     // import doap.client.exceptions : RequestTimeoutException;
462     // import doap.client.request : CoapRequestFuture, RequestState;
463 }
464 
/**
 * Client testing
 *
 * See above except we test a timeout-based
 * request future here.
 *
 * This test DOES time out: the wait of 10
 * milliseconds is expected to elapse before
 * any response arrives.
 */
unittest
{
    CoapClient client = new CoapClient("coap.me", 5683);

    // Shut the client down on the failure paths as well (the
    // assertion inside the `try` is not caught by the handler
    // below), otherwise the messaging layer is left running
    scope(exit)
    {
        client.close();
    }

    CoapRequestFuture future = client.newRequestBuilder()
                              .payload(cast(ubyte[])"Hello this is Tristan!")
                              .token([69])
                              .post();

    try
    {
        writeln("Future start");
        future.get(dur!("msecs")(10));

        // We should timeout and NOT get here
        assert(false, "Expected the request to time out");
    }
    catch(RequestTimeoutException e)
    {
        // We SHOULD time out, ensure that we have the correct state
        assert(future.getState() == RequestState.TIMEDOUT);
    }
}
502 
/**
 * Client testing
 *
 * See above except we test a timeout-based
 * request future here.
 *
 * This test DOES NOT time out (it tests
 * with a high-enough threshold)
 */
unittest
{
    CoapClient client = new CoapClient("coap.me", 5683);

    // Shut the client down on the failure paths as well,
    // otherwise the messaging layer is left running
    scope(exit)
    {
        client.close();
    }

    CoapRequestFuture future = client.newRequestBuilder()
                              .payload(cast(ubyte[])"Hello this is Tristan!")
                              .token([69])
                              .post();

    try
    {
        writeln("Future start");
        future.get(dur!("msecs")(400));

        // We SHOULD get here, ensure that we have the correct state
        assert(future.getState() == RequestState.COMPLETED);
    }
    catch(RequestTimeoutException e)
    {
        // We should NOT time out
        assert(false, "Request timed out unexpectedly");
    }
}