简单OTA系统的设计与实现
博主在24年5月在学校的OPPO实验室里负责物联网的OTA设计,只在博客里留下了一些莫名其妙的设计,这篇文章就是来补一下坑,从整体架构到细节方面介绍一下整个OTA系统的设计,第一次设计CS系统,有很多粗糙的甚至不安全不合理的地方,我以后也会慢慢形成思路进行改进
1. 系统架构
整个 OTA 系统采用典型的 C/S (Client-Server) 架构,并引入了一个独立的注册/管理节点来协调更新流程。
1.1 核心组件
Server (OTA 文件服务器)
- 职责: 存储升级包(Zip文件)及其元数据(version/content.json)。提供版本查询 API 和文件下载服务。
- 角色: 这一组件相当于“仓库”,只负责“给我最新的版本号”和“给我文件”这两个简单的请求。
Register (设备注册与管理中心)
- 职责: 维护所有 IoT 设备的列表、状态(在线/离线)以及当前安装的软件包版本。提供管理控制台(Dashboard),管理员在此进行更新发布。
- 角色: 这一组件是系统的“指挥单位”,它知道哪些设备需要更新,并向具体设备在什么时候发送更新指令。
Client (IoT 设备终端)
- 职责: 运行在具体的硬件设备上。该客户端包含常驻进程,负责向 Register 汇报心跳,接收 Register 下发的更新指令,并从 Server 下载文件执行具体的更新脚本。
- 角色: 在具体的硬件设备上执行更新
2. 模块原理解析
2.1 Server 端 (server/)
Server 端的核心是一个 Flask 应用,主要负责版本控制和文件托管。
API 接口 (
server/api.py):/latestVersion: 接收package和branch参数,查询数据库,返回最大版本号的content.json信息。/getVersion: 查询特定版本的详细信息。/ota-files/<path:filename>: 静态文件服务,用于提供升级包下载。/login&/manage: 简单的管理员登录和页面托管。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def maxVersion() -> str: # 获取最新版本
dic = {"status": 200, "content.json": {}}
package = request.args.get("package")
branch = request.args.get("branch")
if package is None or branch is None: # 检查参数
dic["status"] = 400
return str(json.dumps(dic))
content = vM.getMaxVersion(package, branch)
if (content == None):
dic["status"] = 404
return str(json.dumps(dic))
else:
dic["content.json"] = content
return str(json.dumps(dic))版本管理 (
server/versionManager.py):- 存储机制: 使用 MySQL 数据库的
ota表存储版本信息。核心字段包括name(包名),branch(分支),version(版本号),content(JSON格式的元数据)。 - 查询逻辑: 直接执行 SQL 语句
SELECT ... ORDER BY version DESC来获取最新版本。
1
2
3
4
5
6def getMaxVersion(package: str, branch: str) -> dict: # 获取最新版本的信息
# ... 连接数据库 ...
cursor.execute("SELECT content FROM ota WHERE name='%s' AND branch = '%s' ORDER BY version DESC" % (
package, branch))
results = cursor.fetchall()
# ... 关闭数据库 ...- 存储机制: 使用 MySQL 数据库的
2.2 Register 端 (register/)
Register 端不仅是一个注册中心,实际上承担了任务调度器的角色。
1 | while True: # 监测进程是否存活 |
设备管理 (
register/deviceManager.py):- 设备注册:
registerDevice函数接收客户端上传的 metadata,生成唯一的device_id并存入devices数据表。 - 任务队列: 系统维护了一个全局变量
updateList作为任务队列。当管理员下发更新时,任务被追加到此列表。 - 轮询与更新:
updateNext函数采用递归调用的方式,从updateList取出任务,调用update函数发起网络请求通知客户端。
- 设备注册:
API 接口 (
register/registerServer.py):/heartbeat: 接收客户端心跳,更新设备的last_active时间戳。/api/updatePackage: 管理员调用的接口,将更新任务加入具体的updateList队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def startUpdate(): # 开始更新
# ...
try:
if (not updateQueue.empty()): # 队列不为空
res["status"] = 400
res["error"] = "Queue not empty"
return str(json.dumps(res))
# client/update.py
def update(update_queue):
# ... 初始化日志 ...
while (True):
if (not update_queue.empty()): # 如果队列不为空
package: update_package = update_queue.get() # 从队列中获取一个更新包
try:
# ... 检查 device.json ...
logger.info(package.package_json["package"]+" update start")
if (not package.startUpdate()): # 执行具体的更新逻辑
logger.info(package.package_json["package"]+" update failed")
continue
# ... 更新 device.json ...
except BaseException as e:
logger.info(package.package_json["package"]+" update error")
time.sleep(1)
# ... 获取 content ...
t = update_package(dic, register_path, device_id, None) # 创建更新包
updateQueue.put(t) # 加入队列
return str(json.dumps(res))
except Exception as e:
# ...
2.3 Client 端 (client/)
客户端的设计最为复杂,采用了多进程模型来确保服务的稳定性。
双进程守护 (
client/daemon.py):- 设计原理: 这是客户端的入口。它启动两个子进程
http_server(负责通信) 和update(负责具体安装)。 - 监控机制: 父进程在一个
while True循环中每秒检查一次子进程状态。如果任何一个子进程死亡,父进程会立即杀死另一个子进程并退出。
- 设计原理: 这是客户端的入口。它启动两个子进程
HTTPServer 子进程 (
client/http_server.py):- 角色: 它是设备上的 Web Server。之所以客户端需要跑一个 Server,是因为 Register 端是主动“推”消息的(Push 模型),而不是客户端一直“拉” (Pull)。
- 路由:
/heartbeat: 独立线程向 Register 发送心跳。/startUpdate: 接收 Register 的 POST 请求。注意,它不直接执行更新,而是将接收到的content.json放入updateQueue(进程间队列)。这是一个典型的生产者-消费者模型。
Update 子进程 (
client/update.py&client/update_package.py):- 角色: 消费者。它从
updateQueue取出任务。 - 更新流程 (State Machine):
- BeforeUpdate: 运行预处理脚本。
- Downloading: 从 Server 下载 zip 包。
- Updating: 解压覆盖文件。
- AfterUpdate: 运行后处理脚本。
- Restore: 如果中间失败,尝试恢复备份。
- 状态上报: 每一步的状态变化都会通过
/updateInfo接口回传给 Register Server,形成闭环反馈。
- 角色: 消费者。它从
3. 核心流程
3.1 设备上线流程
- Client 启动
daemon.py。 - Client 加载
device.json,读取自身的 ID 和 Register 地址。 - Client 的
http_server.py启动心跳线程,每10秒向 Register 发送一次 HTTP POST/heartbeat。 - Register 收到心跳,更新数据库中该设备的
last_active时间。
3.2 软件更新分发流程
这是一个涉及三方协作的过程:
- 触发: 管理员在 Dashboard 点击“更新”,调用 Register 的
/api/updatePackage。 - 入队: Register 将任务
{device_id, package, version}加入内存队列updateList。 - 调度:
updateNext()被触发,向目标 Client 的 HTTP Server 发送/startUpdate请求,携带content.json。 - 接收: Client 的 HTTP Server 收到请求,校验合法性后,将 Content Put 入
updateQueue。 - 执行: Client 的 Update 进程从 Queue Get 任务:
- 解析
remote字段,向 Server 请求下载文件。 - Server 响应
/ota-files/...返回 ZIP 包。 - Client 解压并执行更新逻辑。
- 解析
- 反馈: Client 在执行的每个阶段,都向 Register 的
/updateInfo发送进度报告。
4. 关键数据结构
4.1 content.json (核心元数据)
这是贯穿整个系统的核心数据包,定义了“如何更新”。
1 | { |
4.2 数据库 Schema
- table
devices: 存储设备信息,核心字段是content,存储了一份 JSON 格式的当前软件包状态。 - table
ota: 存储发布的版本信息,实际的文件内容并未存库,只存储了content.json的字符串。
5. 代码实现细节
5.1 进程间通信 (IPC)
Client 端巧妙地使用了 Python 的 multiprocessing.Queue 进行 IPC。
http_server进程接收网络 IO,属于 IO 密集型。update进程涉及文件解压和脚本执行,属于 CPU/磁盘 IO 密集型,且可能阻塞很久。- 使用 Queue 解耦,保证了即使更新脚本卡死,HTTP 心跳依然能发送,防止设备在 Register 端显示“掉线”。
5.2 错误处理机制
系统中存在多层级的错误处理:
- 网络层:
requests请求大都包裹在try-except块中。 - 进程层:
daemon.py监控子进程存活。 - 业务层: 更新失败会触发
restore脚本,尝试回滚到上一版本。
6. 技术栈总结
- 语言: Python 3.x
- Web 框架: Flask
- 数据库: MySQL (PyMySQL 驱动)
- 前端: Bootstrap + jQuery
- 并发模型: Python Multiprocessing (多进程)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 四叶草の博客!








