1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 use std::io::Cursor;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 use std::vec::IntoIter;
20 use tokio::io::{AsyncRead, ReadBuf};
21 use tokio_util::io::ReaderStream;
22 
23 /// A structure that helps you build a `multipart/form-data` message.
24 ///
25 /// # Examples
26 ///
27 /// ```
28 /// # use ylong_http_client::async_impl::{MultiPart, Part};
29 ///
30 /// let multipart = MultiPart::new()
31 ///     .part(Part::new().name("name").body("xiaoming"))
32 ///     .part(Part::new().name("password").body("123456789"));
33 /// ```
34 pub struct MultiPart {
35     parts: Vec<Part>,
36     boundary: String,
37     status: ReadStatus,
38 }
39 
40 impl MultiPart {
41     /// Creates an empty `Multipart` with boundary created automatically.
42     ///
43     /// # Examples
44     ///
45     /// ```
46     /// # use ylong_http_client::async_impl::MultiPart;
47     ///
48     /// let multipart = MultiPart::new();
49     /// ```
new() -> Self50     pub fn new() -> Self {
51         Self {
52             parts: Vec::new(),
53             boundary: gen_boundary(),
54             status: ReadStatus::Never,
55         }
56     }
57 
58     /// Sets a part to the `Multipart`.
59     ///
60     /// # Examples
61     ///
62     /// ```
63     /// # use ylong_http_client::async_impl::{MultiPart, Part};
64     ///
65     /// let multipart = MultiPart::new()
66     ///     .part(Part::new().name("name").body("xiaoming"));
67     /// ```
part(mut self, part: Part) -> Self68     pub fn part(mut self, part: Part) -> Self {
69         self.parts.push(part);
70         self
71     }
72 
73     /// Gets the boundary of this `Multipart`.
74     ///
75     /// # Examples
76     ///
77     /// ```
78     /// # use ylong_http_client::async_impl::MultiPart;
79     ///
80     /// let multipart = MultiPart::new();
81     /// let boundary = multipart.boundary();
82     /// ```
boundary(&self) -> &str83     pub fn boundary(&self) -> &str {
84         self.boundary.as_str()
85     }
86 
87     /// Get the total bytes of the `multpart/form-data` message, including
88     /// length of every parts, such as boundaries, headers, bodies, etc.
89     ///
90     /// # Examples
91     ///
92     /// ```
93     /// # use ylong_http_client::async_impl::{MultiPart, Part};
94     ///
95     /// let multipart = MultiPart::new()
96     ///     .part(Part::new().name("name").body("xiaoming"));
97     ///
98     /// let bytes = multipart.total_bytes();
99     /// ```
total_bytes(&self) -> Option<u64>100     pub fn total_bytes(&self) -> Option<u64> {
101         let mut size = 0u64;
102         for part in self.parts.iter() {
103             size += part.length?;
104 
105             // start boundary + \r\n
106             size += 2 + self.boundary.len() as u64 + 2;
107 
108             // Content-Disposition: form-data
109             size += 30;
110 
111             // ; name="xxx"
112             if let Some(name) = part.name.as_ref() {
113                 size += 9 + name.len() as u64;
114             }
115 
116             // ; filename="xxx"
117             if let Some(name) = part.file_name.as_ref() {
118                 size += 13 + name.len() as u64;
119             }
120 
121             // \r\n
122             size += 2;
123 
124             // Content-Type: xxx
125             if let Some(mime) = part.mime.as_ref() {
126                 size += 16 + mime.len() as u64;
127             }
128 
129             // \r\n
130             size += 2 + 2;
131         }
132         // last boundary
133         size += 2 + self.boundary.len() as u64 + 4;
134         Some(size)
135     }
136 
build_status(&mut self)137     pub(crate) fn build_status(&mut self) {
138         let mut states = Vec::new();
139         for part in self.parts.iter_mut() {
140             states.push(MultiPartState::bytes(
141                 format!("--{}\r\n", self.boundary).into_bytes(),
142             ));
143             states.push(MultiPartState::bytes(
144                 b"Content-Disposition: form-data".to_vec(),
145             ));
146 
147             if let Some(ref name) = part.name {
148                 states.push(MultiPartState::bytes(
149                     format!("; name=\"{}\"", name).into_bytes(),
150                 ));
151             }
152 
153             if let Some(ref file_name) = part.file_name {
154                 states.push(MultiPartState::bytes(
155                     format!("; filename=\"{}\"", file_name).into_bytes(),
156                 ));
157             }
158 
159             states.push(MultiPartState::bytes(b"\r\n".to_vec()));
160 
161             if let Some(ref mime) = part.mime {
162                 states.push(MultiPartState::bytes(
163                     format!("Content-Type: {}\r\n", mime).into_bytes(),
164                 ));
165             }
166 
167             states.push(MultiPartState::bytes(b"\r\n".to_vec()));
168 
169             if let Some(body) = part.body.take() {
170                 states.push(body);
171             }
172 
173             states.push(MultiPartState::bytes(b"\r\n".to_vec()));
174         }
175         states.push(MultiPartState::bytes(
176             format!("--{}--\r\n", self.boundary).into_bytes(),
177         ));
178         self.status = ReadStatus::Reading(MultiPartStates {
179             states: states.into_iter(),
180             curr: None,
181         })
182     }
183 }
184 
185 impl Default for MultiPart {
default() -> Self186     fn default() -> Self {
187         Self::new()
188     }
189 }
190 
191 impl AsyncRead for MultiPart {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>192     fn poll_read(
193         mut self: Pin<&mut Self>,
194         cx: &mut Context<'_>,
195         buf: &mut ReadBuf<'_>,
196     ) -> Poll<std::io::Result<()>> {
197         match self.status {
198             ReadStatus::Never => self.build_status(),
199             ReadStatus::Reading(_) => {}
200             ReadStatus::Finish => return Poll::Ready(Ok(())),
201         }
202 
203         if let ReadStatus::Reading(ref mut status) = self.status {
204             if buf.initialize_unfilled().is_empty() {
205                 return Poll::Ready(Ok(()));
206             }
207             let filled = buf.filled().len();
208             return match Pin::new(status).poll_read(cx, buf) {
209                 Poll::Ready(Ok(())) => {
210                     let new_filled = buf.filled().len();
211                     if filled == new_filled {
212                         self.status = ReadStatus::Finish;
213                     }
214                     Poll::Ready(Ok(()))
215                 }
216                 Poll::Pending => {
217                     let new_filled = buf.filled().len();
218                     return if new_filled != filled {
219                         Poll::Ready(Ok(()))
220                     } else {
221                         Poll::Pending
222                     };
223                 }
224                 x => x,
225             };
226         }
227         Poll::Ready(Ok(()))
228     }
229 }
230 
231 impl From<MultiPart> for reqwest::Body {
from(value: MultiPart) -> Self232     fn from(value: MultiPart) -> Self {
233         reqwest::Body::wrap_stream(ReaderStream::new(value))
234     }
235 }
236 
237 /// A structure that represents a part of `multipart/form-data` message.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// # use ylong_http_client::async_impl::Part;
243 ///
244 /// let part = Part::new().name("name").body("xiaoming");
245 /// ```
246 pub struct Part {
247     name: Option<String>,
248     file_name: Option<String>,
249     mime: Option<String>,
250     length: Option<u64>,
251     body: Option<MultiPartState>,
252 }
253 
254 impl Part {
255     /// Creates an empty `Part`.
256     ///
257     /// # Examples
258     ///
259     /// ```
260     /// use ylong_http_client::async_impl::Part;
261     ///
262     /// let part = Part::new();
263     /// ```
new() -> Self264     pub fn new() -> Self {
265         Self {
266             name: None,
267             file_name: None,
268             mime: None,
269             length: None,
270             body: None,
271         }
272     }
273 
274     /// Sets the name of this `Part`.
275     ///
276     /// The name message will be set to `Content-Disposition` header.
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// use ylong_http_client::async_impl::Part;
282     ///
283     /// let part = Part::new().name("name");
284     /// ```
name(mut self, name: &str) -> Self285     pub fn name(mut self, name: &str) -> Self {
286         self.name = Some(String::from(name));
287         self
288     }
289 
290     /// Sets the file name of this `Part`.
291     ///
292     /// The file name message will be set to `Content-Disposition` header.
293     ///
294     /// # Examples
295     ///
296     /// ```
297     /// use ylong_http_client::async_impl::Part;
298     ///
299     /// let part = Part::new().file_name("example.txt");
300     /// ```
file_name(mut self, file_name: &str) -> Self301     pub fn file_name(mut self, file_name: &str) -> Self {
302         self.file_name = Some(String::from(file_name));
303         self
304     }
305 
306     /// Sets the mime type of this `Part`.
307     ///
308     /// The mime type message will be set to `Content-Type` header.
309     ///
310     /// # Examples
311     ///
312     /// ```
313     /// use ylong_http_client::async_impl::Part;
314     ///
315     /// let part = Part::new().mime("application/octet-stream");
316     /// ```
mime(mut self, mime: &str) -> Self317     pub fn mime(mut self, mime: &str) -> Self {
318         self.mime = Some(String::from(mime));
319         self
320     }
321 
322     /// Sets the length of body of this `Part`.
323     ///
324     /// The length message will be set to `Content-Length` header.
325     ///
326     /// # Examples
327     ///
328     /// ```
329     /// use ylong_http_client::async_impl::Part;
330     ///
331     /// let part = Part::new().length(Some(8)).body("xiaoming");
332     /// ```
length(mut self, length: Option<u64>) -> Self333     pub fn length(mut self, length: Option<u64>) -> Self {
334         self.length = length;
335         self
336     }
337 
338     /// Sets a slice body of this `Part`.
339     ///
340     /// The body message will be set to the body part.
341     ///
342     /// # Examples
343     ///
344     /// ```
345     /// use ylong_http_client::async_impl::Part;
346     ///
347     /// let part = Part::new().mime("application/octet-stream");
348     /// ```
body<T: AsRef<[u8]>>(mut self, body: T) -> Self349     pub fn body<T: AsRef<[u8]>>(mut self, body: T) -> Self {
350         let body = body.as_ref().to_vec();
351         self.length = Some(body.len() as u64);
352         self.body = Some(MultiPartState::bytes(body));
353         self
354     }
355 
356     /// Sets a stream body of this `Part`.
357     ///
358     /// The body message will be set to the body part.
359     ///
360     /// # Examples
361     ///
362     /// ```
363     /// # use tokio::io::AsyncRead;
364     /// # use ylong_http_client::async_impl::Part;
365     ///
366     /// # fn set_stream_body<R: AsyncRead + Send + Sync + 'static>(stream: R) {
367     /// let part = Part::new().stream(stream);
368     /// # }
369     /// ```
stream<T: AsyncRead + Send + Sync + 'static>(mut self, body: T) -> Self370     pub fn stream<T: AsyncRead + Send + Sync + 'static>(mut self, body: T) -> Self {
371         self.body = Some(MultiPartState::stream(Box::pin(body)));
372         self
373     }
374 }
375 
376 impl Default for Part {
default() -> Self377     fn default() -> Self {
378         Self::new()
379     }
380 }
381 
382 impl AsRef<MultiPart> for MultiPart {
as_ref(&self) -> &MultiPart383     fn as_ref(&self) -> &MultiPart {
384         self
385     }
386 }
387 
388 enum ReadStatus {
389     Never,
390     Reading(MultiPartStates),
391     Finish,
392 }
393 
394 struct MultiPartStates {
395     states: IntoIter<MultiPartState>,
396     curr: Option<MultiPartState>,
397 }
398 
399 impl MultiPartStates {
poll_read_curr( &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>400     fn poll_read_curr(
401         &mut self,
402         cx: &mut Context<'_>,
403         buf: &mut ReadBuf<'_>,
404     ) -> Poll<std::io::Result<()>> {
405         if let Some(mut state) = self.curr.take() {
406             return match state {
407                 MultiPartState::Bytes(ref mut bytes) => {
408                     let filled_len = buf.filled().len();
409                     let unfilled = buf.initialize_unfilled();
410                     let unfilled_len = unfilled.len();
411                     let new = std::io::Read::read(bytes, unfilled).unwrap();
412                     buf.set_filled(filled_len + new);
413 
414                     if new >= unfilled_len {
415                         self.curr = Some(state);
416                     }
417                     Poll::Ready(Ok(()))
418                 }
419                 MultiPartState::Stream(ref mut stream) => {
420                     let old_len = buf.filled().len();
421                     match stream.as_mut().poll_read(cx, buf) {
422                         Poll::Ready(Ok(())) => {
423                             if old_len != buf.filled().len() {
424                                 self.curr = Some(state);
425                             }
426                             Poll::Ready(Ok(()))
427                         }
428                         Poll::Pending => {
429                             self.curr = Some(state);
430                             Poll::Pending
431                         }
432                         x => x,
433                     }
434                 }
435             };
436         }
437         Poll::Ready(Ok(()))
438     }
439 }
440 
441 enum MultiPartState {
442     Bytes(Cursor<Vec<u8>>),
443     Stream(Pin<Box<dyn AsyncRead + Send + Sync>>),
444 }
445 
446 impl MultiPartState {
bytes(bytes: Vec<u8>) -> Self447     fn bytes(bytes: Vec<u8>) -> Self {
448         Self::Bytes(Cursor::new(bytes))
449     }
450 
stream(reader: Pin<Box<dyn AsyncRead + Send + Sync>>) -> Self451     fn stream(reader: Pin<Box<dyn AsyncRead + Send + Sync>>) -> Self {
452         Self::Stream(reader)
453     }
454 }
455 
456 impl AsyncRead for MultiPartStates {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>457     fn poll_read(
458         self: Pin<&mut Self>,
459         cx: &mut Context<'_>,
460         buf: &mut ReadBuf<'_>,
461     ) -> Poll<std::io::Result<()>> {
462         let this = self.get_mut();
463         while !buf.initialize_unfilled().is_empty() {
464             if this.curr.is_none() {
465                 this.curr = match this.states.next() {
466                     None => break,
467                     x => x,
468                 }
469             }
470 
471             match this.poll_read_curr(cx, buf) {
472                 Poll::Ready(Ok(())) => {}
473                 x => return x,
474             }
475         }
476         Poll::Ready(Ok(()))
477     }
478 }
479 
gen_boundary() -> String480 fn gen_boundary() -> String {
481     use crate::reqwest_impl::util::xor_shift as rand;
482 
483     format!(
484         "{:016x}-{:016x}-{:016x}-{:016x}",
485         rand(),
486         rand(),
487         rand(),
488         rand()
489     )
490 }
491