ESP32 + PulseSensor 全踩坑记录
本文内容较新
·
今天更新
最后更新: 2026年04月01日
预计阅读时间: 55.5 分钟
13000 字
7 图
250 字/分
哥们玩的就是心跳
起因是哥们有兴趣做一个项目, 其中的一个核心功能就是检测用户的生理状态, 包括心率, 并推算出 HRV 判断情绪波动
于是乎为了节约Money就先搞了一套 ESP32-S3-N16R8 和一个PulseSensor传感器(一个三线模拟脉搏传感器)
三个线还是很好接的罢
传感器 + -> ESP32 3V3
传感器 - -> ESP32 GND
传感器 S -> GPIO4

接完线以后链接ArduinoIDE, 烧录了一个简单的代码先测一下东西的好坏
int val = analogRead(4);
Serial.println(val);直接连接串口, 笔者用的是MAC, 于是直接cu连接, 把手放上去看到有信号输出
bernard@BernarddeMacBook-Pro ~ % ls /dev/cu.*
/dev/cu.Bluetooth-Incoming-Port /dev/cu.debug-console /dev/cu.usbmodem5B8F0730691
bernard@BernarddeMacBook-Pro ~ % sudo cu -l /dev/cu.usbmodem5B8F0730691 -s 115200
3400
3395
3410
3388我以为最开始就能看到那种心率的波形图, 结果很惨, 看到的全是高频输出的数值 3000 多的数据, 甚至手在不在上面也在乱动
去查一下才发现这个心率传感器输出的不是心率信息, 而是光强模拟电压, 而心率需要我们从其中的大电压上的微小变化提取出来, 这才是真正的心跳
然后我实现了一个简单的心率波形图的实现, 然后用Python做上位机用来读数据画图



我懵掉了, 乱七八糟的, 我尝试用一个这个电压减去一个固定的阈值来分离出心跳数据, 结果结果只是如上图
但问题是, 这个基础阈值是可能因为各种环境变化, 比如压力 光线等变化的
于是乎去查了一下知乎的一篇文章 玩的就是心跳 —— 使用 PulseSensor 脉搏传感器测量心率 看到他们用的是一种动态阈值和上升沿检测
之后我把它的算法核心分为了三步
- 第一步是一个滑动窗口, 采集最近 50 个采样点
- 第二步找出其中的最大值最小值取平均, 作为一个动态阈值
- 第三步检测穿过中线的瞬间, 即只在穿过中线的那一刻算一次心跳
这点确实是我没想到的, 我一直想的是, 在一段时间中的最大值, 作为一个检测心跳的事件, 结果跨国阈值才是记录心跳的效果远远好于这个
一旦能检测到了心率, 后面的事情就简单了, 根据 HRV 的常用算法补了一些关键数据, 这个demo就实现力

分享下我使用的代码:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from scipy import signal
# =========================
# 串口配置 上位机
# =========================
PORT = "/dev/cu.usbmodem5B8F0730691"
BAUD = 115200
# =========================
# 缓冲区长度
# =========================
MAX_WAVE_POINTS = 300
MAX_IBI_POINTS = 180 # 最近 180 个 IBI,够做短时 HRV
MIN_IBI_MS = 300
MAX_IBI_MS = 2000
# =========================
# 全局数据
# =========================
wave_data = deque([0] * MAX_WAVE_POINTS, maxlen=MAX_WAVE_POINTS)
ibi_history = deque(maxlen=MAX_IBI_POINTS)
current_bpm = 0
current_ibi = 0
last_serial_time = 0
lock = threading.Lock()
# =========================
# HRV 计算函数
# =========================
def compute_time_domain_metrics(ibi_list_ms):
"""
ibi_list_ms: list of IBI in milliseconds
"""
if len(ibi_list_ms) < 3:
return None
ibi = np.array(ibi_list_ms, dtype=float)
mean_ibi = np.mean(ibi)
median_ibi = np.median(ibi)
mean_hr = 60000.0 / mean_ibi if mean_ibi > 0 else 0.0
sdnn = np.std(ibi, ddof=1) if len(ibi) > 1 else 0.0
diff_ibi = np.diff(ibi)
rmssd = np.sqrt(np.mean(diff_ibi ** 2)) if len(diff_ibi) > 0 else 0.0
sdsd = np.std(diff_ibi, ddof=1) if len(diff_ibi) > 1 else 0.0
nn50 = int(np.sum(np.abs(diff_ibi) > 50.0))
pnn50 = (nn50 / len(diff_ibi) * 100.0) if len(diff_ibi) > 0 else 0.0
cvrr = sdnn / mean_ibi if mean_ibi > 0 else 0.0
mad = np.median(np.abs(ibi - median_ibi))
return {
"count": len(ibi),
"mean_ibi": mean_ibi,
"median_ibi": median_ibi,
"mean_hr": mean_hr,
"sdnn": sdnn,
"rmssd": rmssd,
"sdsd": sdsd,
"nn50": nn50,
"pnn50": pnn50,
"cvrr": cvrr,
"mad": mad,
"diff_ibi": diff_ibi,
}
def compute_poincare_metrics(ibi_list_ms):
"""
Poincaré plot 指标:
SD1 = sqrt(0.5) * std(diff)
SD2 = sqrt(2*SDNN^2 - 0.5*SDSD^2)
"""
if len(ibi_list_ms) < 3:
return None
ibi = np.array(ibi_list_ms, dtype=float)
diff_ibi = np.diff(ibi)
sdnn = np.std(ibi, ddof=1) if len(ibi) > 1 else 0.0
sdsd = np.std(diff_ibi, ddof=1) if len(diff_ibi) > 1 else 0.0
sd1 = np.sqrt(0.5) * sdsd
inner = max(0.0, 2 * (sdnn ** 2) - 0.5 * (sdsd ** 2))
sd2 = np.sqrt(inner)
ratio = sd1 / sd2 if sd2 > 1e-9 else 0.0
x = ibi[:-1]
y = ibi[1:]
return {
"sd1": sd1,
"sd2": sd2,
"sd1_sd2": ratio,
"x": x,
"y": y,
}
def compute_frequency_domain_metrics(ibi_list_ms):
"""
短时频域 HRV(简化版)
方法:
1. 将 IBI 转换成累计时间轴
2. 插值到均匀采样(4 Hz)
3. Welch PSD
频段:
VLF: 0.0033–0.04 Hz
LF : 0.04–0.15 Hz
HF : 0.15–0.40 Hz
"""
if len(ibi_list_ms) < 20:
return None
ibi = np.array(ibi_list_ms, dtype=float)
t = np.cumsum(ibi) / 1000.0 # seconds
t = t - t[0]
if t[-1] < 20:
return None
# 用每个间期对应的值做插值
fs_interp = 4.0
t_uniform = np.arange(0, t[-1], 1.0 / fs_interp)
if len(t_uniform) < 16:
return None
# 插值信号:IBI 序列本身
ibi_uniform = np.interp(t_uniform, t, ibi)
# 去趋势
ibi_uniform = signal.detrend(ibi_uniform)
# Welch 功率谱
nperseg = min(256, len(ibi_uniform))
if nperseg < 16:
return None
freqs, psd = signal.welch(
ibi_uniform,
fs=fs_interp,
nperseg=nperseg,
scaling="density"
)
def band_power(f, p, low, high):
mask = (f >= low) & (f < high)
if not np.any(mask):
return 0.0
return np.trapz(p[mask], f[mask])
vlf = band_power(freqs, psd, 0.0033, 0.04)
lf = band_power(freqs, psd, 0.04, 0.15)
hf = band_power(freqs, psd, 0.15, 0.40)
total = vlf + lf + hf
lf_hf = lf / hf if hf > 1e-9 else 0.0
return {
"freqs": freqs,
"psd": psd,
"vlf": vlf,
"lf": lf,
"hf": hf,
"lf_hf": lf_hf,
"total": total,
}
# =========================
# 串口读取线程
# =========================
def serial_reader():
global current_bpm, current_ibi, last_serial_time
while True:
try:
ser = serial.Serial(PORT, BAUD, timeout=1)
time.sleep(2)
while True:
line = ser.readline().decode(errors="ignore").strip()
if not line:
continue
with lock:
last_serial_time = time.time()
if line.startswith("S"):
try:
val = int(line[1:])
wave_data.append(val)
except ValueError:
pass
elif line.startswith("B"):
try:
current_bpm = int(line[1:])
except ValueError:
pass
elif line.startswith("Q"):
try:
ibi = int(line[1:])
if MIN_IBI_MS <= ibi <= MAX_IBI_MS:
current_ibi = ibi
ibi_history.append(ibi)
except ValueError:
pass
except Exception as e:
print("Serial connection/read error:", e)
time.sleep(1)
# =========================
# 主界面
# =========================
def main():
thread = threading.Thread(target=serial_reader, daemon=True)
thread.start()
fig = plt.figure(figsize=(16, 10))
gs = fig.add_gridspec(3, 3)
ax_wave = fig.add_subplot(gs[0, :2])
ax_ibi = fig.add_subplot(gs[1, :2])
ax_hist = fig.add_subplot(gs[2, 0])
ax_poincare = fig.add_subplot(gs[2, 1])
ax_psd = fig.add_subplot(gs[1:, 2])
# 波形图
line_wave, = ax_wave.plot(list(wave_data), linewidth=1.8)
ax_wave.set_title("Pulse Wave")
ax_wave.set_xlabel("Sample")
ax_wave.set_ylabel("Amplitude")
ax_wave.grid(True)
# IBI 序列图
line_ibi, = ax_ibi.plot([], [], marker='o', linewidth=1.2, markersize=3)
ax_ibi.set_title("IBI Series")
ax_ibi.set_xlabel("Beat Index")
ax_ibi.set_ylabel("IBI (ms)")
ax_ibi.grid(True)
# 右上角文本区域,挂在波形图里
bpm_text = ax_wave.text(1.02, 0.98, "BPM: --", transform=ax_wave.transAxes,
fontsize=14, va='top')
ibi_text = ax_wave.text(1.02, 0.93, "IBI: -- ms", transform=ax_wave.transAxes,
fontsize=12, va='top')
status_text = ax_wave.text(1.02, 0.88, "Status: waiting...", transform=ax_wave.transAxes,
fontsize=11, va='top')
metric_texts = []
y0 = 0.82
labels = [
"Count: --",
"Mean IBI: -- ms",
"Median IBI: -- ms",
"Mean HR: -- bpm",
"SDNN: -- ms",
"RMSSD: -- ms",
"SDSD: -- ms",
"NN50: --",
"pNN50: -- %",
"CVRR: --",
"MAD: -- ms",
"SD1: --",
"SD2: --",
"SD1/SD2: --",
"VLF: --",
"LF: --",
"HF: --",
"LF/HF: --",
]
for i, label in enumerate(labels):
t = ax_wave.text(1.02, y0 - i * 0.045, label,
transform=ax_wave.transAxes, fontsize=10.5, va='top')
metric_texts.append(t)
def update(frame):
with lock:
y_wave = list(wave_data)
ibis = list(ibi_history)
bpm = current_bpm
ibi = current_ibi
last_t = last_serial_time
# ---- 波形图 ----
line_wave.set_ydata(y_wave)
line_wave.set_xdata(range(len(y_wave)))
ymin = min(y_wave)
ymax = max(y_wave)
if ymin == ymax:
ymin -= 1
ymax += 1
margin = max(20, int((ymax - ymin) * 0.2))
ax_wave.set_xlim(0, MAX_WAVE_POINTS - 1)
ax_wave.set_ylim(ymin - margin, ymax + margin)
bpm_text.set_text(f"BPM: {bpm if bpm > 0 else '--'}")
ibi_text.set_text(f"IBI: {ibi if ibi > 0 else '--'} ms")
if time.time() - last_t < 2:
status_text.set_text("Status: receiving data")
else:
status_text.set_text("Status: no serial data")
# ---- IBI 时序图 ----
ax_ibi.cla()
ax_ibi.set_title("IBI Series")
ax_ibi.set_xlabel("Beat Index")
ax_ibi.set_ylabel("IBI (ms)")
ax_ibi.grid(True)
if len(ibis) > 0:
ax_ibi.plot(range(len(ibis)), ibis, marker='o', linewidth=1.2, markersize=3)
# ---- 直方图 ----
ax_hist.cla()
ax_hist.set_title("IBI Histogram")
ax_hist.set_xlabel("IBI (ms)")
ax_hist.set_ylabel("Count")
if len(ibis) > 1:
ax_hist.hist(ibis, bins=min(20, max(5, len(ibis)//3)))
# ---- Poincaré Plot ----
ax_poincare.cla()
ax_poincare.set_title("Poincaré Plot")
ax_poincare.set_xlabel("IBI(n) ms")
ax_poincare.set_ylabel("IBI(n+1) ms")
ax_poincare.grid(True)
poincare = compute_poincare_metrics(ibis)
if poincare is not None and len(poincare["x"]) > 0:
ax_poincare.scatter(poincare["x"], poincare["y"], s=18, alpha=0.7)
all_points = np.concatenate([poincare["x"], poincare["y"]])
mn, mx = np.min(all_points), np.max(all_points)
pad = max(20, (mx - mn) * 0.1 if mx > mn else 20)
ax_poincare.set_xlim(mn - pad, mx + pad)
ax_poincare.set_ylim(mn - pad, mx + pad)
ax_poincare.plot([mn - pad, mx + pad], [mn - pad, mx + pad], linestyle='--')
# ---- 频谱图 ----
ax_psd.cla()
ax_psd.set_title("HRV PSD")
ax_psd.set_xlabel("Frequency (Hz)")
ax_psd.set_ylabel("PSD")
ax_psd.grid(True)
time_metrics = compute_time_domain_metrics(ibis)
freq_metrics = compute_frequency_domain_metrics(ibis)
if freq_metrics is not None:
ax_psd.plot(freq_metrics["freqs"], freq_metrics["psd"])
ax_psd.set_xlim(0, 0.5)
# ---- 文本指标 ----
values = ["--"] * len(metric_texts)
if time_metrics is not None:
values[0] = f"Count: {time_metrics['count']}"
values[1] = f"Mean IBI: {time_metrics['mean_ibi']:.1f} ms"
values[2] = f"Median IBI: {time_metrics['median_ibi']:.1f} ms"
values[3] = f"Mean HR: {time_metrics['mean_hr']:.1f} bpm"
values[4] = f"SDNN: {time_metrics['sdnn']:.1f} ms"
values[5] = f"RMSSD: {time_metrics['rmssd']:.1f} ms"
values[6] = f"SDSD: {time_metrics['sdsd']:.1f} ms"
values[7] = f"NN50: {time_metrics['nn50']}"
values[8] = f"pNN50: {time_metrics['pnn50']:.1f} %"
values[9] = f"CVRR: {time_metrics['cvrr']:.4f}"
values[10] = f"MAD: {time_metrics['mad']:.1f} ms"
if poincare is not None:
values[11] = f"SD1: {poincare['sd1']:.1f}"
values[12] = f"SD2: {poincare['sd2']:.1f}"
values[13] = f"SD1/SD2: {poincare['sd1_sd2']:.4f}"
if freq_metrics is not None:
values[14] = f"VLF: {freq_metrics['vlf']:.2f}"
values[15] = f"LF: {freq_metrics['lf']:.2f}"
values[16] = f"HF: {freq_metrics['hf']:.2f}"
values[17] = f"LF/HF: {freq_metrics['lf_hf']:.3f}"
for txt, val in zip(metric_texts, values):
txt.set_text(val)
return [line_wave, bpm_text, ibi_text, status_text, *metric_texts]
ani = FuncAnimation(fig, update, interval=200, blit=False)
plt.tight_layout()
plt.show()
if __name__ == "__main__":
main()// ===== ESP32-S3 + PulseSensor 模拟脉搏传感器 =====
// 接线:
// + -> 3V3
// - -> GND
// S -> GPIO4
const int SENSOR_PIN = 4;
const int SAMPLE_PERIOD_MS = 20; // 20ms 一次,50Hz
const int BUFFER_SIZE = 50; // 50 * 20ms = 1秒窗口
int dataBuf[BUFFER_SIZE];
int bufIndex = 0;
int readData = 0;
int preReadData = 0;
int sigOut = 0; // 发给上位机显示的波形值
int signalMin = 4095;
int signalMax = 0;
int mid = 2048; // 动态阈值
int filterNoise = 30; // 突变滤波阈值,后面会动态调整
bool pulseState = false;
bool prePulseState = false;
unsigned long sampleCount = 0; // 采样计数
unsigned long firstBeatTime = 0;
unsigned long secondBeatTime = 0;
unsigned long lastBeatTime = 0;
int pulseCount = 0;
int IBI = 0; // ms
int BPM = 0;
void setup() {
Serial.begin(115200);
analogReadResolution(12); // 0~4095
delay(500);
int initVal = analogRead(SENSOR_PIN);
for (int i = 0; i < BUFFER_SIZE; i++) {
dataBuf[i] = initVal;
}
mid = initVal;
}
int getArrayMax(int *arr, int len) {
int m = arr[0];
for (int i = 1; i < len; i++) {
if (arr[i] > m) m = arr[i];
}
return m;
}
int getArrayMin(int *arr, int len) {
int m = arr[0];
for (int i = 1; i < len; i++) {
if (arr[i] < m) m = arr[i];
}
return m;
}
void loop() {
preReadData = readData;
readData = analogRead(SENSOR_PIN);
// 过滤特别离谱的瞬时突变
if (abs(readData - preReadData) < filterNoise || bufIndex == 0) {
dataBuf[bufIndex++] = readData;
} else {
dataBuf[bufIndex++] = preReadData;
}
// 波形缓存满了就重算动态阈值
if (bufIndex >= BUFFER_SIZE) {
bufIndex = 0;
signalMax = getArrayMax(dataBuf, BUFFER_SIZE);
signalMin = getArrayMin(dataBuf, BUFFER_SIZE);
mid = (signalMax + signalMin) / 2;
// 动态噪声阈值
filterNoise = (signalMax - signalMin) / 2;
if (filterNoise < 15) filterNoise = 15;
if (filterNoise > 300) filterNoise = 300;
}
prePulseState = pulseState;
pulseState = (readData > mid);
// 输出给上位机画图的波形,减去中线后更直观
sigOut = readData - mid;
Serial.print("S");
Serial.println(sigOut);
// 检测“从低到高穿过中线”的特征点
if (!prePulseState && pulseState) {
unsigned long nowMs = sampleCount * SAMPLE_PERIOD_MS;
// 心跳最短间隔保护,防止噪声连续触发
if (lastBeatTime == 0 || (nowMs - lastBeatTime) > 300) {
pulseCount++;
pulseCount %= 2;
if (pulseCount == 1) {
firstBeatTime = nowMs;
} else {
secondBeatTime = nowMs;
if (secondBeatTime > firstBeatTime) {
IBI = secondBeatTime - firstBeatTime;
if (IBI > 300 && IBI < 2000) {
BPM = 60000 / IBI;
if (BPM < 30) BPM = 30;
if (BPM > 200) BPM = 200;
Serial.print("B");
Serial.println(BPM);
Serial.print("Q");
Serial.println(IBI);
}
}
}
lastBeatTime = nowMs;
}
}
sampleCount++;
delay(SAMPLE_PERIOD_MS);
}
评论 暂无