1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 #![cfg(feature = "net")]
15 
16 use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt};
17 use ylong_runtime::net::{TcpListener, TcpStream};
18 
19 const ADDR: &str = "127.0.0.1:0";
20 
21 /// SDV test cases for `TcpStream`.
22 ///
23 /// # Brief
24 /// 1. Bind `TcpListener` and wait for `accept()` using an ipv6 address.
25 /// 2. After accept, write `hello`.
26 /// 2. `TcpStream` connect to listener and try to read buf.
27 /// 4. Check if the result is correct.
28 #[test]
sdv_tcp_ipv6_connect()29 fn sdv_tcp_ipv6_connect() {
30     let handle = ylong_runtime::spawn(async move {
31         let listener = TcpListener::bind("[::1]:0").await.unwrap();
32         let addr = listener.local_addr().unwrap();
33 
34         let handle = ylong_runtime::spawn(async move {
35             let mut stream = TcpStream::connect(addr).await;
36             while stream.is_err() {
37                 stream = TcpStream::connect(addr).await;
38             }
39             let mut stream = stream.unwrap();
40             let mut buf = vec![0; 5];
41             let _ = stream.read(&mut buf).await;
42             assert_eq!(buf, b"hello");
43         });
44 
45         let (mut stream, _) = listener.accept().await.unwrap();
46         stream.write(b"hello").await.unwrap();
47 
48         handle.await.unwrap();
49     });
50     ylong_runtime::block_on(handle).unwrap();
51 }
52 
53 /// SDV test cases for `TcpListener`.
54 ///
55 /// # Brief
56 /// 1. Bind `TcpListener`.
57 /// 2. Call local_addr(), set_ttl(), ttl(), take_error().
58 /// 3. Check result is correct.
59 #[test]
sdv_tcp_listener_interface()60 fn sdv_tcp_listener_interface() {
61     let handle = ylong_runtime::spawn(async {
62         let server = TcpListener::bind(ADDR).await.unwrap();
63 
64         server.set_ttl(101).unwrap();
65         assert_eq!(server.ttl().unwrap(), 101);
66 
67         assert!(server.take_error().unwrap().is_none());
68     });
69     ylong_runtime::block_on(handle).unwrap();
70 }
71 
72 /// SDV test cases for `TcpStream`.
73 ///
74 /// # Brief
75 /// 1. Bind `TcpListener` and wait for `accept()`.
76 /// 2. After accept, try to write buf.
77 /// 2. `TcpStream` connect to listener and try to read buf.
78 /// 4. Check result is correct.
79 #[test]
sdv_tcp_stream_try()80 fn sdv_tcp_stream_try() {
81     let handle = ylong_runtime::spawn(async move {
82         let listener = TcpListener::bind(ADDR).await.unwrap();
83         let addr = listener.local_addr().unwrap();
84 
85         let handle = ylong_runtime::spawn(async move {
86             let mut stream = TcpStream::connect(addr).await;
87             while stream.is_err() {
88                 stream = TcpStream::connect(addr).await;
89             }
90             let stream = stream.unwrap();
91             let mut buf = vec![0; 5];
92             stream.readable().await.unwrap();
93             stream.try_read(&mut buf).unwrap();
94             assert_eq!(buf, b"hello");
95         });
96 
97         let (stream, _) = listener.accept().await.unwrap();
98         stream.writable().await.unwrap();
99         stream.try_write(b"hello").unwrap();
100 
101         handle.await.unwrap();
102     });
103     ylong_runtime::block_on(handle).unwrap();
104 }
105 
106 /// SDV test cases for `TcpStream`.
107 ///
108 /// # Brief
109 /// 1. Bind `TcpListener` and wait for `accept()`.
110 /// 2. `TcpStream` connect to listener.
111 /// 3. Call peer_addr(), local_addr(), set_ttl(), ttl(), set_nodelay(),
112 ///    nodelay(), take_error().
113 /// 4. Check result is correct.
114 #[test]
sdv_tcp_stream_basic()115 fn sdv_tcp_stream_basic() {
116     let handle = ylong_runtime::spawn(async move {
117         let listener = TcpListener::bind(ADDR).await.unwrap();
118         let addr = listener.local_addr().unwrap();
119 
120         let handle = ylong_runtime::spawn(async move {
121             let mut stream = TcpStream::connect(addr).await;
122             while stream.is_err() {
123                 stream = TcpStream::connect(addr).await;
124             }
125             let stream = stream.unwrap();
126 
127             assert_eq!(stream.peer_addr().unwrap(), addr);
128             assert_eq!(
129                 stream.local_addr().unwrap().ip(),
130                 std::net::Ipv4Addr::new(127, 0, 0, 1)
131             );
132             stream.set_ttl(101).unwrap();
133             assert_eq!(stream.ttl().unwrap(), 101);
134             stream.set_nodelay(true).unwrap();
135             assert!(stream.nodelay().unwrap());
136             assert!(stream.linger().unwrap().is_none());
137             stream
138                 .set_linger(Some(std::time::Duration::from_secs(1)))
139                 .unwrap();
140             assert_eq!(
141                 stream.linger().unwrap(),
142                 Some(std::time::Duration::from_secs(1))
143             );
144             assert!(stream.take_error().unwrap().is_none());
145         });
146 
147         listener.accept().await.unwrap();
148 
149         handle.await.unwrap();
150     });
151     ylong_runtime::block_on(handle).unwrap();
152 }
153 
154 /// SDV test cases for `TcpStream`.
155 ///
156 /// # Brief
157 /// 1. Bind `TcpListener` and wait for `accept()`.
158 /// 2. `TcpStream` connect to listener.
159 /// 3. Call peek() to get.
160 /// 4. Check result is correct.
161 #[test]
sdv_tcp_stream_peek()162 fn sdv_tcp_stream_peek() {
163     let handle = ylong_runtime::spawn(async {
164         let listener = TcpListener::bind(ADDR).await.unwrap();
165         let addr = listener.local_addr().unwrap();
166 
167         let handle = ylong_runtime::spawn(async move {
168             let mut stream = TcpStream::connect(addr).await;
169             while stream.is_err() {
170                 stream = TcpStream::connect(addr).await;
171             }
172             let stream = stream.unwrap();
173 
174             let mut buf = [0; 100];
175             let len = stream.peek(&mut buf).await.expect("peek failed!");
176             let buf = &buf[0..len];
177             assert_eq!(len, 5);
178             assert_eq!(String::from_utf8_lossy(buf), "hello");
179         });
180 
181         let (mut stream, _) = listener.accept().await.unwrap();
182         stream.write(b"hello").await.unwrap();
183 
184         handle.await.unwrap();
185     });
186     ylong_runtime::block_on(handle).unwrap();
187 }
188 
189 #[test]
sdv_tcp_global_runtime()190 fn sdv_tcp_global_runtime() {
191     let handle = ylong_runtime::spawn(async move {
192         let listener = TcpListener::bind(ADDR).await.expect("Bind Listener Failed");
193         let addr = listener.local_addr().unwrap();
194 
195         // Start a thread as client side
196         let handle = ylong_runtime::spawn(async move {
197             let mut client = TcpStream::connect(addr).await;
198             while client.is_err() {
199                 client = TcpStream::connect(addr).await;
200             }
201             let mut client = client.unwrap();
202 
203             let n = client
204                 .write(b"hello server")
205                 .await
206                 .expect("client send failed");
207             assert_eq!(n, "hello server".len());
208 
209             let mut recv_buf = [0_u8; 12];
210             let n = client
211                 .read(&mut recv_buf)
212                 .await
213                 .expect("client recv failed");
214             assert_eq!(
215                 std::str::from_utf8(&recv_buf).unwrap(),
216                 "hello client".to_string()
217             );
218             assert_eq!(n, "hello client".len());
219         });
220 
221         let (mut socket, _) = listener.accept().await.expect("Bind accept Failed");
222         loop {
223             let mut buf = [0_u8; 12];
224             match socket.read(&mut buf).await.expect("recv Failed") {
225                 0 => break,
226                 n => {
227                     assert_eq!(
228                         std::str::from_utf8(&buf).unwrap(),
229                         "hello server".to_string()
230                     );
231                     assert_eq!(n, "hello server".len());
232                 }
233             };
234 
235             socket
236                 .write(b"hello client")
237                 .await
238                 .expect("failed to write to socket");
239         }
240         handle.await.unwrap();
241     });
242     ylong_runtime::block_on(handle).expect("block_on failed");
243 }
244 
245 #[cfg(feature = "multi_instance_runtime")]
246 #[test]
sdv_tcp_multi_runtime()247 fn sdv_tcp_multi_runtime() {
248     use ylong_runtime::builder::RuntimeBuilder;
249     let runtime = RuntimeBuilder::new_multi_thread().build().unwrap();
250 
251     runtime.block_on(async {
252         let tcp = TcpListener::bind(ADDR).await.unwrap();
253         let addr = tcp.local_addr().unwrap();
254 
255         let client = runtime.spawn(async move {
256             let mut tcp = TcpStream::connect(addr).await;
257             while tcp.is_err() {
258                 tcp = TcpStream::connect(addr).await;
259             }
260             let mut tcp = tcp.unwrap();
261             let buf = [3; 100];
262             tcp.write_all(&buf).await.unwrap();
263 
264             let mut buf = [0; 100];
265             tcp.read_exact(&mut buf).await.unwrap();
266             assert_eq!(buf, [2; 100]);
267         });
268 
269         let (mut stream, _) = tcp.accept().await.unwrap();
270         let mut buf = [0; 100];
271         stream.read_exact(&mut buf).await.unwrap();
272         assert_eq!(buf, [3; 100]);
273 
274         let buf = [2; 100];
275         stream.write_all(&buf).await.unwrap();
276 
277         client.await.unwrap();
278     });
279 }
280 
281 /// SDV test cases for `TcpStream` of split().
282 ///
283 /// # Brief
284 /// 1. Bind `TcpListener` and wait for `accept()`.
285 /// 2. `TcpStream` connect to listener.
286 /// 3. Split TcpStream into read half and write half with borrowed.
287 /// 4. Write with write half and read with read half.
288 /// 5. Check result is correct.
289 #[test]
sdv_tcp_split_borrow_half()290 fn sdv_tcp_split_borrow_half() {
291     let handle = ylong_runtime::spawn(async {
292         let listener = TcpListener::bind(ADDR).await.unwrap();
293         let addr = listener.local_addr().unwrap();
294 
295         let handle = ylong_runtime::spawn(async move {
296             let mut stream = TcpStream::connect(addr).await;
297             while stream.is_err() {
298                 stream = TcpStream::connect(addr).await;
299             }
300             let mut stream = stream.unwrap();
301 
302             let (mut read_half, mut write_half) = stream.split();
303             write_half.write(b"I am write half.").await.unwrap();
304             write_half.flush().await.unwrap();
305             write_half.shutdown().await.unwrap();
306 
307             let mut buf = [0; 6];
308             let n = read_half.read(&mut buf).await.expect("server read err");
309             assert_eq!(n, 6);
310             assert_eq!(buf, [1, 2, 3, 4, 5, 6]);
311         });
312 
313         let (mut stream, _) = listener.accept().await.unwrap();
314         let (mut read_half, mut write_half) = stream.split();
315         let mut buf = [0; 16];
316         let n = read_half.read(&mut buf).await.expect("server read err");
317         assert_eq!(n, 16);
318         assert_eq!(
319             String::from_utf8(Vec::from(buf)).unwrap().as_str(),
320             "I am write half."
321         );
322 
323         let data1 = [1, 2, 3];
324         let data2 = [4, 5, 6];
325         let slice1 = std::io::IoSlice::new(&data1);
326         let slice2 = std::io::IoSlice::new(&data2);
327         write_half.write_vectored(&[slice1, slice2]).await.unwrap();
328 
329         handle.await.unwrap();
330     });
331     ylong_runtime::block_on(handle).unwrap();
332 }
333 
334 /// SDV case for binding on the same port twice
335 ///
336 /// # Breif
337 /// 1. Create a new TcpListener
338 /// 2. Create another TcpListener that binds to the same port
339 /// 3. Check if the return is an error
340 #[test]
sdv_tcp_address_in_use()341 fn sdv_tcp_address_in_use() {
342     ylong_runtime::block_on(async move {
343         let tcp = TcpListener::bind(ADDR).await.unwrap();
344         let addr = tcp.local_addr().unwrap();
345 
346         let tcp2 = TcpListener::bind(addr).await;
347         assert!(tcp2.is_err());
348     });
349 }
350 
351 /// SDV test cases for `TcpStream` of into_split().
352 ///
353 /// # Brief
354 /// 1. Bind `TcpListener` and wait for `accept()`.
355 /// 2. `TcpStream` connect to listener.
356 /// 3. Split TcpStream into read half and write half with owned.
357 /// 4. Write with write half and read with read half.
358 /// 5. Check result is correct.
359 #[test]
sdv_tcp_split_owned_half()360 fn sdv_tcp_split_owned_half() {
361     let handle = ylong_runtime::spawn(async move {
362         let listener = TcpListener::bind(ADDR).await.unwrap();
363         let addr = listener.local_addr().unwrap();
364 
365         let handle = ylong_runtime::spawn(async move {
366             let mut stream = TcpStream::connect(addr).await;
367             while stream.is_err() {
368                 stream = TcpStream::connect(addr).await;
369             }
370             let stream = stream.unwrap();
371             let (mut read_half, mut write_half) = stream.into_split();
372 
373             write_half.write(b"I am write half.").await.unwrap();
374             write_half.flush().await.unwrap();
375             write_half.shutdown().await.unwrap();
376 
377             let mut buf = [0; 6];
378             let n = read_half.read(&mut buf).await.expect("server read err");
379             assert_eq!(n, 6);
380             assert_eq!(buf, [1, 2, 3, 4, 5, 6]);
381         });
382 
383         let (mut stream, _) = listener.accept().await.unwrap();
384         let (mut read_half, mut write_half) = stream.split();
385 
386         let mut buf = [0; 16];
387         let n = read_half.read(&mut buf).await.expect("server read err");
388         assert_eq!(n, 16);
389         assert_eq!(
390             String::from_utf8(Vec::from(buf)).unwrap().as_str(),
391             "I am write half."
392         );
393         let data1 = [1, 2, 3];
394         let data2 = [4, 5, 6];
395         let slice1 = std::io::IoSlice::new(&data1);
396         let slice2 = std::io::IoSlice::new(&data2);
397         write_half.write_vectored(&[slice1, slice2]).await.unwrap();
398 
399         handle.await.unwrap();
400     });
401     ylong_runtime::block_on(handle).unwrap();
402 }
403 
404 /// SDV case for dropping TcpStream outside of worker context
405 ///
406 /// # Breif
407 /// 1. Starts 2 tasks via `spawn` that simulates a connection between client and
408 ///    server
409 /// 2. Returns the streams to the main thread which is outside of the worker
410 ///    context
411 /// 3. Drops the streams and it should not cause Panic
412 #[test]
413 #[cfg(all(not(feature = "ffrt"), feature = "sync"))]
sdv_tcp_drop_out_context()414 fn sdv_tcp_drop_out_context() {
415     let (tx, rx) = ylong_runtime::sync::oneshot::channel();
416     let handle = ylong_runtime::spawn(async move {
417         let tcp = TcpListener::bind(ADDR).await.unwrap();
418         let addr = tcp.local_addr().unwrap();
419         tx.send(addr).unwrap();
420         let (mut stream, _) = tcp.accept().await.unwrap();
421         let mut buf = [0; 10];
422         stream.read_exact(&mut buf).await.unwrap();
423         assert_eq!(buf, [3; 10]);
424 
425         let buf = [2; 10];
426         stream.write_all(&buf).await.unwrap();
427         stream
428     });
429 
430     let client = ylong_runtime::block_on(async move {
431         let addr = rx.await.unwrap();
432         let mut tcp = TcpStream::connect(addr).await;
433         while tcp.is_err() {
434             tcp = TcpStream::connect(addr).await;
435         }
436         let mut tcp = tcp.unwrap();
437         let buf = [3; 10];
438         tcp.write_all(&buf).await.unwrap();
439 
440         let mut buf = [0; 10];
441         tcp.read_exact(&mut buf).await.unwrap();
442         assert_eq!(buf, [2; 10]);
443         tcp
444     });
445 
446     let server = ylong_runtime::block_on(handle).unwrap();
447 
448     drop(server);
449     drop(client);
450 }
451 
452 /// SDV case for canceling TcpStream and then reconnecting on the same port
453 ///
454 /// # Breif
455 /// 1. Starts a TCP connection using port 8201
456 /// 2. Cancels the TCP connection before its finished
457 /// 3. Starts another TCP connection using the same port 8201
458 /// 4. checks if the connection is successful.
459 #[cfg(feature = "time")]
460 #[test]
sdv_tcp_cancel()461 fn sdv_tcp_cancel() {
462     use std::time::Duration;
463 
464     use ylong_runtime::time::sleep;
465 
466     let (tx, rx) = ylong_runtime::sync::oneshot::channel();
467     let server = ylong_runtime::spawn(async {
468         let tcp = TcpListener::bind(ADDR).await.unwrap();
469         let addr = tcp.local_addr().unwrap();
470         let _ = tx.send(addr);
471         let (mut stream, _) = tcp.accept().await.unwrap();
472         sleep(Duration::from_secs(10000)).await;
473 
474         let mut buf = [0; 100];
475         stream.read_exact(&mut buf).await.unwrap();
476         assert_eq!(buf, [3; 100]);
477 
478         let buf = [2; 100];
479         stream.write_all(&buf).await.unwrap();
480     });
481 
482     let client = ylong_runtime::spawn(async {
483         let addr = match rx.await {
484             Ok(addr) => addr,
485             Err(_) => {
486                 sleep(Duration::from_secs(100000)).await;
487                 return;
488             }
489         };
490         let mut tcp = TcpStream::connect(addr).await;
491         while tcp.is_err() {
492             tcp = TcpStream::connect(addr).await;
493         }
494         sleep(Duration::from_secs(10000)).await;
495         let mut tcp = tcp.unwrap();
496         let buf = [3; 100];
497         tcp.write_all(&buf).await.unwrap();
498 
499         let mut buf = [0; 100];
500         tcp.read_exact(&mut buf).await.unwrap();
501         assert_eq!(buf, [2; 100]);
502     });
503 
504     client.cancel();
505     server.cancel();
506     let ret = ylong_runtime::block_on(server);
507     assert!(ret.is_err());
508     let ret = ylong_runtime::block_on(client);
509     assert!(ret.is_err());
510 
511     let (tx, rx) = ylong_runtime::sync::oneshot::channel();
512     let server = ylong_runtime::spawn(async move {
513         let tcp = TcpListener::bind(ADDR).await.unwrap();
514         let addr = tcp.local_addr().unwrap();
515         tx.send(addr).unwrap();
516         let (mut stream, _) = tcp.accept().await.unwrap();
517 
518         let mut buf = [0; 100];
519         stream.read_exact(&mut buf).await.unwrap();
520         assert_eq!(buf, [3; 100]);
521 
522         let buf = [2; 100];
523         stream.write_all(&buf).await.unwrap();
524     });
525 
526     let client = ylong_runtime::spawn(async move {
527         let addr = rx.await.unwrap();
528         let mut tcp = TcpStream::connect(addr).await;
529         while tcp.is_err() {
530             tcp = TcpStream::connect(addr).await;
531         }
532         let mut tcp = tcp.unwrap();
533         let buf = [3; 100];
534         tcp.write_all(&buf).await.unwrap();
535 
536         let mut buf = [0; 100];
537         tcp.read_exact(&mut buf).await.unwrap();
538         assert_eq!(buf, [2; 100]);
539     });
540 
541     ylong_runtime::block_on(server).unwrap();
542     ylong_runtime::block_on(client).unwrap();
543 }
544 
545 /// SDV case for binding on the same port twice
546 ///
547 /// # Breif
548 /// 1. Create a Tcp connection
549 /// 2. Close the client side before any data transmission
550 /// 3. Check if the server side gets an UnexpectedEof error
551 #[test]
552 #[cfg(feature = "time")]
sdv_tcp_unexpected_eof()553 fn sdv_tcp_unexpected_eof() {
554     use std::sync::Arc;
555 
556     let val = Arc::new(std::sync::Mutex::new(0));
557     let val2 = val.clone();
558 
559     let (tx, rx) = ylong_runtime::sync::oneshot::channel();
560     let handle = ylong_runtime::spawn(async move {
561         let tcp = TcpListener::bind(ADDR).await.unwrap();
562         let addr = tcp.local_addr().unwrap();
563         tx.send(addr).unwrap();
564         let (mut stream, _) = tcp.accept().await.unwrap();
565         let mut buf = [0; 10];
566         let ret = stream.read_exact(&mut buf).await.unwrap_err();
567         assert_eq!(ret.kind(), std::io::ErrorKind::UnexpectedEof);
568     });
569 
570     let client = ylong_runtime::spawn(async move {
571         let addr = rx.await.unwrap();
572         let mut tcp = TcpStream::connect(addr).await;
573         while tcp.is_err() {
574             tcp = TcpStream::connect(addr).await;
575         }
576         let mut tcp = tcp.unwrap();
577         {
578             let mut guard = val2.lock().unwrap();
579             *guard = 1;
580         }
581         ylong_runtime::time::sleep(std::time::Duration::from_secs(10)).await;
582         let buf = [3; 10];
583         tcp.write_all(&buf).await.unwrap();
584     });
585 
586     loop {
587         let guard = val.lock().unwrap();
588         if *guard != 0 {
589             break;
590         }
591         drop(guard);
592         std::thread::sleep(std::time::Duration::from_millis(10));
593     }
594     client.cancel();
595     ylong_runtime::block_on(handle).unwrap();
596 }
597