1 module doap.client.client; 2 3 import std.socket : Socket, Address, SocketType, ProtocolType, getAddress, parseAddress, InternetAddress, SocketShutdown; 4 import doap.client.messaging; 5 import doap.protocol; 6 import doap.client.request : CoapRequestBuilder, CoapRequest, CoapRequestFuture, RequestState; 7 import doap.client.exceptions : RequestTimeoutException; 8 import core.sync.mutex : Mutex; 9 import core.sync.condition : Condition; 10 import std.container.slist : SList; 11 import core.thread : dur, Duration, Thread; 12 import std.datetime.stopwatch : StopWatch, AutoStart; 13 import doap.utils : findNextFree; 14 15 /** 16 * A CoAP client 17 */ 18 public class CoapClient 19 { 20 /** 21 * CoAP server endpoint 22 */ 23 package Address address; 24 25 /** 26 * Running status 27 */ 28 package bool running; 29 30 /** 31 * The messaging layer which provides 32 * request-response message match-ups 33 */ 34 private CoapMessagingLayer messaging; 35 36 /** 37 * The request-response match list 38 */ 39 private SList!(CoapRequest) outgoingRequests; 40 41 /** 42 * The lock for the request-response match list 43 */ 44 private Mutex requestsLock; 45 46 /** 47 * Message IDs and lifetime map 48 */ 49 private StopWatch[ushort] mids; 50 51 /** 52 * Lock for the above 53 */ 54 private Mutex midsLock; 55 56 /** 57 * Creates a new CoAP client to the 58 * provided endpoint address 59 * 60 * Params: 61 * address = the CoAP server endpoint 62 */ 63 this(Address address) 64 { 65 this.address = address; 66 67 import doap.client.messaging.udp : UDPMessaging; 68 this.messaging = new UDPMessaging(this); //UDP transport 69 70 this.requestsLock = new Mutex(); 71 this.midsLock = new Mutex(); 72 73 // FIXME: Change to algorithmic later 74 this.setExchangeLifetime(dur!("seconds")(5)); 75 76 init(); 77 } 78 79 /** 80 * Maximum lifetime of a message ID before 81 * it is considered for re-use 82 */ 83 private Duration EXCHANGE_LIFETIME = dur!("msecs")(180); 84 85 /** 86 * Sets the exchange lifetime. In other words 87 * the duration of time that must pass before 88 * a message ID is considered free-for-use again 89 * 90 * Params: 91 * lifetime = the lifetime duration 92 */ 93 public void setExchangeLifetime(Duration lifetime) 94 { 95 this.EXCHANGE_LIFETIME = lifetime; 96 } 97 98 /** 99 * Generates a new message ID 100 * 101 * Returns: the next message id 102 */ 103 private final ushort newMid2() 104 { 105 // Lock rolling counter 106 this.midsLock.lock(); 107 108 scope(exit) 109 { 110 // Unlock rolling counter 111 this.midsLock.unlock(); 112 } 113 114 // Message IDs which are in use 115 ushort[] inUse; 116 117 foreach(ushort occupied; this.mids.keys()) 118 { 119 // Peek the value of the stopwatch 120 if(this.mids[occupied].peek() >= EXCHANGE_LIFETIME) 121 { 122 // It's expired, so we can use it (first reset the time) 123 this.mids[occupied].reset(); 124 125 return occupied; 126 } 127 else 128 { 129 inUse ~= occupied; 130 } 131 } 132 133 // If none was available for re-use then find next available 134 // ... free and use that (also don't forget to register it) 135 ushort newMid; 136 bool gotAvailable = findNextFree(inUse, newMid); 137 138 // FIXME: Add a thing which does something 139 // ... if `gotAvailable` is false 140 141 this.mids[newMid] = StopWatch(AutoStart.yes); 142 143 return newMid; 144 } 145 146 /** 147 * Constructs a new CoAP client to the 148 * provided endpoint address and port. 149 * 150 * This constructor provided name 151 * resolution on the host part. 152 * 153 * Params: 154 * host = the CoAP host 155 * port = the CoAP port 156 */ 157 this(string host, ushort port) 158 { 159 this(new InternetAddress(host, port)); 160 } 161 162 /** 163 * Sets up a new datagram socket, 164 * sets the running status to `true` 165 * and then starts the messaging 166 * layer 167 */ 168 private void init() 169 { 170 // Set status to running 171 this.running = true; 172 173 // Start the messaging layer 174 this.messaging.begin(); 175 } 176 177 /** 178 * Stops this client 179 * 180 * This results in closing down the 181 * messaging layer and ensuring that 182 * no new datagrams may arrive on 183 * our source port. 184 */ 185 public void close() 186 { 187 // Set status to not running 188 this.running = false; 189 190 // Shutdown the messaging layer 191 this.messaging.close(); 192 193 // Cancel all active request futures 194 this.requestsLock.lock(); 195 foreach(CoapRequest curReq; outgoingRequests) 196 { 197 curReq.future.cancel(); 198 } 199 this.requestsLock.unlock(); 200 } 201 202 /** 203 * Creates a new CoAP request builder 204 * 205 * Returns: a new `CoapRequestBuilder` 206 */ 207 public CoapRequestBuilder newRequestBuilder() 208 { 209 return new CoapRequestBuilder(this); 210 } 211 212 /** 213 * Given the builder this will extract the details required 214 * to encode the CoAP packet into its byte form, register 215 * a coap request internally and return a future for this 216 * request. 217 * 218 * Params: 219 * requestBuilder = the request builder 220 * Returns: the future 221 */ 222 package CoapRequestFuture doRequest(CoapRequestBuilder requestBuilder) 223 { 224 // Encode the packet 225 CoapPacket requestPacket = new CoapPacket(); 226 requestPacket.setCode(requestBuilder.requestCode); 227 requestPacket.setPayload(requestBuilder.pyld); 228 requestPacket.setToken(requestBuilder.tkn); 229 requestPacket.setMessageId(newMid2()); 230 231 // Create the future 232 CoapRequestFuture future = new CoapRequestFuture(); 233 234 // Link the CoapRequest to the future so it can be signalled 235 CoapRequest request = new CoapRequest(requestPacket, future); 236 237 // Store the request 238 storeRequest(request); 239 240 // Transmit the request 241 transmitRequest(request); 242 243 return future; 244 } 245 246 /** 247 * Stores the request 248 * 249 * Params: 250 * request = the `CoapRequest` to store in the 251 * tracking list 252 */ 253 private void storeRequest(CoapRequest request) 254 { 255 // Store the request 256 requestsLock.lock(); 257 outgoingRequests.insertAfter(outgoingRequests[], request); 258 requestsLock.unlock(); 259 } 260 261 /** 262 * Given a packet this will try and find an active 263 * request with a matching token and return it. 264 * 265 * This will also remove it from the requests queue. 266 * 267 * Params: 268 * packet = the packet received 269 * Returns: the original `CoapRequest` if a match 270 * is found, otherwise `null` 271 */ 272 package CoapRequest yankRequest(CoapPacket packet) 273 { 274 CoapRequest foundRequest = null; 275 276 requestsLock.lock(); 277 278 foreach(CoapRequest request; outgoingRequests) 279 { 280 if(request.getMid() == packet.getMessageId()) 281 { 282 foundRequest = request; 283 outgoingRequests.linearRemoveElement(foundRequest); 284 break; 285 } 286 } 287 288 requestsLock.unlock(); 289 290 return foundRequest; 291 } 292 293 /** 294 * Transmits the given request's associated 295 * packet to the underlying transport 296 * 297 * Params: 298 * request = the `CoapRequest` to put into 299 * flight 300 */ 301 private void transmitRequest(CoapRequest request) 302 { 303 // Encode the request packet and send it 304 this.messaging.send(request.getRequestPacket()); 305 306 // Now start ticking the timer 307 request.startTime(); 308 } 309 310 // private Duration sweepInterval; 311 private Duration retransmitTimeout; 312 313 /** 314 * The intention of this method is that 315 * some kind-of `CoapMessagingLayer` 316 * can call this when it has no new 317 * messages to process. 318 * 319 * This then let's the client handle 320 * the checking of potentially timed 321 * out requests, and the re-issueing 322 * of them to the messaging layer. 323 */ 324 package void onNoNewMessages() 325 { 326 requestsLock.lock(); 327 foreach(CoapRequest curReq; outgoingRequests) 328 { 329 if(curReq.hasTimedOut(retransmitTimeout)) 330 { 331 // TODO: Retransmit 332 } 333 } 334 requestsLock.unlock(); 335 } 336 } 337 338 /** 339 * Tests the client 340 * 341 * In the future dogfooding should be 342 * used and we should test against our 343 * own server too. 344 */ 345 unittest 346 { 347 // Address[] resolved = getAddress("coap.me"); 348 // resolved[0].po 349 Address addr = new InternetAddress("coap.me", 5683); 350 // CoapClient client = new CoapClient(addr); 351 352 // client.resource("/hello"); 353 354 // client.connect(); 355 356 // Test sending something 357 CoapPacket packet = new CoapPacket(); 358 packet.setCode(Code.POST); 359 packet.setToken([69]); 360 packet.setPayload(cast(ubyte[])"My custom payload"); 361 packet.setType(MessageType.CONFIRMABLE); 362 packet.setMessageId(257); 363 364 // client.socket.send(packet.getBytes()); 365 366 } 367 368 version(unittest) 369 { 370 import std.stdio : writeln; 371 } 372 373 /** 374 * Client testing 375 * 376 * Tests the rolling of the message id, 377 * here I configure the `EXCHANGE_LIFETIME` 378 * to be a value high enough to not have 379 * them quickly expire. 380 * 381 * NOTE: In the future it may be calculated 382 * in relation to other variables and we may 383 * need a private method accessible here that 384 * can override it 385 */ 386 unittest 387 { 388 CoapClient client = new CoapClient("coap.me", 5683); 389 390 391 CoapRequestFuture future = client.newRequestBuilder() 392 .payload(cast(ubyte[])"First message") 393 .token([69]) 394 .post(); 395 396 397 // Set it to something high enough (TODO: Change this later) 398 client.setExchangeLifetime(dur!("msecs")(300)); 399 400 writeln("Future start (first)"); 401 CoapPacket response = future.get(); 402 writeln("Future done (first)"); 403 writeln("Got response (first): ", response); 404 assert(response.getMessageId() == 0); 405 406 future = client.newRequestBuilder() 407 .payload(cast(ubyte[])"Second message") 408 .token([69]) 409 .post(); 410 411 412 writeln("Future start (second)"); 413 response = future.get(); 414 writeln("Future done (second)"); 415 writeln("Got response (second): ", response); 416 assert(response.getMessageId() == 1); 417 418 419 420 421 client.close(); 422 } 423 424 425 426 /** 427 * Client testing 428 * 429 * This tests building of a request using the builder, 430 * finalizing through the client and then waiting on 431 * the returned future for a result. 432 * 433 * We test the blocking example here therefore, i.e. 434 * a blocking `get()`. 435 * 436 * This therefore tests the entire `messaging` module 437 * and `client` module. 438 */ 439 unittest 440 { 441 CoapClient client = new CoapClient("coap.me", 5683); 442 443 444 CoapRequestFuture future = client.newRequestBuilder() 445 .payload(cast(ubyte[])"Hello this is Tristan!") 446 .token([69]) 447 .post(); 448 449 450 writeln("Future start"); 451 CoapPacket response = future.get(); 452 writeln("Future done"); 453 writeln("Got response: ", response); 454 455 client.close(); 456 } 457 458 version(unittest) 459 { 460 import core.time : dur; 461 // import doap.client.exceptions : RequestTimeoutException; 462 // import doap.client.request : CoapRequestFuture, RequestState; 463 } 464 465 /** 466 * Client testing 467 * 468 * See above except we test a timeout-based 469 * request future here. 470 * 471 * This test DOES time out 472 */ 473 unittest 474 { 475 CoapClient client = new CoapClient("coap.me", 5683); 476 477 478 CoapRequestFuture future = client.newRequestBuilder() 479 .payload(cast(ubyte[])"Hello this is Tristan!") 480 .token([69]) 481 .post(); 482 483 try 484 { 485 writeln("Future start"); 486 CoapPacket response = future.get(dur!("msecs")(10)); 487 488 // We should timeout and NOT get here 489 assert(false); 490 } 491 catch(RequestTimeoutException e) 492 { 493 // Ensure that we have the correct state 494 assert(future.getState() == RequestState.TIMEDOUT); 495 496 // We SHOULD time out 497 assert(true); 498 } 499 500 client.close(); 501 } 502 503 /** 504 * Client testing 505 * 506 * See above except we test a timeout-based 507 * request future here. 508 * 509 * This test DOES NOT time out (it tests 510 * with a high-enough threshold) 511 */ 512 unittest 513 { 514 CoapClient client = new CoapClient("coap.me", 5683); 515 516 517 CoapRequestFuture future = client.newRequestBuilder() 518 .payload(cast(ubyte[])"Hello this is Tristan!") 519 .token([69]) 520 .post(); 521 522 try 523 { 524 writeln("Future start"); 525 CoapPacket response = future.get(dur!("msecs")(400)); 526 527 // Ensure that we have the correct state 528 assert(future.getState() == RequestState.COMPLETED); 529 530 // We SHOULD get here 531 assert(true); 532 } 533 catch(RequestTimeoutException e) 534 { 535 // We should NOT time out 536 assert(false); 537 } 538 539 client.close(); 540 }