1,进程启动:
1,main->PostmasterMain->ServerLoop:主进程的启动;从服务器的连接在此被接受,sender进程也由此启动
selres = select(nSockets, &rmask, NULL, NULL, &timeout);监听文件描述符
selres = select(nSockets, &rmask, NULL, NULL, &timeout);
if (selres > 0){
for (i = 0; i < MAXLISTEN; i++){
//找一个有效的fd
if (ListenSocket[i] == PGINVALID_SOCKET)
break;
if (FD_ISSET(ListenSocket[i], &rmask)){
port = ConnCreate(ListenSocket[i]);->StreamConnection->accept
BackendStartup(port);//fork进程
2,监听到文件描述符就绪(有新的连接请求到达)后,为每个连接fork一个对应的子进程
pid = fork_process();
3,子进程创建后会进入PostgresMain运行
所有后端(backend)进程都从这里启动
PostgresMain
postgres main loop -- all backends, interactive or otherwise start here
2,sender与receiver启动
2.1,sender启动细节:
注:主进程启动sender后,由sender全权负责所有工作
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x00000000006f3b64 in WalSndLoop ()
#4 0x00000000006f4320 in exec_replication_command ()
#5 0x000000000073e61d in PostgresMain ()
#6 0x00000000006cf9da in ServerLoop ()
#7 0x00000000006d0898 in PostmasterMain ()
#8 0x000000000048075e in main ()
PostgresMain ()
WalSndSignals(); 信号初始化
MarkPostmasterChildWalSender(); 告诉postmaster进程本进程是walsender(其存活时间比普通后端更长)
WalSndErrorCleanup(void) walsender发生错误后需要进行的清理
forbidden_in_wal_sender
exec_replication_command ()
case T_StartReplicationCmd: 如果是开始复制的命令
StartReplication(cmd);执行开始复制
walsender主循环:
回调参数:函数指针。调用处:WalSndLoop(XLogSendPhysical); 函数原型:WalSndLoop(WalSndSendDataCallback send_data)
问题:为什么在WalSndLoop处打不上断点,而在exec_replication_command处可以?
WalSndLoop
1 ProcessRepliesIfAny(void):处理备机的回复
for (;;)
{
pq_startmsgread();
//判断是否有字符输入,否则break
r = pq_getbyte_if_available(&firstchar);
switch (firstchar)
{
/*
* 'd' means a standby reply wrapped in a CopyData packet.
d为备机的回复
*/
case 'd':
ProcessStandbyMessage();
received = true;
break;
/*
c为完成流复制的请求
* CopyDone means the standby requested to finish streaming.
* Reply with CopyDone, if we had not sent that already.
*/
case 'c':
if (!streamingDoneSending)
{
pq_putmessage_noblock('c', NULL, 0);
streamingDoneSending = true;
}
streamingDoneReceiving = true;
received = true;
break;
/*
备机关闭socket
* 'X' means that the standby is closing down the socket.
*/
case 'X':
proc_exit(0);
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid standby message type \"%c\"",
firstchar)));
}
}
1.1 ProcessStandbyReplyMessage(void):接收备机回复的wal日志位置
/*
* Update shared state for this WalSender process based on reply data from
* standby.
更新位置,根据回复的数据
*/
ProcessStandbyReplyMessage(void)
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
if (writeLag != -1 || clearLagTimes)
walsnd->writeLag = writeLag;
if (flushLag != -1 || clearLagTimes)
walsnd->flushLag = flushLag;
if (applyLag != -1 || clearLagTimes)
walsnd->applyLag = applyLag;
walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
/*
* Advance our local xmin horizon when the client confirmed a flush.
当备机确认已flush之后,推进本地的xmin horizon(通过复制槽确认备机已接收的位置)
*/
if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
{
if (SlotIsLogical(MyReplicationSlot))
LogicalConfirmReceivedLocation(flushPtr);
else
PhysicalConfirmReceivedLocation(flushPtr);
}
}
2
问题: 流复制结束,结束进程? pending data,发送数据的条件
//流复制结束,结束循环
if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
break;
发送数据
/*
* If we don't have any pending data in the output buffer, try to send
* some more. If there is some, we don't bother to call send_data
* again until we've flushed it ... but we'd better assume we are not
* caught up.
*/
if (!pq_is_send_pending())
send_data(); //WalSndLoop的参数:类型为函数指针
else
WalSndCaughtUp = false;
/* Check for replication timeout. */
WalSndCheckTimeOut(); //检测超时
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary();
//发送keepalive
(void) WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_MAIN);
//发送完毕之后 阻塞
2.1 send_data() ——> XLogSendPhysical(void)
数据如何流动的,以及全局变量
Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
将刷盘的wal开始发送
XLogSendPhysical(void){
//获得刷盘地址
SendRqstPtr = GetFlushRecPtr();
//记录本次发送的WAL尾部位置和时间戳(用于计算复制延迟)
LagTrackerWrite()
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'w');
//发送
pq_sendint64(&output_message, startptr); /* dataStart */
pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
pq_sendint64(&output_message, 0); /* sendtime, filled in last */
}
StartReplication(cmd);StartLogicalReplication(cmd);区别
2.2,receiver启动细节
#0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x00000000006f69d8 in WalReceiverMain ()
#4 0x0000000000517a04 in AuxiliaryProcessMain ()
#5 0x00000000006cd15a in StartChildProcess ()
#6 0x00000000006cd9d8 in MaybeStartWalReceiver ()
#7 0x00000000006ce82f in sigusr1_handler ()
#8 <signal handler called>
#9 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
#10 0x00000000006ced35 in ServerLoop ()
#11 0x00000000006d0898 in PostmasterMain ()
#12 0x000000000048075e in main ()
standby:ServerLoop()
//收到信号后置位:本机为从服务器,请求启动receiver
if (WalReceiverRequested)
//启动receiver进程
MaybeStartWalReceiver();
AuxiliaryProcessMain(int argc, char *argv[])
WalReceiverMain(void)
WalReceiverMain.XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);根据类型处理消息
switch (type)
{
//wal消息
case 'w': /* WAL records */
{
/* copy message to StringInfo */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
if (len < hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
/* read the fields */
dataStart = pq_getmsgint64(&incoming_message);
walEnd = pq_getmsgint64(&incoming_message);
sendTime = pq_getmsgint64(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
buf += hdrlen;
len -= hdrlen;
XLogWalRcvWrite(buf, len, dataStart);
break;
}
//保活消息
case 'k': /* Keepalive */
{
/* copy message to StringInfo */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
if (len != hdrlen)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
/* read the fields */
walEnd = pq_getmsgint64(&incoming_message);
sendTime = pq_getmsgint64(&incoming_message);
replyRequested = pq_getmsgbyte(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
/* If the primary requested a reply, send one immediately */
if (replyRequested)
XLogWalRcvSendReply(true, false);
break;
}
}
WalReceiverMain.XLogWalRcvSendReply:不停发送自己的log位置时间等
XLogWalRcvFlush(false);//刷盘,并通知startup进程
2.3 startup启动细节
#0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x0000000000504eb4 in XLogPageRead ()
#4 0x0000000000512aa2 in ReadPageInternal ()
#5 0x0000000000513370 in XLogReadRecord ()
#6 0x0000000000502c6b in ReadRecord ()
#7 0x00000000005092ba in StartupXLOG ()
#8 0x00000000006d0ff1 in StartupProcessMain ()
#9 0x00000000005179fa in AuxiliaryProcessMain ()
#10 0x00000000006cd15a in StartChildProcess ()
#11 0x00000000006cd27a in PostmasterStateMachine ()
#12 0x00000000006ce215 in reaper ()
#13 <signal handler called>
#14 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
#15 0x00000000006ced35 in ServerLoop ()
#16 0x00000000006d0898 in PostmasterMain ()
#17 0x000000000048075e in main ()
StartupXLOG(void)...