学习PostgreSQL的FDW(#6)-file_fdw源码分析

之前的文章介绍了实现FDW所需的函数,现在来看看PostgreSQL自带的文件FDW(file_fdw)是怎么实现的吧。
PostgreSQL实现的file_fdw的源码可以在PostgreSQL的源码目录下contrib目录下找到。

简介

file_fdw支持的数据文件必须是COPY FROM可读格式(text,csv[逗号分隔值],binary).
用这个包装器创建的一个外部表可以有下列选项:

参数 说明
filename 指定要被读取的文件。必须是一个绝对路径名。 必须指定filename或program, 但不能同时指定两个。
program 指定要执行的命令。该命令的标准输出将被读取, 就像使用COPY FROM PROGRAM一样。必须指定program 或filename,但不能同时指定两个。
format 指定数据的格式,和COPY的FORMAT选项相同。即text,csv(逗号分隔值),binary
header 指定文件包含标题行,其中有每一列的名称。在输出时,第一行包含来自表的列名。在输入时,第一行会被忽略。只有使用CSV格式时才允许这个选项。
delimiter 指定分隔文件每行中各列的字符。文本格式中默认是一个制表符,而CSV格式中默认是一个逗号。这必须是一个单字节字符。使用binary格式时不允许这个选项。
quote 指定一个数据值被引用时使用的引用字符。默认是双引号。这必须是一个单字节字符。只有使用CSV格式时才允许这个选项。
escape 指定应该出现在一个匹配quote值的数据字符之前的字符。默认和quote值一样(这样如果引用字符出现在数据中,它会被双写)。这必须是一个单字节字符。只有使用CSV格式时才允许这个选项。
null 指定表示一个空值的字符串。文本格式中默认是\N(反斜线-N),CSV格式中默认是一个未加引用的空串。在你不想区分空值和空串的情况下,即使在文本格式中你也可能更喜欢空串。使用binary格式时不允许这个选项。
encoding 指定文件被以encoding_name编码。如果省略这个选项,将使用当前的客户端编码。

添加file_fdw扩展

检查PG安装目录的lib/postgresql目录下是否存在file_fdw.so文件,确保安装时已经编译安装了file_fdw扩展。

1
2
3
4
5
6
7
8
9
10
fdw=# create extension file_fdw ;
CREATE EXTENSION
fdw=# \dx
List of installed extensions
Name | Version | Schema | Description
--------------+---------+------------+----------------------------------------------------
file_fdw | 1.0 | public | foreign-data wrapper for flat file access
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
postgres_fdw | 1.0 | public | foreign-data wrapper for remote PostgreSQL servers
(3 rows)

测试file_fdw

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
fdw=# --创建测试表,并添加数据
fdw=# create table tb1(id int,name varchar,password varchar);
CREATE TABLE
fdw=# insert into tb1 select generate_series(1,50),'oYo',md5(random()::text);
INSERT 0 50
fdw=# --通过copy拷贝数据到文件
fdw=# copy tb1 to '/data/file_fdw/tb1.csv';
COPY 50
fdw=# --创建外部服务器
fdw=# create server file_fdw_server foreign data wrapper file_fdw ;
CREATE SERVER
fdw=# \des
List of foreign servers
Name | Owner | Foreign-data wrapper
-----------------+-------+----------------------
file_fdw_server | xdb | file_fdw
pg_117_56 | xdb | postgres_fdw
(2 rows)
fdw=# --创建外部表
fdw=# create foreign table tb1_fdw(id int,name varchar,password varchar) server file_fdw_server options(filename '/data/file_fdw/tb1.csv');
CREATE FOREIGN TABLE
fdw=# \d tb1_fdw
Foreign table "public.tb1_fdw"
Column | Type | Collation | Nullable | Default | FDW options
----------+-------------------+-----------+----------+---------+-------------
id | integer | | | |
name | character varying | | | |
password | character varying | | | |
Server: file_fdw_server
FDW options: (filename '/data/file_fdw/tb1.csv')

fdw=# select * from tb1_fdw order by id limit 10;
id | name | password
----+------+----------------------------------
1 | oYo | 99c0aa4a3ad2128e6ea55522012d8806
2 | oYo | a126e01cfdfd4717a1d79b45a42a3bb6
3 | oYo | f031835b6c57d4a8781309e05542eb18
4 | oYo | 7bea90889b4aa3d0fe67ddbdba68be8e
5 | oYo | 01e38c84d7ca2e140f00efc9a9d7a767
6 | oYo | 28269e1942431e7970c638023953f43b
7 | oYo | fcf45570efa8616e4081b81e6d514433
8 | oYo | 5c90103b272cc172d052d7fabb748188
9 | oYo | cb8b46761425c64390f77cf1557bb1ae
10 | oYo | 634404bf272dd5610e1b7caf7e1a1542
(10 rows)

fdw=# --执行计划
fdw=# explain select * from tb1_fdw order by id limit 10;
QUERY PLAN
-------------------------------------------------------------------------
Limit (cost=3.55..3.58 rows=10 width=68)
-> Sort (cost=3.55..3.61 rows=21 width=68)
Sort Key: id
-> Foreign Scan on tb1_fdw (cost=0.00..3.10 rows=21 width=68)
Foreign File: /data/file_fdw/tb1.csv
Foreign File Size: 1991 b
(6 rows)

数据结构

  • FileFdwOption
1
2
3
4
5
6
7
8
9
/*
* Describes the valid options for objects that use this wrapper.
* Each entry pairs an option name with the catalog in which that option
* is allowed to appear.
*/
struct FileFdwOption
{
const char *optname; /* name of the option */
Oid optcontext; /* Oid of catalog in which option may appear */
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*
* Valid options for file_fdw.
*
* These options are based on the options for the COPY FROM command.
* But note that force_not_null and force_null are handled as boolean options
* attached to a column, not as table options.
*
* The table is terminated by a {NULL, InvalidOid} sentinel entry.
*
* Note: If you are adding new option for user mapping, you need to modify
* fileGetOptions(), which currently doesn't bother to look at user mappings.
*/
static const struct FileFdwOption valid_options[] = {
/* Data source options */
{"filename", ForeignTableRelationId},
{"program", ForeignTableRelationId},

/* Format options */
/* oids option is not supported */
{"format", ForeignTableRelationId},
{"header", ForeignTableRelationId},
{"delimiter", ForeignTableRelationId},
{"quote", ForeignTableRelationId},
{"escape", ForeignTableRelationId},
{"null", ForeignTableRelationId},
{"encoding", ForeignTableRelationId},
{"force_not_null", AttributeRelationId},
{"force_null", AttributeRelationId},

/*
* force_quote is not supported by file_fdw because it's for COPY TO.
*/

/* Sentinel */
{NULL, InvalidOid}
};
  • FileFdwPlanState
1
2
3
4
5
6
7
8
9
10
11
12
13
/*
* FDW-specific information for RelOptInfo.fdw_private.
* This is the planner-stage state that file_fdw attaches to the relation.
*/
typedef struct FileFdwPlanState
{
char *filename; /* file or program to read from */
bool is_program; /* true if filename represents an OS command */
List *options; /* merged COPY options, excluding filename and
* is_program */
BlockNumber pages; /* estimate of file's physical size */
double ntuples; /* estimate of number of data rows */
} FileFdwPlanState;
  • FileFdwExecutionState
1
2
3
4
5
6
7
8
9
10
11
12
/*
* FDW-specific information for ForeignScanState.fdw_state.
* This is the executor-stage state used while the scan is running.
*/
typedef struct FileFdwExecutionState
{
char *filename; /* file or program to read from */
bool is_program; /* true if filename represents an OS command */
List *options; /* merged COPY options, excluding filename and
* is_program */
CopyState cstate; /* COPY execution state */
} FileFdwExecutionState;

函数

  • fileIsForeignScanParallelSafe
1
2
3
4
5
6
7
8
9
10
11
12
/*
* fileIsForeignScanParallelSafe
* Reading a file, or external program, in a parallel worker should work
* just the same as reading it in the leader, so mark scans safe.
*
* Reports whether the foreign scan may run in parallel; none of the
* arguments are inspected and the answer is unconditionally true.
*/
static bool
fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte)
{
return true;
}
  • fileGetForeignRelSize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
 * fileGetForeignRelSize
 *		Obtain relation size estimates for a foreign table
 *
 * Builds the planner-time state for the table, stores it in
 * baserel->fdw_private, and delegates the actual size estimation to
 * estimate_size().
 */
static void
fileGetForeignRelSize(PlannerInfo *root,
                      RelOptInfo *baserel,
                      Oid foreigntableid)
{
    FileFdwPlanState *plan_state;

    plan_state = (FileFdwPlanState *) palloc(sizeof(*plan_state));

    /*
     * Only the filename (or program) is strictly required here, but all
     * options are fetched in one go so that later planning steps do not
     * have to look them up again.
     */
    fileGetOptions(foreigntableid,
                   &plan_state->filename,
                   &plan_state->is_program,
                   &plan_state->options);

    /* Make the state reachable from the relation for later callbacks. */
    baserel->fdw_private = (void *) plan_state;

    /* Fill in baserel->rows plus the page/tuple estimates. */
    estimate_size(root, baserel, plan_state);
}
  • estimate_size
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
 * Estimate size of a foreign table.
 *
 * The main result is returned in baserel->rows.  We also set
 * fdw_private->pages and fdw_private->ntuples for later use in the cost
 * calculation.
 */
static void
estimate_size(PlannerInfo *root, RelOptInfo *baserel,
              FileFdwPlanState *fdw_private)
{
    struct stat stat_buf;
    BlockNumber pages;
    double      ntuples;
    double      nrows;

    /*
     * Get size of the file.  It might not be there at plan time, though, in
     * which case we have to use a default estimate.  We also have to fall
     * back to the default if using a program as the input.
     */
    if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
        stat_buf.st_size = 10 * BLCKSZ;

    /*
     * Convert size to pages (rounding up, minimum one page) for use in the
     * I/O cost estimate later.
     */
    pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
    if (pages < 1)
        pages = 1;
    fdw_private->pages = pages;

    /*
     * Estimate the number of tuples in the file.
     */
    if (baserel->pages > 0)
    {
        /*
         * We have # of pages and # of tuples from pg_class (that is, from a
         * previous ANALYZE), so compute a tuples-per-page estimate and scale
         * that by the current file size.
         */
        double      density;

        /* tuples per page according to the stored statistics */
        density = baserel->tuples / (double) baserel->pages;
        /* scale by the file's current size in pages */
        ntuples = clamp_row_est(density * (double) pages);
    }
    else
    {
        /*
         * Otherwise we have to fake it.  We back into this estimate using the
         * planner's idea of the relation width; which is bogus if not all
         * columns are being read, not to mention that the text representation
         * of a row probably isn't the same size as its internal
         * representation.  Possibly we could do something better, but the
         * real answer to anyone who complains is "ANALYZE" ...
         */
        int         tuple_width;

        /* assumed per-row width: aligned data width plus aligned header */
        tuple_width = MAXALIGN(baserel->reltarget->width) +
            MAXALIGN(SizeofHeapTupleHeader);
        /* file size divided by the assumed row width */
        ntuples = clamp_row_est((double) stat_buf.st_size /
                                (double) tuple_width);
    }
    fdw_private->ntuples = ntuples;

    /*
     * Now estimate the number of rows returned by the scan after applying the
     * baserestrictinfo quals.
     */
    nrows = ntuples *
        clauselist_selectivity(root,
                               baserel->baserestrictinfo,
                               0,
                               JOIN_INNER,
                               NULL);

    nrows = clamp_row_est(nrows);

    /* Save the output-rows estimate for the planner */
    baserel->rows = nrows;
}
  • fileGetForeignPaths
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/*
 * fileGetForeignPaths
 *		Create possible access paths for a scan on the foreign table
 *
 * Currently we don't support any push-down feature, so there is only one
 * possible access path, which simply returns all records in the order in
 * the data file.
 */
static void
fileGetForeignPaths(PlannerInfo *root,
                    RelOptInfo *baserel,
                    Oid foreigntableid)
{
    FileFdwPlanState *plan_state = (FileFdwPlanState *) baserel->fdw_private;
    List       *needed_columns;
    List       *path_private = NIL;
    Cost        startup_cost;
    Cost        total_cost;
    Path       *scan_path;

    /*
     * When selective binary conversion applies, record the column list as a
     * "convert_selectively" option in the path's private list.
     */
    if (check_selective_binary_conversion(baserel,
                                          foreigntableid,
                                          &needed_columns))
    {
        path_private = list_make1(makeDefElem("convert_selectively",
                                              (Node *) needed_columns, -1));
    }

    /* Work out startup and total cost for the single scan path. */
    estimate_costs(root, baserel, plan_state,
                   &startup_cost, &total_cost);

    /*
     * Build the one and only ForeignPath.  Its fdw_private list carries the
     * convert_selectively option; it will be propagated into the fdw_private
     * list of the Plan node.
     */
    scan_path = (Path *)
        create_foreignscan_path(root, baserel,
                                NULL,   /* default pathtarget */
                                baserel->rows,
                                startup_cost,
                                total_cost,
                                NIL,    /* no pathkeys */
                                NULL,   /* no outer rel either */
                                NULL,   /* no extra plan */
                                path_private);
    add_path(baserel, scan_path);

    /*
     * If data file was sorted, and we knew it somehow, we could insert
     * appropriate pathkeys into the ForeignPath node to tell the planner
     * that.
     */
}
  • estimate_costs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
 * Estimate costs of scanning a foreign table.
 *
 * Results are returned in *startup_cost and *total_cost.
 */
static void
estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
               FileFdwPlanState *fdw_private,
               Cost *startup_cost, Cost *total_cost)
{
    Cost        io_cost;
    Cost        per_tuple_cost;
    Cost        cpu_cost;

    /*
     * We estimate costs almost the same way as cost_seqscan(), thus assuming
     * that I/O costs are equivalent to a regular table file of the same size.
     * However, we take per-tuple CPU costs as 10x of a seqscan, to account
     * for the cost of parsing records.
     *
     * In the case of a program source, this calculation is even more divorced
     * from reality, but we have no good alternative; and it's not clear that
     * the numbers we produce here matter much anyway, since there's only one
     * access path for the rel.
     */

    /* I/O term: one sequential page fetch per estimated page. */
    io_cost = seq_page_cost * fdw_private->pages;

    /* CPU term: inflated tuple cost plus restriction-clause evaluation. */
    per_tuple_cost = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
    cpu_cost = per_tuple_cost * fdw_private->ntuples;

    /* Startup cost is just the startup cost of the restriction clauses. */
    *startup_cost = baserel->baserestrictcost.startup;
    *total_cost = *startup_cost + (io_cost + cpu_cost);
}
  • fileGetForeignPlan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*
 * fileGetForeignPlan
 *		Create a ForeignScan plan node for scanning the foreign table
 */
static ForeignScan *
fileGetForeignPlan(PlannerInfo *root,
                   RelOptInfo *baserel,
                   Oid foreigntableid,
                   ForeignPath *best_path,
                   List *tlist,
                   List *scan_clauses,
                   Plan *outer_plan)
{
    List       *plan_quals;

    /*
     * We have no native ability to evaluate restriction clauses, so we just
     * put all the scan_clauses into the plan node's qual list for the
     * executor to check.  So all we have to do here is strip RestrictInfo
     * nodes from the clauses and ignore pseudoconstants (which will be
     * handled elsewhere).
     */
    plan_quals = extract_actual_clauses(scan_clauses, false);

    /* Assemble the ForeignScan node from the pieces gathered above. */
    return make_foreignscan(tlist,
                            plan_quals,
                            baserel->relid,
                            NIL,    /* no expressions to evaluate */
                            best_path->fdw_private,
                            NIL,    /* no custom tlist */
                            NIL,    /* no remote quals */
                            outer_plan);
}


未完待续