ESP32 + IIC屏幕播放badapple!!!
本文内容较新
·
今天更新
最后更新: 2026年04月02日
预计阅读时间: 39.8 分钟
9940 字
250 字/分
经典入门成就 --- 有屏幕的地方就有BadApple!!! 我的设计很简单, 就是下面这样的
整个系统分为两部分:
1. 下位机(ESP32)
* 接收串口数据
* 解析图像帧
* 渲染到 OLED
2. 上位机(Python)
* 读取图片 / 生成动画
* 转换为 1bit 图像
* 按协议发送
我用的屏幕是SSD1306的 I2C 0.96英寸 OLED 屏幕, 这个屏幕使用页寻址模式, 分辨率 128*64, 每八行为一页, 总共八页, 每列的八个像素是一个字节
为了保证数据同步, 搞了一个简单的帧协议
['B']['A'][1024 bytes]
B 和 A 是帧头, 用于同步, 紧跟着的 1024 字节的完整的图像数据
但不知道是我这个屏幕的问题, 还是所有的I2C屏幕都这样, 最顶部的八行和最底部的八行显示的内容是相同的, 所以我还搞了个-8的offset, 才让图像正确显示
#include <Wire.h>
#include <Adafruit_GFX.h>
#include <Adafruit_SSD1306.h>
//头文件与驱动
//接口和屏幕定义
#define SCREEN_WIDTH 128
#define SCREEN_HEIGHT 64
#define OLED_ADDR 0x3C
#define I2C_SDA 8
#define I2C_SCL 9
//图像整体下移 8 行
#define Y_OFFSET -8
Adafruit_SSD1306 display(SCREEN_WIDTH, SCREEN_HEIGHT, &Wire, -1);
static uint8_t frameBuffer[SCREEN_WIDTH * SCREEN_HEIGHT / 8];
//定义协议
enum RecvState {
WAIT_HEADER_1,
WAIT_HEADER_2,
READ_FRAME
};
RecvState recvState = WAIT_HEADER_1;
int frameIndex = 0;
//绘制函数
void drawFrame() {
//清空之前的内容
display.clearDisplay();
// 遍历每一页
for (int page = 0; page < 8; page++) {
// 遍历每一列(每页有128 列, 8 行)
for (int x = 0; x < 128; x++) {
// 从frameBuffer中取出一个字节
uint8_t b = frameBuffer[page * 128 + x];
// 遍历每行, 准备渲染信息
for (int bit = 0; bit < 8; bit++) {
int y = page * 8 + bit - Y_OFFSET;
// 处理偏移, 超出范围的直接continue
if (y < 0 || y >= 64) continue;
// 做移位运算, 逐个点亮像素
if (b & (1 << bit)) {
display.drawPixel(x, y, SSD1306_WHITE);
}
}
}
}
display.display();
}
//初始化函数
void setup() {
// 设置波特率
Serial.begin(921600);
// 等待~我随时随地在等待~
delay(500);
// 启动I2C总线
Wire.begin(I2C_SDA, I2C_SCL);
// 启动屏幕
if (!display.begin(SSD1306_SWITCHCAPVCC, OLED_ADDR)) {
while (1) delay(1000);
}
// 清除信息 设定基本文字
display.clearDisplay();
display.setTextSize(1);
display.setTextColor(SSD1306_WHITE);
display.setCursor(0, 0);
display.println("Offset Mode");
display.display();
}
void loop() {
// 只要没死就一直干活
while (Serial.available() > 0) {
// 读字节
uint8_t b = Serial.read();
switch (recvState) {
case WAIT_HEADER_1:
if (b == 'B') recvState = WAIT_HEADER_2;
break;
case WAIT_HEADER_2:
if (b == 'A') {
frameIndex = 0;
recvState = READ_FRAME;
} else {
recvState = WAIT_HEADER_1;
}
break;
// 检测头, 连续接到 BA 即开始读取屏幕内容
case READ_FRAME:
frameBuffer[frameIndex++] = b; //把屏幕信息储存至frameBuffer 内
if (frameIndex >= sizeof(frameBuffer)) { //接收完成, 开始绘制
drawFrame();
recvState = WAIT_HEADER_1; // 重置状态机
}
break;
}
}
}当然, 还需要一个上位机捕获屏幕内容, 然后通过串口打包成 BA1024 的格式发送到 ESP32 来显示
上位机一如既往的选择Python来写, 代码如下
import threading
import time
import tkinter as tk
import mss
import serial
from PIL import Image
# =========================
# 基本配置
# =========================
#定义串口 波特率 帧率 屏幕大小等
SERIAL_PORT = "/dev/cu.usbmodem5B8F0730691"
BAUDRATE = 921600
FPS = 10
OLED_W = 128
OLED_H = 64
FRAME_BYTES = OLED_W * OLED_H // 8
# 采集框比例:Bad Apple 用的4:3 干脆就这样了
CAPTURE_ASPECT = 4 / 3
# 黑白阈值
THRESHOLD = 128
# 反色开关:如果黑白反了,就改成 True
INVERT = False
# 最小选框宽高
MIN_W = 240
MIN_H = int(MIN_W / CAPTURE_ASPECT)
# =========================
# OLED 帧格式转换
# =========================
def image_to_ssd1306_bytes(img: Image.Image) -> bytes:
"""
把 1-bit PIL 图像转换成 SSD1306 page 格式的 1024 字节
"""
if img.mode != "1":
img = img.convert("1")
# 图像全部转换为二值图像
# 得到像素对象
pixels = img.load()
# 创建输出缓冲区, 存储输出信息
out = bytearray(FRAME_BYTES)
for page in range(8): # 64 / 8 = 8页
for x in range(OLED_W): # 遍历每列
byte_val = 0 # 存储竖直方向的八个像素
for bit in range(8): # 处理每个字节
y = page * 8 + bit # 计算这个像素应该在的 Y 坐标
pixel_on = (pixels[x, y] != 0) # 从像素对象转换为点亮信息
if INVERT:
pixel_on = not pixel_on
if pixel_on:
byte_val |= (1 << bit)
out[page * OLED_W + x] = byte_val
return bytes(out)
def process_frame(raw_img: Image.Image) -> bytes:
"""
把任意截图:
1. 转灰度
2. 等比缩放进 128x64
3. 黑边填充
4. 二值化
5. 转 SSD1306 数据
"""
src = raw_img.convert("L") # 转为灰度图
src_w, src_h = src.size
src_aspect = src_w / src_h
oled_aspect = OLED_W / OLED_H
# 等比缩放进 OLED 画布
if src_aspect > oled_aspect:
# 图更宽:铺满宽度
new_w = OLED_W
new_h = int(round(OLED_W / src_aspect))
else:
# 图更窄:铺满高度
new_h = OLED_H
new_w = int(round(OLED_H * src_aspect))
resized = src.resize((new_w, new_h), Image.Resampling.LANCZOS)
# 黑底画布
canvas = Image.new("L", (OLED_W, OLED_H), 0)
# 居中粘贴
paste_x = (OLED_W - new_w) // 2
paste_y = (OLED_H - new_h) // 2
canvas.paste(resized, (paste_x, paste_y))
# 二值化
bw = canvas.point(lambda p: 255 if p > THRESHOLD else 0, mode="1")
return image_to_ssd1306_bytes(bw)
# =========================
# 可视化选框窗口
# =========================
class CaptureOverlay:
def __init__(self):
self.root = tk.Tk()
self.root.geometry("640x480+200+120") # 4:3
self.root.configure(bg="black")
self.root.attributes("-alpha", 0.28)
self.root.attributes("-topmost", True)
self.root.overrideredirect(True)
self.canvas = tk.Canvas(
self.root,
bg="black",
highlightthickness=3,
highlightbackground="red"
)
self.canvas.pack(fill="both", expand=True)
self.topbar = tk.Frame(self.root, bg="#660000", height=34)
self.topbar.place(x=0, y=0, relwidth=1)
self.info = tk.Label(
self.topbar,
text="拖动移动 | 右下角缩放(锁定4:3) | Enter开始 | Esc退出",
bg="#660000",
fg="white",
font=("Arial", 12)
)
self.info.pack(side="left", padx=8)
self.start_btn = tk.Button(
self.topbar,
text="Start",
command=self.start_stream,
bg="#aa2222",
fg="white",
relief="flat",
padx=10
)
self.start_btn.pack(side="right", padx=6, pady=4)
self.handle_size = 20
self.handle = self.canvas.create_rectangle(
0, 0, self.handle_size, self.handle_size,
fill="white", outline="red", width=2
)
self.drag_mode = None
self.start_mouse_x = 0
self.start_mouse_y = 0
self.start_win_x = 0
self.start_win_y = 0
self.start_w = 0
self.start_h = 0
self.streaming = False
self.capture_region = None
for w in (self.root, self.canvas, self.info, self.topbar):
w.bind("<Button-1>", self.on_mouse_down)
w.bind("<B1-Motion>", self.on_mouse_drag)
w.bind("<ButtonRelease-1>", self.on_mouse_up)
w.bind("<Button-1>", self.force_focus, add="+")
self.root.bind_all("<Return>", self.start_stream)
self.root.bind_all("<Escape>", self.exit_app)
self.root.after(50, self.refresh_ui)
self.root.after(200, self.force_focus)
def force_focus(self, event=None):
try:
self.root.lift()
self.root.focus_force()
except Exception:
pass
def refresh_ui(self):
w = max(self.root.winfo_width(), MIN_W)
h = max(self.root.winfo_height(), MIN_H)
s = self.handle_size
self.canvas.coords(self.handle, w - s - 4, h - s - 4, w - 4, h - 4)
if self.root.winfo_exists():
self.root.after(50, self.refresh_ui)
def point_in_handle(self, local_x, local_y):
w = self.root.winfo_width()
h = self.root.winfo_height()
s = self.handle_size
return local_x >= w - s - 8 and local_y >= h - s - 8
def on_mouse_down(self, event):
self.start_mouse_x = event.x_root
self.start_mouse_y = event.y_root
self.start_win_x = self.root.winfo_x()
self.start_win_y = self.root.winfo_y()
self.start_w = self.root.winfo_width()
self.start_h = self.root.winfo_height()
local_x = event.x_root - self.root.winfo_x()
local_y = event.y_root - self.root.winfo_y()
if self.point_in_handle(local_x, local_y):
self.drag_mode = "resize"
else:
self.drag_mode = "move"
def on_mouse_drag(self, event):
dx = event.x_root - self.start_mouse_x
dy = event.y_root - self.start_mouse_y
if self.drag_mode == "move":
self.root.geometry(
f"{self.start_w}x{self.start_h}+{self.start_win_x + dx}+{self.start_win_y + dy}"
)
elif self.drag_mode == "resize":
# 锁定 4:3 比例
candidate_w_from_dx = self.start_w + dx
candidate_w_from_dy = self.start_w + int(dy * CAPTURE_ASPECT)
if abs(dx) >= abs(dy * CAPTURE_ASPECT):
new_w = candidate_w_from_dx
else:
new_w = candidate_w_from_dy
new_w = max(MIN_W, new_w)
new_h = int(round(new_w / CAPTURE_ASPECT))
self.root.geometry(
f"{new_w}x{new_h}+{self.start_win_x}+{self.start_win_y}"
)
def on_mouse_up(self, event):
self.drag_mode = None
def get_region(self):
# 再次强制修正为精确 4:3
x = self.root.winfo_x()
y = self.root.winfo_y()
w = self.root.winfo_width()
h = int(round(w / CAPTURE_ASPECT))
return {
"left": x,
"top": y,
"width": w,
"height": h,
}
def start_stream(self, event=None):
if self.streaming:
return
self.capture_region = self.get_region()
print("capture region:", self.capture_region, flush=True)
self.streaming = True
self.root.withdraw()
threading.Thread(target=self.stream_loop, daemon=True).start()
def exit_app(self, event=None):
print("exit", flush=True)
self.streaming = False
try:
self.root.destroy()
except Exception:
pass
def stream_loop(self):
try:
print("opening serial:", SERIAL_PORT, flush=True)
ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
time.sleep(3.0) # 等 ESP32 重启完
print("serial opened", flush=True)
except Exception as e:
print("串口打开失败:", e, flush=True)
self.streaming = False
try:
self.root.deiconify()
self.force_focus()
except Exception:
pass
return
frame_interval = 1.0 / FPS
sent = 0
with mss.mss() as sct:
while self.streaming:
start = time.time()
try:
shot = sct.grab(self.capture_region)
# 关键修复:用 BGRA 原始数据正确解码,避免底部出现顶部内容
img = Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")
frame = process_frame(img)
ser.write(b"BA")
ser.write(frame)
ser.flush()
sent += 1
if sent % 10 == 0:
print(f"sent frames: {sent}", flush=True)
except Exception as e:
print("stream error:", e, flush=True)
break
elapsed = time.time() - start
sleep_time = frame_interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
try:
ser.close()
except Exception:
pass
print("stream ended", flush=True)
self.streaming = False
def run(self):
self.force_focus()
self.root.mainloop()
if __name__ == "__main__":
CaptureOverlay().run()
评论 暂无