// Copyright (c) 2023 Huawei Device Co., Ltd. // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. use std::mem::take; use std::sync::{Arc, Mutex}; #[cfg(feature = "http2")] use ylong_http::request::uri::Scheme; use ylong_http::request::uri::Uri; use crate::async_impl::connector::ConnInfo; use crate::async_impl::Connector; use crate::error::HttpClientError; use crate::runtime::{AsyncRead, AsyncWrite}; #[cfg(feature = "http2")] use crate::util::config::H2Config; use crate::util::config::{HttpConfig, HttpVersion}; use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher}; use crate::util::pool::{Pool, PoolKey}; pub(crate) struct ConnPool { pool: Pool>, connector: Arc, config: HttpConfig, } impl ConnPool { pub(crate) fn new(config: HttpConfig, connector: C) -> Self { Self { pool: Pool::new(), connector: Arc::new(connector), config, } } pub(crate) async fn connect_to(&self, uri: &Uri) -> Result, HttpClientError> { let key = PoolKey::new( uri.scheme().unwrap().clone(), uri.authority().unwrap().clone(), ); self.pool .get(key, Conns::new) .conn(self.config.clone(), self.connector.clone(), uri) .await } } pub(crate) struct Conns { list: Arc>>>, #[cfg(feature = "http2")] h2_conn: Arc>>>, } impl Conns { fn new() -> Self { Self { list: Arc::new(Mutex::new(Vec::new())), #[cfg(feature = "http2")] h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), } } } impl Clone for Conns { fn clone(&self) -> Self { Self { list: self.list.clone(), #[cfg(feature = "http2")] h2_conn: self.h2_conn.clone(), } } } impl Conns { async fn conn( &mut self, config: HttpConfig, connector: Arc, url: &Uri, ) -> Result, HttpClientError> where C: Connector, { match config.version { #[cfg(feature = "http2")] HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await, #[cfg(feature = "http1_1")] HttpVersion::Http1 => self.conn_h1(connector, url).await, HttpVersion::Negotiate => { #[cfg(all(feature = "http1_1", not(feature = "http2")))] return self.conn_h1(connector, url).await; #[cfg(all(feature = "http2", feature = "http1_1"))] return self .conn_negotiate(connector, url, config.http2_config) .await; } } } async fn conn_h1(&self, connector: Arc, url: &Uri) -> Result, HttpClientError> where C: Connector, { if let Some(conn) = self.exist_h1_conn() { return Ok(conn); } let dispatcher = ConnDispatcher::http1(connector.connect(url).await?); Ok(self.dispatch_h1_conn(dispatcher)) } #[cfg(feature = "http2")] async fn conn_h2( &self, connector: Arc, url: &Uri, config: H2Config, ) -> Result, HttpClientError> where C: Connector, { // The lock `h2_occupation` is used to prevent multiple coroutines from sending // Requests at the same time under concurrent conditions, // resulting in the creation of multiple tcp connections let mut lock = self.h2_conn.lock().await; if let Some(conn) = Self::exist_h2_conn(&mut lock) { return Ok(conn); } let stream = connector.connect(url).await?; let details = stream.conn_detail(); let tls = if let Some(scheme) = url.scheme() { *scheme == Scheme::HTTPS } else { false }; match details.alpn() { None if tls => return err_from_msg!(Connect, "The peer does not support http/2."), Some(protocol) if protocol != b"h2" => { return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") } _ => {} } Ok(Self::dispatch_h2_conn(config, stream, &mut lock)) } #[cfg(all(feature = "http2", feature = "http1_1"))] async fn conn_negotiate( &self, connector: Arc, url: &Uri, config: H2Config, ) -> Result, HttpClientError> where C: Connector, { match *url.scheme().unwrap() { Scheme::HTTPS => { let mut lock = self.h2_conn.lock().await; if let Some(conn) = Self::exist_h2_conn(&mut lock) { return Ok(conn); } if let Some(conn) = self.exist_h1_conn() { return Ok(conn); } let stream = connector.connect(url).await?; let details = stream.conn_detail(); let protocol = if let Some(bytes) = details.alpn() { bytes } else { let dispatcher = ConnDispatcher::http1(stream); return Ok(self.dispatch_h1_conn(dispatcher)); }; if protocol == b"http/1.1" { let dispatcher = ConnDispatcher::http1(stream); Ok(self.dispatch_h1_conn(dispatcher)) } else if protocol == b"h2" { Ok(Self::dispatch_h2_conn(config, stream, &mut lock)) } else { err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") } } Scheme::HTTP => self.conn_h1(connector, url).await, } } fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher) -> Conn { // We must be able to get the `Conn` here. let conn = dispatcher.dispatch().unwrap(); let mut list = self.list.lock().unwrap(); list.push(dispatcher); conn } #[cfg(feature = "http2")] fn dispatch_h2_conn( config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard>>, ) -> Conn { let dispatcher = ConnDispatcher::http2(config, stream); let conn = dispatcher.dispatch().unwrap(); lock.push(dispatcher); conn } fn exist_h1_conn(&self) -> Option> { let mut list = self.list.lock().unwrap(); let mut conn = None; let curr = take(&mut *list); // TODO Distinguish between http2 connections and http1 connections. for dispatcher in curr.into_iter() { // Discard invalid dispatchers. if dispatcher.is_shutdown() { continue; } if conn.is_none() { conn = dispatcher.dispatch(); } list.push(dispatcher); } conn } #[cfg(feature = "http2")] fn exist_h2_conn( lock: &mut crate::runtime::MutexGuard>>, ) -> Option> { if let Some(dispatcher) = lock.pop() { if !dispatcher.is_shutdown() { if let Some(conn) = dispatcher.dispatch() { lock.push(dispatcher); return Some(conn); } } } None } }