wmproxy
wmproxy
将用Rust
实现http/https
代理, socks5
代理, 反向代理, 静态文件服务器,后续将实现websocket
代理, 内外网穿透等, 会将实现过程分享出来, 感兴趣的可以一起造个轮子法
项目地址
gite: https://gitee.com/tickbh/wmproxy
github: https://github.com/tickbh/wmproxy
为什么我们需要主动
主动可以让我们掌握好系统的稳定性,假设我们有一条连接不可达,连接超时的判定是5秒,需要检测失败3次才认定为失败,那么此时从我们开始检测,到判定失败需要耗时15秒。
如果此时我们是个高并发的系统,每秒的QPS是1000,我们有三个地址判定,那么此时我们有1/3的失败概率。那么在15秒内,我们会收到15000个请求,会造成5000个请求失败,如果是重要的数据,我们会丢失很多重要数据。
如果此时客户端拥有重试机制,那么客户端在失败的时候会发起重试,而且系统可能会反复的分配到那台不可达的系统,将会造成短时间内请求数激增,可能引发系统的雪崩。
所以此时我们主动知道目标端的系统稳定性极其重要。
网络访问示意图
以下是没有主动健康检查
如果出错的时候,一个请求的平均时长可能会达到(1.4s + 5s) / 2 = (3.2s)
,比正常访问多了(3.2 - 1.4) = 1.8s
,节点的宕机会对系统的稳定性产生较大的影响
以下是主动健康检查,它保证了访问后端服务器组均是正常的状态
服务器2
出错的时候,主动检查已经检查出服务器2
不可用,负载均衡的时候选择已经把服务器2
摘除,所以系统的平均耗时1.4s,系统依然保持稳定
健康检查的种类
在目前的系统中有以下两分类:
- HTTP 请求特定的方法及路径,判断返回是否得到预期的status或者body
- TCP 仅只能测试连通性,如果能连接表示正常,会出现能连接但无服务的情况
健康检查的准备
我们需要从配置中读出所有的需要健康检查的类型,即需要去重,把同一个指向的地址过滤掉
配置有可能被重新加载,所以我们需要预留发送配置的方式(或者后续类似nginx用新开进程的方式则不需要),此处做一个预留。
-
如何去重
像这种简单级别的去重通常用HashSet
复杂度为O(1)
或者用简单的Vec
复杂度为O(n)
,以SocketAddr
的为键值,判断是否有重复的数据。 -
如何保证不影响主线程
把健康请求的方法移到异步函数,用tokio::spawn
中处理,在健康检查的情况下保证不影响其它数据处理 -
如果同时处理多个地址的健康检查
每一次健康检查都会在一个异步函数中执行,在我们调用完请求后,我们会对当前该异步进行tokio::time::sleep
以让出当前CPU。 -
如何按指定间隔时间请求
因为每一次健康请求都是在异步函数中,我们不确认之前的异步是否完成,所以我们在每次请求前都记录last_request
,我们在请求前调用HealthCheck::check_can_request
判断当前是否可以发送请求来保证间隔时间内不多次请求造成服务器的压力。 -
超时连接判定处理
利用tokio::time::timeout
和future
做组合,等超时的时候直接按错误处理
部分实现源码
主要源码定义在check/active.rs
中,主要的定义两个类
/// 单项健康检查
#[derive(Debug, Clone)]
pub struct OneHealth {
/// 主动检查地址
pub addr: SocketAddr,
/// 主动检查方法, 有http/https/tcp等
pub method: String,
/// 每次检查间隔
pub interval: Duration,
/// 最后一次记录时间
pub last_record: Instant,
}
/// 主动式健康检查
pub struct ActiveHealth {
/// 所有的健康列表
pub healths: Vec<OneHealth>,
/// 接收健康列表,当配置变更时重新载入
pub receiver: Receiver<Vec<OneHealth>>,
}
我们在配置的时候获取所有需要主动检查的数据
/// 获取所有待健康检查的列表
pub fn get_health_check(&self) -> Vec<OneHealth> {
let mut result = vec![];
let mut already: HashSet<SocketAddr> = HashSet::new();
if let Some(proxy) = &self.proxy {
// ...
}
if let Some(http) = &self.http {
// ...
}
result
}
主要的检查源码,所有的最终信息都落在HealthCheck
中的静态变量里:
pub async fn do_check(&self) -> ProxyResult<()> {
// 防止短时间内健康检查的连接过多, 做一定的超时处理, 或者等上一条消息处理完毕
if !HealthCheck::check_can_request(&self.addr, self.interval) {
return Ok(())
}
if self.method.eq_ignore_ascii_case("http") {
match tokio::time::timeout(self.interval + Duration::from_secs(1), self.connect_http()).await {
Ok(r) => match r {
Ok(r) => {
if r.status().is_server_error() {
log::trace!("主动健康检查:HTTP:{}, 返回失败:{}", self.addr, r.status());
HealthCheck::add_fall_down(self.addr);
} else {
HealthCheck::add_rise_up(self.addr);
}
}
Err(e) => {
log::trace!("主动健康检查:HTTP:{}, 发生错误:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
},
Err(e) => {
log::trace!("主动健康检查:HTTP:{}, 发生超时:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
},
}
} else {
match tokio::time::timeout(Duration::from_secs(3), self.connect_http()).await {
Ok(r) => {
match r {
Ok(_) => {
HealthCheck::add_rise_up(self.addr);
}
Err(e) => {
log::trace!("主动健康检查:TCP:{}, 发生错误:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Err(e) => {
log::trace!("主动健康检查:TCP:{}, 发生超时:{:?}", self.addr, e);
HealthCheck::add_fall_down(self.addr);
}
}
}
Ok(())
}
结语
主动检查可以及时的更早的发现系统中不稳定的因素,是系统稳定性的基石,也可以通过更早的发现因素来通知运维介入,我们的目的是使系统更稳定,更健壮,处理延时更少。
点击 [关注],[在看],[点赞] 是对作者最大的支持