In a previous article, "Worker Module Source Code Analysis: How DolphinScheduler Achieves Billion-Level Task Scheduling", we walked through the Worker module. Today, let's explore how the Master module works by reading its source code.
Master Slot Calculation
Core Code Logic:
Located in org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify
public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
    List<Server> serverList = masterNodeInfo.values().stream()
            // TODO This filters out busy master nodes.
            .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
            .map(this::convertHeartBeatToServer).collect(Collectors.toList());
    // TODO Synchronize master nodes.
    syncMasterNodes(serverList);
}
Calculation of totalSlot and currentSlot
private void syncMasterNodes(List<Server> masterNodes) {
    slotLock.lock();
    try {
        this.masterPriorityQueue.clear();
        // TODO All active (non-busy) master nodes are added to masterPriorityQueue, for example, [192.168.220.1:12345,192.168.220.2:12345].
        this.masterPriorityQueue.putAll(masterNodes);
        // TODO Get the position of the local address in the queue.
        int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
        // TODO Total number of nodes.
        int tempTotalSlot = masterNodes.size();
        // TODO Normally, this should not be less than 0.
        if (tempCurrentSlot < 0) {
            totalSlot = 0;
            currentSlot = 0;
            log.warn("Current master is not in active master list");
        } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
            // TODO This essentially records the slot index, e.g., with two slots, the index would be 0 or 1.
            totalSlot = tempTotalSlot;
            currentSlot = tempCurrentSlot;
            log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
        }
    } finally {
        slotLock.unlock();
    }
}
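To make the slot rule concrete, here is a minimal Java sketch of what `notify` and `syncMasterNodes` compute together. It is a simplified model, not DolphinScheduler code: `SlotSketch`, `HeartBeat`, `ServerStatus.NORMAL`, `Slot` and `computeSlot` are hypothetical names, and the input list is assumed to already be in the order that `masterPriorityQueue` would produce. Unlike the original, it simply returns the slot on every call instead of updating fields only when something changed.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the slot rule; not DolphinScheduler's own classes.
class SlotSketch {

    enum ServerStatus { NORMAL, BUSY }

    // Stand-in for MasterHeartBeat: just an address and a status.
    record HeartBeat(String address, ServerStatus status) { }

    // (totalSlot, currentSlot) as computed in syncMasterNodes.
    record Slot(int totalSlot, int currentSlot) { }

    // Drops BUSY masters (as notify does), then uses the local master's position in the
    // remaining list as currentSlot. The input order is assumed to be the queue's order.
    static Slot computeSlot(List<HeartBeat> orderedHeartBeats, String localAddress) {
        List<String> active = orderedHeartBeats.stream()
                .filter(hb -> hb.status() != ServerStatus.BUSY)
                .map(HeartBeat::address)
                .collect(Collectors.toList());
        int index = active.indexOf(localAddress);
        if (index < 0) {
            // Not in the active list: same fallback as the tempCurrentSlot < 0 branch.
            return new Slot(0, 0);
        }
        return new Slot(active.size(), index);
    }

    public static void main(String[] args) {
        List<HeartBeat> heartBeats = List.of(
                new HeartBeat("192.168.220.1:12345", ServerStatus.NORMAL),
                new HeartBeat("192.168.220.2:12345", ServerStatus.NORMAL));
        System.out.println(computeSlot(heartBeats, "192.168.220.2:12345"));
        // prints Slot[totalSlot=2, currentSlot=1]
    }
}
```

If the local master were BUSY or missing from the list, the sketch would return (0, 0), which matches the `tempCurrentSlot < 0` branch above.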
How this.masterPriorityQueue.putAll(masterNodes) Computes the Index
public void putAll(Collection<Server> serverList) {
    for (Server server : serverList) {
        this.queue.put(server);
    }
    // TODO Updates `hostIndexMap`, storing address -> index mappings.
    refreshMasterList();
}

private void refreshMasterList() {
    hostIndexMap.clear();
    Iterator<Server> iterator = queue.iterator();
    int index = 0;
    while (iterator.hasNext()) {
        Server server = iterator.next();
        String addr = NetUtils.getAddr(server.getHost(), server.getPort());
        hostIndexMap.put(addr, index);
        index += 1;
    }
}
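The index map can be sketched in a few lines of Java. This is an illustrative model with hypothetical names (`HostIndexSketch`, `Server`, `buildHostIndexMap`, and a static `getIndex` helper); it assumes `NetUtils.getAddr` produces a `host:port` string, matching the addresses in the comment in `syncMasterNodes`, and that an unknown address maps to -1, which is what the `tempCurrentSlot < 0` check implies.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of refreshMasterList(): walk the masters in iteration order
// and record "host:port" -> position.
class HostIndexSketch {

    record Server(String host, int port) { }

    static Map<String, Integer> buildHostIndexMap(List<Server> servers) {
        Map<String, Integer> hostIndexMap = new LinkedHashMap<>();
        int index = 0;
        for (Server server : servers) {
            // Assumed "host:port" format, standing in for NetUtils.getAddr(host, port).
            hostIndexMap.put(server.host() + ":" + server.port(), index++);
        }
        return hostIndexMap;
    }

    // Assumed lookup behavior: -1 when the address is not a known master.
    static int getIndex(Map<String, Integer> hostIndexMap, String address) {
        return hostIndexMap.getOrDefault(address, -1);
    }

    public static void main(String[] args) {
        Map<String, Integer> map = buildHostIndexMap(List.of(
                new Server("192.168.220.1", 12345),
                new Server("192.168.220.2", 12345)));
        System.out.println(map); // {192.168.220.1:12345=0, 192.168.220.2:12345=1}
    }
}
```

Because the index is just the position in iteration order, the slots only stay distinct if every master walks the same set of masters in the same order.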
Master Consuming Commands to Generate Process Instances
Logic for Retrieving Commands:
For example, with two Master nodes:
master1: masterCount=2, thisMasterSlot=0
master2: masterCount=2, thisMasterSlot=1
Data in the command table (command id, then the master that consumes it):
1 master2
2 master1
3 master2
4 master1
SELECT *
FROM t_ds_command
WHERE id % #{masterCount} = #{thisMasterSlot}
ORDER BY process_instance_priority, id ASC
LIMIT #{limit};
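The query above can be modeled in memory to check the split. In this hypothetical Java sketch, `Command` keeps only the id and an integer priority (assumed: a lower value sorts first, as the ascending ORDER BY implies), and `selectForSlot` applies the WHERE, ORDER BY and LIMIT clauses in that order. It is an illustration, not the MyBatis mapper itself.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the t_ds_command query above.
class CommandShardingSketch {

    // Only the two columns the query uses; priority is assumed to be an int where lower sorts first.
    record Command(int id, int priority) { }

    static List<Integer> selectForSlot(List<Command> commands, int masterCount, int thisMasterSlot, int limit) {
        return commands.stream()
                .filter(c -> c.id() % masterCount == thisMasterSlot)  // WHERE id % masterCount = thisMasterSlot
                .sorted(Comparator.comparingInt(Command::priority)    // ORDER BY process_instance_priority,
                        .thenComparingInt(Command::id))               //          id ASC
                .limit(limit)                                         // LIMIT limit
                .map(Command::id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Four commands with the same priority, as in the example table.
        List<Command> table = List.of(
                new Command(1, 0), new Command(2, 0), new Command(3, 0), new Command(4, 0));
        System.out.println("master1 (slot 0): " + selectForSlot(table, 2, 0, 10)); // [2, 4]
        System.out.println("master2 (slot 1): " + selectForSlot(table, 2, 1, 10)); // [1, 3]
    }
}
```

Since every id has exactly one remainder modulo masterCount, each command is claimed by exactly one slot, provided all masters agree on masterCount and hold distinct slots.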
A common question: what happens if one master has picked up the updated master list but the other has not?
For example, master1 has the up-to-date view (masterCount=2, thisMasterSlot=0), so in master1 the commands are assigned like this:
1 master2
2 master1
3 master2
4 master1
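However, master2 may still hold a stale view, for example one in which it is the only master (masterCount=1, thisMasterSlot=0). Since id % 1 = 0 for every id, all commands seem to belong to master2, so commands 2 and 4 could be consumed by both masters:
1 master2
2 master2
3 master2
4 master2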
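The overlap can be computed directly. This hypothetical Java sketch assumes master1 uses (masterCount=2, thisMasterSlot=0) while master2's stale view is (masterCount=1, thisMasterSlot=0), i.e. it still believes it is the only master; any id claimed under both views is a candidate for duplicate consumption. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the two (masterCount, slot) views are assumptions.
class StaleSlotSketch {

    // True if a master holding this (masterCount, slot) view would fetch the command.
    static boolean claims(int commandId, int masterCount, int slot) {
        return commandId % masterCount == slot;
    }

    // Ids that both views would fetch, i.e. candidates for duplicate consumption.
    static List<Integer> doublyClaimed(List<Integer> ids, int countA, int slotA, int countB, int slotB) {
        List<Integer> overlap = new ArrayList<>();
        for (int id : ids) {
            if (claims(id, countA, slotA) && claims(id, countB, slotB)) {
                overlap.add(id);
            }
        }
        return overlap;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4);
        // master1 up to date (2, 0); master2 stale (1, 0): every id % 1 == 0.
        System.out.println(doublyClaimed(ids, 2, 0, 1, 0)); // [2, 4]
        // Both up to date, (2, 0) and (2, 1): no overlap.
        System.out.println(doublyClaimed(ids, 2, 0, 2, 1)); // []
    }
}
```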
The answer lies in org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand:
@Transactional
public @Nullable ProcessInstance handleCommand(String host,
                                               Command command) throws CronParseException, CodeGenerateException {
    // TODO Create process instance
    ProcessInstance processInstance = constructProcessInstance(command, host);
    // Cannot construct process instance, return null
    if (processInstance == null) {
        log.error("Scan command, command parameter is incorrect: {}", command);
        commandService.moveToErrorCommand(command, "Process instance is null");
        return null;
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    processInstance.setTestFlag(command.getTestFlag());
    // If the processDefinition is serial
    ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    // TODO Check if execution is serial
    if (processDefinition.getExecutionType().typeIsSerial()) {
        saveSerialProcess(processInstance, processDefinition);
        if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
            setSubProcessParam(processInstance);
            triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
            deleteCommandWithCheck(command.getId());
            // TODO This is a poor design, returning null here affects task triggering
            return null;
        }
    } else {
        // TODO Execute in parallel
        processInstanceDao.upsertProcessInstance(processInstance);
    }
    // TODO Insert a record into `triggerRelation` table linking process instance and triggerCode
    triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
    // TODO Set parameters for subprocesses
    setSubProcessParam(processInstance);
    // TODO Delete the command
    deleteCommandWithCheck(command.getId());
    return processInstance;
}
Note: This method is annotated with @Transactional, so creating the process instance and deleting the command happen in the same transaction. If two Masters consume the same command, the later delete finds no row to remove, the check in deleteCommandWithCheck fails and throws an exception, and that Master's transaction is rolled back, so only one process instance is kept.
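The effect of this rollback can be simulated without a database. The Java sketch below is a hypothetical in-memory model (`CommandDedupSketch`, `deleteById` and its `handleCommand` are illustrative names, not the real service): it serializes the two masters with `synchronized` and undoes the insert by hand instead of relying on Spring's `@Transactional`, so it shows the outcome of the rollback rather than how a database implements it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of "insert instance + delete command" as one unit of work.
// The rollback is simulated by hand; this is not Spring's @Transactional.
class CommandDedupSketch {

    private final Set<Integer> commandTable = new HashSet<>();
    private final List<String> processInstances = new ArrayList<>();

    CommandDedupSketch(Set<Integer> commandIds) {
        commandTable.addAll(commandIds);
    }

    // Like a DELETE by primary key: returns the number of rows removed (0 or 1).
    private int deleteById(int commandId) {
        return commandTable.remove(commandId) ? 1 : 0;
    }

    // Returns true if this master's work is kept ("commit"), false if it is undone ("rollback").
    // synchronized stands in for the database serializing two deletes of the same row.
    synchronized boolean handleCommand(String host, int commandId) {
        int before = processInstances.size();
        processInstances.add(host + " -> instance for command " + commandId); // create process instance
        try {
            if (deleteById(commandId) != 1) {
                // Zero deleted rows: another master already consumed this command.
                throw new IllegalStateException("command already consumed: " + commandId);
            }
            return true;
        } catch (IllegalStateException e) {
            // Simulated rollback: discard everything this unit of work inserted.
            processInstances.subList(before, processInstances.size()).clear();
            return false;
        }
    }

    synchronized int instanceCount() {
        return processInstances.size();
    }

    public static void main(String[] args) {
        CommandDedupSketch db = new CommandDedupSketch(Set.of(1));
        System.out.println(db.handleCommand("master1", 1)); // true
        System.out.println(db.handleCommand("master2", 1)); // false
        System.out.println(db.instanceCount());             // 1
    }
}
```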
Workflow Start Process
DAG Partitioning & Task Submission
Master Event State Transition
Diagram link: Master Event State Transition
Analysis of the TaskEventService Component's TaskEventDispatchThread and TaskEventHandlerThread
Essentially, both the Master’s own state (DISPATCH) and the status reports from the Worker (RUNNING, UPDATE_PID, RESULT) are added to eventQueue.
- TaskEventDispatchThread retrieves events from eventQueue in a blocking manner and places each one into the corresponding TaskExecuteRunnable (but does not execute it).
- Only TaskEventHandlerThread actually submits TaskExecuteRunnable to TaskExecuteThreadPool.
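The two-stage hand-off can be sketched with standard `java.util.concurrent` types. Everything here is hypothetical: `TaskEvent`, `TaskRunnable` (a stand-in for TaskExecuteRunnable), `dispatchOnce` and `submitPending` are illustrative names, and the `submitted` flag, which keeps at most one copy of a runnable in the pool so that one task's events are handled in order, is this sketch's own choice rather than a claim about how DolphinScheduler guards TaskExecuteThreadPool.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical two-stage pipeline: dispatch only buffers events, the handler submits work.
class TaskEventPipelineSketch {

    enum EventType { DISPATCH, RUNNING, UPDATE_PID, RESULT }

    record TaskEvent(int taskInstanceId, EventType type) { }

    // Stand-in for TaskExecuteRunnable: buffers the events of one task instance.
    static final class TaskRunnable implements Runnable {
        final Queue<TaskEvent> events = new ConcurrentLinkedQueue<>();
        final AtomicBoolean submitted = new AtomicBoolean(false);
        final Queue<String> log;

        TaskRunnable(Queue<String> log) {
            this.log = log;
        }

        @Override
        public void run() {
            try {
                TaskEvent e;
                while ((e = events.poll()) != null) {
                    log.add(e.taskInstanceId() + ":" + e.type()); // "handle" the event
                }
            } finally {
                submitted.set(false); // allow the handler stage to submit this runnable again
            }
        }
    }

    final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
    final Map<Integer, TaskRunnable> runnables = new ConcurrentHashMap<>();
    final Queue<String> log = new ConcurrentLinkedQueue<>();

    // Dispatch stage: blocks until an event arrives, then only buffers it.
    void dispatchOnce() throws InterruptedException {
        TaskEvent event = eventQueue.take();
        runnables.computeIfAbsent(event.taskInstanceId(), id -> new TaskRunnable(log))
                .events.add(event);
    }

    // Handler stage: submits runnables that have pending events and are not already in the pool.
    void submitPending(ExecutorService pool) {
        for (TaskRunnable r : runnables.values()) {
            if (!r.events.isEmpty() && r.submitted.compareAndSet(false, true)) {
                pool.submit(r);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TaskEventPipelineSketch p = new TaskEventPipelineSketch();
        p.eventQueue.add(new TaskEvent(7, EventType.DISPATCH));
        p.eventQueue.add(new TaskEvent(7, EventType.RUNNING));
        p.eventQueue.add(new TaskEvent(7, EventType.RESULT));
        for (int i = 0; i < 3; i++) {
            p.dispatchOnce(); // buffered only, nothing executed yet
        }
        System.out.println(p.log); // []
        ExecutorService pool = Executors.newFixedThreadPool(2);
        p.submitPending(pool);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(p.log); // [7:DISPATCH, 7:RUNNING, 7:RESULT]
    }
}
```

Splitting the work this way keeps the blocking take() on eventQueue cheap, while the actual event handling happens on pool threads.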