CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
    -- Purpose:
    --   For every table listed in temp_date_col_mapper (restricted to the
    --   leaf_nodes entries named in the cursor below), run the configured
    --   merge statement (merge_stmt_1) in parallel chunks through
    --   DBMS_PARALLEL_EXECUTE.
    --
    -- Per table:
    --   1. Create a uniquely named task.
    --   2. Create chunks of c_chunk_size rows, numbered by ROW_NUMBER() over
    --      the configured primary_key expression of cr_s_0_x.<table_name>.
    --   3. Run an anonymous block per chunk that executes
    --      merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id'.
    --   4. Resume the task up to two times if it did not finish.
    --   5. Drop the task (also on failure, so tasks do not accumulate).
    --
    -- Errors for one table are reported through DBMS_OUTPUT and do not stop
    -- processing of the remaining tables.
    --
    -- NOTE(review): merge_stmt_1 is assumed to be a statement WITHOUT a
    -- trailing semicolon that exposes a column named rn and can legally take
    -- an appended WHERE clause -- confirm against the mapper table contents.
    -- NOTE(review): table_name / primary_key are concatenated into dynamic
    -- SQL; they are assumed to come from trusted configuration only.

    c_chunk_size     CONSTANT NUMBER(10)  := 20000; -- rows per chunk
    c_parallel_level CONSTANT PLS_INTEGER := 8;     -- concurrent jobs per task
    c_max_retries    CONSTANT PLS_INTEGER := 2;     -- RESUME_TASK attempts

    l_chunk_sql    VARCHAR2(4000);
    l_merge_block  CLOB;
    l_task_name    VARCHAR2(128);
    l_task_created BOOLEAN;
    l_try          NUMBER(6);
    l_status       NUMBER(6);

    -- Only the columns this procedure actually uses are fetched.
    CURSOR cur_tables IS
        SELECT table_name,
               primary_key,
               merge_stmt_1
        FROM temp_date_col_mapper
        WHERE table_name IN (
            SELECT table_name
            FROM leaf_nodes
            WHERE table_name IN ('UD_MPI_MPG_ID')
        );

BEGIN
    FOR table_rec IN cur_tables LOOP
        BEGIN
            l_task_created := FALSE;

            -- Validate configuration before creating any task, so a bad
            -- mapper row cannot leave a half-built task behind.
            IF table_rec.primary_key IS NULL THEN
                RAISE_APPLICATION_ERROR(
                    -20001,
                    'No primary_key configured for ' || table_rec.table_name);
            END IF;

            IF NVL(LENGTH(table_rec.merge_stmt_1), 0) = 0 THEN
                RAISE_APPLICATION_ERROR(
                    -20002,
                    'No merge_stmt_1 configured for ' || table_rec.table_name);
            END IF;

            -- Define task name (timestamp suffix keeps reruns unique)
            l_task_name := 'COMPARE_' || table_rec.table_name || '_'
                           || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');

            -- Create task
            DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => l_task_name);
            l_task_created := TRUE;

            -- Chunk SQL based on ROW_NUMBER(): one (start_id, end_id) pair
            -- for every c_chunk_size-th row number.
            l_chunk_sql :=
                   'SELECT rn AS start_id, rn + ' || c_chunk_size || ' - 1 AS end_id '
                || 'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || table_rec.primary_key || ') AS rn '
                || 'FROM cr_s_0_x.' || table_rec.table_name || ') '
                || 'WHERE MOD(rn - 1, ' || c_chunk_size || ') = 0';

            -- Create chunks
            DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
                task_name => l_task_name,
                sql_stmt  => l_chunk_sql,
                by_rowid  => FALSE
            );

            -- Anonymous block executed once per chunk. The task name is
            -- embedded as literal text: a bare reference to a variable of
            -- this procedure would not resolve inside the dynamic block and
            -- would make every chunk fail to compile.
            -- NOTE: DBMS_OUTPUT written by the chunk sessions is buffered in
            -- those sessions and is not shown to the caller of this procedure.
            l_merge_block :=
                   'DECLARE' || CHR(10)
                || '    v_cnt NUMBER;' || CHR(10)
                || 'BEGIN' || CHR(10)
                || '    ' || table_rec.merge_stmt_1
                || ' WHERE rn BETWEEN :start_id AND :end_id;' || CHR(10)
                || '    v_cnt := SQL%ROWCOUNT;' || CHR(10)
                || '    DBMS_OUTPUT.PUT_LINE(''Task: '
                || REPLACE(l_task_name, '''', '''''')
                || ', Chunk Start ID: '' || :start_id || '', Rows Merged: '' || v_cnt);' || CHR(10)
                || '    COMMIT;' || CHR(10)
                || 'END;';

            -- Run merge in parallel
            DBMS_PARALLEL_EXECUTE.RUN_TASK(
                task_name      => l_task_name,
                sql_stmt       => l_merge_block,
                language_flag  => DBMS_SQL.NATIVE,
                parallel_level => c_parallel_level
            );

            -- Handle retries if needed
            l_try    := 0;
            l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(l_task_name);

            WHILE l_try < c_max_retries
                  AND l_status != DBMS_PARALLEL_EXECUTE.FINISHED LOOP
                DBMS_OUTPUT.PUT_LINE('Retrying task: ' || l_task_name);
                l_try := l_try + 1;
                DBMS_PARALLEL_EXECUTE.RESUME_TASK(l_task_name);
                l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(l_task_name);
            END LOOP;

            -- Report a task that never finished instead of dropping it silently.
            IF l_status != DBMS_PARALLEL_EXECUTE.FINISHED THEN
                DBMS_OUTPUT.PUT_LINE(
                    'Task ' || l_task_name || ' did not finish after '
                    || l_try || ' retries; final status code: ' || l_status);
            END IF;

            -- Clean up
            DBMS_PARALLEL_EXECUTE.DROP_TASK(l_task_name);
            l_task_created := FALSE;

        EXCEPTION
            WHEN OTHERS THEN
                -- Best effort per table: report and continue with the next one.
                DBMS_OUTPUT.PUT_LINE('Error processing table: ' || table_rec.table_name);
                DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);

                -- Do not leak the task when the failure happened after CREATE_TASK.
                IF l_task_created THEN
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(l_task_name);
                    EXCEPTION
                        WHEN OTHERS THEN
                            DBMS_OUTPUT.PUT_LINE(
                                'Could not drop task ' || l_task_name || ': ' || SQLERRM);
                    END;
                END IF;
        END;
    END LOOP;

END compare_tables_with_multi;