1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 #![cfg(feature = "net")]
15
16 use ylong_runtime::io::{AsyncReadExt, AsyncWriteExt};
17 use ylong_runtime::net::{TcpListener, TcpStream};
18
19 const ADDR: &str = "127.0.0.1:0";
20
21 /// SDV test cases for `TcpStream`.
22 ///
23 /// # Brief
24 /// 1. Bind `TcpListener` and wait for `accept()` using an ipv6 address.
25 /// 2. After accept, write `hello`.
26 /// 2. `TcpStream` connect to listener and try to read buf.
27 /// 4. Check if the result is correct.
28 #[test]
sdv_tcp_ipv6_connect()29 fn sdv_tcp_ipv6_connect() {
30 let handle = ylong_runtime::spawn(async move {
31 let listener = TcpListener::bind("[::1]:0").await.unwrap();
32 let addr = listener.local_addr().unwrap();
33
34 let handle = ylong_runtime::spawn(async move {
35 let mut stream = TcpStream::connect(addr).await;
36 while stream.is_err() {
37 stream = TcpStream::connect(addr).await;
38 }
39 let mut stream = stream.unwrap();
40 let mut buf = vec![0; 5];
41 let _ = stream.read(&mut buf).await;
42 assert_eq!(buf, b"hello");
43 });
44
45 let (mut stream, _) = listener.accept().await.unwrap();
46 stream.write(b"hello").await.unwrap();
47
48 handle.await.unwrap();
49 });
50 ylong_runtime::block_on(handle).unwrap();
51 }
52
53 /// SDV test cases for `TcpListener`.
54 ///
55 /// # Brief
56 /// 1. Bind `TcpListener`.
57 /// 2. Call local_addr(), set_ttl(), ttl(), take_error().
58 /// 3. Check result is correct.
59 #[test]
sdv_tcp_listener_interface()60 fn sdv_tcp_listener_interface() {
61 let handle = ylong_runtime::spawn(async {
62 let server = TcpListener::bind(ADDR).await.unwrap();
63
64 server.set_ttl(101).unwrap();
65 assert_eq!(server.ttl().unwrap(), 101);
66
67 assert!(server.take_error().unwrap().is_none());
68 });
69 ylong_runtime::block_on(handle).unwrap();
70 }
71
72 /// SDV test cases for `TcpStream`.
73 ///
74 /// # Brief
75 /// 1. Bind `TcpListener` and wait for `accept()`.
76 /// 2. After accept, try to write buf.
77 /// 2. `TcpStream` connect to listener and try to read buf.
78 /// 4. Check result is correct.
79 #[test]
sdv_tcp_stream_try()80 fn sdv_tcp_stream_try() {
81 let handle = ylong_runtime::spawn(async move {
82 let listener = TcpListener::bind(ADDR).await.unwrap();
83 let addr = listener.local_addr().unwrap();
84
85 let handle = ylong_runtime::spawn(async move {
86 let mut stream = TcpStream::connect(addr).await;
87 while stream.is_err() {
88 stream = TcpStream::connect(addr).await;
89 }
90 let stream = stream.unwrap();
91 let mut buf = vec![0; 5];
92 stream.readable().await.unwrap();
93 stream.try_read(&mut buf).unwrap();
94 assert_eq!(buf, b"hello");
95 });
96
97 let (stream, _) = listener.accept().await.unwrap();
98 stream.writable().await.unwrap();
99 stream.try_write(b"hello").unwrap();
100
101 handle.await.unwrap();
102 });
103 ylong_runtime::block_on(handle).unwrap();
104 }
105
106 /// SDV test cases for `TcpStream`.
107 ///
108 /// # Brief
109 /// 1. Bind `TcpListener` and wait for `accept()`.
110 /// 2. `TcpStream` connect to listener.
111 /// 3. Call peer_addr(), local_addr(), set_ttl(), ttl(), set_nodelay(),
112 /// nodelay(), take_error().
113 /// 4. Check result is correct.
114 #[test]
sdv_tcp_stream_basic()115 fn sdv_tcp_stream_basic() {
116 let handle = ylong_runtime::spawn(async move {
117 let listener = TcpListener::bind(ADDR).await.unwrap();
118 let addr = listener.local_addr().unwrap();
119
120 let handle = ylong_runtime::spawn(async move {
121 let mut stream = TcpStream::connect(addr).await;
122 while stream.is_err() {
123 stream = TcpStream::connect(addr).await;
124 }
125 let stream = stream.unwrap();
126
127 assert_eq!(stream.peer_addr().unwrap(), addr);
128 assert_eq!(
129 stream.local_addr().unwrap().ip(),
130 std::net::Ipv4Addr::new(127, 0, 0, 1)
131 );
132 stream.set_ttl(101).unwrap();
133 assert_eq!(stream.ttl().unwrap(), 101);
134 stream.set_nodelay(true).unwrap();
135 assert!(stream.nodelay().unwrap());
136 assert!(stream.linger().unwrap().is_none());
137 stream
138 .set_linger(Some(std::time::Duration::from_secs(1)))
139 .unwrap();
140 assert_eq!(
141 stream.linger().unwrap(),
142 Some(std::time::Duration::from_secs(1))
143 );
144 assert!(stream.take_error().unwrap().is_none());
145 });
146
147 listener.accept().await.unwrap();
148
149 handle.await.unwrap();
150 });
151 ylong_runtime::block_on(handle).unwrap();
152 }
153
154 /// SDV test cases for `TcpStream`.
155 ///
156 /// # Brief
157 /// 1. Bind `TcpListener` and wait for `accept()`.
158 /// 2. `TcpStream` connect to listener.
159 /// 3. Call peek() to get.
160 /// 4. Check result is correct.
161 #[test]
sdv_tcp_stream_peek()162 fn sdv_tcp_stream_peek() {
163 let handle = ylong_runtime::spawn(async {
164 let listener = TcpListener::bind(ADDR).await.unwrap();
165 let addr = listener.local_addr().unwrap();
166
167 let handle = ylong_runtime::spawn(async move {
168 let mut stream = TcpStream::connect(addr).await;
169 while stream.is_err() {
170 stream = TcpStream::connect(addr).await;
171 }
172 let stream = stream.unwrap();
173
174 let mut buf = [0; 100];
175 let len = stream.peek(&mut buf).await.expect("peek failed!");
176 let buf = &buf[0..len];
177 assert_eq!(len, 5);
178 assert_eq!(String::from_utf8_lossy(buf), "hello");
179 });
180
181 let (mut stream, _) = listener.accept().await.unwrap();
182 stream.write(b"hello").await.unwrap();
183
184 handle.await.unwrap();
185 });
186 ylong_runtime::block_on(handle).unwrap();
187 }
188
189 #[test]
sdv_tcp_global_runtime()190 fn sdv_tcp_global_runtime() {
191 let handle = ylong_runtime::spawn(async move {
192 let listener = TcpListener::bind(ADDR).await.expect("Bind Listener Failed");
193 let addr = listener.local_addr().unwrap();
194
195 // Start a thread as client side
196 let handle = ylong_runtime::spawn(async move {
197 let mut client = TcpStream::connect(addr).await;
198 while client.is_err() {
199 client = TcpStream::connect(addr).await;
200 }
201 let mut client = client.unwrap();
202
203 let n = client
204 .write(b"hello server")
205 .await
206 .expect("client send failed");
207 assert_eq!(n, "hello server".len());
208
209 let mut recv_buf = [0_u8; 12];
210 let n = client
211 .read(&mut recv_buf)
212 .await
213 .expect("client recv failed");
214 assert_eq!(
215 std::str::from_utf8(&recv_buf).unwrap(),
216 "hello client".to_string()
217 );
218 assert_eq!(n, "hello client".len());
219 });
220
221 let (mut socket, _) = listener.accept().await.expect("Bind accept Failed");
222 loop {
223 let mut buf = [0_u8; 12];
224 match socket.read(&mut buf).await.expect("recv Failed") {
225 0 => break,
226 n => {
227 assert_eq!(
228 std::str::from_utf8(&buf).unwrap(),
229 "hello server".to_string()
230 );
231 assert_eq!(n, "hello server".len());
232 }
233 };
234
235 socket
236 .write(b"hello client")
237 .await
238 .expect("failed to write to socket");
239 }
240 handle.await.unwrap();
241 });
242 ylong_runtime::block_on(handle).expect("block_on failed");
243 }
244
245 #[cfg(feature = "multi_instance_runtime")]
246 #[test]
sdv_tcp_multi_runtime()247 fn sdv_tcp_multi_runtime() {
248 use ylong_runtime::builder::RuntimeBuilder;
249 let runtime = RuntimeBuilder::new_multi_thread().build().unwrap();
250
251 runtime.block_on(async {
252 let tcp = TcpListener::bind(ADDR).await.unwrap();
253 let addr = tcp.local_addr().unwrap();
254
255 let client = runtime.spawn(async move {
256 let mut tcp = TcpStream::connect(addr).await;
257 while tcp.is_err() {
258 tcp = TcpStream::connect(addr).await;
259 }
260 let mut tcp = tcp.unwrap();
261 let buf = [3; 100];
262 tcp.write_all(&buf).await.unwrap();
263
264 let mut buf = [0; 100];
265 tcp.read_exact(&mut buf).await.unwrap();
266 assert_eq!(buf, [2; 100]);
267 });
268
269 let (mut stream, _) = tcp.accept().await.unwrap();
270 let mut buf = [0; 100];
271 stream.read_exact(&mut buf).await.unwrap();
272 assert_eq!(buf, [3; 100]);
273
274 let buf = [2; 100];
275 stream.write_all(&buf).await.unwrap();
276
277 client.await.unwrap();
278 });
279 }
280
281 /// SDV test cases for `TcpStream` of split().
282 ///
283 /// # Brief
284 /// 1. Bind `TcpListener` and wait for `accept()`.
285 /// 2. `TcpStream` connect to listener.
286 /// 3. Split TcpStream into read half and write half with borrowed.
287 /// 4. Write with write half and read with read half.
288 /// 5. Check result is correct.
289 #[test]
sdv_tcp_split_borrow_half()290 fn sdv_tcp_split_borrow_half() {
291 let handle = ylong_runtime::spawn(async {
292 let listener = TcpListener::bind(ADDR).await.unwrap();
293 let addr = listener.local_addr().unwrap();
294
295 let handle = ylong_runtime::spawn(async move {
296 let mut stream = TcpStream::connect(addr).await;
297 while stream.is_err() {
298 stream = TcpStream::connect(addr).await;
299 }
300 let mut stream = stream.unwrap();
301
302 let (mut read_half, mut write_half) = stream.split();
303 write_half.write(b"I am write half.").await.unwrap();
304 write_half.flush().await.unwrap();
305 write_half.shutdown().await.unwrap();
306
307 let mut buf = [0; 6];
308 let n = read_half.read(&mut buf).await.expect("server read err");
309 assert_eq!(n, 6);
310 assert_eq!(buf, [1, 2, 3, 4, 5, 6]);
311 });
312
313 let (mut stream, _) = listener.accept().await.unwrap();
314 let (mut read_half, mut write_half) = stream.split();
315 let mut buf = [0; 16];
316 let n = read_half.read(&mut buf).await.expect("server read err");
317 assert_eq!(n, 16);
318 assert_eq!(
319 String::from_utf8(Vec::from(buf)).unwrap().as_str(),
320 "I am write half."
321 );
322
323 let data1 = [1, 2, 3];
324 let data2 = [4, 5, 6];
325 let slice1 = std::io::IoSlice::new(&data1);
326 let slice2 = std::io::IoSlice::new(&data2);
327 write_half.write_vectored(&[slice1, slice2]).await.unwrap();
328
329 handle.await.unwrap();
330 });
331 ylong_runtime::block_on(handle).unwrap();
332 }
333
334 /// SDV case for binding on the same port twice
335 ///
336 /// # Breif
337 /// 1. Create a new TcpListener
338 /// 2. Create another TcpListener that binds to the same port
339 /// 3. Check if the return is an error
340 #[test]
sdv_tcp_address_in_use()341 fn sdv_tcp_address_in_use() {
342 ylong_runtime::block_on(async move {
343 let tcp = TcpListener::bind(ADDR).await.unwrap();
344 let addr = tcp.local_addr().unwrap();
345
346 let tcp2 = TcpListener::bind(addr).await;
347 assert!(tcp2.is_err());
348 });
349 }
350
351 /// SDV test cases for `TcpStream` of into_split().
352 ///
353 /// # Brief
354 /// 1. Bind `TcpListener` and wait for `accept()`.
355 /// 2. `TcpStream` connect to listener.
356 /// 3. Split TcpStream into read half and write half with owned.
357 /// 4. Write with write half and read with read half.
358 /// 5. Check result is correct.
359 #[test]
sdv_tcp_split_owned_half()360 fn sdv_tcp_split_owned_half() {
361 let handle = ylong_runtime::spawn(async move {
362 let listener = TcpListener::bind(ADDR).await.unwrap();
363 let addr = listener.local_addr().unwrap();
364
365 let handle = ylong_runtime::spawn(async move {
366 let mut stream = TcpStream::connect(addr).await;
367 while stream.is_err() {
368 stream = TcpStream::connect(addr).await;
369 }
370 let stream = stream.unwrap();
371 let (mut read_half, mut write_half) = stream.into_split();
372
373 write_half.write(b"I am write half.").await.unwrap();
374 write_half.flush().await.unwrap();
375 write_half.shutdown().await.unwrap();
376
377 let mut buf = [0; 6];
378 let n = read_half.read(&mut buf).await.expect("server read err");
379 assert_eq!(n, 6);
380 assert_eq!(buf, [1, 2, 3, 4, 5, 6]);
381 });
382
383 let (mut stream, _) = listener.accept().await.unwrap();
384 let (mut read_half, mut write_half) = stream.split();
385
386 let mut buf = [0; 16];
387 let n = read_half.read(&mut buf).await.expect("server read err");
388 assert_eq!(n, 16);
389 assert_eq!(
390 String::from_utf8(Vec::from(buf)).unwrap().as_str(),
391 "I am write half."
392 );
393 let data1 = [1, 2, 3];
394 let data2 = [4, 5, 6];
395 let slice1 = std::io::IoSlice::new(&data1);
396 let slice2 = std::io::IoSlice::new(&data2);
397 write_half.write_vectored(&[slice1, slice2]).await.unwrap();
398
399 handle.await.unwrap();
400 });
401 ylong_runtime::block_on(handle).unwrap();
402 }
403
404 /// SDV case for dropping TcpStream outside of worker context
405 ///
406 /// # Breif
407 /// 1. Starts 2 tasks via `spawn` that simulates a connection between client and
408 /// server
409 /// 2. Returns the streams to the main thread which is outside of the worker
410 /// context
411 /// 3. Drops the streams and it should not cause Panic
412 #[test]
413 #[cfg(all(not(feature = "ffrt"), feature = "sync"))]
sdv_tcp_drop_out_context()414 fn sdv_tcp_drop_out_context() {
415 let (tx, rx) = ylong_runtime::sync::oneshot::channel();
416 let handle = ylong_runtime::spawn(async move {
417 let tcp = TcpListener::bind(ADDR).await.unwrap();
418 let addr = tcp.local_addr().unwrap();
419 tx.send(addr).unwrap();
420 let (mut stream, _) = tcp.accept().await.unwrap();
421 let mut buf = [0; 10];
422 stream.read_exact(&mut buf).await.unwrap();
423 assert_eq!(buf, [3; 10]);
424
425 let buf = [2; 10];
426 stream.write_all(&buf).await.unwrap();
427 stream
428 });
429
430 let client = ylong_runtime::block_on(async move {
431 let addr = rx.await.unwrap();
432 let mut tcp = TcpStream::connect(addr).await;
433 while tcp.is_err() {
434 tcp = TcpStream::connect(addr).await;
435 }
436 let mut tcp = tcp.unwrap();
437 let buf = [3; 10];
438 tcp.write_all(&buf).await.unwrap();
439
440 let mut buf = [0; 10];
441 tcp.read_exact(&mut buf).await.unwrap();
442 assert_eq!(buf, [2; 10]);
443 tcp
444 });
445
446 let server = ylong_runtime::block_on(handle).unwrap();
447
448 drop(server);
449 drop(client);
450 }
451
452 /// SDV case for canceling TcpStream and then reconnecting on the same port
453 ///
454 /// # Breif
455 /// 1. Starts a TCP connection using port 8201
456 /// 2. Cancels the TCP connection before its finished
457 /// 3. Starts another TCP connection using the same port 8201
458 /// 4. checks if the connection is successful.
459 #[cfg(feature = "time")]
460 #[test]
sdv_tcp_cancel()461 fn sdv_tcp_cancel() {
462 use std::time::Duration;
463
464 use ylong_runtime::time::sleep;
465
466 let (tx, rx) = ylong_runtime::sync::oneshot::channel();
467 let server = ylong_runtime::spawn(async {
468 let tcp = TcpListener::bind(ADDR).await.unwrap();
469 let addr = tcp.local_addr().unwrap();
470 let _ = tx.send(addr);
471 let (mut stream, _) = tcp.accept().await.unwrap();
472 sleep(Duration::from_secs(10000)).await;
473
474 let mut buf = [0; 100];
475 stream.read_exact(&mut buf).await.unwrap();
476 assert_eq!(buf, [3; 100]);
477
478 let buf = [2; 100];
479 stream.write_all(&buf).await.unwrap();
480 });
481
482 let client = ylong_runtime::spawn(async {
483 let addr = match rx.await {
484 Ok(addr) => addr,
485 Err(_) => {
486 sleep(Duration::from_secs(100000)).await;
487 return;
488 }
489 };
490 let mut tcp = TcpStream::connect(addr).await;
491 while tcp.is_err() {
492 tcp = TcpStream::connect(addr).await;
493 }
494 sleep(Duration::from_secs(10000)).await;
495 let mut tcp = tcp.unwrap();
496 let buf = [3; 100];
497 tcp.write_all(&buf).await.unwrap();
498
499 let mut buf = [0; 100];
500 tcp.read_exact(&mut buf).await.unwrap();
501 assert_eq!(buf, [2; 100]);
502 });
503
504 client.cancel();
505 server.cancel();
506 let ret = ylong_runtime::block_on(server);
507 assert!(ret.is_err());
508 let ret = ylong_runtime::block_on(client);
509 assert!(ret.is_err());
510
511 let (tx, rx) = ylong_runtime::sync::oneshot::channel();
512 let server = ylong_runtime::spawn(async move {
513 let tcp = TcpListener::bind(ADDR).await.unwrap();
514 let addr = tcp.local_addr().unwrap();
515 tx.send(addr).unwrap();
516 let (mut stream, _) = tcp.accept().await.unwrap();
517
518 let mut buf = [0; 100];
519 stream.read_exact(&mut buf).await.unwrap();
520 assert_eq!(buf, [3; 100]);
521
522 let buf = [2; 100];
523 stream.write_all(&buf).await.unwrap();
524 });
525
526 let client = ylong_runtime::spawn(async move {
527 let addr = rx.await.unwrap();
528 let mut tcp = TcpStream::connect(addr).await;
529 while tcp.is_err() {
530 tcp = TcpStream::connect(addr).await;
531 }
532 let mut tcp = tcp.unwrap();
533 let buf = [3; 100];
534 tcp.write_all(&buf).await.unwrap();
535
536 let mut buf = [0; 100];
537 tcp.read_exact(&mut buf).await.unwrap();
538 assert_eq!(buf, [2; 100]);
539 });
540
541 ylong_runtime::block_on(server).unwrap();
542 ylong_runtime::block_on(client).unwrap();
543 }
544
545 /// SDV case for binding on the same port twice
546 ///
547 /// # Breif
548 /// 1. Create a Tcp connection
549 /// 2. Close the client side before any data transmission
550 /// 3. Check if the server side gets an UnexpectedEof error
551 #[test]
552 #[cfg(feature = "time")]
sdv_tcp_unexpected_eof()553 fn sdv_tcp_unexpected_eof() {
554 use std::sync::Arc;
555
556 let val = Arc::new(std::sync::Mutex::new(0));
557 let val2 = val.clone();
558
559 let (tx, rx) = ylong_runtime::sync::oneshot::channel();
560 let handle = ylong_runtime::spawn(async move {
561 let tcp = TcpListener::bind(ADDR).await.unwrap();
562 let addr = tcp.local_addr().unwrap();
563 tx.send(addr).unwrap();
564 let (mut stream, _) = tcp.accept().await.unwrap();
565 let mut buf = [0; 10];
566 let ret = stream.read_exact(&mut buf).await.unwrap_err();
567 assert_eq!(ret.kind(), std::io::ErrorKind::UnexpectedEof);
568 });
569
570 let client = ylong_runtime::spawn(async move {
571 let addr = rx.await.unwrap();
572 let mut tcp = TcpStream::connect(addr).await;
573 while tcp.is_err() {
574 tcp = TcpStream::connect(addr).await;
575 }
576 let mut tcp = tcp.unwrap();
577 {
578 let mut guard = val2.lock().unwrap();
579 *guard = 1;
580 }
581 ylong_runtime::time::sleep(std::time::Duration::from_secs(10)).await;
582 let buf = [3; 10];
583 tcp.write_all(&buf).await.unwrap();
584 });
585
586 loop {
587 let guard = val.lock().unwrap();
588 if *guard != 0 {
589 break;
590 }
591 drop(guard);
592 std::thread::sleep(std::time::Duration::from_millis(10));
593 }
594 client.cancel();
595 ylong_runtime::block_on(handle).unwrap();
596 }
597