1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 use std::io::Cursor;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 use std::vec::IntoIter;
20 use tokio::io::{AsyncRead, ReadBuf};
21 use tokio_util::io::ReaderStream;
22
23 /// A structure that helps you build a `multipart/form-data` message.
24 ///
25 /// # Examples
26 ///
27 /// ```
28 /// # use ylong_http_client::async_impl::{MultiPart, Part};
29 ///
30 /// let multipart = MultiPart::new()
31 /// .part(Part::new().name("name").body("xiaoming"))
32 /// .part(Part::new().name("password").body("123456789"));
33 /// ```
34 pub struct MultiPart {
35 parts: Vec<Part>,
36 boundary: String,
37 status: ReadStatus,
38 }
39
40 impl MultiPart {
41 /// Creates an empty `Multipart` with boundary created automatically.
42 ///
43 /// # Examples
44 ///
45 /// ```
46 /// # use ylong_http_client::async_impl::MultiPart;
47 ///
48 /// let multipart = MultiPart::new();
49 /// ```
new() -> Self50 pub fn new() -> Self {
51 Self {
52 parts: Vec::new(),
53 boundary: gen_boundary(),
54 status: ReadStatus::Never,
55 }
56 }
57
58 /// Sets a part to the `Multipart`.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// # use ylong_http_client::async_impl::{MultiPart, Part};
64 ///
65 /// let multipart = MultiPart::new()
66 /// .part(Part::new().name("name").body("xiaoming"));
67 /// ```
part(mut self, part: Part) -> Self68 pub fn part(mut self, part: Part) -> Self {
69 self.parts.push(part);
70 self
71 }
72
73 /// Gets the boundary of this `Multipart`.
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// # use ylong_http_client::async_impl::MultiPart;
79 ///
80 /// let multipart = MultiPart::new();
81 /// let boundary = multipart.boundary();
82 /// ```
boundary(&self) -> &str83 pub fn boundary(&self) -> &str {
84 self.boundary.as_str()
85 }
86
87 /// Get the total bytes of the `multpart/form-data` message, including
88 /// length of every parts, such as boundaries, headers, bodies, etc.
89 ///
90 /// # Examples
91 ///
92 /// ```
93 /// # use ylong_http_client::async_impl::{MultiPart, Part};
94 ///
95 /// let multipart = MultiPart::new()
96 /// .part(Part::new().name("name").body("xiaoming"));
97 ///
98 /// let bytes = multipart.total_bytes();
99 /// ```
total_bytes(&self) -> Option<u64>100 pub fn total_bytes(&self) -> Option<u64> {
101 let mut size = 0u64;
102 for part in self.parts.iter() {
103 size += part.length?;
104
105 // start boundary + \r\n
106 size += 2 + self.boundary.len() as u64 + 2;
107
108 // Content-Disposition: form-data
109 size += 30;
110
111 // ; name="xxx"
112 if let Some(name) = part.name.as_ref() {
113 size += 9 + name.len() as u64;
114 }
115
116 // ; filename="xxx"
117 if let Some(name) = part.file_name.as_ref() {
118 size += 13 + name.len() as u64;
119 }
120
121 // \r\n
122 size += 2;
123
124 // Content-Type: xxx
125 if let Some(mime) = part.mime.as_ref() {
126 size += 16 + mime.len() as u64;
127 }
128
129 // \r\n
130 size += 2 + 2;
131 }
132 // last boundary
133 size += 2 + self.boundary.len() as u64 + 4;
134 Some(size)
135 }
136
build_status(&mut self)137 pub(crate) fn build_status(&mut self) {
138 let mut states = Vec::new();
139 for part in self.parts.iter_mut() {
140 states.push(MultiPartState::bytes(
141 format!("--{}\r\n", self.boundary).into_bytes(),
142 ));
143 states.push(MultiPartState::bytes(
144 b"Content-Disposition: form-data".to_vec(),
145 ));
146
147 if let Some(ref name) = part.name {
148 states.push(MultiPartState::bytes(
149 format!("; name=\"{}\"", name).into_bytes(),
150 ));
151 }
152
153 if let Some(ref file_name) = part.file_name {
154 states.push(MultiPartState::bytes(
155 format!("; filename=\"{}\"", file_name).into_bytes(),
156 ));
157 }
158
159 states.push(MultiPartState::bytes(b"\r\n".to_vec()));
160
161 if let Some(ref mime) = part.mime {
162 states.push(MultiPartState::bytes(
163 format!("Content-Type: {}\r\n", mime).into_bytes(),
164 ));
165 }
166
167 states.push(MultiPartState::bytes(b"\r\n".to_vec()));
168
169 if let Some(body) = part.body.take() {
170 states.push(body);
171 }
172
173 states.push(MultiPartState::bytes(b"\r\n".to_vec()));
174 }
175 states.push(MultiPartState::bytes(
176 format!("--{}--\r\n", self.boundary).into_bytes(),
177 ));
178 self.status = ReadStatus::Reading(MultiPartStates {
179 states: states.into_iter(),
180 curr: None,
181 })
182 }
183 }
184
185 impl Default for MultiPart {
default() -> Self186 fn default() -> Self {
187 Self::new()
188 }
189 }
190
191 impl AsyncRead for MultiPart {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>192 fn poll_read(
193 mut self: Pin<&mut Self>,
194 cx: &mut Context<'_>,
195 buf: &mut ReadBuf<'_>,
196 ) -> Poll<std::io::Result<()>> {
197 match self.status {
198 ReadStatus::Never => self.build_status(),
199 ReadStatus::Reading(_) => {}
200 ReadStatus::Finish => return Poll::Ready(Ok(())),
201 }
202
203 if let ReadStatus::Reading(ref mut status) = self.status {
204 if buf.initialize_unfilled().is_empty() {
205 return Poll::Ready(Ok(()));
206 }
207 let filled = buf.filled().len();
208 return match Pin::new(status).poll_read(cx, buf) {
209 Poll::Ready(Ok(())) => {
210 let new_filled = buf.filled().len();
211 if filled == new_filled {
212 self.status = ReadStatus::Finish;
213 }
214 Poll::Ready(Ok(()))
215 }
216 Poll::Pending => {
217 let new_filled = buf.filled().len();
218 return if new_filled != filled {
219 Poll::Ready(Ok(()))
220 } else {
221 Poll::Pending
222 };
223 }
224 x => x,
225 };
226 }
227 Poll::Ready(Ok(()))
228 }
229 }
230
231 impl From<MultiPart> for reqwest::Body {
from(value: MultiPart) -> Self232 fn from(value: MultiPart) -> Self {
233 reqwest::Body::wrap_stream(ReaderStream::new(value))
234 }
235 }
236
237 /// A structure that represents a part of `multipart/form-data` message.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// # use ylong_http_client::async_impl::Part;
243 ///
244 /// let part = Part::new().name("name").body("xiaoming");
245 /// ```
246 pub struct Part {
247 name: Option<String>,
248 file_name: Option<String>,
249 mime: Option<String>,
250 length: Option<u64>,
251 body: Option<MultiPartState>,
252 }
253
254 impl Part {
255 /// Creates an empty `Part`.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// use ylong_http_client::async_impl::Part;
261 ///
262 /// let part = Part::new();
263 /// ```
new() -> Self264 pub fn new() -> Self {
265 Self {
266 name: None,
267 file_name: None,
268 mime: None,
269 length: None,
270 body: None,
271 }
272 }
273
274 /// Sets the name of this `Part`.
275 ///
276 /// The name message will be set to `Content-Disposition` header.
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// use ylong_http_client::async_impl::Part;
282 ///
283 /// let part = Part::new().name("name");
284 /// ```
name(mut self, name: &str) -> Self285 pub fn name(mut self, name: &str) -> Self {
286 self.name = Some(String::from(name));
287 self
288 }
289
290 /// Sets the file name of this `Part`.
291 ///
292 /// The file name message will be set to `Content-Disposition` header.
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// use ylong_http_client::async_impl::Part;
298 ///
299 /// let part = Part::new().file_name("example.txt");
300 /// ```
file_name(mut self, file_name: &str) -> Self301 pub fn file_name(mut self, file_name: &str) -> Self {
302 self.file_name = Some(String::from(file_name));
303 self
304 }
305
306 /// Sets the mime type of this `Part`.
307 ///
308 /// The mime type message will be set to `Content-Type` header.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use ylong_http_client::async_impl::Part;
314 ///
315 /// let part = Part::new().mime("application/octet-stream");
316 /// ```
mime(mut self, mime: &str) -> Self317 pub fn mime(mut self, mime: &str) -> Self {
318 self.mime = Some(String::from(mime));
319 self
320 }
321
322 /// Sets the length of body of this `Part`.
323 ///
324 /// The length message will be set to `Content-Length` header.
325 ///
326 /// # Examples
327 ///
328 /// ```
329 /// use ylong_http_client::async_impl::Part;
330 ///
331 /// let part = Part::new().length(Some(8)).body("xiaoming");
332 /// ```
length(mut self, length: Option<u64>) -> Self333 pub fn length(mut self, length: Option<u64>) -> Self {
334 self.length = length;
335 self
336 }
337
338 /// Sets a slice body of this `Part`.
339 ///
340 /// The body message will be set to the body part.
341 ///
342 /// # Examples
343 ///
344 /// ```
345 /// use ylong_http_client::async_impl::Part;
346 ///
347 /// let part = Part::new().mime("application/octet-stream");
348 /// ```
body<T: AsRef<[u8]>>(mut self, body: T) -> Self349 pub fn body<T: AsRef<[u8]>>(mut self, body: T) -> Self {
350 let body = body.as_ref().to_vec();
351 self.length = Some(body.len() as u64);
352 self.body = Some(MultiPartState::bytes(body));
353 self
354 }
355
356 /// Sets a stream body of this `Part`.
357 ///
358 /// The body message will be set to the body part.
359 ///
360 /// # Examples
361 ///
362 /// ```
363 /// # use tokio::io::AsyncRead;
364 /// # use ylong_http_client::async_impl::Part;
365 ///
366 /// # fn set_stream_body<R: AsyncRead + Send + Sync + 'static>(stream: R) {
367 /// let part = Part::new().stream(stream);
368 /// # }
369 /// ```
stream<T: AsyncRead + Send + Sync + 'static>(mut self, body: T) -> Self370 pub fn stream<T: AsyncRead + Send + Sync + 'static>(mut self, body: T) -> Self {
371 self.body = Some(MultiPartState::stream(Box::pin(body)));
372 self
373 }
374 }
375
376 impl Default for Part {
default() -> Self377 fn default() -> Self {
378 Self::new()
379 }
380 }
381
382 impl AsRef<MultiPart> for MultiPart {
as_ref(&self) -> &MultiPart383 fn as_ref(&self) -> &MultiPart {
384 self
385 }
386 }
387
388 enum ReadStatus {
389 Never,
390 Reading(MultiPartStates),
391 Finish,
392 }
393
394 struct MultiPartStates {
395 states: IntoIter<MultiPartState>,
396 curr: Option<MultiPartState>,
397 }
398
399 impl MultiPartStates {
poll_read_curr( &mut self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>400 fn poll_read_curr(
401 &mut self,
402 cx: &mut Context<'_>,
403 buf: &mut ReadBuf<'_>,
404 ) -> Poll<std::io::Result<()>> {
405 if let Some(mut state) = self.curr.take() {
406 return match state {
407 MultiPartState::Bytes(ref mut bytes) => {
408 let filled_len = buf.filled().len();
409 let unfilled = buf.initialize_unfilled();
410 let unfilled_len = unfilled.len();
411 let new = std::io::Read::read(bytes, unfilled).unwrap();
412 buf.set_filled(filled_len + new);
413
414 if new >= unfilled_len {
415 self.curr = Some(state);
416 }
417 Poll::Ready(Ok(()))
418 }
419 MultiPartState::Stream(ref mut stream) => {
420 let old_len = buf.filled().len();
421 match stream.as_mut().poll_read(cx, buf) {
422 Poll::Ready(Ok(())) => {
423 if old_len != buf.filled().len() {
424 self.curr = Some(state);
425 }
426 Poll::Ready(Ok(()))
427 }
428 Poll::Pending => {
429 self.curr = Some(state);
430 Poll::Pending
431 }
432 x => x,
433 }
434 }
435 };
436 }
437 Poll::Ready(Ok(()))
438 }
439 }
440
441 enum MultiPartState {
442 Bytes(Cursor<Vec<u8>>),
443 Stream(Pin<Box<dyn AsyncRead + Send + Sync>>),
444 }
445
446 impl MultiPartState {
bytes(bytes: Vec<u8>) -> Self447 fn bytes(bytes: Vec<u8>) -> Self {
448 Self::Bytes(Cursor::new(bytes))
449 }
450
stream(reader: Pin<Box<dyn AsyncRead + Send + Sync>>) -> Self451 fn stream(reader: Pin<Box<dyn AsyncRead + Send + Sync>>) -> Self {
452 Self::Stream(reader)
453 }
454 }
455
456 impl AsyncRead for MultiPartStates {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>457 fn poll_read(
458 self: Pin<&mut Self>,
459 cx: &mut Context<'_>,
460 buf: &mut ReadBuf<'_>,
461 ) -> Poll<std::io::Result<()>> {
462 let this = self.get_mut();
463 while !buf.initialize_unfilled().is_empty() {
464 if this.curr.is_none() {
465 this.curr = match this.states.next() {
466 None => break,
467 x => x,
468 }
469 }
470
471 match this.poll_read_curr(cx, buf) {
472 Poll::Ready(Ok(())) => {}
473 x => return x,
474 }
475 }
476 Poll::Ready(Ok(()))
477 }
478 }
479
gen_boundary() -> String480 fn gen_boundary() -> String {
481 use crate::reqwest_impl::util::xor_shift as rand;
482
483 format!(
484 "{:016x}-{:016x}-{:016x}-{:016x}",
485 rand(),
486 rand(),
487 rand(),
488 rand()
489 )
490 }
491