CREATE OR REPLACE PROCEDURE compare_tables_with_multi AS
sql_stmt VARCHAR2(10000);
merge_stmt CLOB;
ip_merge_stmt_1 CLOB;
ip_merge_stmt_2 CLOB;
set_clause VARCHAR2(10000);
insert_clause_dst VARCHAR2(10000);
insert_clause_src VARCHAR2(10000);
ip_tbl_last_update_col VARCHAR2(1000);
ip_tbl_create_date_col VARCHAR2(1000);
ip_tbl_primary_col VARCHAR2(1000);
resultset tbl_nmbr;
insert_rows_affected NUMBER(10);
v_merge_count NUMBER := 0;
chunk_sql VARCHAR2(4000);
task_name VARCHAR2(100);
L_TRY NUMBER(5);
L_STATUS NUMBER(5);
chunk_size NUMBER(10) := 20000;
CURSOR cur_tables IS
SELECT *
FROM temp_date_col_mapper
WHERE table_name IN (
SELECT table_name
FROM leaf_nodes
WHERE table_name IN ('UD_MPI_MPG_ID')
);
BEGIN
FOR table_rec IN cur_tables LOOP
BEGIN
-- Get metadata for the table
SELECT last_update_col, create_date_col, primary_key, merge_stmt_1, merge_stmt_2
INTO ip_tbl_last_update_col, ip_tbl_create_date_col, ip_tbl_primary_col, ip_merge_stmt_1, ip_merge_stmt_2
FROM temp_date_col_mapper
WHERE table_name = table_rec.table_name;
-- Create unique task name
task_name := 'COMPARE_' || table_rec.TABLE_NAME || '_T' || TO_CHAR(SYSDATE, 'YYYYMMDDHH24MISS');
-- Create the parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => task_name);
-- Create chunk SQL using ROW_NUMBER
chunk_sql := 'SELECT rn AS start_id, rn + ' || chunk_size || ' - 1 AS end_id ' ||
'FROM (SELECT ROW_NUMBER() OVER (ORDER BY ' || ip_tbl_primary_col || ') AS rn ' ||
'FROM cr_s_0_x.' || table_rec.TABLE_NAME || ') ' ||
'WHERE MOD(rn, ' || chunk_size || ') = 0';
-- Create chunks
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => task_name,
sql_stmt => chunk_sql,
by_rowid => FALSE
);
-- Construct MERGE statement for each chunk
merge_stmt := ip_merge_stmt_1 ||
TO_CLOB(' WHERE rn BETWEEN :start_id AND :end_id ') ||
TO_CLOB(') src ') ||
ip_merge_stmt_2;
-- Optional: print merge SQL
DBMS_OUTPUT.PUT_LINE(merge_stmt);
-- Run merge in parallel
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => task_name,
sql_stmt => merge_stmt,
language_flag => DBMS_SQL.NATIVE,
parallel_level => 8
);
-- Get row count using chunk stats
SELECT SUM(sofar)
INTO v_merge_count
FROM user_parallel_execute_chunks
WHERE task_name = task_name;
DBMS_OUTPUT.PUT_LINE('MERGE COUNT = ' || v_merge_count);
-- Retry logic if needed
L_TRY := 0;
L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
WHILE (L_TRY < 2 AND L_STATUS <> DBMS_PARALLEL_EXECUTE.FINISHED) LOOP
DBMS_OUTPUT.PUT_LINE('Retrying parallel execution...');
L_TRY := L_TRY + 1;
DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name);
L_STATUS := DBMS_PARALLEL_EXECUTE.TASK_STATUS(task_name);
END LOOP;
-- Drop task after processing
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => task_name);
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('Error in processing table ' || table_rec.TABLE_NAME);
DBMS_OUTPUT.PUT_LINE('Error: ' || SQLERRM);
DBMS_OUTPUT.PUT_LINE('Trace: ' || DBMS_UTILITY.FORMAT_ERROR_STACK);
DBMS_OUTPUT.PUT_LINE('Backtrace: ' || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE);
-- Drop task if created
BEGIN
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => task_name);
EXCEPTION
WHEN OTHERS THEN NULL;
END;
END;
END LOOP;
END;
/