CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
    /*
     * Runs a chunked, parallel MERGE for every table selected by cur_tables.
     *
     * For each table the procedure:
     *   1. creates a DBMS_PARALLEL_EXECUTE task,
     *   2. splits the rows of cr_s_0_x.<table_name> into ranges of
     *      c_chunk_size consecutive ROW_NUMBER() values (ordered by the
     *      primary_key expression stored in temp_date_col_mapper),
     *   3. runs merge_stmt_1 || ' WHERE rn BETWEEN :start_id AND :end_id '
     *      || ') src ' || merge_stmt_2 once per chunk,
     *   4. resumes the task up to c_max_retries times if it did not finish,
     *   5. reports chunk statistics through DBMS_OUTPUT and drops the task.
     *
     * Takes no parameters. Errors raised while processing one table are
     * printed through DBMS_OUTPUT and do not stop the remaining tables.
     *
     * NOTE(review): the statement assembled in step 3 presumably relies on
     * merge_stmt_1 ending inside an open inline view that exposes a column
     * named rn -- confirm against the contents of temp_date_col_mapper.
     */

    c_chunk_size      CONSTANT PLS_INTEGER := 20000;  -- rows per chunk
    c_parallel_level  CONSTANT PLS_INTEGER := 8;      -- parallel_level passed to RUN_TASK
    c_max_retries     CONSTANT PLS_INTEGER := 2;      -- RESUME_TASK attempts
    c_print_limit     CONSTANT PLS_INTEGER := 8000;   -- max chars of the MERGE echoed to DBMS_OUTPUT

    l_merge_stmt      CLOB;
    l_chunk_sql       VARCHAR2(4000);
    l_task_name       VARCHAR2(128);
    l_try             PLS_INTEGER;
    l_status          NUMBER;
    l_total_chunks    NUMBER := 0;
    l_done_chunks     NUMBER := 0;

    -- Only the columns the loop body uses are fetched; the cursor row is used
    -- directly instead of re-querying temp_date_col_mapper for each table.
    CURSOR cur_tables IS
        SELECT m.table_name,
               m.primary_key,
               m.merge_stmt_1,
               m.merge_stmt_2
        FROM temp_date_col_mapper m
        WHERE m.table_name IN (
            SELECT table_name
            FROM leaf_nodes
            WHERE table_name IN ('UD_MPI_MPG_ID')
        );

BEGIN
    FOR table_rec IN cur_tables LOOP
        -- Reset so the exception handler never tries to drop a task that
        -- belongs to a previous iteration.
        l_task_name := NULL;

        BEGIN
            -- Task name: table name plus a second-resolution timestamp.
            l_task_name := 'COMPARE_' || table_rec.table_name || '_T' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');

            DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => l_task_name);

            -- One chunk per c_chunk_size rows. MOD(rn - 1, size) = 0 selects
            -- rn = 1, size + 1, 2 * size + 1, ... so the first chunk starts at
            -- row 1 and a table smaller than one chunk still gets a chunk.
            l_chunk_sql := 'SELECT rn AS start_id, rn + ' || c_chunk_size || ' - 1 AS end_id ' ||
                           'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || table_rec.primary_key || ') AS rn ' ||
                           'FROM cr_s_0_x.' || table_rec.table_name || ') ' ||
                           'WHERE MOD(rn - 1, ' || c_chunk_size || ') = 0';

            DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
                task_name => l_task_name,
                sql_stmt  => l_chunk_sql,
                by_rowid  => FALSE
            );

            SELECT COUNT(*)
            INTO l_total_chunks
            FROM user_parallel_execute_chunks c
            WHERE c.task_name = l_task_name;

            IF l_total_chunks = 0 THEN
                -- The chunk query returned no rows, so there is nothing to merge.
                DBMS_OUTPUT.PUT_LINE('No chunks created for table ' || table_rec.table_name || '; skipping merge');
            ELSE
                -- Per-chunk MERGE: :start_id / :end_id are bound by RUN_TASK
                -- from each chunk's start_id / end_id.
                l_merge_stmt := table_rec.merge_stmt_1 ||
                                TO_CLOB(' WHERE rn BETWEEN :start_id AND :end_id ') ||
                                TO_CLOB(') src ') ||
                                table_rec.merge_stmt_2;

                -- Echo a bounded prefix only: passing a whole CLOB to PUT_LINE
                -- fails once it exceeds the VARCHAR2 limit.
                DBMS_OUTPUT.PUT_LINE(DBMS_LOB.SUBSTR(l_merge_stmt, c_print_limit, 1));

                DBMS_PARALLEL_EXECUTE.RUN_TASK(
                    task_name      => l_task_name,
                    sql_stmt       => l_merge_stmt,
                    language_flag  => DBMS_SQL.NATIVE,
                    parallel_level => c_parallel_level
                );

                -- Resume the task while it has not finished, at most c_max_retries times.
                l_try := 0;
                l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(l_task_name);
                WHILE (l_try < c_max_retries AND l_status <> DBMS_PARALLEL_EXECUTE.FINISHED) LOOP
                    DBMS_OUTPUT.PUT_LINE('Retrying parallel execution...');
                    l_try := l_try + 1;
                    DBMS_PARALLEL_EXECUTE.RESUME_TASK(l_task_name);
                    l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS(l_task_name);
                END LOOP;

                -- Statistics are read after the retries so they describe the
                -- final state. The chunk view tracks chunk status, not merged
                -- row counts, so processed chunks are reported.
                SELECT COUNT(*)
                INTO l_done_chunks
                FROM user_parallel_execute_chunks c
                WHERE c.task_name = l_task_name
                  AND c.status = 'PROCESSED';

                DBMS_OUTPUT.PUT_LINE('CHUNKS PROCESSED = ' || l_done_chunks || ' OF ' || l_total_chunks);

                IF l_status <> DBMS_PARALLEL_EXECUTE.FINISHED THEN
                    DBMS_OUTPUT.PUT_LINE('Task ' || l_task_name || ' did not finish; final status = ' || l_status);
                END IF;
            END IF;

            -- Drop task after processing
            DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => l_task_name);

        EXCEPTION
            WHEN OTHERS THEN
                -- Report the failure and continue with the next table.
                DBMS_OUTPUT.PUT_LINE('Error in processing table ' || table_rec.table_name);
                DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);
                DBMS_OUTPUT.PUT_LINE('Trace: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
                DBMS_OUTPUT.PUT_LINE('Backtrace: ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);
                -- Best-effort cleanup: the task may not exist (creation failed)
                -- or may already have been dropped, so drop errors are ignored.
                IF l_task_name IS NOT NULL THEN
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => l_task_name);
                    EXCEPTION
                        WHEN OTHERS THEN NULL;
                    END;
                END IF;
        END;
    END LOOP;
END;
/