博主在24年5月在学校的OPPO实验室里负责物联网的OTA设计,只在博客里留下了一些莫名其妙的设计,这篇文章就是来补一下坑,从整体架构到细节方面介绍一下整个OTA系统的设计,第一次设计CS系统,有很多粗糙的甚至不安全不合理的地方,我以后也会慢慢形成思路进行改进


1. 系统架构

整个 OTA 系统采用典型的 C/S (Client-Server) 架构,并引入了一个独立的注册/管理节点来协调更新流程。

1.1 核心组件

  1. Server (OTA 文件服务器)

    • 职责: 存储升级包(Zip文件)及其元数据(version/content.json)。提供版本查询 API 和文件下载服务。
    • 角色: 这一组件相当于“仓库”,只负责“给我最新的版本号”和“给我文件”这两个简单的请求。
  2. Register (设备注册与管理中心)

    • 职责: 维护所有 IoT 设备的列表、状态(在线/离线)以及当前安装的软件包版本。提供管理控制台(Dashboard),管理员在此进行更新发布。
    • 角色: 这一组件是系统的“指挥单位”,它知道哪些设备需要更新,并向具体设备在什么时候发送更新指令。
  3. Client (IoT 设备终端)

    • 职责: 运行在具体的硬件设备上。该客户端包含常驻进程,负责向 Register 汇报心跳,接收 Register 下发的更新指令,并从 Server 下载文件执行具体的更新脚本。
    • 角色: 在具体的硬件设备上执行更新

2. 模块原理解析

2.1 Server 端 (server/)

Server 端的核心是一个 Flask 应用,主要负责版本控制和文件托管。

  • API 接口 (server/api.py):

    • /latestVersion: 接收 packagebranch 参数,查询数据库,返回最大版本号的 content.json 信息。
    • /getVersion: 查询特定版本的详细信息。
    • /ota-files/<path:filename>: 静态文件服务,用于提供升级包下载。
    • /login & /manage: 简单的管理员登录和页面托管。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @app.route("/latestVersion")
    def maxVersion() -> str: # 获取最新版本
    dic = {"status": 200, "content.json": {}}
    package = request.args.get("package")
    branch = request.args.get("branch")
    if package is None or branch is None: # 检查参数
    dic["status"] = 400
    return str(json.dumps(dic))
    content = vM.getMaxVersion(package, branch)
    if (content == None):
    dic["status"] = 404
    return str(json.dumps(dic))
    else:
    dic["content.json"] = content
    return str(json.dumps(dic))
  • 版本管理 (server/versionManager.py):

    • 存储机制: 使用 MySQL 数据库的 ota 表存储版本信息。核心字段包括 name (包名), branch (分支), version (版本号), content (JSON格式的元数据)。
    • 查询逻辑: 直接执行 SQL 语句 SELECT ... ORDER BY version DESC 来获取最新版本。
    1
    2
    3
    4
    5
    6
    def getMaxVersion(package: str, branch: str) -> dict:   # 获取最新版本的信息
    # ... 连接数据库 ...
    cursor.execute("SELECT content FROM ota WHERE name='%s' AND branch = '%s' ORDER BY version DESC" % (
    package, branch))
    results = cursor.fetchall()
    # ... 关闭数据库 ...

2.2 Register 端 (register/)

Register 端不仅是一个注册中心,实际上承担了任务调度器的角色。

1
2
3
4
5
6
7
8
9
10
11
12
13
while True:   # 监测进程是否存活
if not server_process.is_alive() and not update_process.is_alive():
logger.error("http server process and update process dead")
break
elif not server_process.is_alive():
logger.error("http server process dead")
update_process.terminate()
break
elif not update_process.is_alive():
logger.error("update process dead")
server_process.terminate()
break
time.sleep(1)
  • 设备管理 (register/deviceManager.py):

    • 设备注册: registerDevice 函数接收客户端上传的 metadata,生成唯一的 device_id 并存入 devices 数据表。
    • 任务队列: 系统维护了一个全局变量 updateList 作为任务队列。当管理员下发更新时,任务被追加到此列表。
    • 轮询与更新: updateNext 函数采用递归调用的方式,从 updateList 取出任务,调用 update 函数发起网络请求通知客户端。
  • API 接口 (register/registerServer.py):

    • /heartbeat: 接收客户端心跳,更新设备的 last_active 时间戳。
    • /api/updatePackage: 管理员调用的接口,将更新任务加入具体的 updateList 队列。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @app.route("/startUpdate", methods=["POST", "GET"])
    def startUpdate(): # 开始更新
    # ...
    try:
    if (not updateQueue.empty()): # 队列不为空
    res["status"] = 400
    res["error"] = "Queue not empty"
    return str(json.dumps(res))

    # client/update.py
    def update(update_queue):
    # ... 初始化日志 ...
    while (True):
    if (not update_queue.empty()): # 如果队列不为空
    package: update_package = update_queue.get() # 从队列中获取一个更新包
    try:
    # ... 检查 device.json ...
    logger.info(package.package_json["package"]+" update start")
    if (not package.startUpdate()): # 执行具体的更新逻辑
    logger.info(package.package_json["package"]+" update failed")
    continue
    # ... 更新 device.json ...
    except BaseException as e:
    logger.info(package.package_json["package"]+" update error")
    time.sleep(1)
    # ... 获取 content ...
    t = update_package(dic, register_path, device_id, None) # 创建更新包
    updateQueue.put(t) # 加入队列
    return str(json.dumps(res))
    except Exception as e:
    # ...

2.3 Client 端 (client/)

客户端的设计最为复杂,采用了多进程模型来确保服务的稳定性。

  • 双进程守护 (client/daemon.py):

    • 设计原理: 这是客户端的入口。它启动两个子进程 http_server (负责通信) 和 update (负责具体安装)。
    • 监控机制: 父进程在一个 while True 循环中每秒检查一次子进程状态。如果任何一个子进程死亡,父进程会立即杀死另一个子进程并退出。
  • HTTPServer 子进程 (client/http_server.py):

    • 角色: 它是设备上的 Web Server。之所以客户端需要跑一个 Server,是因为 Register 端是主动“推”消息的(Push 模型),而不是客户端一直“拉” (Pull)。
    • 路由:
      • /heartbeat: 独立线程向 Register 发送心跳。
      • /startUpdate: 接收 Register 的 POST 请求。注意,它不直接执行更新,而是将接收到的 content.json 放入 updateQueue (进程间队列)。这是一个典型的生产者-消费者模型。
  • Update 子进程 (client/update.py & client/update_package.py):

    • 角色: 消费者。它从 updateQueue 取出任务。
    • 更新流程 (State Machine):
      1. BeforeUpdate: 运行预处理脚本。
      2. Downloading: 从 Server 下载 zip 包。
      3. Updating: 解压覆盖文件。
      4. AfterUpdate: 运行后处理脚本。
      5. Restore: 如果中间失败,尝试恢复备份。
    • 状态上报: 每一步的状态变化都会通过 /updateInfo 接口回传给 Register Server,形成闭环反馈。

3. 核心流程

3.1 设备上线流程

  1. Client 启动 daemon.py
  2. Client 加载 device.json,读取自身的 ID 和 Register 地址。
  3. Client 的 http_server.py 启动心跳线程,每10秒向 Register 发送一次 HTTP POST /heartbeat
  4. Register 收到心跳,更新数据库中该设备的 last_active 时间。

3.2 软件更新分发流程

这是一个涉及三方协作的过程:

  1. 触发: 管理员在 Dashboard 点击“更新”,调用 Register 的 /api/updatePackage
  2. 入队: Register 将任务 {device_id, package, version} 加入内存队列 updateList
  3. 调度: updateNext() 被触发,向目标 Client 的 HTTP Server 发送 /startUpdate 请求,携带 content.json
  4. 接收: Client 的 HTTP Server 收到请求,校验合法性后,将 Content Put 入 updateQueue
  5. 执行: Client 的 Update 进程从 Queue Get 任务:
    • 解析 remote 字段,向 Server 请求下载文件。
    • Server 响应 /ota-files/... 返回 ZIP 包。
    • Client 解压并执行更新逻辑。
  6. 反馈: Client 在执行的每个阶段,都向 Register 的 /updateInfo 发送进度报告。

4. 关键数据结构

4.1 content.json (核心元数据)

这是贯穿整个系统的核心数据包,定义了“如何更新”。

1
2
3
4
5
6
7
8
9
10
11
{
"package": "ota-client", // 包名
"version": "1.0.1", // 版本
"branch": "stable", // 分支
"sha256": "abcdef...", // 完整性校验
"remote": "http://server-ip:port", // 下载源
"local": "/usr/local/app", // 目标安装路径
"BeforeUpdate": "systemctl stop app", // 更新前钩子
"AfterUpdate": "systemctl start app", // 更新后钩子
"restore": "restore.sh" // 回滚脚本
}

4.2 数据库 Schema

  • table devices: 存储设备信息,核心字段是 content,存储了一份 JSON 格式的当前软件包状态。
  • table ota: 存储发布的版本信息,实际的文件内容并未存库,只存储了 content.json 的字符串。

5. 代码实现细节

5.1 进程间通信 (IPC)

Client 端巧妙地使用了 Python 的 multiprocessing.Queue 进行 IPC。

  • http_server 进程接收网络 IO,属于 IO 密集型。
  • update 进程涉及文件解压和脚本执行,属于 CPU/磁盘 IO 密集型,且可能阻塞很久。
  • 使用 Queue 解耦,保证了即使更新脚本卡死,HTTP 心跳依然能发送,防止设备在 Register 端显示“掉线”。

5.2 错误处理机制

系统中存在多层级的错误处理:

  • 网络层: requests 请求大都包裹在 try-except 块中。
  • 进程层: daemon.py 监控子进程存活。
  • 业务层: 更新失败会触发 restore 脚本,尝试回滚到上一版本。

6. 技术栈总结

  • 语言: Python 3.x
  • Web 框架: Flask
  • 数据库: MySQL (PyMySQL 驱动)
  • 前端: Bootstrap + jQuery
  • 并发模型: Python Multiprocessing (多进程)