1,进程启动:



1,main->PostmasterMain->ServerLoop:主进程的启动,在此接受从服务器的连接,并启动sender进程
selres = select(nSockets, &rmask, NULL, NULL, &timeout); //监听文件描述符
if (selres > 0){
for (i = 0; i < MAXLISTEN; i++){
//找一个有效的fd
if (ListenSocket[i] == PGINVALID_SOCKET)
break;
if (FD_ISSET(ListenSocket[i], &rmask)){
port = ConnCreate(ListenSocket[i]);->StreamConnection->accept
BackendStartup(port);//fork进程
2,监听到文件描述符(不同需求到达)变化后,在不同的端口创建不同的进程
pid = fork_process();
3,子进程创建后会在PostgresMain启动
所有后台进程都在这里启动
PostgresMain
postgres main loop -- all backends, interactive or otherwise start here
2,sender与receiver启动

2.1,sender启动细节:
注:主进程启动sender后,由sender全权负责所有工作
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x00000000006f3b64 in WalSndLoop ()
#4 0x00000000006f4320 in exec_replication_command ()
#5 0x000000000073e61d in PostgresMain ()
#6 0x00000000006cf9da in ServerLoop ()
#7 0x00000000006d0898 in PostmasterMain ()
#8 0x000000000048075e in main ()
PostgresMain ()
WalSndSignals(); 信号初始化
MarkPostmasterChildWalSender(); 告诉master进程我是sender,存活时间更长
WalSndErrorCleanup(void) 上一个walsender错误发生需要进行清理
forbidden_in_wal_sender
exec_replication_command ()
case T_StartReplicationCmd: 如果是开始复制的命令StartReplication(cmd);执行开始复制
walsender主循环:
回调参数:函数指针
调用:WalSndLoop(XLogSendPhysical);
原型:WalSndLoop(WalSndSendDataCallback send_data)
为什么在walsnd断点插不进去,而在exec_replication_command可以
WalSndLoop
1ProcessRepliesIfAny(void):处理备机的回复
for (;;)
{
    pq_startmsgread();
    //判断是否有字符输入,否则break
    r = pq_getbyte_if_available(&firstchar);
    switch (firstchar)
    {
        /*
         * 'd' means a standby reply wrapped in a CopyData packet.
         * d为备机的回复
         */
        case 'd':
            ProcessStandbyMessage();
            received = true;
            break;

        /*
         * c为完成流复制的请求
         * CopyDone means the standby requested to finish streaming.
         * Reply with CopyDone, if we had not sent that already.
         */
        case 'c':
            if (!streamingDoneSending)
            {
                pq_putmessage_noblock('c', NULL, 0);
                streamingDoneSending = true;
            }
            streamingDoneReceiving = true;
            received = true;
            break;

        /*
         * 备机关闭socket
         * 'X' means that the standby is closing down the socket.
         */
        case 'X':
            proc_exit(0);

        default:
            ereport(FATAL,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("invalid standby message type \"%c\"",
                            firstchar)));
    }
}
1.1 ProcessStandbyReplyMessage(void):接收备机回复的wal日志位置
/*
 * Update shared state for this WalSender process based on reply data from
 * standby.
 * 根据回复的数据更新位置
 */
ProcessStandbyReplyMessage(void)
{
    WalSnd *walsnd = MyWalSnd;

    SpinLockAcquire(&walsnd->mutex);
    walsnd->write = writePtr;
    walsnd->flush = flushPtr;
    walsnd->apply = applyPtr;
    if (writeLag != -1 || clearLagTimes)
        walsnd->writeLag = writeLag;
    if (flushLag != -1 || clearLagTimes)
        walsnd->flushLag = flushLag;
    if (applyLag != -1 || clearLagTimes)
        walsnd->applyLag = applyLag;
    walsnd->replyTime = replyTime;
    SpinLockRelease(&walsnd->mutex);

    /*
     * Advance our local xmin horizon when the client confirmed a flush.
     * 当接收到备机的flush后,更新自己最小的XLOG
     */
    if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
    {
        if (SlotIsLogical(MyReplicationSlot))
            LogicalConfirmReceivedLocation(flushPtr);
        else
            PhysicalConfirmReceivedLocation(flushPtr);
    }
}
2
问题: 流复制结束,结束进程? pending data,发送数据的条件
//流复制结束,结束循环
if (streamingDoneReceiving && streamingDoneSending &&
    !pq_is_send_pending())
    break;

//发送数据
/*
 * If we don't have any pending data in the output buffer, try to send
 * some more. If there is some, we don't bother to call send_data
 * again until we've flushed it ... but we'd better assume we are not
 * caught up.
 */
if (!pq_is_send_pending())
    send_data(); //walsndloop的参数:类型为函数指针
else
    WalSndCaughtUp = false;

/* Check for replication timeout. */
WalSndCheckTimeOut(); //检测超时

/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(); //发送keepalive

//发送完毕之后 阻塞
(void) WaitLatchOrSocket(MyLatch, wakeEvents,
                         MyProcPort->sock, sleeptime,
                         WAIT_EVENT_WAL_SENDER_MAIN);
2.1 send_data() ——> XLogSendPhysical(void)
数据如何流动的,以及全局变量
/* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,将刷盘的wal开始发送 */
XLogSendPhysical(void)
{
    //获得刷盘地址
    SendRqstPtr = GetFlushRecPtr();

    //记录尾部和时间线
    LagTrackerWrite()

    resetStringInfo(&output_message);
    pq_sendbyte(&output_message, 'w'); //发送
    pq_sendint64(&output_message, startptr);    /* dataStart */
    pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
    pq_sendint64(&output_message, 0);           /* sendtime, filled in last */
}
问题: StartReplication(cmd); 与 StartLogicalReplication(cmd); 的区别?
2.2,receiver启动细节:
#0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x00000000006f69d8 in WalReceiverMain ()
#4 0x0000000000517a04 in AuxiliaryProcessMain ()
#5 0x00000000006cd15a in StartChildProcess ()
#6 0x00000000006cd9d8 in MaybeStartWalReceiver ()
#7 0x00000000006ce82f in sigusr1_handler ()
#8 <signal handler called>
#9 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
#10 0x00000000006ced35 in ServerLoop ()
#11 0x00000000006d0898 in PostmasterMain ()
#12 0x000000000048075e in main ()
standby:ServerLoop()
//信号 允许receiver 发现了自己为从服务器
if (WalReceiverRequested)
    //启动receiver进程
    MaybeStartWalReceiver();
AuxiliaryProcessMain(int argc, char *argv[]) -> WalReceiverMain(void)
WalReceiverMain.XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); 根据类型处理消息
switch (type)
{
    //wal消息
    case 'w': /* WAL records */
    {
        /* copy message to StringInfo */
        hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
        if (len < hdrlen)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("invalid WAL message received from primary")));
        appendBinaryStringInfo(&incoming_message, buf, hdrlen);

        /* read the fields */
        dataStart = pq_getmsgint64(&incoming_message);
        walEnd = pq_getmsgint64(&incoming_message);
        sendTime = pq_getmsgint64(&incoming_message);
        ProcessWalSndrMessage(walEnd, sendTime);

        buf += hdrlen;
        len -= hdrlen;
        XLogWalRcvWrite(buf, len, dataStart);
        break;
    }
    //保活消息
    case 'k': /* Keepalive */
    {
        /* copy message to StringInfo */
        hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
        if (len != hdrlen)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("invalid keepalive message received from primary")));
        appendBinaryStringInfo(&incoming_message, buf, hdrlen);

        /* read the fields */
        walEnd = pq_getmsgint64(&incoming_message);
        sendTime = pq_getmsgint64(&incoming_message);
        replyRequested = pq_getmsgbyte(&incoming_message);

        ProcessWalSndrMessage(walEnd, sendTime);

        /* If the primary requested a reply, send one immediately */
        if (replyRequested)
            XLogWalRcvSendReply(true, false);
        break;
    }
}
WalReceiverMain.XLogWalRcvSendReply:不停发送自己的log位置时间等
XLogWalRcvFlush(false); //刷盘,并且让startup进程了解
2.3 startup启动细节
#0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x0000000000504eb4 in XLogPageRead ()
#4 0x0000000000512aa2 in ReadPageInternal ()
#5 0x0000000000513370 in XLogReadRecord ()
#6 0x0000000000502c6b in ReadRecord ()
#7 0x00000000005092ba in StartupXLOG ()
#8 0x00000000006d0ff1 in StartupProcessMain ()
#9 0x00000000005179fa in AuxiliaryProcessMain ()
#10 0x00000000006cd15a in StartChildProcess ()
#11 0x00000000006cd27a in PostmasterStateMachine ()
#12 0x00000000006ce215 in reaper ()
#13 <signal handler called>
#14 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
#15 0x00000000006ced35 in ServerLoop ()
#16 0x00000000006d0898 in PostmasterMain ()
#17 0x000000000048075e in main ()
StartupXLOG(void)...
