Previously, we walked through the Worker module in Worker Module Source Code Analysis: How DolphinScheduler Achieves Billion-Level Task Scheduling. Today, let's dig into the Master module source code.

Master Slot Calculation

(Diagram: Master slot calculation)

Core Code Logic:

Located in org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify

public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
    List<Server> serverList = masterNodeInfo.values().stream()
            // TODO This filters out busy master nodes.
            .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
            .map(this::convertHeartBeatToServer).collect(Collectors.toList());
    // TODO Synchronize master nodes.
    syncMasterNodes(serverList);
}

Calculation of totalSlot and currentSlot

private void syncMasterNodes(List<Server> masterNodes) {
    slotLock.lock();
    try {
        this.masterPriorityQueue.clear();
        // TODO All master nodes are added to masterPriorityQueue, for example, [192.168.220.1:12345,192.168.220.2:12345].
        this.masterPriorityQueue.putAll(masterNodes);
        // TODO Get the position of the local IP in the queue.
        int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
        // TODO Total number of nodes.
        int tempTotalSlot = masterNodes.size();
        // TODO Normally, this should not be less than 0.
        if (tempCurrentSlot < 0) {
            totalSlot = 0;
            currentSlot = 0;
            log.warn("Current master is not in active master list");
        } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
            // TODO This essentially records the slot index, e.g., with two slots, the index would be 0 or 1.
            totalSlot = tempTotalSlot;
            currentSlot = tempCurrentSlot;
            log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
        }
    } finally {
        slotLock.unlock();
    }
}

How `this.masterPriorityQueue.putAll(masterNodes)` computes the index:

public void putAll(Collection<Server> serverList) {
    for (Server server : serverList) {
        this.queue.put(server);
    }
    // TODO Updates `hostIndexMap`, storing address (host:port) -> index mappings.
    refreshMasterList();
}

private void refreshMasterList() {
    hostIndexMap.clear();
    Iterator<Server> iterator = queue.iterator();
    int index = 0;
    while (iterator.hasNext()) {
        Server server = iterator.next();
        String addr = NetUtils.getAddr(server.getHost(), server.getPort());
        hostIndexMap.put(addr, index);
        index += 1;
    }
}
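
For the two example masters above, here is a quick illustration of what this leaves behind (plain Java with illustrative values, not DolphinScheduler code):

import java.util.HashMap;
import java.util.Map;

public class SlotAssignmentExample {
    public static void main(String[] args) {
        // What hostIndexMap holds after refreshMasterList(), assuming the queue
        // order [192.168.220.1:12345, 192.168.220.2:12345] from the comment above.
        Map<String, Integer> hostIndexMap = new HashMap<>();
        hostIndexMap.put("192.168.220.1:12345", 0);
        hostIndexMap.put("192.168.220.2:12345", 1);

        int totalSlot = hostIndexMap.size(); // 2 on both masters
        // Each master looks up its own address, so 192.168.220.1:12345 gets
        // currentSlot = 0 and 192.168.220.2:12345 gets currentSlot = 1.
        int currentSlotOfMaster1 = hostIndexMap.get("192.168.220.1:12345");
        System.out.println("totalSlot=" + totalSlot + ", currentSlot=" + currentSlotOfMaster1);
    }
}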

Master Consuming Commands to Generate Process Instances

(Diagram: Master consuming commands to generate process instances)

Logic for Retrieving Commands:

For example, with two Master nodes:
masterCount=2 thisMasterSlot=0  master1
masterCount=2 thisMasterSlot=1  master2

Data in the command table (command id and the master whose slot it maps to):
1 master2
2 master1
3 master2
4 master1

SELECT * 
FROM t_ds_command
WHERE id % #{masterCount} = #{thisMasterSlot}
ORDER BY process_instance_priority, id ASC
LIMIT #{limit};
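
To make the split concrete, here is a tiny self-contained illustration of the id % masterCount arithmetic for the four commands above (plain Java, not DolphinScheduler code):

public class CommandSlotSplit {
    public static void main(String[] args) {
        int masterCount = 2; // two active masters: slot 0 -> master1, slot 1 -> master2
        for (int commandId = 1; commandId <= 4; commandId++) {
            int targetSlot = commandId % masterCount;
            String owner = (targetSlot == 0) ? "master1" : "master2";
            System.out.printf("command %d -> slot %d (%s)%n", commandId, targetSlot, owner);
        }
        // Prints: command 1 -> master2, 2 -> master1, 3 -> master2, 4 -> master1,
        // matching the table above.
    }
}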

A common question: what happens if one master has refreshed its view of the master list, but the other has not?

For example, `master1` has already refreshed its view, so it splits the commands correctly:
1  master2
2  master1
3  master2
4  master1

However, `master2` has not refreshed yet and still sees itself as the only active master (masterCount=1, thisMasterSlot=0), so every command appears to belong to it, which can lead to duplicate consumption:
1 master2
2 master2
3 master2
4 master2

Located in org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand

@Transactional
public @Nullable ProcessInstance handleCommand(String host,
                                               Command command) throws CronParseException, CodeGenerateException {
    // TODO Create process instance
    ProcessInstance processInstance = constructProcessInstance(command, host);
    // Cannot construct process instance, return null
    if (processInstance == null) {
        log.error("Scan command, command parameter is incorrect: {}", command);
        commandService.moveToErrorCommand(command, "Process instance is null");
        return null;
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    processInstance.setTestFlag(command.getTestFlag());
    // If the processDefinition is serial
    ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    // TODO Check if execution is serial
    if (processDefinition.getExecutionType().typeIsSerial()) {
        saveSerialProcess(processInstance, processDefinition);
        if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
            setSubProcessParam(processInstance);
            triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
            deleteCommandWithCheck(command.getId());
            // TODO This is a poor design, returning null here affects task triggering
            return null;
        }
    } else {
        // TODO Execute in parallel
        processInstanceDao.upsertProcessInstance(processInstance);
    }

    // TODO Insert a record into `triggerRelation` table linking process instance and triggerCode
    triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
    // TODO Set parameters for subprocesses
    setSubProcessParam(processInstance);
    // TODO Delete the command
    deleteCommandWithCheck(command.getId());
    return processInstance;
}

Note:

This method is annotated with @Transactional, so creating the process instance and deleting the command happen in one transaction. If two Masters pick up the same command, only one of them can delete it; on the other Master the delete affects no rows, deleteCommandWithCheck throws an exception, and the whole transaction is rolled back, so no duplicate process instance survives.
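
The check that makes this work is the delete itself. A minimal sketch of the idea behind deleteCommandWithCheck (simplified for illustration; the real method sits alongside handleCommand in ProcessServiceImpl):

private void deleteCommandWithCheck(int commandId) {
    // deleteById returns the number of affected rows.
    int deleteCount = commandMapper.deleteById(commandId);
    if (deleteCount != 1) {
        // Another master already consumed and deleted this command: throwing here
        // rolls back the surrounding @Transactional handleCommand, so the duplicate
        // process instance created above is never persisted.
        throw new ServiceException("delete command fail, id: " + commandId);
    }
}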

Workflow Start Process

(Diagram: workflow start process)

DAG Partitioning & Task Submission

(Diagram: DAG partitioning and task submission)

Master Event State Transition

Diagram link: Master Event State Transition

Analysis of TaskEventService Component's TaskEventDispatchThread and TaskEventHandlerThread

(Diagram: TaskEventDispatchThread and TaskEventHandlerThread workflow)

Essentially, both the events generated by the Master itself (DISPATCH) and the status events reported by the Worker (RUNNING, UPDATE_PID, RESULT) are added to the eventQueue.

  • TaskEventDispatchThread retrieves events in a blocking manner and places them into the corresponding TaskExecuteRunnable (but does not execute them).
  • Only TaskEventHandlerThread actually submits TaskExecuteRunnable to TaskExecuteThreadPool (see the sketch below).
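
A simplified sketch of this two-stage hand-off (plain Java with standard concurrency primitives; the class and field names are illustrative stand-ins, not the real DolphinScheduler types):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class TaskEventPipelineSketch {
    // Events produced by the Master (DISPATCH) and reported by Workers (RUNNING, UPDATE_PID, RESULT).
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    // Pending events buffered per task instance (a stand-in for TaskExecuteRunnable).
    private final Map<Integer, Queue<String>> eventsPerTask = new ConcurrentHashMap<>();
    private final ExecutorService taskExecuteThreadPool = Executors.newFixedThreadPool(4);

    // Stage 1 (role of TaskEventDispatchThread): blockingly take events and route them
    // into the matching per-task buffer, without executing anything.
    void dispatchLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            String event = eventQueue.take();
            eventsPerTask
                    .computeIfAbsent(taskInstanceIdOf(event), id -> new ConcurrentLinkedQueue<>())
                    .add(event);
        }
    }

    // Stage 2 (role of TaskEventHandlerThread): submit the buffered per-task work
    // to the thread pool, where the events are actually applied.
    void handleLoop() {
        eventsPerTask.forEach((taskInstanceId, events) ->
                taskExecuteThreadPool.submit(() -> {
                    String event;
                    while ((event = events.poll()) != null) {
                        // apply the event to the task instance's state here
                    }
                }));
    }

    private int taskInstanceIdOf(String event) {
        return event.hashCode(); // placeholder: extract the task instance id in real code
    }
}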