在 MySQL 中存储和查询树形结构

处理例如目录树、部门层级关系、城乡结构等树形结构是常见的业务场景,本文将讨论几种通过 MySQL 存储和查询此类树形结构的简单方法。

1. 场景

现在我们需要实现一个目录树,即一个节点可以拥有若干个子节点。节点的结构设计如下:

1
2
3
4
5
type Node struct {
Id int64
ParentId int64
Children []*Node
}

数据存储在 MySQL 上,要求可以递归获取任意一个节点的所有子节点,可以设定下钻的深度。函数接口设计如下:

1
func getChildren(parentId int64, depth int) []*Node

MySQL 表结构设计如下:

1
2
3
4
5
6
7
8
create table node
(
id bigint auto_increment primary key,
parent_id bigint not null,
);

create index idx_parent_id
on node (parent_id);

随机插入一些测试数据:

1
2
3
4
5
6
7
8
func insertData() {
for i := 0; i < 10000; i++ {
var max sql.NullInt64
_ = mysql.Get(&max, "SELECT MAX(id) FROM node")
id := rand.Int63n(max.Int64 + 1)
_, _ = mysql.Exec("INSERT INTO node (parent_id) VALUES (?)", id)
}
}

2. Naive 方法

最简单的思路是在程序上做递归,即先获取目标节点的所有子节点,再递归获取子节点的所有子节点,直到下钻深度到达限制。

1
2
3
4
5
6
7
8
9
10
11
func naive(parentId int64, depth int) []*Node {
if depth == 0 {
return make([]*Node, 0)
}
nodes := make([]*Node, 0)
_ = mysql.Select(&nodes, "SELECT id, parent_id FROM node WHERE parent_id = ?", parentId)
for _, node := range nodes {
node.Children = naive(node.Id, depth - 1)
}
return nodes
}

这种思路和实现都非常直观。虽然每次查询数据库都非常快,但是每个节点都需要一次查询。随着节点数量变多,查询数据库的次数也变得非常多,总体执行速度非常慢。使用上述的随机数据,从根节点获取完整树形结构平均需要耗时 8 秒。

3. 递归 CTE

MySQL 8 开始提供了 CTE(Common Table Expressions)特性,允许将派生表的逻辑抽取出来单独定义。我们可以使用递归 CTE 来实现上述需求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func cte(parentId int64, depth int) []*Node {
if depth == 0 {
return make([]*Node, 0)
}
var allNodes []*Node
_ = mysql.Select(&allNodes, `
WITH RECURSIVE cte (id, parent_id, depth) AS (
SELECT id, parent_id, ? AS depth
FROM node
WHERE parent_id = ?
UNION ALL
SELECT n.id, n.parent_id, depth - 1
FROM node n
INNER JOIN cte
ON n.parent_id = cte.id AND depth != 1
)
SELECT id, parent_id FROM cte order by parent_id
`, depth, parentId)

m := make(map[int64]*Node)
dummy := &Node{Children: make([]*Node, 0)}
m[parentId] = dummy
for _, node := range allNodes {
if node.Children == nil {
node.Children = make([]*Node, 0)
}
m[node.Id] = node
parent := m[node.ParentId]
parent.Children = append(parent.Children, node)
}
return dummy.Children
}

上述程序分为两个部分,后一部分比较简单,就是将数据库查询结果拼接成树形结构,前一部分的核心是这一段 SQL:

1
2
3
4
5
6
7
8
9
10
11
WITH RECURSIVE cte (id, parent_id, depth) AS (
SELECT id, parent_id, ? AS depth
FROM node
WHERE parent_id = ?
UNION ALL
SELECT n.id, n.parent_id, depth - 1
FROM node n
INNER JOIN cte
ON n.parent_id = cte.id AND depth != 1
)
SELECT id, parent_id FROM cte order by parent_id

我们首先定义了一个递归 CTE,它的执行逻辑是,首先获取 parent_id = ? 的所有节点,然后通过 JOIN 查询这些节点的所有子节点,然后递归查询这些子节点的所有子节点,直到下钻深度满足要求。可以发现逻辑其实和之前的 naive 方法是一样的,但是这里我们借由 MySQL 来实现递归逻辑,从而大大减少了 IO 次数。

使用上述的随机数据,从根节点获取完整树形结构平均需要耗时 34 毫秒(其中 SQL 的执行时间约 25 毫秒)。

4. 路径查询

递归 CTE 的方法实现简单且效率很高,但是有一个致命的缺点:要求 MySQL 版本在 8 以上。然而很多公司的业务数据库都停留在 5.6 版本,无法支持新特性。不使用递归 CTE,也可以用存储更多信息地方式,通过空间换取时间。

首先改造 MySQL 的表结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
create table node
(
id bigint auto_increment
primary key,
parent_id bigint not null,
path varchar(500) not null,
level int not null
);

create index idx_parent_id
on node (parent_id);

create index idx_path
on node (path(50));

可以发现这里新增了两个字段:

  • path:存储从根目录到当前节点的路径信息,例如/0/3/27
  • level:存储当前节点的深度

另外,这里还对 path 字段添加了一个长度为 50 的前缀索引,用于提高搜索速度。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func path(parentId int64, depth int) []*Node {
var allNodes []*Node
if depth == -1 {
depth = 99999
}
_ = mysql.Select(&allNodes, `
SELECT id, parent_id
FROM node
WHERE path LIKE CONCAT(IFNULL((SELECT path FROM node WHERE id = ?), ''), "/", ?, "%")
AND level <= IFNULL((SELECT level FROM node WHERE id = ?), 0) + ?
ORDER BY parent_id
`, parentId, parentId, parentId, depth)

m := make(map[int64]*Node)
dummy := &Node{Children: make([]*Node, 0)}
m[parentId] = dummy
for _, node := range allNodes {
if node.Children == nil {
node.Children = make([]*Node, 0)
}
m[node.Id] = node
parent := m[node.ParentId]
parent.Children = append(parent.Children, node)
}
return dummy.Children
}

同样,代码的后半段是在拼接树形结构,而核心的 SQL 如下:

1
2
3
4
5
SELECT id, parent_id
FROM node
WHERE path LIKE CONCAT(IFNULL((SELECT path FROM node WHERE id = ?), ''), '/', ?, '%')
AND level <= IFNULL((SELECT level FROM node WHERE id = ?), 0) + ?
ORDER BY parent_id

逻辑也非常简单:将当前节点的路径取出,拼接后形成目标路径,找到前缀为该路径的所有节点,必然是当前路径的子节点。

使用上述的随机数据,从根节点获取完整树形结构平均需要耗时 20毫秒(其中 SQL 的执行时间约 8 毫秒)。