1. 进程启动:

[图1:Pg 启动过程]

[图2:Pg 启动过程] [图3:Pg 启动过程]

(1)main -> PostmasterMain -> ServerLoop:postmaster 主进程启动后进入 ServerLoop,在这里接受客户端以及备库(从服务器)的连接;备库的复制连接最终会启动 walsender 进程。

通过 select() 监听各个监听套接字的文件描述符:
selres = select(nSockets, &rmask, NULL, NULL, &timeout);
if (selres > 0){
for (i = 0; i < MAXLISTEN; i++){
//找一个有效的fd
if (ListenSocket[i] == PGINVALID_SOCKET)
break;
if (FD_ISSET(ListenSocket[i], &rmask)){
port = ConnCreate(ListenSocket[i]);->StreamConnection->accept
BackendStartup(port);//fork进程

(2)select() 检测到某个监听套接字可读(即有新的连接请求到达)后,先由 ConnCreate 接受该连接,再由 BackendStartup 为这个连接 fork 一个新的后端进程:

pid = fork_process();

(3)子进程创建后进入 PostgresMain 执行。
所有 backend 进程(无论是否为交互式)都从这里开始。辅助进程(如 walreceiver、startup)则经由 AuxiliaryProcessMain 启动,见下文 2.2、2.3 的调用栈。

PostgresMain
postgres main loop -- all backends, interactive or otherwise start here

2. walsender 与 walreceiver 的启动

[图4:Pg 启动过程]

2.1 walsender 启动细节:

注:postmaster 为复制连接 fork 出 walsender 进程后,后续的复制工作全部由 walsender 自行负责。调用栈如下:

#1 0x000000000071db41 in WaitEventSetWait ()
#2 0x000000000071df69 in WaitLatchOrSocket ()
#3 0x00000000006f3b64 in WalSndLoop ()
#4 0x00000000006f4320 in exec_replication_command ()
#5 0x000000000073e61d in PostgresMain ()
#6 0x00000000006cf9da in ServerLoop ()
#7 0x00000000006d0898 in PostmasterMain ()
#8 0x000000000048075e in main ()

PostgresMain ()

WalSndSignals(); —— 初始化 walsender 的信号处理
MarkPostmasterChildWalSender(); —— 告诉 postmaster 本进程是 walsender(walsender 在关闭过程中比普通 backend 存活得更久)
WalSndErrorCleanup(void) —— walsender 发生错误后进行清理
forbidden_in_wal_sender —— 在 walsender 进程中拒绝不允许执行的操作

exec_replication_command ()

case T_StartReplicationCmd: 如果是开始复制的命令
StartReplication(cmd);执行开始复制

walsender 主循环,参数为一个回调函数指针:
调用形式为 `WalSndLoop(XLogSendPhysical);`
函数原型为 `WalSndLoop(WalSndSendDataCallback send_data)`

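问题:为什么在 WalSndLoop 上设置断点不生效,而在 exec_replication_command 上可以?

可能的原因(待验证):
- 调试器附加时,进程已经处于 WalSndLoop 的循环内部,函数入口处的断点不会再被触发。
- exec_replication_command 每收到一条复制命令都会被调用一次,所以断点能命中。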

WalSndLoop

1. ProcessRepliesIfAny(void):处理备机发来的回复

  1. for (;;)
  2. {
  3. pq_startmsgread();
  4. //判断是否有字符输入,否则break
  5. r = pq_getbyte_if_available(&firstchar);
  6. switch (firstchar)
  7. {
  8. /*
  9. * 'd' means a standby reply wrapped in a CopyData packet.
  10. d为备机的回复
  11. */
  12. case 'd':
  13. ProcessStandbyMessage();
  14. received = true;
  15. break;
  16. /*
  17. c为完成流复制的请求
  18. * CopyDone means the standby requested to finish streaming.
  19. * Reply with CopyDone, if we had not sent that already.
  20. */
  21. case 'c':
  22. if (!streamingDoneSending)
  23. {
  24. pq_putmessage_noblock('c', NULL, 0);
  25. streamingDoneSending = true;
  26. }
  27. streamingDoneReceiving = true;
  28. received = true;
  29. break;
  30. /*
  31. 备机关闭socket
  32. * 'X' means that the standby is closing down the socket.
  33. */
  34. case 'X':
  35. proc_exit(0);
  36. default:
  37. ereport(FATAL,
  38. (errcode(ERRCODE_PROTOCOL_VIOLATION),
  39. errmsg("invalid standby message type \"%c\"",
  40. firstchar)));
  41. }
  42. }

1.1 ProcessStandbyReplyMessage(void):接收备机回复的 WAL 位置(write / flush / apply),并据此更新本 walsender 的共享状态

  1. /*
  2. * Update shared state for this WalSender process based on reply data from
  3. * standby.
  4. 更新位置,根据回复的数据
  5. */
  6. ProcessStandbyReplyMessage(void)
  7. {
  8. WalSnd *walsnd = MyWalSnd;
  9. SpinLockAcquire(&walsnd->mutex);
  10. walsnd->write = writePtr;
  11. walsnd->flush = flushPtr;
  12. walsnd->apply = applyPtr;
  13. if (writeLag != -1 || clearLagTimes)
  14. walsnd->writeLag = writeLag;
  15. if (flushLag != -1 || clearLagTimes)
  16. walsnd->flushLag = flushLag;
  17. if (applyLag != -1 || clearLagTimes)
  18. walsnd->applyLag = applyLag;
  19. walsnd->replyTime = replyTime;
  20. SpinLockRelease(&walsnd->mutex);
  21. /*
  22. * Advance our local xmin horizon when the client confirmed a flush.
  23. 当接受到备机的flush后,更新自己最小的XLOG
  24. */
  25. if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
  26. {
  27. if (SlotIsLogical(MyReplicationSlot))
  28. LogicalConfirmReceivedLocation(flushPtr);
  29. else
  30. PhysicalConfirmReceivedLocation(flushPtr);
  31. }
  32. }

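2. 主循环的后续逻辑

问题:流复制结束时如何退出循环?pending data 指什么?发送数据的条件是什么?

由下方代码可见:
- 只有当收发两个方向都已结束(streamingDoneReceiving 与 streamingDoneSending 均为真),且输出缓冲区中没有待发送的数据时,才退出循环。
- 只有当输出缓冲区中没有待发送数据时,才会调用 send_data 发送新数据;否则将 WalSndCaughtUp 置为 false。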

  1. //流复制结束,结束循环
  2. if (streamingDoneReceiving && streamingDoneSending &&
  3. !pq_is_send_pending())
  4. break;
  5. 发送数据
  6. /*
  7. * If we don't have any pending data in the output buffer, try to send
  8. * some more. If there is some, we don't bother to call send_data
  9. * again until we've flushed it ... but we'd better assume we are not
  10. * caught up.
  11. */
  12. if (!pq_is_send_pending())
  13. send_data(); //walsndloop的参数:类型为函数指针
  14. else
  15. WalSndCaughtUp = false;
  16. /* Check for replication timeout. */
  17. WalSndCheckTimeOut(); //检测超时
  18. /* Send keepalive if the time has come */
  19. WalSndKeepaliveIfNecessary();
  20. //发送keepalive
  21. (void) WaitLatchOrSocket(MyLatch, wakeEvents,
  22. MyProcPort->sock, sleeptime,
  23. WAIT_EVENT_WAL_SENDER_MAIN);
  24. //发送完毕之后 阻塞

2.1 send_data() → XLogSendPhysical(void)

(待整理:数据是如何流动的,以及涉及哪些全局变量)

  1. Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
  2. 将刷盘的wal开始发送
  3. XLogSendPhysical(void){
  4. //获得刷盘地址
  5. SendRqstPtr = GetFlushRecPtr();
  6. //记录尾部和时间线
  7. LagTrackerWrite()
  8. resetStringInfo(&output_message);
  9. pq_sendbyte(&output_message, 'w');
  10. //发送
  11. pq_sendint64(&output_message, startptr); /* dataStart */
  12. pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
  13. pq_sendint64(&output_message, 0); /* sendtime, filled in last */
  14. }

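问题:StartReplication(cmd) 与 StartLogicalReplication(cmd) 有什么区别?

- StartReplication(cmd) 用于物理复制,主循环的回调为 XLogSendPhysical。
- StartLogicalReplication(cmd) 用于逻辑复制。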

2.2 walreceiver 启动细节

  1. #0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
  2. #1 0x000000000071db41 in WaitEventSetWait ()
  3. #2 0x000000000071df69 in WaitLatchOrSocket ()
  4. #3 0x00000000006f69d8 in WalReceiverMain ()
  5. #4 0x0000000000517a04 in AuxiliaryProcessMain ()
  6. #5 0x00000000006cd15a in StartChildProcess ()
  7. #6 0x00000000006cd9d8 in MaybeStartWalReceiver ()
  8. #7 0x00000000006ce82f in sigusr1_handler ()
  9. #8 <signal handler called>
  10. #9 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
  11. #10 0x00000000006ced35 in ServerLoop ()
  12. #11 0x00000000006d0898 in PostmasterMain ()
  13. #12 0x000000000048075e in main ()
  1. standbyServerLoop()
  2. //信号 允许receiver 发现了自己为从服务器
  3. if (WalReceiverRequested)
  4. //启动receiver进程
  5. MaybeStartWalReceiver();
  1. AuxiliaryProcessMain(int argc, char *argv[])
  2. WalReceiverMain(void)
  1. WalReceiverMain.XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);根据类型处理消息
  2. switch (type)
  3. {
  4. //wal消息
  5. case 'w': /* WAL records */
  6. {
  7. /* copy message to StringInfo */
  8. hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
  9. if (len < hdrlen)
  10. ereport(ERROR,
  11. (errcode(ERRCODE_PROTOCOL_VIOLATION),
  12. errmsg_internal("invalid WAL message received from primary")));
  13. appendBinaryStringInfo(&incoming_message, buf, hdrlen);
  14. /* read the fields */
  15. dataStart = pq_getmsgint64(&incoming_message);
  16. walEnd = pq_getmsgint64(&incoming_message);
  17. sendTime = pq_getmsgint64(&incoming_message);
  18. ProcessWalSndrMessage(walEnd, sendTime);
  19. buf += hdrlen;
  20. len -= hdrlen;
  21. XLogWalRcvWrite(buf, len, dataStart);
  22. break;
  23. }
  24. //保活消息
  25. case 'k': /* Keepalive */
  26. {
  27. /* copy message to StringInfo */
  28. hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
  29. if (len != hdrlen)
  30. ereport(ERROR,
  31. (errcode(ERRCODE_PROTOCOL_VIOLATION),
  32. errmsg_internal("invalid keepalive message received from primary")));
  33. appendBinaryStringInfo(&incoming_message, buf, hdrlen);
  34. /* read the fields */
  35. walEnd = pq_getmsgint64(&incoming_message);
  36. sendTime = pq_getmsgint64(&incoming_message);
  37. replyRequested = pq_getmsgbyte(&incoming_message);
  38. ProcessWalSndrMessage(walEnd, sendTime);
  39. /* If the primary requested a reply, send one immediately */
  40. if (replyRequested)
  41. XLogWalRcvSendReply(true, false);
  42. break;
  43. }
  44. }
  45. WalReceiverMain.XLogWalRcvSendReply:不停发送自己的log位置时间等
  46. XLogWalRcvFlush(false);//刷盘,并且让startup进程了解

2.3 startup 进程启动细节

  1. #0 0x00007f06b41b30c3 in __epoll_wait_nocancel () from /lib64/libc.so.6
  2. #1 0x000000000071db41 in WaitEventSetWait ()
  3. #2 0x000000000071df69 in WaitLatchOrSocket ()
  4. #3 0x0000000000504eb4 in XLogPageRead ()
  5. #4 0x0000000000512aa2 in ReadPageInternal ()
  6. #5 0x0000000000513370 in XLogReadRecord ()
  7. #6 0x0000000000502c6b in ReadRecord ()
  8. #7 0x00000000005092ba in StartupXLOG ()
  9. #8 0x00000000006d0ff1 in StartupProcessMain ()
  10. #9 0x00000000005179fa in AuxiliaryProcessMain ()
  11. #10 0x00000000006cd15a in StartChildProcess ()
  12. #11 0x00000000006cd27a in PostmasterStateMachine ()
  13. #12 0x00000000006ce215 in reaper ()
  14. #13 <signal handler called>
  15. #14 0x00007f06b41a9b23 in __select_nocancel () from /lib64/libc.so.6
  16. #15 0x00000000006ced35 in ServerLoop ()
  17. #16 0x00000000006d0898 in PostmasterMain ()
  18. #17 0x000000000048075e in main ()
  19. StartupXLOG(void)...