GitHub user manishgupta88 opened a pull request:
https://github.com/apache/incubator-carbondata/pull/703

[CARBONDATA-780] Alter table support for compaction through sort step

Alter table needs to support a compaction process in which the complete data is sorted again and then written to file. Currently, during compaction, data is handed directly to the writer step, where it is split into columns and written. But because columns are sorted from left to right, dropping a column leaves the data unorganized again, since the dropped column's data is not considered during compaction. In these scenarios the complete data must be sorted again and then submitted to the writer step.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/incubator-carbondata compaction_restructure_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/703.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #703

----

commit b108c22024f6381385f0c394ea6ebe515a2e96b4
Author: ravikiran <[hidden email]>
Date: 2017-03-15T15:07:26Z

    Added class to handle sorting of data for compaction scenario

commit 11f80e3f22f68332ced85ae8da3a122d0a52447e
Author: manishgupta88 <[hidden email]>
Date: 2017-03-15T13:54:05Z

    Handling for compaction for restructure case. Handled to completely sort the data again if any restructured block is selected for compaction

----
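For orientation before the review comments, here is a minimal sketch (not part of the patch) of the dispatch this change introduces: if any block selected for compaction was written under an older schema, the complete data goes through the sort step again; otherwise the existing sorted-row merge is used. The signatures of checkIfAnyRestructuredBlockExists and the CompactionResultSortProcessor constructor are taken from the diffs quoted later in this thread; the wrapper class and its fallback parameter are illustrative assumptions.

    // Illustrative only -- assumed to live in org.apache.carbondata.spark.merger,
    // next to the classes it references, so only the java.util imports are shown.
    import java.util.List;
    import java.util.Map;

    public final class CompactionProcessorChooser {

      /**
       * Picks the result processor for a compaction request.
       */
      public static AbstractResultProcessor choose(CarbonLoadModel loadModel,
          CarbonTable carbonTable, SegmentProperties segmentProperties,
          CompactionType compactionType, String tableName, long tableLastUpdatedTime,
          Map<String, TaskBlockInfo> segmentMapping,
          Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
          AbstractResultProcessor rowMergeProcessor) {
        // true when any block selected for compaction predates the last schema change
        boolean restructured = CarbonCompactionUtil.checkIfAnyRestructuredBlockExists(
            segmentMapping, dataFileMetadataSegMapping, tableLastUpdatedTime);
        if (restructured) {
          // the stored left-to-right sort order no longer matches the current
          // schema (e.g. a column was dropped), so re-sort the complete data
          return new CompactionResultSortProcessor(loadModel, carbonTable,
              segmentProperties, compactionType, tableName);
        }
        // schema unchanged: the input rows are still sorted, so the existing
        // row-level merge path is sufficient
        return rowMergeProcessor;
      }
    }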
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Failed with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1350/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Failed with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1360/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1361/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1383/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1424/
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110093494

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/AbstractResultProcessor.java ---
@@ -0,0 +1,162 @@
[...]
+/**
+ * This class contains the common methods required for result processing during compaction based on
+ * restructure and mormal scenarios

--- End diff --

typo `mormal`
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110093866

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/AbstractResultProcessor.java ---
@@ -0,0 +1,162 @@
[...]
+public abstract class AbstractResultProcessor {
+
+  /**
+   * This method will perform the desired tasks of merging the selected slices
+   *
+   * @param resultIteratorList
+   * @return
+   */
+  public abstract boolean execute(List<RawResultIterator> resultIteratorList);
+
+  /**
+   * This method will create a model object for carbon fact data handler
+   *
+   * @param loadModel
+   * @return
+   */

--- End diff --

Move this method to CarbonFactDataHandlerModel class
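For reference, a rough sketch of the suggested move, assuming the five-argument shape used by the PR's initDataHandler() call (CarbonLoadModel, CarbonTable, SegmentProperties, table name, temp store location); the field population is elided because it would be carried over unchanged from the abstract class:

    public class CarbonFactDataHandlerModel {

      // ...existing fields, getters and setters...

      /**
       * Static factory moved here from AbstractResultProcessor, so that any
       * result processor can build the model without inheriting a helper.
       */
      public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(
          CarbonLoadModel loadModel, CarbonTable carbonTable,
          SegmentProperties segmentProperties, String tableName, String tempStoreLocation) {
        CarbonFactDataHandlerModel model = new CarbonFactDataHandlerModel();
        // populate database/table name, cardinality, data file attributes and
        // the temp store location from the arguments, exactly as the current
        // protected implementation does
        return model;
      }
    }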
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110096123

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java ---
@@ -351,4 +351,33 @@ private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
     }
     return cardinality;
   }
+
+  /**
+   * This method will check for any restructured block in the blocks selected for compaction
+   *
+   * @param segmentMapping
+   * @param dataFileMetadataSegMapping
+   * @param tableLastUpdatedTime
+   * @return
+   */
+  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
+    boolean restructuredBlockExists = false;
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+      for (DataFileFooter dataFileFooter : listMetadata) {
+        // if schema modified timestamp is greater than footer stored schema timestamp,

--- End diff --

even for table rename also are we updating the schema timestamp?
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110097631

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java ---
@@ -0,0 +1,407 @@
[...]
+/**
+ * This class will process the query result and convert the data
+ * into a format compatible for data load
+ */
+public class CompactionResultSortProcessor extends AbstractResultProcessor {
[...]
+  public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
+      SegmentProperties segmentProperties, CompactionType compactionType, String tableName) {
[...]
+  /**
+   * initialise temporary store location
+   */
+  private void initTempStoreLocation() {
+    tempStoreLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName,
+            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, false);
+  }
+
+  /**
+   * initialise aggregation type for measures for their storage format
+   */
+  private void initAggType() {

--- End diff --

it exists in multiple places, better move to `CarbonDataProcessorUtil`
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110098333

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java ---
@@ -0,0 +1,407 @@
[...]
+  /**
+   * create an instance of sort data rows
+   */
+  private void initSortDataRows() throws Exception {
+    measureCount = carbonTable.getMeasureByTableName(tableName).size();
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    noDictionaryColMapping = new boolean[dimensions.size()];
+    int i = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        i++;
+        continue;
+      }
+      noDictionaryColMapping[i++] = true;
+      noDictionaryCount++;
+    }
+    dimensionColumnCount = dimensions.size();
+    SortParameters parameters = createSortParameters();
+    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);

--- End diff --

Now only on-heap merge is supported, but we can have unsafe merge as well by using UnsafeSortDataRows. Anyway, you can put a TODO and handle it later.
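If the TODO were picked up, the sorter could be chosen from configuration at the end of initSortDataRows(); a hedged sketch, assuming the ENABLE_UNSAFE_SORT property name and defaulting to the on-heap path:

    // replaces the hard-coded instantiation at the end of initSortDataRows()
    boolean offHeapSort = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false"));
    if (offHeapSort) {
      // TODO: wire up UnsafeSortDataRows here so the merge sort runs
      // off-heap; until then fall through to the on-heap sorter
    }
    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);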
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110098777

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java ---
@@ -57,15 +42,9 @@
 /**
  * This is the Merger class responsible for the merging of the segments.
  */
-public class RowResultMerger {
+public class RowResultMerger extends AbstractResultProcessor {

--- End diff --

Maybe you can rename the class to `RowResultMergerProcessor`
Github user ravipesala commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

@manishgupta88 It is better to move the whole merger package to the processing module; the Spark module has nothing to do with merging.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/703#discussion_r110108215

--- Diff: integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java ---
@@ -351,4 +351,33 @@
[...]
+      for (DataFileFooter dataFileFooter : listMetadata) {
+        // if schema modified timestamp is greater than footer stored schema timestamp,

--- End diff --

Yes, because an entry is added to the schema evolution history even for a rename, and in case of any failure we need to revert the schema.
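A small worked illustration of the check being discussed (values are hypothetical):

    // block footer written at t=100; table renamed (schema evolution entry) at t=200
    long tableLastUpdatedTime = 200L;   // bumped by any schema change, including rename
    long footerSchemaTime = 100L;       // stored in the block's data file footer
    boolean restructured = tableLastUpdatedTime > footerSchemaTime;  // true -> sort step runs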
Github user manishgupta88 commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

@ravipesala Fixed the review comments; kindly review and merge.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Failed with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1477/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1480/
Github user CarbonDataQA commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

Build Success with Spark 1.6.2, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder/1483/
Github user ravipesala commented on the issue:
https://github.com/apache/incubator-carbondata/pull/703

LGTM
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-carbondata/pull/703