We previously discussed replicated state machines, leader election in the RAFT consensus algorithm, and log-based state maintenance. Now, we’ll focus on log replication across peer servers. We’ll also examine how RAFT ensures that the same commands are applied to the state machine at a given log index on every peer, because of the leader’s one-way log distribution to followers.

Leader Initialisation

Once a candidate becomes leader, we call the setupLeader function, which starts a goroutine for each peer in the RAFT cluster. Each peer's goroutine is responsible for replicating new log entries to that peer or sending it a heartbeat via the AppendEntries RPC.

func (rf *Raft) setupLeader() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	ctx, cancel := context.WithCancel(context.Background())
	rf.leaderCancelFunc = cancel

	for peerIndex := range rf.peers {
		if peerIndex != rf.me {
			rf.nextIndex[peerIndex] = len(rf.log) + rf.snapshotLastLogIndex
			go rf.replicate(peerIndex, ctx)
		}
	}

	go func() {
		for !rf.killed() {
			select {
			case <-ctx.Done():
				return
			case <-time.After(HEARTBEAT_TIMEOUT):
				rf.replicatorCond.Broadcast()
			}
		}
	}()
}

When starting the replication goroutine for each peer, we also pass a Go context to rf.replicate. This context represents the current leader's status. If the leader steps down, we call rf.leaderCancelFunc, which cancels the context. When a context is cancelled, its ctx.Done() channel is closed, which unblocks anything waiting on that channel. More information can be found in the documentation for Go's context package.

Our RAFT struct includes a condition variable, replicatorCond (of type *sync.Cond), that signals all of the leader's peer goroutines to run the replication logic every HEARTBEAT_TIMEOUT. This ensures that a heartbeat is sent to each peer at the specified interval. A condition variable provides the operations Wait, Signal, and Broadcast. A thread that calls Wait releases the underlying mutex and sleeps; Signal wakes one such waiting thread, and Broadcast wakes all of them. Here we use Broadcast to wake the goroutines of all peers so that each sends its next heartbeat. Condition variables are a general operating-system concept; for the Go API, see the official documentation for sync.Cond.
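
To make the interplay between the cancelled context and Broadcast concrete, here is a minimal, self-contained sketch. It is not the article's Raft struct: the group type, worker, and stopAll are hypothetical names invented for this illustration. It only shows that cancelling the context and then broadcasting (under the mutex) is enough to wake every parked goroutine and let it exit.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// group is a hypothetical stand-in for the leader-side state shared by the
// per-peer goroutines; it is not the Raft struct from the article.
type group struct {
	mu     sync.Mutex
	cond   *sync.Cond
	exited int
}

// worker parks on the condition variable until its context is cancelled.
func (g *group) worker(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	g.mu.Lock()
	defer g.mu.Unlock()
	for ctx.Err() == nil {
		g.cond.Wait() // releases g.mu while parked, re-acquires it on wake-up
	}
	g.exited++
}

// stopAll starts n workers, cancels their context, and wakes them with one
// Broadcast. It returns how many workers observed the cancellation.
func stopAll(n int) int {
	g := &group{}
	g.cond = sync.NewCond(&g.mu)
	ctx, cancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go g.worker(ctx, &wg)
	}

	cancel() // closes ctx.Done(); ctx.Err() is non-nil from here on
	g.mu.Lock()
	g.cond.Broadcast() // Signal would wake at most one parked worker
	g.mu.Unlock()

	wg.Wait()
	return g.exited
}

func main() {
	fmt.Println(stopAll(3)) // prints 3
}
```

Broadcasting while holding the mutex matters in this sketch: a worker that checked ctx.Err() just before the cancel is guaranteed to be parked in Wait by the time the Broadcast runs, so no wake-up is missed.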

Log Replication

Each peer's goroutine runs the replicate method. The implementation of rf.replicate is given below:

func (rf *Raft) replicate(peerIndex int, ctx context.Context) {
	logMismatch := false
	for !rf.killed() {
		select {
		case <-ctx.Done():
			dprintf("[leader-replicate: %d | peer: %d]: Leader stepped down from leadership before initiating replicate.\n", rf.me, peerIndex)
			return
		default:
			rf.mu.Lock()

			if rf.state != StateLeader {
				dprintf("[leader-replicate: %d | peer: %d]: Not a leader anymore, winding up my leadership setup.\n", rf.me, peerIndex)
				if rf.leaderCancelFunc != nil {
					rf.leaderCancelFunc()
					rf.replicatorCond.Broadcast()
				}
				rf.mu.Unlock()
				return
			}

			// Only waiting when:
			// - There is no log to send - In this case the wait will be signalled by the heartbeat
			// - We are not in a continuous loop to find the correct nextIndex for this peer with retrial RPCs
			if !logMismatch && rf.nextIndex[peerIndex] >= len(rf.log)+rf.snapshotLastLogIndex {
				dprintf("[leader-replicate: %d | peer: %d]: Waiting for next signal to replicate.\n", rf.me, peerIndex)
				rf.replicatorCond.Wait()
			}

			if rf.killed() {
				// Release the lock before returning so other goroutines are not blocked
				rf.mu.Unlock()
				return
			}

			reply := &AppendEntriesReply{}
			logStartIndex := rf.nextIndex[peerIndex]
			var prevLogTerm int
			prevLogIndex := logStartIndex - 1
			peer := rf.peers[peerIndex]

			if prevLogIndex-rf.snapshotLastLogIndex > 0 {
				prevLogTerm = rf.log[prevLogIndex-rf.snapshotLastLogIndex].Term
			} else if prevLogIndex == rf.snapshotLastLogIndex {
				prevLogTerm = rf.snapshotLastLogTerm
			} else {
				// prevLogIndex < rf.snapshotLastLogIndex
				prevLogTerm = -1
			}

			if prevLogTerm == -1 {
				logMismatch = true
				// Leader does not have logs at `prevLogIndex` because of compaction
				// Leader needs to send snapshot to the peer as part of log repairing
				args := &InstallSnapshotArgs{
					Term:              rf.currentTerm,
					LeaderId:          rf.me,
					LastIncludedIndex: rf.snapshotLastLogIndex,
					LastIncludedTerm:  rf.snapshotLastLogTerm,
					Data:              rf.persister.ReadSnapshot(),
				}

				reply := &InstallSnapshotReply{}

				dprintf("[leader-install-snapshot: %d: peer: %d]: InstallSnapshot RPC with index: %d and term: %d sent.\n", rf.me, peerIndex, args.LastIncludedIndex, args.LastIncludedTerm)
				rf.mu.Unlock()

				ok := rf.sendRPCWithTimeout(ctx, peer, peerIndex, "InstallSnapshot", args, reply)

				rf.mu.Lock()

				if ok {
					if reply.Term > rf.currentTerm {
						dprintf("[leader-install-snapshot: %d: peer: %d]: Stepping down from leadership, Received InstallSnapshot reply from peer %d, with term %d > %d - my term\n", rf.me, peerIndex, peerIndex, reply.Term, rf.currentTerm)

						rf.state = StateFollower
						rf.currentTerm = reply.Term
						rf.lastContactFromLeader = time.Now()

						if rf.leaderCancelFunc != nil {
							rf.leaderCancelFunc()
							rf.replicatorCond.Broadcast()
						}
						rf.persist(nil)
						rf.mu.Unlock()
						return
					}

					dprintf("[leader-install-snapshot: %d: peer: %d]: Snapshot installed successfully\n", rf.me, peerIndex)
					rf.nextIndex[peerIndex] = rf.snapshotLastLogIndex + 1
					rf.mu.Unlock()
					continue
				} else {
					dprintf("[leader-install-snapshot: %d: peer: %d]: Snapshot installation failed!\n", rf.me, peerIndex)
					rf.nextIndex[peerIndex] = rf.snapshotLastLogIndex
					rf.mu.Unlock()
					continue
				}
			} else {
				replicateTerm := rf.currentTerm

				logEndIndex := len(rf.log) + rf.snapshotLastLogIndex
				nLogs := logEndIndex - logStartIndex

				args := &AppendEntriesArgs{
					Term:         rf.currentTerm,
					LeaderId:     rf.me,
					PrevLogIndex: prevLogIndex,
					PrevLogTerm:  prevLogTerm,
					LeaderCommit: rf.commitIndex,
				}

				if nLogs > 0 {
					entriesToSend := rf.log[logStartIndex-rf.snapshotLastLogIndex:]
					args.Entries = make([]LogEntry, len(entriesToSend))
					copy(args.Entries, entriesToSend)
					dprintf("[leader-replicate: %d | peer: %d]: Sending AppendEntries RPC in term %d with log index range [%d, %d).\n", rf.me, peerIndex, replicateTerm, logStartIndex, logEndIndex)
				} else {
					dprintf("[leader-replicate: %d | peer: %d]: Sending AppendEntries Heartbeat RPC for term %d.\n", rf.me, peerIndex, replicateTerm)
				}

				rf.mu.Unlock()
				ok := rf.sendRPCWithTimeout(ctx, peer, peerIndex, "AppendEntries", args, reply)

				rf.mu.Lock()

				if ok {
					select {
					case <-ctx.Done():
						dprintf("[leader-replicate: %d | peer: %d]: Leader stepped down from leadership after sending AppendEntries RPC.\n", rf.me, peerIndex)
						rf.mu.Unlock()
						return
					default:
						// Check for change in state during the RPC call
						if rf.currentTerm != replicateTerm || rf.state != StateLeader {
							// Leader already stepped down
							dprintf("[leader-replicate: %d | peer: %d]: Checked leadership state after getting AppendEntries Reply, Not a leader anymore, Winding up my leadership setup.\n", rf.me, peerIndex)
							if rf.leaderCancelFunc != nil {
								rf.leaderCancelFunc()
								rf.replicatorCond.Broadcast()
							}
							rf.mu.Unlock()
							return
						}

						// Handle Heartbeat response
						if !reply.Success {
							if reply.Term > rf.currentTerm {
								dprintf("[leader-replicate: %d: peer: %d]: Stepping down from leadership, Received AppendEntries reply from peer %d, with term %d > %d - my term\n", rf.me, peerIndex, peerIndex, reply.Term, rf.currentTerm)

								rf.state = StateFollower
								rf.currentTerm = reply.Term
								rf.lastContactFromLeader = time.Now()

								if rf.leaderCancelFunc != nil {
									rf.leaderCancelFunc()
									rf.replicatorCond.Broadcast()
								}
								rf.persist(nil)
								rf.mu.Unlock()
								return
							}

							// Follower rejected the AppendEntries RPC because of log conflict
							// Update the nextIndex for this follower
							logMismatch = true
							followersConflictTermPresent := false
							if reply.ConflictTerm != -1 {
								for i := prevLogIndex - rf.snapshotLastLogIndex; i > 0; i-- {
									if rf.log[i].Term == reply.ConflictTerm {
										rf.nextIndex[peerIndex] = i + 1 + rf.snapshotLastLogIndex
										followersConflictTermPresent = true
										break
									}

								}

								if !followersConflictTermPresent {
									rf.nextIndex[peerIndex] = reply.ConflictIndex
								}
							} else {
								rf.nextIndex[peerIndex] = reply.ConflictIndex
							}
							dprintf("[leader-replicate: %d | peer: %d]: Logmismatch - AppendEntries RPC with previous log index %d of previous log term %d failed. Retrying with log index:%d.\n", rf.me, peerIndex, prevLogIndex, prevLogTerm, rf.nextIndex[peerIndex])
							rf.mu.Unlock()
							continue
						} else {
							dprintf("[leader-replicate: %d | peer: %d]: responded success to AppendEntries RPC in term %d with log index range [%d, %d).\n", rf.me, peerIndex, replicateTerm, logStartIndex, logEndIndex)
							logMismatch = false

							if nLogs > 0 {
								// Log replication successful
								rf.nextIndex[peerIndex] = prevLogIndex + nLogs + 1
								rf.matchIndex[peerIndex] = prevLogIndex + nLogs

								// Need to track majority replication up to latest log index
								// - So that we can update commitIndex
								// - Apply logs up to commitIndex
								// Just an idea - maybe this needs to be done separately in a goroutine
								// Where we continuously check lastApplied and commitIndex
								// Apply and lastApplied to commit index and if leader send the response to apply channel
								majority := len(rf.peers)/2 + 1

								for i := len(rf.log) - 1; i > rf.commitIndex-rf.snapshotLastLogIndex; i-- {
									matchedPeerCount := 1
									if rf.log[i].Term == rf.currentTerm {
										for pi := range rf.peers {
											if pi != rf.me && rf.matchIndex[pi] >= i+rf.snapshotLastLogIndex {
												matchedPeerCount++
											}
										}
									}

									// Largest possible log index greater than commitIndex replicated at majority of peers
									// update commitIndex
									if matchedPeerCount >= majority {
										rf.commitIndex = i + rf.snapshotLastLogIndex

										dprintf("[leader-replicate: %d | peer: %d]: Log index %d replicated to majority of peers.(%d/%d peers), updating commitIndex to : %d, current lastApplied value: %d.\n", rf.me, peerIndex, i+rf.snapshotLastLogIndex, matchedPeerCount, len(rf.peers), rf.commitIndex, rf.lastApplied)
										rf.applyCond.Signal()
										break
									}
								}
							}

							rf.mu.Unlock()
							continue
						}
					}
				}
				dprintf("[leader-replicate: %d | peer %d]: Sending AppendEntries RPC at leader's term: %d, failed. Payload prevLogIndex: %d | prevLogTerm: %d.\n", rf.me, peerIndex, replicateTerm, prevLogIndex, prevLogTerm)
				rf.mu.Unlock()
				continue
			}
		}
	}
}

The replicate function runs in a continuous loop. Inside this loop, a select statement monitors the leader's context (ctx). If ctx.Done() is not yet closed, the current server has not been told to step down. To be safe, the server's state is also checked to be StateLeader. If it is not, rf.leaderCancelFunc is called explicitly to wind up leadership; this closes the context's Done channel and signals the change to the other components. The loop then waits on rf.replicatorCond when there are no new log entries to send and no log mismatch between the leader's and the peer's logs.

As mentioned in Part 1 of this blog series, servers periodically compact their logs to manage their size. This involves taking a snapshot of the current state and then truncating the log up to that point. If a follower's log is far behind the leader's and the leader has already truncated the entries the follower needs, the leader has to send a snapshot using the InstallSnapshot RPC instead. We will delve deeper into log compaction in a later part of this series.

The leader maintains two pieces of volatile state for each peer: nextIndex, the index of the next log entry to send to that peer, and matchIndex, the highest index up to which the peer's log is known to be replicated.
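
The way replicate derives PrevLogIndex and PrevLogTerm from nextIndex, including the offset introduced by log compaction, can be isolated into a small function. The sketch below is an illustration only: prevLog and the entry type are hypothetical names, and it assumes (as the listing above appears to) that slot 0 of the in-memory log is a placeholder standing for the last entry covered by the snapshot, and that nextIndex never exceeds the leader's log length plus the snapshot index.

```go
package main

import "fmt"

// entry is a hypothetical, trimmed-down log entry that carries only a term.
type entry struct{ Term int }

// prevLog returns the PrevLogIndex and PrevLogTerm to send to a peer whose
// next entry is nextIndex. Assumption: log[0] is a placeholder for the last
// snapshotted entry, so absolute index k lives at log[k-snapIndex].
// ok is false when the previous entry has been compacted away, in which case
// the leader would have to fall back to sending a snapshot.
func prevLog(log []entry, snapIndex, snapTerm, nextIndex int) (prevIndex, prevTerm int, ok bool) {
	prevIndex = nextIndex - 1
	switch {
	case prevIndex > snapIndex:
		return prevIndex, log[prevIndex-snapIndex].Term, true
	case prevIndex == snapIndex:
		return prevIndex, snapTerm, true
	default:
		return prevIndex, -1, false
	}
}

func main() {
	// Snapshot covers indices up to 10 (term 3); entries 11 and 12 remain.
	log := []entry{{Term: 3}, {Term: 3}, {Term: 4}}

	fmt.Println(prevLog(log, 10, 3, 13)) // 12 4 true: previous entry is still in the log
	fmt.Println(prevLog(log, 10, 3, 11)) // 10 3 true: previous entry is the snapshot's last entry
	fmt.Println(prevLog(log, 10, 3, 9))  // 8 -1 false: compacted away, snapshot needed
}
```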

Heartbeat rules

The AppendEntries RPC also functions as a heartbeat, allowing the leader to communicate its identity and log status to all followers. A follower only accepts this RPC if:

  • The leader's Term is greater than or equal to the follower's current term. If not, the follower rejects the RPC, which tells the sender (which mistakenly believes itself to be the leader) that it is outdated. The follower puts its own term in reply.Term to indicate this. Upon receiving a reply whose Term is higher than its own, the leader relinquishes leadership.
  • The follower's log contains an entry at PrevLogIndex whose term is PrevLogTerm. The leader includes these two values in every AppendEntries RPC; they identify the entry immediately before the ones being sent, that is, the point up to which the leader believes the follower's log matches its own. If there is a discrepancy, the follower responds with reply.Success set to false, indicating a log mismatch. The leader then decreases nextIndex[peerIndex] for that peer and sends another AppendEntries RPC with an earlier log index. This continues until a matching index is found. At that point, the follower accepts the RPC and uses the reconcileLogs function to remove conflicting entries and replace them with the range [PrevLogIndex + 1, LatestLogIndexAtLeader] sent by the leader in the args.Entries field. The same check was applied at every earlier index: if the entry at the current index and term matches the leader's, so does the entry before it, or a conflict would already have been detected there. This single property therefore ensures that the logs of leader and follower match up to the latest index; otherwise the follower would not accept the heartbeat or a normal AppendEntries RPC carrying entries. The back-and-forth of AppendEntries RPCs needed to find the non-conflicting index is called the log correction process. Later on, we will see how to optimise it, because in its basic form the leader has to try each log index in decreasing order, which can take a long time if the follower has been stale for a while and the leader's log has grown a lot in the meantime.
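
The basic, one-index-at-a-time correction described in the second rule can be simulated on two plain slices of terms. This is a deliberately simplified sketch, not the article's code: matches and naiveBackoff are hypothetical helpers, there is no snapshot offset, real terms are assumed to start at 1, and index 0 is assumed to be a shared sentinel entry with term 0 so that the search always terminates.

```go
package main

import "fmt"

// matches is the follower-side consistency check: does the follower hold an
// entry at prevIndex whose term equals prevTerm?
func matches(follower []int, prevIndex, prevTerm int) bool {
	return prevIndex < len(follower) && follower[prevIndex] == prevTerm
}

// naiveBackoff simulates the unoptimised log correction. Both logs are slices
// of terms with a sentinel at index 0 (an assumption of this sketch). It
// returns the nextIndex at which the follower finally accepts, and how many
// AppendEntries attempts that took.
func naiveBackoff(leader, follower []int) (nextIndex, rpcs int) {
	nextIndex = len(leader) // a new leader starts just past its own last entry
	for {
		rpcs++
		prev := nextIndex - 1
		if matches(follower, prev, leader[prev]) {
			return nextIndex, rpcs
		}
		nextIndex-- // rejected: retry one entry earlier
	}
}

func main() {
	leader := []int{0, 1, 1, 2, 2, 3}
	follower := []int{0, 1, 1, 1, 1} // diverges from the leader after index 2

	fmt.Println(naiveBackoff(leader, follower)) // 3 4
}
```

In this run the follower accepts once PrevLogIndex reaches 2, the last index on which both logs agree, and it takes four attempts to get there. The cost grows linearly with how far the logs have diverged.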

[Figure: Pasted image 20251005180921.png] The figure shows a simple, successful log replication. Follower 1 and Follower 2 have the same initial part of the log as the leader, and the leader knows exactly which part of the log needs to be sent to each follower.

In the replicate method, we use a logMismatch flag within the replication loop. This flag records whether the previous AppendEntries RPC was rejected because of a log mismatch. If it was, we do not wait (we skip rf.replicatorCond.Wait(), which would release the lock and put the goroutine to sleep), because we want to repair the log quickly: the next AppendEntries RPC is sent right away with the updated previous log index and term. If the previous log index is lower than the index up to which we have snapshotted the state and truncated the log, we send an InstallSnapshot RPC with the snapshot instead, since we no longer have the needed log entries. We'll discuss snapshots and log compaction more later. One rule is always followed: if an RPC reply carries a term higher than the leader's current term, the leader gives up leadership by cancelling the context. See the previous part for more information.

[Figure: Pasted image 20251005233402.png]

In the figure, follower 1's nextIndex is 7. When an AppendEntries RPC is sent with prevLogIndex=6 and prevLogTerm=2, the follower detects a log mismatch and rejects the RPC. Because the follower's term is also 2, the leader keeps its leadership role and immediately reduces its nextIndex for follower 1 to 6. It then sends another AppendEntries RPC (without waiting, because logMismatch=true) with prevLogIndex=5 and prevLogTerm=2. This second RPC is accepted. The leader sends Entries=[1, 2], and the follower's entries after index 5 are replaced by these new entries when the follower reconciles its log. Thus, the leader can erase uncommitted entries from the follower during log correction. Follower 2 accepts the initial AppendEntries RPC because there is no conflict.

Since the leader manages each peer in its own goroutine, after each successful RPC response it checks whether a majority of peers has replicated the log up to a specific index. In this scenario, once follower 2 responds (likely before follower 1, which is still correcting its log), the leader will have replicated log index 7 on 2 of the 3 peers (including itself).

The leader then updates its commitIndex and signals a condition variable called rf.applyCond. First, we will examine how the follower handles the AppendEntries RPC. Then, building on this understanding, we will discuss commitIndex, lastApplied, and rf.applyCond, explaining how they ensure log entries are committed, what “committed” means, what “applying” a log entry means, and how it is handled.

Handling AppendEntries RPC

Let's now look at how a follower handles an AppendEntries RPC from a leader. We will revisit the leader's replicate method when we get to how log entries are committed and applied to the state machine by the leader, and how this change is propagated to followers. The implementation of the AppendEntries RPC handler is given below:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// Handling heartbeats from leader
	// When follower receives heartbeat it should check for heartbeat validity

	// [Case: 1] if args.Term < rf.currentTerm then it means this follower is either a new leader
	// OR it voted for someone else after last heartbeat.
	// In such case follower should return reply.Success = false indicating it does not acknowledge the sender
	// as leader anymore and should set reply.Term = rf.currentTerm. (This indicates sender to step down from leadership)

	// [Case: 2] if args.Term >= rf.currentTerm then the current follower/candidate becomes follower again accepting current leader
	// In such case rf.currentTerm = args.Term and reply.Success = true with reply.Term = rf.currentTerm (same term as the sender)
	// In Case 2 we should reset the election timer since we have received the heartbeat from a genuine leader

	// In Case 1 since the previous leader is now left behind, there are 3 possibilities:
	// 		A. The current peer is a candidate now and an election was already started by it or even finished with it being the current leader
	// 		B. Some other peer is now a candidate with an election going on OR is a leader now
	// 		C. No election has taken place till now
	// In all the above cases we should not interrupt the election timeout or anything else

	rf.mu.Lock()
	defer rf.mu.Unlock()

	if len(args.Entries) == 0 {
		// Heartbeat
		dprintf("[Peer: %d]: Received AppendEntries RPC as heartbeat from leader %d for term %d with commitIndex %d.\n", rf.me, args.LeaderId, args.Term, args.LeaderCommit)
	}

	if args.Term < rf.currentTerm {
		dprintf("[Peer: %d]: AppendEntries RPC from leader %d for term %d not acknowledged, Leader's term: %d is older than my current term: %d.\n", rf.me, args.LeaderId, args.Term, args.Term, rf.currentTerm)
		reply.Success = false
		reply.Term = rf.currentTerm

		return
	} else {
		// Sent by current leader
		// Reset election timeout
		if rf.state == StateLeader {
			dprintf("[Peer: %d]: AppendEntries RPC received from current leader: %d, winding up my leadership setup.\n", rf.me, args.LeaderId)
			if rf.leaderCancelFunc != nil {
				rf.leaderCancelFunc()
				rf.replicatorCond.Broadcast()
			}
		}

		rf.lastContactFromLeader = time.Now()

		rf.currentTerm = args.Term
		rf.state = StateFollower

		rf.persist(nil)

		latestLogIndex := len(rf.log) - 1
		logTerm := 0

		if args.PrevLogIndex > rf.snapshotLastLogIndex && args.PrevLogIndex <= latestLogIndex+rf.snapshotLastLogIndex {
			logTerm = rf.log[args.PrevLogIndex-rf.snapshotLastLogIndex].Term
		} else if args.PrevLogIndex == rf.snapshotLastLogIndex {
			logTerm = rf.snapshotLastLogTerm
		} else if args.PrevLogIndex < rf.snapshotLastLogIndex {
			// This should trigger InstallSnapshot from leader
			reply.ConflictIndex = rf.snapshotLastLogIndex + 1
			reply.ConflictTerm = -1
		}

		if logTerm != args.PrevLogTerm {
			dprintf("[Peer: %d]: AppendEntries RPC from leader %d for term %d not acknowledged. Log terms do not match, (Leader term, Leader index): (%d, %d), peer's term for same log index: %d.\n", rf.me, args.LeaderId, args.Term, args.PrevLogTerm, args.PrevLogIndex, logTerm)
			reply.Success = false
			reply.Term = rf.currentTerm

			if args.PrevLogIndex <= latestLogIndex+rf.snapshotLastLogIndex {
				reply.ConflictTerm = logTerm
				i := args.PrevLogIndex - rf.snapshotLastLogIndex
				// Find first index of `logTerm` in follower's log
				for ; i > 0; i-- {
					if rf.log[i].Term != logTerm {
						break
					}
				}

				reply.ConflictIndex = i + 1 + rf.snapshotLastLogIndex
			}

			return
		}

		reply.Success = true
		reply.Term = rf.currentTerm

		dprintf("[Peer: %d]: AppendEntries RPC from leader %d for term %d acknowledged.\n", rf.me, args.LeaderId, args.Term)

		if len(args.Entries) > 0 {
			rf.reconcileLogs(args.Entries, args.PrevLogIndex)
		}

		if args.LeaderCommit > rf.commitIndex {
			// If leaderCommit > commitIndex, set commitIndex =
			// min(leaderCommit, index of last new entry)

			rf.commitIndex = args.LeaderCommit

			if args.LeaderCommit >= len(rf.log)+rf.snapshotLastLogIndex {
				rf.commitIndex = len(rf.log) - 1 + rf.snapshotLastLogIndex
			}

			dprintf("[Peer: %d]: Updated commit index to %d.\n", rf.me, rf.commitIndex)

			rf.applyCond.Signal()
		}
	}

}

As with other RPCs, we first check whether the sender's args.Term is lower than the recipient's term. If it is, we reject the call, telling the sender that its leadership is outdated and that the cluster's term has advanced since it was last the leader. We include our term in the response, prompting the sender to relinquish leadership and start synchronizing. As mentioned before, that synchronization happens when the peer receives a heartbeat from the current leader, which initiates any necessary log correction. This again shows that log entries always flow from the current leader to the followers.

The same principle applies if the receiving peer believes itself to be the leader of a term less than or equal to the term in the AppendEntries RPC. In this scenario, we acknowledge the sender as the rightful current leader and invoke rf.leaderCancelFunc() to make the current peer step down. We also broadcast on rf.replicatorCond so that all peer-specific replication goroutines spawned by this receiver notice the cancelled context as quickly as possible.

After confirming the leader's identity, we update the lastContactFromLeader timestamp. Each follower has a ticker that checks whether the time since the last contact from the leader has exceeded a threshold (the election timeout). If it has, the follower can become a candidate and start an election. Updating the timestamp prevents unnecessary elections. More information about leader election can be found in the preceding section.

Next, we verify log consistency using the PrevLogIndex and PrevLogTerm fields of the AppendEntries RPC. The follower checks whether it has a log entry at PrevLogIndex and whether that entry's term matches PrevLogTerm. If either check fails, the follower's log has diverged from the leader's. In that case, the follower rejects the RPC by returning Success = false. This rejection tells the leader that it needs to adjust its nextIndex for this follower and retry from an earlier point in its log, effectively “backing up” until it finds the last matching entry.

Without any optimization, this correction process could be quite inefficient: the leader would decrement nextIndex[peer] one step at a time and re-send the RPC until it finds a match. If the follower has fallen far behind (say, by N entries), this takes N separate RPCs, a linear-time process that becomes costly for large logs. To avoid that, Raft introduces a log inconsistency optimization. Instead of merely rejecting the RPC, the follower includes two additional fields in its response: ConflictTerm and ConflictIndex. These tell the leader the term of the conflicting entry and the first index at which that term begins. With this information, the leader can skip back intelligently. It looks for the last entry in its own log with the same ConflictTerm.

  • If it finds one, it sets the follower's nextIndex to just after that entry, on the assumption that both logs agree up to that point. The assumption may turn out to be wrong: if the AppendEntries RPC sent with the backed-up nextIndex is rejected again, the follower's log holds an older term than the leader's at that index, and the leader repeats the process with the new ConflictIndex and ConflictTerm.
  • If it doesn't find any entry with that term, it sets nextIndex to ConflictIndex, effectively jumping back to where the follower's conflicting term started.

This optimization lets the leader perform the correction per term rather than per index, drastically reducing the number of RPCs needed for log repair, especially when followers lag by many entries. The replicate method above shows this behaviour in the branch where reply.Success is false and reply.Term <= rf.currentTerm.

Once a consistent prefix is identified, the follower can safely append the new entries included in the RPC. In the implementation, this is handled by the reconcileLogs method, which takes the leader's entries and merges them into the follower's log while ensuring term consistency.

func (rf *Raft) reconcileLogs(leaderEntries []LogEntry, leaderPrevLogIndex int) {
	nextIndex := leaderPrevLogIndex + 1
	currentLogLength := len(rf.log) + rf.snapshotLastLogIndex
	leaderEntriesIndex := 0

	for nextIndex < currentLogLength && leaderEntriesIndex < len(leaderEntries) {
		if rf.log[nextIndex-rf.snapshotLastLogIndex].Term != leaderEntries[leaderEntriesIndex].Term {
			break
		}

		nextIndex++
		leaderEntriesIndex++
	}

	if leaderEntriesIndex < len(leaderEntries) {
		// If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
		rf.log = rf.log[:nextIndex-rf.snapshotLastLogIndex]

		// Append any new entries not already in the log
		rf.log = append(rf.log, leaderEntries[leaderEntriesIndex:]...)

		rf.persist(nil)
	}

	dprintf("[Peer %d]: Reconciled logs with leader from index %d, current logs length %d.\n", rf.me, leaderPrevLogIndex, len(rf.log)+rf.snapshotLastLogIndex)
}

This function essentially performs the final step of log reconciliation after the agreement point between leader and follower has been established.

  • It starts from the index immediately after leaderPrevLogIndex: the point up to which both leader and follower agree.
  • Then it walks through both logs in parallel, comparing terms for each entry.
  • As soon as it finds a mismatch (a conflict in term at the same index), it stops: that’s where the logs diverge.
  • From that point onward, it truncates the follower’s log (rf.log = rf.log[:nextIndex - rf.snapshotLastLogIndex]) and appends all remaining entries from the leader’s log.
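
The steps above can be tried out on a standalone version of the same idea. This is a sketch under simplifying assumptions rather than the article's method: reconcile and the entry type are hypothetical, there is no snapshot offset or persistence, and index 0 is assumed to be a sentinel entry.

```go
package main

import "fmt"

// entry is a hypothetical log entry for this illustration.
type entry struct {
	Term    int
	Command string
}

// reconcile merges incoming (which starts at prevIndex+1) into log and
// returns the result. Existing entries are only dropped when a term conflict
// is found, so a stale or duplicated RPC cannot shorten the log.
func reconcile(log []entry, prevIndex int, incoming []entry) []entry {
	next, i := prevIndex+1, 0
	for next < len(log) && i < len(incoming) && log[next].Term == incoming[i].Term {
		next++
		i++
	}
	if i < len(incoming) {
		// The three-index slice caps capacity at next, so append allocates a
		// new backing array instead of overwriting the caller's slice.
		log = append(log[:next:next], incoming[i:]...)
	}
	return log
}

// terms extracts the term of every entry, for compact printing.
func terms(log []entry) []int {
	out := make([]int, len(log))
	for i, e := range log {
		out[i] = e.Term
	}
	return out
}

func main() {
	follower := []entry{{0, ""}, {1, "a"}, {1, "b"}, {2, "x"}, {2, "y"}}

	// Conflict at index 3: the follower's term-2 entries are replaced.
	fixed := reconcile(follower, 2, []entry{{3, "c"}, {3, "d"}})
	fmt.Println(terms(fixed)) // [0 1 1 3 3]

	// A delayed RPC carrying an entry the follower already has changes nothing.
	same := reconcile(follower, 1, []entry{{1, "b"}})
	fmt.Println(len(same)) // 5
}
```

The second call illustrates why truncation happens only on a conflict: an old AppendEntries RPC that arrives late must not discard entries the follower received from a more recent one.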

This effectively overwrites any inconsistent or “stale” entries on the follower with the authoritative entries from the leader, restoring complete log alignment. So, reconcileLogs is the place where the follower’s log is actually modified. It’s where the “diff” from the leader is applied after the last point of agreement. Once again, this highlights the fundamental directionality of Raft’s replication flow: log entries always move from the leader to the followers, never the other way around.
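
Before moving on, the ConflictTerm and ConflictIndex back-off described ahead of reconcileLogs can also be simulated end to end. The sketch below is a simplified illustration, not the article's handler: hint, nextIndexAfterReject, and fastBackoff are hypothetical names, logs are plain slices of terms with a term-0 sentinel at index 0, real terms are assumed to start at 1, and there is no snapshot offset. For a follower whose log is too short, this sketch chooses to return ConflictTerm = -1 together with the follower's log length; that is a choice made here for clarity.

```go
package main

import "fmt"

// hint is the follower side: for a rejected PrevLogIndex it reports the term
// found there and the first index holding that term.
func hint(follower []int, prevIndex int) (conflictTerm, conflictIndex int) {
	if prevIndex >= len(follower) {
		return -1, len(follower) // log too short: ask for entries from its end
	}
	conflictTerm = follower[prevIndex]
	i := prevIndex
	for i > 0 && follower[i-1] == conflictTerm {
		i--
	}
	return conflictTerm, i
}

// nextIndexAfterReject is the leader side: jump to just after its own last
// entry of conflictTerm, or to conflictIndex if it has no such entry.
func nextIndexAfterReject(leader []int, prevIndex, conflictTerm, conflictIndex int) int {
	if conflictTerm != -1 {
		for i := prevIndex; i > 0; i-- {
			if leader[i] == conflictTerm {
				return i + 1
			}
		}
	}
	return conflictIndex
}

// fastBackoff repeats AppendEntries attempts until the follower accepts and
// returns the accepted nextIndex together with the number of attempts.
func fastBackoff(leader, follower []int) (nextIndex, rpcs int) {
	nextIndex = len(leader)
	for {
		rpcs++
		prev := nextIndex - 1
		if prev < len(follower) && follower[prev] == leader[prev] {
			return nextIndex, rpcs
		}
		ct, ci := hint(follower, prev)
		nextIndex = nextIndexAfterReject(leader, prev, ct, ci)
	}
}

func main() {
	leader := []int{0, 1, 1, 1, 4, 4, 5, 5, 6, 6, 6}
	follower := []int{0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3}

	fmt.Println(hint(follower, 10))            // 3 7
	fmt.Println(fastBackoff(leader, follower)) // 4 3
}
```

With these two logs the leader skips one whole follower term per rejection (first term 3, then term 2) and is accepted on the third attempt, whereas stepping back one index at a time would need eight attempts to reach the same point.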

Committing and Applying Log Entries

When the leader has received successful AppendEntries responses from enough followers that a majority of peers (counting itself) hold a given entry, it knows that the logs of a majority match its own up to that index. Because agreement at one log index implies agreement on all preceding indices, the leader can safely treat everything up to that index as replicated on a majority, and it sets its commit index to that index. In the code below, only entries from the leader's current term are counted this way (the rf.log[i].Term == rf.currentTerm check).

// Excerpt from replicate: majority and peerIndex are defined earlier in that function.
for i := len(rf.log) - 1; i > rf.commitIndex-rf.snapshotLastLogIndex; i-- {
	matchedPeerCount := 1
	if rf.log[i].Term == rf.currentTerm {
		for pi := range rf.peers {
			if pi != rf.me && rf.matchIndex[pi] >= i+rf.snapshotLastLogIndex {
				matchedPeerCount++
			}
		}
	}

	// Largest possible log index greater than commitIndex replicated at majority of peers
	// update commitIndex
	if matchedPeerCount >= majority {
		rf.commitIndex = i + rf.snapshotLastLogIndex

		dprintf("[leader-replicate: %d | peer: %d]: Log index %d replicated to majority of peers.(%d/%d peers), updating commitIndex to : %d, current lastApplied value: %d.\n", rf.me, peerIndex, i+rf.snapshotLastLogIndex, matchedPeerCount, len(rf.peers), rf.commitIndex, rf.lastApplied)
		rf.applyCond.Signal()
		break
	}
}
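
The loop above depends on the surrounding Raft state, so here is a self-contained variant that can be run with hand-picked values. It is a sketch only: advanceCommit is a hypothetical helper, the log is reduced to a slice of terms with a sentinel at index 0, and there is no snapshot offset.

```go
package main

import "fmt"

// advanceCommit returns the new commit index for a leader. logTerms[i] is the
// term of the entry at index i (index 0 is a sentinel), and matchIndex has
// one slot per peer. The leader's own slot is ignored because the leader
// always counts itself.
func advanceCommit(logTerms, matchIndex []int, me, currentTerm, commitIndex int) int {
	majority := len(matchIndex)/2 + 1
	for i := len(logTerms) - 1; i > commitIndex; i-- {
		if logTerms[i] != currentTerm {
			continue // only entries from the current term are committed by counting
		}
		count := 1 // the leader itself
		for p, m := range matchIndex {
			if p != me && m >= i {
				count++
			}
		}
		if count >= majority {
			return i // highest index replicated on a majority
		}
	}
	return commitIndex
}

func main() {
	// Three peers, leader is peer 0, one follower is fully caught up.
	fmt.Println(advanceCommit([]int{0, 1, 1, 2, 2}, []int{0, 4, 2}, 0, 2, 1)) // 4

	// Leader is in term 3 but has no term-3 entry yet: nothing new commits.
	fmt.Println(advanceCommit([]int{0, 1, 1, 2, 2}, []int{0, 2, 2}, 0, 3, 1)) // 1

	// Five peers: index 3 is on three of them (leader plus two followers).
	fmt.Println(advanceCommit([]int{0, 1, 2, 2, 2}, []int{0, 3, 3, 1, 0}, 0, 2, 0)) // 3
}
```

The second call shows the effect of the current-term check: entries from earlier terms are not committed by counting replicas alone. They become committed indirectly once a current-term entry after them is committed.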

This loop sits at the end of the replicate method. After a peer-specific goroutine gets a success response, it examines the matchIndex of all peers. As with nextIndex, the leader maintains a matchIndex for each peer. It is the index up to which that peer's log is known to match the leader's, in other words, how far the leader's log has been replicated to that follower. As you can see, once a new log entry has been replicated to a majority, we also signal the applyCond condition variable.

On startup, each peer launches both a ticker goroutine and an applier goroutine. The applier's job is to take the log entries after lastApplied (the index of the last applied entry) up to the latest commit index and hand each command over to be executed on the state machine. In this setup, every Raft server instance receives an apply channel at startup, provided by the service that uses RAFT. In our key-value server example, commands from log entries sent to the apply channel are received by the KV server, which then executes them.

func (rf *Raft) applier() {
	for !rf.killed() {
		rf.mu.Lock()
		// Sleep until at least one committed entry has not been applied yet.
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		if rf.killed() {
			// Release the lock before exiting so no other goroutine blocks on it.
			rf.mu.Unlock()
			return
		}

		i := rf.lastApplied + 1
		if i < rf.snapshotLastLogIndex+1 {
			i = rf.snapshotLastLogIndex + 1
		}

		// Collect the pending messages while holding the lock.
		messages := make([]raftapi.ApplyMsg, 0)
		for ; i <= rf.commitIndex; i++ {
			// Skip entries that are already in the snapshot
			if i <= rf.snapshotLastLogIndex {
				continue
			}
			msg := raftapi.ApplyMsg{
				CommandValid: true,
				Command:      rf.log[i-rf.snapshotLastLogIndex].Command,
				CommandIndex: i,
			}
			messages = append(messages, msg)
		}
		commitIndex := rf.commitIndex
		rf.mu.Unlock()

		// Send without holding the lock, because a channel send may block.
		for _, msg := range messages {
			if rf.killed() {
				return
			}
			rf.applyCh <- msg
		}

		rf.mu.Lock()
		rf.lastApplied = commitIndex
		dprintf("[Peer: %d]: Applied logs till latest commit index: %d, lastApplied : %d.\n", rf.me, rf.commitIndex, rf.lastApplied)
		rf.mu.Unlock()
	}
}

A Go channel is safe for concurrent use by multiple goroutines, so we do not need to hold the rf.mu lock while sending commands. The send can also block: on an unbuffered channel, a send completes only when a receiver is ready to take the value. Holding the Raft server state's lock mu across the sends would stall every other goroutine that needs that state, so we release the lock before sending and reacquire it only after all messages have been delivered, to update lastApplied and log the Raft server's state.
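The pattern of collecting a batch under the lock and sending it without the lock can be shown in a small standalone program. This is only a sketch: the node type, the commit and applyAll helpers, and the use of plain strings as commands are hypothetical, and it omits snapshots and shutdown handling.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical, stripped-down stand-in for the Raft struct.
type node struct {
	mu          sync.Mutex
	applyCond   *sync.Cond
	log         []string // index 0 is a placeholder entry
	commitIndex int
	lastApplied int
	applyCh     chan string // unbuffered: a send blocks until a receiver is ready
}

func newNode(log []string) *node {
	n := &node{log: log, applyCh: make(chan string)}
	n.applyCond = sync.NewCond(&n.mu)
	return n
}

// applier copies pending commands under the lock, then sends them unlocked.
func (n *node) applier() {
	for {
		n.mu.Lock()
		for n.lastApplied >= n.commitIndex {
			n.applyCond.Wait() // releases n.mu while waiting
		}
		batch := append([]string(nil), n.log[n.lastApplied+1:n.commitIndex+1]...)
		upTo := n.commitIndex
		n.mu.Unlock()

		for _, cmd := range batch {
			n.applyCh <- cmd // may block, so the lock is not held here
		}

		n.mu.Lock()
		n.lastApplied = upTo
		n.mu.Unlock()
	}
}

// commit advances commitIndex and wakes the applier.
func (n *node) commit(index int) {
	n.mu.Lock()
	if index > n.commitIndex {
		n.commitIndex = index
	}
	n.mu.Unlock()
	n.applyCond.Signal()
}

// applyAll commits every command and returns them in the order they were applied.
func applyAll(cmds []string) []string {
	n := newNode(append([]string{""}, cmds...))
	go n.applier()
	n.commit(len(cmds))

	applied := []string{}
	for range cmds {
		applied = append(applied, <-n.applyCh)
	}
	return applied
}

func main() {
	fmt.Println(applyAll([]string{"put x 1", "put y 2", "append x 3"}))
}
```

Because commitIndex is changed under the same mutex that the condition variable uses, the applier either sees the new value before it waits or is woken by the signal, so the wake-up cannot be missed.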

If the commitIndex is not greater than lastApplied, the applier thread pauses in applyCond.Wait(), which releases the lock while it waits for a signal. When the commitIndex of a leader or follower changes, the applier thread is signaled to resume and apply the new log entries (those after lastApplied, up to commitIndex) to the state machine. The leader's commitIndex is updated upon majority replication. To ensure that a follower's RAFT state (same commitIndex) and its underlying state machine (e.g., the KV server's state) catch up with the leader's, the leader includes its updated commitIndex in the next AppendEntries RPC or heartbeat. Each follower, after accepting the RPC, updates its own commitIndex and signals its applier thread to update the state machine with the new entries received from the leader.

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	// ...existing code
	if args.LeaderCommit > rf.commitIndex {
		// If leaderCommit > commitIndex, set commitIndex =
		// min(leaderCommit, index of last new entry)

		rf.commitIndex = args.LeaderCommit

		// Never move commitIndex past the last entry this follower holds.
		if args.LeaderCommit >= len(rf.log)+rf.snapshotLastLogIndex {
			rf.commitIndex = len(rf.log) - 1 + rf.snapshotLastLogIndex
		}

		dprintf("[Peer: %d]: Updated commit index to %d.\n", rf.me, rf.commitIndex)

		rf.applyCond.Signal()
	}
}
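The follower-side rule reduces to taking a minimum. The sketch below isolates it as a pure function. The name followerCommitIndex and its parameters are hypothetical, and lastLogIndex stands for the absolute index of the follower's last entry (len(rf.log) - 1 + rf.snapshotLastLogIndex in the code above).

```go
package main

import "fmt"

// followerCommitIndex returns the follower's commit index after an
// AppendEntries RPC carrying leaderCommit has been accepted.
// It never moves backwards and never exceeds the follower's last log index.
func followerCommitIndex(commitIndex, leaderCommit, lastLogIndex int) int {
	if leaderCommit <= commitIndex {
		return commitIndex // nothing new to commit
	}
	if leaderCommit < lastLogIndex {
		return leaderCommit
	}
	return lastLogIndex
}

func main() {
	// The follower is behind: it holds entries up to 3, the leader committed 5.
	fmt.Println(followerCommitIndex(2, 5, 3))
	// The follower holds more than is committed: it follows the leader's value.
	fmt.Println(followerCommitIndex(2, 3, 7))
}
```

Capping at the last log index matters because a follower that is still catching up must not mark entries it does not yet hold as committed.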

Why Majority-Based Commitment Guarantees Safety

A critical property of Raft is that once a log entry is committed (replicated on a majority of servers), it is guaranteed to eventually be applied to the state machine, even if the current leader crashes immediately after marking it committed. This guarantee comes from how Raft defines commitment and how election safety works together with log matching.

When a leader replicates a new log entry to a majority of peers and advances its commitIndex, it knows that a majority of servers (including itself) now store that entry in their logs. Even if the leader crashes at this point (before or after applying the entry), that entry cannot be lost, because any future leader must have that entry as well.

Here's why: during the next election, a candidate must receive votes from a majority of peers to become the new leader, and a follower votes only for a candidate whose log is at least as up-to-date as its own. That means the candidate's last log term is higher than the follower's, or the terms are equal and the candidate's last log index is greater than or equal to the follower's. Because the committed entry is stored on a majority and votes also require a majority, the two majorities must intersect, i.e., at least one voter already holds the committed entry. That voter rejects any candidate whose log is less up-to-date than its own, so the new leader's log will contain the committed entry (possibly followed by later entries), ensuring that committed entries never roll back and will always be part of future leaders' logs. This intersection property between consecutive majorities ensures Raft's log commitment safety:

Once a log entry is committed, it will never be overwritten or lost. It will eventually be applied to the state machine on all servers.
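The intersection argument can be checked by brute force for small clusters. The sketch below enumerates every pair of server subsets of a given quorum size and reports whether all pairs share at least one server. The function name quorumsAlwaysIntersect is hypothetical and the code is an illustration, not part of the Raft implementation.

```go
package main

import (
	"fmt"
	"math/bits"
)

// quorumsAlwaysIntersect reports whether every two subsets of n servers
// that each contain at least quorum members share at least one server.
// Subsets are represented as bitmasks, one bit per server.
func quorumsAlwaysIntersect(n, quorum int) bool {
	total := uint(1) << uint(n)
	for a := uint(0); a < total; a++ {
		if bits.OnesCount(a) < quorum {
			continue
		}
		for b := uint(0); b < total; b++ {
			if bits.OnesCount(b) < quorum {
				continue
			}
			if a&b == 0 {
				return false // two disjoint quorums exist
			}
		}
	}
	return true
}

func main() {
	for _, n := range []int{3, 4, 5} {
		majority := n/2 + 1
		fmt.Printf("n=%d majority=%d intersect=%v\n", n, majority, quorumsAlwaysIntersect(n, majority))
	}
	// With only half of a 4-server cluster, two disjoint groups are possible.
	fmt.Println(quorumsAlwaysIntersect(4, 2))
}
```

A strict majority is the smallest quorum size for which the check holds. With 4 servers and a quorum of 2, the groups {S1, S2} and {S3, S4} never overlap, so a committed entry could be missing from every voter.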

This reasoning also explains why it's safe for the applier thread to apply all entries up to commitIndex. The guarantee that every future leader's log contains those entries means that applying them to the state machine can never be undone, preserving Raft's state machine safety property: no server ever applies a log entry that another server later overwrites. However, this safety holds under one important restriction, which we will discuss shortly. First, let's look at a few scenarios that include the end client of a service replicated with RAFT.

  1. Leader replication and response to client: At the service layer, this guarantee extends all the way to the client. When a client issues a command to the leader (for example, a PUT request to the key-value server), the service calls the Raft instance's Start() method. This appends the command as a new log entry and begins replication across peers (we signal the replicatorCond condition variable, which resumes the replicate method, this time with a new log entry). The Start() call returns immediately, but the KV service does not reply to the client yet; it waits until that log entry is replicated on a majority of servers and marked as committed. Only after the applier thread applies this entry to the state machine does the service respond to the client with success. This ensures that every command acknowledged to the client is backed by a majority-replicated, committed log entry, one that will survive even if the current leader crashes right after responding. Responding before commitment would be unsafe, since uncommitted entries can be discarded if the leader fails before majority replication completes. To visualize this flow, here's how the end-to-end interaction between the client, leader, followers, and the KV state machine looks:
  sequenceDiagram
    participant C as Client
    participant L as Leader (KV + Raft)
    participant F1 as Follower 1
    participant F2 as Follower 2
    participant SM as State Machine

    C->>L: PUT x=5 (Client Request)
    L->>L: Start(cmd) → Append log entry
    L->>F1: AppendEntries RPC (new log entry)
    L->>F2: AppendEntries RPC (new log entry)
    F1-->>L: AppendEntries Reply (Success)
    F2-->>L: AppendEntries Reply (Success)
    Note over L: Entry replicated on majority
    L->>L: Update commitIndex, signal applier
    L->>SM: Apply command x=5
    SM-->>L: State updated
    L-->>C: Success (after apply)

    Note over L,C: Client response only after<br/>majority replication & application
  2. Leader crashes after committing but before applying:
  sequenceDiagram
    participant C as Client
    participant L as Leader (Term T)
    participant F1 as Follower 1
    participant F2 as Follower 2
    participant L' as New Leader (Term T+1)

    C->>L: Start(command)
    L->>L: Append log entry [term T, index i]
    L->>F1: AppendEntries [entry i]
    L->>F2: AppendEntries [entry i]
    F1-->>L: Ack
    F2-->>L: Ack
    L->>L: commitIndex = i (majority replicated)
    note right of L: ✅ Command is committed but not yet applied
    L--X C: (Crashes before replying)
    note over C: ⏱️ Client times out (no response)
    C->>L': Retry Start(command)
    note right of L': Election happens, new leader<br>contains entry i (since committed)
    L'->>L': Apply entry i to state machine
    L'-->>C: Respond with result (deduplicated via client ID)
  • The leader had committed the log (i.e., replicated to majority), but before applying or responding, it crashed.
  • The client times out, since no response was received.
  • A new leader is elected (say, L'), which must contain all committed entries from the previous term.
  • During recovery, L'’s applier thread applies the committed command to its state machine.
  • The client retries the command.
    • If the system uses a unique client ID + command ID, the server can detect it’s a duplicate request and not reapply the command, ensuring exactly-once semantics.
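The duplicate detection mentioned in the last bullet can be sketched as a small table kept next to the state machine. Everything here is an assumption made for illustration: the Op fields, the KV type, and the choice of an append operation (so that a double apply would be visible) are hypothetical and not taken from the lab code.

```go
package main

import "fmt"

// Op is a hypothetical command carried in a log entry.
type Op struct {
	ClientID int64
	Seq      int64 // per-client sequence number, increasing with each new request
	Key      string
	Value    string
}

// KV is a toy state machine that appends values to keys.
type KV struct {
	data    map[string]string
	lastSeq map[int64]int64 // highest Seq already applied for each client
}

func NewKV() *KV {
	return &KV{data: map[string]string{}, lastSeq: map[int64]int64{}}
}

// Apply executes op unless the same (ClientID, Seq) was applied before.
// It returns true when the command changed the state.
func (kv *KV) Apply(op Op) bool {
	if last, ok := kv.lastSeq[op.ClientID]; ok && op.Seq <= last {
		return false // duplicate of an already applied request
	}
	kv.data[op.Key] += op.Value
	kv.lastSeq[op.ClientID] = op.Seq
	return true
}

func (kv *KV) Get(key string) string {
	return kv.data[key]
}

func main() {
	kv := NewKV()
	op := Op{ClientID: 7, Seq: 1, Key: "x", Value: "a"}

	fmt.Println(kv.Apply(op)) // entry committed by the old leader
	fmt.Println(kv.Apply(op)) // the client's retry, appended again by the new leader
	fmt.Println(kv.Get("x"))
}
```

Since the table is updated only when an entry is applied, every replica that applies the same log ends up with the same table and makes the same decision about the retry.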

Why does the leader commit only log entries from its current term?

You may have noticed in the replicate method that, when checking for majority replication, we start from the latest log index at the leader, and only if that entry's term equals the leader's current term do we count its replicas, update the commit index, and signal the applier thread once a majority is reached. But why is this the case?

[Image: Figure 8 of the original RAFT paper] The figure above is from the original RAFT paper, and it explains the scenario well:

A time sequence showing why a leader cannot determine commitment using log entries from older terms. In (a) S1 is leader and partially replicates the log entry at index 2. In (b) S1 crashes; S5 is elected leader for term 3 with votes from S3, S4, and itself, and accepts a different entry at log index 2. In (c) S5 crashes; S1 restarts, is elected leader, and continues replication. At this point, the log entry from term 2 has been replicated on a majority of the servers, but it is not committed. If S1 crashes as in (d), S5 could be elected leader (with votes from S2, S3, and S4) and overwrite the entry with its own entry from term 3. However, if S1 replicates an entry from its current term on a majority of the servers before crashing, as in (e), then this entry is committed (S5 cannot win an election). At this point all preceding entries in the log are committed as well.

At point (c), S1 is the leader again. It holds the log entry from term 2 and has now replicated it to a majority (S1, S2, and S3). The current term at this point must be at least 4, since an election for term 3 already took place with S5 as leader, and at point (c) S1 has also appended one entry for term 4. If S1 crashes again at point (d), S5 can be elected once more (for term 5, say), because S2, S3, and S4 will grant it their votes: from their point of view S5's log is at least as up-to-date as their own, since its last entry has term 3, which is greater than the term of their own last entries (term 2 for S2 and S3, term 1 for S4). As the new leader, S5 then overwrites the term 2 entry on its followers as part of log correction. This is why a leader cannot commit an entry from an older term merely because it is replicated on a majority: a peer whose last entry has a higher term can still win an election and erase it.
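The vote counts in this scenario can be reproduced with the up-to-date comparison alone. The sketch below ignores the term and votedFor checks of a real RequestVote handler, and the last-entry values are my reading of the figure (index 2 holds the term 2 or term 3 entry, index 3 holds the term 4 entry). The lastEntry type and both function names are hypothetical.

```go
package main

import "fmt"

// lastEntry describes the final entry of a server's log.
type lastEntry struct {
	term  int
	index int
}

// atLeastAsUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's: a higher last term wins, and equal terms
// fall back to comparing the last index.
func atLeastAsUpToDate(candidate, voter lastEntry) bool {
	if candidate.term != voter.term {
		return candidate.term > voter.term
	}
	return candidate.index >= voter.index
}

// countVotes counts the candidate's own vote plus every voter that
// would accept its log.
func countVotes(candidate lastEntry, voters []lastEntry) int {
	votes := 1
	for _, v := range voters {
		if atLeastAsUpToDate(candidate, v) {
			votes++
		}
	}
	return votes
}

func main() {
	s5 := lastEntry{term: 3, index: 2}

	// Case (d): S2 and S3 end with the term 2 entry, S4 with the term 1 entry.
	caseD := []lastEntry{{term: 2, index: 2}, {term: 2, index: 2}, {term: 1, index: 1}}
	fmt.Println("votes for S5 in (d):", countVotes(s5, caseD))

	// Case (e): S2 and S3 also hold S1's term 4 entry at index 3.
	caseE := []lastEntry{{term: 4, index: 3}, {term: 4, index: 3}, {term: 1, index: 1}}
	fmt.Println("votes for S5 in (e):", countVotes(s5, caseE))
}
```

With five servers a candidate needs three votes. In case (d) S5 collects enough to win and can overwrite the term 2 entry, while in case (e) only S4 accepts its log, so the term 4 entry and everything before it are safe.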

Conclusion

In this second part, we successfully implemented Raft's Log Replication to build a fault-tolerant in-memory system. We explored the core mechanics: the leader's use of the AppendEntries RPC for both sending log entries and serving as a heartbeat, the vital log consistency checks using PrevLogIndex/PrevLogTerm, and the log reconciliation process used by followers to fix conflicts. Finally, we detailed how entries become committed only after successful replication to a majority of peers, and are then applied to the state machine by a dedicated applier goroutine.

The 6.5840 labs provide the following test cases for log replication:

Test (3B): basic agreement (reliable network)...
  ... Passed --  time  2.1s #peers 3 #RPCs    14 #Ops    0
Test (3B): RPC byte count (reliable network)...
  ... Passed --  time  3.3s #peers 3 #RPCs    46 #Ops    0
Test (3B): test progressive failure of followers (reliable network)...
  ... Passed --  time  6.4s #peers 3 #RPCs    41 #Ops    0
Test (3B): test failure of leaders (reliable network)...
  ... Passed --  time  8.6s #peers 3 #RPCs    68 #Ops    0
Test (3B): agreement after follower reconnects (reliable network)...
  ... Passed --  time  5.4s #peers 3 #RPCs    50 #Ops    0
Test (3B): no agreement if too many followers disconnect (reliable network)...
  ... Passed --  time  5.2s #peers 5 #RPCs    75 #Ops    0
Test (3B): concurrent Start()s (reliable network)...
  ... Passed --  time  2.7s #peers 3 #RPCs    14 #Ops    0
Test (3B): rejoin of partitioned leader (reliable network)...
  ... Passed --  time  9.1s #peers 3 #RPCs    81 #Ops    0
Test (3B): leader backs up quickly over incorrect follower logs (reliable network)...
  ... Passed --  time 23.8s #peers 5 #RPCs   775 #Ops    0
Test (3B): RPC counts aren't too high (reliable network)...
  ... Passed --  time  4.1s #peers 3 #RPCs    30 #Ops    0

In the next part, we will look at some nuances of log compaction with snapshots and at how persistence currently works in the lab environment.

References