PostgreSQL事務(wù)回卷實(shí)戰(zhàn)案例詳析
背景
前陣子某個(gè)客戶反饋他的RDS PostgreSQL無(wú)法寫入,報(bào)錯(cuò)信息如下:
postgres=# select * from test;
id
----
(0 rows)postgres=# insert into test select 1;
ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT: Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.
隨后RDS工程師介入處理以后,該問(wèn)題立馬得到了解決。
XID基礎(chǔ)原理
XID 定義
XID(Transaction ID)是 PostgreSQL 內(nèi)部的事務(wù)編號(hào),每個(gè)事務(wù)都會(huì)分配一個(gè)XID,依次遞增。PostgreSQL 數(shù)據(jù)中每個(gè)元組頭部都會(huì)保存著 插入 或者 刪除 這條元組的XID(Transaction ID),然后內(nèi)核通過(guò)這個(gè) XID 構(gòu)造數(shù)據(jù)庫(kù)的一致性讀。在事務(wù)隔離級(jí)別是 可重復(fù)讀 的情況下,假設(shè)如有兩個(gè)事務(wù),xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元組,看不到 t_xmin > 200 的元組。
typedef uint32 TransactionId; /* 事務(wù)號(hào)定義,32位無(wú)符號(hào)整數(shù) */
typedef struct HeapTupleFields
{
TransactionId t_xmin; /* 插入該元組的事務(wù)號(hào) */
TransactionId t_xmax; /* 刪除或鎖定該元組的事務(wù)號(hào) */
/*** 其它屬性省略 ***/
} HeapTupleFields;
struct HeapTupleHeaderData
{
union
{
HeapTupleFields t_heap;
DatumTupleFields t_datum;
} t_choice;
/*** 其它屬性省略 ***/
};XID 發(fā)行機(jī)制
從上面結(jié)構(gòu)中我們可以看到,XID 是一個(gè)32位無(wú)符號(hào)整數(shù),也就是 XID 的范圍是 0到2^32-1;那么超過(guò)了 2^32-1的事務(wù)怎么辦呢?其實(shí) XID 是一個(gè)環(huán),超過(guò)了 2^32-1 之后又會(huì)從頭開(kāi)始分配。通過(guò)源代碼也證明了上述結(jié)論:
// 無(wú)效事務(wù)號(hào)
#define InvalidTransactionId ((TransactionId) 0)
// 引導(dǎo)事務(wù)號(hào),在數(shù)據(jù)庫(kù)初始化過(guò)程(BKI執(zhí)行)中使用
#define BootstrapTransactionId ((TransactionId) 1)
// 凍結(jié)事務(wù)號(hào)用于表示非常陳舊的元組,它們比所有正常事務(wù)號(hào)都要早(也就是可見(jiàn))
#define FrozenTransactionId ((TransactionId) 2)
// 第一個(gè)正常事務(wù)號(hào)
#define FirstNormalTransactionId ((TransactionId) 3)
// 把 FullTransactionId 的低32位作為無(wú)符號(hào)整數(shù)生成 xid
#define XidFromFullTransactionId(x) ((uint32) (x).value)
static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
dest->value++;
while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
dest->value++;
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
/*** 省略 ***/
full_xid = ShmemVariableCache->nextFullXid;
xid = XidFromFullTransactionId(full_xid);
/*** 省略 ***/
FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
/*** 省略 ***
return full_xid;
}
static void
AssignTransactionId(TransactionState s)
{
/*** 省略 ***/
s->fullTransactionId = GetNewTransactionId(isSubXact);
if (!isSubXact)
XactTopFullTransactionId = s->fullTransactionId;
/*** 省略 ***/
}
TransactionId
GetTopTransactionId(void)
{
if (!FullTransactionIdIsValid(XactTopFullTransactionId))
AssignTransactionId(&TopTransactionStateData);
return XidFromFullTransactionId(XactTopFullTransactionId);
}可以看到,新事務(wù)號(hào)保存在共享變量緩存中:ShmemVariableCache->nextFullXid,每發(fā)行一個(gè)事務(wù)號(hào)后,向上調(diào)整它的值,并跳過(guò)上述三個(gè)特殊值。三個(gè)特殊仠分別為0、1和2,作用可以看上面代碼注釋。
XID 回卷機(jī)制
前面說(shuō)到,XID 是一個(gè)環(huán),分配到 2^32-1 之后又從 3 開(kāi)始,那么內(nèi)核是怎么比較兩個(gè)事務(wù)的大小的呢?比如 xid 經(jīng)歷了這樣一個(gè)過(guò)程 3-> 2^32-1 -> 5,那么內(nèi)核怎么樣知道 5 這個(gè)事務(wù)在 2^32-1 后面呢?我們?cè)倏匆幌麓a:
/*
* TransactionIdPrecedes --- is id1 logically < id2?
*/
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
/*
* If either ID is a permanent XID then we can just do unsigned
* comparison. If both are normal, do a modulo-2^32 comparison.
*/
int32 diff;
if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
return (id1 < id2);
diff = (int32) (id1 - id2);
return (diff < 0);
}可以看到,內(nèi)核使用了一個(gè)比較取巧的方法:(int32) (id1 - id2) < 0,32位有符號(hào)整數(shù)的取值范圍是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以轉(zhuǎn)換成 int32 會(huì)變成負(fù)數(shù)。但是這里面有一個(gè)問(wèn)題,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,一旦大于就會(huì)出現(xiàn)回卷,導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見(jiàn)。
XID 回卷預(yù)防
前面講到,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,否則會(huì)發(fā)生回卷導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見(jiàn),那內(nèi)核是怎么避免這個(gè)問(wèn)題的呢??jī)?nèi)核是這樣處理的:通過(guò)定期把老事務(wù)產(chǎn)生的元組的 XID 更新為 FrozenTransactionId,即更新為2,來(lái)回收 XID,而 XID 為2 的元組對(duì)所有的事務(wù)可見(jiàn),這個(gè)過(guò)程稱為 XID 凍結(jié),通過(guò)這個(gè)方式可以回收 XID 來(lái)保證 |最新事務(wù)號(hào)-最老事務(wù)號(hào)| < 2^31。
除了內(nèi)核自動(dòng)凍結(jié)回收XID,我們也可以通過(guò)命令或者 sql 的方式手動(dòng)進(jìn)行 xid 凍結(jié)回收
- 查詢數(shù)據(jù)庫(kù)或表的年齡,數(shù)據(jù)庫(kù)年齡指的是:「最新事務(wù)號(hào)-數(shù)據(jù)庫(kù)中最老事務(wù)號(hào)」,表年齡指的是:「最新事務(wù)號(hào)-表中最老事務(wù)號(hào)」
# 查看每個(gè)庫(kù)的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1個(gè)庫(kù)每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc;
# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;
手動(dòng)凍結(jié)回收一張表的元組的 xid 的sql:
vacuum freeze 表名;
手動(dòng)凍結(jié)回收一個(gè)庫(kù)里面的所有表 xid 的命令:
vacuumdb -d 庫(kù)名 --freeze --jobs=30 -h 連接串 -p 端口號(hào) -U 庫(kù)Owner
凍結(jié)回收過(guò)程是一個(gè)重 IO 的操作,這個(gè)過(guò)程內(nèi)核會(huì)描述表的所有頁(yè)面,然后把符合要求的元組的 t_xmin 字段更新為 2,所以這個(gè)過(guò)程需要在業(yè)務(wù)低峰進(jìn)行,避免影響業(yè)務(wù)。
與凍結(jié)回收相關(guān)的內(nèi)核參數(shù)有三個(gè):vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于筆者對(duì)于這三個(gè)參數(shù)理解不深,就不在這里班門弄斧了,感興趣的同學(xué)可以自行找資料了解一下。
解決方案
問(wèn)題分析
基于上面的原理分析,我們知道,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 = 2^31-1000000,即當(dāng)前可用的 xid 僅剩下一百萬(wàn)的時(shí)候,內(nèi)核就會(huì)禁止實(shí)例寫入并報(bào)錯(cuò):database is not accepting commands to avoid wraparound data loss in database, 這個(gè)時(shí)候必須連到提示中的 "xxxx" 對(duì)表進(jìn)行 freeze 回收更多的 XID。
void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
TransactionId xidVacLimit;
TransactionId xidWarnLimit;
TransactionId xidStopLimit;
TransactionId xidWrapLimit;
TransactionId curXid;
Assert(TransactionIdIsNormal(oldest_datfrozenxid));
/*
* xidWrapLimit = 最老的事務(wù)號(hào) + 0x7FFFFFFF,當(dāng)前事務(wù)號(hào)一旦到達(dá)xidWrapLimit將發(fā)生回卷
*/
xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
if (xidWrapLimit < FirstNormalTransactionId)
xidWrapLimit += FirstNormalTransactionId;
/*
* 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidStopLimit,實(shí)例將不可寫入,保留 1000000 的xid用于vacuum
* 每 vacuum 一張表需要占用一個(gè)xid
*/
xidStopLimit = xidWrapLimit - 1000000;
if (xidStopLimit < FirstNormalTransactionId)
xidStopLimit -= FirstNormalTransactionId;
/*
* 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidWarnLimit,將不停地收到
* WARNING: database "xxxx" must be vacuumed within 2740112 transactions
*/
xidWarnLimit = xidStopLimit - 10000000;
if (xidWarnLimit < FirstNormalTransactionId)
xidWarnLimit -= FirstNormalTransactionId;
/*
* 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidVacLimit將觸發(fā)force autovacuums
*/
xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
if (xidVacLimit < FirstNormalTransactionId)
xidVacLimit += FirstNormalTransactionId;
/* Grab lock for just long enough to set the new limit values */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->oldestXid = oldest_datfrozenxid;
ShmemVariableCache->xidVacLimit = xidVacLimit;
ShmemVariableCache->xidWarnLimit = xidWarnLimit;
ShmemVariableCache->xidStopLimit = xidStopLimit;
ShmemVariableCache->xidWrapLimit = xidWrapLimit;
ShmemVariableCache->oldestXidDB = oldest_datoid;
curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
LWLockRelease(XidGenLock);
/* Log the info */
ereport(DEBUG1,
(errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
xidWrapLimit, oldest_datoid)));
/*
* 如果 當(dāng)前事務(wù)號(hào)>=最老事務(wù)號(hào)+autovacuum_freeze_max_age
* 觸發(fā) autovacuum 對(duì)年齡最老的數(shù)據(jù)庫(kù)進(jìn)行清理,如果有多個(gè)數(shù)據(jù)庫(kù)達(dá)到要求,按年齡最老的順序依次清理
* 通過(guò)設(shè)置標(biāo)志位標(biāo)記當(dāng)前 autovacuum 結(jié)束之后再來(lái)一次 autovacuum
*/
if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
IsUnderPostmaster && !InRecovery)
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
/* Give an immediate warning if past the wrap warn point */
if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
{
char *oldest_datname;
if (IsTransactionState())
oldest_datname = get_database_name(oldest_datoid);
else
oldest_datname = NULL;
if (oldest_datname)
ereport(WARNING,
(errmsg("database \"%s\" must be vacuumed within %u transactions",
oldest_datname,
xidWrapLimit - curXid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
else
ereport(WARNING,
(errmsg("database with OID %u must be vacuumed within %u transactions",
oldest_datoid,
xidWrapLimit - curXid),
errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
}
}
bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
int32 diff;
if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
return (id1 >= id2);
diff = (int32) (id1 - id2);
return (diff >= 0);
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
/*** 省略 ***/
full_xid = ShmemVariableCache->nextFullXid;
xid = XidFromFullTransactionId(full_xid);
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
{
TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
Oid oldest_datoid = ShmemVariableCache->oldestXidDB;
/*** 省略 ***/
if (IsUnderPostmaster &&
TransactionIdFollowsOrEquals(xid, xidStopLimit))
{
char *oldest_datname = get_database_name(oldest_datoid);
/* complain even if that DB has disappeared */
if (oldest_datname)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
oldest_datname),
errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
"You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
/*** 省略 ***/
}
/*** 省略 ***/
}
/*** 省略 ***/
}問(wèn)題定位
# 查看每個(gè)庫(kù)的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1個(gè)庫(kù)每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc;
# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;
問(wèn)題解決
- 通過(guò)上面的第一個(gè) sql,查找年齡最大的數(shù)據(jù)庫(kù),數(shù)據(jù)庫(kù)年齡指的是:|最新事務(wù)號(hào)-數(shù)據(jù)庫(kù)中最老事務(wù)號(hào)|
- 通過(guò)上面第二個(gè) sql,查找年齡最大的表,然后對(duì)表依次執(zhí)行:vacuum freeze 表名,把表中的老事務(wù)號(hào)凍結(jié)回收,表年齡指的是:|最新事務(wù)號(hào)-表中最老事務(wù)號(hào)|
- 運(yùn)維腳本
單進(jìn)程 Shell 腳本
# 對(duì)指定數(shù)據(jù)庫(kù)中年齡最大的前 50 張表進(jìn)行 vacuum freeze
for cmd in `psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫(kù)名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd | grep -v row | grep vacuum`; do
psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫(kù)名 -c "$cmd"
done多進(jìn)程 Python 腳本
from multiprocessing import Pool
import psycopg2
args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='數(shù)據(jù)庫(kù)名',
user='用戶名', password='密碼')
def vacuum_handler(sql):
sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
try:
conn = psycopg2.connect(**args)
cur = conn.cursor()
cur.execute(sql)
conn.commit()
cur = conn.cursor()
cur.execute(sql_str)
print cur.fetchall()
conn.close()
except Exception as e:
print str(e)
# 對(duì)指定數(shù)據(jù)庫(kù)中年齡最大的前 1000 張表進(jìn)行 vacuum freeze,32 個(gè)進(jìn)程并發(fā)執(zhí)行
def multi_vacuum():
pool = Pool(processes=32)
sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
try:
conn = psycopg2.connect(**args)
cur = conn.cursor()
cur.execute(sql_str)
rows = cur.fetchall()
for row in rows:
cmd = row['vacuum_cmd']
pool.apply_async(vacuum_handler, (cmd, ))
conn.close()
pool.close()
pool.join()
except Exception as e:
print str(e)
multi_vacuum()友情提示
vacuum freeze 會(huì)掃描表的所有頁(yè)面并更新,是一個(gè)重 IO 的操作,操作過(guò)程中一定要控制好并發(fā)數(shù),否則非常容易把實(shí)例打掛。
作者信息
謝桂起(花名:淵渱) 2020年畢業(yè)后加入阿里云,一直從事RDS PostgreSQL相關(guān)工作,善于解決線上各類RDS PostgreSQL運(yùn)維管控相關(guān)問(wèn)題。
總結(jié)
到此這篇關(guān)于PostgreSQL事務(wù)回卷的文章就介紹到這了,更多相關(guān)PostgreSQL事務(wù)回卷內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非maisonbaluchon.cn所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。
關(guān)注官方微信