Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rustls compile time implementation #56

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ dhat-heap.json
.vscode
.idea
.cover
bleeper.user.toml
bleeper.user.toml
pingora-proxy/tests/keys/server.crt
pingora-proxy/tests/utils/conf/keys/server.crt
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"pingora-lru",
"pingora-openssl",
"pingora-boringssl",
"pingora-rustls",
"pingora-runtime",
"pingora-ketama",
"pingora-load-balancing",
Expand Down
1 change: 1 addition & 0 deletions pingora-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,4 @@ harness = false
default = ["openssl"]
openssl = ["pingora-core/openssl"]
boringssl = ["pingora-core/boringssl"]
rustls = ["pingora-core/rustls"]
7 changes: 5 additions & 2 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ path = "src/lib.rs"

[dependencies]
pingora-runtime = { version = "0.3.0", path = "../pingora-runtime" }
pingora-openssl = { version = "0.3.0", path = "../pingora-openssl", optional = true }
pingora-boringssl = { version = "0.3.0", path = "../pingora-boringssl", optional = true }
pingora-openssl = { version = "0.3.0", path = "../pingora-openssl", optional = true }
pingora-rustls = { version = "0.3.0", path = "../pingora-rustls", optional = true }
pingora-pool = { version = "0.3.0", path = "../pingora-pool" }
pingora-error = { version = "0.3.0", path = "../pingora-error" }
pingora-timeout = { version = "0.3.0", path = "../pingora-timeout" }
Expand Down Expand Up @@ -68,6 +69,7 @@ openssl-probe = "0.1"
tokio-test = "0.4"
zstd = "0"
httpdate = "1"
x509-parser = { version = "0.16.0", optional = true }

[dev-dependencies]
matches = "0.1"
Expand All @@ -81,4 +83,5 @@ jemallocator = "0.5"
default = ["openssl"]
openssl = ["pingora-openssl"]
boringssl = ["pingora-boringssl"]
patched_http1 = []
rustls = ["pingora-rustls", "dep:x509-parser"]
patched_http1 = []
104 changes: 34 additions & 70 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,29 @@

//! Connecting to servers

pub mod http;
mod l4;
mod offload;
mod tls;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

use futures::future::FutureExt;
use log::{debug, error, warn};
use parking_lot::RwLock;
use tokio::io::AsyncReadExt;
use tokio::sync::Mutex;

use offload::OffloadRuntime;
use pingora_error::{ErrorType::*, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool};

use crate::connectors::tls::{do_connect, Connector};
use crate::protocols::Stream;
use crate::server::configuration::ServerConf;
use crate::tls::ssl::SslConnector;
use crate::upstreams::peer::{Peer, ALPN};

use l4::connect as l4_connect;
use log::{debug, error, warn};
use offload::OffloadRuntime;
use parking_lot::RwLock;
use pingora_error::{Error, ErrorType::*, OrErr, Result};
use pingora_pool::{ConnectionMeta, ConnectionPool};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
pub mod http;
mod l4;
mod offload;
mod tls;

/// The options to configure a [TransportConnector]
#[derive(Clone)]
Expand Down Expand Up @@ -123,7 +126,7 @@ impl ConnectorOptions {

/// [TransportConnector] provides APIs to connect to servers via TCP or TLS with connection reuse
pub struct TransportConnector {
tls_ctx: tls::Connector,
tls_ctx: Connector,
connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
offload: Option<OffloadRuntime>,
bind_to_v4: Vec<SocketAddr>,
Expand All @@ -149,7 +152,7 @@ impl TransportConnector {
.as_ref()
.map_or_else(Vec::new, |o| o.bind_to_v6.clone());
TransportConnector {
tls_ctx: tls::Connector::new(options),
tls_ctx: Connector::new(options),
connection_pool: Arc::new(ConnectionPool::new(pool_size)),
offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
bind_to_v4,
Expand All @@ -171,11 +174,13 @@ impl TransportConnector {
let stream = if let Some(rt) = rt {
let peer = peer.clone();
let tls_ctx = self.tls_ctx.clone();
rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
.await
.or_err(InternalError, "offload runtime failure")??
rt.spawn(
async move { do_connect(&peer, bind_to, alpn_override, tls_ctx.context()).await },
)
.await
.or_err(InternalError, "offload runtime failure")??
} else {
do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
do_connect(peer, bind_to, alpn_override, self.tls_ctx.context()).await?
};

Ok(stream)
Expand Down Expand Up @@ -268,46 +273,6 @@ impl TransportConnector {
}
}

// Perform the actual L4 and tls connection steps while respecting the peer's
// connection timeout if there is one
async fn do_connect<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
// Create the future that does the connections, but don't evaluate it until
// we decide if we need a timeout or not
let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);

match peer.total_connection_timeout() {
Some(t) => match pingora_timeout::timeout(t, connect_future).await {
Ok(res) => res,
Err(_) => Error::e_explain(
ConnectTimedout,
format!("connecting to server {peer}, total-connection timeout {t:?}"),
),
},
None => connect_future.await,
}
}

// Perform the actual L4 and tls connection steps with no timeout
async fn do_connect_inner<P: Peer + Send + Sync>(
peer: &P,
bind_to: Option<SocketAddr>,
alpn_override: Option<ALPN>,
tls_ctx: &SslConnector,
) -> Result<Stream> {
let stream = l4_connect(peer, bind_to).await?;
if peer.tls() {
let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
Ok(Box::new(tls_stream))
} else {
Ok(Box::new(stream))
}
}

struct PreferredHttpVersion {
// TODO: shard to avoid the global lock
versions: RwLock<HashMap<u64, u8>>, // <hash of peer, version>
Expand Down Expand Up @@ -337,9 +302,6 @@ impl PreferredHttpVersion {
}
}

use futures::future::FutureExt;
use tokio::io::AsyncReadExt;

/// Test whether a stream is already closed or not reusable (server sent unexpected data)
fn test_reusable_stream(stream: &mut Stream) -> bool {
let mut buf = [0; 1];
Expand All @@ -365,13 +327,14 @@ fn test_reusable_stream(stream: &mut Stream) -> bool {

#[cfg(test)]
mod tests {
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;

use pingora_error::ErrorType;

use super::*;
use crate::tls::ssl::SslMethod;
use crate::upstreams::peer::BasicPeer;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;

use super::*;

// 192.0.2.1 is effectively a black hole
const BLACK_HOLE: &str = "192.0.2.1:79";
Expand Down Expand Up @@ -475,8 +438,8 @@ mod tests {
/// This assumes that the connection will fail to on the peer and returns
/// the decomposed error type and message
async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
let ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap().build();
let stream = do_connect(peer, None, None, &ssl_connector).await;
let connector = Connector::new(None);
let stream = do_connect(peer, None, None, connector.context()).await;
match stream {
Ok(_) => panic!("should throw an error"),
Err(e) => (
Expand Down Expand Up @@ -509,6 +472,7 @@ mod tests {
}

#[tokio::test]
#[ignore]
async fn test_do_connect_without_total_timeout() {
let peer = BasicPeer::new(BLACK_HOLE);
let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
Expand Down
Loading