[ https://issues.apache.org/jira/browse/CARBONDATA-4042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Venugopal Reddy K updated CARBONDATA-4042: ------------------------------------------ Description: *Issue:* At present, When we do insert into table select from or create table as select from, we lauch one single task per node. Whereas when we do a simple select * from table query, tasks launched are equal to number of carbondata files(CARBON_TASK_DISTRIBUTION default is CARBON_TASK_DISTRIBUTION_BLOCK). Thus, slows down the load performance of insert into select and ctas cases. Refer [Community discussion regd. task lauch|http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html] *Suggestion:* Lauch the same number of tasks as in select query for insert into select and ctas cases when the target table is of no-sort. SI creationSI creation 1. DDL -> Parser -> CarbonCreateSecondaryIndexCommand do all Validations(list important once) acquireLockForSecondaryIndexCreation()acquire locks(compact, meta, dele_seg lock) preparetableInfo(prepare column schema, set positionref as sort,inherit local dict from main table ) for SI table & addIndexInfoToParentTable (create indexinfo and add to main table) CreateTablePreExecutionEvent(for acl work) Create SI table(sparksession.sql(create ...)) addIndexTableInfo, refreshTable index table, add indexInfo to hive metastore as Serde addOrModifyTableProperty(indexTableExists -> true) and refresh table(refresh catalog table) 2. try load, LoadDataForSecondaryIndex 1. prepare load model for SI 2. read table status and setinto load model 3. if loadmeta is empty, just return, else start load to SI 4. getValidseg, if yes go ahead, else return 5. prepare segmentIdToLoadStartTimeMapping, and prepare secindeModel 6. create exeSer based on threadpool size for parallel load of segments to SI 7. LoadTableSIPreExecutionEvent(ACL load events) 8. try toget seg local for all valid segment, if u get for all , add to valid, else add to skipped segment 9. start load for valid segs, update SI table status for in progress 10. if sort scope not global sort CarbonSecondaryIndexRDD internalGetPartitions prepareInputFormat and getSplits() internalCompute Sort blocks, prepareTaskBlockMap, prepare CarbonSecondaryIndexExecutor exec.processTableBlocks(prepare query model and execute query and return Iterator<CarbonRowBatch>) SecondaryIndexQueryResultProcessor(prepare seg prop from processingQuery result) SecondaryIndexQueryResultProcessor.processQueryResult(init tempLoc, sort data rows,processResult(does sort on data in iterators) prepareRowObjectForSorting addRowForSorting and startSorting initializeFinalThreadMergerForMergeSort();,initDataHandler();readAndLoadDataFromSortTempFiles(); write the carbon files to indextable store path WriteSegmentfile getLoadresult from future and make, SUccess and failed seg list if (failedSeglist not empty) if (isCompactionCall || !isLoadToFailedSISegments) \{ fail the SI load } else \{ just make markedfordelet and next load take care } } else create projections list including PR and create a datafram from MT loadDataUsingGlobalSort writeSegmentFile getLoadresult from future and make, SUccess and failed seg list if (failedSeglist not empty) if (isCompactionCall || !isLoadToFailedSISegments) \{ fail the SI load } else \{ just make markedfordelet and next load take care } } 11. if (successSISegments.nonEmpty && !isCompactionCall) update status for in progress (can avoid this) mergeIndexFiles writeSegmentFile(can be avoided, shreelekya working on it) readTable statusfile and prepareLoad model for merge datafiles mergeDataFilesSISegments -> scanSegmentsAndSubmitJob -> triggerCompaction -> CarbonSIRebuildRDD internalGetPartitions prepareInputFormat and getSplits() internalCompute CarbonCompactionExecutor.processTableBlocks() close(delete old data files) deleteOldIndexOrMergeIndexFiles writeSegmentFile for each mergedSegment updateTableStatusFile readTableStatusFile writeLoadDetailsIntoFile(updated, new Index and datasize into tablestatus file) mergeIndexFiles for newly generated index files for merged data files If IndexServer enabled clear cahce else clear driver cache 12. update table staus for success 13. if (!isCompactionCall) { triggerPrepriming(trigger pre priming for SI) 14. if (failedSISegments.nonEmpty && !isCompactionCall) { update table status for MFD 15. if (!isCompactionCall) { LoadTableSIPostExecutionEvent 16. if skippedSegmentNotEmpty, isSITableEnabled to false 17. deleteLoadsAndUpdateMetadata 18. Relase segment locks3. if checkMainTableSegEqualToSISeg isSITableEnabled - true 4. CreateTablePostExecutionEvent 5. releaseLocks(meta, dele_seg, compact) Refresh issue1. calling 3 refresh avoid it2. check if dummy is req or not3. it inherits same sort scope as MT during load, but if we have load option sort scope in MT, it will not inherit, check once4. no need to create load model first, check load meta and then do load model5. carbonLoadModel.getLoadMetadataDetails always null6. avoid CarbonEnv.getCarbonTable in createSecIndex7. createSecondaryIndex, if indextable is null, get from carbon metastore, instea of lookup relation to hive metastore8. avoid multiple in progress9. why new load model for merge data files was: *Issue:* At present, When we do insert into table select from or create table as select from, we lauch one single task per node. Whereas when we do a simple select * from table query, tasks launched are equal to number of carbondata files(CARBON_TASK_DISTRIBUTION default is CARBON_TASK_DISTRIBUTION_BLOCK). Thus, slows down the load performance of insert into select and ctas cases. Refer [Community discussion regd. task lauch|http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html] *Suggestion:* Lauch the same number of tasks as in select query for insert into select and ctas cases when the target table is of no-sort. > Insert into select and CTAS launches fewer tasks(task count limited to number of nodes in cluster) even when target table is of no_sort > --------------------------------------------------------------------------------------------------------------------------------------- > > Key: CARBONDATA-4042 > URL: https://issues.apache.org/jira/browse/CARBONDATA-4042 > Project: CarbonData > Issue Type: Improvement > Components: data-load, spark-integration > Reporter: Venugopal Reddy K > Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > *Issue:* > At present, When we do insert into table select from or create table as select from, we lauch one single task per node. Whereas when we do a simple select * from table query, tasks launched are equal to number of carbondata files(CARBON_TASK_DISTRIBUTION default is CARBON_TASK_DISTRIBUTION_BLOCK). > Thus, slows down the load performance of insert into select and ctas cases. > Refer [Community discussion regd. task lauch|http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html] > > *Suggestion:* > Lauch the same number of tasks as in select query for insert into select and ctas cases when the target table is of no-sort. > > SI creationSI creation > 1. DDL -> Parser -> CarbonCreateSecondaryIndexCommand do all Validations(list important once) acquireLockForSecondaryIndexCreation()acquire locks(compact, meta, dele_seg lock) preparetableInfo(prepare column schema, set positionref as sort,inherit local dict from main table ) for SI table & addIndexInfoToParentTable (create indexinfo and add to main table) CreateTablePreExecutionEvent(for acl work) Create SI table(sparksession.sql(create ...)) addIndexTableInfo, refreshTable index table, add indexInfo to hive metastore as Serde addOrModifyTableProperty(indexTableExists -> true) and refresh table(refresh catalog table) 2. try load, LoadDataForSecondaryIndex 1. prepare load model for SI 2. read table status and setinto load model 3. if loadmeta is empty, just return, else start load to SI 4. getValidseg, if yes go ahead, else return 5. prepare segmentIdToLoadStartTimeMapping, and prepare secindeModel 6. create exeSer based on threadpool size for parallel load of segments to SI 7. LoadTableSIPreExecutionEvent(ACL load events) 8. try toget seg local for all valid segment, if u get for all , add to valid, else add to skipped segment 9. start load for valid segs, update SI table status for in progress 10. if sort scope not global sort CarbonSecondaryIndexRDD internalGetPartitions prepareInputFormat and getSplits() internalCompute Sort blocks, prepareTaskBlockMap, prepare CarbonSecondaryIndexExecutor exec.processTableBlocks(prepare query model and execute query and return Iterator<CarbonRowBatch>) SecondaryIndexQueryResultProcessor(prepare seg prop from processingQuery result) SecondaryIndexQueryResultProcessor.processQueryResult(init tempLoc, sort data rows,processResult(does sort on data in iterators) prepareRowObjectForSorting addRowForSorting and startSorting initializeFinalThreadMergerForMergeSort();,initDataHandler();readAndLoadDataFromSortTempFiles(); write the carbon files to indextable store path WriteSegmentfile getLoadresult from future and make, SUccess and failed seg list if (failedSeglist not empty) if (isCompactionCall || !isLoadToFailedSISegments) \{ fail the SI load } else \{ just make markedfordelet and next load take care } } else create projections list including PR and create a datafram from MT loadDataUsingGlobalSort writeSegmentFile getLoadresult from future and make, SUccess and failed seg list if (failedSeglist not empty) if (isCompactionCall || !isLoadToFailedSISegments) \{ fail the SI load } else \{ just make markedfordelet and next load take care } } 11. if (successSISegments.nonEmpty && !isCompactionCall) update status for in progress (can avoid this) mergeIndexFiles writeSegmentFile(can be avoided, shreelekya working on it) readTable statusfile and prepareLoad model for merge datafiles mergeDataFilesSISegments -> scanSegmentsAndSubmitJob -> triggerCompaction -> CarbonSIRebuildRDD internalGetPartitions prepareInputFormat and getSplits() internalCompute CarbonCompactionExecutor.processTableBlocks() close(delete old data files) deleteOldIndexOrMergeIndexFiles writeSegmentFile for each mergedSegment updateTableStatusFile readTableStatusFile writeLoadDetailsIntoFile(updated, new Index and datasize into tablestatus file) mergeIndexFiles for newly generated index files for merged data files If IndexServer enabled clear cahce else clear driver cache 12. update table staus for success 13. if (!isCompactionCall) { triggerPrepriming(trigger pre priming for SI) 14. if (failedSISegments.nonEmpty && !isCompactionCall) { update table status for MFD 15. if (!isCompactionCall) { LoadTableSIPostExecutionEvent 16. if skippedSegmentNotEmpty, isSITableEnabled to false 17. deleteLoadsAndUpdateMetadata 18. Relase segment locks3. if checkMainTableSegEqualToSISeg isSITableEnabled - true 4. CreateTablePostExecutionEvent 5. releaseLocks(meta, dele_seg, compact) > > > > > > > > Refresh issue1. calling 3 refresh avoid it2. check if dummy is req or not3. it inherits same sort scope as MT during load, but if we have load option sort scope in MT, it will not inherit, check once4. no need to create load model first, check load meta and then do load model5. carbonLoadModel.getLoadMetadataDetails always null6. avoid CarbonEnv.getCarbonTable in createSecIndex7. createSecondaryIndex, if indextable is null, get from carbon metastore, instea of lookup relation to hive metastore8. avoid multiple in progress9. why new load model for merge data files > > -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |